Hi Aljoscha,
thanks for the great suggestions, I wasn't aware of
AsyncDataStream.unorderedWait and BucketingSink.setBucketer().
Most probably that's exactly what I was looking for... (I just need to find
the time to test it).
Just one last question: what do you mean by "you could use a different
readFile() method where you can specify that you want to continue
monitoring the directory for new files"? Is there a way to delete the new
input files, or move them to a backup dir, once they have been enriched?

Best Flavio



On Thu, Jun 15, 2017 at 2:30 PM, Aljoscha Krettek <aljos...@apache.org>
wrote:

> Ok, just trying to make sure I understand everything: You have this:
>
> 1. A bunch of data in HDFS that you want to enrich
> 2. An external service (Solr/ES) that you query for enriching the data
> rows stored in 1.
> 3. You need to store the enriched rows in HDFS again
>
> I think you could just do this (roughly):
>
> StreamExecutionEnvironment env = …;
>
> DataStream<Row> input = env.readFile(new RowCsvInputFormat(…), “<hdfs
> path>”);
>
> DataStream<Row> enriched = input.flatMap(new MyEnricherThatCallsES());
> // or
> DataStream<Row> enriched = AsyncDataStream.unorderedWait(input, …);
> // yes, the interface for this is a bit strange
>
> BucketingSink sink = new BucketingSink(“<hdfs sink path>”);
> // this is responsible for putting files into buckets, so that you don’t
> // have too many small HDFS files
> sink.setBucketer(new MyBucketer());
> enriched.addSink(sink);
>
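> To make the async variant a bit more concrete, here is a rough (untested)
> sketch of what the enricher could look like. MyEsClient and its
> lookupAsync() method are just placeholders for whatever non-blocking
> Solr/ES client you use, and on older Flink versions the callback is an
> AsyncCollector instead of a ResultFuture:
>
> public class AsyncEnricher extends RichAsyncFunction<Row, Row> {
>
>   private transient MyEsClient client; // placeholder for your async Solr/ES client
>
>   @Override
>   public void open(Configuration parameters) {
>     client = new MyEsClient(…);
>   }
>
>   @Override
>   public void asyncInvoke(Row input, ResultFuture<Row> resultFuture) {
>     // assumes the client returns a CompletableFuture<Row>
>     client.lookupAsync(input).whenComplete((enriched, failure) -> {
>       if (failure != null) {
>         resultFuture.completeExceptionally(failure);
>       } else {
>         resultFuture.complete(Collections.singletonList(enriched));
>       }
>     });
>   }
> }
>
> // 30 s timeout, at most 100 requests in flight per subtask; the capacity
> // is what effectively throttles the load on the external service
> DataStream<Row> enriched = AsyncDataStream.unorderedWait(
>     input, new AsyncEnricher(), 30, TimeUnit.SECONDS, 100);
>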
> In this case, the file source will close once all files are read and the
> job will finish. If you don’t want this you can also use a different
> readFile() method where you can specify that you want to continue
> monitoring the directory for new files.
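>
> Roughly, the continuous variant would look like this (the one-minute scan
> interval is just an example value):
>
> DataStream<Row> input = env.readFile(
>     new RowCsvInputFormat(…),                 // same format as above
>     “<hdfs path>”,
>     FileProcessingMode.PROCESS_CONTINUOUSLY,  // keep monitoring the directory
>     60_000);                                  // re-scan interval in milliseconds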
>
> Best,
> Aljoscha
>
> On 6. Jun 2017, at 17:38, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>
> Hi Aljoscha,
> thanks for getting back to me on this! I'll try to simplify the thread
> starting from what we want to achieve.
>
> At the moment we execute some queries to a db and we store the data into
> Parquet directories (one for each query).
> Let's say we then create a DataStream<Row> from each dir; what we would
> like to achieve is to perform some sort of throttling of the queries to
> this external service (in order not to overload it with too many
> queries... but we also need to run as many queries as possible in order
> to execute this process in a reasonable time).
>
> The current batch process has the downside that you must know a priori
> the right parallelism of the job, while the streaming process should be
> able to rescale when needed [1], so it should be easier to tune the job
> parallelism without losing all the already performed queries [2].
> Moreover, if the job crashes you lose all the work done up to that moment
> because there's no checkpointing...
> My initial idea was to read from HDFS and put the data into Kafka to be
> able to change the number of consumers at runtime (according to the
> maximum parallelism we can achieve with the external service), but maybe
> this could be done in an easier way (we've only started using streaming
> recently, so we may see things as more complicated than they are).
>
> Moreover, as the last step, we need to know when all the data has been
> enriched so we can stop this first streaming job and start the next one
> (which cannot run while the acquisition job is still in progress because
> it could break referential integrity). Is there any example of such a
> use case?
>
> [1] at the moment manually... maybe automatically in the future, right?
> [2] with the batch job, if we want to change the parallelism we need to
> stop it and relaunch it, losing all the already enriched data because
> there's no checkpointing there
>
> On Tue, Jun 6, 2017 at 4:46 PM, Aljoscha Krettek <aljos...@apache.org>
> wrote:
>
>> Hi Flavio,
>>
>> I’ll try and answer your questions:
>>
>> Regarding 1. Why do you need to first read the data from HDFS into Kafka
>> (or another queue)? Using 
>> StreamExecutionEnvironment.readFile(FileInputFormat,
>> String, FileProcessingMode, long) you can monitor a directory in HDFS and
>> process the files that are there and any newly arriving files. For batching
>> your output, you could look into the BucketingSink which will write to
>> files in HDFS (or some other DFS) and start new files (buckets) based on
>> some criteria, for example number of processed elements or time.
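>>
>> For example, a rough sketch of how the bucketing and rolling criteria
>> could be configured (the values are just placeholders and the exact
>> setters might differ a bit between Flink versions):
>>
>> BucketingSink<Row> sink = new BucketingSink<>(“<hdfs sink path>”);
>> sink.setBucketer(new DateTimeBucketer<Row>("yyyy-MM-dd--HH")); // one bucket per hour
>> sink.setBatchSize(128 * 1024 * 1024L);       // roll to a new part file after ~128 MB
>> sink.setInactiveBucketThreshold(60 * 1000L); // close buckets that have been idle for a minute
>> enriched.addSink(sink);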
>>
>> Regarding 2. I didn’t completely understand this part. Could you maybe
>> elaborate a bit, please?
>>
>> Regarding 3. Yes, I think you can. You would use this to fire off your
>> queries to Solr/ES.
>>
>> Best,
>> Aljoscha
>>
>> On 11. May 2017, at 15:06, Flavio Pompermaier <pomperma...@okkam.it>
>> wrote:
>>
>> Hi to all,
>> we have a particular use case: a tabular dataset on HDFS (e.g. a CSV)
>> that we want to enrich by filling some cells with the content returned
>> by a query to a reverse index (e.g. Solr/Elasticsearch).
>> Since we want this process to be resilient and scalable, we thought that
>> Flink streaming could be a good fit, because we can control the
>> "pressure" on the index by adding/removing consumers dynamically and
>> there is automatic error recovery.
>>
>> Right now we have developed 2 different solutions to the problem:
>>
>>    1. *Move the dataset from HDFS to a queue/topic* (like Kafka or
>>    RabbitMQ) and then let the queue consumers do the real job (pull Rows
>>    from the queue, enrich them and then persist the enriched Rows). The
>>    questions here are:
>>       1. How to properly manage writing to HDFS? If we read a set of
>>       rows, enrich them and need to write the result back to HDFS, is it
>>       possible to automatically compact files in order to avoid the "too
>>       many small files" problem on HDFS? How to avoid file name
>>       collisions (put each batch of rows into a different file)?
>>       2. How to control the number of consumers dynamically? Is it
>>       possible to change the parallelism once the job has started?
>>    2. In order to avoid useless data transfer from HDFS to a queue/topic
>>    (since we don't need all the Row fields to create the query...
>>    usually only 2/5 fields are needed), we can create a Flink job that
>>    puts the *queries into a queue/topic* and waits for the result. The
>>    problem with this approach is:
>>       1. How to correlate queries with their responses? Creating a
>>       single response queue/topic implies that all consumers read all
>>       messages (and discard those that are not directed to them), while
>>       creating a queue/topic for each sub-task could be expensive (in
>>       terms of resources and management... but we don't have any
>>       evidence/experience of this, it's just a possible problem).
>>    3. Maybe we can exploit *Flink async I/O* somehow...? But how?
>>
>>
>> Any suggestions/drawbacks on the 2 approaches?
>>
>> Thanks in advance,
>> Flavio
>>
>>
>>
