Would you like to open an issue for this for starters, Chesnay? It would be good to fix for the upcoming release even.
On 8 December 2016 at 16:39:58, Chesnay Schepler (ches...@apache.org) wrote:
> It would be neat if we could support arrays as keys directly; it should
> boil down to checking the key type and, in the case of an array, injecting
> a KeySelector that calls Arrays.hashCode(array).
> This worked for me when I ran into the same issue while experimenting
> with some stuff.
>
> The batch API can use arrays as keys as well, so it's also a matter of
> consistency imo.
>
> Regards,
> Chesnay
>
> On 08.12.2016 16:23, Ufuk Celebi wrote:
> > @Aljoscha: I remember that someone else ran into this, too. Should we
> > address arrays as keys specifically in the API? Prohibit? Document this?
> >
> > – Ufuk
> >
> > On 7 December 2016 at 17:41:40, Andrew Roberts (arobe...@fuze.com) wrote:
> >> Sure!
> >>
> >> (Aside, it turns out that the issue was using an `Array[Byte]` as a
> >> key - byte arrays don’t appear to have a stable hashCode. I’ll provide
> >> the skeleton for fullness, though.)
> >>
> >> val env = StreamExecutionEnvironment.getExecutionEnvironment
> >> env.setParallelism(Config.callAggregator.parallelism)
> >>
> >> env.addSource(kafkaSource)
> >>   .flatMap(transformToRecords(_))
> >>   .keyBy(b => new String(b.rowKey)) // rowKey is Array[Byte]
> >>   .map(new StatefulAggregator())
> >>   .addSink(hbaseSink)
> >>
> >> Again, wrapping my keyBy function in `new String()` has fixed my issue.
> >> Thanks!
> >>
> >> -a
> >>
> >>> On Dec 7, 2016, at 11:28 AM, Stefan Richter wrote:
> >>>
> >>> Hi,
> >>>
> >>> could you maybe provide the (minimal) code for the problematic job?
> >>> Also, are you sure that the keyBy is working on the correct key
> >>> attribute?
> >>>
> >>> Best,
> >>> Stefan
> >>>
> >>>> On 07.12.2016 at 15:57, Andrew Roberts wrote:
> >>>>
> >>>> Hello,
> >>>>
> >>>> I’m trying to perform a stateful mapping of some objects coming in
> >>>> from Kafka in a parallelized Flink job (set on the job using
> >>>> env.setParallelism(3)). The data source is a Kafka topic, but the
> >>>> partitions aren’t meaningfully keyed for this operation (each Kafka
> >>>> message is flatMapped to between 0-2 objects, with potentially
> >>>> different keys). I have a keyBy() operator directly before my map(),
> >>>> but I’m seeing objects with the same key distributed to different
> >>>> parallel task instances, as reported by
> >>>> getRuntimeContext().getIndexOfThisSubtask().
> >>>>
> >>>> My understanding of keyBy is that it would segment the stream by key,
> >>>> and guarantee that all data with a given key would hit the same
> >>>> instance. Am I possibly seeing residual “keying” from the Kafka topic?
> >>>>
> >>>> I’m running Flink 1.1.3 in Scala. Please let me know if I can add
> >>>> more info.
> >>>>
> >>>> Thanks,
> >>>>
> >>>> Andrew
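[Editor's note, not part of the original thread: a minimal standalone sketch of the behavior discussed above. JVM arrays inherit identity-based Object.hashCode, so two `Array[Byte]` instances with identical contents almost always hash differently, which is why they misbehave as keyBy keys; `java.util.Arrays.hashCode` and `new String(bytes)` are both content-based, matching the workarounds mentioned in the thread. The object name is made up for the example.]

```scala
import java.util.Arrays

object ByteArrayKeyDemo {
  def main(args: Array[String]): Unit = {
    val a = Array[Byte](1, 2, 3)
    val b = Array[Byte](1, 2, 3)

    // Identity-based hashCode: equal-content arrays (almost always) differ.
    println(s"a.hashCode == b.hashCode: ${a.hashCode == b.hashCode}")

    // Content-based alternatives: stable for equal-content arrays.
    println(s"Arrays.hashCode equal: ${Arrays.hashCode(a) == Arrays.hashCode(b)}")
    println(s"new String equal:      ${new String(a) == new String(b)}")
  }
}
```

One caveat on the `new String(bytes)` trick: it decodes with the platform charset, and bytes that aren't valid in that charset can be replaced during decoding, so distinct byte arrays could in principle map to the same string key. A KeySelector based on `Arrays.hashCode` (as Chesnay suggests) avoids that decoding step.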