Hi Leonard,

First, thank you.

I am currently trying to restrict my solution to Apache Flink 1.10 because it's 
the version currently supported by Amazon EMR.
I am not ready to change our operational environment to solve this.

Second, I am using the DataStream API.  The Kafka topic is not in a table; it 
is in a DataStream.

The SQL queries are literally from a PostgreSQL database, and only need to be 
run exactly once in the lifetime of the job.

I am struggling to determine where this one-time querying should happen.

JDBCInputFormat seems to query the SQL table repeatedly, and connecting 
streams and aggregating them into one object is also very complicated.

Thus, I am wondering what the right approach is.

Let me restate the parameters.

SQL Query One = data in PostgreSQL (200K records) that is used for business 
logic.
SQL Query Two = data in PostgreSQL (1,000 records) that is used for business 
logic.
Kafka Topic One = an unbounded stream that uses the DataStream API and the 
query results above to write into multiple sinks.

ASCII diagram:

[SQL Query One] ----> [Aggregate to Map] ----\
                                              \
Kafka ----> [Kafka Topic One] ----> [Keyed Process Function (Query One Map, Query Two Map)] ----> [Multiple Sinks]
                                              /
[SQL Query Two] ----> [Aggregate to Map] ----/


Maybe my diagram above helps.  You see, I need Query One and Query Two to only 
ever execute once.  After that, the information they provide is used to 
correctly process the Kafka topic.
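
Concretely, the wiring I have in mind looks roughly like this (a sketch only; 
buildKafkaSource, sinkOne, sinkTwo, Event, EnrichedEvent, and 
EnrichmentFunction are all hypothetical placeholders):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Rough sketch of the diagram above. Everything except the Flink classes
// is a hypothetical placeholder.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<EnrichedEvent> enriched = env
        .addSource(buildKafkaSource())          // Kafka Topic One
        .keyBy(Event::getKey)
        .process(new EnrichmentFunction());     // holds the Query One / Query Two maps

enriched.addSink(sinkOne());                    // multiple sinks
enriched.addSink(sinkTwo());

env.execute("kafka-enrichment-job");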

I'll dig a bit deeper to try and understand what you said, thank you, but 
JDBCInputFormat seems to repetitively query the database.  Maybe I need to 
write a RichFunction or an AsyncIO function and cache the results in state 
after that.
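
The RichFunction idea would be something like this (again only a sketch; the 
table, column, and connection details are made up, Event and EnrichedEvent 
are placeholder POJOs, and EnrichmentFunction is the same hypothetical class 
as above):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Sketch: run both queries exactly once when the operator opens, cache the
// results in transient maps, and consult them for every Kafka record.
// Event and EnrichedEvent are hypothetical POJOs; SQL names are made up.
public class EnrichmentFunction extends KeyedProcessFunction<String, Event, EnrichedEvent> {

    private transient Map<String, String> queryOneMap;  // ~200K records
    private transient Map<String, String> queryTwoMap;  // ~1K records

    @Override
    public void open(Configuration parameters) throws Exception {
        queryOneMap = load("SELECT key, value FROM query_one_table");
        queryTwoMap = load("SELECT key, value FROM query_two_table");
    }

    private static Map<String, String> load(String sql) throws Exception {
        Map<String, String> map = new HashMap<>();
        try (Connection conn = DriverManager.getConnection(
                     "jdbc:postgresql://host:5432/db", "user", "password");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery(sql)) {
            while (rs.next()) {
                map.put(rs.getString(1), rs.getString(2));
            }
        }
        return map;
    }

    @Override
    public void processElement(Event event, Context ctx, Collector<EnrichedEvent> out) {
        // Business logic that needs both cached maps.
        out.collect(new EnrichedEvent(event,
                queryOneMap.get(event.getKey()),
                queryTwoMap.get(event.getKey())));
    }
}

One caveat I'm aware of: open() runs once per parallel subtask, so with 
parallelism greater than one each subtask would issue the queries once. 
That's still a fixed number of queries at startup rather than a query per 
record, which may be close enough to "exactly once in the lifetime of the 
job" for my purposes.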



> On Aug 4, 2020, at 6:25 PM, Leonard Xu <xbjt...@gmail.com> wrote:
> 
> Hi, Marco
> 
>> If I need SQL Query One and SQL Query Two to happen just one time,
> 
> Looks like you want to reuse this Kafka table in one job. Executing 
> multiple queries in one SQL job is supported in Flink 1.11. 
> You can use `StatementSet`[1] to add SQL Query One and SQL Query Two in a 
> single SQL job.
> 
> 
> Best
> Leonard 
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/insert.html#run-an-insert-statement
> 
> 
>> On Aug 5, 2020, at 04:34, Marco Villalobos <mvillalo...@kineteque.com> wrote:
>> 
>> Let's say that I have:
>> 
>> SQL Query One from data in PostgreSQL (200K records).
>> SQL Query Two from data in PostgreSQL (1000 records).
>> and Kafka Topic One.
>> 
>> Let's also say that main data from this Flink job arrives in Kafka Topic One.
>> 
>> If I need SQL Query One and SQL Query Two to happen just one time, when the 
>> job starts up, and afterwards maybe store the results in Keyed State or 
>> Broadcast State, but they're not really part of the stream, then what is 
>> the best practice for supporting that in Flink?
>> 
>> The Flink job needs to stream data from Kafka Topic One, aggregate it, and 
>> perform computations that require all of the data in SQL Query One and SQL 
>> Query Two to perform its business logic.
>> 
>> I am using Flink 1.10.
>> 
>> Am I supposed to query the database before the job is submitted, and then 
>> pass the results on as parameters to a function?
>> Or am I supposed to use JDBCInputFormat for both queries to create two 
>> streams, and somehow connect or broadcast both of them to the main stream 
>> that uses Kafka Topic One?
>> 
>> I would appreciate guidance, please.  Thank you.
>> 
>> Sincerely,
>> 
>> Marco A. Villalobos
>> 
>> 
>> 
> 
