Hi Timo,

Thanks, I’ll give the ResultTypeQueryable interface a try - my previous experience registering custom Kryo serializers wasn’t so positive.

Though I’m still curious as to whether the java.lang.ClassCastException I got was representative of a bug in Flink, or my doing something wrong.

But with the ongoing deprecation of DataSet support, I imagine that’s a low priority issue in any case.

Regards,

— Ken

> On Jun 4, 2021, at 7:05 AM, Timo Walther <twal...@apache.org> wrote:
>
> Hi Ken,
>
> non-POJOs are serialized with Kryo. This might not give you optimal
> performance. You can register a custom Kryo serializer in ExecutionConfig to
> speed up the serialization.
>
> Alternatively, you can implement `ResultTypeQueryable` to provide a custom type
> information with a custom serializer.
>
> I hope this helps. Otherwise can you share a little example of how you would
> like to call partitionCustom()?
>
> Regards,
> Timo
>
> On 04.06.21 15:38, Ken Krugler wrote:
>> Hi all,
>>
>> I'm using Flink 1.12 and a custom partitioner/partitioning key (batch mode,
>> with a DataSet) to do a better job of distributing data to tasks. The
>> classes look like:
>>
>> public class MyPartitioner implements Partitioner<MyGroupingKey>
>> {
>> ...
>> }
>>
>> public class MyGroupingKey implements Comparable<MyGroupingKey>
>> {
>> ...
>> }
>>
>> This worked fine, but I noticed a warning logged by Flink about
>> MyGroupingKey not having an empty constructor, and thus not being treated as
>> a POJO.
>>
>> I added that empty constructor, and then I got an error because
>> partitionCustom() only works on a single field key.
>>
>> So I changed MyGroupingKey to have a single field (a string), with transient
>> cached values for the pieces of the key that I need while partitioning. Now
>> I get an odd error:
>>
>> java.lang.RuntimeException: Error while calling custom partitioner
>> Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to MyGroupingKey
>> at MyPartitioner.partition(AdsPinotFilePartitioner.java:11)
>> at org.apache.flink.runtime.operators.shipping.OutputEmitter.customPartition(OutputEmitter.java:235)
>> ... 19 more
>>
>> So I've got two questions…
>>
>> • Should I just get rid of the empty constructor, and have Flink treat it as
>> a non-POJO? This seemed to be working fine.
>> • Is it a bug in Flink that the extracted field from the key is being used
>> as the expected type for partitioning?
>>
>> Thanks!
>>
>> — Ken
>>
>> --------------------------
>> Ken Krugler
>> http://www.scaleunlimited.com
>> Custom big data solutions
>> Flink, Pinot, Solr, Elasticsearch
>

--------------------------
Ken Krugler
http://www.scaleunlimited.com
Custom big data solutions
Flink, Pinot, Solr, Elasticsearch
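
For anyone following the thread, here is a minimal plain-Java sketch (no Flink dependency) of one way a "String cannot be cast to ..." error can surface inside a partitioner's partition() method. It only illustrates the Java mechanism: when a generic partitioner is invoked through an erased (raw) reference, the compiler-generated bridge method casts the argument, so passing an extracted String field instead of the key object fails at that cast. Whether this is what Flink actually does in this case is not established in the thread. `KeyPartitioner`, `GroupingKey`, `GroupingKeyPartitioner`, and `callErased` are hypothetical stand-ins, not Flink classes or the classes from the original question.

```java
public class ErasedPartitionerDemo {

    // Hypothetical stand-in for a generic partitioner interface (not Flink's).
    interface KeyPartitioner<K> {
        int partition(K key, int numPartitions);
    }

    // Hypothetical key class wrapping a single String field.
    static final class GroupingKey {
        final String value;

        GroupingKey(String value) {
            this.value = value;
        }
    }

    // Partitioner typed on the key class, as in the question's setup.
    static final class GroupingKeyPartitioner implements KeyPartitioner<GroupingKey> {
        @Override
        public int partition(GroupingKey key, int numPartitions) {
            return Math.floorMod(key.value.hashCode(), numPartitions);
        }
    }

    // Simulates a caller that only holds the partitioner as a raw type and
    // hands it whatever object it extracted as "the key". The call goes
    // through the bridge method partition(Object, int), which casts the
    // argument to GroupingKey before delegating.
    @SuppressWarnings({"unchecked", "rawtypes"})
    static int callErased(KeyPartitioner partitioner, Object key, int numPartitions) {
        return partitioner.partition(key, numPartitions);
    }

    public static void main(String[] args) {
        GroupingKeyPartitioner partitioner = new GroupingKeyPartitioner();
        GroupingKey key = new GroupingKey("abc");

        // Passing the key object itself works.
        System.out.println("partition = " + callErased(partitioner, key, 4));

        // Passing the extracted String field fails inside the bridge method.
        try {
            callErased(partitioner, key.value, 4);
        } catch (ClassCastException e) {
            System.out.println("ClassCastException: " + e.getMessage());
        }
    }
}
```

If this is the mechanism at work, a partitioner typed on the extracted field (here, `String`) would receive the value without a cast failure; that is an inference from the sketch, not something confirmed in the thread.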