Hi Theo,

You've been very helpful actually, thank you.

What I mean by "tying" DataSet and DataStream is that the documentation states 
that the DataSet API can write a savepoint and the DataStream API can read it, and 
another blog states that "You can create both Batch and Stream environment in a 
single job."

I think that is not possible for my use case, as the state in my DataStream job is 
passed in through command-line parameters.

I did solve this problem, though.  You can read about it at 
https://github.com/minmay/flink-patterns/tree/master/bootstrap-keyed-state-into-stream
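
For anyone who finds this thread later, the core of the approach looks roughly
like the sketch below (simplified and untested; it is not the exact code in the
repository above). The Tag POJO, the "tag" state name, the "keyed-process-uid"
operator uid, and the savepoint path are just placeholders, and it assumes the
Flink 1.10 State Processor API, which runs on the DataSet API:

import java.util.Arrays;
import java.util.List;

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.state.api.BootstrapTransformation;
import org.apache.flink.state.api.OperatorTransformation;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.KeyedStateBootstrapFunction;

public class BootstrapKeyedState {

    /** Minimal placeholder POJO for the reference data. */
    public static class Tag {
        public String name;
        public double value;
        public Tag() {}
        public Tag(String name, double value) { this.name = name; this.value = value; }
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();

        // Placeholder: in the real job this data would come from PostgreSQL
        // (e.g. via JDBCInputFormat), not a hard-coded collection.
        List<Tag> reference = Arrays.asList(new Tag("sensor-1", 1.0), new Tag("sensor-2", 2.0));
        DataSet<Tag> tags = batchEnv.fromCollection(reference);

        // Key the reference data the same way the streaming job keys its stream,
        // then write each record into keyed state.
        BootstrapTransformation<Tag> bootstrap = OperatorTransformation
            .bootstrapWith(tags)
            .keyBy(tag -> tag.name)
            .transform(new KeyedStateBootstrapFunction<String, Tag>() {
                private transient ValueState<Tag> state;

                @Override
                public void open(Configuration parameters) {
                    state = getRuntimeContext()
                        .getState(new ValueStateDescriptor<>("tag", Tag.class));
                }

                @Override
                public void processElement(Tag value, Context ctx) throws Exception {
                    state.update(value);
                }
            });

        // "keyed-process-uid" must match the uid() of the keyed operator in the streaming job.
        Savepoint
            .create(new MemoryStateBackend(), 128)
            .withOperator("keyed-process-uid", bootstrap)
            .write("file:///tmp/bootstrap-savepoint");

        batchEnv.execute("bootstrap keyed state");
    }
}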

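The streaming job then only needs its keyed operator to declare a ValueStateDescriptor
with the same name and type ("tag" / Tag above) and to carry the matching uid
("keyed-process-uid" above), and it is submitted from the bootstrapped savepoint,
e.g. with: flink run -s file:///tmp/bootstrap-savepoint <job jar>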
Again, thank you.

> On Aug 10, 2020, at 12:02 PM, Theo Diefenthal 
> <theo.diefent...@scoop-software.de> wrote:
> 
> Hi Marco,
> 
> Sadly, I myself haven't ever used StateProcessorAPI either. 
> 
> I thought the documentation here [1] was rather straightforward to use, 
> but I never tried it myself.
> 
> Also, I don't get what you mean by "tying" DataSet and DataStream. From what 
> I understand, the StateProcessorAPI is internally built upon DataSets at the 
> moment. So if you have a streaming job (DataStream), you would run your 
> custom state migration program with the StateProcessor API before even starting 
> the streaming job, and initialize your state as a DataSet. After all, 
> initializing state doesn't require an infinite job running, but 
> only a finite one (thus batch/DataSet). Once that program has finished execution, 
> you would submit your streaming job starting from the written savepoint. I 
> guess the API works with either HDFS or a local filesystem, but maybe someone who has 
> already used the API might shed some more light for us here. 
> 
> Best regards
> Theo
> 
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html
> 
> From: "Marco Villalobos" <mvillalo...@kineteque.com>
> To: "Theo Diefenthal" <theo.diefent...@scoop-software.de>
> CC: "user" <user@flink.apache.org>
> Sent: Thursday, 6 August 2020 23:47:13
> Subject: Re: Two Queries and a Kafka Topic
> 
> I am trying to use the State Processor API.  Does that require HDFS or a 
> filesystem?
> I wish there were a complete example that ties together the DataSet and DataStream 
> APIs and the State Processor API.
> 
> So far I have not been able to get it to work.
> 
> Does anybody know where I can find examples of these types of techniques?
> 
> 
> 
> On Wed, Aug 5, 2020 at 3:52 AM Theo Diefenthal 
> <theo.diefent...@scoop-software.de> wrote:
> Hi Marco,
> 
> In general, I see three approaches here that you could take: 
> 
> 1. Use the StateProcessorAPI: You can run a program with the 
> StateProcessorAPI that loads the data from JDBC and stores it into a Flink 
> savepoint. Afterwards, you start your streaming job from that savepoint, which 
> will load its state and find all the data from JDBC already stored within. 
> 2. Load from master, distribute with the job: When you build up your 
> job graph, you could execute the JDBC queries and put the result into some 
> Serializable class, which in turn you plug into an operator in your stream 
> (e.g. a map stage). The class, along with all the queried data, will be 
> serialized and deserialized on the TaskManagers. (Usually I use this for 
> configuration parameters, but it might be OK in this case as well.)
> 3. Load from TaskManager: In your map function, when the very first event is 
> received, you can block processing and synchronously load the data from JDBC 
> (so each TaskManager performs the JDBC query itself). You then keep the data 
> to be used for all subsequent map steps. 
> 
> I think option 3 is the easiest to implement, while option 1 might be 
> the most elegant way, in my opinion. 
> 
> Best regards
> Theo
> 
> From: "Marco Villalobos" <mvillalo...@kineteque.com>
> To: "Leonard Xu" <xbjt...@gmail.com>
> CC: "user" <user@flink.apache.org>
> Sent: Wednesday, 5 August 2020 04:33:23
> Subject: Re: Two Queries and a Kafka Topic
> 
> Hi Leonard,
> 
> First, Thank you.
> 
> I am currently trying to restrict my solution to Apache Flink 1.10 because 
> it's the current version supported by Amazon EMR.
> I am not ready to change our operational environment to solve this.
> 
> Second, I am using the DataStream API.  The Kafka Topic is not in a table, it 
> is in a DataStream.
> 
> The SQL queries are literally from a PostgreSQL database, and only need to 
> be run exactly once in the lifetime of the job.
> 
> I am struggling to determine where this happens.
> 
> JDBCInputFormat seems to query the SQL table repeatedly, and connecting 
> streams and aggregating them into one object is also very complicated.
> 
> Thus, I am wondering what is the right approach.  
> 
> Let me restate the parameters.
> 
> SQL Query One = data in PostgreSQL (200K records) that is used for business 
> logic.
> SQL Query Two = data in PostgreSQL (1000 records) that is used for business 
> logic.
> Kafka Topic One = an unbounded data stream that uses the DataStream API and 
> the queries above to write into multiple sinks
> 
> ASCII diagram:
> 
> [SQL Query One] ----> [Aggregate to Map] ----\
> Kafka ----> [Kafka Topic One] ----> [Keyed Process Function (Query One Map, Query Two Map)] ----> [Multiple Sinks]
> [SQL Query Two] ----> [Aggregate to Map] ----/
> 
> 
> Maybe my diagram above helps.  You see, I need Query One and Query Two to only 
> ever execute once.  After that, the information they provide is used to 
> correctly process the Kafka topic.
> 
> I'll dig a bit further to try to understand what you said, thank you, but 
> JDBCInputFormat seems to query the database repeatedly.  Maybe I need to 
> write a RichFunction or an AsyncIO function and cache the results in state after 
> that.
> 
> 
> 
> On Aug 4, 2020, at 6:25 PM, Leonard Xu <xbjt...@gmail.com> wrote:
> 
> Hi, Marco
> 
> If I need SQL Query One and SQL Query Two to happen just one time,
> 
> Looks like you want to reuse this Kafka table in one job. Executing 
> multiple queries in one SQL job is supported in Flink 1.11. 
> You can use `StatementSet`[1] to add SQL Query One and SQL Query Two in a 
> single SQL job.
> 
> 
> Best
> Leonard 
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/insert.html#run-an-insert-statement
> 
> 
> On Aug 5, 2020, at 04:34, Marco Villalobos <mvillalo...@kineteque.com> wrote:
> 
> Let's say that I have:
> 
> SQL Query One from data in PostgreSQL (200K records).
> SQL Query Two from data in PostgreSQL (1000 records).
> and Kafka Topic One.
> 
> Let's also say that the main data for this Flink job arrives in Kafka Topic One.
> 
> If I need SQL Query One and SQL Query Two to happen just one time, when the 
> job starts up, and afterwards maybe store their results in keyed state or 
> broadcast state, but they're not really part of the stream, then what is the 
> best practice for supporting that in Flink?
> 
> The Flink job needs to stream data from Kafka Topic One, aggregate it, and 
> perform computations that require all of the data in SQL Query One and SQL 
> Query Two to perform its business logic.
> 
> I am using Flink 1.10.
> 
> Am I supposed to query the database before the job is submitted, and then pass 
> the results on as parameters to a function?
> Or am I supposed to use JDBCInputFormat for both queries and create two 
> streams, and somehow connect or broadcast both of them to the main stream 
> that uses Kafka Topic One?
> 
> I would appreciate guidance. Please.  Thank you.
> 
> Sincerely,
> 
> Marco A. Villalobos
