Hi Theo, You've been very helpful actually, thank you.
What I mean by "tying" DataSet and DataStream is that the documentation states that a DataSet can write a savepoint and a DataStream can read it, and another blog states that "You can create both Batch and Stream environment in a single job." I think that is not possible for my use case, since state in the DataStream job is passed in through command-line parameters.

I did solve this problem, though. You can read about it at https://github.com/minmay/flink-patterns/tree/master/bootstrap-keyed-state-into-stream

Again, thank you.

> On Aug 10, 2020, at 12:02 PM, Theo Diefenthal <theo.diefent...@scoop-software.de> wrote:
>
> Hi Marco,
>
> Sadly, I myself haven't ever used the StateProcessorAPI either.
>
> I thought the documentation here [1] is rather straightforward to use, but I never tried it myself.
>
> Also, I don't get what you mean by "tying" DataSet and DataStream. From what I understand, the StateProcessorAPI is internally built upon DataSets at the moment. So if you have a streaming job (DataStream), you would run your custom state migration program before even starting the streaming job, using the StateProcessorAPI to initialize your state from a DataSet. After all, initializing state doesn't require an infinite running job, only a finite one (thus batch/DataSet). Once that program finishes execution, you would submit your streaming job starting from the written savepoint. I guess the API works with either HDFS or a local filesystem, but maybe someone who has already used the API can shed some more light for us here.
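For reference, the bootstrap workflow Theo describes can be outlined with the State Processor API as below. This is an untested sketch based on the Flink 1.9/1.10 `org.apache.flink.state.api` documentation, not the actual code from the linked repository; the operator UID, state name, row type, and savepoint path are made-up placeholders, so treat it as pseudocode:

```java
// Finite "bootstrap" job: read the JDBC data as a DataSet,
// write it into a savepoint, then start the streaming job from it.
ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();

DataSet<Tuple2<String, Double>> jdbcRows = batchEnv.createInput(jdbcInputFormat);

BootstrapTransformation<Tuple2<String, Double>> bootstrap = OperatorTransformation
    .bootstrapWith(jdbcRows)
    .keyBy(row -> row.f0)
    .transform(new KeyedStateBootstrapFunction<String, Tuple2<String, Double>>() {
        ValueState<Double> state;

        @Override
        public void open(Configuration parameters) {
            state = getRuntimeContext().getState(
                new ValueStateDescriptor<>("reference", Types.DOUBLE));
        }

        @Override
        public void processElement(Tuple2<String, Double> row, Context ctx) throws Exception {
            state.update(row.f1);
        }
    });

// "uid-of-keyed-process-fn" must match the .uid(...) of the streaming
// operator that will later read this state; the path can be any
// filesystem or HDFS URI reachable by the cluster.
Savepoint.create(new MemoryStateBackend(), 128)
    .withOperator("uid-of-keyed-process-fn", bootstrap)
    .write("file:///tmp/bootstrap-savepoint");

batchEnv.execute("bootstrap");
```

After this finite job completes, the streaming job is submitted with `--fromSavepoint file:///tmp/bootstrap-savepoint` (or the equivalent REST/CLI option), and its keyed state arrives pre-populated.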
>
> Best regards
> Theo
>
> [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html
>
> From: "Marco Villalobos" <mvillalo...@kineteque.com>
> To: "Theo Diefenthal" <theo.diefent...@scoop-software.de>
> CC: "user" <user@flink.apache.org>
> Sent: Thursday, August 6, 2020 23:47:13
> Subject: Re: Two Queries and a Kafka Topic
>
> I am trying to use the State Processor API. Does that require HDFS or a filesystem?
> I wish there were a complete example that ties together the DataSet and DataStream APIs and the State Processor API.
>
> So far I have not been able to get it to work.
>
> Does anybody know where I can find examples of these types of techniques?
>
> On Wed, Aug 5, 2020 at 3:52 AM Theo Diefenthal <theo.diefent...@scoop-software.de> wrote:
>
> Hi Marco,
>
> In general, I see three solutions here you could approach:
>
> 1. Use the StateProcessorAPI: You can run a program with the StateProcessorAPI that loads the data from JDBC and stores it into a Flink savepoint. Afterwards, you start your streaming job from that savepoint, which will load its state and find all the data from JDBC already stored.
> 2. Load from master, distribute with the job: When you build up your job graph, you could execute the JDBC queries and put the result into some Serializable class, which in turn you plug into an operator in your stream (e.g. a map stage). The class, along with all the queried data, will be serialized and deserialized on the task managers. (Usually I use this for configuration parameters, but it might be OK in this case as well.)
> 3. Load from TaskManager: In your map function, when the very first event is received, you can block processing and synchronously load the data from JDBC (so each TaskManager performs the JDBC query itself).
> You then keep the data to be used for all subsequent map steps.
>
> I think option 3 is the easiest to implement, while option 1 might be the most elegant way, in my opinion.
>
> Best regards
> Theo
>
> From: "Marco Villalobos" <mvillalo...@kineteque.com>
> To: "Leonard Xu" <xbjt...@gmail.com>
> CC: "user" <user@flink.apache.org>
> Sent: Wednesday, August 5, 2020 04:33:23
> Subject: Re: Two Queries and a Kafka Topic
>
> Hi Leonard,
>
> First, thank you.
>
> I am currently trying to restrict my solution to Apache Flink 1.10 because it's the current version supported by Amazon EMR. I am not ready to change our operational environment to solve this.
>
> Second, I am using the DataStream API. The Kafka topic is not in a table, it is in a DataStream.
>
> The SQL queries literally run against a PostgreSQL database, and only need to be run exactly once in the lifetime of the job.
>
> I am struggling to determine where this happens.
>
> JDBCInputFormat seems to query the SQL table repetitively, and also connecting streams and aggregating into one object is very complicated.
>
> Thus, I am wondering what the right approach is.
>
> Let me restate the parameters:
>
> SQL Query One = data in PostgreSQL (200K records) that is used for business logic.
> SQL Query Two = data in PostgreSQL (1000 records) that is used for business logic.
> Kafka Topic One = unlimited data stream that uses the DataStream API and the queries above to write into multiple sinks.
>
> ASCII diagram:
>
> [SQL Query One] ----> [Aggregate to Map]
>
> Kafka ----> [Kafka Topic One] --- [Keyed Process Function (Query One Map, Query Two Map)] ---> [Multiple Sinks]
>
> [SQL Query Two] ----> [Aggregate to Map]
>
> Maybe my graph above helps. You see, I need Query One and Query Two to only ever execute once. After that, the information they provide is used to correctly process the Kafka topic.
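Theo's option 3 boils down to a lazy, load-once cache per parallel instance. Stripped of the Flink specifics (in a real job this logic would live inside a `RichMapFunction`, with the `Supplier` replaced by the actual JDBC call), the pattern is just:

```java
import java.util.Map;
import java.util.function.Supplier;

/**
 * Sketch of "load from TaskManager" (option 3): each parallel instance
 * loads the reference data exactly once, on the first event, then reuses
 * it for every subsequent event. The JDBC query is stubbed out with a
 * Supplier so the load-once pattern itself is visible and testable.
 */
public class LazyEnrichingMapper {

    private final Supplier<Map<String, String>> referenceLoader; // stands in for the JDBC query
    private Map<String, String> reference; // null until the first event arrives

    public LazyEnrichingMapper(Supplier<Map<String, String>> referenceLoader) {
        this.referenceLoader = referenceLoader;
    }

    public String map(String key) {
        if (reference == null) {
            // First event: block this subtask and load the data once.
            reference = referenceLoader.get();
        }
        return reference.getOrDefault(key, "unknown");
    }
}
```

In a Flink job the load could equally go into `open()`, which avoids the null check on the hot path; doing it on the first event only matters if the query needs information that arrives with the data.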
>
> I'll take a deeper look to try to understand what you said, thank you, but JDBCInputFormat seems to repetitively query the database. Maybe I need to write a RichFunction or an AsyncIO function and cache the results in state after that.
>
> On Aug 4, 2020, at 6:25 PM, Leonard Xu <xbjt...@gmail.com> wrote:
>
> Hi, Marco
>
>> If I need SQL Query One and SQL Query Two to happen just one time,
>
> Looks like you want to reuse this Kafka table in one job. It's supported to execute multiple queries in one SQL job in Flink 1.11. You can use `StatementSet` [1] to add SQL Query One and SQL Query Two in a single SQL job.
>
> Best
> Leonard
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/insert.html#run-an-insert-statement
>
> On August 5, 2020, at 04:34, Marco Villalobos <mvillalo...@kineteque.com> wrote:
>
> Let's say that I have:
>
> SQL Query One from data in PostgreSQL (200K records).
> SQL Query Two from data in PostgreSQL (1000 records).
> and Kafka Topic One.
>
> Let's also say that the main data for this Flink job arrives in Kafka Topic One.
>
> If I need SQL Query One and SQL Query Two to happen just one time, when the job starts up, and afterwards maybe store the results in keyed state or broadcast state, but they're not really part of the stream, then what is the best practice for supporting that in Flink?
>
> The Flink job needs to stream data from Kafka Topic One, aggregate it, and perform computations that require all of the data in SQL Query One and SQL Query Two for its business logic.
>
> I am using Flink 1.10.
>
> Am I supposed to query the database before the job is submitted, and then pass the results on as parameters to a function?
> Or am I supposed to use JDBCInputFormat for both queries and create two streams, and somehow connect or broadcast both of them to the main stream that uses Kafka Topic One?
>
> I would appreciate guidance. Please. Thank you.
>
> Sincerely,
>
> Marco A. Villalobos
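For the "connect or broadcast" route on Flink 1.10's DataStream API, the usual shape is the broadcast state pattern: run each query once as a finite source, broadcast it, and connect it to the main Kafka stream. An untested sketch follows (type names such as `Event`, `Row`, `Result`, and the `keyOf`/`valueOf` accessors are placeholders; for a keyed main stream, `KeyedBroadcastProcessFunction` would be used instead):

```java
// Descriptor for the broadcast state that will hold the query results.
MapStateDescriptor<String, String> refDescriptor =
    new MapStateDescriptor<>("queryOne", Types.STRING, Types.STRING);

// Source that emits the JDBC rows once and then finishes,
// broadcast to every parallel task of the downstream operator.
BroadcastStream<Row> queryOne = env
    .createInput(jdbcInputFormat)
    .broadcast(refDescriptor);

kafkaStream
    .connect(queryOne)
    .process(new BroadcastProcessFunction<Event, Row, Result>() {
        @Override
        public void processBroadcastElement(Row row, Context ctx, Collector<Result> out)
                throws Exception {
            // Each parallel task stores its own copy of the reference data.
            ctx.getBroadcastState(refDescriptor).put(keyOf(row), valueOf(row));
        }

        @Override
        public void processElement(Event event, ReadOnlyContext ctx, Collector<Result> out)
                throws Exception {
            String ref = ctx.getBroadcastState(refDescriptor).get(event.key);
            // ... business logic combining the event with the reference data ...
        }
    });
```

One caveat worth noting: Kafka events can reach `processElement` before the broadcast side has been fully delivered, so a real implementation typically buffers early events in state until the reference data is complete, which is part of the complexity the thread alludes to.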