Hi Oscar,
Could you provide the Java code to illustrate what you were doing?
The difference between version A and B might be especially helpful. I
assume you already looked into the FAQ about operator IDs [1]?

Adding the JM and TM logs might help as well to investigate the issue, as
Yu Chen mentioned.
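
To make the restore behaviour discussed in this thread concrete, here is a
toy model in plain Java. It is only a sketch and not Flink code: the class
`RestoreSketch`, the method `restore`, and the operator IDs are all made up
for illustration. It assumes that savepoint state is matched to the new job
purely by operator ID, and that state without a matching operator is either
dropped (allowNonRestoredState) or causes the restore to fail.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Toy model of a savepoint restore keyed by operator ID. Not Flink code. */
public class RestoreSketch {

    /**
     * Maps savepoint state onto the operators of the new job by operator ID.
     * State whose ID is missing in the new job is dropped when
     * allowNonRestoredState is true; otherwise the restore fails.
     */
    public static Map<String, String> restore(Map<String, String> savepointState,
                                              Set<String> newJobOperatorIds,
                                              boolean allowNonRestoredState) {
        Map<String, String> restored = new HashMap<>();
        Set<String> unmatched = new TreeSet<>();
        for (Map.Entry<String, String> entry : savepointState.entrySet()) {
            if (newJobOperatorIds.contains(entry.getKey())) {
                restored.put(entry.getKey(), entry.getValue());
            } else {
                unmatched.add(entry.getKey());
            }
        }
        if (!unmatched.isEmpty() && !allowNonRestoredState) {
            throw new IllegalStateException(
                    "Cannot map savepoint state for operators " + unmatched);
        }
        return restored;
    }

    public static void main(String[] args) {
        // Version A: hypothetical operator IDs and their state.
        Map<String, String> savepoint = new LinkedHashMap<>();
        savepoint.put("source-topic1", "offset=42");
        savepoint.put("join-v1", "buffered rows");

        // Version B: the join operator received a different ID.
        Set<String> versionB = Set.of("source-topic1", "join-v2");

        // Lenient restore: the join state is silently dropped.
        System.out.println(restore(savepoint, versionB, true));

        // Strict restore: the failure names the unmapped operator.
        try {
            restore(savepoint, versionB, false);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In this model the lenient restore keeps only the source offset, while the
strict restore fails and names the operator whose state could not be mapped.
That is why running once without --allowNonRestoredState is a quick way to
check whether the operator IDs changed between version A and B.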

Best,
Matthias

On Sun, Nov 26, 2023 at 2:18 PM Yu Chen <yuchen.e...@gmail.com> wrote:

> Hi Oscar,
>
> The operator IDs of the SQL job are generated by
> `StreamingJobGraphGenerator` and are related to the topology of the
> stream graph.
> If you would like to confirm whether the problem was caused by changed
> operator IDs, please remove --allowNonRestoredState, and you will
> get an exception naming the operator ID that failed to restore.
>
> However, the loss of operator state would only produce some erroneous
> results and would not result in `not able to return any row`. It would be
> better to provide the logs after restoring to locate a more specific problem.
>
> Best,
> Yu Chen
> ------------------------------
> *From:* Oscar Perez via user <user@flink.apache.org>
> *Sent:* November 25, 2023 0:08
> *To:* Oscar Perez via user <user@flink.apache.org>
> *Subject:* Doubts about state and table API
>
> Hi,
>
> We have a job in production where we use the table API to join multiple
> topics. The query looks like this:
>
>
> SELECT *
> FROM topic1 AS t1
> JOIN topic2 AS t2 ON t1.userId = t2.userId
> JOIN topic3 AS t3 ON t1.userId = t3.accountUserId
>
>
> This works and produces an EnrichedActivity any time any of the topics
> receives a new event, which is what we expect. This SQL query is linked to
> a processor function, and processElement gets triggered whenever a new
> EnrichedActivity occurs.
>
> We have experienced an issue a couple of times in production where we have
> deployed a new version from savepoint and then suddenly we
> stopped receiving EnrichedActivities in the process function.
>
> Our assumption is that this is related to the table API state and that
> the state of some operators is lost when going from a savepoint to a new
> deployment.
>
> Let me illustrate with one example:
>
> version A of the job is deployed
> version B of the job is deployed
>
> In version B the UID of some table API operators changes, and the state of
> these operators is dropped when deploying version B as it cannot be mapped
> (we have --allowNonRestoredState enabled)
>
> The state for the table api stores both the committed offset and the
> contents of the topic but just the contents are lost and the committed
> offset is still in the offset
>
> Therefore, when doing the join of the query, it is not able to return any
> row as it is unable to get data from topic2 or topic3.
>
> Can this be the case?
> We are having a hard time trying to understand how the table api and state
> work internally, so any help in this regard would be truly helpful!
>
> Thanks,
> Oscar
>
>
>
