[ https://issues.apache.org/jira/browse/PIG-5068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15689248#comment-15689248 ]

Xianda Ke commented on PIG-5068:
--------------------------------

[~kellyzly],
1. Agree with you. It would be better to remove the conf from the system environment.

2. From my point of view, there seems to be no need to add such a simple class that just stores a single integer value.

3. In fact, I have also written such a function for the skewed join. I paste it here for your information.
SkewedJoinConverter.java
{code}
    private int getDefaultParallelism(List<RDD<Tuple>> predRDDs) {
        SparkContext sc = predRDDs.get(0).context();
        if (sc.conf().contains("spark.default.parallelism")) {
            // the user set spark.default.parallelism explicitly; honor it
            return sc.defaultParallelism();
        }
        // otherwise fall back to the max partition count among the predecessor RDDs
        int maxPartitions = -1;
        for (int i = 0; i < predRDDs.size(); i++) {
            if (predRDDs.get(i).partitions().length > maxPartitions) {
                maxPartitions = predRDDs.get(i).partitions().length;
            }
        }
        return maxPartitions;
    }
{code}
In this function, I have handled the case when sc.conf().contains("spark.default.parallelism") is false. I had a glance at SparkContext.scala and MesosSchedulerBackend.scala: if the SparkContext's conf does not contain "spark.default.parallelism", the defaultParallelism depends on the TaskScheduler; for instance, MesosSchedulerBackend returns 8. It would be better to pick the partition number of the preceding RDDs instead. But I didn't parse PigContext's property "spark.reducers".

From my point of view, it would be better if we combine them.
Here is my proposal: a util function in SparkUtil.java (I pass the PigContext in as a parameter so that the "spark.reducers" property can be read):
{code}
    public static int getParallelism(List<RDD<Tuple>> predecessors,
            PhysicalOperator physicalOperator, PigContext pigContext) {

        // 1. honor the "spark.reducers" property from pig.properties, if set
        String sparkReducers = pigContext.getProperties().getProperty("spark.reducers");
        if (sparkReducers != null) {
            return Integer.parseInt(sparkReducers);
        }

        // 2. honor the parallelism requested in the Pig script (e.g. PARALLEL n)
        int parallelism = physicalOperator.getRequestedParallelism();
        if (parallelism > 0) {
            return parallelism;
        }

        // 3. Parallelism wasn't set in Pig, so set it to whatever Spark thinks
        // is reasonable.
        SparkContext sc = predecessors.get(0).context();
        if (sc.conf().contains("spark.default.parallelism")) {
            return sc.defaultParallelism();
        }
        // find out the max partition count among the predecessor RDDs
        int maxPartitions = -1;
        for (int i = 0; i < predecessors.size(); i++) {
            if (predecessors.get(i).partitions().length > maxPartitions) {
                maxPartitions = predecessors.get(i).partitions().length;
            }
        }
        return maxPartitions;
    }
{code}
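For completeness, a converter could then size its shuffle partitioner from this helper, roughly like the sketch below. This is my illustration only: the PigContext argument is my addition to make the proposal compile, and the sketch assumes it sits next to the proposed SparkUtil.getParallelism above.
{code}
import java.util.List;

import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
import org.apache.spark.HashPartitioner;
import org.apache.spark.Partitioner;
import org.apache.spark.rdd.RDD;

// Hypothetical usage sketch, not committed converter code.
class ParallelismUsageSketch {
    // Feed the resolved parallelism into a HashPartitioner for the join's
    // shuffle, instead of letting Spark pick a scheduler-dependent default.
    static Partitioner partitionerFor(List<RDD<Tuple>> predecessors,
            PhysicalOperator op, PigContext pigContext) {
        int parallelism = SparkUtil.getParallelism(predecessors, op, pigContext);
        return new HashPartitioner(parallelism);
    }
}
{code}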



> Set SPARK_REDUCERS by pig.properties not by system configuration
> ----------------------------------------------------------------
>
>                 Key: PIG-5068
>                 URL: https://issues.apache.org/jira/browse/PIG-5068
>             Project: Pig
>          Issue Type: Sub-task
>          Components: spark
>            Reporter: liyunzhang_intel
>            Assignee: liyunzhang_intel
>         Attachments: PIG-5068.patch
>
>
> In SparkUtil.java, SPARK_REDUCERS is read from the system environment:
> {code}
>     public static int getParallelism(List<RDD<Tuple>> predecessors,
>             PhysicalOperator physicalOperator) {
>         String numReducers = System.getenv("SPARK_REDUCERS");
>         if (numReducers != null) {
>             return Integer.parseInt(numReducers);
>         }
>         int parallelism = physicalOperator.getRequestedParallelism();
>         if (parallelism <= 0) {
>             // Parallelism wasn't set in Pig, so set it to whatever Spark thinks
>             // is reasonable.
>             parallelism = predecessors.get(0).context().defaultParallelism();
>         }
>         return parallelism;
>     }
> {code}
> It would be better to set it via pig.properties.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
