Hi rss,

Concerning your questions:
1. There is currently no way to avoid the repartitioning. When you do a
keyBy(), Flink will shuffle the data over the network. What you would
need is a way to tell Flink that the data is already partitioned. If you
use keyed state, you would also need to ensure that the same hash
function is used for the Kafka partitions and for the state.
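
To make the second part concrete, here is a rough sketch (untested, only an
illustration) of keying the stream by the same hash(userId) % partitionsNum
idea that your Kafka partitioner uses. "Event", "getUserId()" and "partNum"
are taken from your example; everything else is made up. Even with an
identical formula, keyBy() still shuffles the records over the network,
because Flink applies its own hash function when assigning keys to parallel
subtasks:

    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.streaming.api.datastream.KeyedStream;

    // assuming eventStream is the DataStream<Event> read from Kafka
    KeyedStream<Event, Integer> userIdKeyed = eventStream
            .keyBy(new KeySelector<Event, Integer>() {
                @Override
                public Integer getKey(Event event) {
                    // same idea as the Kafka partitioner (hash(userId) % partitionsNum),
                    // but Flink still re-hashes this key to pick the target subtask
                    return Math.abs(event.getUserId().hashCode()) % partNum;
                }
            });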

2. Why do you assume that this would end up in one partition?

3. You can also read old messages from a Kafka topic by setting
"auto.offset.reset" to "smallest" (or "earliest", depending on the consumer
version) and using a new "group.id".

I'll add Aljoscha and Kostas to the email. Maybe they can help with the
incorrect results of the windowing.

Regards,
Robert


On Thu, Aug 25, 2016 at 8:21 PM, rss rss <rssde...@gmail.com> wrote:

> Hello,
>
>   I want to implement something like the processing scheme shown in the
> following diagram. This is a calculation of the number of unique users per
> specified time period, assuming we have > 100k events per second and > 100M
> unique users:
>
> [image: processing schema diagram - not preserved in the plain-text archive]
>  I have one Kafka topic of events, partitioned by hash(userId) %
> partitionsNum:
> https://github.com/rssdev10/flink-kafka-streaming/blob/master/src/main/java/KafkaPartitioner.java
> I have prepared a runnable example:
> https://github.com/rssdev10/flink-kafka-streaming/blob/master/src/main/java/FlinkStreamingConsumer.java
>
>  The project is available at https://github.com/rssdev10/flink-kafka-streaming/
> Also see that page for how to run the data generator and how to run the test.
>
>  Basic assumption: I need to count the number of unique identifiers, so I
> need to keep them in memory in a Set<String> structure, but the size of this
> data structure is dozens of GB. So I need to partition the data by identifier
> to reduce the size per node, and then collect only the already-computed
> counts per specified time period, e.g. every hour.
>
>  Questions:
>
>    1. Flink's internal logic is rather hidden. The window operator requires a
>    keyed stream. Does it mean that when I do
>
>    eventStream.keyBy(event -> event.partition(partNum));
>
>    with the same partitioner as the one used for Kafka, Flink preserves the
>    original partitions? I want to avoid any repartitioning.
>    2. Then I'm doing
>
>    WindowedStream<Event, Integer, TimeWindow> uniqUsersWin =
>            userIdKeyed.timeWindow(Time.seconds(windowDurationTime));
>
>    DataStream<ProductAggregator> uniqUsers =
>            uniqUsersWin.trigger(ProcessingTimeTrigger.create())
>            .fold(new UniqAggregator(),
>                    (FoldFunction<Event, UniqAggregator>) (accumulator, value) -> {
>                accumulator.uniqIds.add(value.getUserId());
>                accumulator.registerEvent(value);
>                return accumulator;
>            });
>
>    does it mean that I have only one partition?
>    3. Next, I want to collect the partial results of the aggregation. I'm using a
>    custom trigger
>    https://github.com/rssdev10/flink-kafka-streaming/blob/master/src/main/java/CountOrTimeTrigger.java
>    which fires either when partial aggregates have been collected from all
>    Kafka partitions, or after an emergency timeout if the number of aggregates
>    is not enough. And the following code for the aggregation:
>
>    AllWindowedStream<ProductAggregator, TimeWindow> combinedUniqNumStream =
>            uniqUsers
>                    .timeWindowAll(Time.seconds(emergencyTriggerTimeout))
>                    .trigger(PurgingTrigger.of(CountOrTimeTrigger.of(partNum)));
>
>    combinedUniqNumStream
>            .fold(new ProductAggregator(),
>                    (FoldFunction<ProductAggregator, ProductAggregator>) (accumulator, value) -> {
>                accumulator.value += value.value;
>                accumulator.summarize(value);
>                return accumulator;
>            });
>
>    But sometimes I see an incorrect number of unique identifiers, probably
>    because of skew in the partial aggregates. This test generates no more than
>    1000 identifiers. The problem is visible when the test is run after
>    preloading the messages into Kafka.
>
>
> PS: I found some information at
> http://data-artisans.com/kafka-flink-a-practical-how-to/ and
> https://www.elastic.co/blog/building-real-time-dashboard-applications-with-apache-flink-elasticsearch-and-kibana
> but unfortunately these articles don't answer how to build the described
> scheme.
>
>
> Cheers
>
>
>
