Agree with Bowen on this note: you should probably use a more efficient way of handling data in sliding windows, since each element is "assigned" to every overlapping window it falls into by the window assigner, which costs extra memory.
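For example, here is a rough sketch of what incremental aggregation looks like with an AggregateFunction (not your exact job; "transactions" is assumed to be the already-mapped (clientId, amount, timestamp) stream from the quoted code, with event-time timestamps/watermarks assigned):

import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

// Keeps a single running Double per key and window instead of buffered elements.
class SumAmount extends AggregateFunction[(Long, Double, java.sql.Timestamp), Double, Double] {
  override def createAccumulator(): Double = 0.0
  override def add(value: (Long, Double, java.sql.Timestamp), acc: Double): Double = acc + value._2
  override def getResult(acc: Double): Double = acc
  override def merge(a: Double, b: Double): Double = a + b
}

// "transactions" is an assumed name for the mapped DataStream[(Long, Double, java.sql.Timestamp)].
val result = transactions
  .keyBy(_._1)  // key by clientId
  .window(SlidingEventTimeWindows.of(Time.days(28), Time.hours(1)))
  .aggregate(new SumAmount)  // only the accumulator is stored per window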
BTW: since we are on this topic, I was wondering if there's any way of improving memory efficiency when dealing with elements that belong to overlapping windows.

-- Rong

On Thu, May 3, 2018 at 9:40 PM, Bowen Li <bowenl...@gmail.com> wrote:

> Hi Gabriel,
>
> Yes, using the RocksDB state backend can relieve your RAM usage. I see a few
> issues with your job: 1) it's keeping track of 672 windows (28x24), which is
> a lot of data, so try to reduce the number of windows; 2) use reduce functions
> to incrementally aggregate state, rather than buffering data internally.
>
> BTW, this kind of question should be posted to the *user@flink alias* rather
> than dev@flink.
>
> Bowen
>
> On Wed, May 2, 2018 at 2:20 PM, Gabriel Pelielo <gabrielpeli...@hotmail.com> wrote:
>
> > We use Flink to process transactional events. A job was created to
> > aggregate information about the clients, day of week and hour of day,
> > thus creating a profile, as shown in the code below.
> >
> > val stream = env.addSource(consumer)
> > val result = stream
> >   .map(openTransaction => {
> >     val transactionDate = openTransaction.get("transactionDate")
> >     val date = if (transactionDate.isTextual)
> >       LocalDateTime.parse(transactionDate.asText, DateTimeFormatter.ISO_DATE_TIME)
> >         .toInstant(ZoneOffset.UTC).toEpochMilli
> >     else
> >       transactionDate.asLong
> >     (openTransaction.get("clientId").asLong,
> >       openTransaction.get("amount").asDouble,
> >       new Timestamp(date))
> >   })
> >   .keyBy(0)
> >   .window(SlidingEventTimeWindows.of(Time.days(28), Time.hours(1)))
> >   .sum(1)
> >
> > In the code above, the stream has three fields: "transactionDate",
> > "clientId" and "amount". We make a keyed stream by clientId and a sliding
> > window summing the amount. There are around 100,000 unique active
> > clientIds in our database.
> >
> > After some time running, the total RAM used by the job stabilizes at 36 GB,
> > but the stored checkpoint in HDFS uses only 3 GB. Is there a way to reduce
> > the RAM usage of the job, maybe by configuring Flink's replication factor
> > or by using RocksDB?
> >
> > Best regards
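P.S. Regarding the RocksDB question in the quoted thread: a minimal sketch of enabling the RocksDB state backend looks like the following (the checkpoint path is just a placeholder, and the flink-statebackend-rocksdb dependency needs to be on the classpath):

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
// Keyed window state lives in RocksDB on local disk instead of the JVM heap;
// checkpoints are written to the configured URI (placeholder below).
env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints"))
env.enableCheckpointing(60000)  // checkpoint every 60 s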