If I understood you correctly, you want to compute windows in parallel without using a key. Are you aware that the results of such a computation are not deterministic and kind of arbitrary?
If that is still OK for you, you can use a mapper to assign the current parallel index as a key field, i.e., wrap the data in a Tuple2<Key, Payload> and then do a keyBy(0). This will keep the data local. The mapper should extend RichMapFunction. You can access the parallel index via getRuntimeContext().getIndexOfThisSubtask().

Hope this helps.

Cheers, Fabian

2016-06-11 11:53 GMT+02:00 Yukun Guo <gyk....@gmail.com>:

> Thx, now I use element.hashCode() % nPartitions and it works as expected.
>
> But I'm afraid it's not a best practice for just turning a plain (already
> parallelized) DataStream into a KeyedStream? Because it introduces some
> overhead due to physical repartitioning by key, which is unnecessary since
> I don't really care about keys.
>
> On 9 June 2016 at 22:00, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Hi Yukun,
>>
>> the problem is that the KeySelector is internally invoked multiple times.
>> Hence it must be deterministic, i.e., it must extract the same key for
>> the same object if invoked multiple times.
>> The documentation does not discuss this aspect and should be extended.
>>
>> Thanks for pointing out this issue.
>>
>> Cheers,
>> Fabian
>>
>> 2016-06-09 13:19 GMT+02:00 Yukun Guo <gyk....@gmail.com>:
>>
>>> I’m playing with the (Window)WordCount example from Flink QuickStart.
>>> I generate a DataStream consisting of 1000 Strings of random digits,
>>> which is windowed with a tumbling count window of 50 elements:
>>>
>>> import org.apache.flink.api.common.functions.FlatMapFunction;
>>> import org.apache.flink.api.java.functions.KeySelector;
>>> import org.apache.flink.api.java.tuple.Tuple2;
>>> import org.apache.flink.streaming.api.datastream.DataStream;
>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>> import org.apache.flink.util.Collector;
>>>
>>> import java.util.Random;
>>>
>>> public class DigitCount {
>>>
>>>     public static void main(String[] args) throws Exception {
>>>         final StreamExecutionEnvironment env =
>>>             StreamExecutionEnvironment.getExecutionEnvironment();
>>>
>>>         DataStream<String> text = env.fromElements(
>>>             "14159265358979323846264338327950288419716939937510",
>>>             "58209749445923078164062862089986280348253421170679",
>>>             "82148086513282306647093844609550582231725359408128",
>>>             "48111745028410270193852110555964462294895493038196",
>>>             "44288109756659334461284756482337867831652712019091",
>>>             "45648566923460348610454326648213393607260249141273",
>>>             "72458700660631558817488152092096282925409171536436",
>>>             "78925903600113305305488204665213841469519415116094",
>>>             "33057270365759591953092186117381932611793105118548",
>>>             "07446237996274956735188575272489122793818301194912",
>>>             "98336733624406566430860213949463952247371907021798",
>>>             "60943702770539217176293176752384674818467669405132",
>>>             "00056812714526356082778577134275778960917363717872",
>>>             "14684409012249534301465495853710507922796892589235",
>>>             "42019956112129021960864034418159813629774771309960",
>>>             "51870721134999999837297804995105973173281609631859",
>>>             "50244594553469083026425223082533446850352619311881",
>>>             "71010003137838752886587533208381420617177669147303",
>>>             "59825349042875546873115956286388235378759375195778",
>>>             "18577805321712268066130019278766111959092164201989"
>>>         );
>>>
>>>         DataStream<Tuple2<Integer, Integer>> digitCount = text
>>>             .flatMap(new Splitter())
>>>             .keyBy(new KeySelector<Tuple2<Integer, Integer>, Integer>() {
>>>                 @Override
>>>                 public Integer getKey(Tuple2<Integer, Integer> x) throws Exception {
>>>                     return x.f0 % 2;
>>>                 }
>>>             })
>>>             .countWindow(50)
>>>             .sum(1);
>>>
>>>         digitCount.print();
>>>         env.execute();
>>>     }
>>>
>>>     public static final class Splitter
>>>             implements FlatMapFunction<String, Tuple2<Integer, Integer>> {
>>>         @Override
>>>         public void flatMap(String value, Collector<Tuple2<Integer, Integer>> out) {
>>>             for (String token : value.split("")) {
>>>                 if (token.length() == 0) {
>>>                     continue;
>>>                 }
>>>                 out.collect(Tuple2.of(Integer.parseInt(token), 1));
>>>             }
>>>         }
>>>     }
>>> }
>>>
>>> The code above will produce 19 lines of output which is reasonable as
>>> the 1000 digits will be keyed into 2 partitions where one partition
>>> contains 500+ elements and the other contains slightly fewer than 500
>>> elements, therefore as a result one 50-digit window is ignored.
>>>
>>> So far so good, but if I replace the mod KeySelector with a random one:
>>>
>>> private static class RandomKeySelector<T> implements KeySelector<T, Integer> {
>>>     private int nPartitions;
>>>     private Random random;
>>>
>>>     RandomKeySelector(int nPartitions) {
>>>         this.nPartitions = nPartitions;
>>>         random = new Random();
>>>     }
>>>
>>>     @Override
>>>     public Integer getKey(T dummy) throws Exception {
>>>         return random.nextInt(this.nPartitions);
>>>     }
>>> }
>>>
>>> and then
>>>
>>> .keyBy(new RandomKeySelector<Tuple2<Integer, Integer>>(2))
>>>
>>> it may generate 17 or 18 lines of output. How could that happen?
>>> Moreover, if I set the number of partitions to 10, in theory the lines of
>>> output should be no fewer than 11, but actually it can be only 9.
>>>
>>> Please help me understand why countWindow behaves like this.
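
On the window counts in the quoted question: the effect of a non-deterministic KeySelector can be sketched in plain Java, without Flink. This is only a toy model. It assumes that the selector is invoked once to route an element to a subtask and once more to pick the per-key window state, and that routing is a simple modulo of the key. Neither detail is taken from Flink's code. The class WindowCountSketch, the method countFiredWindows, and the parallelism of 2 are made up for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.function.IntUnaryOperator;

public class WindowCountSketch {

    /**
     * Toy model (an assumption, not Flink's actual code path): the key selector is
     * called twice per element, once for routing and once for the window state key.
     * Returns how many count windows of the given size fire.
     */
    public static int countFiredWindows(int[] elements, IntUnaryOperator keySelector,
                                        int parallelism, int windowSize) {
        // Buffered element count per "(subtask, key)" slot.
        Map<String, Integer> counts = new HashMap<>();
        int fired = 0;
        for (int element : elements) {
            int routingKey = keySelector.applyAsInt(element); // first invocation
            int stateKey = keySelector.applyAsInt(element);   // second invocation
            int subtask = Math.floorMod(routingKey, parallelism);
            String slot = subtask + ":" + stateKey;
            int buffered = counts.merge(slot, 1, Integer::sum);
            if (buffered == windowSize) {
                fired++;             // a full window is emitted
                counts.put(slot, 0); // and the slot starts over
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        // 1000 pseudo-random digits stand in for the digits in the question.
        Random data = new Random(1);
        int[] digits = new int[1000];
        for (int i = 0; i < digits.length; i++) {
            digits[i] = data.nextInt(10);
        }

        // Deterministic selector: both invocations agree, so there are 2 slots.
        int deterministic = countFiredWindows(digits, d -> d % 2, 2, 50);

        // Random selector: the two invocations may disagree, so up to 4 slots appear.
        Random keys = new Random();
        int random = countFiredWindows(digits, d -> keys.nextInt(2), 2, 50);

        System.out.println("deterministic selector: " + deterministic + " windows");
        System.out.println("random selector:        " + random + " windows");
    }
}
```

In this model every slot can hold back at most 49 elements. With a deterministic selector and two keys there are two slots, so 19 or 20 windows fire for 1000 elements. With the random selector the elements can spread over up to four (subtask, key) slots, so anywhere from 17 to 20 windows fire, which is consistent with the 17 or 18 lines reported above.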