for each operator that needs the *data* 🤦‍♂️

On Mon, Aug 10, 2020 at 3:58 PM Austin Cawley-Edwards <austin.caw...@gmail.com> wrote:

Hey all,

A bit late here, and I'm not sure it's totally valuable, but we have a similar job where we need to query an external data source on startup, before processing the main stream, as well.

Instead of making that request in the JobManager process when building the graph, we make those requests from the operators' open() methods and then store the results in broadcast state.

Our queries aren't that expensive to run, so we run multiple on startup, for each operator that needs the fat, to avoid a race condition. Blocking until the broadcast state is received downstream sounds like a reasonable way to do it too.

Hope that helps a bit, or at least serves as another example!

Best,
Austin
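A minimal sketch of the open()-time loading Austin describes, assuming a hypothetical reference_table and placeholder JDBC credentials; the broadcast-state wiring is left out, so each parallel instance simply keeps its own copy of the query result:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.configuration.Configuration;

    // Each parallel instance runs the query once, before any element is processed.
    public class ReferenceLoadingMap extends RichMapFunction<String, String> {

        private transient Map<String, String> referenceData;

        @Override
        public void open(Configuration parameters) throws Exception {
            referenceData = new HashMap<>();
            // Placeholder connection details and query.
            try (Connection conn = DriverManager.getConnection(
                         "jdbc:postgresql://host/db", "user", "pass");
                 Statement stmt = conn.createStatement();
                 ResultSet rs = stmt.executeQuery("SELECT k, v FROM reference_table")) {
                while (rs.next()) {
                    referenceData.put(rs.getString("k"), rs.getString("v"));
                }
            }
        }

        @Override
        public String map(String value) {
            return value + "|" + referenceData.getOrDefault(value, "unknown");
        }
    }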
On Mon, Aug 10, 2020 at 3:03 PM Theo Diefenthal <theo.diefent...@scoop-software.de> wrote:

Hi Marco,

Sadly, I myself haven't ever used the StateProcessorAPI either.

I thought the documentation here [1] was rather straightforward, but I never tried it myself.

Also, I don't get what you mean by "tying" DataSet and DataStream. From what I understand, the StateProcessorAPI is internally built upon DataSets at the moment. So if you have a streaming job (DataStream), you would run your custom state migration program with the StateProcessor API before even starting the streaming job, and initialize your state as a DataSet. After all, initializing state doesn't require an infinite job, only a finite one (thus batch/DataSet). Once that program finishes execution, you would submit your streaming job starting from the written savepoint. I guess the API works with either HDFS or a plain filesystem, but maybe someone who has already used the API can shed some more light for us here.

Best regards
Theo

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html
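To make that description concrete, a rough sketch of bootstrapping a savepoint with the DataSet-based State Processor API as of Flink 1.10; the operator uid ("reference-uid"), state name, savepoint path, max parallelism, and inlined data are all illustrative and must match the streaming job that later restores from the savepoint:

    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.runtime.state.memory.MemoryStateBackend;
    import org.apache.flink.state.api.BootstrapTransformation;
    import org.apache.flink.state.api.OperatorTransformation;
    import org.apache.flink.state.api.Savepoint;
    import org.apache.flink.state.api.functions.KeyedStateBootstrapFunction;

    public class BootstrapSavepointJob {

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // Stand-in for the JDBC query results (a finite DataSet).
            DataSet<Tuple2<String, Long>> referenceData =
                    env.fromElements(Tuple2.of("a", 1L), Tuple2.of("b", 2L));

            BootstrapTransformation<Tuple2<String, Long>> transformation =
                    OperatorTransformation
                            .bootstrapWith(referenceData)
                            .keyBy(new KeySelector<Tuple2<String, Long>, String>() {
                                @Override
                                public String getKey(Tuple2<String, Long> t) {
                                    return t.f0;
                                }
                            })
                            .transform(new ReferenceBootstrapper());

            Savepoint
                    .create(new MemoryStateBackend(), 128)         // max parallelism of the streaming job
                    .withOperator("reference-uid", transformation) // uid of the streaming operator
                    .write("file:///tmp/bootstrap-savepoint");

            env.execute("bootstrap savepoint");
        }

        // Writes one keyed ValueState entry per input record.
        static class ReferenceBootstrapper
                extends KeyedStateBootstrapFunction<String, Tuple2<String, Long>> {

            private transient ValueState<Long> state;

            @Override
            public void open(Configuration parameters) {
                state = getRuntimeContext().getState(
                        new ValueStateDescriptor<>("reference", Long.class));
            }

            @Override
            public void processElement(Tuple2<String, Long> value, Context ctx) throws Exception {
                state.update(value.f1);
            }
        }
    }

The streaming job would then be submitted from that savepoint (e.g. flink run -s file:///tmp/bootstrap-savepoint ...) with an operator whose uid and state descriptor match the ones used here.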
------------------------------
From: "Marco Villalobos" <mvillalo...@kineteque.com>
To: "Theo Diefenthal" <theo.diefent...@scoop-software.de>
CC: "user" <user@flink.apache.org>
Sent: Thursday, August 6, 2020, 23:47:13
Subject: Re: Two Queries and a Kafka Topic

I am trying to use the State Processor API. Does that require HDFS or a filesystem?
I wish there was a complete example that ties together the DataSet API, the DataStream API, and the State Processor API.

So far I have not been able to get it to work.

Does anybody know where I can find examples of this kind of technique?

On Wed, Aug 5, 2020 at 3:52 AM Theo Diefenthal <theo.diefent...@scoop-software.de> wrote:

Hi Marco,

In general, I see three solutions here that you could approach:

1. Use the StateProcessorAPI: You can run a program with the StateProcessorAPI that loads the data from JDBC and stores it into a Flink savepoint. Afterwards, you start your streaming job from that savepoint, which will load its state and find within it all the data from JDBC already stored.
2. Load from the master, distribute with the job: When you build up your job graph, you could execute the JDBC queries and put the result into some serializable class, which in turn you plug into an operator in your stream (e.g. a map stage). The class, along with all the queried data, will be serialized and deserialized on the TaskManagers. (Usually I use this for configuration parameters, but it might be OK in this case as well; see the sketch after this message.)
3. Load from the TaskManager: In your map function, when the very first event is received, you can block processing and synchronously load the data from JDBC (so each TaskManager performs the JDBC query itself). You then keep the data around for all subsequent map steps.

I think option 3 is the easiest to implement, while option 1 might be the most elegant way, in my opinion.

Best regards
Theo
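As an illustration of option 2, a sketch in which a hypothetical runJdbcQuery() helper executes once in the client process while the job graph is built, and the result travels to every TaskManager via function serialization; the socket source stands in for the real Kafka source:

    import java.util.HashMap;

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class LoadFromMasterJob {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Runs exactly once, in the client that builds the job graph.
            HashMap<String, String> referenceData = runJdbcQuery();

            DataStream<String> stream = env.socketTextStream("localhost", 9999);
            stream.map(new EnrichMap(referenceData)).print();

            env.execute("load from master");
        }

        // The map stored in this field is serialized with the job graph
        // and deserialized on every TaskManager.
        static class EnrichMap implements MapFunction<String, String> {
            private final HashMap<String, String> referenceData;

            EnrichMap(HashMap<String, String> referenceData) {
                this.referenceData = referenceData;
            }

            @Override
            public String map(String value) {
                return value + "|" + referenceData.getOrDefault(value, "unknown");
            }
        }

        static HashMap<String, String> runJdbcQuery() {
            // Placeholder: a real implementation would query PostgreSQL via JDBC here.
            HashMap<String, String> m = new HashMap<>();
            m.put("a", "1");
            return m;
        }
    }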
------------------------------
From: "Marco Villalobos" <mvillalo...@kineteque.com>
To: "Leonard Xu" <xbjt...@gmail.com>
CC: "user" <user@flink.apache.org>
Sent: Wednesday, August 5, 2020, 04:33:23
Subject: Re: Two Queries and a Kafka Topic

Hi Leonard,

First, thank you.

I am currently trying to restrict my solution to Apache Flink 1.10 because it's the current version supported by Amazon EMR. I am not ready to change our operational environment to solve this.

Second, I am using the DataStream API. The Kafka topic is not in a table, it is in a DataStream.

The SQL queries run against a PostgreSQL database, and only need to be run exactly once in the lifetime of the job.

I am struggling to determine where this happens.

JDBCInputFormat seems to query the SQL table repetitively, and also connecting streams and aggregating into one object is very complicated.

Thus, I am wondering what is the right approach.

Let me restate the parameters.

SQL Query One = data in PostgreSQL (200K records) that is used for business logic.
SQL Query Two = data in PostgreSQL (1000 records) that is used for business logic.
Kafka Topic One = unlimited data stream that uses the DataStream API and the queries above to write into multiple sinks.

ASCII diagram:

[SQL Query One] ----> [Aggregate to Map]

Kafka ----> [Kafka Topic One] ----> [Keyed Process Function (Query One Map, Query Two Map)] ----> [Multiple Sinks]

[SQL Query Two] ----> [Aggregate to Map]

Maybe my graph above helps. You see, I need Query One and Query Two to only ever execute once. After that, the information they provide is used to correctly process the Kafka topic.

I'll dig a bit deeper to try and understand what you said, thank you, but JDBCInputFormat seems to query the database repetitively. Maybe I need to write a RichFunction or an AsyncIO function and cache the results in state after that.

On Aug 4, 2020, at 6:25 PM, Leonard Xu <xbjt...@gmail.com> wrote:

Hi, Marco

> If I need SQL Query One and SQL Query Two to happen just one time,

Looks like you want to reuse this Kafka table in one job. Executing multiple queries in one SQL job is supported in Flink 1.11. You can use `StatementSet` [1] to add SQL Query One and SQL Query Two in a single SQL job.

Best
Leonard
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/insert.html#run-an-insert-statement

On Aug 5, 2020, at 04:34, Marco Villalobos <mvillalo...@kineteque.com> wrote:

Let's say that I have:

SQL Query One from data in PostgreSQL (200K records).
SQL Query Two from data in PostgreSQL (1000 records).
and Kafka Topic One.

Let's also say that the main data for this Flink job arrives in Kafka Topic One.

If I need SQL Query One and SQL Query Two to happen just one time, when the job starts up, and afterwards maybe store the results in keyed state or broadcast state, but they're not really part of the stream, then what is the best practice for supporting that in Flink?

The Flink job needs to stream data from Kafka Topic One, aggregate it, and perform computations that require all of the data from SQL Query One and SQL Query Two for its business logic.

I am using Flink 1.10.

Am I supposed to query the database before the job is submitted, and then pass the results on as parameters to a function?
Or am I supposed to use JDBCInputFormat for both queries to create two streams, and somehow connect or broadcast both of them to the main stream that uses Kafka Topic One?

I would appreciate guidance. Please. Thank you.

Sincerely,

Marco A. Villalobos
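One way to wire up the "broadcast both of them to the main stream" idea from the original question, sketched with placeholder sources and types; a real job would read Kafka Topic One instead of a socket, build the reference stream from the two JDBC queries, and buffer early events in keyed state until the broadcast side is complete:

    import org.apache.flink.api.common.state.MapStateDescriptor;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
    import org.apache.flink.util.Collector;

    public class BroadcastJoinJob {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Placeholders: the main stream would come from Kafka Topic One,
            // the reference stream from the PostgreSQL query results.
            DataStream<String> mainStream = env.socketTextStream("localhost", 9999);
            DataStream<Tuple2<String, String>> referenceStream =
                    env.fromElements(Tuple2.of("a", "1"), Tuple2.of("b", "2"));

            final MapStateDescriptor<String, String> refDescriptor =
                    new MapStateDescriptor<>("reference", String.class, String.class);

            mainStream
                .keyBy(v -> v)
                .connect(referenceStream.broadcast(refDescriptor))
                .process(new KeyedBroadcastProcessFunction<String, String, Tuple2<String, String>, String>() {

                    @Override
                    public void processBroadcastElement(Tuple2<String, String> ref, Context ctx,
                                                        Collector<String> out) throws Exception {
                        // Every parallel instance receives and stores the reference data.
                        ctx.getBroadcastState(refDescriptor).put(ref.f0, ref.f1);
                    }

                    @Override
                    public void processElement(String value, ReadOnlyContext ctx,
                                               Collector<String> out) throws Exception {
                        String ref = ctx.getBroadcastState(refDescriptor).get(value);
                        // Events arriving before the broadcast side finishes would need
                        // buffering in keyed state; omitted here for brevity.
                        out.collect(value + "|" + (ref != null ? ref : "unknown"));
                    }
                })
                .print();

            env.execute("broadcast join");
        }
    }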