Re: Using MapReduce to do table comparing.

2008-07-28 Thread Michael Lee
It is hard to quantify b/c I have several jobs doing this concurrently, 
and the timing is mixed in with the extraction from the sources (MS-SQL) 
before loading into the UDB server.  I'd say it takes a total of 1.5 hours 
to extract, load, calculate deltas and update the archive for 130 million 
records. 

The physical server we are using is a Sun 490 and I believe it has 8 CPUs 
and 32 GB of memory.  The storage is EMC.  For some big tables we utilize 
MDC ( multi-dimensional clustering ).  I think you could get comparable 
numbers with a cheaper Linux server, but we did not have a choice b/c the 
company had only certified Sun boxes at the time of purchase ( that was 2 
years ago ).


BTW - when you do the comparison in the database, watch out for NULL 
comparisons.  Any comparison against NULL evaluates to UNKNOWN rather than 
TRUE - for example, a predicate like ( null <> 1 ) is not true - so changed 
rows can be missed when a column is NULL on either side ( wrap both sides 
in COALESCE if you need a NULL-safe compare ).

BTW - if you are going to do this - get the *fastest* hard drives and 
RAID0 to get the maximum throughput.  It is an IO-bound problem.


Amber wrote:

I agree with you that this is an acceptable method if the time spent on exporting 
data from the RDBMS, importing the file into HDFS and then importing the data into 
the RDBMS again is considered as well, but it is a single-process/single-thread 
method. BTW, can you tell me how long it takes your method to process those 130 
million rows, how large the data volume is, and how powerful your physical 
machines are? Thanks a lot!
--
From: Michael Lee [EMAIL PROTECTED]
Sent: Thursday, July 24, 2008 11:51 AM
To: core-user@hadoop.apache.org
Subject: Re: Using MapReduce to do table comparing.

  

Amber wrote:


We have a 10 million row table exported from an AS400 mainframe every day; the 
table is exported as a CSV text file, which is about 30GB in size, and the CSV 
file is then imported into an RDBMS table which is dropped and recreated every 
day.  Now we want to find how many rows are updated during each export-import 
interval.  The table has a primary key, so deletes and inserts can be found 
quickly using RDBMS joins, but we must do a column-to-column comparison to find 
the differences between rows ( about 90% ) with the same primary keys.  Our 
goal is a comparison process which takes no more than 10 minutes on a 4-node 
cluster, where each server has 4 4-core 3.0 GHz CPUs, 8GB of memory and a 300GB 
local RAID5 array.

Below is our current solution:
The old data is kept in the RDBMS with an index created on the primary key; 
the new data is imported into HDFS as the input file of our MapReduce job. 
Every map task connects to the RDBMS database and selects the old data for 
every row; map tasks generate output only if differences are found, and there 
are no reduce tasks.

As you can see, as the number of concurrent map tasks increases, the RDBMS 
database will become the bottleneck, so we want to take the RDBMS out of the 
picture, but we have no idea how to quickly retrieve the old row for a given 
key from HDFS files; any suggestion is welcome.
  
10 million is not bad.  I do this all the time in UDB 8.1 - multiple key 
columns and multiple value columns - and calculate deltas: insert, 
delete and update.


What others have suggested works ( I tried a very crude version of what 
James Moore suggested in Hadoop with 70+ million records ) but you have 
to remember there are other costs ( dumping out files, putting into 
HDFS, etc. ).  It might be better to process straight in the database or 
do straight file processing.  Also, the key is avoiding transactions.


If you are doing it outside of the database...

you have 'old.csv' and 'new.csv', both sorted by primary key ( when you 
extract, make sure you use ORDER BY ).  In your application, you open two 
file handles and read one line at a time.  Build the key for each line.  
If the keys are equal, you compare the two lines to see whether they are 
the same.  If the keys are not equal, the sort order tells you whether it 
is an insert or a delete.  Once you decide, you read the next line ( for 
an insert/delete you only advance one of the two files ).


Here is the pseudo code

oldFile = File.new(oldFilename, "r")
newFile = File.new(newFilename, "r")
outFile = File.new(outFilename, "w")

oldLine = oldFile.gets
newLine = newFile.gets

while oldLine && newLine
   oldKey = convertToKey(oldLine)
   newKey = convertToKey(newLine)

   if oldKey < newKey
      ## key only in the old file - it is a deletion
      outFile.puts oldLine.chomp + ",DELETE"
      oldLine = oldFile.gets
   elsif oldKey > newKey
      ## key only in the new file - it is an insert
      outFile.puts newLine.chomp + ",INSERT"
      newLine = newFile.gets
   else
      ## same key - compare the whole lines to detect an update
      outFile.puts newLine.chomp + ",UPDATE" if oldLine != newLine
      oldLine = oldFile.gets
      newLine = newFile.gets
   end
end

Okay - I skipped the drain step for when EOF is reached on one of the 
files, but you get the point.
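
For completeness, here is one way to handle the EOF case skipped above (an 
editor's sketch in the same Ruby style, not part of the original mail): once 
one file runs out, everything left in the other file is a straight delete or 
insert.

## drain whatever is left after the merge loop exits
while oldLine                       # rows only in the old file were deleted
   outFile.puts oldLine.chomp + ",DELETE"
   oldLine = oldFile.gets
end
while newLine                       # rows only in the new file are inserts
   outFile.puts newLine.chomp + ",INSERT"
   newLine = newFile.gets
end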


If both old and new are in the database, you can open two database 
connections, order both cursors by the primary key, and do the same merge 
without dumping files.
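
A sketch of that two-cursor variant (an editor's illustration, not from the 
original mail - it assumes the ruby-dbi gem, ODBC DSNs named olddb/newdb, and 
placeholder table/column names; your connection strings and schema will 
differ):

require "dbi"

old_dbh = DBI.connect("DBI:ODBC:olddb", "user", "pass")
new_dbh = DBI.connect("DBI:ODBC:newdb", "user", "pass")

# both cursors MUST be ordered by the primary key for the merge to work;
# the pk comparison also assumes the DB collation matches Ruby's ordering
sql = "SELECT pk, col1, col2 FROM snapshot ORDER BY pk"
old_sth = old_dbh.execute(sql)
new_sth = new_dbh.execute(sql)

old_row = old_sth.fetch
new_row = new_sth.fetch

while old_row && new_row
   if old_row[0] < new_row[0]
      puts old_row.to_a.join(",") + ",DELETE"
      old_row = old_sth.fetch
   elsif old_row[0] > new_row[0]
      puts new_row.to_a.join(",") + ",INSERT"
      new_row = new_sth.fetch
   else
      puts new_row.to_a.join(",") + ",UPDATE" if old_row.to_a != new_row.to_a
      old_row = old_sth.fetch
      new_row = new_sth.fetch
   end
end

# drain whichever side still has rows
while old_row
   puts old_row.to_a.join(",") + ",DELETE"
   old_row = old_sth.fetch
end
while new_row
   puts new_row.to_a.join(",") + ",INSERT"
   new_row = new_sth.fetch
end

old_sth.finish; new_sth.finish
old_dbh.disconnect; new_dbh.disconnect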


I journal about 130 million rows every day for a quant financial database...



Re: Using MapReduce to do table comparing.

2008-07-25 Thread James Moore
On Thu, Jul 24, 2008 at 8:03 AM, Amber [EMAIL PROTECTED] wrote:
 Yes, I think this is the simplest method, but there are problems too:

 1. The reduce stage wouldn't begin until the map stage ends, by which time we 
 have already done two full table scans, and the comparison will take almost as 
 long again, because about 90% of the intermediate key/value pairs will have two 
 values under the same key. If I could specify a number n such that the reduce 
 tasks start once there are n intermediate pairs with the same key, that would 
 be better. In my case I would set that magic number to 2.

I don't think I understood this completely, but I'll try to respond.

First, I think you're going to be doing something like two full table
scans in any case.  Whether it's in an RDBMS or in Hadoop, you need to
read the complete dataset for both day1 and day2.  (Or at least that's
how I interpreted your original mail - you're not trying to keep
deltas over N days, just doing a delta for yesterday/today from
scratch every time)  You could possibly speed this up by keeping some
kind of parsed data in hadoop for previous days, rather than just
text, but I wouldn't do this as my first solution.

It seems like starting the reducers before the maps are done isn't
going to buy you anything.  The same amount of total work needs to be
done; when the work starts doesn't matter much.  In this case, I'm
guessing that you're going to have a setup where (total number of
maps) == (total number of reducers) == 4 * (number of 4-core
machines).

In any case, I'd say you should do some experiments with the most
simple solution you can come up with.  Your problem seems simple
enough that just banging out some throwaway experimental code is going
to a) not take very long, and b) tell you quite a bit about how your
particular solution is going to behave in the real world.


 2. I am not sure about how Hadoop stores intermediate key/value pairs; we 
 could not afford it if they are kept in memory as the data volume increases.

Hadoop is definitely prepared for very large numbers of intermediate
key/value pairs - that's pretty much the normal case for hadoop jobs.
It'll stream to/from disc as necessary.  Take a look at combiners as
well - they may buy you something.

-- 
James Moore | [EMAIL PROTECTED]
Ruby and Ruby on Rails consulting
blog.restphone.com


Re: Using MapReduce to do table comparing.

2008-07-24 Thread Amber
I agree with you that this is an acceptable method if the time spent on exporting 
data from the RDBMS, importing the file into HDFS and then importing the data into 
the RDBMS again is considered as well, but it is a single-process/single-thread 
method. BTW, can you tell me how long it takes your method to process those 130 
million rows, how large the data volume is, and how powerful your physical 
machines are? Thanks a lot!
--
From: Michael Lee [EMAIL PROTECTED]
Sent: Thursday, July 24, 2008 11:51 AM
To: core-user@hadoop.apache.org
Subject: Re: Using MapReduce to do table comparing.

 Amber wrote:
 We have a 10 million row table exported from an AS400 mainframe every day; the 
 table is exported as a CSV text file, which is about 30GB in size, and the CSV 
 file is then imported into an RDBMS table which is dropped and recreated every 
 day.  Now we want to find how many rows are updated during each export-import 
 interval.  The table has a primary key, so deletes and inserts can be found 
 quickly using RDBMS joins, but we must do a column-to-column comparison to find 
 the differences between rows ( about 90% ) with the same primary keys.  Our 
 goal is a comparison process which takes no more than 10 minutes on a 4-node 
 cluster, where each server has 4 4-core 3.0 GHz CPUs, 8GB of memory and a 300GB 
 local RAID5 array.

 Below is our current solution:
 The old data is kept in the RDBMS with an index created on the primary key; 
 the new data is imported into HDFS as the input file of our MapReduce job. 
 Every map task connects to the RDBMS database and selects the old data for 
 every row; map tasks generate output only if differences are found, and there 
 are no reduce tasks.

 As you can see, as the number of concurrent map tasks increases, the RDBMS 
 database will become the bottleneck, so we want to take the RDBMS out of the 
 picture, but we have no idea how to quickly retrieve the old row for a given 
 key from HDFS files; any suggestion is welcome.
 10 million is not bad.  I do this all the time in UDB 8.1 - multiple key 
 columns and multiple value columns - and calculate deltas: insert, 
 delete and update.
 
 What others have suggested works ( I tried a very crude version of what 
 James Moore suggested in Hadoop with 70+ million records ) but you have 
 to remember there are other costs ( dumping out files, putting into 
 HDFS, etc. ).  It might be better to process straight in the database or 
 do straight file processing.  Also, the key is avoiding transactions.
 
 If you are doing it outside of the database...
 
 you have 'old.csv' and 'new.csv', both sorted by primary key ( when you 
 extract, make sure you use ORDER BY ).  In your application, you open two 
 file handles and read one line at a time.  Build the key for each line.  
 If the keys are equal, you compare the two lines to see whether they are 
 the same.  If the keys are not equal, the sort order tells you whether it 
 is an insert or a delete.  Once you decide, you read the next line ( for 
 an insert/delete you only advance one of the two files ).
 
 Here is the pseudo code
 
 oldFile = File.new(oldFilename, "r")
 newFile = File.new(newFilename, "r")
 outFile = File.new(outFilename, "w")
 
 oldLine = oldFile.gets
 newLine = newFile.gets
 
 while oldLine && newLine
    oldKey = convertToKey(oldLine)
    newKey = convertToKey(newLine)
 
    if oldKey < newKey
       ## key only in the old file - it is a deletion
       outFile.puts oldLine.chomp + ",DELETE"
       oldLine = oldFile.gets
    elsif oldKey > newKey
       ## key only in the new file - it is an insert
       outFile.puts newLine.chomp + ",INSERT"
       newLine = newFile.gets
    else
       ## same key - compare the whole lines to detect an update
       outFile.puts newLine.chomp + ",UPDATE" if oldLine != newLine
       oldLine = oldFile.gets
       newLine = newFile.gets
    end
 end
 
 Okay - I skipped the drain step for when EOF is reached on one of the 
 files, but you get the point.
 
 If both old and new are in the database, you can open two database 
 connections, order both cursors by the primary key, and do the same merge 
 without dumping files.
 
 I journal about 130 million rows every day for a quant financial database...

Re: Using MapReduce to do table comparing.

2008-07-24 Thread Amber
Yes, I think this is the simplest method, but there are problems too:

1. The reduce stage wouldn't begin until the map stage ends, by which time we 
have already done two full table scans, and the comparison will take almost as 
long again, because about 90% of the intermediate key/value pairs will have two 
values under the same key. If I could specify a number n such that the reduce 
tasks start once there are n intermediate pairs with the same key, that would 
be better. In my case I would set that magic number to 2.

2. I am not sure about how Hadoop stores intermediate key/value pairs; we 
could not afford it if they are kept in memory as the data volume increases.

--
From: James Moore [EMAIL PROTECTED]
Sent: Thursday, July 24, 2008 1:12 AM
To: core-user@hadoop.apache.org
Subject: Re: Using MapReduce to do table comparing.

 On Wed, Jul 23, 2008 at 7:33 AM, Amber [EMAIL PROTECTED] wrote:
 We have a 10 million row table exported from an AS400 mainframe every day; the 
 table is exported as a CSV text file, which is about 30GB in size, and the CSV 
 file is then imported into an RDBMS table which is dropped and recreated every 
 day.  Now we want to find how many rows are updated during each export-import 
 interval.  The table has a primary key, so deletes and inserts can be found 
 quickly using RDBMS joins, but we must do a column-to-column comparison to find 
 the differences between rows ( about 90% ) with the same primary keys.  Our 
 goal is a comparison process which takes no more than 10 minutes on a 4-node 
 cluster, where each server has 4 4-core 3.0 GHz CPUs, 8GB of memory and a 300GB 
 local RAID5 array.

 Below is our current solution:
 The old data is kept in the RDBMS with an index created on the primary key; 
 the new data is imported into HDFS as the input file of our Map-Reduce job. 
 Every map task connects to the RDBMS database and selects the old data for 
 every row; map tasks generate output only if differences are found, and there 
 are no reduce tasks.

 As you can see, as the number of concurrent map tasks increases, the RDBMS 
 database will become the bottleneck, so we want to take the RDBMS out of the 
 picture, but we have no idea how to quickly retrieve the old row for a given 
 key from HDFS files; any suggestion is welcome.
 
 Think of map/reduce as giving you a kind of key/value lookup for free
 - it just falls out of how the system works.
 
 You don't care about the RDBMS.  It's a distraction - you're given a
 set of csv files with unique keys and dates, and you need to find the
 differences between them.
 
 Say the data looks like this:
 
 File for jul 10:
 0x1,stuff
 0x2,more stuff
 
 File for jul 11:
 0x1,stuff
 0x2,apples
 0x3,parrot
 
 Preprocess the csv files to add dates to the values:
 
 File for jul 10:
 0x1,20080710,stuff
 0x2,20080710,more stuff
 
 File for jul 11:
 0x1,20080711,stuff
 0x2,20080711,apples
 0x3,20080711,parrot
 
 Feed two days worth of these files into a hadoop job.
 
 The mapper splits these into k=0x1, v=20080710,stuff etc.
 
 The reducer gets one or two v's per key, and each v has the date
 embedded in it - that's essentially your lookup step.
 
 You'll end up with a system that can do compares for any two dates,
 and could easily be expanded to do all sorts of deltas across these
 files.
 
 The preprocess-the-files-to-add-a-date can probably be included as
 part of your mapper and isn't really a separate step - just depends on
 how easy it is to use one of the off-the-shelf mappers with your data.
 If it turns out to be its own step, it can become a very simple
 hadoop job.
 
 -- 
 James Moore | [EMAIL PROTECTED]
 Ruby and Ruby on Rails consulting
 blog.restphone.com
 

Using MapReduce to do table comparing.

2008-07-23 Thread Amber
We have a 10 million row table exported from an AS400 mainframe every day; the 
table is exported as a CSV text file, which is about 30GB in size, and the CSV 
file is then imported into an RDBMS table which is dropped and recreated every 
day.  Now we want to find how many rows are updated during each export-import 
interval.  The table has a primary key, so deletes and inserts can be found 
quickly using RDBMS joins, but we must do a column-to-column comparison to find 
the differences between rows ( about 90% ) with the same primary keys.  Our 
goal is a comparison process which takes no more than 10 minutes on a 4-node 
cluster, where each server has 4 4-core 3.0 GHz CPUs, 8GB of memory and a 300GB 
local RAID5 array.

Below is our current solution:
The old data is kept in the RDBMS with an index created on the primary key; 
the new data is imported into HDFS as the input file of our Map-Reduce job. 
Every map task connects to the RDBMS database and selects the old data for 
every row; map tasks generate output only if differences are found, and there 
are no reduce tasks.

As you can see, as the number of concurrent map tasks increases, the RDBMS 
database will become the bottleneck, so we want to take the RDBMS out of the 
picture, but we have no idea how to quickly retrieve the old row for a given 
key from HDFS files; any suggestion is welcome.

Re: Using MapReduce to do table comparing.

2008-07-23 Thread Jason Venner
If you write a SequenceFile with the results from the RDBMS, you can use
the join primitives to handle this rapidly.

The key is that you have to write the data in the native key sort order.

Since you have a primary key, you should be able to dump the table in
primary key order, and you can define a comparator (if needed) so that
Hadoop will consider that order to be the key sort order.

If you just want to do random lookups of keys, write a MapFile and then
you can do rapid random access. (The same rules as above apply for writing
the MapFile.)


I believe that in versions of Hadoop later than 0.16.3 the join primitives
work for text files as well, if the text file is sorted.


Amber wrote:
 We have a 10 million row table exported from an AS400 mainframe every day; the 
 table is exported as a CSV text file, which is about 30GB in size, and the CSV 
 file is then imported into an RDBMS table which is dropped and recreated every 
 day.  Now we want to find how many rows are updated during each export-import 
 interval.  The table has a primary key, so deletes and inserts can be found 
 quickly using RDBMS joins, but we must do a column-to-column comparison to find 
 the differences between rows ( about 90% ) with the same primary keys.  Our 
 goal is a comparison process which takes no more than 10 minutes on a 4-node 
 cluster, where each server has 4 4-core 3.0 GHz CPUs, 8GB of memory and a 300GB 
 local RAID5 array.

 Below is our current solution:
 The old data is kept in the RDBMS with an index created on the primary key; 
 the new data is imported into HDFS as the input file of our Map-Reduce job. 
 Every map task connects to the RDBMS database and selects the old data for 
 every row; map tasks generate output only if differences are found, and there 
 are no reduce tasks.

 As you can see, as the number of concurrent map tasks increases, the RDBMS 
 database will become the bottleneck, so we want to take the RDBMS out of the 
 picture, but we have no idea how to quickly retrieve the old row for a given 
 key from HDFS files; any suggestion is welcome.


Re: Using MapReduce to do table comparing.

2008-07-23 Thread James Moore
On Wed, Jul 23, 2008 at 7:33 AM, Amber [EMAIL PROTECTED] wrote:
 We have a 10 million row table exported from an AS400 mainframe every day; the 
 table is exported as a CSV text file, which is about 30GB in size, and the CSV 
 file is then imported into an RDBMS table which is dropped and recreated every 
 day.  Now we want to find how many rows are updated during each export-import 
 interval.  The table has a primary key, so deletes and inserts can be found 
 quickly using RDBMS joins, but we must do a column-to-column comparison to find 
 the differences between rows ( about 90% ) with the same primary keys.  Our 
 goal is a comparison process which takes no more than 10 minutes on a 4-node 
 cluster, where each server has 4 4-core 3.0 GHz CPUs, 8GB of memory and a 300GB 
 local RAID5 array.

 Below is our current solution:
 The old data is kept in the RDBMS with an index created on the primary key; 
 the new data is imported into HDFS as the input file of our Map-Reduce job. 
 Every map task connects to the RDBMS database and selects the old data for 
 every row; map tasks generate output only if differences are found, and there 
 are no reduce tasks.

 As you can see, as the number of concurrent map tasks increases, the RDBMS 
 database will become the bottleneck, so we want to take the RDBMS out of the 
 picture, but we have no idea how to quickly retrieve the old row for a given 
 key from HDFS files; any suggestion is welcome.

Think of map/reduce as giving you a kind of key/value lookup for free
- it just falls out of how the system works.

You don't care about the RDBMS.  It's a distraction - you're given a
set of csv files with unique keys and dates, and you need to find the
differences between them.

Say the data looks like this:

File for jul 10:
0x1,stuff
0x2,more stuff

File for jul 11:
0x1,stuff
0x2,apples
0x3,parrot

Preprocess the csv files to add dates to the values:

File for jul 10:
0x1,20080710,stuff
0x2,20080710,more stuff

File for jul 11:
0x1,20080711,stuff
0x2,20080711,apples
0x3,20080711,parrot

Feed two days worth of these files into a hadoop job.

The mapper splits these into k=0x1, v=20080710,stuff etc.

The reducer gets one or two v's per key, and each v has the date
embedded in it - that's essentially your lookup step.

You'll end up with a system that can do compares for any two dates,
and could easily be expanded to do all sorts of deltas across these
files.

The preprocess-the-files-to-add-a-date can probably be included as
part of your mapper and isn't really a separate step - just depends on
how easy it is to use one of the off-the-shelf mappers with your data.
 If it turns out to be its own step, it can become a very simple
hadoop job.
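
To make this concrete, here is a minimal Hadoop Streaming sketch in Ruby of the
mapper/reducer described above.  This is an editor's illustration, not code from
the thread: the comma-separated layout with the date already prepended (as in
the preprocessed example above), the OLD_DAY/NEW_DAY environment variables and
the script names are all assumptions.

#!/usr/bin/env ruby
# mapper.rb - expects preprocessed lines of the form key,YYYYMMDD,rest
STDIN.each_line do |line|
  key, date, rest = line.chomp.split(",", 3)
  next if key.nil? || date.nil?        # skip blank or malformed lines
  puts "#{key}\t#{date},#{rest}"       # shuffle/sort groups records by key
end

#!/usr/bin/env ruby
# reducer.rb - input arrives sorted by key; each key carries one or two
# dated values.  Present only on the new day => INSERT, only on the old
# day => DELETE, on both days with different payloads => UPDATE.
OLD_DAY = ENV["OLD_DAY"] || "20080710"   # assumed to be passed in by the job
NEW_DAY = ENV["NEW_DAY"] || "20080711"

def emit(key, vals)
  if vals.size == 1
    day, rest = vals.first
    puts "#{key},#{rest},#{day == NEW_DAY ? 'INSERT' : 'DELETE'}"
  elsif vals[OLD_DAY] != vals[NEW_DAY]
    puts "#{key},#{vals[NEW_DAY]},UPDATE"
  end
end

current_key, vals = nil, {}
STDIN.each_line do |line|
  key, value = line.chomp.split("\t", 2)
  next if value.nil?
  day, rest  = value.split(",", 2)
  if key != current_key
    emit(current_key, vals) unless current_key.nil?
    current_key, vals = key, {}
  end
  vals[day] = rest
end
emit(current_key, vals) unless current_key.nil?

Both scripts would run under the streaming jar with something like
-mapper mapper.rb -reducer reducer.rb (plus -file for each script), so no
custom Java is needed.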

-- 
James Moore | [EMAIL PROTECTED]
Ruby and Ruby on Rails consulting
blog.restphone.com


Re: Using MapReduce to do table comparing.

2008-07-23 Thread Paco NATHAN
This is merely an in-the-ballpark calculation regarding that 10
minute / 4-node requirement...

We have a reasonably similar Hadoop job (slightly more complex in the
reduce phase) running on AWS with:

   * 100+2 nodes (m1.xl config)
   * approx 3x the number of rows and data size
   * completes in 6 minutes

You have faster processors.

So it might require something more on the order of 25-35 nodes for 10 minute
completion ( roughly: 102 nodes x 6 minutes is about 600 node-minutes for ~3x
your data, so about 200 node-minutes for your volume; fitting that into a 10
minute window plus job overhead lands in that range ). That's a very rough
estimate.

Those other two steps (deletes, inserts) might be performed in the
same pass as the compares -- and potentially quicker overall, when you
consider the time to load and recreate in the RDBMS.

Paco


On Wed, Jul 23, 2008 at 9:33 AM, Amber [EMAIL PROTECTED] wrote:
 We have a 10 million row table exported from an AS400 mainframe every day; the 
 table is exported as a CSV text file, which is about 30GB in size, and the CSV 
 file is then imported into an RDBMS table which is dropped and recreated every 
 day.  Now we want to find how many rows are updated during each export-import 
 interval.  The table has a primary key, so deletes and inserts can be found 
 quickly using RDBMS joins, but we must do a column-to-column comparison to find 
 the differences between rows ( about 90% ) with the same primary keys.  Our 
 goal is a comparison process which takes no more than 10 minutes on a 4-node 
 cluster, where each server has 4 4-core 3.0 GHz CPUs, 8GB of memory and a 300GB 
 local RAID5 array.