If I understood you correctly, you want to compute windows in parallel
without using a key.
Are you aware that the results of such a computation are not deterministic
and somewhat arbitrary?

If that is still OK for you, you can use a mapper to assign the current
parallel subtask index as a key field, i.e., wrap the data in a
Tuple2<Key, Payload> and then do a keyBy(0). This will keep the data local.
The mapper should extend RichMapFunction. You can access the subtask index
via getRuntimeContext().getIndexOfThisSubtask().
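
To make the idea concrete, here is a minimal plain-Java simulation (no Flink dependency, so it is not the actual mapper). It assumes each inner list stands for the data one parallel subtask already holds. The names SubtaskKeySketch, Keyed, tagWithSubtaskIndex and fullWindowsPerKey are hypothetical and only for illustration; Keyed plays the role of the Tuple2<Key, Payload> wrapper.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java simulation of keying elements by the subtask that holds them. */
public class SubtaskKeySketch {

    /** Hypothetical stand-in for Tuple2<Key, Payload>. */
    public static final class Keyed<T> {
        public final int key;    // index of the simulated parallel subtask
        public final T payload;  // the original element, unchanged

        public Keyed(int key, T payload) {
            this.key = key;
            this.payload = payload;
        }
    }

    /** Tags each element with the index of the partition that already holds it. */
    public static <T> List<Keyed<T>> tagWithSubtaskIndex(List<List<T>> partitions) {
        List<Keyed<T>> tagged = new ArrayList<>();
        for (int subtask = 0; subtask < partitions.size(); subtask++) {
            for (T element : partitions.get(subtask)) {
                tagged.add(new Keyed<>(subtask, element));
            }
        }
        return tagged;
    }

    /** Returns, per key, how many complete count windows of the given size would fire. */
    public static <T> Map<Integer, Integer> fullWindowsPerKey(List<Keyed<T>> tagged, int windowSize) {
        Map<Integer, Integer> elementsPerKey = new TreeMap<>();
        for (Keyed<T> k : tagged) {
            elementsPerKey.merge(k.key, 1, Integer::sum);
        }
        Map<Integer, Integer> windows = new TreeMap<>();
        for (Map.Entry<Integer, Integer> e : elementsPerKey.entrySet()) {
            // Integer division: the incomplete trailing window never fires.
            windows.put(e.getKey(), e.getValue() / windowSize);
        }
        return windows;
    }

    public static void main(String[] args) {
        List<List<String>> partitions = new ArrayList<>();
        partitions.add(Arrays.asList("a", "b", "c", "d", "e")); // subtask 0: 5 elements
        partitions.add(Arrays.asList("f", "g", "h"));           // subtask 1: 3 elements
        // prints {0=2, 1=1}
        System.out.println(fullWindowsPerKey(tagWithSubtaskIndex(partitions), 2));
    }
}
```

Because the key depends only on where an element happens to be, the window contents change whenever the upstream distribution changes, which is the non-determinism mentioned above.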

Hope this helps.
Cheers, Fabian

2016-06-11 11:53 GMT+02:00 Yukun Guo <gyk....@gmail.com>:

> Thx, now I use element.hashCode() % nPartitions and it works as expected.
>
> But I'm afraid this is not a best practice for simply turning a plain
> (already parallelized) DataStream into a KeyedStream, because it introduces
> some overhead due to physical repartitioning by key, which is unnecessary
> since I don't really care about keys.
>
> On 9 June 2016 at 22:00, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Hi Yukun,
>>
>> the problem is that the KeySelector is internally invoked multiple times.
>> Hence it must be deterministic, i.e., it must extract the same key for
>> the same object if invoked multiple times.
>> The documentation does not discuss this aspect and should be extended.
>>
>> Thanks for pointing out this issue.
>>
>> Cheers,
>> Fabian
>>
>>
>> 2016-06-09 13:19 GMT+02:00 Yukun Guo <gyk....@gmail.com>:
>>
>>> I’m playing with the (Window)WordCount example from Flink QuickStart. I
>>> generate a DataStream consisting of 1000 Strings of random digits,
>>> which is windowed with a tumbling count window of 50 elements:
>>>
>>> import org.apache.flink.api.common.functions.FlatMapFunction;
>>> import org.apache.flink.api.java.functions.KeySelector;
>>> import org.apache.flink.api.java.tuple.Tuple2;
>>> import org.apache.flink.streaming.api.datastream.DataStream;
>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>> import org.apache.flink.util.Collector;
>>>
>>> import java.util.Random;
>>>
>>> public class DigitCount {
>>>
>>>     public static void main(String[] args) throws Exception {
>>>         final StreamExecutionEnvironment env =
>>>                 StreamExecutionEnvironment.getExecutionEnvironment();
>>>
>>>         DataStream<String> text = env.fromElements(
>>>                 "14159265358979323846264338327950288419716939937510",
>>>                 "58209749445923078164062862089986280348253421170679",
>>>                 "82148086513282306647093844609550582231725359408128",
>>>                 "48111745028410270193852110555964462294895493038196",
>>>                 "44288109756659334461284756482337867831652712019091",
>>>                 "45648566923460348610454326648213393607260249141273",
>>>                 "72458700660631558817488152092096282925409171536436",
>>>                 "78925903600113305305488204665213841469519415116094",
>>>                 "33057270365759591953092186117381932611793105118548",
>>>                 "07446237996274956735188575272489122793818301194912",
>>>                 "98336733624406566430860213949463952247371907021798",
>>>                 "60943702770539217176293176752384674818467669405132",
>>>                 "00056812714526356082778577134275778960917363717872",
>>>                 "14684409012249534301465495853710507922796892589235",
>>>                 "42019956112129021960864034418159813629774771309960",
>>>                 "51870721134999999837297804995105973173281609631859",
>>>                 "50244594553469083026425223082533446850352619311881",
>>>                 "71010003137838752886587533208381420617177669147303",
>>>                 "59825349042875546873115956286388235378759375195778",
>>>                 "18577805321712268066130019278766111959092164201989"
>>>         );
>>>
>>>         DataStream<Tuple2<Integer, Integer>> digitCount = text
>>>                 .flatMap(new Splitter())
>>>                 .keyBy(new KeySelector<Tuple2<Integer, Integer>, Integer>() {
>>>                     @Override
>>>                     public Integer getKey(Tuple2<Integer, Integer> x) throws Exception {
>>>                         return x.f0 % 2;
>>>                     }
>>>                 })
>>>                 .countWindow(50)
>>>                 .sum(1);
>>>
>>>         digitCount.print();
>>>         env.execute();
>>>
>>>     }
>>>
>>>     public static final class Splitter
>>>             implements FlatMapFunction<String, Tuple2<Integer, Integer>> {
>>>         @Override
>>>         public void flatMap(String value, Collector<Tuple2<Integer, Integer>> out) {
>>>             for (String token : value.split("")) {
>>>                 if (token.length() == 0) {
>>>                     continue;
>>>                 }
>>>                 out.collect(Tuple2.of(Integer.parseInt(token), 1));
>>>             }
>>>         }
>>>     }
>>> }
>>>
>>> The code above produces 19 lines of output, which is reasonable: the
>>> 1000 digits are keyed into 2 partitions, one with slightly more than 500
>>> elements and the other with slightly fewer than 500. The incomplete
>>> trailing window of each partition never fires, so 10 + 9 = 19 windows
>>> are emitted instead of 20.
>>>
>>> So far so good, but if I replace the mod KeySelector with a random one:
>>>
>>> private static class RandomKeySelector<T> implements KeySelector<T, Integer> {
>>>     private int nPartitions;
>>>     private Random random;
>>>
>>>     RandomKeySelector(int nPartitions) {
>>>         this.nPartitions = nPartitions;
>>>         random = new Random();
>>>     }
>>>
>>>     @Override
>>>     public Integer getKey(T dummy) throws Exception {
>>>         return random.nextInt(this.nPartitions);
>>>     }
>>> }
>>>
>>> and then
>>>
>>> .keyBy(new RandomKeySelector<Tuple2<Integer, Integer>>(2))
>>>
>>> it may generate 17 or 18 lines of output. How could that happen?
>>> Moreover, if I set the number of partitions to 10, in theory there should
>>> be no fewer than 11 lines of output (each partition can drop at most 49
>>> elements, leaving at least 510), but in practice there can be as few as 9.
>>>
>>> Please help me understand why countWindow behaves like this.
>>>
>>
>>
>
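
The determinism requirement discussed in this thread can be checked outside Flink. The following is a rough plain-Java sketch, not Flink code: java.util.function.Function stands in for KeySelector, and the names KeyDeterminismSketch, hashKey, randomKey and isDeterministic are hypothetical. It also uses Math.floorMod rather than `%`, since hashCode() may be negative and `%` would then yield negative keys, giving up to 2 * nPartitions - 1 distinct keys instead of nPartitions.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.function.Function;

/** Checks whether a key-extraction function returns a stable key per element. */
public class KeyDeterminismSketch {

    /** Deterministic key in [0, nPartitions); floorMod keeps the result non-negative. */
    public static <T> Function<T, Integer> hashKey(int nPartitions) {
        return element -> Math.floorMod(element.hashCode(), nPartitions);
    }

    /** Non-deterministic key, mirroring the RandomKeySelector from the thread. */
    public static <T> Function<T, Integer> randomKey(int nPartitions, long seed) {
        Random random = new Random(seed);
        return element -> random.nextInt(nPartitions);
    }

    /** True if the selector returns the same key for every sample on each repeated call. */
    public static <T> boolean isDeterministic(
            Function<T, Integer> selector, List<T> samples, int repeats) {
        for (T sample : samples) {
            Integer first = selector.apply(sample);
            for (int i = 1; i < repeats; i++) {
                if (!first.equals(selector.apply(sample))) {
                    return false; // same element, different key
                }
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<String> samples = Arrays.asList("14159", "26535", "89793", "23846");
        System.out.println("hash key deterministic:   "
                + isDeterministic(KeyDeterminismSketch.<String>hashKey(2), samples, 10));
        System.out.println("random key deterministic: "
                + isDeterministic(KeyDeterminismSketch.<String>randomKey(2, 42L), samples, 10));
    }
}
```

A selector that fails this check can assign one element to different keys on different invocations, which is consistent with the missing windows reported above.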
