I'll be interested to hear more about this when you implement it.

Thank you

On Wed, Feb 17, 2016 at 4:44 AM, Flavio Pompermaier <pomperma...@okkam.it>
wrote:

> In my use case I thought of persisting the dataset to Tachyon for reuse, in
> order to speed up reading it. Do you think that could help?
>
>
> On Tue, Feb 16, 2016 at 10:28 PM, Saliya Ekanayake <esal...@gmail.com>
> wrote:
>
>> Thank you. I'll check this
>>
>> On Tue, Feb 16, 2016 at 4:01 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>
>>> Broadcasted DataSets are stored on the JVM heap of each task manager
>>> (but shared among multiple slots on the same TM), hence the size
>>> restriction.
>>>
>>> There are two ways to retrieve a DataSet (such as the result of a
>>> reduce).
>>> 1) If you want to fetch the result into your client program, use
>>> DataSet.collect(). This immediately triggers an execution and fetches the
>>> result from the cluster.
>>> 2) If you want to use the result for a computation in the cluster, use
>>> broadcast sets as described above.
>>>
>>> 2016-02-16 21:54 GMT+01:00 Saliya Ekanayake <esal...@gmail.com>:
>>>
>>>> Thank you, yes, this makes sense. The broadcasted data in my case would
>>>> be a large array of 3D coordinates.
>>>>
>>>> On a side note, how can I take the output from a reduce function? I can
>>>> see methods to write it to a given output, but is it possible to retrieve
>>>> the reduced result back to the program, like a double value representing
>>>> the average in the previous example?
>>>>
>>>>
>>>> On Tue, Feb 16, 2016 at 3:47 PM, Fabian Hueske <fhue...@gmail.com>
>>>> wrote:
>>>>
>>>>> You can use so-called BroadcastSets to send any sufficiently small
>>>>> DataSet (such as a computed average) to any other function and use it
>>>>> there.
>>>>> However, in your case you'll end up with a data flow that branches (at
>>>>> the source) and merges again (when the average is sent to the second map).
>>>>> Such patterns can cause deadlocks and therefore cannot be pipelined,
>>>>> which means that the data before the branch is written to disk and read
>>>>> again.
>>>>> In your case it might be even better to read the data twice instead of
>>>>> reading, writing, and reading it.
>>>>>
>>>>> Fabian
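
The reason the average-then-map flow described above cannot be fully pipelined can be seen in a plain-Java sketch: the second map needs the finished average, so every input value has to be held somewhere or read again. This uses no Flink API at all; the class CenterByAverage and its method names are hypothetical and only illustrate the data dependency.

```java
import java.util.Arrays;

// Hypothetical illustration only: plain Java, not Flink code.
final class CenterByAverage {
    /** First pass: reduce all values to their average. */
    static double average(short[] values) {
        if (values.length == 0) {
            throw new IllegalArgumentException("empty input");
        }
        long sum = 0;
        for (short v : values) {
            sum += v;
        }
        return (double) sum / values.length;
    }

    /** Second pass: map each value using the average from the first pass. */
    static double[] center(short[] values, double average) {
        double[] out = new double[values.length];
        for (int i = 0; i < values.length; i++) {
            out[i] = values[i] - average;
        }
        return out;
    }

    public static void main(String[] args) {
        short[] data = {2, 4, 9};
        double avg = average(data);             // must finish before pass two starts
        double[] centered = center(data, avg);  // second traversal of the same data
        System.out.println(avg + " " + Arrays.toString(centered));
    }
}
```

Loosely speaking, avg plays the role of the broadcast set here, and traversing data twice corresponds to either writing the branch to disk or reading the input twice.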
>>>>>
>>>>> 2016-02-16 21:15 GMT+01:00 Saliya Ekanayake <esal...@gmail.com>:
>>>>>
>>>>>> I looked at the samples and I think what you meant is clear, but I
>>>>>> didn't find a solution for my need. In my case, I want to use the result
>>>>>> from the first map operation before I can apply the second map on the
>>>>>> *same* data set. For simplicity, let's say I have a bunch of short
>>>>>> values represented as my data set. Then I need to find their average, so
>>>>>> I use a map and reduce. Then I want to map these short values with another
>>>>>> function, but it needs that average computed in the beginning to work
>>>>>> correctly.
>>>>>>
>>>>>> Is this possible without doing multiple reads of the input data to
>>>>>> create the same dataset?
>>>>>>
>>>>>> Thank you,
>>>>>> saliya
>>>>>>
>>>>>> On Tue, Feb 16, 2016 at 12:03 PM, Fabian Hueske <fhue...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Yes, if you implement both maps in a single job, data is read once.
>>>>>>>
>>>>>>> 2016-02-16 15:53 GMT+01:00 Saliya Ekanayake <esal...@gmail.com>:
>>>>>>>
>>>>>>>> Fabian,
>>>>>>>>
>>>>>>>> I have a quick follow-up question on what you suggested. When
>>>>>>>> streaming the same data through different maps, were you implying that
>>>>>>>> everything runs as a single job in Flink, so the data is read only once?
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Saliya
>>>>>>>>
>>>>>>>> On Mon, Feb 15, 2016 at 3:58 PM, Fabian Hueske <fhue...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> It is not possible to "pin" data sets in memory, yet.
>>>>>>>>> However, you can stream the same data set through two different
>>>>>>>>> mappers at the same time.
>>>>>>>>>
>>>>>>>>> For instance you can have a job like:
>>>>>>>>>
>>>>>>>>>                  /---> Map 1 --> Sink1
>>>>>>>>> Source --<
>>>>>>>>>                  \---> Map 2 --> Sink2
>>>>>>>>>
>>>>>>>>> and execute it at once.
>>>>>>>>> For that, you define your data flow and call execute once after all
>>>>>>>>> sinks have been created.
>>>>>>>>>
>>>>>>>>> Best, Fabian
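
The branching job sketched above reads the source once and feeds every record to both mappers. A plain-Java analogy of that single pass follows, with no Flink dependency; the class SinglePassTwoMaps and its read counter are invented here purely for illustration and say nothing about how Flink actually schedules the operators.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical illustration only: plain Java, not Flink code.
final class SinglePassTwoMaps {
    /** Counts how many records were pulled from the simulated source. */
    static int recordsRead = 0;

    /** Reads the source once and hands each record to both mappers. */
    static <T, A, B> void run(Iterable<T> source,
                              Function<T, A> map1, List<A> sink1,
                              Function<T, B> map2, List<B> sink2) {
        for (T record : source) {
            recordsRead++;                  // one read per record, not one per mapper
            sink1.add(map1.apply(record));  // branch 1: Map 1 --> Sink1
            sink2.add(map2.apply(record));  // branch 2: Map 2 --> Sink2
        }
    }

    public static void main(String[] args) {
        List<Integer> sink1 = new ArrayList<>();
        List<String> sink2 = new ArrayList<>();
        run(Arrays.asList((short) 1, (short) 2, (short) 3),
            v -> v * 2, sink1,
            v -> "v" + v, sink2);
        System.out.println(sink1 + " " + sink2 + " reads=" + recordsRead);
    }
}
```

The point of the counter is only that it grows with the number of records, not with the number of mappers attached to the source.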
>>>>>>>>>
>>>>>>>>> 2016-02-15 21:32 GMT+01:00 Saliya Ekanayake <esal...@gmail.com>:
>>>>>>>>>
>>>>>>>>>> Fabian,
>>>>>>>>>>
>>>>>>>>>> count() was just an example. What I would like to do is, say, run
>>>>>>>>>> two map operations on the dataset (ds). Each map will have its own
>>>>>>>>>> reduction, so is there a way to avoid creating two jobs for such a
>>>>>>>>>> scenario?
>>>>>>>>>>
>>>>>>>>>> The reason is that reading these binary matrices is expensive. In
>>>>>>>>>> our current MPI implementation, I am using memory maps for faster
>>>>>>>>>> loading and reuse.
>>>>>>>>>>
>>>>>>>>>> Thank you,
>>>>>>>>>> Saliya
>>>>>>>>>>
>>>>>>>>>> On Mon, Feb 15, 2016 at 3:15 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi,
>>>>>>>>>>>
>>>>>>>>>>> it looks like you are executing two distinct Flink jobs.
>>>>>>>>>>> DataSet.count() triggers the execution of a new job. If you have
>>>>>>>>>>> an execute() call in your program, this will lead to two Flink jobs
>>>>>>>>>>> being executed.
>>>>>>>>>>> It is not possible to share state among these jobs.
>>>>>>>>>>>
>>>>>>>>>>> Maybe you should add a custom count implementation (using a
>>>>>>>>>>> ReduceFunction) which is executed in the same program as the other
>>>>>>>>>>> ReduceFunction.
>>>>>>>>>>>
>>>>>>>>>>> Best, Fabian
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> 2016-02-15 21:05 GMT+01:00 Saliya Ekanayake <esal...@gmail.com>:
>>>>>>>>>>>
>>>>>>>>>>>> Hi,
>>>>>>>>>>>>
>>>>>>>>>>>> I see that an InputFormat's open() and nextRecord() methods get
>>>>>>>>>>>> called for each terminal operation on a given dataset using that
>>>>>>>>>>>> particular InputFormat. Is it possible to avoid this, possibly using
>>>>>>>>>>>> some caching technique in Flink?
>>>>>>>>>>>>
>>>>>>>>>>>> For example, I have some code like the one below, and I see that for
>>>>>>>>>>>> both of the last two statements (reduce() and count()) the above
>>>>>>>>>>>> methods in the input format get called. By the way, this is a custom
>>>>>>>>>>>> input format I wrote to represent a binary matrix stored as Short
>>>>>>>>>>>> values.
>>>>>>>>>>>>
>>>>>>>>>>>> ShortMatrixInputFormat smif = new ShortMatrixInputFormat();
>>>>>>>>>>>>
>>>>>>>>>>>> DataSet<Short[]> ds = env.createInput(smif, BasicArrayTypeInfo.SHORT_ARRAY_TYPE_INFO);
>>>>>>>>>>>>
>>>>>>>>>>>> MapOperator<Short[], DoubleStatistics> op = ds.map(...);
>>>>>>>>>>>>
>>>>>>>>>>>> *op.reduce(...)*
>>>>>>>>>>>>
>>>>>>>>>>>> *op.count(...)*
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Thank you,
>>>>>>>>>>>> Saliya
>>>>>>>>>>>> --
>>>>>>>>>>>> Saliya Ekanayake
>>>>>>>>>>>> Ph.D. Candidate | Research Assistant
>>>>>>>>>>>> School of Informatics and Computing | Digital Science Center
>>>>>>>>>>>> Indiana University, Bloomington
>>>>>>>>>>>> Cell 812-391-4914
>>>>>>>>>>>> http://saliya.org
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>>
>>>
>>>
>>
>>
>>
>


-- 
Saliya Ekanayake
Ph.D. Candidate | Research Assistant
School of Informatics and Computing | Digital Science Center
Indiana University, Bloomington
Cell 812-391-4914
http://saliya.org
