Hello, thanks for the answer.

> 1. There is currently no way to avoid the repartitioning. When you do a
> keyBy(), Flink will shuffle the data through the network. What you would
> need is a way to tell Flink that the data is already partitioned. If you
> would use keyed state, you would also need to ensure that the same hash
> function is used for the partitions and the state.
>

Is this only an assumption, or are there examples of it? Yesterday I posted a
question about the incompatibility between Flink's keyed serializer and
Kafka's deserializer.
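
To make the hash-function point in the quoted answer concrete, here is a minimal plain-Java sketch (no Flink or Kafka involved). It assumes the producer side uses String.hashCode() for hash(userId) % partitionsNum, which may differ from the real KafkaPartitioner, and it uses CRC32 purely as a stand-in for "a different hash function"; it is not Flink's internal key assignment. The class and method names are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

class PartitionMismatchSketch {

    // Producer-side rule from this thread: hash(userId) % partitionsNum.
    // Using String.hashCode() here is an assumption for illustration.
    static int producerPartition(String userId, int partitions) {
        return Math.floorMod(userId.hashCode(), partitions);
    }

    // Stand-in for a consumer that hashes keys differently.
    // CRC32 is illustrative only and is NOT what Flink does internally.
    static int otherPartition(String userId, int partitions) {
        CRC32 crc = new CRC32();
        crc.update(userId.getBytes(StandardCharsets.UTF_8));
        return (int) (crc.getValue() % partitions);
    }

    // Counts how many generated keys the two functions place differently.
    static int countMismatches(int keys, int partitions) {
        int mismatches = 0;
        for (int i = 0; i < keys; i++) {
            String userId = "user-" + i;
            if (producerPartition(userId, partitions) != otherPartition(userId, partitions)) {
                mismatches++;
            }
        }
        return mismatches;
    }

    public static void main(String[] args) {
        System.out.println("keys placed differently: "
                + countMismatches(1000, 4) + " of 1000");
    }
}
```

Any key counted as a mismatch would have to move between partitions, which is why both sides would need to agree on the same hash function before a shuffle could be skipped.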

> 2. Why do you assume that this would end up in one partition?


It is just an assumption. I don't know how to check it.

> 3. You can also read old messages from a Kafka topic by setting
> "auto.offset.reset" to "smallest" (or "earliest") and using a new
> "group.id".
>

OK, I know about that. But "smallest" is only a way to repeat a test with the
same data.
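
For reference, the replay setting discussed above can be sketched as plain java.util.Properties. This is only an illustration: the host names are placeholders, the class and method names are hypothetical, and "smallest" is the value understood by the old (0.8-style) Kafka consumer, while newer consumers use "earliest" instead.

```java
import java.util.Properties;
import java.util.UUID;

class ReplayConsumerConfigSketch {

    // Builds consumer properties that start reading from the oldest retained
    // offset. All host names below are placeholders, not real endpoints.
    static Properties replayProperties(String groupIdPrefix) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
        props.setProperty("zookeeper.connect", "localhost:2181"); // placeholder, old consumer only
        // A fresh group id has no committed offsets, so auto.offset.reset applies.
        props.setProperty("group.id", groupIdPrefix + "-" + UUID.randomUUID());
        // "smallest" for the 0.8-style consumer; newer consumers expect "earliest".
        props.setProperty("auto.offset.reset", "smallest");
        return props;
    }

    public static void main(String[] args) {
        replayProperties("uniq-users-test").list(System.out);
    }
}
```

Generating a new group id per run is what makes every test run start from the beginning of the topic again.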


My general question is: does the implementation at
https://github.com/rssdev10/flink-kafka-streaming/blob/master/src/main/java/FlinkStreamingConsumer.java
match the scheme described in the first email?
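
To state the intended scheme precisely, here is a minimal plain-Java sketch of the two stages for a single time window (it does not use Flink and is not the code from the repository). It assumes partitioning by String.hashCode() modulo the partition count; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class PartitionedUniqueCountSketch {

    // Assumed partitioning rule: hash(userId) % partitions.
    static int partitionOf(String userId, int partitions) {
        return Math.floorMod(userId.hashCode(), partitions);
    }

    // Stage 1: every partition keeps its own set of user ids.
    // Stage 2: only the set sizes are combined into one number.
    static long countUnique(List<String> userIds, int partitions) {
        List<Set<String>> perPartition = new ArrayList<>();
        for (int i = 0; i < partitions; i++) {
            perPartition.add(new HashSet<>());
        }
        for (String id : userIds) {
            perPartition.get(partitionOf(id, partitions)).add(id);
        }
        long total = 0;
        for (Set<String> ids : perPartition) {
            total += ids.size();
        }
        return total;
    }

    public static void main(String[] args) {
        List<String> events = Arrays.asList("u1", "u2", "u1", "u3", "u2", "u4");
        System.out.println("unique users: " + countUnique(events, 3));
    }
}
```

Summing the sizes is valid only because the same id always lands in the same partition, so the per-partition sets are disjoint, and only if all partial counts that are summed belong to the same window.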


Regarding the example: this small standalone test is based on DIMA's
project. You are welcome to use it in this form if it is useful.


Thanks,
best regards.

2016-08-29 13:54 GMT+02:00 Robert Metzger <rmetz...@apache.org>:

> Hi rss,
>
> Concerning your questions:
> 1. There is currently no way to avoid the repartitioning. When you do a
> keyBy(), Flink will shuffle the data through the network. What you would
> need is a way to tell Flink that the data is already partitioned. If you
> would use keyed state, you would also need to ensure that the same hash
> function is used for the partitions and the state.
>
> 2. Why do you assume that this would end up in one partition?
>
> 3. You can also read old messages from a Kafka topic by setting
> "auto.offset.reset" to "smallest" (or "earliest") and using a new
> "group.id".
>
> I'll add Aljoscha and Kostas to the email. Maybe they can help with the
> incorrect windowing results.
>
> Regards,
> Robert
>
>
> On Thu, Aug 25, 2016 at 8:21 PM, rss rss <rssde...@gmail.com> wrote:
>
>> Hello,
>>
>>   I want to implement a processing scheme like the one shown in the
>> following diagram. It calculates the number of unique users per specified
>> time interval, assuming more than 100k events per second and more than
>> 100M unique users:
>>
>>
>>
>>  I have one Kafka topic of events with a partitioner that uses
>> hash(userId) % partitionsNum:
>> https://github.com/rssdev10/flink-kafka-streaming/blob/master/src/main/java/KafkaPartitioner.java
>> I have prepared a runnable example:
>> https://github.com/rssdev10/flink-kafka-streaming/blob/master/src/main/java/FlinkStreamingConsumer.java
>>
>>  The project is available at https://github.com/rssdev10/flink-kafka-streaming/ .
>> See that page for how to run the data generator and the test.
>>
>>  Basic assumption: I need to calculate the number of unique identifiers, so
>> I need to store them in memory in a Set<String> structure, but the size of
>> this data structure is dozens of GB. So I need to partition the data by
>> identifier to reduce the size, and collect only the already calculated
>> counts per specified time interval, e.g. every hour.
>>
>>  Questions:
>>
>>    1. Flink's logic is largely hidden. The window operator requires a keyed
>>    stream. Does it mean that when I do
>>
>>    eventStream.keyBy(event -> event.partition(partNum));
>>
>>    with the same partitioner as used for Kafka, Flink preserves the original
>>    partitions? I want to avoid any repartitioning.
>>    2. Then I do
>>
>>    WindowedStream<Event, Integer, TimeWindow> uniqUsersWin =
>>            userIdKeyed.timeWindow(Time.seconds(windowDurationTime));
>>
>>    DataStream<ProductAggregator> uniqUsers =
>>            uniqUsersWin.trigger(ProcessingTimeTrigger.create())
>>            .fold(new UniqAggregator(), (FoldFunction<Event, UniqAggregator>)
>>                    (accumulator, value) -> {
>>                accumulator.uniqIds.add(value.getUserId());
>>
>>                accumulator.registerEvent(value);
>>
>>                return accumulator;
>>            })
>>
>>    does it mean that I have only one partition?
>>    3. Next, I want to collect the partial aggregation results. I'm using
>>    a custom trigger
>>    https://github.com/rssdev10/flink-kafka-streaming/blob/master/src/main/java/CountOrTimeTrigger.java
>>    which fires once the number of collected partial aggregates matches the
>>    number of Kafka partitions, or on an emergency timeout if not enough
>>    aggregates have arrived. The following code does the aggregation:
>>
>>    AllWindowedStream<ProductAggregator, TimeWindow> combinedUniqNumStream =
>>            uniqUsers
>>                    .timeWindowAll(Time.seconds(emergencyTriggerTimeout))
>>                    .trigger(PurgingTrigger.of(CountOrTimeTrigger.of(partNum)));
>>
>>    combinedUniqNumStream
>>            .fold(new ProductAggregator(),
>>                    (FoldFunction<ProductAggregator, ProductAggregator>)
>>                            (accumulator, value) -> {
>>                accumulator.value += value.value;
>>
>>                accumulator.summarize(value);
>>
>>                return accumulator;
>>            })
>>
>>    But sometimes I see an incorrect number of unique identifiers, probably
>>    because the partial aggregates are skewed. This test generates no more
>>    than 1000 identifiers. The problem can be seen when the test is run
>>    after preloading messages into Kafka.
>>
>>
>> PS: I found some information at
>> http://data-artisans.com/kafka-flink-a-practical-how-to/ and
>> https://www.elastic.co/blog/building-real-time-dashboard-applications-with-apache-flink-elasticsearch-and-kibana
>> but unfortunately these articles don't explain how to build the specified
>> scheme.
>>
>>
>> Cheers
>>
>>
>>
>>
>
>
