Hi,
I have a script which executes multiple jobs , and there is a considerable amount of multiquery optimization done. But it looks like the script generates wrong output with multiquery enabled. The output is fine with -M option.

Attached a trimmed down version of the actual script. The data is getting messed up in the nested foreach, which is defined inside a macro.


The UDF aaa.RANKING() add a simple rank over the ordered data.


The a sample output that is expected is like below (without multiquery);

/1,3,1,1378339200,9779,http:///www.abc12345.com/JQueryAddUserControl.aspx,68445,3333,6,99999,6,0//
//1,3,2,1378339200,9779,http:///www.abc12345.com/EN/IN/Home.aspx,113961,3333,3,99999,0,0//
//1,3,3,1378339200,9779,http:///images.abc12345.com/Img/Tabs/servicestab_expandshadow.gif,2686,3333,2,99999,0,0//
//1,3,4,1378339200,9779,http:///www.abc12345.com/Images/Rent_a_Car_414x207.jpg,30616,3333,2,99999,0,0//
//1,3,5,1378339200,9779,http:///images.abc12345.com/Img/Tabs/servicestabon_linehide.gif,2203,3333,2,99999,0,0//
//1,3,6,1378339200,9779,http:///images.abc12345.com/Img/Common/dottedlinehr.gif,2108,3333,2,99999,0,0//
//1,3,7,1378339200,9779,http:///www.abc12345.com/WebResource.axd,2688,3333,2,99999,0,0//
//1,3,8,1378339200,9779,http:///www.abc12345.com/Scripts/Button/mouseoverbutton.js,2526,3333,2,99999,0,0/

But with multi query on, the data is received like below ;

/*1,3,52*,1378339200,*9779*,http:///www.abc12345.com/Images/UAE_Visa_Marhaba_Services_382x208_New.jpg,1228,3333,1,99999,0,0//
//1,3,18,1378339200,9779,http:///images.abc12345.com/Img/TooltipYellow/TooltipYellowArrowBottom.png,1695,3333,1,99999,0,0//
//1,3,56,1378339200,9779,http:///www.abc12345.com/App_Themes/Default/Img/Common/arrowblue_right.gif,1226,3333,1,99999,0,0//
//1,3,90,1378339200,9779,http:///www.abc12345.com/Scripts/PNRStatus.js,1205,3333,1,99999,0,0//
//1,3,51,1378339200,9779,http:///images.abc12345.com/Img/Obe/obe_bg.gif,1081,3333,1,99999,0,0//
//*1,3,52*,1378339200,*9779*,http:///static.abc12345.com/Scripts/Obe/Obe.js,1076,3333,1,99999,0,0//
/
Note : the ordering is lost and there are two rows that end up with the same key. Happens in both 0.11.1 and 0.10.
-t All also did not help.



Would like to understand if I am doing something wrong in the script that causes this behavior. So far I couldn't figure out a workaround other than disabling multiquery.

Thanks
Vivek
register 'udf.jar';
register 'mysql-connector-java-5.1.11-bin.jar';
register 'piggybank.jar';

SET job.name 'DAILY_2013/09/18'

DEFINE find_top_url_client_traffic(groupedData,bagName, sortkey) RETURNS 
finalResult {
    
    A =  FOREACH $groupedData {
                                                x1 = ORDER      $bagName BY  
$sortkey DESC;
                                                x2 = LIMIT x1 100;
                                                
                                                GENERATE  flatten (group) as 
(logs_domain_id,adn_domain_id,mod_time)  ,
                                                flatten (aaa.RANKING(x2)) as 
                                                (logs_domain_id1: 
long,adn_domain_id1: long,mod_time1: int,timestamp: long,url: 
chararray,client_traffic: long,
                                                client_request: 
long,origin_traffic: long,origin_request: long,passthrough_count: long,
                                                cacheMissOriginCount: 
long,cache_hit_rate: long,cacheMissRate,rank_id :int);
                                                
                                };
                                
        $finalResult = FOREACH A GENERATE 
logs_domain_id,mod_time,rank_id,timestamp,adn_domain_id,url,
                                                        
client_traffic,origin_traffic,client_request,origin_request,passthrough_count,cache_hit_rate;
   
                                                                                
        
    
}


raw = LOAD '/user/sample_input' USING PigStorage (' ') AS (
        date            : chararray,
        time            : chararray,
        log_type        : chararray,
        domain_id       : long,
        serve_from      : chararray,
        cs_protocol     : chararray,
        c_ip            : chararray,
        cs_uri_stem     : chararray,
        sc_status       : chararray,
        sc_bytes        : long,
        cs_bytes        : long,
        cs_host         : chararray
        );



raw_projected = FOREACH raw GENERATE 
date,time,log_type,domain_id,serve_from,cs_protocol,c_ip,cs_uri_stem,sc_status,sc_bytes,cs_bytes,cs_host;

logtype_filtered = FILTER raw_projected BY log_type=='F' OR log_type=='L';


--- FIND OUT THE TIMESTAMP,MOD AND URL
curedData0 = FOREACH logtype_filtered GENERATE 
flatten(aaa.EXTRACT_TIMEPARAMS(date,time,'DAILY',10)) as (timestamp,mod), 
                                
log_type,domain_id,serve_from,c_ip,sc_status,sc_bytes,cs_bytes,
                                CONCAT ( CONCAT(CONCAT ( cs_protocol,':///' ) , 
cs_host),cs_uri_stem) as url;


domainMapping = LOAD 
'/user/logs/domainMapping/DAILY_1379505027444_19934933.txt' USING 
PigStorage(',') as (logs_domain_id,adn_domain_id);

-- Get the customer domain mapping
curedData1 = JOIN curedData0 BY domain_id, domainMapping BY adn_domain_id USING 
'replicated';

curedData = FOREACH curedData1 GENERATE 
timestamp,mod,log_type,logs_domain_id,adn_domain_id,serve_from,c_ip,SUBSTRING(sc_status,0,1)
 as statuscode:int,sc_bytes,cs_bytes,url;


------------------##############  END OF BLOCK


--------------------------------------&-----------------------------------------------
---------------------- Find out client traffic in bytes and client requests

urlClientTraffic1 = FILTER curedData BY log_type=='F';

urlClientTraffic2 = GROUP urlClientTraffic1 BY 
(timestamp,mod,logs_domain_id,adn_domain_id,url);

urlClientTraffic3 = FOREACH urlClientTraffic2 GENERATE flatten(group) as 
(timestamp,mod,logs_domain_id,adn_domain_id,url), 
(SUM(urlClientTraffic1.sc_bytes)+ SUM(urlClientTraffic1.cs_bytes)) as 
clientTraffic,
                                COUNT(urlClientTraffic1) as clientRequests;




--------------------------------------&-----------------------------------------------


urlPassThroughCount1 = FILTER urlClientTraffic1 BY serve_from=='PO';
-- Trim the result for better aggregation
urlPassThroughCount2 = FOREACH urlPassThroughCount1 GENERATE 
timestamp,mod,logs_domain_id,adn_domain_id,url,serve_from;
urlPassThroughCount3 = GROUP urlPassThroughCount2 BY 
(timestamp,mod,logs_domain_id,adn_domain_id,url,serve_from);
urlPassThroughCount4 = FOREACH urlPassThroughCount3 GENERATE flatten (group) as 
(timestamp,mod,logs_domain_id,adn_domain_id,url,serve_from),COUNT(urlPassThroughCount2)
 as passthrough_count;




--------------------------------------&-----------------------------------------------


urlCacheHitCount1 = FILTER urlClientTraffic1 BY serve_from=='MO'; 
urlCacheHitCount2 = FOREACH urlCacheHitCount1 GENERATE 
timestamp,mod,logs_domain_id,adn_domain_id,url,serve_from;
urlCacheHitCount3 = GROUP urlCacheHitCount2 BY 
(timestamp,mod,logs_domain_id,adn_domain_id,url,serve_from);
urlCacheHitCount4 = FOREACH urlCacheHitCount3 GENERATE flatten (group) as 
(timestamp,mod,logs_domain_id,adn_domain_id,url,serve_from),COUNT(urlCacheHitCount2)
 as cacheMissOriginCount;



--------------------------------------&-----------------------------------------------


--urlDataConsolidated3= JOIN urlDataConsolidated2 BY 
(timestamp,mod,logs_domain_id,adn_domain_id,url) LEFT OUTER , 
urlPassThroughCount4 BY (timestamp,mod,logs_domain_id,adn_domain_id,url);

urlDataConsolidated2 = foreach urlClientTraffic3 generate 
timestamp,mod,logs_domain_id,adn_domain_id,url,clientTraffic,clientRequests,3333
 as originTraffic,99999 as originRequests ;

urlDataConsolidated3= JOIN urlDataConsolidated2  BY 
(timestamp,mod,logs_domain_id,adn_domain_id,url) LEFT OUTER , 
urlPassThroughCount4 BY (timestamp,mod,logs_domain_id,adn_domain_id,url);


urlDataConsolidated4= FOREACH urlDataConsolidated3 GENERATE 
urlDataConsolidated2::timestamp as timestamp,
                                        urlDataConsolidated2::mod as mod ,
                                        urlDataConsolidated2::logs_domain_id as 
logs_domain_id,
                                        urlDataConsolidated2::adn_domain_id as 
adn_domain_id,
                                        urlDataConsolidated2::url as url,
                                        urlDataConsolidated2::clientTraffic as 
clientTraffic,
                                        urlDataConsolidated2::clientRequests as 
clientRequests,
                                        urlDataConsolidated2::originTraffic as 
originTraffic,
                                        urlDataConsolidated2::originRequests as 
originRequests,
                                        urlPassThroughCount4::passthrough_count 
as passthrough_count;
 



--------------------------------------&------------------------------------------------------


urlDataConsolidated5= JOIN urlDataConsolidated4 BY 
(timestamp,mod,logs_domain_id,adn_domain_id,url) LEFT OUTER , urlCacheHitCount4 
BY (timestamp,mod,logs_domain_id,adn_domain_id,url);
urlDataConsolidated6= FOREACH urlDataConsolidated5  GENERATE 
urlDataConsolidated4::timestamp as timestamp,
                                        urlDataConsolidated4::mod as mod ,
                                        urlDataConsolidated4::logs_domain_id as 
logs_domain_id,
                                        urlDataConsolidated4::adn_domain_id as 
adn_domain_id,
                                        urlDataConsolidated4::url as url,
                                        (urlDataConsolidated4::clientTraffic is 
null ? 0 : urlDataConsolidated4::clientTraffic ) as clientTraffic,
                                        (urlDataConsolidated4::clientRequests 
is null ? 0 : urlDataConsolidated4::clientRequests) as clientRequests,
                                        (urlDataConsolidated4::originTraffic is 
null ? 0 : urlDataConsolidated4::originTraffic ) as originTraffic,
                                        (urlDataConsolidated4::originRequests 
is null ? 0 : urlDataConsolidated4::originRequests) as originRequests,
                                        
(urlDataConsolidated4::passthrough_count is null ? 0 : 
urlDataConsolidated4::passthrough_count ) as passthrough_count,
                                        
(urlCacheHitCount4::cacheMissOriginCount is null ? 0 : 
urlCacheHitCount4::cacheMissOriginCount)  as cacheMissOriginCount;

urlDataConsolidated6= FOREACH urlDataConsolidated6 GENERATE 
logs_domain_id,adn_domain_id,mod as mod_time,timestamp,url,
                        clientTraffic as  client_traffic,
                        clientRequests as client_request,
                        originTraffic as origin_traffic,
                        originRequests as origin_request,
                        passthrough_count,
                        cacheMissOriginCount,
                        (clientRequests == passthrough_count ? 0 :
                         ( 
((clientRequests-passthrough_count)-cacheMissOriginCount)/(clientRequests-passthrough_count)
 ) ) as cache_hit_rate;  
                        
urlDataConsolidated6=   FOREACH urlDataConsolidated6 GENERATE 
logs_domain_id,adn_domain_id,mod_time,timestamp,url,
                        
client_traffic,client_request,origin_traffic,origin_request,passthrough_count,cacheMissOriginCount,
                        cache_hit_rate,(100-cache_hit_rate) as cacheMissRate;
                                
                                
                                
--------------------------------------&------------------------------------------------------

                        
allDataGrouped = GROUP urlDataConsolidated6 BY 
(logs_domain_id,adn_domain_id,mod_time);


topUrlByClientTraffic = 
find_top_url_client_traffic(allDataGrouped,urlDataConsolidated6,client_traffic);


store topUrlByClientTraffic into 
'/user/logs/output/DAILY_1379505027500/topUrlByClientTraffic' using 
PigStorage(',');

topUrlByClientRequests = find_top_url_client_traffic 
(allDataGrouped,urlDataConsolidated6,client_request);

--explain topUrlByClientRequests;

store topUrlByClientRequests into 
'/user/logs/output/DAILY_1379505027500/topUrlByClientRequests' using 
PigStorage(',');


topUrlByCacheMissRate = find_top_url_client_traffic 
(allDataGrouped,urlDataConsolidated6,cacheMissRate);

store topUrlByCacheMissRate into 
'/user/logs/output/DAILY_1379505027500/topUrlByCacheMissRate' using 
PigStorage(',');










Reply via email to