Hi, I have the following Spark driver program/job which reads ORC files (i.e.
Hive partitions stored as HDFS directories), processes them as DataFrames, and
registers them as tables for use in hiveContext.sql(). The job runs fine and
gives correct results, but after an hour or so it hits the physical memory
limit, YARN kills executors, and everything gets slower and slower. Please see
the following code and help me identify the problem. I create 20 threads from
the driver program and submit them to a thread pool; the thread logic contains
functions that get executed on the executors. Please guide me, I am new to
Spark. Thanks much.
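
To make the flow clearer, here is a minimal, self-contained sketch of what each
worker basically does per partition (the path, column names and table name
below are just placeholders, not the real ones from my job; the partition is
assumed to have three columns for illustration):

    import org.apache.spark.SparkConf;
    import org.apache.spark.SparkContext;
    import org.apache.spark.sql.DataFrame;
    import org.apache.spark.sql.hive.HiveContext;

    public class PartitionFlowSketch {
        public static void main(String[] args) {
            SparkConf conf = new SparkConf().setAppName("PartitionFlowSketch");
            SparkContext sc = new SparkContext(conf);
            HiveContext hiveContext = new HiveContext(sc);

            // read one Hive partition (an ORC directory on HDFS) into a DataFrame;
            // the path is only a placeholder for a real partition directory
            DataFrame partition = hiveContext.read().format("orc")
                    .load("/user/xyz/Hive/xyz.db/data/entity=SOME_ENTITY/date=20150912");

            // rename the ORC _col* fields to real column names (assuming three
            // columns here), register the result as a temp table and query it
            DataFrame renamed = partition.toDF("col1", "col2", "col3");
            renamed.registerTempTable("baseTable");
            hiveContext.sql("select col1, count(*) from baseTable group by col1").show();

            sc.stop();
        }
    }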

  public class DataSpark {

        public static final Map<String,String> dMap = new LinkedHashMap<>();

        public static final String[] colNameArr = new String[] {"_col0", "col2", "bla bla 45 columns"};

    public static void main(String[] args) throws Exception {


            Set<DataWorker> workers = new HashSet<>();

            SparkConf sparkConf = new SparkConf().setAppName("SparkTest");
            setSparkConfProperties(sparkConf);
            SparkContext sc = new SparkContext(sparkConf);
            final FileSystem fs = FileSystem.get(sc.hadoopConfiguration());
            HiveContext hiveContext = createHiveContext(sc);

            declareHiveUDFs(hiveContext);

            DateTimeFormatter df = DateTimeFormat.forPattern("yyyyMMdd");
            String yestday = "20150912";
            hiveContext.sql(" use xyz ");
            createTables(hiveContext);
            DataFrame partitionFrame = hiveContext.sql(" show partitions data partition(date=\"" + yestday + "\")");

            //collect the partitions for the given date and create one worker per partition directory
            Row[] rowArr = partitionFrame.collect();
            for (Row row : rowArr) {
                String[] splitArr = row.getString(0).split("/");
                String entity = splitArr[0].split("=")[1];
                int date = Integer.parseInt(splitArr[1].split("=")[1]);

                String sourcePath = "/user/xyz/Hive/xyz.db/data/entity=" + entity + "/date=" + date;
                Path spath = new Path(sourcePath);
                if (fs.getContentSummary(spath).getFileCount() > 0) {
                    DataWorker worker = new DataWorker(hiveContext, entity, date);
                    workers.add(worker);
                }
            }

            ExecutorService executorService = Executors.newFixedThreadPool(20);
            executorService.invokeAll(workers);
            executorService.shutdown();


            sc.stop();
        }

        private static void setSparkConfProperties(SparkConf sparkConf) {
            sparkConf.set("spark.rdd.compress", "true");
            sparkConf.set("spark.shuffle.consolidateFiles", "true");
            sparkConf.set("spark.executor.logs.rolling.maxRetainedFiles", "90");
            sparkConf.set("spark.executor.logs.rolling.strategy", "time");
            sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
            sparkConf.set("spark.shuffle.manager", "tungsten-sort");
            sparkConf.set("spark.shuffle.memoryFraction", "0.5");
            sparkConf.set("spark.storage.memoryFraction", "0.2");
        }

        private static HiveContext createHiveContext(SparkContext sc) {
            HiveContext hiveContext = new HiveContext(sc);
            hiveContext.setConf("spark.sql.codegen", "true");
            hiveContext.setConf("spark.sql.unsafe.enabled", "true");
            //set this to avoid a large number of small output files; by default Spark creates 200 part files
            hiveContext.setConf("spark.sql.shuffle.partitions", "15");
            hiveContext.setConf("spark.sql.orc.filterPushdown", "true");
            return hiveContext;
        }

        private static void declareHiveUDFs(HiveContext hiveContext) {
            hiveContext.sql("CREATE TEMPORARY FUNCTION UDF1 AS
'com.blab.blab.UDF1'");
            hiveContext.sql("CREATE TEMPORARY FUNCTION UDF2 AS
'com.blab.blab.UDF2'");
        }

        private static void createTables(HiveContext hiveContext) {

            hiveContext.sql(" create table if not exists abc blab bla );

             hiveContext.sql(" create table if not exists def blab bla );

        }



        private static void createBaseTableAfterProcessing(HiveContext hiveContext, String entity, int date) {
            String sourcePath = "/user/xyz/Hive/xyz.db/data/entity=" + entity + "/date=" + date;

            DataFrame sourceFrame = hiveContext.read().format("orc").load(sourcePath);

            //rename fields from _col* to actual column names
            DataFrame renamedSourceFrame = sourceFrame.toDF(colNameArr);
            //filter the rows of interest and order them
            DataFrame dFrame = renamedSourceFrame.filter(
                    renamedSourceFrame.col("col1").contains("ABC")
                            .or(renamedSourceFrame.col("col1").contains("col3")))
                    .orderBy("col2", "col3", "col4");

            //rows that did not match the filter
            DataFrame dRemovedFrame = renamedSourceFrame.except(dFrame);

            JavaRDD<Row> dRemovedRDD = dRemovedFrame.toJavaRDD();
            JavaRDD<Row> sourceRdd = dFrame.toJavaRDD();

            JavaRDD<Row> indexedRdd = sourceRdd.mapPartitionsWithIndex(
                    new Function2<Integer, Iterator<Row>, Iterator<Row>>() {
                        @Override
                        public Iterator<Row> call(Integer ind, Iterator<Row> rowIterator) throws Exception {
                            List<Row> rowList = new ArrayList<>();

                            while (rowIterator.hasNext()) {
                                Row row = rowIterator.next();
                                List rowAsList = iterate(JavaConversions.seqAsJavaList(row.toSeq()));
                                Row updatedRow = RowFactory.create(rowAsList.toArray());
                                rowList.add(updatedRow);
                            }
                            return rowList.iterator();
                        }
                    }, false).union(dRemovedRDD).persist(StorageLevels.MEMORY_AND_DISK_SER);

            DataFrame dUpdatedDataFrame = hiveContext.createDataFrame(indexedRdd, renamedSourceFrame.schema());
            hiveContext.registerDataFrameAsTable(dUpdatedDataFrame, "baseTable");
        }


        private static void createXView(HiveContext hiveContext, String entity, int date) {
            hiveContext.sql("insert into table def partition(entity='" + entity + "',date=" + date + ") from baseTable group by bla bla");
        }

        private static void createYView(HiveContext hiveContext, String entity, int date) {
            hiveContext.sql("insert into table abc partition(entity='" + entity + "',date=" + date + ") from baseTable group by bla bla");
        }


        private static final List iterate(List row) {

            List rowAsList = new ArrayList<>(row);
            String request = row.get(2).toString();
            int n = Integer.parseInt(row.get(10).toString());
            if (n == 0)
                dMap.clear();
            if (request != null && request.contains("ABC")) {
                String key = request.substring(request.indexOf(":") + 2,
                        request.indexOf("*", request.indexOf(":")) - 1);
                if (request.contains("ABC DEF")) {
                    dMap.put(key, request.substring(request.indexOf("as") + 3));
                    request = request.replaceAll(key, "");
                } else if (request.contains("ABC GHI")) {
                    if (dMap.containsKey(key)) {
                        request = request.replaceAll(key, dMap.get(key));
                    }
                } else if (request.contains("ABC IJK")) {
                    if (dMap.containsKey(key)) {
                        request = request.replaceAll(key, dMap.get(key));
                        dMap.remove(key);
                    }
                }
            }
            rowAsList.set(2, request);

            return rowAsList;
        }

        public static class DataWorker implements Callable<Void> {

            private String entity;
            private int date;
            private HiveContext hiveContext;

            public DataWorker(HiveContext hiveContext, String entity, int date) {
                this.hiveContext = hiveContext;
                this.entity = entity;
                this.date = date;
            }

            @Override
            public Void call() throws Exception {
                createBaseTableAfterProcessing(this.hiveContext, entity, date);
                createXView(this.hiveContext, entity, date);
                createYView(this.hiveContext, entity, date);

                this.hiveContext.clearCache();
                return null;
            }
        }

    }


