Thanks for the response, but no, that's not the scenario. I'm familiar
with late-arriving data. In my case, though, the record from table2 never
arrives.
I'm not certain, but it may only happen when three or more tables are
joined in one statement.

On Sun, Nov 14, 2021, 7:32 PM Caizhi Weng <tsreape...@gmail.com> wrote:

> Hi!
>
> sometimes it produces a result, and sometimes it doesn't
>
>
> Do you mean this scenario: Consider a record from table1 with a.bId = 1.
> You know that in table2 there is a record with b.Id = 1, but the join
> result is null at the beginning, and then it is updated with the correct
> result?
>
> This is a property of streaming joins. If the records with a.bId = 1
> arrive earlier than the records with b.Id = 1, the join operator will
> first output a null-padded result (because there is nothing to join yet)
> and remember that record in state. When the record with b.Id = 1 arrives
> later, the operator looks into the state and updates the results. So the
> results are eventually correct.
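The behavior Caizhi describes can be modeled in a few lines of plain Python. This is a toy sketch, not Flink's join operator: it assumes insert-only inputs, and the class and helper names (StreamingLeftJoin, materialize) are hypothetical. It shows the null-padded row being emitted first and then retracted ("-D") and replaced ("+I") once the matching right-side row shows up.

```python
from collections import defaultdict


class StreamingLeftJoin:
    """Toy model of a streaming LEFT JOIN on one key (not Flink's operator).

    Both inputs are assumed insert-only. Output is a changelog of
    ("+I", row) inserts and ("-D", row) retractions.
    """

    def __init__(self):
        self.left_state = defaultdict(list)   # join key -> left rows seen so far
        self.right_state = defaultdict(list)  # join key -> right rows seen so far
        self.output = []

    def on_left(self, key, row):
        self.left_state[key].append(row)
        matches = self.right_state[key]
        if not matches:
            # Nothing to join yet: emit a null-padded row right away.
            self.output.append(("+I", (row, None)))
        for right_row in matches:
            self.output.append(("+I", (row, right_row)))

    def on_right(self, key, row):
        first_match = not self.right_state[key]
        self.right_state[key].append(row)
        for left_row in self.left_state[key]:
            if first_match:
                # Retract the earlier null-padded row before emitting the real one.
                self.output.append(("-D", (left_row, None)))
            self.output.append(("+I", (left_row, row)))


def materialize(changelog):
    """Apply a changelog to an empty table and return the resulting rows."""
    rows = []
    for op, row in changelog:
        if op == "+I":
            rows.append(row)
        else:
            rows.remove(row)
    return rows


join = StreamingLeftJoin()
join.on_left(1, "a1")            # a row with a.bId = 1 arrives first
print(materialize(join.output))  # [('a1', None)]
join.on_right(1, "b1")           # the row with b.Id = 1 arrives later
print(materialize(join.output))  # [('a1', 'b1')]
```

In this model, a right-side row that never reaches the operator leaves the null-padded row in place permanently, which is the outcome Curt reports.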
>
> I thought there might be a way to specify the partitioning.
>
>
> There is no need to specify partitioning for joins. Flink's SQL planner
> does this for you. If you look at the web UI, you'll see an arrow marked
> HASH pointing from the sources to the join operators. It means that records
> flowing along this arrow are distributed to the corresponding parallel
> subtask according to the hash values of their join keys.
>
> Curt Buechter <tricksho...@gmail.com> wrote on Sun, Nov 14, 2021 at 11:52 PM:
>
>> Hi,
>> I'd like to understand a little more how joins work. I have a fairly
>> simple LEFT JOIN query, and I'm seeing spotty results on the joins. I know
>> there is a record on the right side, but sometimes it produces a result,
>> and sometimes it doesn't.
>>
>> Sample query:
>> SELECT a.id, b.val1, c.val2
>> FROM table1 a
>> LEFT JOIN table2 b ON a.bId = b.Id
>> LEFT JOIN table3 c ON a.cId = c.Id
>>
>> I'm using Flink 1.13.2. All tables are loaded from Kafka, using the
>> Table/SQL API.
>>
>> My suspicion is that the distributed nature of the join causes the
>> problem. If I reduce the parallelism to match the number of slots,
>> resulting in a single task manager, the joins seem to always work (I
>> think). So my assumption is that records in table3 that should join to
>> table1 are not present on the same task manager, so the join produces null
>> values for the table3 columns.
>>
>> I thought there might be a way to specify the partitioning. If the tables
>> are from a multi-tenant database, I could specify the tenant-id as the
>> partition key, but the Flink SQL "PARTITION BY" clause doesn't seem to
>> work that way.
>>
>> Is there a way to confirm this? Does anyone know if this is a known
>> problem, and is there a solution?
>>
>> Thanks,
>> Curt
>>
>
