for each operator that needs the *data* 🤦‍♂️

On Mon, Aug 10, 2020 at 3:58 PM Austin Cawley-Edwards <
austin.caw...@gmail.com> wrote:

> Hey all,
>
> A bit late here and I’m not sure it’s totally valuable, but we have a
> similar job where we need to query an external data source on startup
> before processing the main stream as well.
>
> Instead of making that request in the JobManager process when building the
> graph, we make those requests from the operator “open()” methods, and then
> store the results in broadcast state.
>
> Our queries aren’t that expensive to run, so we run multiple on startup
> for each operator that needs the fat to avoid a race condition. The
> blocking until the broadcast state is received downstream sounds like a
> reasonable way to do it too.
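>
> For illustration, a minimal sketch of the open()-time lookup part of this
> pattern (it keeps the result in a plain field rather than broadcast state,
> and the class, field, and query names are hypothetical, not from our actual
> job):
>
>     import java.sql.Connection;
>     import java.sql.DriverManager;
>     import java.sql.ResultSet;
>     import java.sql.Statement;
>     import java.util.HashMap;
>     import java.util.Map;
>
>     import org.apache.flink.api.common.functions.RichFlatMapFunction;
>     import org.apache.flink.configuration.Configuration;
>     import org.apache.flink.util.Collector;
>
>     public class EnrichingFlatMap extends RichFlatMapFunction<String, String> {
>
>         // Loaded once per operator instance, before any element is processed.
>         private transient Map<String, String> referenceData;
>
>         @Override
>         public void open(Configuration parameters) throws Exception {
>             referenceData = new HashMap<>();
>             try (Connection conn = DriverManager.getConnection(
>                          "jdbc:postgresql://db-host:5432/reference", "user", "password");
>                  Statement stmt = conn.createStatement();
>                  ResultSet rs = stmt.executeQuery("SELECT k, v FROM reference_table")) {
>                 while (rs.next()) {
>                     referenceData.put(rs.getString("k"), rs.getString("v"));
>                 }
>             }
>         }
>
>         @Override
>         public void flatMap(String value, Collector<String> out) {
>             out.collect(value + "|" + referenceData.getOrDefault(value, "unknown"));
>         }
>     }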
>
> Hope that helps a bit, or at least as another example!
>
> Best,
> Austin
>
> On Mon, Aug 10, 2020 at 3:03 PM Theo Diefenthal <
> theo.diefent...@scoop-software.de> wrote:
>
>> Hi Marco,
>>
>> Sadly, I myself haven't ever used StateProcessorAPI either.
>>
>> I thought the documentation here [1] was rather straightforward to use, but
>> I never tried it myself.
>>
>> Also, I don't quite get what you mean by “tying” DataSet and DataStream. From
>> what I understand, the StateProcessorAPI is internally built upon DataSets at
>> the moment. So if you have a streaming job (DataStream), you would run your
>> custom state migration program with the StateProcessor API before even
>> starting the streaming job, and initialize your state from a DataSet. After
>> all, initializing state doesn't require an infinite job, only a finite one
>> (thus batch/DataSet). Once that program has finished executing, you would
>> submit your streaming job starting from the written savepoint. I guess the
>> API works with either HDFS or a local filesystem, but maybe someone who has
>> already used the API might shed some more light for us here.
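>>
>> For what it's worth, here is a rough, untested sketch of what that bootstrap
>> program might look like with the DataSet-based State Processor API in 1.10.
>> The operator uid, paths, and data below are placeholders; the uid has to
>> match the uid of the stateful operator in the streaming job that is later
>> started from the savepoint:
>>
>>     import org.apache.flink.api.common.state.ValueState;
>>     import org.apache.flink.api.common.state.ValueStateDescriptor;
>>     import org.apache.flink.api.java.DataSet;
>>     import org.apache.flink.api.java.ExecutionEnvironment;
>>     import org.apache.flink.api.java.functions.KeySelector;
>>     import org.apache.flink.api.java.tuple.Tuple2;
>>     import org.apache.flink.configuration.Configuration;
>>     import org.apache.flink.runtime.state.memory.MemoryStateBackend;
>>     import org.apache.flink.state.api.BootstrapTransformation;
>>     import org.apache.flink.state.api.OperatorTransformation;
>>     import org.apache.flink.state.api.Savepoint;
>>     import org.apache.flink.state.api.functions.KeyedStateBootstrapFunction;
>>
>>     public class BootstrapReferenceState {
>>
>>         public static void main(String[] args) throws Exception {
>>             ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>
>>             // In the real program this DataSet would come from the JDBC query
>>             // (e.g. via a JDBCInputFormat source).
>>             DataSet<Tuple2<String, String>> referenceData = env.fromElements(
>>                     Tuple2.of("key-1", "value-1"),
>>                     Tuple2.of("key-2", "value-2"));
>>
>>             BootstrapTransformation<Tuple2<String, String>> bootstrap =
>>                     OperatorTransformation
>>                             .bootstrapWith(referenceData)
>>                             .keyBy(new KeySelector<Tuple2<String, String>, String>() {
>>                                 @Override
>>                                 public String getKey(Tuple2<String, String> row) {
>>                                     return row.f0;
>>                                 }
>>                             })
>>                             .transform(new ReferenceBootstrapper());
>>
>>             Savepoint
>>                     .create(new MemoryStateBackend(), 128) // 128 = max parallelism
>>                     .withOperator("enrichment-operator-uid", bootstrap)
>>                     .write("file:///tmp/bootstrap-savepoint"); // hdfs:// works as well
>>
>>             env.execute("bootstrap reference state");
>>         }
>>
>>         static class ReferenceBootstrapper
>>                 extends KeyedStateBootstrapFunction<String, Tuple2<String, String>> {
>>
>>             private transient ValueState<String> referenceValue;
>>
>>             @Override
>>             public void open(Configuration parameters) {
>>                 referenceValue = getRuntimeContext().getState(
>>                         new ValueStateDescriptor<>("reference-value", String.class));
>>             }
>>
>>             @Override
>>             public void processElement(Tuple2<String, String> row, Context ctx)
>>                     throws Exception {
>>                 referenceValue.update(row.f1);
>>             }
>>         }
>>     }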
>>
>> Best regards
>> Theo
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html
>>
>> ------------------------------
>> *From: *"Marco Villalobos" <mvillalo...@kineteque.com>
>> *To: *"Theo Diefenthal" <theo.diefent...@scoop-software.de>
>> *CC: *"user" <user@flink.apache.org>
>> *Sent: *Thursday, August 6, 2020, 23:47:13
>>
>> *Subject: *Re: Two Queries and a Kafka Topic
>>
>> I am trying to use the State Processor API.  Does that require HDFS or a
>> filesystem?
>> I wish there were a complete example that ties together the DataSet and
>> DataStream APIs and the State Processor API.
>>
>> So far I have not been able to get it to work.
>>
>> Does anybody know where I can find examples of these types of techniques?
>>
>>
>>
>> On Wed, Aug 5, 2020 at 3:52 AM Theo Diefenthal <
>> theo.diefent...@scoop-software.de> wrote:
>>
>>> Hi Marco,
>>>
>>> In general, I see three solutions you could take here:
>>>
>>> 1. Use the StateProcessorAPI: You can run a program with the
>>> StateProcessorAPI that loads the data from JDBC and stores it into a Flink
>>> savepoint. Afterwards, you start your streaming job from that savepoint,
>>> which will load its state and find all the data from JDBC already stored
>>> within it.
>>> 2. Load from the master, distribute with the job: When you build up your
>>> job graph, you could execute the JDBC queries and put the result into some
>>> Serializable class, which in turn you plug into an operator in your stream
>>> (e.g. a map stage); a sketch follows after this list. The class, along with
>>> all the queried data, will be serialized and deserialized on the
>>> TaskManagers. (Usually I use this for configuration parameters, but it
>>> might be ok in this case as well.)
>>> 3. Load from the TaskManager: In your map function, when the very first
>>> event is received, you can block processing and synchronously load the data
>>> from JDBC (so each TaskManager performs the JDBC query itself). You then
>>> keep the data to be used for all subsequent map steps.
>>>
>>> I think option 3 is the easiest to implement, while option 1 might be the
>>> most elegant in my opinion.
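>>>
>>> For option 2, a rough sketch of what the shipped function could look like
>>> (the class and helper names here are just illustrative, not from any real
>>> job):
>>>
>>>     import java.util.HashMap;
>>>
>>>     import org.apache.flink.api.common.functions.MapFunction;
>>>
>>>     // The map holds the JDBC query result; it is serialized into the job
>>>     // graph together with the function and shipped to every TaskManager.
>>>     public class EnrichWithPreloadedData implements MapFunction<String, String> {
>>>
>>>         private final HashMap<String, String> referenceData;
>>>
>>>         public EnrichWithPreloadedData(HashMap<String, String> referenceData) {
>>>             this.referenceData = referenceData;
>>>         }
>>>
>>>         @Override
>>>         public String map(String value) {
>>>             return value + "|" + referenceData.getOrDefault(value, "unknown");
>>>         }
>>>     }
>>>
>>>     // On the client, while building the job graph (runJdbcQuery() is a
>>>     // hypothetical helper that runs the Postgres queries with plain JDBC):
>>>     //
>>>     //     HashMap<String, String> referenceData = runJdbcQuery();
>>>     //     kafkaStream.map(new EnrichWithPreloadedData(referenceData))
>>>     //                .addSink(sink);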
>>>
>>> Best regards
>>> Theo
>>>
>>> ------------------------------
>>> *From: *"Marco Villalobos" <mvillalo...@kineteque.com>
>>> *To: *"Leonard Xu" <xbjt...@gmail.com>
>>> *CC: *"user" <user@flink.apache.org>
>>> *Sent: *Wednesday, August 5, 2020, 04:33:23
>>> *Subject: *Re: Two Queries and a Kafka Topic
>>>
>>> Hi Leonard,
>>>
>>> First, Thank you.
>>>
>>> I am currently trying to restrict my solution to Apache Flink 1.10
>>> because it's the current version supported by Amazon EMR.
>>> I am not ready to change our operational environment to solve this.
>>>
>>> Second, I am using the DataStream API.  The Kafka Topic is not in a
>>> table, it is in a DataStream.
>>>
>>> The SQL queries are literally against a PostgreSQL database, and only need
>>> to be run exactly once in the lifetime of the job.
>>>
>>> I am struggling to determine where this happens.
>>>
>>> JDBCInputFormat seems to query the SQL table repeatedly, and connecting
>>> streams and aggregating them into one object is also very complicated.
>>>
>>> Thus, I am wondering what is the right approach.
>>>
>>> Let me restate the parameters.
>>>
>>> SQL Query One = data in PostgreSQL (200K records) that is used for
>>> business logic.
>>> SQL Query Two = data in PostgreSQL (1000 records) that is used for
>>> business logic.
>>> Kafka Topic One = unlimited data-stream that uses the data-stream api
>>> and queries above to write into multiple sinks
>>>
>>> ASCII Diagram:
>>>
>>> [SQL Query One] ----> [Aggregate to Map]
>>>
>>> Kafka ----> [Kafka Topic One] ----> [Keyed Process Function (Query One
>>> Map, Query Two Map)] ----> [Multiple Sinks]
>>>
>>> [SQL Query Two] ---->[Aggregate to Map]
>>>
>>>
>>> Maybe my graph above helps.  You see, I need Query One and Query Two to
>>> only ever execute once.  After that, the information they provide is used
>>> to correctly process the Kafka Topic.
>>>
>>> I'll take a deeper look to try and understand what you said, thank you, but
>>> JDBCInputFormat seems to query the database repeatedly. Maybe I need to
>>> write a RichFunction or an AsyncIO function and cache the results in state
>>> after that.
>>>
>>>
>>>
>>> On Aug 4, 2020, at 6:25 PM, Leonard Xu <xbjt...@gmail.com> wrote:
>>>
>>> Hi, Marco
>>>
>>> If I need SQL Query One and SQL Query Two to happen just one time,
>>>
>>>
>>> Looks like you want to reuse this Kafka table in one job. Executing
>>> multiple queries in one SQL job is supported in Flink 1.11.
>>> You can use `StatementSet` [1] to add SQL Query One and SQL Query Two to a
>>> single SQL job.
>>>
>>>
>>> Best
>>> Leonard
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/insert.html#run-an-insert-statement
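>>>
>>> For reference, a minimal sketch of what that could look like in 1.11 (the
>>> table names are placeholders, and the CREATE TABLE DDL for the Kafka source
>>> and the sinks is omitted):
>>>
>>>     import org.apache.flink.table.api.EnvironmentSettings;
>>>     import org.apache.flink.table.api.StatementSet;
>>>     import org.apache.flink.table.api.TableEnvironment;
>>>
>>>     public class MultiInsertJob {
>>>         public static void main(String[] args) {
>>>             TableEnvironment tEnv = TableEnvironment.create(
>>>                     EnvironmentSettings.newInstance()
>>>                             .useBlinkPlanner()
>>>                             .inStreamingMode()
>>>                             .build());
>>>
>>>             // tEnv.executeSql("CREATE TABLE kafka_topic_one (...) WITH (...)");
>>>             // tEnv.executeSql("CREATE TABLE sink_one (...) WITH (...)");
>>>             // tEnv.executeSql("CREATE TABLE sink_two (...) WITH (...)");
>>>
>>>             StatementSet statements = tEnv.createStatementSet();
>>>             statements.addInsertSql(
>>>                     "INSERT INTO sink_one SELECT id, v FROM kafka_topic_one");
>>>             statements.addInsertSql(
>>>                     "INSERT INTO sink_two SELECT id, v FROM kafka_topic_one");
>>>
>>>             // Both INSERTs are optimized together and submitted as one job,
>>>             // so the Kafka source table is only read once.
>>>             statements.execute();
>>>         }
>>>     }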
>>>
>>>
>>> On Aug 5, 2020, at 04:34, Marco Villalobos <mvillalo...@kineteque.com> wrote:
>>>
>>> Let's say that I have:
>>>
>>> SQL Query One from data in PostgreSQL (200K records).
>>> SQL Query Two from data in PostgreSQL (1000 records).
>>> and Kafka Topic One.
>>>
>>> Let's also say that the main data for this Flink job arrives in Kafka Topic
>>> One.
>>>
>>> If I need SQL Query One and SQL Query Two to happen just one time, when
>>> the job starts up, and afterwards maybe store their results in Keyed State
>>> or Broadcast State, but they're not really part of the stream, then what is
>>> the best practice for supporting that in Flink?
>>>
>>> The Flink job needs to stream data from Kafka Topic One, aggregate it, and
>>> perform computations whose business logic requires all of the data from SQL
>>> Query One and SQL Query Two.
>>>
>>> I am using Flink 1.10.
>>>
>>> Am I supposed to query the database before the job is submitted, and then
>>> pass the results on as parameters to a function?
>>> Or am I supposed to use JDBCInputFormat for both queries and create two
>>> streams, and somehow connect or broadcast both of them to the main stream
>>> that uses Kafka Topic One?
>>>
>>> I would appreciate guidance. Please.  Thank you.
>>>
>>> Sincerely,
>>>
>>> Marco A. Villalobos
>>>
>>>
