Hi there!
It turns out the problem was indeed due to the Flink version. As seen here
[https://nightlies.apache.org/flink/flink-docs-release-1.10/release-notes/flink-1.10.html#java-11-support-flink-10725]
Java 11 support starts with Flink 1.10, whereas the project was on Flink
1.7.1. After updating it, the error was gone and the tests ran smoothly.
The tests I have added so far, based on their Spark counterparts, are:
CoGroupOperator
GlobalReduceOperator
MapPartitionsOperator
MaterializedGroupByOperator
SortOperator
UnionAllOperator
I also changed the Flink version in the wayang-tests-integration pom so
that the same version is used throughout the project. The integration
tests also run smoothly. I will make a pull request with those changes.
Do we need tests for other Flink operators as well?
Note that the link above also mentions the Kryo error/warning we saw
with Zoi on my last pull request, and it says "These warnings are
considered harmless and will be addressed in future Flink releases."
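For reference, aligning the versions amounts to something like the following
in the pom (the property name and dependency list here are illustrative, not
the actual Wayang pom, which may organize this differently):

```xml
<!-- Illustrative sketch only: pin a single Flink version (>= 1.10 is the
     first release with Java 11 support, see FLINK-10725) and reuse it in
     every module via a shared property. -->
<properties>
  <flink.version>1.10.3</flink.version>
</properties>

<dependencies>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>${flink.version}</version>
  </dependency>
</dependencies>
```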
Thanks,
Michalis
On 03-May-23 4:38 PM, Zoi Kaoudi wrote:
Hi Michalis,
first, I suggest making a pull request with what you have that is working.
Regarding the error, I have not encountered it before. I suspect there is
a bug in the Flink operator implementation. Browsing the Flink tests we
currently have, it seems that ReduceBy and Join also throw some errors. Can
you check whether the ReduceBy and Join operators throw the same exception
as the one you have?
If so, I suggest taking one of them and going through the Flink documentation
of that operator. It may be that we are using an older Flink API.
Best
--
Zoi
On Wednesday, May 3, 2023 at 02:43:44 PM CEST, Michalis Vargiamis
<[email protected]> wrote:
Hello!
Regarding the Flink operator tests, there is progress. First of all, I
looked at the already implemented Spark operator tests; some of them
were already implemented for Flink and some others did not exist for
Flink (like the BernoulliSampleOperator). What I have implemented so
far are the UnionAllOperator and the SortOperator, which went very
smoothly, and also the MaterializedGroupByOperator, for which I had to
tweak the test code and add the following line to the operator code:
.returns(this.getOutputType().getDataUnitType().getTypeClass());
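(For context, and purely as an illustration rather than Wayang or Flink code:
the .returns(...) hint is needed because Java erases generic type parameters
at runtime, so Flink cannot always infer an operator's output type on its
own. A minimal, self-contained demonstration of the erasure itself:)

```java
import java.util.function.Function;

// Illustrative only: Java erases generic type parameters at runtime, which
// is why Flink's DataSet API sometimes needs an explicit .returns(...) hint.
public class ErasureDemo {
    // At runtime we can see that the object implements Function, but the
    // type arguments <String, Integer> are gone ("erased"): reflection only
    // reports the raw interface.
    static String describe(Function<String, Integer> f) {
        Class<?> iface = f.getClass().getInterfaces()[0];
        return iface.getSimpleName(); // "Function" -- no type arguments survive
    }

    public static void main(String[] args) {
        Function<String, Integer> length = String::length;
        System.out.println(describe(length));
        System.out.println(length.apply("hello"));
    }
}
```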
Now, regarding the CoGroupOperator, GlobalReduceOperator, and
MapPartitionsOperator, I get the following error for all of them, which
has me somewhat stuck:
java.lang.IllegalArgumentException
    at org.apache.flink.shaded.asm5.org.objectweb.asm.ClassReader.<init>(Unknown Source)
    at org.apache.flink.shaded.asm5.org.objectweb.asm.ClassReader.<init>(Unknown Source)
    at org.apache.flink.shaded.asm5.org.objectweb.asm.ClassReader.<init>(Unknown Source)
    at org.apache.flink.api.java.ClosureCleaner.getClassReader(ClosureCleaner.java:148)
    at org.apache.flink.api.java.ClosureCleaner.cleanThis0(ClosureCleaner.java:115)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:75)
    at org.apache.flink.api.java.DataSet.clean(DataSet.java:186)
    at org.apache.flink.api.java.operators.CoGroupOperator$CoGroupOperatorSets$CoGroupOperatorSetsPredicate$CoGroupOperatorWithoutFunction.with(CoGroupOperator.java:622)
    at org.apache.wayang.flink.operators.FlinkCoGroupOperator.evaluate(FlinkCoGroupOperator.java:116)
    at org.apache.wayang.flink.operators.FlinkOperatorTestBase.evaluate(FlinkOperatorTestBase.java:75)
    at org.apache.wayang.flink.operators.FlinkCoGroupOperatorTest.testExecution(FlinkCoGroupOperatorTest.java:72)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
    at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
    at org.junit.runner.JUnitCore.run(JUnitCore.java:115)
    at org.junit.vintage.engine.execution.RunnerExecutor.execute(RunnerExecutor.java:43)
    at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
    at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
    at java.base/java.util.Iterator.forEachRemaining(Iterator.java:133)
    at java.base/java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
    at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
    at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
    at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
    at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
    at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497)
    at org.junit.vintage.engine.VintageTestEngine.executeAllChildren(VintageTestEngine.java:82)
    at org.junit.vintage.engine.VintageTestEngine.execute(VintageTestEngine.java:73)
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:248)
    at org.junit.platform.launcher.core.DefaultLauncher.lambda$execute$5(DefaultLauncher.java:211)
    at org.junit.platform.launcher.core.DefaultLauncher.withInterceptedStreams(DefaultLauncher.java:226)
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:199)
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:132)
    at org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.execute(JUnitPlatformProvider.java:188)
    at org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.invokeAllTests(JUnitPlatformProvider.java:154)
    at org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.invoke(JUnitPlatformProvider.java:128)
    at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:428)
    at org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:162)
    at org.apache.maven.surefire.booter.ForkedBooter.run(ForkedBooter.java:562)
    at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:548)
Any help or feedback would be appreciated!
Thanks,
Michalis
On 27-Apr-23 8:45 PM, Jorge Arnulfo Quiané Ruiz wrote:
Hi Michalis,
Sure! It was more of a heads-up than anything else.
Please don’t hesitate to pose your questions here or to call for a meeting if
necessary :)
Best,
Jorge
On 27 Apr 2023, at 16.52, Michalis Vargiamis <[email protected]>
wrote:
Hi!
Thanks! Actually, I've lost quite some time reconfiguring my setup. I
currently have a Windows laptop, so for Linux I initially tried the WSL
that Windows provides, but it turned out to make the whole
development/debugging process quite inefficient, so I switched to VMware,
but then I lost some more time on other things. Anyway, I'd have to ask
for a bit more time on this one.
Thank you,
Michalis
On 27-Apr-23 10:41 AM, Jorge Arnulfo Quiané Ruiz wrote:
Hi Michalis,
Please let us know if you need help :)
Best,
Jorge
On 25 Apr 2023, at 14.07, Zoi Kaoudi <[email protected]> wrote:
Hi Michalis,
can you double-check that you define the types of the Tuple2 output?
According to the error
"Return type PojoType<org.apache.wayang.basic.data.Tuple2, fields = [field0: GenericType<java.lang.Object>, field1: GenericType<java.lang.Object>]>"
it seems that the Tuple2 does not have specific field types; they are
java.lang.Objects. Maybe that is the problem.
Also, to give some context: if you look at the FlinkMaterializedGroupByOperator
code, there are some utility functions we use to map the Wayang UDFs to the
Flink (or Spark) UDFs. For example, the line
final KeySelector<Type, KeyType> keyExtractor = flinkExecutor.getCompiler().compileKeySelector(this.getKeyDescriptor());
converts the Wayang UDF keyDescriptor into a Flink UDF KeySelector. Maybe if
you check the code of this class you can spot the problem?
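(To illustrate the shape of that translation — with the interface and names
simplified; this is not the actual Wayang or Flink API — the compiler
essentially adapts a plain Java function into a key-selector interface:)

```java
import java.util.function.Function;

// Simplified sketch of wrapping a user-defined function as a key selector.
// KeySelector here is a stand-in for Flink's interface of the same name;
// the real Wayang compiler classes differ.
public class KeySelectorSketch {
    interface KeySelector<IN, KEY> {
        KEY getKey(IN value);
    }

    // Adapt a plain Java function into the key-selector interface, roughly
    // as a UDF compiler might do.
    static <IN, KEY> KeySelector<IN, KEY> compileKeySelector(Function<IN, KEY> udf) {
        return udf::apply;
    }

    public static void main(String[] args) {
        KeySelector<String, Integer> byLength = compileKeySelector(String::length);
        System.out.println(byLength.getKey("wayang"));
    }
}
```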
Best
--
Zoi
On Tuesday, April 25, 2023 at 01:30:20 PM CEST, Michalis Vargiamis
<[email protected]> wrote:
Hello!
I've been working on the missing operator tests for Flink. I've
successfully done the SortOperator and the UnionAllOperator by following
the respective Spark operator tests and changing RddChannel to
DataSetChannel.
I'm having trouble with the tests for other operators, though, for
example the FlinkMaterializedGroupByOperator. I tried starting from
SparkMaterializedGroupByOperatorTest and doing the same RddChannel-to-
DataSetChannel modifications as before, but I get the following error:
[ERROR] org.apache.wayang.flink.operators.FlinkMaterializedGroupByOperatorTest.testExecution
Time elapsed: 1.911 s <<< ERROR!
org.apache.flink.api.common.InvalidProgramException: Return type PojoType<org.apache.wayang.basic.data.Tuple2, fields = [field0: GenericType<java.lang.Object>, field1: GenericType<java.lang.Object>]> of KeySelector class org.apache.wayang.flink.compiler.KeySelectorFunction is not a valid key type
    at org.apache.wayang.flink.operators.FlinkMaterializedGroupByOperatorTest.testExecution(FlinkMaterializedGroupByOperatorTest.java:50)
Digging into the operator code a bit more, the error happens at
dataSetInput.groupBy(keyExtractor);
Any ideas on what should be changed?
Here is a permalink to the respective Spark test
[https://github.com/apache/incubator-wayang/blob/6aad4eea8c91a52f2a41e79424491e6c2c5206af/wayang-platforms/wayang-spark/code/test/java/org/apache/wayang/spark/operators/SparkMaterializedGroupByOperatorTest.java]
Thank you,
Michalis Vargiamis