Hi Fabian,

Thank you, I think I have a better picture of this now. If I set the number
of DataSource tasks (a config option, I guess?) equal to the number of input
splits, that should behave as I expected.

Yes, the file will be kept at the same place across nodes.

Thank you,
Saliya
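A minimal, untested sketch of that wiring is below. MatrixInputFormat is a
hypothetical custom format (sketched after Fabian's next message); the path
and the split count are illustrative, not from the thread.

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.core.fs.Path;

    public class MatrixJob {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // As many DataSource tasks as input splits: one block of rows each.
            int numBlocks = 5;  // illustrative
            MatrixInputFormat format =
                    new MatrixInputFormat(new Path("file:///data/matrix.bin"), numBlocks);

            DataSet<short[]> rows = env.createInput(format).setParallelism(numBlocks);
            rows.print();
        }
    }

With as many splits as source tasks, each task receives exactly one block,
mirroring the MPI/Hadoop setup described later in the thread.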
On Mon, Jan 25, 2016 at 10:59 AM, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Saliya,
>
> The number of parallel splits is controlled by the number of input splits
> returned by the InputFormat.createInputSplits() method. This method
> receives a parameter minNumSplits, which is equal to the number of
> DataSource tasks.
>
> Flink handles input splits a bit differently from Hadoop. In Hadoop, each
> input split corresponds to one map task. In Flink you have a fixed number
> of DataSource tasks, and input splits are lazily distributed to the source
> tasks. If you have more splits than tasks, a source task requests a new
> split whenever it is done with its current split, until all splits are
> assigned. If your createInputSplits method returns fewer splits than
> minNumSplits, some source tasks won't receive a split.
>
> If you read files from a local FS in a distributed (multi-node) setup, you
> have to be careful: each node must have an exact copy of the data at
> exactly the same location. Otherwise, it won't work.
>
> Best,
> Fabian
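To make the split mechanics concrete, here is a minimal, untested sketch of
a block-per-split format along the lines Fabian describes. The class name,
the hard-coded dimension N, and the assumption that N divides evenly by the
block count are illustrative only.

    import java.io.DataInputStream;
    import java.io.IOException;

    import org.apache.flink.api.common.io.FileInputFormat;
    import org.apache.flink.core.fs.FileInputSplit;
    import org.apache.flink.core.fs.Path;

    // Untested sketch: one input split per fixed-offset block of rows of an
    // N x N matrix of Java shorts stored row-major in a binary file.
    public class MatrixInputFormat extends FileInputFormat<short[]> {

        private static final int N = 8;                      // assumed matrix dimension
        private static final long ROW_BYTES = (long) N * 2;  // one row of N shorts

        private final int numBlocks;
        private transient DataInputStream in;
        private transient long rowsLeft;

        public MatrixInputFormat(Path file, int numBlocks) {
            setFilePath(file);
            this.numBlocks = numBlocks;
        }

        @Override
        public FileInputSplit[] createInputSplits(int minNumSplits) {
            // minNumSplits equals the number of DataSource tasks, but the block
            // layout is fixed here, so the format picks its own split count.
            int rowsPerBlock = N / numBlocks;  // assumes N % numBlocks == 0
            FileInputSplit[] splits = new FileInputSplit[numBlocks];
            for (int i = 0; i < numBlocks; i++) {
                splits[i] = new FileInputSplit(i, filePath,
                        (long) i * rowsPerBlock * ROW_BYTES,  // fixed start offset
                        rowsPerBlock * ROW_BYTES,             // bytes in this block
                        null);                                // no host preference
            }
            return splits;
        }

        @Override
        public void open(FileInputSplit split) throws IOException {
            super.open(split);  // opens 'stream' and seeks it to the split offset
            in = new DataInputStream(stream);
            rowsLeft = split.getLength() / ROW_BYTES;
        }

        @Override
        public boolean reachedEnd() {
            return rowsLeft <= 0;
        }

        @Override
        public short[] nextRecord(short[] reuse) throws IOException {
            short[] row = (reuse != null && reuse.length == N) ? reuse : new short[N];
            for (int j = 0; j < N; j++) {
                row[j] = in.readShort();  // big-endian, as written by DataOutputStream
            }
            rowsLeft--;
            return row;
        }
    }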
> 2016-01-25 16:46 GMT+01:00 Saliya Ekanayake <esal...@gmail.com>:
>
>> Hi Fabian,
>>
>> Thank you for the information.
>>
>> So, is there a way I can get the task number within the InputFormat? That
>> way I can use it to offset into the corresponding block of rows.
>>
>> The file is too large to fit in a single process's memory, so the current
>> setup in MPI and Hadoop uses the rank (task number) to memory-map the
>> corresponding block of rows. In our experiments, we found this approach
>> to be the fastest because of the memory mapping rather than buffered
>> reads. Also, the file is replicated across nodes, and the reading
>> (mapping) happens only once.
>>
>> Thank you,
>> Saliya
>>
>> On Mon, Jan 25, 2016 at 4:38 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>>
>>> Hi Saliya,
>>>
>>> Yes, that is possible; however, the requirements for reading a binary
>>> file from the local FS are basically the same as for reading it from
>>> HDFS. In order to start reading different sections of a file in
>>> parallel, you need to know the different starting positions. This can be
>>> done either by having fixed offsets for blocks or by adding some meta
>>> information about the block start positions. InputFormats can divide the
>>> work of reading a file by generating multiple input splits. Each input
>>> split defines the file, the start offset, and the length to read.
>>>
>>> However, are you sure that reading the file in parallel will be faster
>>> than reading it sequentially?
>>> At least for HDDs, IO-bound workloads with "random" reading patterns are
>>> usually much slower than sequential reads.
>>>
>>> Cheers,
>>> Fabian
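With fixed offsets, the memory-mapping approach does not actually need a
task number: each source task can simply map the byte range of whichever
split it is handed. A hypothetical variant of the read path of the
MatrixInputFormat sketch above (local files only, big-endian assumed):

    import java.io.IOException;
    import java.io.RandomAccessFile;
    import java.nio.ByteOrder;
    import java.nio.ShortBuffer;
    import java.nio.channels.FileChannel;

    import org.apache.flink.core.fs.FileInputSplit;

    // Hypothetical drop-in replacement for open()/reachedEnd()/nextRecord()
    // in the MatrixInputFormat sketch above (N, ROW_BYTES, rowsLeft as
    // before). Works only when the path is on every node's local file system.
    private transient ShortBuffer block;

    @Override
    public void open(FileInputSplit split) throws IOException {
        String localPath = split.getPath().toUri().getPath();
        try (RandomAccessFile file = new RandomAccessFile(localPath, "r");
             FileChannel channel = file.getChannel()) {
            // Map exactly this split's byte range; the mapping stays valid
            // after the channel is closed.
            block = channel.map(FileChannel.MapMode.READ_ONLY,
                                split.getStart(), split.getLength())
                           .order(ByteOrder.BIG_ENDIAN)  // assumed on-disk order
                           .asShortBuffer();
        }
        rowsLeft = split.getLength() / ROW_BYTES;
    }

    @Override
    public boolean reachedEnd() {
        return rowsLeft <= 0;
    }

    @Override
    public short[] nextRecord(short[] reuse) {
        short[] row = (reuse != null && reuse.length == N) ? reuse : new short[N];
        block.get(row);  // bulk-copies one row of N shorts out of the mapping
        rowsLeft--;
        return row;
    }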
>>> 2016-01-24 19:10 GMT+01:00 Suneel Marthi <suneel.mar...@gmail.com>:
>>>
>>>> There should be an env.readbinaryfile(), IIRC; check that.
>>>>
>>>> On Jan 24, 2016, at 12:44 PM, Saliya Ekanayake <esal...@gmail.com> wrote:
>>>>
>>>> Thank you for the response on this, but I still have some doubt. Simply
>>>> put, the file is not in HDFS; it's in local storage. In Flink, if I run
>>>> the program with, say, 5 parallel tasks, what I would like to do is read
>>>> a block of rows in each task, as shown below. I looked at the simple CSV
>>>> reader and was thinking of creating a custom one like that, but I would
>>>> need to know the task number to read the relevant block. Is this
>>>> possible?
>>>>
>>>> <image.png>
>>>>
>>>> Thank you,
>>>> Saliya
>>>>
>>>> On Wed, Jan 20, 2016 at 12:47 PM, Till Rohrmann <trohrm...@apache.org> wrote:
>>>>
>>>>> With readHadoopFile you can use all of Hadoop's FileInputFormats, and
>>>>> thus you can do everything with Flink that you can do with Hadoop.
>>>>> Simply take the same Hadoop FileInputFormat which you would take for
>>>>> your MapReduce job.
>>>>>
>>>>> Cheers,
>>>>> Till
>>>>>
>>>>> On Wed, Jan 20, 2016 at 3:16 PM, Saliya Ekanayake <esal...@gmail.com> wrote:
>>>>>
>>>>>> Thank you, I saw readHadoopFile, but I was not sure how it can be used
>>>>>> for the following, which is what I need. The logic of the code
>>>>>> requires an entire row to operate on, so in our current implementation
>>>>>> with P tasks, each of them will read a rectangular block of (N/P) x N
>>>>>> from the matrix. Is this possible with readHadoopFile? Also, the file
>>>>>> may not be in HDFS, so is it possible to refer to the local disk in
>>>>>> doing this?
>>>>>>
>>>>>> Thank you
>>>>>>
>>>>>> On Wed, Jan 20, 2016 at 1:31 AM, Chiwan Park <chiwanp...@apache.org> wrote:
>>>>>>
>>>>>>> Hi Saliya,
>>>>>>>
>>>>>>> You can use an input format from Hadoop in Flink by using the
>>>>>>> readHadoopFile method. The method returns a dataset whose type is
>>>>>>> Tuple2<Key, Value>. Note that the MapReduce equivalent transformation
>>>>>>> in Flink is composed of map, groupBy, and reduceGroup.
>>>>>>>
>>>>>>> > On Jan 20, 2016, at 3:04 PM, Suneel Marthi <smar...@apache.org> wrote:
>>>>>>> >
>>>>>>> > I guess you are looking for Flink's BinaryInputFormat to be able to
>>>>>>> > read blocks of data from HDFS:
>>>>>>> >
>>>>>>> > https://ci.apache.org/projects/flink/flink-docs-release-0.10/api/java/org/apache/flink/api/common/io/BinaryInputFormat.html
>>>>>>> >
>>>>>>> > On Wed, Jan 20, 2016 at 12:45 AM, Saliya Ekanayake <esal...@gmail.com> wrote:
>>>>>>> > Hi,
>>>>>>> >
>>>>>>> > I am trying to use Flink to perform a parallel batch operation on an
>>>>>>> > NxN matrix represented as a binary file. Each (i,j) element is
>>>>>>> > stored as a Java Short value. In typical MapReduce programming with
>>>>>>> > Hadoop, each map task would read a block of rows of this matrix,
>>>>>>> > perform the computation on that block, and emit the result to the
>>>>>>> > reducer.
>>>>>>> >
>>>>>>> > How is this done in Flink? I am new to Flink and couldn't find a
>>>>>>> > binary reader so far. Any help is greatly appreciated.
>>>>>>> >
>>>>>>> > Thank you,
>>>>>>> > Saliya
>>>>>>>
>>>>>>> Regards,
>>>>>>> Chiwan Park

--
Saliya Ekanayake
Ph.D. Candidate | Research Assistant
School of Informatics and Computing | Digital Science Center
Indiana University, Bloomington
Cell 812-391-4914
http://saliya.org
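For the readHadoopFile route discussed above, a minimal usage sketch
follows, assuming the readHadoopFile method available on
ExecutionEnvironment in Flink releases of that era. TextInputFormat stands
in for whatever Hadoop FileInputFormat fits the data (a binary matrix would
need a custom one), and the path is illustrative.

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.TextInputFormat;

    public class HadoopReadJob {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // readHadoopFile wraps a Hadoop mapred FileInputFormat and yields
            // Tuple2<key, value> records; file:// paths work as well as hdfs://.
            DataSet<Tuple2<LongWritable, Text>> lines = env.readHadoopFile(
                    new TextInputFormat(), LongWritable.class, Text.class,
                    "file:///data/matrix.txt");

            lines.print();
        }
    }

The same caveat Fabian raises applies here: with file:// paths in a
multi-node setup, every node needs an identical copy at the same location.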