Till Rohrmann wrote
> What was your problem with using Java POJOs with the Scala API? 

Here's a quick example
<https://gist.github.com/AndrewWhitaker/e51308bb4b43f7ddefc3> that
demonstrates some of the problems I'm having. I used `max` in the example,
but I actually get an exception for most of the operations I try directly on
Java POJOs.

The "User" class referenced here is just the Avro example schema hydrated
into a Java POJO. I can post that or the entire project if it'd be helpful.

I included the stack trace of the exception in the gist, but I'll post it
here too:

Exception in thread "main" java.lang.UnsupportedOperationException: Specifying fields by name is onlysupported on Case Classes (for now).
        at org.apache.flink.api.scala.package$.fieldNames2Indices(package.scala:62)
        at org.apache.flink.api.scala.DataSet.aggregate(DataSet.scala:466)
        at org.apache.flink.api.scala.DataSet.max(DataSet.scala:503)
        at SampleAvroJob$.main(SampleAvroJob.scala:12)
        at SampleAvroJob.main(SampleAvroJob.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)

And if I use field position instead of field name, I get this exception:

Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: Aggregating on field positions is only possible on tuple data types.
        at org.apache.flink.api.scala.operators.ScalaAggregateOperator.<init>(ScalaAggregateOperator.java:71)
        at org.apache.flink.api.scala.DataSet.aggregate(DataSet.scala:455)
        at org.apache.flink.api.scala.DataSet.max(DataSet.scala:482)
        at SampleAvroJob$.main(SampleAvroJob.scala:12)
        at SampleAvroJob.main(SampleAvroJob.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)

A workaround is to use `.map` to map to tuples first, but this seems a
little clunky.
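
For reference, here is a minimal sketch of that workaround. It is not the code from the gist: the `User` class below is a hypothetical stand-in for the Avro-generated POJO, with only the two getters the sketch needs, and the sample values are made up. It assumes the Flink Scala DataSet API is on the classpath.

```scala
import org.apache.flink.api.scala._

// Hypothetical stand-in for the Avro-generated User POJO (an assumption,
// not the real generated class): a no-arg constructor plus two getters.
class User(var name: String, var favoriteNumber: Int) {
  def this() = this("", 0)
  def getName: String = name
  def getFavoriteNumber: Int = favoriteNumber
}

object TupleWorkaround {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Made-up sample data, standing in for the records read from Avro.
    val users: DataSet[User] =
      env.fromElements(new User("alice", 7), new User("bob", 42))

    // Map each POJO to a Scala tuple so that positional aggregation applies.
    val asTuples: DataSet[(String, Int)] =
      users.map(u => (u.getName, u.getFavoriteNumber))

    // Aggregate on tuple position 1 (the favorite number).
    val maxNumber = asTuples.max(1)

    // Depending on the Flink version, print() may only declare a sink,
    // in which case env.execute() is needed afterwards.
    maxNumber.print()
  }
}
```

Going by the first exception message, mapping to a case class instead of a tuple should also make the by-name variant (`max("fieldName")`) available, though I have not verified that here.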

Thanks!



