Hi Darshan,

The join implementation in SQL / Table API does what is demanded by the SQL
semantics.
Hence, which results to emit and which data to keep as state in order to
compute them are pretty much determined by those semantics.
You can think of the semantics of the join as writing both streams into a
relational DBMS and executing the join on the DBMS.
The DBMS computes the join result on all data at once. In contrast, Flink
computes the results continuously whenever a new record arrives.
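
To make the state requirement concrete, here is a toy sketch in plain Python.
It is not Flink code, and the class and method names (FullHistoryJoin,
on_left, on_right, state_size) are made up for illustration. It covers only
the matching part of the join and leaves out the null-padded rows and
retractions that a full outer join additionally needs. Every row is kept, so
that the rows emitted over time add up to what a DBMS would return for the
same data.

```python
from collections import defaultdict


class FullHistoryJoin:
    """Toy model of a non-windowed streaming join that keeps every row as state."""

    def __init__(self):
        # key -> all values seen so far on that side (this is the "state")
        self.left = defaultdict(list)
        self.right = defaultdict(list)

    def on_left(self, key, value):
        # Store the new row, then join it with every stored right row of the key.
        self.left[key].append(value)
        return [(key, value, r) for r in self.right.get(key, [])]

    def on_right(self, key, value):
        # Symmetric case for a row arriving on the right input.
        self.right[key].append(value)
        return [(key, l, value) for l in self.left.get(key, [])]

    def state_size(self):
        # Number of rows held in state across both sides.
        return sum(len(v) for v in self.left.values()) + sum(
            len(v) for v in self.right.values()
        )


join = FullHistoryJoin()
emitted = []
emitted += join.on_left("k", "a1")
emitted += join.on_right("k", "b1")
emitted += join.on_left("k", "a2")
emitted += join.on_right("k", "b2")
print(emitted)            # all four (left, right) combinations for key "k"
print(join.state_size())  # 4: no row is ever dropped
```

In this model the state grows with every update, which is the behavior
Darshan describes.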

It might be that your join requirements do not match the semantics of a SQL
join.
In that case, you might be better off with a custom implementation based on
a ProcessFunction as Vino pointed out.
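
As a rough illustration of what such a custom implementation could do, here
is a plain-Python sketch that keeps only the latest value per key and side
and drops a key once it has been idle for a configurable time. This is not
the Flink API: LatestValueJoin, on_left, on_right, on_timer, and ttl are
hypothetical names, and timestamps are plain integers. In Flink the same
idea would presumably map to keyed state plus timers in a two-input process
function, but the details would differ.

```python
class LatestValueJoin:
    """Toy model: one value per key and side, expired after `ttl` idle time units."""

    def __init__(self, ttl):
        self.ttl = ttl
        self.left = {}       # key -> latest left value
        self.right = {}      # key -> latest right value
        self.last_seen = {}  # key -> timestamp of the most recent update

    def on_left(self, key, value, ts):
        # Overwrite the previous left value instead of appending to a history.
        self.left[key] = value
        self.last_seen[key] = ts
        # Outer-join style row: None stands for a missing right side.
        return (key, value, self.right.get(key))

    def on_right(self, key, value, ts):
        self.right[key] = value
        self.last_seen[key] = ts
        return (key, self.left.get(key), value)

    def on_timer(self, now):
        # Remove all keys that received no update for at least `ttl` time units.
        expired = [k for k, t in self.last_seen.items() if now - t >= self.ttl]
        for k in expired:
            self.left.pop(k, None)
            self.right.pop(k, None)
            del self.last_seen[k]
        return expired


join = LatestValueJoin(ttl=60)        # e.g. 60 days, an assumed retention
print(join.on_left("k", "a1", ts=0))  # right side still unknown
print(join.on_right("k", "b1", ts=1))
print(join.on_left("k", "a2", ts=2))  # "a1" is overwritten, not kept
print(join.on_timer(now=62))          # key "k" has been idle for 60 units
```

Note that the output of this sketch differs from a SQL join: a new value is
only paired with the latest value of the other side, not with all of its
earlier values.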

Btw. from your description, it looks like your use case could be addressed
by the time-versioned / enrichment join that is currently being developed
[1] for SQL / Table API.

Best,
Fabian

[1] https://issues.apache.org/jira/browse/FLINK-9712

2018-07-24 8:17 GMT+02:00 vino yang <yanghua1...@gmail.com>:

> Hi Darshan,
>
> In your use case, I think you can implement the outer join with the
> DataStream API (using state + ProcessFunction + timers). With a suitable
> state type, you can store one value per key and do not need to keep the
> full value history for every key.
>
> And you can refer to Flink's implementation of DataStream join[1].
>
> [1]: https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala#L223
>
> Thanks, vino.
>
> 2018-07-24 1:28 GMT+08:00 Darshan Singh <darshan.m...@gmail.com>:
>
>> Hi
>>
>> I was looking at the new full outer join. It seems to work fine for my
>> use case; however, I have a question regarding the state size.
>>
>> I have 2 streams, each with hundreds of millions of unique keys. Also, each
>> of these will get updated values for its keys hundreds of times per day.
>>
>> As I understand it, in a full outer join Flink keeps in state all the
>> values it has seen for each key. Whenever a new value arrives from one of
>> the streams, it is joined against all of the values stored for that key
>> from the second stream. That could be 1 or hundreds of rows. This seems
>> inefficient, but my question is more about the state: I will need to keep
>> billions of values in state on both sides, which will be very expensive.
>>
>> It is a non-windowed join. A key can receive updates for 50-60 days, and
>> after that it won't get any updates on either of the streams.
>>
>> Is there a way we could use a state such that only 1 value per key is
>> retained in the state to reduce the size of the state?
>>
>> I am using the Table API but could use the DataStream API if needed.
>>
>> Thanks
>>
>
>
