We use Flink to process transactional events. A job was created to aggregate
information per client, day of week and hour of day, building a client profile
as shown in the code below.
import java.sql.Timestamp
import java.time.{LocalDateTime, ZoneOffset}
import java.time.format.DateTimeFormatter
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

val stream = env.addSource(consumer)
val result = stream
  .map(openTransaction => {
    // The transaction date can arrive as an ISO-8601 string or as epoch millis
    val transactionDate = openTransaction.get("transactionDate")
    val date =
      if (transactionDate.isTextual)
        LocalDateTime.parse(transactionDate.asText, DateTimeFormatter.ISO_DATE_TIME)
          .toInstant(ZoneOffset.UTC).toEpochMilli
      else
        transactionDate.asLong
    (openTransaction.get("clientId").asLong,
     openTransaction.get("amount").asDouble,
     new Timestamp(date))
  })
  .keyBy(0)
  .window(SlidingEventTimeWindows.of(Time.days(28), Time.hours(1)))
  .sum(1)
In the code above, each event in the stream has three fields: "transactionDate",
"clientId" and "amount". We key the stream by clientId and sum the amount over a
sliding window. There are around 100,000 unique active clientIds in our
database.
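For context, if my math is right: with a 28-day window that slides every hour, each
element is assigned to 28 × 24 = 672 overlapping windows, so with roughly 100,000
active keys there can be on the order of 100,000 × 672 ≈ 67 million window states
live at once.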
After running for a while, the total RAM used by the job stabilizes at 36 GB,
while the checkpoint stored in HDFS takes only 3 GB. Is there a way to reduce the
job's RAM usage, for example by configuring Flink's replication factor or by
switching to RocksDB?
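For the RocksDB option, this is the kind of change I have in mind; a minimal sketch,
assuming the flink-statebackend-rocksdb dependency is on the classpath and using a
placeholder HDFS path rather than our real one:

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
// Keep keyed state on local disk via RocksDB instead of on the JVM heap;
// the second argument enables incremental checkpoints to HDFS.
// "hdfs:///flink/checkpoints" is a placeholder path.
env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints", true))

Would switching the state backend like this be the right lever here, or is the heap
usage expected given the window configuration?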
Best regards