Hi,

Konstantin is right.
reinterpretAsKeyedStream only works if you call it on a DataStream that
was keyBy'ed before (with the same parallelism).
Flink cannot reuse the partitioning of another system like Kafka.
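To make this concrete, here is a standalone Java sketch (not Flink code) of how, to the best of our understanding, Flink 1.x places keys for a keyed stream: it hashes key.hashCode() with a murmur-style hash, takes the result modulo the max parallelism to get a key group, and assigns each subtask a contiguous range of key groups. The class and method names are hypothetical, and the constants are assumed to mirror MathUtils.murmurHash and KeyGroupRangeAssignment; please check them against your Flink version. It shows why a source whose subtask i emits key i generally does not line up with the subtask that keyBy would have chosen.

```java
// Standalone sketch (not Flink code) of Flink 1.x key-group assignment.
// Formulas and constants are ASSUMED to mirror MathUtils.murmurHash and
// KeyGroupRangeAssignment; verify against your Flink version.
public class KeyGroupSketch {

    // Murmur3-based hash applied to key.hashCode(); always returns a non-negative int.
    static int murmurHash(int code) {
        code *= 0xcc9e2d51;
        code = Integer.rotateLeft(code, 15);
        code *= 0x1b873593;
        code = Integer.rotateLeft(code, 13);
        code = code * 5 + 0xe6546b64;
        code ^= 4;
        code = bitMix(code);
        if (code >= 0) {
            return code;
        } else if (code != Integer.MIN_VALUE) {
            return -code;
        } else {
            return 0;
        }
    }

    // Murmur3 finalization mix.
    static int bitMix(int in) {
        in ^= in >>> 16;
        in *= 0x85ebca6b;
        in ^= in >>> 13;
        in *= 0xc2b2ae35;
        in ^= in >>> 16;
        return in;
    }

    // Default max parallelism: 1.5 x parallelism rounded up to a power of two,
    // clamped to [128, 32768].
    static int defaultMaxParallelism(int parallelism) {
        int candidate = parallelism + parallelism / 2;
        int pow2 = Integer.highestOneBit(candidate);
        if (pow2 < candidate) {
            pow2 <<= 1;
        }
        return Math.min(Math.max(pow2, 128), 32768);
    }

    // Key group of a key: hash of key.hashCode() modulo max parallelism.
    static int keyGroupFor(Object key, int maxParallelism) {
        return murmurHash(key.hashCode()) % maxParallelism;
    }

    // Subtask (operator index) that owns a given key group.
    static int operatorIndexForKeyGroup(int keyGroup, int parallelism, int maxParallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    // True if the key's key group lies inside the contiguous range owned by `subtask`.
    static boolean subtaskOwnsKey(int subtask, Object key, int parallelism, int maxParallelism) {
        int start = (subtask * maxParallelism + parallelism - 1) / parallelism;
        int end = ((subtask + 1) * maxParallelism - 1) / parallelism;
        int keyGroup = keyGroupFor(key, maxParallelism);
        return keyGroup >= start && keyGroup <= end;
    }

    public static void main(String[] args) {
        int parallelism = 10;
        int maxParallelism = defaultMaxParallelism(parallelism); // 128
        // Mimic the source discussed in this thread: subtask i emits only key i.
        for (int subtask = 0; subtask < parallelism; subtask++) {
            Integer key = subtask;
            int owner = operatorIndexForKeyGroup(
                    keyGroupFor(key, maxParallelism), parallelism, maxParallelism);
            boolean ok = subtaskOwnsKey(subtask, key, parallelism, maxParallelism);
            System.out.printf("source subtask %d emits key %d -> keyBy would route it to subtask %d (%s)%n",
                    subtask, key, owner, ok ? "match" : "mismatch");
        }
    }
}
```

Each "mismatch" line stands for records whose key group the keyed operator on that subtask does not own, which is consistent with the key-range precondition failure reported in the thread. Picking source keys so that they happen to satisfy such a check would depend on Flink internals, so partitioning with keyBy first remains the route reinterpretAsKeyedStream is designed for.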

Best, Fabian


Adrienne Kole <adrienneko...@gmail.com> wrote on Thu, 4 Apr 2019, 14:33:

> Thanks a lot for the replies.
>
> Below I paste my code:
>
>
>         DataStreamSource<Tuple> source = env.addSource(new MySource());
>         // Source subtask i emits only key i; this partitioning was not done by keyBy.
>         KeyedStream<Tuple, Integer> keyedStream = DataStreamUtils.reinterpretAsKeyedStream(
>                 source, new DummyKeySelector(), TypeInformation.of(Integer.class));
>         keyedStream.timeWindow(Time.seconds(1)).apply(
>                 new WindowFunction<Tuple, Object, Integer, TimeWindow>() {
>                     @Override
>                     public void apply(Integer key, TimeWindow timeWindow,
>                             Iterable<Tuple> iterable, Collector<Object> collector) throws Exception {
>                         collector.collect(1);
>                     }
>                 });
>         env.execute("Test");
>
>     static class DummyKeySelector implements KeySelector<Tuple, Integer> {
>
>         @Override
>         public Integer getKey(Tuple value) throws Exception {
>             return value.getSourceID();
>         }
>     }
>
>     static class MySource extends RichParallelSourceFunction<Tuple> {
>         // Base ID; open() adds the subtask index so each parallel instance has its own key.
>         private int sourceID;
>         // Cleared by cancel() so that run() can terminate.
>         private volatile boolean running = true;
>
>         @Override
>         public void open(Configuration parameters) throws Exception {
>             sourceID = sourceID + getRuntimeContext().getIndexOfThisSubtask();
>         }
>
>         @Override
>         public void run(SourceContext<Tuple> ctx) throws Exception {
>             while (running) {
>                 Tuple tuple = new Tuple(sourceID);
>                 ctx.collect(tuple);
>             }
>         }
>
>         @Override
>         public void cancel() {
>             running = false;
>         }
>     }
>
>
> Whatever I do, I get:
> Caused by: java.lang.IllegalArgumentException
>     at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:123)
>
> When I check the details in the source code, it seems that some keys are
> not within the allowed key range, which is why Flink throws an exception.
> In this case, as Konstantin said, it is not possible to reinterpret the
> source as keyed.
> Please correct me if I am wrong.
>
>
> Thanks,
> Adrienne
>
>
>
>
>
>
>
> On Wed, Apr 3, 2019 at 8:08 PM Konstantin Knauf <konstan...@ververica.com> wrote:
>
>> Hi Adrienne,
>>
>> you can only use DataStreamUtils#reinterpretAsKeyedStream on a stream that
>> has previously been keyed/partitioned by Flink with exactly the same
>> KeySelector as the one given to reinterpretAsKeyedStream. It does not work
>> with a key-partitioned stream that has been partitioned by any other process.
>>
>> Best,
>>
>> Konstantin
>>
>> On Fri, Mar 29, 2019 at 11:47 PM Rong Rong <walter...@gmail.com> wrote:
>>
>>> Hi Adrienne,
>>>
>>> I think you should be able to use reinterpretAsKeyedStream by passing in a
>>> DataStreamSource, based on the ITCase example [1].
>>> Can you share the full code and the error log for the IllegalArgumentException (IAE)?
>>>
>>> --
>>> Rong
>>>
>>> [1]
>>> https://github.com/apache/flink/blob/release-1.7.2/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/datastream/ReinterpretDataStreamAsKeyedStreamITCase.java#L98
>>>
>>> On Fri, Mar 29, 2019 at 6:09 AM Adrienne Kole <adrienneko...@gmail.com> wrote:
>>>
>>>> Dear community,
>>>>
>>>> I have a use case where the sources are keyed.
>>>> For example, there is a source function with parallelism 10, and each
>>>> instance has its own key.
>>>> I used reinterpretAsKeyedStream to convert the source DataStream to a
>>>> KeyedStream; however, I get an IllegalArgumentException.
>>>> Can reinterpretAsKeyedStream be used with source operators as well,
>>>> or does the input already have to be partitioned (by keyBy(..))?
>>>>
>>>> Thanks,
>>>> Adrienne
>>>>
>>>
>>
>> --
>>
>> Konstantin Knauf | Solutions Architect
>
