I don't want to speak for Apache Flink (I only use it via Apache Beam),
but generally speaking, each key has to be held in state until the
moment it can be garbage collected. That moment is defined (at least in
the Apache Beam case) as the timestamp of the end of the window plus the
allowed lateness. So, in the case of the global window, it is
(practically) forever in the future, yes.
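To make the rule concrete, here is a small plain-Java sketch of the arithmetic. It does not call any Beam or Flink API; the class name `StateCleanupSketch`, the method `cleanupTime`, and the use of `Long.MAX_VALUE` as a stand-in for the end of the global window are assumptions made purely for illustration.

```java
import java.time.Duration;

/** Plain-Java model of "end of window + allowed lateness"; not a Beam or Flink API. */
final class StateCleanupSketch {

    // Hypothetical stand-in for "end of the global window" (effectively never).
    static final long GLOBAL_WINDOW_END = Long.MAX_VALUE;

    /** Returns the timestamp (ms) after which the window's state may be dropped. */
    static long cleanupTime(long windowEndMillis, Duration allowedLateness) {
        long lateness = allowedLateness.toMillis();
        // Saturate instead of overflowing when the window never ends.
        if (windowEndMillis > Long.MAX_VALUE - lateness) {
            return Long.MAX_VALUE;
        }
        return windowEndMillis + lateness;
    }

    public static void main(String[] args) {
        // A window ending at t = 900_000 ms with 1 minute of allowed lateness.
        System.out.println(cleanupTime(900_000L, Duration.ofMinutes(1)));
        // The global window: the cleanup time stays at "never".
        System.out.println(cleanupTime(GLOBAL_WINDOW_END, Duration.ofMinutes(1)) == Long.MAX_VALUE);
    }
}
```

With a bounded window the cleanup time is a finite timestamp; with the global window it never arrives, which is why per-key state would accumulate unless it is cleared explicitly.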
You can clean up the state manually, though. If you use the UUID (or
similar) approach, you would set a timer for the 15-minute (relative)
interval, and after you emit the data you can clear the timer and the
value state, which should clear the complete state of the window (please
someone correct me if I'm wrong).
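The emit-then-clear pattern can be modelled in plain Java as follows. This is only a sketch of the idea, not Flink or Beam code: `DelayBufferSketch`, `process`, `advanceTo`, and `liveKeys` are hypothetical names, a `HashMap` stands in for per-key value state, and a `TreeMap` stands in for the timer service.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;

/** Plain-Java model of "hold one value per key, emit on timer, then clear". */
final class DelayBufferSketch<T> {
    private final long delayMillis;
    // Per-key value state: one buffered element per (random) key.
    private final Map<UUID, T> valueState = new HashMap<>();
    // Timers ordered by firing time; each firing time maps to the keys due then.
    private final TreeMap<Long, List<UUID>> timers = new TreeMap<>();

    DelayBufferSketch(long delayMillis) {
        this.delayMillis = delayMillis;
    }

    /** Buffers the element under a fresh key and sets a relative timer for it. */
    void process(T element, long nowMillis) {
        UUID key = UUID.randomUUID();
        valueState.put(key, element);
        timers.computeIfAbsent(nowMillis + delayMillis, t -> new ArrayList<>()).add(key);
    }

    /** Fires all timers due at or before nowMillis, emitting and then clearing their state. */
    List<T> advanceTo(long nowMillis) {
        List<T> emitted = new ArrayList<>();
        while (!timers.isEmpty() && timers.firstKey() <= nowMillis) {
            for (UUID key : timers.pollFirstEntry().getValue()) {
                // Emit, then drop the value so the key leaves nothing behind.
                emitted.add(valueState.remove(key));
            }
        }
        return emitted;
    }

    /** Number of keys that still hold state. */
    int liveKeys() {
        return valueState.size();
    }
}
```

In this model the number of live keys returns to zero once every timer has fired. With a Flink `ValueState`, the corresponding step would be a call to `clear()` on the state inside `onTimer`, after the value has been collected.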
An alternative approach would be to use session windows and a
GroupByKey-like operation, which would hold each element and emit it at
the end of the session, which is exactly what you need. The state of the
session window will be cleared in this case as well.
Jan
On 7/19/21 2:00 PM, Dario Heinisch wrote:
Hey Jan,
No, it isn't a logical constraint. The reason is that there are
different kinds of users: some pay for live data, while others want a
cheaper version where the data is delayed.
But what happens if I add a random key (let's say a UUID)? Isn't that
bad for performance? Then for every object that is processed I would
have state that is used only once, and I assume Flink wouldn't clean
that state up, would it? What happens to the ValueState? Is it still
kept in memory? I thought Flink keeps state for every key it
encounters.
But I think this could be solved with a TTL:
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/fault-tolerance/state/#state-time-to-live-ttl
I guess I will test that at some point this week! :)
For reference, this would be the code:
[...]
.keyBy(t -> UUID.randomUUID())
.process(new DelayedProcessor<>(NAME, CLAZZ))
public abstract class Timestamper {
    public abstract long executedAt();
}

public class DelayedProcessor<T extends Timestamper>
        extends KeyedProcessFunction<UUID, T, T> implements ResultTypeQueryable<T> {

    private final String stateName;
    private final Class<T> clazz;
    private ValueState<T> state;
    private static long TIMEOUT = TimeUnit.MINUTES.toMillis(15);

    public DelayedProcessor(String stateName, Class<T> clazz) {
        this.stateName = stateName;
        this.clazz = clazz;
    }

    @Override
    public void open(Configuration parameters) {
        StateTtlConfig ttlConfig = StateTtlConfig
                .newBuilder(Time.minutes(15))
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp)
                .build();
        ValueStateDescriptor<T> desc = new ValueStateDescriptor<>(stateName, clazz);
        desc.enableTimeToLive(ttlConfig);
        state = getRuntimeContext().getState(desc);
    }
    @Override
    public void processElement(T t, Context ctx, Collector<T> collector)
            throws Exception {
        this.state.update(t);
        // A timer needs an absolute timestamp: fire 15 minutes after the
        // element's own timestamp.
        long timeout = t.executedAt() + TIMEOUT;
        ctx.timerService().registerEventTimeTimer(timeout);
    }
    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<T> out)
            throws Exception {
        out.collect(this.state.value());
    }

    @Override
    public TypeInformation<T> getProducedType() {
        return TypeInformation.of(clazz);
    }
}
Best regards,
Dario
On 18.07.21 19:12, Jan Lukavský wrote:
Hi Dario,
out of curiosity, could you briefly describe the driving use case?
What is the (logical) constraint that drives the requirement? I'd
guess that it could be related to waiting for some (external)
condition? Or maybe related to late data? I think there might be
better approaches than (unconditionally) delaying data in the pipeline.
On the other hand, if that really is the best approach, then adding a
random key to create a keyed stream should work in all cases, right?
Jan
On 7/18/21 3:52 PM, Dario Heinisch wrote:
Hey Kiran,
Yeah, I was thinking of another solution, since I have one PostgreSQL
sink and one Kafka sink.
So I can just process the data in real time and insert it into the
DB. Then I would just select the latest row where created_at >=
NOW() - interval '15 minutes', and for any Kafka consumer I would
just do:
let msg = get_next_kafka_msg();
let diff = created_at + 15min - now();
if diff > 0 {
sleep(diff)
}
// do something
// ....
kafka_commit();
And then I would run a cron job to delete obsolete rows from the DB
that are no longer required.
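For reference, the wait computation from the consumer pseudocode above could look like this in Java. It is a sketch only: `ConsumerDelaySketch` and `remainingDelay` are hypothetical names, the Kafka calls are left out on purpose, and it assumes each message carries its `created_at` value as an `Instant`.

```java
import java.time.Duration;
import java.time.Instant;

/** Sketch of the consumer-side delay computation; Kafka calls are intentionally left out. */
final class ConsumerDelaySketch {
    static final Duration DELAY = Duration.ofMinutes(15);

    /** How long to wait before a message created at createdAt may be handled (never negative). */
    static Duration remainingDelay(Instant createdAt, Instant now) {
        Duration diff = Duration.between(now, createdAt.plus(DELAY));
        return diff.isNegative() ? Duration.ZERO : diff;
    }

    public static void main(String[] args) throws InterruptedException {
        Instant now = Instant.now();
        // Hypothetical message that becomes visible 100 ms from now.
        Instant createdAt = now.minus(DELAY).plusMillis(100);
        Thread.sleep(remainingDelay(createdAt, now).toMillis());
        // ... handle the message and commit the offset here
    }
}
```

Clamping the result at zero covers the case where the consumer has fallen behind and the message is already older than 15 minutes.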
Best regards
Dario
On 18.07.21 15:29, Kiran Japannavar wrote:
Hi Dario,
Did you explore other options? If your use case (apart from
delaying sink writes) can be solved via Spark Streaming, then maybe
Spark Streaming with a micro-batch interval of 15 minutes would help.
On Sat, Jul 17, 2021 at 10:17 PM Dario Heinisch
<dario.heini...@gmail.com> wrote:
Hey there,
Hope all is well!
I would like to delay my data by 15 minutes before it arrives at my
sinks:
stream()
.map()
[....]
.<DELAY_DATA_FOR_15_MINUTES>
.print()
I tried implementing my own ProcessFunction, where Timestamper is a
custom abstract class:
public abstract class Timestamper {
    public abstract long executedAt();
}

public class DelayedProcessor<T extends Timestamper> extends ProcessFunction<T, T> {

    private final String stateName;
    private final Class<T> clazz;
    // TODO: Should we do ListState as this is being preferred for serialization
    // or should we do Value<Queue> but this may impact serialization.
    private ListState<T> state;
    private static long TIMEOUT = TimeUnit.MINUTES.toMillis(15);

    public DelayedProcessor(String stateName, Class<T> clazz) {
        this.stateName = stateName;
        this.clazz = clazz;
    }

    @Override
    public void open(Configuration parameters) {
        state = getRuntimeContext().getListState(
                new ListStateDescriptor<>(stateName, clazz));
    }

    @Override
    public void processElement(T t, Context ctx, Collector<T> collector)
            throws Exception {
        this.state.add(t);
        ctx.timerService().registerEventTimeTimer(ctx.timestamp() + TIMEOUT);
    }
    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<T> out)
            throws Exception {
        List<T> list = new ArrayList<>();
        this.state.get().forEach(list::add);
        long now = System.currentTimeMillis();
        // Emit every element whose delay has elapsed; keep the rest in state.
        list = list.stream().filter(v -> {
            if (v.executedAt() + TIMEOUT <= now) {
                out.collect(v);
                return false;
            }
            return true;
        }).collect(Collectors.toList());
        this.state.update(list);
    }
}
Unfortunately, this can only be used on a keyed stream, which may not
always be the case for me.
One possible solution would be to use:
.windowAll(SlidingEventTimeWindows.of(Time.minutes(15), Time.seconds(1)))
and then always just take the value with the lowest timestamp, but this
seems very bad performance-wise, and the state would be very large.
Does anyone have a solution for me, or can anyone point me in the right
direction?
Best regards,
Dario