Hi Todd,

Thanks for letting me know.  OK - here is what I am trying to do (it is a
POC for now):

We have an ETL framework that helps with transforming data: parsing
various formats, reading from DBs, aggregating, sorting, etc.
We currently have the concept of a "cluster", which basically allows input
data (say, a data file) to be split/partitioned across several nodes, with
one data transformation then executed on that data. The way it works is that
we analyze what the transformation does and, if it is supposed to consume
data that is spread over the cluster, we execute a slightly modified
copy/instance of that transformation on each node of the cluster where some
piece/partition of the data resides. We do not have the concept of any
"clustered" filesystem: our partitioned data resides in ordinary files
and there is no metadata layer on top. If we need one single output data
file, we just perform a merge operation. This is quite limiting, because if
we need to manipulate such data, we have to do it piece by piece (on each
participating node).

So we essentially do split-transform-merge, with the merge being optional
(it can be part of the transformation directly, so we don't create temp
files).
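
To make the split-transform-merge flow concrete, here is a minimal sketch in
plain Java. It is only an illustration: the class and method names
(SplitTransformMerge, split, transform, merge) are hypothetical and are not
the framework's API, the round-robin split is an assumption, and the
partitions are in-memory lists rather than files on separate nodes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

/** Hypothetical illustration of split-transform-merge; not a real framework API. */
class SplitTransformMerge {

    /** Split: deal the records round-robin into one partition per node. */
    static <T> List<List<T>> split(List<T> records, int nodes) {
        List<List<T>> partitions = new ArrayList<>();
        for (int i = 0; i < nodes; i++) {
            partitions.add(new ArrayList<>());
        }
        for (int i = 0; i < records.size(); i++) {
            partitions.get(i % nodes).add(records.get(i));
        }
        return partitions;
    }

    /** Transform: apply the same transformation to every partition independently. */
    static <T, R> List<List<R>> transform(List<List<T>> partitions, Function<T, R> fn) {
        return partitions.stream()
                .map(p -> p.stream().map(fn).collect(Collectors.toList()))
                .collect(Collectors.toList());
    }

    /** Merge (optional): concatenate the per-partition outputs into one result. */
    static <R> List<R> merge(List<List<R>> partitions) {
        return partitions.stream().flatMap(List::stream).collect(Collectors.toList());
    }
}
```

Skipping the merge call leaves the output partitioned, which corresponds to
the case where no single output file is needed.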

Here is the idea with HDFS: each of our transformation nodes becomes an
HDFS datanode. Then, if we are to process particular input data split over
several datanodes, we just instruct each transformation node to read
specific blocks of the file (the blocks that happen to be on the same
physical machine, since our transformation node is also a datanode). Hence
my interest in BlockReader.
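
The block selection itself can be sketched independently of HDFS. The
following is a rough illustration under stated assumptions: BlockInfo and
LocalBlockPlanner are hypothetical names, and the offset/length/hosts values
are assumed to come from somewhere like the public
FileSystem.getFileBlockLocations call, whose BlockLocation results expose
getOffset(), getLength() and getHosts().

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-in for the offset/length/hosts of one HDFS block. */
class BlockInfo {
    final long offset;        // start of the block within the file
    final long length;        // number of bytes in the block
    final List<String> hosts; // nodes holding a replica of the block

    BlockInfo(long offset, long length, String... hosts) {
        this.offset = offset;
        this.length = length;
        this.hosts = Arrays.asList(hosts);
    }
}

/** Hypothetical helper: picks the blocks a transformation node can read locally. */
class LocalBlockPlanner {

    /** Returns the blocks that have a replica on the given host, in file order. */
    static List<BlockInfo> blocksOn(String localHost, List<BlockInfo> blocks) {
        List<BlockInfo> local = new ArrayList<>();
        for (BlockInfo b : blocks) {
            if (b.hosts.contains(localHost)) {
                local.add(b);
            }
        }
        return local;
    }
}
```

With a replication factor above 1 the same block shows up on several hosts,
so the nodes would still have to agree on who processes which block; this
sketch does not attempt that.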

I was also considering wrapping our transformation job into a Hadoop
map-reduce job, but that seems a bit limiting, and we would also need to
"take" the whole Hadoop stack and give it control over our jobs. That still
might be the right way, though.
Also, I need to solve the writing of partitioned data. Here I would like to
control the block allocation somehow, as ideally a transformation running
on a particular node would read locally stored blocks and output data to
a locally allocated block of an HDFS file.

Well, I hope I explained the situation clearly enough.

I will be thankful for any comments.

Regards,

David.




On 9.1.12 6:59 PM, "Todd Lipcon" <t...@cloudera.com> wrote:

>Hi David,
>
>For what it's worth, you should be aware that you're calling internal
>APIs that have no guarantee of stability between versions. I can
>practically guarantee that your code will have to be modified for any
>HDFS upgrade you do. That's why these APIs are undocumented.
>
>Perhaps you can explain what your high-level goal is, here, and we can
>suggest a supported mechanism for achieving it.
>
>-Todd
>
>On Mon, Jan 9, 2012 at 9:56 AM, David Pavlis <david.pav...@javlin.eu>
>wrote:
>> Hi Denny,
>>
>> Thanks a lot. I was able to make my code work.
>>
>> I am posting a small example below, in case somebody in the future has
>> a similar need ;-)
>> (It does not handle replica data blocks.)
>>
>> David.
>>
>>
>> *************************************************************************
>> public static void main(String args[]){
>>        String filename="/user/hive/warehouse/sample_07/sample_07.csv";
>>        int DATANODE_PORT = 50010;
>>        int NAMENODE_PORT = 8020;
>>        // HOST_IP is used for both the namenode and the datanode connection
>>        String HOST_IP = "192.168.1.230";
>>
>>        byte[] buf=new byte[1000];
>>
>>        try{
>>                // ask the namenode for the locations of all blocks of the file
>>                ClientProtocol client= DFSClient.createNamenode(
>>                        new InetSocketAddress(HOST_IP,NAMENODE_PORT), new Configuration());
>>                LocatedBlocks located= client.getBlockLocations(filename, 0, Long.MAX_VALUE);
>>
>>                for(LocatedBlock block : located.getLocatedBlocks()){
>>                        // connect to the datanode (always HOST_IP; replicas are ignored)
>>                        Socket sock = SocketFactory.getDefault().createSocket();
>>                        InetSocketAddress targetAddr = new InetSocketAddress(HOST_IP,DATANODE_PORT);
>>                        NetUtils.connect(sock, targetAddr, 10000);
>>                        sock.setSoTimeout(10000);
>>
>>                        // read the whole block, starting at offset 0
>>                        BlockReader reader=BlockReader.newBlockReader(sock, filename,
>>                                block.getBlock().getBlockId(), block.getBlockToken(),
>>                                block.getBlock().getGenerationStamp(), 0,
>>                                block.getBlockSize(), 1000);
>>
>>                        int length;
>>                        while((length=reader.read(buf,0,1000))>0){
>>                                //System.out.print(new String(buf,0,length,"UTF-8"));
>>                                // the loop stops at the first short read
>>                                if (length<1000) break;
>>                        }
>>                        reader.close();
>>                        sock.close();
>>                }
>>        }catch(IOException ex){
>>                ex.printStackTrace();
>>        }
>> }
>> *************************************************************************
>>
>>
>>
>> From:  Denny Ye <denny...@gmail.com>
>> Reply-To:  <hdfs-user@hadoop.apache.org>
>> Date:  Mon, 9 Jan 2012 16:29:18 +0800
>> To:  <hdfs-user@hadoop.apache.org>
>> Subject:  Re: How-to use DFSClient's BlockReader from Java
>>
>>
>> Hi David, please refer to the method "DFSInputStream#blockSeekTo"; it
>> serves the same purpose as what you are trying to do.
>>
>>
>> *************************************************************************
>>        LocatedBlock targetBlock = getBlockAt(target, true);
>>        assert (target==this.pos) : "Wrong postion " + pos + " expect " + target;
>>        long offsetIntoBlock = target - targetBlock.getStartOffset();
>>
>>        DNAddrPair retval = chooseDataNode(targetBlock);
>>        chosenNode = retval.info;
>>        InetSocketAddress targetAddr = retval.addr;
>>
>>        try {
>>          s = socketFactory.createSocket();
>>          NetUtils.connect(s, targetAddr, socketTimeout);
>>          s.setSoTimeout(socketTimeout);
>>          Block blk = targetBlock.getBlock();
>>          Token<BlockTokenIdentifier> accessToken = targetBlock.getBlockToken();
>>
>>          blockReader = BlockReader.newBlockReader(s, src, blk.getBlockId(),
>>              accessToken,
>>              blk.getGenerationStamp(),
>>              offsetIntoBlock, blk.getNumBytes() - offsetIntoBlock,
>>              buffersize, verifyChecksum, clientName);
>> *************************************************************************
>>
>>
>> -Regards
>> Denny Ye
>>
>> 2012/1/6 David Pavlis <david.pav...@javlin.eu>
>>
>> Hi,
>>
>> I am relatively new to Hadoop and I am trying to utilize HDFS for my own
>> application, where I want to take advantage of the data partitioning HDFS
>> performs.
>>
>> The idea is that I get a list of the individual blocks (BlockLocations) of
>> a particular file and then read those directly (going to the individual
>> DataNodes). So far I have found
>> org.apache.hadoop.hdfs.DFSClient.BlockReader to be the way to go.
>>
>> However, I am struggling with instantiating the BlockReader class, namely
>> with creating the "Token<BlockTokenIdentifier>".
>>
>> Is there example Java code showing how to access the individual blocks of
>> a particular file stored on HDFS?
>>
>> Thanks in advance,
>>
>> David.
>>
>
>
>
>-- 
>Todd Lipcon
>Software Engineer, Cloudera

