I'll be interested to hear more about this when you implement it.

Thank you

On Wed, Feb 17, 2016 at 4:44 AM, Flavio Pompermaier <pomperma...@okkam.it> wrote:

> In my use case I thought of persisting the dataset to reuse on Tachyon, in order to speed up reading it. Do you think it could help?
>
> On Tue, Feb 16, 2016 at 10:28 PM, Saliya Ekanayake <esal...@gmail.com> wrote:
>
>> Thank you. I'll check this.
>>
>> On Tue, Feb 16, 2016 at 4:01 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>
>>> Broadcasted DataSets are stored on the JVM heap of each task manager (but shared among multiple slots on the same TM), hence the size restriction.
>>>
>>> There are two ways to retrieve a DataSet (such as the result of a reduce):
>>> 1) If you want to fetch the result into your client program, use DataSet.collect(). This immediately triggers an execution and fetches the result from the cluster.
>>> 2) If you want to use the result for a computation in the cluster, use broadcast sets as described above.
>>>
>>> 2016-02-16 21:54 GMT+01:00 Saliya Ekanayake <esal...@gmail.com>:
>>>
>>>> Thank you, yes, this makes sense. The broadcasted data in my case would be a large array of 3D coordinates.
>>>>
>>>> On a side note, how can I take the output from a reduce function? I can see methods to write it to a given output, but is it possible to retrieve the reduced result back to the program, like a double value representing the average in the previous example?
>>>>
>>>> On Tue, Feb 16, 2016 at 3:47 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>
>>>>> You can use so-called BroadcastSets to send any sufficiently small DataSet (such as a computed average) to any other function and use it there.
>>>>> However, in your case you'll end up with a data flow that branches (at the source) and merges again (when the average is sent to the second map).
>>>>> Such patterns can cause deadlocks and therefore cannot be pipelined, which means that the data before the branch is written to disk and read again.
>>>>> In your case it might be even better to read the data twice instead of reading, writing, and reading it.
>>>>>
>>>>> Fabian
>>>>>
>>>>> 2016-02-16 21:15 GMT+01:00 Saliya Ekanayake <esal...@gmail.com>:
>>>>>
>>>>>> I looked at the samples and I think what you meant is clear, but I didn't find a solution for my need. In my case, I want to use the result from the first map operation before I can apply the second map on the *same* data set. For simplicity, let's say I have a bunch of short values represented as my data set. Then I need to find their average, so I use a map and a reduce. Then I want to map these short values with another function, but it needs the average computed in the beginning to work correctly.
>>>>>>
>>>>>> Is this possible without doing multiple reads of the input data to create the same dataset?
>>>>>>
>>>>>> Thank you,
>>>>>> Saliya
>>>>>>
>>>>>> On Tue, Feb 16, 2016 at 12:03 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>>
>>>>>>> Yes, if you implement both maps in a single job, data is read once.
>>>>>>>
>>>>>>> 2016-02-16 15:53 GMT+01:00 Saliya Ekanayake <esal...@gmail.com>:
>>>>>>>
>>>>>>>> Fabian,
>>>>>>>>
>>>>>>>> I have a quick follow-up question on what you suggested. When streaming the same data through different maps, were you implying that everything goes as a single job in Flink, so the data read happens only once?
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Saliya
>>>>>>>>
>>>>>>>> On Mon, Feb 15, 2016 at 3:58 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> It is not possible to "pin" data sets in memory, yet.
>>>>>>>>> However, you can stream the same data set through two different mappers at the same time.
>>>>>>>>>
>>>>>>>>> For instance, you can have a job like:
>>>>>>>>>
>>>>>>>>>            /---> Map 1 --> Sink1
>>>>>>>>> Source --<
>>>>>>>>>            \---> Map 2 --> Sink2
>>>>>>>>>
>>>>>>>>> and execute it at once.
>>>>>>>>> For that, you define your data flow and call execute() once after all sinks have been created.
>>>>>>>>>
>>>>>>>>> Best, Fabian
>>>>>>>>>
>>>>>>>>> 2016-02-15 21:32 GMT+01:00 Saliya Ekanayake <esal...@gmail.com>:
>>>>>>>>>
>>>>>>>>>> Fabian,
>>>>>>>>>>
>>>>>>>>>> count() was just an example. What I would like to do is, say, run two map operations on the dataset (ds). Each map will have its own reduction, so is there a way to avoid creating two jobs for such a scenario?
>>>>>>>>>>
>>>>>>>>>> The reason is that reading these binary matrices is expensive. In our current MPI implementation, I am using memory maps for faster loading and reuse.
>>>>>>>>>>
>>>>>>>>>> Thank you,
>>>>>>>>>> Saliya
>>>>>>>>>>
>>>>>>>>>> On Mon, Feb 15, 2016 at 3:15 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi,
>>>>>>>>>>>
>>>>>>>>>>> it looks like you are executing two distinct Flink jobs. DataSet.count() triggers the execution of a new job. If you have an execute() call in your program, this will lead to two Flink jobs being executed.
>>>>>>>>>>> It is not possible to share state among these jobs.
>>>>>>>>>>>
>>>>>>>>>>> Maybe you should add a custom count implementation (using a ReduceFunction) which is executed in the same program as the other ReduceFunction.
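Fabian's suggestion is to fold the count into the same reduction instead of calling count(), which launches a separate job. The sketch below shows the combine step in plain Java. It is not Flink code, and the class `SumCount` and its methods are hypothetical names. In a Flink program, the same merge logic would go into a ReduceFunction applied after a map that turns each record into a (value, 1) pair. That way one job yields both the total and the count.

```java
import java.util.List;

// Hypothetical partial aggregate holding a running sum and an element count.
// Instances are immutable, so combining never mutates its inputs.
final class SumCount {
    final double sum;
    final long count;

    SumCount(double sum, long count) {
        this.sum = sum;
        this.count = count;
    }

    // "Map" step: lift one record into a partial aggregate of size 1.
    static SumCount of(short value) {
        return new SumCount(value, 1L);
    }

    // "Reduce" step: merge two partials; associative and commutative.
    static SumCount combine(SumCount a, SumCount b) {
        return new SumCount(a.sum + b.sum, a.count + b.count);
    }

    // Average derived from the single combined result (NaN for empty input).
    double average() {
        return count == 0 ? Double.NaN : sum / count;
    }

    // One pass over the input yields sum, count, and average together.
    static SumCount aggregate(List<Short> values) {
        SumCount acc = new SumCount(0.0, 0L);
        for (short v : values) {
            acc = combine(acc, of(v));
        }
        return acc;
    }
}
```

Because `combine` is associative and commutative, partial results can be merged in any order. That property is what lets a distributed reduce apply it to partitions independently.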
>>>>>>>>>>>
>>>>>>>>>>> Best, Fabian
>>>>>>>>>>>
>>>>>>>>>>> 2016-02-15 21:05 GMT+01:00 Saliya Ekanayake <esal...@gmail.com>:
>>>>>>>>>>>
>>>>>>>>>>>> Hi,
>>>>>>>>>>>>
>>>>>>>>>>>> I see that an InputFormat's open() and nextRecord() methods get called for each terminal operation on a given dataset using that particular InputFormat. Is it possible to avoid this, possibly using some caching technique in Flink?
>>>>>>>>>>>>
>>>>>>>>>>>> For example, I have some code like the one below, and I see that for both of the last two statements (reduce() and count()) the above methods in the input format get called. Btw, this is a custom input format I wrote to represent a binary matrix stored as Short values.
>>>>>>>>>>>>
>>>>>>>>>>>>     ShortMatrixInputFormat smif = new ShortMatrixInputFormat();
>>>>>>>>>>>>     DataSet<Short[]> ds = env.createInput(smif, BasicArrayTypeInfo.SHORT_ARRAY_TYPE_INFO);
>>>>>>>>>>>>     MapOperator<Short[], DoubleStatistics> op = ds.map(...);
>>>>>>>>>>>>     op.reduce(...);
>>>>>>>>>>>>     op.count();
>>>>>>>>>>>>
>>>>>>>>>>>> Thank you,
>>>>>>>>>>>> Saliya
>>>>>>>>>>>> --
>>>>>>>>>>>> Saliya Ekanayake
>>>>>>>>>>>> Ph.D. Candidate | Research Assistant
>>>>>>>>>>>> School of Informatics and Computing | Digital Science Center
>>>>>>>>>>>> Indiana University, Bloomington
>>>>>>>>>>>> Cell 812-391-4914
>>>>>>>>>>>> http://saliya.org
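The pattern Saliya describes is to reduce the values to an average and then run a second map that needs that average. It can be sketched in plain Java as below. This is not Flink code, and `CenterByMean` with its methods are hypothetical names. Here the list is simply held in memory and traversed twice, which is the "pinning" that Fabian notes the DataSet API did not offer at the time. In a Flink job, the average would instead reach the second map through a broadcast set (`withBroadcastSet(...)` on the operator, read via `getRuntimeContext().getBroadcastVariable(...)` inside a RichMapFunction). Alternatively, it could be fetched into the client with `DataSet.collect()`.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java analogue (not Flink) of "compute the average, then map with it".
final class CenterByMean {
    // Phase 1: reduce the values to their mean (NaN for empty input).
    static double mean(List<Short> values) {
        double sum = 0.0;
        for (short v : values) {
            sum += v;
        }
        return values.isEmpty() ? Double.NaN : sum / values.size();
    }

    // Phase 2: a map that needs the phase-1 result. In a Flink job this value
    // would arrive via a broadcast set instead of a method argument.
    static List<Double> center(List<Short> values, double mean) {
        List<Double> out = new ArrayList<>(values.size());
        for (short v : values) {
            out.add(v - mean);
        }
        return out;
    }
}
```

The second traversal is cheap here only because the data is already in memory. Fabian's point is that in Flink this branch-and-merge shape forces the data before the branch to be written to disk and read again, so re-reading the source may cost less.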