Fabian,

I have a quick follow-up question about your suggestion. When streaming the
same data through different maps, did you mean that everything runs as a
single Flink job, so the data is read only once?
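
To make the question concrete, a minimal plain-Java model of the intended
behaviour might look like the sketch below. It is not Flink API:
CountingSource, Branch and runOnce are hypothetical names invented for this
sketch, and the row sum only stands in for whatever the first map and
reduction would compute. The source is iterated once, every record is handed
to both branches, and the second branch is a count written as a reduction
(map each row to 1, then add).

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.BinaryOperator;
import java.util.function.Function;

public class SinglePassTwoBranches {

    /** Hypothetical stand-in for an expensive input; counts the records it hands out. */
    static final class CountingSource implements Iterable<short[]> {
        private final List<short[]> rows;
        long recordsRead = 0;

        CountingSource(List<short[]> rows) {
            this.rows = rows;
        }

        @Override
        public Iterator<short[]> iterator() {
            final Iterator<short[]> it = rows.iterator();
            return new Iterator<short[]>() {
                @Override
                public boolean hasNext() {
                    return it.hasNext();
                }

                @Override
                public short[] next() {
                    short[] row = it.next();
                    recordsRead++; // one increment per record actually read
                    return row;
                }
            };
        }
    }

    /** One branch of the dataflow: a map followed by its own reduction. */
    static final class Branch<I, O> {
        private final Function<I, O> map;
        private final BinaryOperator<O> reduce;
        private O result; // stays null until the first record arrives

        Branch(Function<I, O> map, BinaryOperator<O> reduce) {
            this.map = map;
            this.reduce = reduce;
        }

        void accept(I record) {
            O mapped = map.apply(record);
            result = (result == null) ? mapped : reduce.apply(result, mapped);
        }

        O resultOr(O fallback) {
            return (result == null) ? fallback : result;
        }
    }

    /** Returns {sum of all values, number of rows, records read from the source}. */
    public static long[] runOnce(List<short[]> rows) {
        CountingSource source = new CountingSource(rows);

        // Branch 1: map each row to the sum of its values, reduce by addition.
        Branch<short[], Long> sumBranch = new Branch<short[], Long>(row -> {
            long s = 0;
            for (short v : row) {
                s += v;
            }
            return s;
        }, Long::sum);

        // Branch 2: a custom count, i.e. map each row to 1 and reduce by addition.
        Branch<short[], Long> countBranch = new Branch<short[], Long>(row -> 1L, Long::sum);

        // Single pass: each record is read once and fed to both branches.
        for (short[] row : source) {
            sumBranch.accept(row);
            countBranch.accept(row);
        }

        return new long[] {sumBranch.resultOr(0L), countBranch.resultOr(0L), source.recordsRead};
    }

    public static void main(String[] args) {
        long[] r = runOnce(Arrays.asList(new short[] {1, 2}, new short[] {3, 4}, new short[] {5, 6}));
        System.out.println("sum=" + r[0] + " count=" + r[1] + " recordsRead=" + r[2]);
    }
}
```

In this model recordsRead equals the number of rows even though two
reductions are computed, which is the property being asked about for the
Flink job.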

Thanks,
Saliya

On Mon, Feb 15, 2016 at 3:58 PM, Fabian Hueske <fhue...@gmail.com> wrote:

> It is not possible to "pin" data sets in memory, yet.
> However, you can stream the same data set through two different mappers at
> the same time.
>
> For instance you can have a job like:
>
>                  /---> Map 1 --> Sink1
> Source --<
>                  \---> Map 2 --> Sink2
>
> and execute it at once.
> For that, you define your data flow and call execute() once after all
> sinks have been created.
>
> Best, Fabian
>
> 2016-02-15 21:32 GMT+01:00 Saliya Ekanayake <esal...@gmail.com>:
>
>> Fabian,
>>
>> count() was just an example. What I would like to do is, say, run two map
>> operations on the dataset (ds). Each map will have its own reduction, so
>> is there a way to avoid creating two jobs for such a scenario?
>>
>> The reason is that reading these binary matrices is expensive. In our
>> current MPI implementation, I am using memory maps for faster loading and
>> reuse.
>>
>> Thank you,
>> Saliya
>>
>> On Mon, Feb 15, 2016 at 3:15 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> it looks like you are executing two distinct Flink jobs.
>>> DataSet.count() triggers the execution of a new job. If you have an
>>> execute() call in your program, this will lead to two Flink jobs being
>>> executed.
>>> It is not possible to share state among these jobs.
>>>
>>> Maybe you should add a custom count implementation (using a
>>> ReduceFunction) which is executed in the same program as the other
>>> ReduceFunction.
>>>
>>> Best, Fabian
>>>
>>>
>>>
>>> 2016-02-15 21:05 GMT+01:00 Saliya Ekanayake <esal...@gmail.com>:
>>>
>>>> Hi,
>>>>
>>>> I see that an InputFormat's open() and nextRecord() methods get called
>>>> for each terminal operation on a given dataset using that particular
>>>> InputFormat. Is it possible to avoid this - possibly using some caching
>>>> technique in Flink?
>>>>
>>>> For example, I have some code like the one below, and I see that both of
>>>> the last two statements (reduce() and count()) cause the above methods of
>>>> the input format to be called. By the way, this is a custom input format I
>>>> wrote to represent a binary matrix stored as Short values.
>>>>
>>>> ShortMatrixInputFormat smif = new ShortMatrixInputFormat();
>>>>
>>>> DataSet<Short[]> ds = env.createInput(smif,
>>>>     BasicArrayTypeInfo.SHORT_ARRAY_TYPE_INFO);
>>>>
>>>> MapOperator<Short[], DoubleStatistics> op = ds.map(...);
>>>>
>>>> op.reduce(...);
>>>>
>>>> op.count();
>>>>
>>>>
>>>> Thank you,
>>>> Saliya
>>>> --
>>>> Saliya Ekanayake
>>>> Ph.D. Candidate | Research Assistant
>>>> School of Informatics and Computing | Digital Science Center
>>>> Indiana University, Bloomington
>>>> Cell 812-391-4914
>>>> http://saliya.org
>>>>
>>>
>>>
>>
>>
>>
>
>


