We use Flink to process transactional events. We created a job that aggregates information per client, day of week, and hour of day, building a profile as shown in the code below.
```scala
val stream = env.addSource(consumer)

val result = stream
  .map(openTransaction => {
    val transactionDate = openTransaction.get("transactionDate")
    val date =
      if (transactionDate.isTextual)
        LocalDateTime.parse(transactionDate.asText, DateTimeFormatter.ISO_DATE_TIME)
          .toInstant(ZoneOffset.UTC)
          .toEpochMilli
      else
        transactionDate.asLong
    (openTransaction.get("clientId").asLong,
     openTransaction.get("amount").asDouble,
     new Timestamp(date))
  })
  .keyBy(0)
  .window(SlidingEventTimeWindows.of(Time.days(28), Time.hours(1)))
  .sum(1)
```

In the code above, each event in the stream has three fields: "transactionDate", "clientId" and "amount". We key the stream by clientId and sum the amount over a sliding window. There are around 100,000 unique active clientIds in our database.

After running for some time, the job's total RAM usage stabilizes at 36 GB, but the checkpoint stored in HDFS is only 3 GB. Is there a way to reduce the job's RAM usage, maybe by configuring Flink's replication factor or by using RocksDB?

Best regards
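For context on the amount of state involved: as far as I understand, a sliding window assigns every element to size/slide overlapping windows, so with a 28-day window sliding every hour each element lands in 672 windows, one accumulator per window per key. A quick sanity check (plain Scala, no Flink needed):

```scala
object WindowCount {
  def main(args: Array[String]): Unit = {
    // 28-day window expressed in hours, sliding every hour
    val windowSizeHours = 28 * 24 // 672
    val slideHours = 1
    // Number of overlapping windows each incoming element is assigned to
    val windowsPerElement = windowSizeHours / slideHours
    println(windowsPerElement) // prints 672
  }
}
```

With ~100,000 keys, that is on the order of 67 million live accumulators, which seems consistent with the heap usage we observe.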
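For reference, this is roughly what I believe switching to the RocksDB state backend would look like (the HDFS checkpoint path is a placeholder, and `RocksDBStateBackend` requires the `flink-statebackend-rocksdb` dependency); I have not tried it yet:

```scala
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend

// Sketch only: keep keyed state in local RocksDB instances instead of the
// JVM heap; the second argument enables incremental checkpoints.
env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints", true))
```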