Hi, we're trying to use the Flink 1.11 Java Table API to process a streaming use case:
We have 2 streams, each with a different structure. Events in both streams come from Kafka and can be:
- A new event (not already in the system)
- An updated event (updating an event that was previously inserted)

so we only want to store the latest data in the Table.

We need to join the 2 previous Tables to have all this data stored in the Flink system. We think the best way is to store the joined data as a Flink Table: a join of the 2 tables on a common key.

To sum up, we have:
- Stream 1 (coming from Kafka topic) -> Flink Table 1
- Stream 2 (coming from Kafka topic) -> Flink Table 2
- Table 3 = Table 1 join Table 2
- DataStream using RetractStream of Table 3

To get the last element in Table 1 and Table 2, we are using aggregate functions (LastValueAggFunction):

    streamTableEnvironment.registerFunction("LAST_VALUE_STRING", new LastValueAggFunction.StringLastValueAggFunction());
    ...
    streamTableEnvironment.fromDataStream(inputDataStream)
        .groupBy($("id"))
        .select(
            $("id").as("o_id"),
            call("LAST_VALUE_STRING", $("title")).as("o_title"),
            call("LAST_VALUE_STRING", $("description")).as("o_description")
        );

The questions are:
- Is our approach correct to get the data stored in the Flink system?
- Is it necessary to use LastValueAggFunction in our case? We want to convert the retract stream to our custom POJO instead of Row, but we're getting the attached error (attached: stack_trace.log).

Abdelilah Choukdi, Backend dev at ManoMano.
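P.S. To make the semantics we are after concrete, here is a minimal plain-Java sketch of what we expect the pipeline to compute: keep the latest value per id on each side, inner-join on id, and emit retract-style changes ("-" removes the old joined row, "+" adds the new one). This is only an illustration and does not use any Flink API; the class name, the method names, and the "detail" field for the second stream are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java illustration only; none of these names are Flink API. */
public class LatestJoinSketch {
    // Latest value per id for each input (stands in for Table 1 and Table 2).
    private final Map<String, String> latestTitles = new HashMap<>();
    private final Map<String, String> latestDetails = new HashMap<>();
    // Change log of the joined result: "-" retracts a row, "+" adds one.
    private final List<String> changelog = new ArrayList<>();

    /** New or updated event on stream 1: overwrite the stored title for this id. */
    public void upsertTitle(String id, String title) {
        String before = joinedRow(id);
        latestTitles.put(id, title);
        emit(before, joinedRow(id));
    }

    /** New or updated event on stream 2 ("detail" is a hypothetical field). */
    public void upsertDetail(String id, String detail) {
        String before = joinedRow(id);
        latestDetails.put(id, detail);
        emit(before, joinedRow(id));
    }

    // Inner join on id: a joined row exists only when both sides have the key.
    private String joinedRow(String id) {
        if (!latestTitles.containsKey(id) || !latestDetails.containsKey(id)) {
            return null;
        }
        return id + "," + latestTitles.get(id) + "," + latestDetails.get(id);
    }

    // Record a retraction of the old row and an addition of the new one.
    private void emit(String before, String after) {
        if (before != null && before.equals(after)) {
            return; // nothing changed in the joined result
        }
        if (before != null) {
            changelog.add("-" + before);
        }
        if (after != null) {
            changelog.add("+" + after);
        }
    }

    public List<String> changelog() {
        return changelog;
    }

    public static void main(String[] args) {
        LatestJoinSketch sketch = new LatestJoinSketch();
        sketch.upsertTitle("1", "old title");  // no joined row yet
        sketch.upsertDetail("1", "details");   // first joined row appears
        sketch.upsertTitle("1", "new title");  // old row retracted, new row added
        sketch.changelog().forEach(System.out::println);
    }
}
```

In other words, an update on either side should replace the joined row for that id rather than add a second one.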