Hi Timo,

Thanks, I’ll give the ResultTypeQueryable interface a try - my previous 
experience registering custom Kryo serializers wasn’t so positive.

Though I’m still curious as to whether the java.lang.ClassCastException I got was 
representative of a bug in Flink, or of my doing something wrong.

But with the ongoing deprecation of DataSet support, I imagine that’s a 
low-priority issue in any case.

Regards,

— Ken


> On Jun 4, 2021, at 7:05 AM, Timo Walther <twal...@apache.org> wrote:
> 
> Hi Ken,
> 
> Non-POJOs are serialized with Kryo, which might not give you optimal 
> performance. You can register a custom Kryo serializer in the ExecutionConfig 
> to speed up serialization.
> 
> Alternatively, you can implement `ResultTypeQueryable` to provide custom type 
> information with a custom serializer.
> 
> I hope this helps. Otherwise, can you share a small example of how you would 
> like to call partitionCustom()?
> 
> Regards,
> Timo
> 
> On 04.06.21 15:38, Ken Krugler wrote:
>> Hi all,
>> I'm using Flink 1.12 and a custom partitioner/partitioning key (batch mode, 
>> with a DataSet) to do a better job of distributing data to tasks. The 
>> classes look like:
>> public class MyPartitioner implements Partitioner<MyGroupingKey>
>> {
>>     ...
>> }
>> public class MyGroupingKey implements Comparable<MyGroupingKey>
>> {
>>     ...
>> }
>> This worked fine, but I noticed a warning logged by Flink about 
>> MyGroupingKey not having an empty constructor, and thus not being treated as 
>> a POJO.
>> I added that empty constructor, and then I got an error because 
>> partitionCustom() only works on a single-field key.
>> So I changed MyGroupingKey to have a single field (a string), with transient 
>> cached values for the pieces of the key that I need while partitioning. Now 
>> I get an odd error:
>> java.lang.RuntimeException: Error while calling custom partitioner
>> Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to MyGroupingKey
>>         at MyPartitioner.partition(AdsPinotFilePartitioner.java:11)
>>         at org.apache.flink.runtime.operators.shipping.OutputEmitter.customPartition(OutputEmitter.java:235)
>>         ... 19 more
>> So I've got two questions…
>> • Should I just get rid of the empty constructor, and have Flink treat it as 
>> a non-POJO? This seemed to be working fine.
>> • Is it a bug in Flink that the extracted field from the key is being used 
>> as the expected type for partitioning?
>> Thanks!
>> — Ken
>> --------------------------
>> Ken Krugler
>> http://www.scaleunlimited.com
>> Custom big data solutions
>> Flink, Pinot, Solr, Elasticsearch
> 
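For reference, here is a minimal sketch of the key layout described in the original question: a single String field plus transient cached pieces, and a partitioner that routes on those pieces. The "tenant|bucket" key format, the getTenant()/getBucket() accessors, and the partitioning rule are all hypothetical, since the real classes were elided. So that it runs without Flink, it declares a local Partitioner interface mirroring the single partition(K key, int numPartitions) method of org.apache.flink.api.common.functions.Partitioner; with Flink on the classpath you would implement the real interface instead.

```java
// Local stand-in mirroring the one method of Flink's
// org.apache.flink.api.common.functions.Partitioner<K>, so this runs without Flink.
interface Partitioner<K> {
    int partition(K key, int numPartitions);
}

// Hypothetical key with a single String field of the form "tenant|bucket",
// a public no-arg constructor, and a getter/setter for that one field.
class MyGroupingKey implements Comparable<MyGroupingKey> {
    private String key;

    // Cached pieces of the key. They are transient, so they are not part of the
    // serialized state and are rebuilt on first use.
    private transient String tenant;
    private transient int bucket;

    public MyGroupingKey() {}

    public MyGroupingKey(String key) {
        this.key = key;
    }

    public String getKey() {
        return key;
    }

    public void setKey(String key) {
        this.key = key;
        this.tenant = null; // invalidate the cached pieces
    }

    // Parses "tenant|bucket" once; a null tenant marks the cache as empty.
    private void parseIfNeeded() {
        if (tenant == null) {
            int sep = key.indexOf('|');
            tenant = key.substring(0, sep);
            bucket = Integer.parseInt(key.substring(sep + 1));
        }
    }

    public String getTenant() {
        parseIfNeeded();
        return tenant;
    }

    public int getBucket() {
        parseIfNeeded();
        return bucket;
    }

    @Override
    public int compareTo(MyGroupingKey other) {
        return key.compareTo(other.key);
    }
}

// Hypothetical partitioning rule: route each key by its cached bucket number.
class MyPartitioner implements Partitioner<MyGroupingKey> {
    @Override
    public int partition(MyGroupingKey key, int numPartitions) {
        // floorMod keeps the result in [0, numPartitions) even for negative buckets
        return Math.floorMod(key.getBucket(), numPartitions);
    }
}
```

Which object actually reaches partition() depends on the partitionCustom() overload used (field position, field expression, or KeySelector), and the thread does not show that call, so the DataSet wiring is left out here.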

--------------------------
Ken Krugler
http://www.scaleunlimited.com
Custom big data solutions
Flink, Pinot, Solr, Elasticsearch
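Timo's first suggestion boils down to a serializer that writes only the key's one String field and rebuilds the object on read. The sketch below shows that write/read pair using java.io streams so it runs on its own. In Flink, the same logic would sit in a Kryo Serializer subclass (here a hypothetical MyGroupingKeySerializer) registered with env.getConfig().registerTypeWithKryoSerializer(MyGroupingKey.class, MyGroupingKeySerializer.class). The simplified MyGroupingKey and the MyGroupingKeyCodec helper are illustrative assumptions, not code from this thread.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Simplified, hypothetical key: just the single String field.
class MyGroupingKey {
    final String key;

    MyGroupingKey(String key) {
        this.key = key;
    }
}

// Hypothetical helper holding the write/read logic a custom serializer would contain.
// java.io streams stand in for Kryo's Output/Input so the sketch runs without Flink.
class MyGroupingKeyCodec {
    // Writes only the one field: a 2-byte length followed by modified UTF-8 bytes.
    static void write(DataOutputStream out, MyGroupingKey value) throws IOException {
        out.writeUTF(value.key);
    }

    // Reads the field back and rebuilds the key; any transient caches are recomputed later.
    static MyGroupingKey read(DataInputStream in) throws IOException {
        return new MyGroupingKey(in.readUTF());
    }

    static byte[] toBytes(MyGroupingKey value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            write(out, value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    static MyGroupingKey fromBytes(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            return read(in);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The design point is that the serialized form is just the one string; the transient cached values never need to be written.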


