> On March 21, 2017, 8:36 p.m., Rohini Palaniswamy wrote:
> > src/org/apache/pig/backend/hadoop/executionengine/spark/JobMetricsListener.java
> > Lines 97 (patched)
> > <https://reviews.apache.org/r/57317/diff/3/?file=1666865#file1666865line97>
> >
> >     Why capture metrics at task level? Pig client might run out of memory 
> > when there are lot of tasks

we need get taskMetrics by SparkListenerTaskEnd#taskMetrics in 
JobMetricsListener#onTaskEnd. And we can update latest metrics once the task 
ends. Is it more suitable that we update the taskMetrics when the stage is 
completed? if yes, i will fire jira and will update once we upgrade to 
spark2.0(currently we can not move the code in onTaskEnd to onStageCompleted 
because api does not provide info in spark1.6)


> On March 21, 2017, 8:36 p.m., Rohini Palaniswamy wrote:
> > src/org/apache/pig/backend/hadoop/executionengine/spark/JobMetricsListener.java
> > Lines 179 (patched)
> > <https://reviews.apache.org/r/57317/diff/3/?file=1666865#file1666865line179>
> >
> >     What is the point of wait() if you are just returning false? Shouldn't 
> > wait be done in a loop till finishedJobIds.contains(jobId) ?
> >     
> >     jobMetricsListener.waitForJobToEnd(jobID); which calls this method does 
> > not even check for the return type. Should it be changed to void?

I saw following in the commment which explains why we use "wait". But i will 
fire jira to verify whether this problem exists in latest spark version.
// Even though we are not making any async calls to spark,
        // the SparkStatusTracker can still return RUNNING status
        // for a finished job.
        // Looks like there is a race condition between spark
        // "event bus" thread updating it's internal listener and
        // this driver thread calling SparkStatusTracker.
        // To workaround this, we will wait for this job to "finish".
        jobMetricsListener.waitForJobToEnd(jobID);


> On March 21, 2017, 8:36 p.m., Rohini Palaniswamy wrote:
> > src/org/apache/pig/backend/hadoop/executionengine/spark/MapReducePartitionerWrapper.java
> > Lines 79 (patched)
> > <https://reviews.apache.org/r/57317/diff/3/?file=1666867#file1666867line79>
> >
> >     Any reason for using a IndexedKey to store PigNullableWritable instead 
> > of directly using PigNullableWritable? It is to have Serializable 
> > implementation?
> >     
> >     The comparators for PigNullableWritable have lot of conditions for the 
> > different data types taken care of and IndexedKey can miss some of that. 
> > Also since you are repeating the index and have another object wrapper, the 
> > size of each record will be more.

Indexed Key is imported since PIG-4284. This jira fixed problem like "
(,3) from table a and (,evening) from table b are considered to have same 
key(NULL)." . The function of IndexedKey and PigNullableWritable is similar. 
IndexedKey is used by many classes and maybe a big change if need to replace. 
Filed PIG-5197 to track this.


> On March 21, 2017, 8:36 p.m., Rohini Palaniswamy wrote:
> > src/org/apache/pig/backend/hadoop/executionengine/spark/MapReducePartitionerWrapper.java
> > Lines 102-103 (patched)
> > <https://reviews.apache.org/r/57317/diff/3/?file=1666867#file1666867line102>
> >
> >     Reflection for method is not required. You can just call getPartition() 
> > directly on the Partitioner

MapReducePartitionerWrapper implements org.apache.spark.Partitioner. Use 
reflection to call org.apache.hadoop.mapreduce.Partitioner. If my understanding 
is wrong, tell me and will contact the contributor of the code.


> On March 21, 2017, 8:36 p.m., Rohini Palaniswamy wrote:
> > src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
> > Lines 387-388 (patched)
> > <https://reviews.apache.org/r/57317/diff/3/?file=1666871#file1666871line387>
> >
> >     Why copy to local directory if already in hdfs?

Copy cache file which is stored in hdfs to local and upload the cache file to 
spark cluster by sparkContext#addFiles in order to let these files can be 
downloaded by spark workers.


> On March 21, 2017, 8:36 p.m., Rohini Palaniswamy wrote:
> > src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
> > Lines 576-580 (patched)
> > <https://reviews.apache.org/r/57317/diff/3/?file=1666871#file1666871line576>
> >
> >     This is redundant. Setting true again for the same setting.

the code is to modify the spark.shuffle.service.enable to true although users 
set the value as “false”, if my understanding is wrong, tell me and will 
contact the contributor of the code.


- kelly


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/57317/#review169547
-----------------------------------------------------------


On March 17, 2017, 6:35 a.m., kelly zhang wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/57317/
> -----------------------------------------------------------
> 
> (Updated March 17, 2017, 6:35 a.m.)
> 
> 
> Review request for pig, Daniel Dai and Rohini Palaniswamy.
> 
> 
> Bugs: PIG-4059 and PIG-4854;
>     https://issues.apache.org/jira/browse/PIG-4059
>     https://issues.apache.org/jira/browse/PIG-4854;
> 
> 
> Repository: pig-git
> 
> 
> Description
> -------
> 
> Merge all changes from spark branch
> 
> 
> Diffs
> -----
> 
>   bin/pig e1212fa 
>   build.xml a0d2ca8 
>   ivy.xml 42daec9 
>   ivy/libraries.properties 481066e 
>   src/META-INF/services/org.apache.pig.ExecType 5c034c8 
>   src/docs/src/documentation/content/xdocs/start.xml c9a1491 
>   
> src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java
>  e866b28 
>   
> src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java
>  0e35273 
>   
> src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
>  ecf780c 
>   
> src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
>  3bad98b 
>   
> src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java
>  2376d03 
>   
> src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POBroadcastSpark.java
>  PRE-CREATION 
>   
> src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java
>  bcbfe2b 
>   
> src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java
>  d80951a 
>   
> src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoinSpark.java
>  PRE-CREATION 
>   
> src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java
>  4dc6d54 
>   
> src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POGlobalRearrange.java
>  52cfb73 
>   
> src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroup.java
>  4923d3f 
>   
> src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java
>  13f70c0 
>   
> src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPoissonSample.java
>  f2830c2 
>   
> src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java
>  c3a82c3 
>   
> src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java 
> PRE-CREATION 
>   
> src/org/apache/pig/backend/hadoop/executionengine/spark/JobMetricsListener.java
>  PRE-CREATION 
>   src/org/apache/pig/backend/hadoop/executionengine/spark/KryoSerializer.java 
> PRE-CREATION 
>   
> src/org/apache/pig/backend/hadoop/executionengine/spark/MapReducePartitionerWrapper.java
>  PRE-CREATION 
>   
> src/org/apache/pig/backend/hadoop/executionengine/spark/SparkEngineConf.java 
> PRE-CREATION 
>   src/org/apache/pig/backend/hadoop/executionengine/spark/SparkExecType.java 
> PRE-CREATION 
>   
> src/org/apache/pig/backend/hadoop/executionengine/spark/SparkExecutionEngine.java
>  PRE-CREATION 
>   src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java 
> PRE-CREATION 
>   
> src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLocalExecType.java
>  PRE-CREATION 
>   src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java 
> PRE-CREATION 
>   src/org/apache/pig/backend/hadoop/executionengine/spark/UDFJarsFinder.java 
> PRE-CREATION 
>   
> src/org/apache/pig/backend/hadoop/executionengine/spark/converter/BroadcastConverter.java
>  PRE-CREATION 
>   
> src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CollectedGroupConverter.java
>  PRE-CREATION 
>   
> src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CounterConverter.java
>  PRE-CREATION 
>   
> src/org/apache/pig/backend/hadoop/executionengine/spark/converter/DistinctConverter.java
>  PRE-CREATION 
>   
> src/org/apache/pig/backend/hadoop/executionengine/spark/converter/FRJoinConverter.java
>  PRE-CREATION 
>   
> src/org/apache/pig/backend/hadoop/executionengine/spark/converter/FilterConverter.java
>  PRE-CREATION 
>   
> src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ForEachConverter.java
>  PRE-CREATION 
>   
> src/org/apache/pig/backend/hadoop/executionengine/spark/converter/GlobalRearrangeConverter.java
>  PRE-CREATION 
>   
> src/org/apache/pig/backend/hadoop/executionengine/spark/converter/IndexedKey.java
>  PRE-CREATION 
>   
> src/org/apache/pig/backend/hadoop/executionengine/spark/converter/IteratorTransform.java
>  PRE-CREATION 
>   
> src/org/apache/pig/backend/hadoop/executionengine/spark/converter/JoinGroupSparkConverter.java
>  PRE-CREATION 
>   
> src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LimitConverter.java
>  PRE-CREATION 
>   
> src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java
>  PRE-CREATION 
>   
> src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LocalRearrangeConverter.java
>  PRE-CREATION 
>   
> src/org/apache/pig/backend/hadoop/executionengine/spark/converter/MergeCogroupConverter.java
>  PRE-CREATION 
>   
> src/org/apache/pig/backend/hadoop/executionengine/spark/converter/MergeJoinConverter.java
>  PRE-CREATION 
>   
> src/org/apache/pig/backend/hadoop/executionengine/spark/converter/OutputConsumerIterator.java
>  PRE-CREATION 
>   
> src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PackageConverter.java
>  PRE-CREATION 
>   
> src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PigSecondaryKeyComparatorSpark.java
>  PRE-CREATION 
>   
> src/org/apache/pig/backend/hadoop/executionengine/spark/converter/PoissonSampleConverter.java
>  PRE-CREATION 
>   
> src/org/apache/pig/backend/hadoop/executionengine/spark/converter/RDDConverter.java
>  PRE-CREATION 
>   
> src/org/apache/pig/backend/hadoop/executionengine/spark/converter/RankConverter.java
>  PRE-CREATION 
>   
> src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ReduceByConverter.java
>  PRE-CREATION 
>   
> src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SecondaryKeySortUtil.java
>  PRE-CREATION 
>   
> src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SkewedJoinConverter.java
>  PRE-CREATION 
>   
> src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SortConverter.java
>  PRE-CREATION 
>   
> src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SparkSampleSortConverter.java
>  PRE-CREATION 
>   
> src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SplitConverter.java
>  PRE-CREATION 
>   
> src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StoreConverter.java
>  PRE-CREATION 
>   
> src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StreamConverter.java
>  PRE-CREATION 
>   
> src/org/apache/pig/backend/hadoop/executionengine/spark/converter/UnionConverter.java
>  PRE-CREATION 
>   
> src/org/apache/pig/backend/hadoop/executionengine/spark/operator/NativeSparkOperator.java
>  PRE-CREATION 
>   
> src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POGlobalRearrangeSpark.java
>  PRE-CREATION 
>   
> src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POJoinGroupSpark.java
>  PRE-CREATION 
>   
> src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POPoissonSampleSpark.java
>  PRE-CREATION 
>   
> src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POReduceBySpark.java
>  PRE-CREATION 
>   
> src/org/apache/pig/backend/hadoop/executionengine/spark/operator/POSampleSortSpark.java
>  PRE-CREATION 
>   
> src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/AccumulatorOptimizer.java
>  PRE-CREATION 
>   
> src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/CombinerOptimizer.java
>  PRE-CREATION 
>   
> src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/JoinGroupOptimizerSpark.java
>  PRE-CREATION 
>   
> src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/MultiQueryOptimizerSpark.java
>  PRE-CREATION 
>   
> src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/NoopFilterRemover.java
>  PRE-CREATION 
>   
> src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/ParallelismSetter.java
>  PRE-CREATION 
>   
> src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/SecondaryKeyOptimizerSpark.java
>  PRE-CREATION 
>   
> src/org/apache/pig/backend/hadoop/executionengine/spark/plan/DotSparkPrinter.java
>  PRE-CREATION 
>   
> src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java
>  PRE-CREATION 
>   
> src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompilerException.java
>  PRE-CREATION 
>   
> src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOpPlanVisitor.java
>  PRE-CREATION 
>   
> src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperPlan.java
>  PRE-CREATION 
>   
> src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkOperator.java
>  PRE-CREATION 
>   
> src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPOPackageAnnotator.java
>  PRE-CREATION 
>   
> src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkPrinter.java
>  PRE-CREATION 
>   
> src/org/apache/pig/backend/hadoop/executionengine/spark/plan/XMLSparkPrinter.java
>  PRE-CREATION 
>   
> src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java
>  PRE-CREATION 
>   
> src/org/apache/pig/backend/hadoop/executionengine/spark/streaming/SparkExecutableManager.java
>  PRE-CREATION 
>   
> src/org/apache/pig/backend/hadoop/executionengine/util/AccumulatorOptimizerUtil.java
>  c4b44ad 
>   
> src/org/apache/pig/backend/hadoop/executionengine/util/CombinerOptimizerUtil.java
>  889c01b 
>   
> src/org/apache/pig/backend/hadoop/executionengine/util/SecondaryKeyOptimizerUtil.java
>  0b59c9c 
>   src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java 
> 951146f 
>   src/org/apache/pig/data/SelfSpillBag.java d17f0a8 
>   src/org/apache/pig/impl/plan/OperatorPlan.java 8b2e2e7 
>   src/org/apache/pig/impl/util/UDFContext.java 09afc0a 
>   src/org/apache/pig/tools/pigstats/PigStatsUtil.java e97625f 
>   src/org/apache/pig/tools/pigstats/spark/SparkCounter.java PRE-CREATION 
>   src/org/apache/pig/tools/pigstats/spark/SparkCounterGroup.java PRE-CREATION 
>   src/org/apache/pig/tools/pigstats/spark/SparkCounters.java PRE-CREATION 
>   src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java PRE-CREATION 
>   src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java PRE-CREATION 
>   src/org/apache/pig/tools/pigstats/spark/SparkPigStatusReporter.java 
> PRE-CREATION 
>   src/org/apache/pig/tools/pigstats/spark/SparkScriptState.java PRE-CREATION 
>   src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java PRE-CREATION 
>   test/e2e/pig/build.xml 1ec9cf6 
>   test/e2e/pig/conf/spark.conf PRE-CREATION 
>   test/e2e/pig/drivers/TestDriverPig.pm bcec317 
>   test/e2e/pig/tests/streaming.conf 18f2fb2 
>   test/excluded-tests-spark PRE-CREATION 
>   
> test/org/apache/pig/newplan/logical/relational/TestLocationInPhysicalPlan.java
>  94b34b3 
>   test/org/apache/pig/spark/TestIndexedKey.java PRE-CREATION 
>   test/org/apache/pig/spark/TestSecondarySortSpark.java PRE-CREATION 
>   test/org/apache/pig/test/MiniGenericCluster.java 9347269 
>   test/org/apache/pig/test/SparkMiniCluster.java PRE-CREATION 
>   test/org/apache/pig/test/TestAssert.java 6d4b5c6 
>   test/org/apache/pig/test/TestCase.java c9bb2fa 
>   test/org/apache/pig/test/TestCollectedGroup.java a958d33 
>   test/org/apache/pig/test/TestCombiner.java df44293 
>   test/org/apache/pig/test/TestCubeOperator.java de96e6c 
>   test/org/apache/pig/test/TestEmptyInputDir.java a9a46af 
>   test/org/apache/pig/test/TestEvalPipeline.java 48ece69 
>   test/org/apache/pig/test/TestEvalPipeline2.java c8f51d7 
>   test/org/apache/pig/test/TestEvalPipelineLocal.java c12d595 
>   test/org/apache/pig/test/TestFinish.java f18c103 
>   test/org/apache/pig/test/TestForEachNestedPlanLocal.java 63d8f67 
>   test/org/apache/pig/test/TestGrunt.java f16ff60 
>   test/org/apache/pig/test/TestHBaseStorage.java 864985e 
>   test/org/apache/pig/test/TestLimitVariable.java 53b9dae 
>   test/org/apache/pig/test/TestLineageFindRelVisitor.java e8e6aeb 
>   test/org/apache/pig/test/TestMapSideCogroup.java 2c78b4a 
>   test/org/apache/pig/test/TestMergeJoinOuter.java 81aee55 
>   test/org/apache/pig/test/TestMultiQuery.java c32eab7 
>   test/org/apache/pig/test/TestMultiQueryLocal.java b9ac035 
>   test/org/apache/pig/test/TestNativeMapReduce.java c4f6573 
>   test/org/apache/pig/test/TestNullConstant.java 3ea4509 
>   test/org/apache/pig/test/TestPigRunner.java 25380e4 
>   test/org/apache/pig/test/TestPigServer.java 8e28646 
>   test/org/apache/pig/test/TestPigServerLocal.java fbabd03 
>   test/org/apache/pig/test/TestProjectRange.java 2e3e7b8 
>   test/org/apache/pig/test/TestPruneColumn.java f05e0ec 
>   test/org/apache/pig/test/TestRank1.java 9e4ef62 
>   test/org/apache/pig/test/TestRank2.java fc802a9 
>   test/org/apache/pig/test/TestRank3.java 43af10d 
>   test/org/apache/pig/test/TestSecondarySort.java 8991010 
>   test/org/apache/pig/test/TestSkewedJoin.java 947a31b 
>   test/org/apache/pig/test/TestStoreBase.java eb3b253 
>   test/org/apache/pig/test/TezMiniCluster.java 0bf7c5a 
>   test/org/apache/pig/test/Util.java 18b241e 
>   test/org/apache/pig/test/YarnMiniCluster.java PRE-CREATION 
> 
> 
> Diff: https://reviews.apache.org/r/57317/diff/3/
> 
> 
> Testing
> -------
> 
> all test pass
> 
> 
> Thanks,
> 
> kelly zhang
> 
>

Reply via email to