We use Flink to process transactional events. We created a job that aggregates
information per client, day of week and hour of day, building a client profile
as shown in the code below.


import java.sql.Timestamp
import java.time.{LocalDateTime, ZoneOffset}
import java.time.format.DateTimeFormatter

import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

val stream = env.addSource(consumer)
val result = stream
  .map(openTransaction => {
    // The transaction date may arrive as an ISO-8601 string or as epoch milliseconds
    val transactionDate = openTransaction.get("transactionDate")
    val date = if (transactionDate.isTextual)
      LocalDateTime.parse(transactionDate.asText, DateTimeFormatter.ISO_DATE_TIME)
        .toInstant(ZoneOffset.UTC).toEpochMilli
    else
      transactionDate.asLong
    // (clientId, amount, transaction timestamp)
    (openTransaction.get("clientId").asLong,
     openTransaction.get("amount").asDouble,
     new Timestamp(date))
  })
  .keyBy(0)                                                         // key by clientId
  .window(SlidingEventTimeWindows.of(Time.days(28), Time.hours(1))) // 28-day window sliding hourly
  .sum(1)                                                           // sum the amount per window

In the code above, each event in the stream has three fields: "transactionDate",
"clientId" and "amount". We key the stream by clientId and sum the amount over a
sliding window. There are around 100,000 unique active clientIds in our database.
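
As a rough back-of-the-envelope count (our own estimate, not measured from the
running job), the window configuration above implies the following number of panes:

// Back-of-the-envelope: sliding windows of 28 days with a 1-hour slide
val panesPerKey = 28 * 24               // 672 overlapping panes per key at any time
val totalPanes  = panesPerKey * 100000L // ~67 million panes across the active clientIds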

After running for a while, the total RAM used by the job stabilizes at 36 GB, but
the checkpoint stored in HDFS uses only 3 GB. Is there a way to reduce the job's
RAM usage, perhaps by configuring Flink's replication factor or by using RocksDB?
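
In case it helps frame the question: this is roughly what I imagine the RocksDB
option would look like (just a sketch; the HDFS path is a placeholder and we have
not tried this configuration yet):

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend

// Sketch: keep keyed state in RocksDB (on local disk) instead of on the JVM heap.
// "hdfs:///flink/checkpoints" is a placeholder for our checkpoint directory.
val backend = new RocksDBStateBackend("hdfs:///flink/checkpoints", true) // true = incremental checkpoints
env.setStateBackend(backend)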


Best regards
