Hi,

We're trying to use the Flink 1.11 Java Table API for a streaming use
case:

We have two streams, each with a different structure. Events in both
streams come from Kafka and can be:
- A new event (not yet in the system)
- An update of an event that was previously inserted
so we only want to keep the latest version of each event in the table.

We need to join the two resulting tables so that all of this data is kept
in the Flink system. We think the best way is to store the joined data as
a table as well: a Flink table that is the join of the two tables on a
common key.

To sum up, we have:
- Stream 1 (coming from Kafka topic) -> Flink Table 1
- Stream 2 (coming from Kafka topic) -> Flink Table 2
- Table 3 = Table 1 join Table 2
- A DataStream obtained by converting Table 3 to a retract stream
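To make the expected behaviour of Table 3 concrete, here is a simplified, in-memory Java model of an inner join between two tables that each keep only the latest payload per id. It is not Flink code and not how Flink implements the join; the class and method names are hypothetical. The point it illustrates: once both sides have a row for an id, every later update on either side retracts the previously joined row and then adds the new one, which is roughly the pattern of `(false, row)` / `(true, row)` messages a retract stream of Table 3 carries.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Simplified in-memory model (hypothetical, not Flink code) of
 * Table 3 = Table 1 JOIN Table 2 ON id, where each input table only keeps
 * the latest payload per id. Each update method returns the retract-stream
 * messages that Table 3 would produce for that update.
 */
public class KeyedJoinRetractModel {

    private final Map<String, String> table1 = new HashMap<>(); // id -> latest payload
    private final Map<String, String> table2 = new HashMap<>(); // id -> latest payload

    /** Upserts a row of Table 1 and returns the resulting changes of Table 3. */
    List<String> updateTable1(String id, String payload) {
        String oldLeft = table1.put(id, payload);
        String right = table2.get(id);
        if (right == null) {
            return new ArrayList<>(); // inner join: no partner yet, nothing emitted
        }
        return changes(oldLeft == null ? null : row(id, oldLeft, right), row(id, payload, right));
    }

    /** Upserts a row of Table 2 and returns the resulting changes of Table 3. */
    List<String> updateTable2(String id, String payload) {
        String oldRight = table2.put(id, payload);
        String left = table1.get(id);
        if (left == null) {
            return new ArrayList<>(); // inner join: no partner yet, nothing emitted
        }
        return changes(oldRight == null ? null : row(id, left, oldRight), row(id, left, payload));
    }

    private static String row(String id, String left, String right) {
        return "[" + id + ", " + left + ", " + right + "]";
    }

    /** Retracts the previous joined row (if there was one), then adds the new one. */
    private static List<String> changes(String oldRow, String newRow) {
        List<String> out = new ArrayList<>();
        if (oldRow != null) {
            out.add("(false, " + oldRow + ")");
        }
        out.add("(true, " + newRow + ")");
        return out;
    }

    public static void main(String[] args) {
        KeyedJoinRetractModel t3 = new KeyedJoinRetractModel();
        System.out.println(t3.updateTable1("42", "A1")); // []
        System.out.println(t3.updateTable2("42", "B1")); // [(true, [42, A1, B1])]
        System.out.println(t3.updateTable1("42", "A2"));
        // [(false, [42, A1, B1]), (true, [42, A2, B1])]
    }
}
```

So a consumer of Table 3's retract stream has to handle both flags, not only inserts.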

To keep only the latest values per key in Table 1 and Table 2, we are
using an aggregate function (LastValueAggFunction):

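import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;

import org.apache.flink.table.api.Table;

streamTableEnvironment.registerFunction(
        "LAST_VALUE_STRING",
        new LastValueAggFunction.StringLastValueAggFunction());
...
// Table 1: one row per id, holding the latest title and description
Table table1 = streamTableEnvironment.fromDataStream(inputDataStream)
        .groupBy($("id"))
        .select(
                $("id").as("o_id"),
                call("LAST_VALUE_STRING", $("title")).as("o_title"),
                call("LAST_VALUE_STRING", $("description")).as("o_description")
        );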
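For reference, a grouped aggregation like the one above produces an updating table, so its retract stream contains retractions as well as inserts. The following is a simplified in-memory model of that behaviour (a hypothetical class, not Flink's implementation; for example, it ignores any optimisation that may suppress unchanged results): the first event for an id yields one insert, and each later event for the same id retracts the previous result row and inserts the latest one.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Simplified in-memory model (hypothetical, not Flink code) of what a
 * groupBy(id) + LAST_VALUE(...) aggregation emits as a retract stream.
 */
public class LastValueRetractModel {

    /** One retract-stream message: add == true is an insert, false a retraction. */
    static final class Change {
        final boolean add;
        final List<String> row;

        Change(boolean add, List<String> row) {
            this.add = add;
            this.row = row;
        }

        @Override
        public String toString() {
            return "(" + add + ", " + row + ")";
        }
    }

    // Aggregation state: the latest (o_id, o_title, o_description) row per id.
    private final Map<String, List<String>> latestById = new HashMap<>();

    /** Applies one incoming event and returns the changes emitted downstream. */
    List<Change> onEvent(String id, String title, String description) {
        List<String> newRow = Arrays.asList(id, title, description);
        List<String> oldRow = latestById.put(id, newRow);
        List<Change> out = new ArrayList<>();
        if (oldRow != null) {
            out.add(new Change(false, oldRow)); // retract the previous result for this id
        }
        out.add(new Change(true, newRow)); // insert the latest result for this id
        return out;
    }

    public static void main(String[] args) {
        LastValueRetractModel table1 = new LastValueRetractModel();
        System.out.println(table1.onEvent("42", "Drill", "v1"));
        // [(true, [42, Drill, v1])]
        System.out.println(table1.onEvent("42", "Drill", "v2"));
        // [(false, [42, Drill, v1]), (true, [42, Drill, v2])]
    }
}
```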


The questions are:
- Is our approach correct for keeping this data in the Flink system?
- Is it necessary to use *LastValueAggFunction* in our case? We want to
convert the retract stream to our custom POJO instead of *Row*, but we're
getting an error (see the attached *stack_trace.log*).
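One thing worth checking on the POJO side (an assumption, since the stack trace contents are not shown here): Flink only treats a class as a POJO if it is public (and standalone or a static nested class), has a public no-argument constructor, and all its fields are public or accessible through getters and setters. When a Table is converted to a POJO stream, columns are mapped to fields by name, so the field names must match the column names (`o_id`, `o_title`, `o_description`), or the columns must be renamed to match the fields. A sketch with a hypothetical class name:

```java
/**
 * Hypothetical POJO for the rows of Table 1 (the class name is made up).
 * Public class, public no-argument constructor and public fields, so that
 * Flink's type extraction can treat it as a POJO. Field names deliberately
 * match the table columns, because Table-to-POJO conversion maps by name.
 */
public class LatestEvent {
    public String o_id;
    public String o_title;
    public String o_description;

    /** Public no-argument constructor, required for Flink POJO types. */
    public LatestEvent() {
    }

    public LatestEvent(String oId, String oTitle, String oDescription) {
        this.o_id = oId;
        this.o_title = oTitle;
        this.o_description = oDescription;
    }

    @Override
    public String toString() {
        return "LatestEvent{o_id=" + o_id + ", o_title=" + o_title
                + ", o_description=" + o_description + "}";
    }
}
```

With such a class, the conversion would be `streamTableEnvironment.toRetractStream(table1, LatestEvent.class)` (where `table1` stands for Table 1), which returns a `DataStream<Tuple2<Boolean, LatestEvent>>` whose Boolean flag is `true` for an insert and `false` for a retraction.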


Abdelilah Choukdi,
Backend dev at ManoMano.

Attachment: stack_trace.log
Description: Binary data
