Would you like to open an issue for this for starters, Chesnay? It would even
be good to fix this for the upcoming release.
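
For the issue, here is a minimal sketch of the idea quoted below. It is written against plain Scala rather than the Flink API so that it runs on its own. `ContentKey` is a hypothetical wrapper (not a Flink class) that gives a byte array content-based `equals`/`hashCode` via `java.util.Arrays`, which is roughly what an injected KeySelector would have to provide.

```scala
import java.util.Arrays

// Hypothetical wrapper, not part of Flink: gives a byte array
// content-based equality and hashing so it can serve as a key.
final class ContentKey(val bytes: Array[Byte]) {
  override def hashCode: Int = Arrays.hashCode(bytes)
  override def equals(other: Any): Boolean = other match {
    case that: ContentKey => Arrays.equals(bytes, that.bytes)
    case _                => false
  }
}

object ArrayKeyDemo {
  def main(args: Array[String]): Unit = {
    val a = "row-1".getBytes("UTF-8")
    val b = "row-1".getBytes("UTF-8") // same content, different array object

    // Plain arrays inherit identity-based equals/hashCode from Object,
    // so two arrays with the same content do not compare equal.
    println(a == b)

    // Content-based alternatives agree whenever the content is equal.
    println(Arrays.hashCode(a) == Arrays.hashCode(b))
    println(new ContentKey(a) == new ContentKey(b))
    println(new String(a, "UTF-8") == new String(b, "UTF-8"))
  }
}
```

Wrapping the bytes in `new String(...)`, as in the quoted job, works when the row key is valid text. For arbitrary binary keys, decoding can be lossy, so a wrapper along the lines of `ContentKey` is probably the safer choice.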


On 8 December 2016 at 16:39:58, Chesnay Schepler (ches...@apache.org) wrote:
> It would be neat if we could support arrays as keys directly; it should
> boil down to checking the key type and in case of an array injecting a
> KeySelector that calls Arrays.hashCode(array).
> This worked for me when I ran into the same issue while experimenting
> with some stuff.
>  
> The batch API can use arrays as keys as well, so it's also a matter of
> consistency imo.
>  
> Regards,
> Chesnay
>  
> On 08.12.2016 16:23, Ufuk Celebi wrote:
> > @Aljoscha: I remember that someone else ran into this, too. Should we
> > address arrays as keys specifically in the API? Prohibit? Document this?
> >
> > – Ufuk
> >
> > On 7 December 2016 at 17:41:40, Andrew Roberts (arobe...@fuze.com) wrote:
> >> Sure!
> >>
> >> (Aside, it turns out that the issue was using an `Array[Byte]` as a key -
> >> byte arrays don’t appear to have a stable hashCode. I’ll provide the
> >> skeleton for completeness, though.)
> >>
> >> val env = StreamExecutionEnvironment.getExecutionEnvironment
> >> env.setParallelism(Config.callAggregator.parallelism)
> >>
> >> env.addSource(kafkaSource)
> >> .flatMap(transformToRecords(_))
> >> .keyBy(b => new String(b.rowKey)) // rowKey is Array[Byte]
> >> .map(new StatefulAggregator())
> >> .addSink(hbaseSink)
> >>
> >>
> >> Again, wrapping my keyBy function in `new String()` has fixed my issue. 
> >> Thanks!
> >>
> >> -a
> >>
> >>
> >>
> >>> On Dec 7, 2016, at 11:28 AM, Stefan Richter wrote:
> >>>
> >>> Hi,
> >>>
> >>> could you maybe provide the (minimal) code for the problematic job? Also,
> >>> are you sure that the keyBy is working on the correct key attribute?
> >>>
> >>> Best,
> >>> Stefan
> >>>
> >>>> On 07.12.2016 at 15:57, Andrew Roberts wrote:
> >>>>
> >>>> Hello,
> >>>>
> >>>> I’m trying to perform a stateful mapping of some objects coming in from
> >>>> Kafka in a parallelized flink job (set on the job using
> >>>> env.setParallelism(3)). The data source is a kafka topic, but the
> >>>> partitions aren’t meaningfully keyed for this operation (each kafka
> >>>> message is flatMapped to between 0-2 objects, with potentially different
> >>>> keys). I have a keyBy() operator directly before my map(), but I’m seeing
> >>>> objects with the same key distributed to different parallel task
> >>>> instances, as reported by getRuntimeContext().getIndexOfThisSubtask().
> >>>>
> >>>> My understanding of keyBy is that it would segment the stream by key, and
> >>>> guarantee that all data with a given key would hit the same instance. Am
> >>>> I possibly seeing residual “keying” from the kafka topic?
> >>>>
> >>>> I’m running flink 1.1.3 in scala. Please let me know if I can add more
> >>>> info.
> >>>>
> >>>> Thanks,
> >>>>
> >>>> Andrew
> >>
> >>
> >
>  
>  
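
The symptom in the original question can be reproduced outside Flink with a simplified model of hash partitioning. The `subtaskFor` function below is an assumption for illustration only (Flink's real key-to-subtask assignment is more involved). It only shows that routing depends on the key's `hashCode`, so two byte arrays with the same content are not guaranteed to land on the same subtask, while content-based keys always do.

```scala
object PartitionDemo {
  // Hypothetical, simplified routing rule; NOT Flink's actual algorithm.
  def subtaskFor(key: Any, parallelism: Int): Int =
    Math.floorMod(key.hashCode, parallelism)

  def main(args: Array[String]): Unit = {
    val parallelism = 3
    val first  = Array[Byte](1, 2, 3)
    val second = Array[Byte](1, 2, 3) // same content, different object

    // Arrays hash by identity, so these two indices may or may not match.
    println(s"array keys:  ${subtaskFor(first, parallelism)} vs " +
      s"${subtaskFor(second, parallelism)}")

    // ISO-8859-1 maps every byte to exactly one char, so this conversion
    // is lossless and equal content always yields equal String keys.
    val k1 = new String(first, "ISO-8859-1")
    val k2 = new String(second, "ISO-8859-1")
    println(s"string keys: ${subtaskFor(k1, parallelism)} vs " +
      s"${subtaskFor(k2, parallelism)}")
  }
}
```

Under this model, the string keys always print the same index, whereas the array keys can differ from run to run.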
