Hi all,

@Ken, the approach of telling the operator which input to read from would
cause problems with the current checkpointing mechanism because checkpoint
barriers are not allowed to overtake regular records. Chaining wouldn't be
an issue, because operators with two inputs are not chained to their
predecessors.

Side inputs are exactly the effort to address these use cases. I'm not
100% into the details, but AFAIK, some improvements to the checkpointing
mechanism need to be made before side inputs can be implemented.
Side inputs will also support reading the side input first (blocking all
other streams) and starting the other streams once the initialization
is completed. Afterwards, the side inputs will still be able to provide
updates.

Buffering records in a function does not necessarily lead to an OOME. If the
stream is keyed, you can keep the buffered records in keyed state, use the
RocksDBStateBackend, and have the state written to disk.
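
A minimal sketch of that buffering pattern, written in plain Java so it runs
without any Flink dependency. The class name BufferingEnrichmentJoin and its
methods are made up for illustration, and the two HashMaps are only stand-ins
for keyed state. In a real job the same logic would live in a two-input
function (e.g. a RichCoFlatMapFunction on connected keyed streams), with a
ValueState for the metadata and a ListState for the buffered records. The
sketch also assumes that a key is "ready" as soon as its first metadata value
arrives, which is a simplification; when to start processing depends on the
use case.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model of a keyed two-input join that buffers records until metadata arrives. */
class BufferingEnrichmentJoin {
    // Stand-in for a per-key ValueState holding the latest metadata (assumption: String payloads).
    private final Map<String, String> metadataByKey = new HashMap<>();
    // Stand-in for a per-key ListState holding records that arrived before their metadata.
    private final Map<String, List<String>> bufferedByKey = new HashMap<>();

    /** Metadata input: store (or update) the value, then flush records that were waiting for it. */
    void onMetadata(String key, String metadata, List<String> out) {
        metadataByKey.put(key, metadata);
        List<String> waiting = bufferedByKey.remove(key);
        if (waiting != null) {
            for (String record : waiting) {
                out.add(record + "|" + metadata);
            }
        }
    }

    /** Record input: join immediately if metadata is known for the key, otherwise buffer. */
    void onRecord(String key, String record, List<String> out) {
        String metadata = metadataByKey.get(key);
        if (metadata == null) {
            bufferedByKey.computeIfAbsent(key, k -> new ArrayList<>()).add(record);
        } else {
            out.add(record + "|" + metadata);
        }
    }

    /** Number of records currently buffered for a key. */
    int bufferedCount(String key) {
        List<String> waiting = bufferedByKey.get(key);
        return waiting == null ? 0 : waiting.size();
    }

    public static void main(String[] args) {
        BufferingEnrichmentJoin join = new BufferingEnrichmentJoin();
        List<String> out = new ArrayList<>();
        join.onRecord("a", "r1", out);   // buffered, no metadata for key "a" yet
        join.onRecord("a", "r2", out);   // buffered as well
        join.onMetadata("a", "m1", out); // flushes both buffered records
        join.onRecord("a", "r3", out);   // joined immediately
        System.out.println(out);
    }
}
```

With keyed state on RocksDB, the buffer for each key lives in the state
backend instead of on the heap, which is what keeps the buffering from
turning into an OOME.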

Best, Fabian

2018-04-25 23:36 GMT+02:00 Ken Krugler <kkrugler_li...@transpac.com>:

> Hi Michael,
>
> Windowing works when you’re joining timestamped metadata and non-metadata.
>
> The common case I’m referring to is where there’s some function state
> (e.g. rules to process data, machine learning models, or in my case
> clusters), where you want to process the non-metadata with the “current
> state”.
>
> In that case, blindly applying whatever metadata has been collected to
> incoming non-metadata often doesn’t work well. That’s why Fabian was
> suggesting various approaches (below) to work around the problem. The
> general solution (his option #2, with buffering) will work, but can lead to
> OOME and feels like it breaks the basic Flink back-pressure mechanism, due
> to in-operator buffering.
>
> If it was possible to essentially allow Flink to block (or not pull, for
> sources) from the non-metadata stream when appropriate, then no buffering
> would be needed. Then it would be straightforward to do things like…
>
> - drain all metadata from a Kafka topic before applying that to the other
> stream.
> - defer processing data from the other stream if there was newer metadata.
>
> As an aside, what I’m seeing with Flink 1.5 and using a connected keyed &
> broadcast stream is that the CoFlatMapFunction seems to be giving priority
> to data going to the flatMap1() method, though this could be an odd side
> effect of how iterations impact the two streams.
>
> — Ken
>
>
> On Apr 25, 2018, at 1:09 PM, TechnoMage <mla...@technomage.com> wrote:
>
> I agree in the general case you need to operate on the stream data based
> on the metadata you have.  The side input feature coming some day may help
> you, in that it would give you a means to receive inputs out of band.  But,
> given changing metadata and changing stream data I am not sure this is any
> different from dual stream data inputs.  Either you use windowing to do
> small batches of data to allow coordination of stream and metadata, or you
> use the metadata you have collected to date on receipt of the stream data.
> Given that Flink does record-by-record processing, you have the option of
> controlling the timing as needed for your use case.
>
> Michael
>
> On Apr 25, 2018, at 1:57 PM, Ken Krugler <kkrugler_li...@transpac.com>
> wrote:
>
> Hi Michael,
>
> I agree there are cases where it’s possible to implement a solution via
> buffering.
>
> But this case of using broadcast state to update a function operating on
> streaming data seems common enough that it would be useful for Flink to
> provide some help.
>
> Additionally, even with buffering there are currently challenges...
>
> 1. For the case I’m dealing with (iterative KMeans clustering) you don’t
> have a time when "metadata is aggregated", as it’s constantly evolving.
>
> 2. It’s sometimes not possible to know when you’ve received all of the
> metadata (e.g. if you’re reading from a Kafka topic).
>
> 3. Buffering the non-metadata can create an unbounded memory issue.
>
> Regards,
>
> — Ken
>
>
> On Apr 25, 2018, at 12:39 PM, Michael Latta <lat...@me.com> wrote:
>
> Using a flat map function, you can always buffer the non-metadata stream
> in the operator state until the metadata is aggregated, and then process
> any collected data.  It would require a RichFlatMapFunction to hold the data.
>
> Michael
>
> On Apr 25, 2018, at 1:20 PM, Ken Krugler <kkrugler_li...@transpac.com>
> wrote:
>
> Hi Fabian,
>
> On Apr 24, 2018, at 3:01 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>
> Hi Alex,
>
> An operator that has to join two input streams obviously requires two
> inputs. In case of an enrichment join, the operator should first read the
> meta-data stream and build up a data structure as state against which the
> other input is joined. If the meta data is (infrequently) updated, these
> updates should be integrated into the state.
>
> The problem is that it is currently not possible to implement such an
> operator with Flink because operators cannot decide from which input to
> read, i.e., they have to process whatever data is given to them.
> Hence, it is not possible to build up a data structure from the meta data
> stream before consuming the other stream.
>
>
> This seems like a common situation, and one where it might be relatively
> easy for Flink to help resolve.
>
> Specifically, for a connected stream feeding a Co(Flat)MapFunction, it
> seems like we could let Flink know how to pick elements from the two
> network buffers - e.g. random, round robin, or by timestamp.
>
> I don’t know how this works with chained operators, but it does seem a bit
> odd to have operators create buffers of elements when (network) buffers
> often already exist.
>
> If there are no network buffers in play (e.g. there’s a direct chain of
> operators from a source) then it could be something that’s not supported,
> though with the future source-pull architecture that would also be easy to
> resolve.
>
> Anyway, I could take a whack at this if it seems reasonable.
>
> — Ken
>
>
>
>
> There are a few workarounds that work in special cases.
> 1) The meta data is rather small and never updated. You put the meta data
> as a file into a (distributed) file system and read it from each function
> instance when it is initialized, i.e., in open(), and put it into a hash map.
> Each function instance will hold the complete meta data in memory (on the
> heap). Since the meta data is broadcast, the other stream does not need
> to be partitioned to join against the meta data in the hash map. You can
> implement this function as a FlatMapFunction or ProcessFunction.
> 2) The meta data is too large and/or is updated. In this case, you need a
> function with two inputs. Both inputs are keyed (keyBy()) on a join
> attribute. Since you cannot hold back the non-meta data stream, you need to
> buffer it in (keyed) state until you've read the meta data stream up to a
> point when you can start processing the other stream. If the meta data is
> updated at some point, you can just add the new data to the state. The
> benefit of this approach is that the state is partitioned across all
> parallel operator instances and can be updated. However, you might need to
> initially buffer quite a bit of data in state if the non-meta data stream
> has a high volume.
>
> Hope that one of these approaches works for your use case.
>
> Best, Fabian
>
> 2018-04-23 13:29 GMT+02:00 Alexander Smirnov <alexander.smirn...@gmail.com>:
>
>> Hi Fabian,
>>
>> Please share the workarounds; they would be helpful for my case as well.
>>
>> Thank you,
>> Alex
>>
>> On Mon, Apr 23, 2018 at 2:14 PM Fabian Hueske <fhue...@gmail.com> wrote:
>>
>>> Hi Miki,
>>>
>>> Sorry for the late response.
>>> There are basically two ways to implement an enrichment join as in your
>>> use case.
>>>
>>> 1) Keep the meta data in the database and implement a job that reads the
>>> stream from Kafka and queries the database in an AsyncIO operator for every
>>> stream record. This should be the easier implementation, but it will send
>>> one query to the DB for each streamed record.
>>> 2) Replicate the meta data into Flink state and join the streamed
>>> records with the state. This solution is more complex because you need to
>>> propagate updates of the meta data (if there are any) into the Flink state.
>>> At the moment, Flink lacks a few features to have a good implementation of
>>> this approach, but there are some workarounds that help in certain cases.
>>>
>>> Note that Flink's SQL support does not add advantages for either of
>>> the two approaches. You should use the DataStream API (and possibly
>>> ProcessFunctions).
>>>
>>> I'd go for the first approach if one query per record is feasible.
>>> Let me know if you need to tackle the second approach and I can give
>>> some details on the workarounds I mentioned.
>>>
>>> Best, Fabian
>>>
>>> 2018-04-16 20:38 GMT+02:00 Ken Krugler <kkrugler_li...@transpac.com>:
>>>
>>>> Hi Miki,
>>>>
>>>> I haven’t tried mixing AsyncFunctions with SQL queries.
>>>>
>>>> Normally I’d create a regular DataStream workflow that first reads from
>>>> Kafka, then has an AsyncFunction to read from the SQL database.
>>>>
>>>> If there are often duplicate keys in the Kafka-based stream, you could
>>>> keyBy(key) before the AsyncFunction, and then cache the result of the SQL
>>>> query.
>>>>
>>>> — Ken
>>>>
>>>> On Apr 16, 2018, at 11:19 AM, miki haiat <miko5...@gmail.com> wrote:
>>>>
>>>> Hi, thanks for the reply. I will try to break your reply down into the
>>>> flow execution order.
>>>>
>>>> The first data stream will use AsyncIO and select the table.
>>>> The second stream will be Kafka, and then I can join the streams and map
>>>> the result?
>>>>
>>>> If that is the case, will I select the table only once, on load?
>>>> How can I make sure that my stream table is "fresh"?
>>>>
>>>> I'm also wondering: is there a way to use the Flink state backend
>>>> (RocksDB) and create a read/write-through mechanism?
>>>>
>>>> Thanks
>>>>
>>>> miki
>>>>
>>>>
>>>>
>>>> On Mon, Apr 16, 2018 at 2:45 AM, Ken Krugler <kkrugler_lists@
>>>> transpac.com> wrote:
>>>>
>>>>> If the SQL data is all (or mostly all) needed to join against the data
>>>>> from Kafka, then I might try a regular join.
>>>>>
>>>>> Otherwise it sounds like you want to use an AsyncFunction to do ad hoc
>>>>> queries (in parallel) against your SQL DB.
>>>>>
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/asyncio.html
>>>>>
>>>>> — Ken
>>>>>
>>>>>
>>>>> On Apr 15, 2018, at 12:15 PM, miki haiat <miko5...@gmail.com> wrote:
>>>>>
>>>>> Hi,
>>>>>
>>>>> I have a case of meta data enrichment and I'm wondering if my approach
>>>>> is the correct way.
>>>>>
>>>>>    1. Input stream from Kafka.
>>>>>    2. Meta data (MD) in MS SQL.
>>>>>    3. Map to a new POJO.
>>>>>
>>>>> I need to extract a key from the Kafka stream and use it to select
>>>>> some values from the SQL table.
>>>>>
>>>>> So I thought to use the Table/SQL API to select the MD table, then
>>>>> convert the Kafka stream to a table and join the data by the
>>>>> stream key.
>>>>>
>>>>> At the end I need to map the joined data to a new POJO and send it to
>>>>> Elasticsearch.
>>>>>
>>>>> Any suggestions or different ways to solve this use case?
>>>>>
>>>>> thanks,
>>>>> Miki
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> --------------------------
>>>>> Ken Krugler
>>>>> http://www.scaleunlimited.com
>>>>> custom big data solutions & training
>>>>> Hadoop, Cascading, Cassandra & Solr
>>>>>
>>>>>
>>>>
>>>> --------------------------------------------
>>>> http://about.me/kkrugler
>>>> +1 530-210-6378
>>>>
>>>>
>>>
>
