Hi Fabian,

Thank you, I think I have a better picture of this now. If I set the number
of DataSource tasks (a config option, I guess?) equal to the number of input
splits, that would do what I expected.
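
Something like this, I'm guessing, where matrixFormat stands for our custom
input format and numberOfSplits for the split count:

env.createInput(matrixFormat).setParallelism(numberOfSplits);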

Yes, I will keep the file at the same place across all nodes.

Thank you,
Saliya

On Mon, Jan 25, 2016 at 10:59 AM, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Saliya,
>
> the number of parallel splits is controlled by the number of input splits
> returned by the InputFormat.createInputSplits() method. This method
> receives a parameter minNumSplits, which is equal to the number of
> DataSource tasks.
>
> Flink handles input splits a bit differently from Hadoop. In Hadoop, each
> input split corresponds to one map task. In Flink, you have a fixed number
> of DataSource tasks, and input splits are lazily distributed to source
> tasks. If you have more splits than tasks, a source task requests a new
> split whenever it is done with its last one, until all splits are assigned.
> If your createInputSplits method returns fewer splits than minNumSplits,
> some source tasks won't receive a split.
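>
> For illustration, here is a rough sketch of such a format for an N x N
> matrix of 2-byte values, creating one split per block of rows. The class
> and variable names are my own and details may need adjusting to your Flink
> version; only FileInputFormat and FileInputSplit are actual Flink API:
>
> import java.io.EOFException;
> import java.io.IOException;
> import java.nio.ByteBuffer;
> import org.apache.flink.api.common.io.FileInputFormat;
> import org.apache.flink.core.fs.FileInputSplit;
> import org.apache.flink.core.fs.Path;
>
> // Emits one matrix row (short[n]) per record.
> public class ShortMatrixInputFormat extends FileInputFormat<short[]> {
>
>     private final int n;       // matrix dimension, assumed known up front
>     private long bytesRead;    // bytes consumed from the current split
>
>     public ShortMatrixInputFormat(Path file, int n) {
>         super(file);
>         this.n = n;
>     }
>
>     @Override
>     public FileInputSplit[] createInputSplits(int minNumSplits) {
>         long rowBytes = (long) n * 2;                       // Short = 2 bytes
>         int numSplits = Math.max(minNumSplits, 1);
>         int rowsPerSplit = (n + numSplits - 1) / numSplits; // ceiling division
>         FileInputSplit[] splits = new FileInputSplit[numSplits];
>         for (int i = 0; i < numSplits; i++) {
>             long firstRow = (long) i * rowsPerSplit;
>             long rows = Math.max(0, Math.min(rowsPerSplit, n - firstRow));
>             splits[i] = new FileInputSplit(i, this.filePath,
>                     firstRow * rowBytes, rows * rowBytes, null);
>         }
>         return splits;
>     }
>
>     @Override
>     public void open(FileInputSplit split) throws IOException {
>         super.open(split);  // opens the stream and seeks to split.getStart()
>         bytesRead = 0;
>     }
>
>     @Override
>     public boolean reachedEnd() {
>         return bytesRead >= this.splitLength;
>     }
>
>     @Override
>     public short[] nextRecord(short[] reuse) throws IOException {
>         byte[] buf = new byte[n * 2];
>         int off = 0;
>         while (off < buf.length) {  // read exactly one full row
>             int read = this.stream.read(buf, off, buf.length - off);
>             if (read < 0) throw new EOFException("unexpected end of file");
>             off += read;
>         }
>         bytesRead += buf.length;
>         short[] row = (reuse != null && reuse.length == n) ? reuse : new short[n];
>         ByteBuffer.wrap(buf).asShortBuffer().get(row);
>         return row;
>     }
> }
>
> With splits created this way, setting the DataSource parallelism to the
> number of splits gives each source task exactly one row block.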
>
> If you read files from a local FS in a distributed (multi-node) setup, you
> have to be careful. Each node must have an exact copy of the data at
> exactly the same location. Otherwise, it won't work.
>
> Best, Fabian
>
> 2016-01-25 16:46 GMT+01:00 Saliya Ekanayake <esal...@gmail.com>:
>
>> Hi Fabian,
>>
>> Thank you for the information.
>>
>> So, is there a way I can get the task number within the InputFormat? That
>> way, I can use it to compute the offset of the block of rows.
>>
>> The file is too large to fit in a single process's memory, so the current
>> setup in MPI and Hadoop uses the rank (task number) to memory-map the
>> corresponding block of rows. In our experiments, we found this approach to
>> be the fastest because of the memory mapping rather than buffered reads.
>> Also, the file is replicated across nodes, and the reading (mapping)
>> happens only once.
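>>
>> For reference, the mapping is essentially the following. The path and the
>> variables rank, numTasks, and n are placeholders, and each mapped block
>> must stay below 2 GB (the limit of a single mapping):
>>
>> // uses java.io.RandomAccessFile and java.nio: MappedByteBuffer,
>> // ShortBuffer, channels.FileChannel
>> long rowBytes = (long) n * 2;                      // one Java Short = 2 bytes
>> long blockRows = n / numTasks;                     // assume numTasks divides n
>> long offset = (long) rank * blockRows * rowBytes;  // this task's block
>>
>> try (RandomAccessFile raf = new RandomAccessFile("/data/matrix.bin", "r");
>>      FileChannel channel = raf.getChannel()) {
>>     MappedByteBuffer block = channel.map(
>>             FileChannel.MapMode.READ_ONLY, offset, blockRows * rowBytes);
>>     ShortBuffer shorts = block.asShortBuffer();
>>     // matrix element (rank * blockRows + i, j) is shorts.get(i * n + j)
>> }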
>>
>> Thank you,
>> Saliya
>>
>> On Mon, Jan 25, 2016 at 4:38 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>>
>>> Hi Saliya,
>>>
>>> yes, that is possible; however, the requirements for reading a binary file
>>> from the local FS are basically the same as for reading it from HDFS.
>>> In order to be able to start reading different sections of a file in
>>> parallel, you need to know the different starting positions. This can be
>>> done by either having fixed offsets for blocks or adding some meta
>>> information for the block start positions. InputFormats can divide the work
>>> of reading a file by generating multiple input splits. Each input split
>>> defines the file, the start offset and the length to read.
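>>>
>>> For example, consuming such a format could look like this, where
>>> ShortMatrixInputFormat stands for a custom format along those lines and
>>> the path and matrix size are placeholders (needs
>>> org.apache.flink.api.java.ExecutionEnvironment, DataSet, and
>>> org.apache.flink.core.fs.Path):
>>>
>>> ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>> DataSet<short[]> rows = env
>>>         .createInput(new ShortMatrixInputFormat(
>>>                 new Path("file:///data/matrix.bin"), 16000))
>>>         .setParallelism(5);  // 5 DataSource tasks share the splits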
>>>
>>> However, are you sure that reading a file in parallel will be faster
>>> than reading it sequentially?
>>> At least for HDDs, IO-bound workloads with "random" reading patterns are
>>> usually much slower than sequential reads.
>>>
>>> Cheers, Fabian
>>>
>>> 2016-01-24 19:10 GMT+01:00 Suneel Marthi <suneel.mar...@gmail.com>:
>>>
>>>> There should be an env.readBinaryFile() IIRC; check that
>>>>
>>>>
>>>> On Jan 24, 2016, at 12:44 PM, Saliya Ekanayake <esal...@gmail.com>
>>>> wrote:
>>>>
>>>> Thank you for the response, but I still have a doubt. Simply put, the
>>>> file is not in HDFS; it's in local storage. If I run the Flink program
>>>> with, say, 5 parallel tasks, what I would like to do is read a block of
>>>> rows in each task, as shown below. I looked at the simple CSV reader and
>>>> was thinking of creating a custom one like it, but I would need to know
>>>> the task number to read the relevant block. Is this possible?
>>>>
>>>> <image.png>
>>>>
>>>> Thank you,
>>>> Saliya
>>>>
>>>> On Wed, Jan 20, 2016 at 12:47 PM, Till Rohrmann <trohrm...@apache.org>
>>>> wrote:
>>>>
>>>>> With readHadoopFile you can use all of Hadoop's FileInputFormats, and
>>>>> thus you can do everything in Flink that you can do in Hadoop. Simply
>>>>> take the same Hadoop FileInputFormat that you would use for your
>>>>> MapReduce job.
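>>>>>
>>>>> For example, with the classic Hadoop text input format (the path is a
>>>>> placeholder):
>>>>>
>>>>> import org.apache.flink.api.java.DataSet;
>>>>> import org.apache.flink.api.java.ExecutionEnvironment;
>>>>> import org.apache.flink.api.java.tuple.Tuple2;
>>>>> import org.apache.hadoop.io.LongWritable;
>>>>> import org.apache.hadoop.io.Text;
>>>>> import org.apache.hadoop.mapred.TextInputFormat;
>>>>>
>>>>> ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>>>> // the same Hadoop input format you would pass to a MapReduce job
>>>>> DataSet<Tuple2<LongWritable, Text>> input = env.readHadoopFile(
>>>>>         new TextInputFormat(), LongWritable.class, Text.class,
>>>>>         "hdfs:///path/to/input");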
>>>>>
>>>>> Cheers,
>>>>> Till
>>>>>
>>>>> On Wed, Jan 20, 2016 at 3:16 PM, Saliya Ekanayake <esal...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Thank you. I saw readHadoopFile, but I was not sure how it can be used
>>>>>> for the following, which is what I need. The logic of the code requires
>>>>>> an entire row to operate on, so in our current implementation with P
>>>>>> tasks, each of them reads a rectangular block of (N/P) x N from the
>>>>>> matrix. Is this possible with readHadoopFile? Also, the file may not be
>>>>>> in HDFS, so is it possible to refer to the local disk in doing this?
>>>>>>
>>>>>> Thank you
>>>>>>
>>>>>> On Wed, Jan 20, 2016 at 1:31 AM, Chiwan Park <chiwanp...@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Saliya,
>>>>>>>
>>>>>>> You can use an input format from Hadoop in Flink via the
>>>>>>> readHadoopFile method. The method returns a dataset whose type is
>>>>>>> Tuple2<Key, Value>. Note that the MapReduce-equivalent transformation
>>>>>>> in Flink is composed of map, groupBy, and reduceGroup.
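>>>>>>>
>>>>>>> For illustration, counting duplicate records in that style would look
>>>>>>> roughly like this (env is an ExecutionEnvironment; the path is a
>>>>>>> placeholder):
>>>>>>>
>>>>>>> import org.apache.flink.api.common.functions.GroupReduceFunction;
>>>>>>> import org.apache.flink.api.common.functions.MapFunction;
>>>>>>> import org.apache.flink.api.java.DataSet;
>>>>>>> import org.apache.flink.api.java.tuple.Tuple2;
>>>>>>> import org.apache.flink.util.Collector;
>>>>>>>
>>>>>>> DataSet<String> lines = env.readTextFile("hdfs:///path/to/input");
>>>>>>> lines.map(new MapFunction<String, Tuple2<String, Integer>>() {  // map phase
>>>>>>>         public Tuple2<String, Integer> map(String line) {
>>>>>>>             return new Tuple2<>(line, 1);
>>>>>>>         }
>>>>>>>     })
>>>>>>>     .groupBy(0)  // group by key, like the MapReduce shuffle
>>>>>>>     .reduceGroup(new GroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {  // reduce phase
>>>>>>>         public void reduce(Iterable<Tuple2<String, Integer>> values,
>>>>>>>                 Collector<Tuple2<String, Integer>> out) {
>>>>>>>             String key = null;
>>>>>>>             int count = 0;
>>>>>>>             for (Tuple2<String, Integer> v : values) {
>>>>>>>                 key = v.f0;
>>>>>>>                 count += v.f1;
>>>>>>>             }
>>>>>>>             out.collect(new Tuple2<>(key, count));
>>>>>>>         }
>>>>>>>     })
>>>>>>>     .print();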
>>>>>>>
>>>>>>> > On Jan 20, 2016, at 3:04 PM, Suneel Marthi <smar...@apache.org>
>>>>>>> wrote:
>>>>>>> >
>>>>>>> > Guess you are looking for Flink's BinaryInputFormat to be able to read
>>>>>>> blocks of data from HDFS
>>>>>>> >
>>>>>>> >
>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-0.10/api/java/org/apache/flink/api/common/io/BinaryInputFormat.html
>>>>>>> >
>>>>>>> > On Wed, Jan 20, 2016 at 12:45 AM, Saliya Ekanayake <
>>>>>>> esal...@gmail.com> wrote:
>>>>>>> > Hi,
>>>>>>> >
>>>>>>> > I am trying to use Flink to perform a parallel batch operation on an
>>>>>>> NxN matrix represented as a binary file. Each (i,j) element is stored
>>>>>>> as a Java Short value. In typical MapReduce programming with Hadoop,
>>>>>>> each map task reads a block of rows of this matrix, performs
>>>>>>> computation on that block, and emits the result to the reducer.
>>>>>>> >
>>>>>>> > How is this done in Flink? I am new to Flink and couldn't find a
>>>>>>> binary reader so far. Any help is greatly appreciated.
>>>>>>> >
>>>>>>> > Thank you,
>>>>>>> > Saliya
>>>>>>> >
>>>>>>> > --
>>>>>>> > Saliya Ekanayake
>>>>>>> > Ph.D. Candidate | Research Assistant
>>>>>>> > School of Informatics and Computing | Digital Science Center
>>>>>>> > Indiana University, Bloomington
>>>>>>> > Cell 812-391-4914
>>>>>>> > http://saliya.org
>>>>>>> >
>>>>>>>
>>>>>>> Regards,
>>>>>>> Chiwan Park
>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Saliya Ekanayake
>>>>>> Ph.D. Candidate | Research Assistant
>>>>>> School of Informatics and Computing | Digital Science Center
>>>>>> Indiana University, Bloomington
>>>>>> Cell 812-391-4914
>>>>>> http://saliya.org
>>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> Saliya Ekanayake
>>>> Ph.D. Candidate | Research Assistant
>>>> School of Informatics and Computing | Digital Science Center
>>>> Indiana University, Bloomington
>>>> Cell 812-391-4914
>>>> http://saliya.org
>>>>
>>>>
>>>
>>
>>
>> --
>> Saliya Ekanayake
>> Ph.D. Candidate | Research Assistant
>> School of Informatics and Computing | Digital Science Center
>> Indiana University, Bloomington
>> Cell 812-391-4914
>> http://saliya.org
>>
>
>


-- 
Saliya Ekanayake
Ph.D. Candidate | Research Assistant
School of Informatics and Computing | Digital Science Center
Indiana University, Bloomington
Cell 812-391-4914
http://saliya.org
