Hi Michalis, 

first, I suggest making a pull request with what you already have working.
Regarding the error, I have not encountered it before. I suspect there is
a bug in the Flink operator implementation. Browsing the Flink tests we
currently have, it seems that ReduceBy and Join also throw some errors. Can you
check whether the ReduceBy and Join operators throw the same exception as the
one you are seeing?
If so, I suggest taking one of them and comparing it against the Flink documentation
for that operator. It may be that we are using an older Flink API.
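For reference, the current DataSet coGroup API looks roughly like the self-contained
sketch below (the data sets and the CoGroupFunction are just placeholders, not Wayang
code); the where()/equalTo()/with() chain is the same one that appears in your stack
trace, so it might be a useful baseline to compare FlinkCoGroupOperator against:

import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

// Minimal, self-contained sketch of the DataSet coGroup API; data and function
// bodies are placeholders only.
public class CoGroupApiSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Tuple2<String, Integer>> left =
                env.fromElements(Tuple2.of("a", 1), Tuple2.of("b", 2));
        DataSet<Tuple2<String, Integer>> right =
                env.fromElements(Tuple2.of("a", 10), Tuple2.of("c", 30));

        // where()/equalTo() define the keys (positions or KeySelectors);
        // with() attaches the CoGroupFunction and is where the ClosureCleaner runs.
        DataSet<String> coGrouped = left.coGroup(right)
                .where(0)
                .equalTo(0)
                .with(new CoGroupFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String>() {
                    @Override
                    public void coGroup(Iterable<Tuple2<String, Integer>> first,
                                        Iterable<Tuple2<String, Integer>> second,
                                        Collector<String> out) {
                        int count = 0;
                        for (Tuple2<String, Integer> t : first) { count++; }
                        for (Tuple2<String, Integer> t : second) { count++; }
                        out.collect(count + " elements in this key group");
                    }
                });

        coGrouped.print();
    }
}
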
Best
--
Zoi

    On Wednesday, 3 May 2023 at 02:43:44 PM CEST, Michalis Vargiamis <[email protected]> wrote:
 
 Hello!


Regarding the Flink operator tests, there is progress. First of all, I went 
through the already implemented Spark operator tests: some of them already 
had a Flink counterpart, while others did not exist for Flink at all (like 
the BernoulliSampleOperator). What I have implemented so far are the 
UnionAllOperator and the SortOperator, which went very smoothly, as well as 
the MaterializedGroupByOperator, for which I had to tweak the test code and 
also add the following line to the operator code:

.returns(this.getOutputType().getDataUnitType().getTypeClass());
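
To make it concrete, here is a minimal, self-contained sketch (not the operator
code itself; the input data and the map function are just placeholders) of how
such a .returns(...) hint supplies the output type to Flink:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

// Minimal sketch: .returns(...) tells Flink the concrete output type of a UDF
// when it cannot infer it on its own (e.g. because of type erasure).
public class ReturnsHintSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<String> input = env.fromElements("a", "b", "c");

        DataSet<String> upperCased = input
                .map((MapFunction<String, String>) String::toUpperCase)
                // In the operator, the class passed here comes from
                // this.getOutputType().getDataUnitType().getTypeClass().
                .returns(String.class);

        upperCased.print();
    }
}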


Now, regarding the CoGroupOperator, GlobalReduceOperator, and 
MapPartitionsOperator, I get the following error for all of them, which 
has me somewhat stuck:


java.lang.IllegalArgumentException
    at org.apache.flink.shaded.asm5.org.objectweb.asm.ClassReader.<init>(Unknown Source)
    at org.apache.flink.shaded.asm5.org.objectweb.asm.ClassReader.<init>(Unknown Source)
    at org.apache.flink.shaded.asm5.org.objectweb.asm.ClassReader.<init>(Unknown Source)
    at org.apache.flink.api.java.ClosureCleaner.getClassReader(ClosureCleaner.java:148)
    at org.apache.flink.api.java.ClosureCleaner.cleanThis0(ClosureCleaner.java:115)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:75)
    at org.apache.flink.api.java.DataSet.clean(DataSet.java:186)
    at org.apache.flink.api.java.operators.CoGroupOperator$CoGroupOperatorSets$CoGroupOperatorSetsPredicate$CoGroupOperatorWithoutFunction.with(CoGroupOperator.java:622)
    at org.apache.wayang.flink.operators.FlinkCoGroupOperator.evaluate(FlinkCoGroupOperator.java:116)
    at org.apache.wayang.flink.operators.FlinkOperatorTestBase.evaluate(FlinkOperatorTestBase.java:75)
    at org.apache.wayang.flink.operators.FlinkCoGroupOperatorTest.testExecution(FlinkCoGroupOperatorTest.java:72)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
    at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
    at org.junit.runner.JUnitCore.run(JUnitCore.java:115)
    at org.junit.vintage.engine.execution.RunnerExecutor.execute(RunnerExecutor.java:43)
    at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
    at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
    at java.base/java.util.Iterator.forEachRemaining(Iterator.java:133)
    at java.base/java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
    at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
    at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
    at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
    at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
    at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497)
    at org.junit.vintage.engine.VintageTestEngine.executeAllChildren(VintageTestEngine.java:82)
    at org.junit.vintage.engine.VintageTestEngine.execute(VintageTestEngine.java:73)
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:248)
    at org.junit.platform.launcher.core.DefaultLauncher.lambda$execute$5(DefaultLauncher.java:211)
    at org.junit.platform.launcher.core.DefaultLauncher.withInterceptedStreams(DefaultLauncher.java:226)
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:199)
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:132)
    at org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.execute(JUnitPlatformProvider.java:188)
    at org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.invokeAllTests(JUnitPlatformProvider.java:154)
    at org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.invoke(JUnitPlatformProvider.java:128)
    at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:428)
    at org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:162)
    at org.apache.maven.surefire.booter.ForkedBooter.run(ForkedBooter.java:562)
    at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:548)


Any help or feedback would be appreciated!


Thanks,

Michalis


On 27-Apr-23 8:45 PM, Jorge Arnulfo Quiané Ruiz wrote:
> Hi Michalis,
>
> Sure! It was more of a heads-up than anything else.
> Please don’t hesitate to pose your questions here or to call for a meeting if 
> necessary :)
>
> Best,
> Jorge
>
>> On 27 Apr 2023, at 16.52, Michalis Vargiamis <[email protected]> 
>> wrote:
>>
>> Hi!
>>
>>
>> Thanks! Actually, I've lost quite some time reconfiguring my setup. I 
>> currently have a Windows laptop, so for Linux I initially tried the WSL 
>> that Windows provides, but it turned out to make the whole 
>> development/debugging process quite inefficient, so I switched to VMware, but 
>> then I lost some time with some other things. Anyway, I'd have to ask 
>> for a bit more time on this one.
>>
>>
>> Thank you,
>>
>> Michalis
>>
>>
>> On 27-Apr-23 10:41 AM, Jorge Arnulfo Quiané Ruiz wrote:
>>> Hi Michalis,
>>>
>>> Please let us know if you need help :)
>>>
>>> Best,
>>> Jorge
>>>
>>>> On 25 Apr 2023, at 14.07, Zoi Kaoudi <[email protected]> wrote:
>>>>
>>>> Hi Michalis,
>>>> Can you double-check that you define the types of the Tuple2 output?
>>>>
>>>>
>>>> According to the error
>>>> "Return type
>>>> PojoType<org.apache.wayang.basic.data.Tuple2, fields = [field0:
>>>> GenericType<java.lang.Object>, field1: GenericType<java.lang.Object>]>",
>>>> it seems that the Tuple2 fields do not have specific types but are
>>>> java.lang.Objects. Maybe that is the problem.
>>>>
>>>> Also, to give some context: if you look at the 
>>>> FlinkMaterializedGroupByOperator code, there are some utility functions we 
>>>> use to map the Wayang UDFs to the Flink (or Spark) UDFs. For example, the 
>>>> line:
>>>> final KeySelector<Type, KeyType> keyExtractor =
>>>>     flinkExecutor.getCompiler().compileKeySelector(this.getKeyDescriptor());
>>>> converts the Wayang UDF keyDescriptor to a Flink UDF KeySelector. Maybe if 
>>>> you check the code of that class, you can spot the problem?
>>>> Best
>>>> --
>>>> Zoi
>>>>
>>>>    On Tuesday, 25 April 2023 at 01:30:20 PM CEST, Michalis Vargiamis <[email protected]> wrote:
>>>>
>>>> Hello!
>>>>
>>>>
>>>> I've been working on the missing operator tests for Flink. I've
>>>> successfully done the SortOperator and the UnionAllOperator by following
>>>> the respective Spark operator tests and changing RddChannel to
>>>> DataSetChannel.
>>>>
>>>>
>>>> I'm having trouble with the tests for other operators though, for
>>>> example the FlinkMaterializedGroupByOperator. I tried starting with
>>>> SparkMaterializedGroupByOperatorTest and doing the same RddChannel to
>>>> DataSetChannel modifications as before, but I get the following error:
>>>>
>>>>
>>>> [ERROR] org.apache.wayang.flink.operators.FlinkMaterializedGroupByOperatorTest.testExecution  Time elapsed: 1.911 s  <<< ERROR!
>>>> org.apache.flink.api.common.InvalidProgramException: Return type PojoType<org.apache.wayang.basic.data.Tuple2, fields = [field0: GenericType<java.lang.Object>, field1: GenericType<java.lang.Object>]> of KeySelector class org.apache.wayang.flink.compiler.KeySelectorFunction is not a valid key type
>>>>          at org.apache.wayang.flink.operators.FlinkMaterializedGroupByOperatorTest.testExecution(FlinkMaterializedGroupByOperatorTest.java:50)
>>>>
>>>>
>>>> Digging into the operator code a bit more, the error happens at
>>>>
>>>> dataSetInput.groupBy(keyExtractor);
>>>>
>>>>
>>>> Any ideas on what should be changed?
>>>>
>>>>
>>>> Here is a permalink to the respective Spark test:
>>>> [https://github.com/apache/incubator-wayang/blob/6aad4eea8c91a52f2a41e79424491e6c2c5206af/wayang-platforms/wayang-spark/code/test/java/org/apache/wayang/spark/operators/SparkMaterializedGroupByOperatorTest.java]
>>>>
>>>>
>>>> Thank you,
>>>>
>>>> Michalis Vargiamis
>>>>
  
