Thanks Prashant.
Yes, all relations share the same schema and go into the same target table. As suggested, I am using UNION now.

My final script is attached (written with my limited knowledge of Pig scripting). I request all of you to suggest ways to improve and optimize it further.

Thanks in advance,
Regards,
Sarath.

On Friday 13 April 2012 11:14 PM, Prashant Kommireddi wrote:
Hi Sarath,

Is the schema the same for all the relations you send to the DB? And
are they all being sent to the same target table? If the answer to
either is no, you would probably need multiple DBStorage calls. If the
answer to both is yes, you could do a union on those output relations
and send that to the DB.

I am not clear on whether the data is being sent to the same table,
but if it is, you might append an identifier indicating which relation
each record belongs to.
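
A minimal sketch of the union-with-identifier idea, assuming two hypothetical output relations that share a schema. The aliases, paths, field names and tag values are illustrative only, and the built-in PigStorage stands in for whichever DB storage function is actually in use:

```pig
-- Hypothetical inputs: two relations with the same schema.
matched_pass1 = LOAD 'pass1_out' USING PigStorage(',') AS (id:chararray, amount:float);
matched_pass2 = LOAD 'pass2_out' USING PigStorage(',') AS (id:chararray, amount:float);

-- Append a constant identifier naming the relation each record came from.
tagged1 = FOREACH matched_pass1 GENERATE id, amount, 'pass1' AS origin;
tagged2 = FOREACH matched_pass2 GENERATE id, amount, 'pass2' AS origin;

-- Combine the relations and store once instead of once per relation.
all_matched = UNION tagged1, tagged2;
STORE all_matched INTO 'all_matched_out' USING PigStorage(',');
```

UNION does not preserve the order of records, so the identifier column is the only reliable way to tell afterwards which relation a record came from.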


On Apr 13, 2012, at 4:08 AM, Sarath <[email protected]> wrote:

Thanks Prashant.
I would like to know about one more possibility in Pig.

In my case, I'm storing to the database multiple times, as I iteratively
do the join, filter and find the matching records to be stored.
To avoid these multiple calls to store data into the DB, can I collect all
the matching records from each iteration into a DataBag and finally send
this DataBag for storage to the DB?
I understand it's possible by using a UDF implementing the Accumulator interface.

Please correct my understanding and also provide some links to documentation
and examples for doing this.

Thanks & Regards,
Sarath.

On Saturday 07 April 2012 03:52 PM, Prashant Kommireddi wrote:
Hi Sarath,

53000 records is a small dataset for a comparison between Hadoop and a DB.
Hadoop is really advantageous when the datasets are huge (GBs, TBs, PBs). I
am guessing the 53000 records amount to a few MBs, and in most cases an
indexed database will perform better on joins over such small datasets.
With Hadoop/Pig, there is always a cost associated with spawning Map and
Reduce tasks and reading the datasets. Your script should not die with a
larger dataset, though the database you are using might choke (again, if
your dataset is really huge).

Having said that, and looking at your process, it seems like you could
achieve the task of finding matching/non-matching records with an outer
join. Steps 1 and 3 could be solved with an outer join, and steps 4 and 5
with an inner join.
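
A rough sketch of the outer-join approach, assuming two hypothetical inputs with a two-field key. The aliases, paths and field names are illustrative, and the key fields are assumed to contain no nulls (null keys never match in a join, so they would be misread as unmatched records):

```pig
-- Hypothetical inputs; the key fields (k1, k2) are assumed to be non-null.
src1 = LOAD 'src1' USING PigStorage(',') AS (k1:chararray, k2:chararray, val:chararray);
src2 = LOAD 'src2' USING PigStorage(',') AS (k1:chararray, k2:chararray, val:chararray);

-- A full outer join keeps every record from both sides;
-- the fields of a missing side come back as nulls.
joined = JOIN src1 BY (k1, k2) FULL OUTER, src2 BY (k1, k2);

-- Both sides present: matching records.
matched = FILTER joined BY src1::k1 IS NOT NULL AND src2::k1 IS NOT NULL;

-- Only one side present: the remaining records of each dump.
src1_rest = FILTER joined BY src2::k1 IS NULL;
src2_rest = FILTER joined BY src1::k1 IS NULL;

-- Project the surviving side back to the original schema.
src1_only = FOREACH src1_rest GENERATE src1::k1 AS k1, src1::k2 AS k2, src1::val AS val;
src2_only = FOREACH src2_rest GENERATE src2::k1 AS k1, src2::k2 AS k2, src2::val AS val;
```

Unlike COGROUP, which yields one tuple per key, a join yields one row per pair of matching records, so duplicate keys on both sides multiply the output.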



On Sat, Apr 7, 2012 at 2:38 AM, Sarath <[email protected]> wrote:

Dear All,

I have 2 data dumps (comma separated), each with around 53,000 records
(just sample data; it could be 10 times more than this in real time).
I need to write a script to -

  1. find matching records from these 2 dumps based on a set of matching
     fields
  2. store matching records from each dump into database
  3. find the remaining records from each dump
  4. find matching records again, excluding one of the matching fields
  5. again store matching records from each dump into database

For step 1 I used "cogroup".
For step 3 I split the "cogroup" output on nulls for dumps 2 & 1 respectively
to get the remaining records for dumps 1 & 2.
For steps 2 & 5 I used the DBStorage UDF to store the records into the DB.
With this approach I get 4 store commands (2 commands for each dump, at
steps 2 & 5).

Before storing to DB I'm using another UDF to generate a running sequence
number which will be stored as key for each record being stored.

Problem:
=====
The script for this entire process creates 6 map-reduce jobs and takes
about 10 mins to complete on a cluster of 3 machines (1 master and 2
slaves).
The same requirement, when done using a stored procedure, completes in 5
mins. Now I'm worried that my script could die in a real-time environment.

Requesting your suggestions on -
-> What am I missing?
-> What more can I do to bring the performance in line with the stored
procedure?
-> What changes and/or additions are needed so that the script scales to
any amount of data?

Thanks in advance,
Regards,
Sarath.


-- Register JARs
REGISTER /home/hduser/MyProj.jar;
REGISTER /home/hduser/ojdbc6.jar;
-- ********************************************
-- Load Source1
tab1_raw = LOAD '/user/sarath/src1/part-r-00000' USING myProjUDF.PigStorage(',') AS (
    fld1: chararray, fld2: chararray, fld3: chararray, fld4: chararray,
    fld5: float, fld6: int, fld7: chararray, fld8: chararray,
    fld9: chararray, fld10: chararray, fld11: float);
-- Load Source2
tab2_raw = LOAD '/user/sarath/src2/part-r-00000' USING myProjUDF.PigStorage(',') AS (
    fld1: chararray, fld2: chararray, fld3: chararray, fld4: chararray,
    fld5: float, fld6: int, fld7: chararray, fld8: chararray,
    fld9: chararray, fld10: chararray, fld11: float);
-- Filter raw input for nulls
tab1 = FILTER tab1_raw BY (fld9 is not null) AND (fld7 is not null)
    AND (fld4 is not null) AND (fld11 is not null);
tab2 = FILTER tab2_raw BY (fld9 is not null) AND (fld7 is not null)
    AND (fld4 is not null) AND (fld11 is not null);
-- ********************************************
-- Match on all matching fields
tab_grp = COGROUP tab1 BY (fld9,fld7,fld4,fld11),tab2 BY (fld9,fld7,fld4,fld11);
tab_fil = FILTER tab_grp BY NOT IsEmpty(tab1) AND NOT IsEmpty(tab2);
-- Generate keys
tab_tagged = FOREACH tab_fil GENERATE myProjUDF.LinkIDGenerator($1),*;
-- Get matching records from source1
tab1_mat1= FOREACH tab_tagged GENERATE 
FLATTEN(tab1),$0,'1','SYSUSER','08-APR-12 03:45','Active','Matched','System';
-- Get matching records from source2
tab2_mat1= FOREACH tab_tagged GENERATE 
FLATTEN(tab2),$0,'1','SYSUSER','08-APR-12 03:45','Active','Matched','System';
-- ********************************************
-- Split unmatched records for each source
SPLIT tab_grp INTO tab1_temp IF IsEmpty(tab2),tab2_temp IF IsEmpty(tab1);
-- Get unmatched records from source1
tab1 = FOREACH tab1_temp GENERATE FLATTEN(tab1);
-- Get unmatched records from source2
tab2 = FOREACH tab2_temp GENERATE FLATTEN(tab2);
-- ********************************************
-- Exclude matching field fld9 and find matching recs again
tab_grp = COGROUP tab1 BY (fld7,fld4,fld11),tab2 BY (fld7,fld4,fld11);
tab_fil = FILTER tab_grp BY NOT IsEmpty(tab1) AND NOT IsEmpty(tab2);
-- Generate keys
tab_tagged = FOREACH tab_fil GENERATE myProjUDF.LinkIDGenerator($1),*;
-- Get matching records from source1
tab1_mat21= FOREACH tab_tagged GENERATE 
FLATTEN(tab1),$0,'1','SYSUSER','08-APR-12 03:45','Active','Matched','System';
-- Get matching records from source2
tab2_mat21= FOREACH tab_tagged GENERATE 
FLATTEN(tab2),$0,'1','SYSUSER','08-APR-12 03:45','Active','Matched','System';
-- ********************************************
-- Split for unmatched records for each source
SPLIT tab_grp INTO tab1_temp IF IsEmpty(tab2),tab2_temp IF IsEmpty(tab1);
-- Get unmatched records from source1
tab1 = FOREACH tab1_temp GENERATE FLATTEN(tab1);
-- Get unmatched records from source2
tab2 = FOREACH tab2_temp GENERATE FLATTEN(tab2);
-- ********************************************
-- Exclude matching field fld7 and find matching recs again
tab_grp = COGROUP tab1 BY (fld9,fld4,fld11),tab2 BY (fld9,fld4,fld11);
tab_fil = FILTER tab_grp BY NOT IsEmpty(tab1) AND NOT IsEmpty(tab2);
-- Generate keys
tab_tagged = FOREACH tab_fil GENERATE myProjUDF.LinkIDGenerator($1),*;
-- Get matching records from source1
tab1_mat22= FOREACH tab_tagged GENERATE 
FLATTEN(tab1),$0,'1','SYSUSER','08-APR-12 03:45','Active','Matched','System';
-- Get matching records from source2
tab2_mat22= FOREACH tab_tagged GENERATE 
FLATTEN(tab2),$0,'1','SYSUSER','08-APR-12 03:45','Active','Matched','System';
-- ********************************************
-- Split for unmatched records for each source
SPLIT tab_grp INTO tab1_temp IF IsEmpty(tab2),tab2_temp IF IsEmpty(tab1);
-- Get unmatched records from source1
tab1 = FOREACH tab1_temp GENERATE FLATTEN(tab1);
-- Get unmatched records from source2
tab2 = FOREACH tab2_temp GENERATE FLATTEN(tab2);
-- ********************************************
-- Exclude matching field fld4 and find matching recs again
tab_grp = COGROUP tab1 BY (fld7,fld9,fld11),tab2 BY (fld7,fld9,fld11);
tab_fil = FILTER tab_grp BY NOT IsEmpty(tab1) AND NOT IsEmpty(tab2);
-- Generate keys
tab_tagged = FOREACH tab_fil GENERATE myProjUDF.LinkIDGenerator($1),*;
-- Get matching records from source1
tab1_mat23= FOREACH tab_tagged GENERATE 
FLATTEN(tab1),$0,'1','SYSUSER','08-APR-12 03:45','Active','Matched','System';
-- Get matching records from source2
tab2_mat23= FOREACH tab_tagged GENERATE 
FLATTEN(tab2),$0,'1','SYSUSER','08-APR-12 03:45','Active','Matched','System';
-- ********************************************
-- Split for unmatched records for each source
SPLIT tab_grp INTO tab1_temp IF IsEmpty(tab2),tab2_temp IF IsEmpty(tab1);
-- Get unmatched records from source1
tab1 = FOREACH tab1_temp GENERATE FLATTEN(tab1);
-- Get unmatched records from source2
tab2 = FOREACH tab2_temp GENERATE FLATTEN(tab2);
-- ********************************************
-- Collate all matched records
tab_mat = UNION 
tab1_mat1,tab2_mat1,tab1_mat21,tab2_mat21,tab1_mat22,tab2_mat22,tab1_mat23,tab2_mat23;
-- Store matching records to DB
-- NOTE: each tab_mat record carries 18 fields (11 source fields plus 7 generated
-- values), but the column list below names only 17; one column appears to be missing.
STORE tab_mat INTO 'dummy' USING myProjUDF.DBStorage('oracle.jdbc.driver.OracleDriver','jdbc:oracle:thin:@192.168.1.3:1521:ORCL','demo_db','demo','insert into MATCHED(fld1,fld2,fld3,fld4,fld5,fld6,fld7,fld8,fld9,fld10,fld11,link,user,rec_date,rec_status,match_status,match_type) values (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)');
-- ********************************************
-- Store remaining unmatched records as exceptions
tab1_exp = FOREACH tab1 GENERATE 
myProjUDF.LinkIDGenerator($1),'Open','Active','SYSUSER';
tab2_exp = FOREACH tab2 GENERATE 
myProjUDF.LinkIDGenerator($1),'Open','Active','SYSUSER';
-- Collate all exception records
tab_exp = UNION tab1_exp,tab2_exp;
-- Store exception records to DB
STORE tab_exp INTO 'dummy' USING myProjUDF.DBStorage('oracle.jdbc.driver.OracleDriver','jdbc:oracle:thin:@192.168.1.3:1521:ORCL','demo_db','demo','insert into EXCEPTION(fld1,fld2,fld3,fld4) values (?,?,?,?)');
