Yes, if you implement both maps in a single job, data is read once.
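
As a rough illustration of why a single job reads the source only once, here is a minimal sketch in plain Java. It deliberately does not use the Flink API: `CountingSource`, `Branch`, and `run` are hypothetical names invented for this example. Each record is read once and handed to two map-plus-reduce branches, one of which is a hand-written count (map every record to 1, then sum), along the lines of the custom count suggested further down the thread. In Flink itself, the analogous step is to define both branches on the same data set and call execute() once after all sinks are created.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.BinaryOperator;
import java.util.function.Function;

public class SinglePassFanOut {

    /** Hypothetical stand-in for an expensive input: counts every record read. */
    static final class CountingSource implements Iterable<short[]> {
        private final List<short[]> rows;
        int reads = 0;

        CountingSource(List<short[]> rows) {
            this.rows = rows;
        }

        @Override
        public Iterator<short[]> iterator() {
            final Iterator<short[]> it = rows.iterator();
            return new Iterator<short[]>() {
                @Override
                public boolean hasNext() {
                    return it.hasNext();
                }

                @Override
                public short[] next() {
                    reads++; // one increment per record actually read
                    return it.next();
                }
            };
        }
    }

    /** One branch of the data flow: a map followed by its own reduction. */
    static final class Branch<T, R> {
        private final Function<T, R> mapper;
        private final BinaryOperator<R> reducer;
        private R result; // stays null until the first record arrives

        Branch(Function<T, R> mapper, BinaryOperator<R> reducer) {
            this.mapper = mapper;
            this.reducer = reducer;
        }

        void accept(T record) {
            R mapped = mapper.apply(record);
            result = (result == null) ? mapped : reducer.apply(result, mapped);
        }

        R resultOr(R fallback) {
            return (result == null) ? fallback : result;
        }
    }

    /** Reads the source once and returns {sum of all values, record count, reads}. */
    static long[] run(CountingSource source) {
        // Branch 1: sum each row, then add the row sums together.
        Branch<short[], Long> sum = new Branch<>(row -> {
            long s = 0;
            for (short v : row) {
                s += v;
            }
            return s;
        }, Long::sum);

        // Branch 2: a custom count, expressed as map-to-1 followed by a sum.
        Branch<short[], Long> count = new Branch<>(row -> 1L, Long::sum);

        // Single pass: every record goes to both branches.
        for (short[] row : source) {
            sum.accept(row);
            count.accept(row);
        }
        return new long[] {sum.resultOr(0L), count.resultOr(0L), source.reads};
    }

    public static void main(String[] args) {
        CountingSource source = new CountingSource(
                Arrays.asList(new short[] {1, 2, 3}, new short[] {4, 5}));
        long[] r = run(source);
        System.out.println("sum=" + r[0] + " count=" + r[1] + " reads=" + r[2]);
    }
}
```

The point of the read counter is that it ends up equal to the number of records, not twice that, even though two reductions were computed.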

2016-02-16 15:53 GMT+01:00 Saliya Ekanayake <esal...@gmail.com>:

> Fabian,
>
> I have a quick follow-up question on what you suggested. When streaming the
> same data through different maps, were you implying that everything runs as
> a single job in Flink, so the data is read only once?
>
> Thanks,
> Saliya
>
> On Mon, Feb 15, 2016 at 3:58 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> It is not possible to "pin" data sets in memory, yet.
>> However, you can stream the same data set through two different mappers
>> at the same time.
>>
>> For instance you can have a job like:
>>
>>                  /---> Map 1 --> Sink1
>> Source --<
>>                  \---> Map 2 --> Sink2
>>
>> and execute it at once.
>> For that, you define your data flow and call execute() once after all
>> sinks have been created.
>>
>> Best, Fabian
>>
>> 2016-02-15 21:32 GMT+01:00 Saliya Ekanayake <esal...@gmail.com>:
>>
>>> Fabian,
>>>
>>> count() was just an example. What I would like to do is, say, run two map
>>> operations on the dataset (ds). Each map will have its own reduction, so
>>> is there a way to avoid creating two jobs for such a scenario?
>>>
>>> The reason is that reading these binary matrices is expensive. In our
>>> current MPI implementation, I am using memory maps for faster loading and
>>> reuse.
>>>
>>> Thank you,
>>> Saliya
>>>
>>> On Mon, Feb 15, 2016 at 3:15 PM, Fabian Hueske <fhue...@gmail.com>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> it looks like you are executing two distinct Flink jobs.
>>>> DataSet.count() triggers the execution of a new job. If you have an
>>>> execute() call in your program, this will lead to two Flink jobs being
>>>> executed.
>>>> It is not possible to share state among these jobs.
>>>>
>>>> Maybe you should add a custom count implementation (using a
>>>> ReduceFunction) which is executed in the same program as the other
>>>> ReduceFunction.
>>>>
>>>> Best, Fabian
>>>>
>>>>
>>>>
>>>> 2016-02-15 21:05 GMT+01:00 Saliya Ekanayake <esal...@gmail.com>:
>>>>
>>>>> Hi,
>>>>>
>>>>> I see that an InputFormat's open() and nextRecord() methods get called
>>>>> for each terminal operation on a given dataset using that particular
>>>>> InputFormat. Is it possible to avoid this - possibly using some caching
>>>>> technique in Flink?
>>>>>
>>>>> For example, I have some code like the one below, and I see that for
>>>>> both of the last two statements (reduce() and count()) the above methods
>>>>> in the input format get called. Btw, this is a custom input format I
>>>>> wrote to represent a binary matrix stored as Short values.
>>>>>
>>>>> ShortMatrixInputFormat smif = new ShortMatrixInputFormat();
>>>>>
>>>>> DataSet<Short[]> ds = env.createInput(smif, BasicArrayTypeInfo.SHORT_ARRAY_TYPE_INFO);
>>>>>
>>>>> MapOperator<Short[], DoubleStatistics> op = ds.map(...)
>>>>>
>>>>> *op.reduce(...)*
>>>>>
>>>>> *op.count(...)*
>>>>>
>>>>>
>>>>> Thank you,
>>>>> Saliya
>>>>>
>>>>
>>>>
>>>
>>>
>>>
>>
>>
>
>
> --
> Saliya Ekanayake
> Ph.D. Candidate | Research Assistant
> School of Informatics and Computing | Digital Science Center
> Indiana University, Bloomington
> Cell 812-391-4914
> http://saliya.org
>
