Hi Todd,

Thanks for letting me know. OK - here is what I am trying to do (it is a POC for now):
We have an ETL framework which helps with transforming data - parsing various formats, reading from databases, aggregating, sorting, etc. We currently have a concept of a "cluster", which basically allows input data (say a data file) to be split/partitioned across several nodes and one data transformation to be executed on those data. The way it works is that we analyze what the transformation does, and if it is supposed to consume data which is spread over the cluster, we execute a slightly modified copy/instance of that transformation on each node of the cluster where some piece/partition of the data resides. We do not have any concept of a "clustered" filesystem - our partitioned data reside in ordinary files and there is no metadata layer on top. If we need one single output data file, we just perform a merge operation. This is quite limiting, because whenever we need to manipulate such data, we have to do it piece by piece (on each participating node). So we essentially do split-transform-merge, with the merge being optional (it can be part of the transformation directly, so we don't create temp files).

Here is the idea with HDFS: each of our transformation nodes becomes an HDFS datanode. Then, if we are to process input data split over several datanodes, we just instruct each transformation node to read the specific block(s) of the file which happen to be stored on the same physical machine (since our transformation node is also a datanode) - hence my interest in BlockReader. I was also considering wrapping our transformation job into a Hadoop map-reduce job, but that seems a bit limiting, and we would also need to take the whole Hadoop stack and give it control over our jobs. But that still might be the right way.

I also need to solve writing of partitioned data - here I would like to control the block allocation somehow, as ideally the transformation running on a particular node would read locally stored blocks and write its output to locally allocated blocks of an HDFS file.
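To make the reading side more concrete, here is a rough sketch of what I have in mind using only the public FileSystem API (getFileBlockLocations plus seek on an open stream) instead of the internal BlockReader. The file path, namenode address and buffer size are made up for illustration, and I have not run this against a real cluster yet:

import java.io.IOException;
import java.net.InetAddress;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class LocalBlockRead {

    public static void main(String[] args) throws IOException {
        // Illustrative values only - not a real path or cluster of ours.
        Path file = new Path("hdfs://namenode.example.com:8020/data/input.csv");
        Configuration conf = new Configuration();
        FileSystem fs = file.getFileSystem(conf);

        String localHost = InetAddress.getLocalHost().getHostName();
        FileStatus status = fs.getFileStatus(file);

        // Public, stable API: which byte ranges of the file are stored on which hosts.
        BlockLocation[] blocks = fs.getFileBlockLocations(status, 0, status.getLen());

        byte[] buf = new byte[64 * 1024];
        for (BlockLocation block : blocks) {
            boolean local = false;
            for (String host : block.getHosts()) {
                if (host.equals(localHost)) {
                    local = true;
                    break;
                }
            }
            if (!local) {
                continue; // some other transformation node owns this range
            }

            // Read only the byte range backed by a locally stored replica;
            // the DFS client should serve it from the local datanode.
            FSDataInputStream in = fs.open(file);
            try {
                in.seek(block.getOffset());
                long remaining = block.getLength();
                while (remaining > 0) {
                    int n = in.read(buf, 0, (int) Math.min(buf.length, remaining));
                    if (n < 0) break;
                    remaining -= n;
                    // ... feed buf[0..n) to the local transformation ...
                }
            } finally {
                in.close();
            }
        }
        fs.close();
    }
}

One thing I would still have to handle is that block boundaries do not respect record boundaries, so each node would need to read a little past the end of its range to finish the last record (the same problem MapReduce input splits deal with). For the write side, my understanding is that when the writer runs on a machine that is also a datanode, the default placement policy puts the first replica on that local datanode, which might already give me the locality I want.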
Well, I hope I explained the situation clearly enough. I will be thankful for any comments.

Regards,

David.


On 9.1.12 6:59 PM, "Todd Lipcon" <t...@cloudera.com> wrote:

>Hi David,
>
>For what it's worth, you should be aware that you're calling internal
>APIs that have no guarantee of stability between versions. I can
>practically guarantee that your code will have to be modified for any
>HDFS upgrade you do. That's why these APIs are undocumented.
>
>Perhaps you can explain what your high-level goal is, here, and we can
>suggest a supported mechanism for achieving it.
>
>-Todd
>
>On Mon, Jan 9, 2012 at 9:56 AM, David Pavlis <david.pav...@javlin.eu> wrote:
>> Hi Denny,
>>
>> Thanks a lot. I was able to make my code work.
>>
>> I am posting a small example below - in case somebody in the future has
>> similar need ;-)
>> (not handling replica datablocks).
>>
>> David.
>>
>> ***************************************************************************
>> public static void main(String args[]) {
>>     String filename = "/user/hive/warehouse/sample_07/sample_07.csv";
>>     int DATANODE_PORT = 50010;
>>     int NAMENODE_PORT = 8020;
>>     String HOST_IP = "192.168.1.230";
>>
>>     byte[] buf = new byte[1000];
>>
>>     try {
>>         ClientProtocol client = DFSClient.createNamenode(
>>             new InetSocketAddress(HOST_IP, NAMENODE_PORT), new Configuration());
>>
>>         LocatedBlocks located = client.getBlockLocations(filename, 0, Long.MAX_VALUE);
>>
>>         for (LocatedBlock block : located.getLocatedBlocks()) {
>>             Socket sock = SocketFactory.getDefault().createSocket();
>>             InetSocketAddress targetAddr = new InetSocketAddress(HOST_IP, DATANODE_PORT);
>>             NetUtils.connect(sock, targetAddr, 10000);
>>             sock.setSoTimeout(10000);
>>
>>             BlockReader reader = BlockReader.newBlockReader(sock, filename,
>>                 block.getBlock().getBlockId(), block.getBlockToken(),
>>                 block.getBlock().getGenerationStamp(), 0,
>>                 block.getBlockSize(), 1000);
>>
>>             int count = 0;
>>             int length;
>>             while ((length = reader.read(buf, 0, 1000)) > 0) {
>>                 //System.out.print(new String(buf, 0, length, "UTF-8"));
>>                 if (length < 1000) break;
>>             }
>>             reader.close();
>>             sock.close();
>>         }
>>
>>     } catch (IOException ex) {
>>         ex.printStackTrace();
>>     }
>> }
>> ***************************************************************************
>>
>>
>> From: Denny Ye <denny...@gmail.com>
>> Reply-To: <hdfs-user@hadoop.apache.org>
>> Date: Mon, 9 Jan 2012 16:29:18 +0800
>> To: <hdfs-user@hadoop.apache.org>
>> Subject: Re: How-to use DFSClient's BlockReader from Java
>>
>> hi David
>> Please refer to the method "DFSInputStream#blockSeekTo", it has same purpose with you.
>>
>> ***************************************************************************
>> LocatedBlock targetBlock = getBlockAt(target, true);
>> assert (target==this.pos) : "Wrong postion " + pos + " expect " + target;
>> long offsetIntoBlock = target - targetBlock.getStartOffset();
>>
>> DNAddrPair retval = chooseDataNode(targetBlock);
>> chosenNode = retval.info;
>> InetSocketAddress targetAddr = retval.addr;
>>
>> try {
>>     s = socketFactory.createSocket();
>>     NetUtils.connect(s, targetAddr, socketTimeout);
>>     s.setSoTimeout(socketTimeout);
>>     Block blk = targetBlock.getBlock();
>>     Token<BlockTokenIdentifier> accessToken = targetBlock.getBlockToken();
>>
>>     blockReader = BlockReader.newBlockReader(s, src, blk.getBlockId(),
>>         accessToken,
>>         blk.getGenerationStamp(),
>>         offsetIntoBlock, blk.getNumBytes() - offsetIntoBlock,
>>         buffersize, verifyChecksum, clientName);
>> ***************************************************************************
>>
>> -Regards
>> Denny Ye
>>
>> 2012/1/6 David Pavlis <david.pav...@javlin.eu>
>>
>> Hi,
>>
>> I am relatively new to Hadoop and I am trying to utilize HDFS for own
>> application where I want to take advantage of data partitioning HDFS
>> performs.
>>
>> The idea is that I get list of individual blocks - BlockLocations of
>> particular file and then directly read those (go to individual DataNodes).
>> So far I found org.apache.hadoop.hdfs.DFSClient.BlockReader to be the way
>> to go.
>>
>> However I am struggling with instantiating the BlockReader() class, namely
>> creating the "Token<BlockTokenIdentifier>".
>>
>> Is there an example Java code showing how to access individual blocks of
>> particular file stored on HDFS?
>>
>> Thanks in advance,
>>
>> David.
>
>
>--
>Todd Lipcon
>Software Engineer, Cloudera