[ https://issues.apache.org/jira/browse/PIG-4899?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15701500#comment-15701500 ]

Adam Szita edited comment on PIG-4899 at 11/28/16 10:02 AM:
------------------------------------------------------------

Uploaded new patch [^PIG-4899.3IncrFrom2.patch] (an incremental patch on top
of the previously submitted patch 2) that takes a different approach:
LoadConverter#ToTupleFunction#apply may even run in different JVMs, so we
cannot rely on it to figure out whether an input file has already been read by
another node, for example.
Instead, in SparkStatsUtil#getLoadSparkCounterValue we divide the input record
count by the number of successor plans under the Split operator.
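
As a rough illustration of that correction (only a sketch of the arithmetic,
with made-up names, not the actual patch code):

{code}
// Hypothetical sketch: recover the real input record count from the raw
// Spark counter when the load sits under a Split operator.
public class LoadCounterAdjustment {

    /**
     * The raw counter is incremented once per record for each successor plan
     * under the Split, so dividing by the Split fan-out yields the number of
     * records actually read from the input file.
     */
    static long adjustLoadRecordCount(long rawCounterValue, int numSuccessorPlans) {
        return numSuccessorPlans > 1 ? rawCounterValue / numSuccessorPlans
                                     : rawCounterValue;
    }

    public static void main(String[] args) {
        // 3 input records read under a Split with 2 successor plans produce a
        // raw counter of 6; the adjusted value is the real 3.
        System.out.println(adjustLoadRecordCount(6, 2));
    }
}
{code}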


was (Author: szita):
Uploaded new patch [^PIG-4899.3IncrFrom2.patch] that takes a different approach:
LoadConverter#ToTupleFunction#apply may even run in different JVMs, so we
cannot rely on it to figure out whether an input file has already been read by
another node, for example.
Instead, in SparkStatsUtil#getLoadSparkCounterValue we divide the input record
count by the number of successor plans under the Split operator.

>  The number of records of input file is calculated wrongly in spark mode in 
> multiquery case
> -------------------------------------------------------------------------------------------
>
>                 Key: PIG-4899
>                 URL: https://issues.apache.org/jira/browse/PIG-4899
>             Project: Pig
>          Issue Type: Sub-task
>          Components: spark
>            Reporter: liyunzhang_intel
>            Assignee: Adam Szita
>             Fix For: spark-branch
>
>         Attachments: PIG-4899.2.patch, PIG-4899.3IncrFrom2.patch, 
> PIG-4899.patch
>
>
> The sparkCounter that calculates the number of records in an input 
> file (LoadConverter#ToTupleFunction#apply) will be executed multiple times in 
> the multiquery case. This causes the input record count to be calculated 
> incorrectly. For example:
> {code}
> #--------------------------------------------------
> # Spark Plan                                  
> #--------------------------------------------------
> Spark node scope-534
> Split - scope-548
> |   |
> |   Store(hdfs://localhost:48350/tmp/temp649016960/tmp48836938:org.apache.pig.impl.io.InterStorage) - scope-538
> |   |
> |   |---C: Filter[bag] - scope-495
> |       |   |
> |       |   Less Than or Equal[boolean] - scope-498
> |       |   |
> |       |   |---Project[int][1] - scope-496
> |       |   |
> |       |   |---Constant(5) - scope-497
> |   |
> |   Store(hdfs://localhost:48350/tmp/temp649016960/tmp804709981:org.apache.pig.impl.io.InterStorage) - scope-546
> |   |
> |   |---B: Filter[bag] - scope-507
> |       |   |
> |       |   Equal To[boolean] - scope-510
> |       |   |
> |       |   |---Project[int][0] - scope-508
> |       |   |
> |       |   |---Constant(3) - scope-509
> |
> |---A: New For Each(false,false,false)[bag] - scope-491
>     |   |
>     |   Cast[int] - scope-483
>     |   |
>     |   |---Project[bytearray][0] - scope-482
>     |   |
>     |   Cast[int] - scope-486
>     |   |
>     |   |---Project[bytearray][1] - scope-485
>     |   |
>     |   Cast[int] - scope-489
>     |   |
>     |   |---Project[bytearray][2] - scope-488
>     |
>     |---A: Load(hdfs://localhost:48350/user/root/input:org.apache.pig.builtin.PigStorage) - scope-481
> --------
> Spark node scope-540
> C: Store(hdfs://localhost:48350/user/root/output:org.apache.pig.builtin.PigStorage) - scope-502
> |
> |---Load(hdfs://localhost:48350/tmp/temp649016960/tmp48836938:org.apache.pig.impl.io.InterStorage) - scope-539
> --------
> Spark node scope-542
> D: Store(hdfs://localhost:48350/user/root/output2:org.apache.pig.builtin.PigStorage) - scope-533
> |
> |---D: FRJoin[tuple] - scope-525
>     |   |
>     |   Project[int][0] - scope-522
>     |   |
>     |   Project[int][0] - scope-523
>     |   |
>     |   Project[int][0] - scope-524
>     |
>     |---Load(hdfs://localhost:48350/tmp/temp649016960/tmp48836938:org.apache.pig.impl.io.InterStorage) - scope-541
> --------
> Spark node scope-545
> Store(hdfs://localhost:48350/tmp/temp649016960/tmp-2036144538:org.apache.pig.impl.io.InterStorage) - scope-547
> |
> |---A1: New For Each(false,false,false)[bag] - scope-521
>     |   |
>     |   Cast[int] - scope-513
>     |   |
>     |   |---Project[bytearray][0] - scope-512
>     |   |
>     |   Cast[int] - scope-516
>     |   |
>     |   |---Project[bytearray][1] - scope-515
>     |   |
>     |   Cast[int] - scope-519
>     |   |
>     |   |---Project[bytearray][2] - scope-518
>     |
>     |---A1: Load(hdfs://localhost:48350/user/root/input2:org.apache.pig.builtin.PigStorage) - scope-511
> -------
> {code}
> The PhysicalOperator (Load A) will be executed in 
> LoadConverter#ToTupleFunction#apply more times than it should be, because 
> this is a multi-query case.
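> 
> To make the double counting concrete, here is a minimal standalone sketch using 
> the plain Spark 2.x Java API with made-up data (not Pig's actual LoadConverter 
> code): a counter incremented inside a map transformation is applied again every 
> time an action recomputes that lineage, so two multi-query branches report twice 
> the real number of loaded records.
> {code}
> import java.util.Arrays;
> 
> import org.apache.spark.SparkConf;
> import org.apache.spark.api.java.JavaRDD;
> import org.apache.spark.api.java.JavaSparkContext;
> import org.apache.spark.util.LongAccumulator;
> 
> public class MultiQueryCounterDemo {
>     public static void main(String[] args) {
>         SparkConf conf = new SparkConf().setAppName("multi-query-counter").setMaster("local[*]");
>         JavaSparkContext jsc = new JavaSparkContext(conf);
> 
>         LongAccumulator loadedRecords = jsc.sc().longAccumulator("loaded records");
> 
>         // Stand-in for the load: three input records.
>         JavaRDD<String> input = jsc.parallelize(Arrays.asList("1,2,3", "3,4,5", "6,7,8"));
> 
>         // Stand-in for ToTupleFunction#apply: count each record as it is "loaded".
>         JavaRDD<String> counted = input.map(rec -> { loadedRecords.add(1); return rec; });
> 
>         // Two branches under the Split (like B and C above) each trigger their own
>         // job; without caching, the counting map is re-executed for every branch.
>         counted.filter(rec -> rec.startsWith("3")).count();
>         counted.filter(rec -> !rec.startsWith("3")).count();
> 
>         // Prints 6 although only 3 records exist in the input.
>         System.out.println("loaded records = " + loadedRecords.value());
> 
>         jsc.stop();
>     }
> }
> {code}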



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
