Hi Michalis,
Can you double-check that you define the types of the Tuple2 output?


According to the error
"Return type
PojoType<org.apache.wayang.basic.data.Tuple2, fields = [field0:
GenericType<java.lang.Object>, field1: GenericType<java.lang.Object>]>",
it seems that the fields of the Tuple2 do not have specific types; they are
plain java.lang.Objects. That could be the problem.
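
To illustrate what I mean, here is a minimal, self-contained sketch of how a
generic class looks through reflection. Note that Pair and ErasureDemo are
hypothetical names made up for this example (Pair is only a stand-in, not
Wayang's actual Tuple2), and I am assuming that Flink's type extraction sees
something similar when no explicit types are provided:

```java
import java.lang.reflect.Field;

public class ErasureDemo {

    // Hypothetical stand-in for a generic two-field tuple; NOT Wayang's actual Tuple2.
    public static class Pair<T0, T1> {
        public T0 field0;
        public T1 field1;

        public Pair() { }

        public Pair(T0 field0, T1 field1) {
            this.field0 = field0;
            this.field1 = field1;
        }
    }

    // Returns the erased class that reflection reports for the named public field of Pair.
    public static Class<?> erasedFieldType(String fieldName) {
        try {
            Field field = Pair.class.getField(fieldName);
            return field.getType();
        } catch (NoSuchFieldException e) {
            throw new IllegalArgumentException("Unknown field: " + fieldName, e);
        }
    }

    public static void main(String[] args) {
        Pair<String, Integer> pair = new Pair<>("a", 1);
        // The value stored in the instance is a String...
        System.out.println(pair.field0.getClass().getName());
        // ...but the class itself only declares erased Object fields.
        System.out.println(erasedFieldType("field0").getName());
        System.out.println(erasedFieldType("field1").getName());
    }
}
```

So unless the concrete field types are passed along explicitly, a framework
that inspects only the class has nothing better than java.lang.Object to work
with.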

Also, to give some context: if you look at the FlinkMaterializedGroupByOperator
code, there are some utility functions we use to map the Wayang UDFs to the
Flink (or Spark) UDFs. For example, the line:
final KeySelector<Type, KeyType> keyExtractor =
        flinkExecutor.getCompiler().compileKeySelector(this.getKeyDescriptor());
converts the Wayang UDF keyDescriptor to a Flink UDF KeySelector. Maybe if you
check the code of this class you could spot the problem?
Best
--
Zoi

    On Tuesday, April 25, 2023 at 01:30:20 PM CEST, Michalis
Vargiamis <[email protected]> wrote:
 
 Hello!


I've been working on the missing operator tests for Flink. I've
successfully done the SortOperator and the UnionAllOperator by following
the respective Spark operator tests and changing RddChannel to
DataSetChannel.


I'm having trouble with the tests for other operators though, for 
example the FlinkMaterializedGroupByOperator. I tried starting with 
SparkMaterializedGroupByOperatorTest and doing the same RddChannel to 
DataSetChannel modifications as before, but I get the following error:


[ERROR] org.apache.wayang.flink.operators.FlinkMaterializedGroupByOperatorTest.testExecution  Time elapsed: 1.911 s  <<< ERROR!
org.apache.flink.api.common.InvalidProgramException: Return type PojoType<org.apache.wayang.basic.data.Tuple2, fields = [field0: GenericType<java.lang.Object>, field1: GenericType<java.lang.Object>]> of KeySelector class org.apache.wayang.flink.compiler.KeySelectorFunction is not a valid key type
         at org.apache.wayang.flink.operators.FlinkMaterializedGroupByOperatorTest.testExecution(FlinkMaterializedGroupByOperatorTest.java:50)


Digging into the operator code a bit more, the error happens at

dataSetInput.groupBy(keyExtractor);


Any ideas on what should be changed?


Here is a permalink to the respective Spark test:
[https://github.com/apache/incubator-wayang/blob/6aad4eea8c91a52f2a41e79424491e6c2c5206af/wayang-platforms/wayang-spark/code/test/java/org/apache/wayang/spark/operators/SparkMaterializedGroupByOperatorTest.java]


Thank you,

Michalis Vargiamis

  
