Agree with Bowen on this note: you should probably use a more efficient way
of handling the data in sliding windows, since each element is "assigned" to
every overlapping sliding window by the window assigner and thus costs extra
memory.
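
To make that concrete, here is a rough, untested sketch (the source elements
and timestamps below are made up just for illustration, standing in for the
Kafka source in the quoted job) of the same kind of pipeline using reduce(),
so each window keeps one running aggregate per key instead of buffering every
element:

import java.sql.Timestamp

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

object SlidingWindowSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // Hypothetical stand-in for the Kafka source in the quoted job:
    // (clientId, amount, eventTime) tuples.
    val transactions: DataStream[(Long, Double, Timestamp)] = env.fromElements(
      (1L, 10.0, new Timestamp(1525300000000L)),
      (1L,  5.0, new Timestamp(1525303600000L)))

    val profile = transactions
      .assignAscendingTimestamps(_._3.getTime)
      .keyBy(_._1)
      .window(SlidingEventTimeWindows.of(Time.days(28), Time.hours(1)))
      // One running (clientId, sum, latestTs) per window, instead of a buffer
      // of raw elements in each of the 28 x 24 = 672 overlapping windows.
      .reduce((a, b) => (a._1, a._2 + b._2, b._3))

    profile.print()
    env.execute("sliding-window-sketch")
  }
}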

BTW: since we are on this topic, I was wondering if there's any way of
improving memory efficiency when dealing with elements that belong to
overlapping windows.
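
One idea (again just an untested sketch continuing the example above; I
haven't measured the actual savings) would be to pre-aggregate into hourly
tumbling windows first, so that only one partial sum per key and hour gets
replicated into the 28-day sliding windows:

// Continuing the sketch above; additionally needs:
// import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
val hourlyPartials = transactions
  .assignAscendingTimestamps(_._3.getTime)
  .keyBy(_._1)
  .window(TumblingEventTimeWindows.of(Time.hours(1)))
  .reduce((a, b) => (a._1, a._2 + b._2, b._3))

val profile28d = hourlyPartials
  .keyBy(_._1)
  .window(SlidingEventTimeWindows.of(Time.days(28), Time.hours(1)))
  .reduce((a, b) => (a._1, a._2 + b._2, b._3))

That way each raw event is assigned to exactly one tumbling window and only
the hourly partial is copied into the 672 overlapping windows, but maybe
there is a better way to do this within a single window operator?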

--
Rong

On Thu, May 3, 2018 at 9:40 PM, Bowen Li <bowenl...@gmail.com> wrote:

> Hi Gabriel,
>
> Yes, using the RocksDB state backend can relieve your RAM usage. I see a few
> issues with your job: 1) it's keeping track of 672 windows (28 days x 24
> hourly slides), which is a lot of data, so try to reduce the number of
> windows; 2) use reduce functions to incrementally aggregate state, rather
> than buffering data internally.
>
> BTW, this kind of question should be posted to the *user@flink alias* rather
> than dev@flink.
>
> Bowen
>
>
>
> On Wed, May 2, 2018 at 2:20 PM, Gabriel Pelielo <
> gabrielpeli...@hotmail.com>
> wrote:
>
> > We use Flink to process transactional events. A job was created to
> > aggregate information per client, day of week and hour of day, thus
> > creating a profile, as shown in the code below.
> >
> >
> > val stream = env.addSource(consumer)
> > val result = stream
> >   .map(openTransaction => {
> >     // Normalize the transaction date to epoch millis (it may arrive as an
> >     // ISO-8601 string or as a long).
> >     val transactionDate = openTransaction.get("transactionDate")
> >     val date = if (transactionDate.isTextual)
> >       LocalDateTime.parse(transactionDate.asText, DateTimeFormatter.ISO_DATE_TIME)
> >         .toInstant(ZoneOffset.UTC).toEpochMilli
> >     else
> >       transactionDate.asLong
> >     // (clientId, amount, eventTime)
> >     (openTransaction.get("clientId").asLong,
> >      openTransaction.get("amount").asDouble,
> >      new Timestamp(date))
> >   })
> >   .keyBy(0)
> >   .window(SlidingEventTimeWindows.of(Time.days(28), Time.hours(1)))
> >   .sum(1)
> >
> > In the code above, each event has three fields: "transactionDate",
> > "clientId" and "amount". We key the stream by clientId and apply a sliding
> > window summing the amount. There are around 100,000 unique active clientIds
> > in our database.
> >
> > After running for some time, the total RAM used by the job stabilizes at
> > 36 GB, but the checkpoint stored in HDFS uses only 3 GB. Is there a way to
> > reduce the job's RAM usage, maybe by configuring Flink's replication factor
> > or by using RocksDB?
> >
> >
> > Best regards
> >
>
