Hi,

Problem: Watermark does not move within Dynamic Alert Function

Implementing ideas (as is) from this article -
https://flink.apache.org/news/2020/01/15/demo-fraud-detection.html
Code: https://github.com/afedulov/fraud-detection-demo
Pipeline: Kafka -> Dynamic Key Function -> Dynamic Alert Function -> Kafka
sink

Adapted code for Flink 1.14.3:

1. Init transaction source:
----------------------------------------------
KafkaSource<String> transactionSource =
TransactionsSource.createTransactionsSource(config);
int sourceParallelism = config.get(SOURCE_PARALLELISM);
WatermarkStrategy<String> watermarkStrategy = WatermarkStrategy
.<String>forBoundedOutOfOrderness(Duration.ofMillis(config.get(OUT_OF_ORDERNESS)))
.withTimestampAssigner((event, timestamp) -> System.currentTimeMillis());

DataStream<String> transactionsStringsStream = env
.fromSource(transactionSource, watermarkStrategy, "KafkaTransactions")
.name("Transactions Source").setParallelism(sourceParallelism);
DataStream<Transaction> transactionsStream = TransactionsSource
.stringsStreamToTransactions(transactionsStringsStream);

return transactionsStream.assignTimestampsAndWatermarks(
WatermarkStrategy.<Transaction>forBoundedOutOfOrderness(Duration.ofMillis(config.get(OUT_OF_ORDERNESS)))
.withTimestampAssigner((event, timestamp) -> event.getEventTime()));

DataStream<Rule> rulesUpdateStream = getRulesUpdateStream(env);
BroadcastStream<Rule> rulesStream =
rulesUpdateStream.broadcast(Descriptors.rulesDescriptor);

// Processing pipeline setup
DataStream<Alert> alerts = transactions.connect(rulesStream).process(new
DynamicKeyFunction())
.uid("DynamicKeyFunction").name("Dynamic Partitioning
Function").keyBy((keyed) -> keyed.getKey())
.connect(rulesStream).process(new
DynamicAlertFunction()).uid("DynamicAlertFunction")
.name("Dynamic Rule Evaluation Function");

private DataStream<Rule> getRulesUpdateStream(StreamExecutionEnvironment
env) throws IOException {

RulesSource.Type rulesSourceEnumType = getRulesSourceType();

KafkaSource<String> rulesSource = RulesSource.createRulesSource(config);
DataStream<String> rulesStrings = env
.fromSource(rulesSource, WatermarkStrategy.noWatermarks(), "KafkaRules")
.name(rulesSourceEnumType.getName()).setParallelism(1);
return RulesSource.stringsStreamToRules(rulesStrings);
}
----------------------------------------------
Watermark stays at -9223372036854775808. DynamicAlertFunction onTimer()
does not fire. In Web UI, I see "No Watermark (Watermarks are only
available if EventTime is used)"

Please help.

Thanks

Reply via email to