Hi David,

I'd definitely recommend using MapReduce for this. What you've
described is essentially identical to MR.

Otherwise (if you don't go the MapReduce route), use the public API
FileSystem.getFileBlockLocations(), and read the host names out of the
returned BlockLocation objects. Then just use a normal FileSystem open
call from that node - it will automatically pick the local replica for
you without any further work.
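
Roughly, a sketch of that public-API approach (the file path below is the
one from your example; the Configuration is assumed to pick up your
cluster's core-site.xml / hdfs-site.xml):

****************************************************************************
import java.util.Arrays;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class BlockLocationsExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);

        Path file = new Path("/user/hive/warehouse/sample_07/sample_07.csv");
        FileStatus status = fs.getFileStatus(file);

        // One BlockLocation per block, each listing the datanodes that
        // hold a replica of that block.
        BlockLocation[] blocks = fs.getFileBlockLocations(status, 0, status.getLen());
        for (BlockLocation block : blocks) {
            System.out.println("offset=" + block.getOffset()
                    + " length=" + block.getLength()
                    + " hosts=" + Arrays.toString(block.getHosts()));
        }

        // Opening the file from a node that holds a replica reads the
        // local copy automatically - no block-level API needed.
        FSDataInputStream in = fs.open(file);
        byte[] buf = new byte[4096];
        int n = in.read(buf);
        System.out.println("read " + n + " bytes");
        in.close();
        fs.close();
    }
}
****************************************************************************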

-Todd

On Mon, Jan 9, 2012 at 12:01 PM, David Pavlis <david.pav...@javlin.eu> wrote:
> Hi Todd,
>
> Thanks for letting me know.  OK - here is what I am trying to do (it is a
> POC for now):
>
> We have an ETL framework which helps with transforming data - parsing
> various formats, reading from DBs, aggregating, sorting, etc.
> We currently have the concept of a "cluster", which basically allows
> input data (say a data file) to be split/partitioned across several
> nodes, with one data transformation then executed on that data. The way
> it works is that we analyze what the transformation does, and if it is
> supposed to consume data which is spread over the cluster, we execute a
> slightly modified copy/instance of that transformation on each node of
> the cluster where some piece/partition of the data resides. We do not
> have the concept of any "clustered" filesystem - our partitioned data
> reside in ordinary files and there is no metadata layer on top. If we
> need one single output data file, we just perform a merge operation.
> This is quite limiting, because whenever we need to manipulate such
> data, we have to do it piece by piece (on each participating node).
>
> So we essentially do split-transform-merge, with the merge being
> optional (it can be part of the transformation directly, so we don't
> create temp files).
>
> Here is the idea with HDFS - each of our transformation nodes becomes an
> HDFS datanode. Then, when we are to process particular input data split
> over several datanodes, we just instruct each transformation node to
> read the specific block(s) of the file which happen to be on the same
> physical machine (where our transformation node is also a datanode) -
> hence my interest in BlockReader.
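
A rough sketch of that per-node read, using only the public FileSystem
API (the path and the hostname matching below are illustrative
assumptions - in practice you would map the BlockLocation hosts to your
own node identifiers):

****************************************************************************
import java.net.InetAddress;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class LocalBlocksReader {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Path file = new Path("/data/partitioned/input.csv");  // illustrative path
        String localHost = InetAddress.getLocalHost().getHostName();

        FileStatus status = fs.getFileStatus(file);
        BlockLocation[] blocks = fs.getFileBlockLocations(status, 0, status.getLen());

        FSDataInputStream in = fs.open(file);
        byte[] buf = new byte[64 * 1024];
        for (BlockLocation block : blocks) {
            // Skip blocks that have no replica on this machine; another
            // transformation node will pick them up.
            boolean local = false;
            for (String host : block.getHosts()) {
                if (host.equals(localHost)) { local = true; break; }
            }
            if (!local) continue;

            // Process only the byte range covered by the locally stored block.
            in.seek(block.getOffset());
            long remaining = block.getLength();
            while (remaining > 0) {
                int n = in.read(buf, 0, (int) Math.min(buf.length, remaining));
                if (n < 0) break;
                remaining -= n;
                // ... hand buf[0..n) to the transformation here ...
            }
        }
        in.close();
        fs.close();
    }
}
****************************************************************************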
>
> I was also considering wrapping our transformation job into a Hadoop
> map-reduce job, but that seems a bit limiting, and we would also need to
> "take" the whole Hadoop stack and give it control over our jobs. But
> that still might be the right way.
> Also, I need to solve writing of partitioned data - here I would like to
> somehow control the block allocation, as ideally a transformation
> running on a particular node would be reading locally stored blocks and
> outputting data to locally allocated blocks of the HDFS file.
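
On the write side, one relevant detail: with the default HDFS block
placement policy, when the writing client runs on a machine that is also
a datanode, the first replica of every block it writes is placed on that
local datanode. So a writer running on each transformation node already
gets local allocation for one replica without any special API. A minimal
sketch (the output path is illustrative):

****************************************************************************
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class LocalPartitionWriter {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);

        // Illustrative: one output file per transformation node / partition.
        Path out = new Path("/data/partitioned/output/part-node1");

        // When this runs on a machine that is also a datanode, the default
        // placement policy puts the first replica of each block locally.
        FSDataOutputStream os = fs.create(out);
        os.write("transformed records...\n".getBytes("UTF-8"));
        os.close();
        fs.close();
    }
}
****************************************************************************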
>
> Well, I hope I explained the situation clearly enough.
>
> I will be thankful for any comments.
>
> Regards,
>
> David.
>
>
>
>
> On 9.1.12 6:59 PM, "Todd Lipcon" <t...@cloudera.com> wrote:
>
>>Hi David,
>>
>>For what it's worth, you should be aware that you're calling internal
>>APIs that have no guarantee of stability between versions. I can
>>practically guarantee that your code will have to be modified for any
>>HDFS upgrade you do. That's why these APIs are undocumented.
>>
>>Perhaps you can explain what your high-level goal is here, and we can
>>suggest a supported mechanism for achieving it.
>>
>>-Todd
>>
>>On Mon, Jan 9, 2012 at 9:56 AM, David Pavlis <david.pav...@javlin.eu>
>>wrote:
>>> Hi Denny,
>>>
>>> Thanks a lot. I was able to make my code work.
>>>
>>> I am posting a small example below, in case somebody has a similar
>>> need in the future ;-)
>>> (it does not handle replica data blocks).
>>>
>>> David.
>>>
>>>
>>>****************************************************************************
>>> public static void main(String args[]) {
>>>     String filename = "/user/hive/warehouse/sample_07/sample_07.csv";
>>>     int DATANODE_PORT = 50010;
>>>     int NAMENODE_PORT = 8020;
>>>     String HOST_IP = "192.168.1.230";
>>>
>>>     byte[] buf = new byte[1000];
>>>
>>>     try {
>>>         // Ask the namenode for the block list of the file.
>>>         ClientProtocol client = DFSClient.createNamenode(
>>>                 new InetSocketAddress(HOST_IP, NAMENODE_PORT), new Configuration());
>>>
>>>         LocatedBlocks located = client.getBlockLocations(filename, 0, Long.MAX_VALUE);
>>>
>>>         for (LocatedBlock block : located.getLocatedBlocks()) {
>>>             Socket sock = SocketFactory.getDefault().createSocket();
>>>             InetSocketAddress targetAddr = new InetSocketAddress(HOST_IP, DATANODE_PORT);
>>>             NetUtils.connect(sock, targetAddr, 10000);
>>>             sock.setSoTimeout(10000);
>>>
>>>             // Read the whole block from the datanode via the (internal) BlockReader.
>>>             BlockReader reader = BlockReader.newBlockReader(sock, filename,
>>>                     block.getBlock().getBlockId(), block.getBlockToken(),
>>>                     block.getBlock().getGenerationStamp(), 0,
>>>                     block.getBlockSize(), 1000);
>>>
>>>             int length;
>>>             while ((length = reader.read(buf, 0, 1000)) > 0) {
>>>                 // System.out.print(new String(buf, 0, length, "UTF-8"));
>>>             }
>>>             reader.close();
>>>             sock.close();
>>>         }
>>>
>>>     } catch (IOException ex) {
>>>         ex.printStackTrace();
>>>     }
>>> }
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>****************************************************************************
>>>
>>>
>>>
>>> From:  Denny Ye <denny...@gmail.com>
>>> Reply-To:  <hdfs-user@hadoop.apache.org>
>>> Date:  Mon, 9 Jan 2012 16:29:18 +0800
>>> To:  <hdfs-user@hadoop.apache.org>
>>> Subject:  Re: How-to use DFSClient's BlockReader from Java
>>>
>>>
>>> hi David - please refer to the method "DFSInputStream#blockSeekTo";
>>> it has the same purpose as yours.
>>>
>>>
>>>****************************************************************************
>>>        LocatedBlock targetBlock = getBlockAt(target, true);
>>>        assert (target==this.pos) : "Wrong postion " + pos + " expect " + target;
>>>        long offsetIntoBlock = target - targetBlock.getStartOffset();
>>>
>>>        DNAddrPair retval = chooseDataNode(targetBlock);
>>>        chosenNode = retval.info;
>>>        InetSocketAddress targetAddr = retval.addr;
>>>
>>>        try {
>>>          s = socketFactory.createSocket();
>>>          NetUtils.connect(s, targetAddr, socketTimeout);
>>>          s.setSoTimeout(socketTimeout);
>>>          Block blk = targetBlock.getBlock();
>>>          Token<BlockTokenIdentifier> accessToken = targetBlock.getBlockToken();
>>>
>>>          blockReader = BlockReader.newBlockReader(s, src, blk.getBlockId(),
>>>              accessToken,
>>>              blk.getGenerationStamp(),
>>>              offsetIntoBlock, blk.getNumBytes() - offsetIntoBlock,
>>>              buffersize, verifyChecksum, clientName);
>>>
>>>****************************************************************************
>>>
>>>
>>> -Regards
>>> Denny Ye
>>>
>>> 2012/1/6 David Pavlis <david.pav...@javlin.eu>
>>>
>>> Hi,
>>>
>>> I am relatively new to Hadoop and I am trying to utilize HDFS for my
>>> own application, where I want to take advantage of the data
>>> partitioning HDFS performs.
>>>
>>> The idea is that I get a list of the individual blocks - the
>>> BlockLocations of a particular file - and then read those directly
>>> (go to the individual DataNodes).
>>> So far I found org.apache.hadoop.hdfs.DFSClient.BlockReader to be the
>>> way to go.
>>>
>>> However I am struggling with instantiating the BlockReader class,
>>> namely with creating the "Token<BlockTokenIdentifier>".
>>>
>>> Is there example Java code showing how to access the individual blocks
>>> of a particular file stored on HDFS?
>>>
>>> Thanks in advance,
>>>
>>> David.
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>
>>
>>
>>--
>>Todd Lipcon
>>Software Engineer, Cloudera
>
>



-- 
Todd Lipcon
Software Engineer, Cloudera
