> On Jan 26, 2019, at 1:08 PM, Nimrod Hauser <nimrod.hau...@bluevoyant.com> 
> wrote:
> 
> Hey Ken, 
> 
> Thank you for your quick response! That definitely sounds like something 
> worth exploring.
> Just a few more small questions, if that's ok.
> 
> 1. You referred to the parquet source as a "stream", but what we have is a 
> static data-source which we will always want to "query" against.
>     What we thought about doing is to stream the entire parquet dataset and 
> load it into our state. 
>     Does that sound right, or is that "hacky"?

Not sure what you mean by “stream the entire parquet dataset”. Do you mean 
you’d load it into memory yourself, and then distribute it?

If so, then yes you can do that, but you’d obviously have to re-load it 
yourself, partition it yourself (since it’s also keyed, right?), and so on.

> 2. Can the continuousFileMonitoringFunction be used to track an entire 
> directory of parquet files?

Yes.

> Also, we'd like it to refresh its state (= its internal data structures) 
> every time the parquet folder is updated, but only after all new files have 
> been written (meaning, we'll need it to run once an update has been detected, 
> but not right away).
> Is that a reasonable use-case?

It’s a reasonable use case, but it precludes using the 
ContinuousFileMonitoringFunction.

You can write a custom SourceFunction, but where it gets tricky is handling 
failure recovery (checkpointing).
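
Very roughly, an (untested) sketch of such a source might look like the 
following. The record type T, latestCompleteVersion() and readRecords() are 
placeholders for your own Parquet record type and reading code:

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

public abstract class ParquetDirectorySource<T> extends RichSourceFunction<T>
        implements CheckpointedFunction {

    private final String directory;
    private final long pollIntervalMs;

    private volatile boolean running = true;

    // Version (e.g. timestamp of a _SUCCESS marker) of the last update we emitted.
    private long lastEmittedVersion;
    private transient ListState<Long> checkpointedVersion;

    protected ParquetDirectorySource(String directory, long pollIntervalMs) {
        this.directory = directory;
        this.pollIntervalMs = pollIntervalMs;
    }

    /** Placeholder: return the version of the latest *completed* update of the directory. */
    protected abstract long latestCompleteVersion(String directory) throws Exception;

    /** Placeholder: read all records of the current Parquet snapshot. */
    protected abstract Iterable<T> readRecords(String directory) throws Exception;

    @Override
    public void run(SourceContext<T> ctx) throws Exception {
        while (running) {
            long version = latestCompleteVersion(directory);
            if (version > lastEmittedVersion) {
                // Emit under the checkpoint lock so the records and the version we
                // checkpoint stay consistent.
                synchronized (ctx.getCheckpointLock()) {
                    for (T record : readRecords(directory)) {
                        ctx.collect(record);
                    }
                    lastEmittedVersion = version;
                }
            }
            Thread.sleep(pollIntervalMs);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        checkpointedVersion.clear();
        checkpointedVersion.add(lastEmittedVersion);
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        checkpointedVersion = context.getOperatorStateStore()
                .getListState(new ListStateDescriptor<>("last-emitted-version", Long.class));
        for (Long version : checkpointedVersion.get()) {
            lastEmittedVersion = version;
        }
    }
}

Checkpointing the version is what keeps the source from replaying the whole 
directory on every restart; the downstream enrichment state comes back from 
the same checkpoint anyway.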

But I should also have mentioned the fundamental issue with this kind of 
enrichment in Flink - you can’t control the ordering of the two streams 
(easily), so you have to be prepared to buffer data from Kafka until you’ve got 
a complete set of data from Parquet.
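
One way to do the buffering (again an untested sketch, with KafkaEvent, 
EnrichmentRecord and EnrichedEvent as stand-ins for your own types) is a 
CoProcessFunction on the two keyed streams that parks Kafka records in 
ListState until the enrichment record for their key arrives:

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;

public class BufferingEnrichmentFunction
        extends CoProcessFunction<KafkaEvent, EnrichmentRecord, EnrichedEvent> {

    private transient ValueState<EnrichmentRecord> enrichment;
    private transient ListState<KafkaEvent> buffered;

    @Override
    public void open(Configuration parameters) {
        enrichment = getRuntimeContext().getState(
                new ValueStateDescriptor<>("enrichment", EnrichmentRecord.class));
        buffered = getRuntimeContext().getListState(
                new ListStateDescriptor<>("buffered-events", KafkaEvent.class));
    }

    @Override
    public void processElement1(KafkaEvent event, Context ctx, Collector<EnrichedEvent> out)
            throws Exception {
        EnrichmentRecord current = enrichment.value();
        if (current == null) {
            // No enrichment data for this key yet, so park the event in state.
            buffered.add(event);
        } else {
            out.collect(new EnrichedEvent(event, current));   // placeholder constructor
        }
    }

    @Override
    public void processElement2(EnrichmentRecord record, Context ctx, Collector<EnrichedEvent> out)
            throws Exception {
        enrichment.update(record);
        // Flush whatever was waiting for this key's enrichment data.
        for (KafkaEvent event : buffered.get()) {
            out.collect(new EnrichedEvent(event, record));    // placeholder constructor
        }
        buffered.clear();
    }
}

You’d wire it up with something like 
kafkaStream.keyBy(e -> e.getKey()).connect(parquetStream.keyBy(r -> r.getKey())).process(new BufferingEnrichmentFunction()). 
Note this only waits per key; if you need the complete Parquet snapshot before 
applying any of it, you’d also need some end-of-load signal, which is where it 
gets messier.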

We’d worked around a similar issue with a UnionedSources 
<https://github.com/ScaleUnlimited/flink-streaming-kmeans/blob/master/src/main/java/com/scaleunlimited/flinksources/UnionedSources.java>
 source function, but I haven’t validated that it handles checkpointing 
correctly.

— Ken

 
> 
> And thank you once again.
> 
> Nimrod.
> 
> On Sat, Jan 26, 2019 at 7:10 PM Ken Krugler <kkrugler_li...@transpac.com> wrote:
> Hi Nimrod,
> 
> One approach is as follows…
> 
> 1. For the Parquet data, you could use a ContinuousFileMonitoringFunction 
> <https://ci.apache.org/projects/flink/flink-docs-release-1.7/api/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.html>
>  to generate a stream of enrichment records.
> 
> 2. Then use a CoMapFunction to “merge” the two connected streams (Kafka and 
> Parquet), and output something like Tuple2<key, Either<Parquet, Kafka>>
> 
> 3. In your enrichment function, based on what’s in the Either<> you’re either 
> updating your enrichment state, or processing a record from Kafka.
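> 
> Roughly (untested), the wiring for 1-3 could look something like the 
> following, where the Parquet input format, path, and record/event types are 
> placeholders:
> 
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> 
> // 1. Continuously monitor the Parquet directory for enrichment records.
> // (readFile() uses a ContinuousFileMonitoringFunction under the hood)
> DataStream<EnrichmentRecord> parquetStream = env.readFile(
>     parquetInputFormat,                      // some FileInputFormat<EnrichmentRecord>
>     "hdfs:///path/to/enrichment",            // placeholder path
>     FileProcessingMode.PROCESS_CONTINUOUSLY,
>     60_000L);                                // re-scan interval, in ms
> 
> DataStream<KafkaEvent> kafkaStream = env.addSource(kafkaConsumer);
> 
> // 2. Merge the two streams into a single stream of keyed Either values.
> DataStream<Tuple2<String, Either<EnrichmentRecord, KafkaEvent>>> merged =
>     parquetStream.connect(kafkaStream)
>         .map(new CoMapFunction<EnrichmentRecord, KafkaEvent,
>                 Tuple2<String, Either<EnrichmentRecord, KafkaEvent>>>() {
>             @Override
>             public Tuple2<String, Either<EnrichmentRecord, KafkaEvent>> map1(EnrichmentRecord r) {
>                 return Tuple2.of(r.getKey(), Either.Left(r));
>             }
>             @Override
>             public Tuple2<String, Either<EnrichmentRecord, KafkaEvent>> map2(KafkaEvent e) {
>                 return Tuple2.of(e.getKey(), Either.Right(e));
>             }
>         });
> 
> // 3. Key by the join key, so one enrichment function instance sees both the
> // enrichment records and the Kafka records for a given key.
> merged.keyBy(t -> t.f0)
>       .process(new EnrichmentFunction());    // your stateful enrichment logic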
> 
> But I think you might want to add a stateful function in the Parquet stream, 
> so that you can separately track Kafka record state in the enrichment 
> function.
> 
> There’s also the potential issue of wanting to buffer Kafka data in the 
> enrichment function, if you need to coordinate with the enrichment data (e.g. 
> you need to get a complete set of updated enrichment data before applying any 
> of it to the incoming Kafka data).
> 
> — Ken
> 
> 
> 
>> On Jan 26, 2019, at 8:04 AM, Nimrod Hauser <nimrod.hau...@bluevoyant.com> wrote:
>> 
>> Hello,
>> 
>> We're using Flink on a high velocity data-stream, and we're looking for the 
>> best way to enrich our stream using a large static source (originating from 
>> Parquet files, which are rarely updated).
>> 
>> The source for the enrichment weighs a few GBs, which is why we want to 
>> avoid using techniques such as broadcast streams, which cannot be keyed and 
>> would need to be duplicated for every Flink operator that uses them.
>> 
>> We started looking into the possibility of merging streams with datasets, or 
>> using the Table API, but any best practices from people who have done this 
>> before would be greatly appreciated.
>> 
>> I'm attaching a simple chart for convenience,
>> 
>> Thank you very much,
>> 
>> Nimrod.
>> <flink enrichment flow.png>

--------------------------
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra
