Hi,

To read the files in parallel, you can follow the code below.
import java.util.concurrent.{Callable, Executors, Future, TimeUnit}

import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.sql.{Dataset, Row, SparkSession}

// Each task wraps one parquet read so it can be submitted to a thread pool.
case class ReadData(fileName: String, spark: SparkSession) extends Callable[Dataset[Row]] {
  override def call(): Dataset[Row] = {
    spark.read.parquet(fileName)
    // spark.read.csv(fileName)
  }
}

val spark = SparkSession.builder()
  .appName("practice")
  .config("spark.scheduler.mode", "FAIR")
  .enableHiveSupport()
  .getOrCreate()

// Submit one read task per table to a pool of 6 threads.
val pool = Executors.newFixedThreadPool(6)
val list = new java.util.ArrayList[Future[Dataset[Row]]]()
for (fileName <- "orders,lineitem,customer,supplier,region,nation".split(",")) {
  list.add(pool.submit(ReadData(fileName, spark)))
}

// Collect the results; get() blocks until the corresponding read has finished.
val dfList = new ArrayBuffer[Dataset[Row]]()
for (result <- list.asScala) {
  dfList += result.get()
}

pool.shutdown()
pool.awaitTermination(Long.MaxValue, TimeUnit.NANOSECONDS)

for (finalData <- dfList) {
  finalData.show()
}

This will read the data in parallel, which I think is your main bottleneck.

Regards
Pralabh Kumar

On Mon, Jul 17, 2017 at 6:25 PM, vaquar khan <vaquar.k...@gmail.com> wrote:

> Could you please let us know your Spark version?
>
> Regards,
> vaquar khan
>
> On Jul 17, 2017 12:18 AM, "163" <hewenting_...@163.com> wrote:
>
>> I changed the UDF, but the performance still seems slow. What else can I do?
>>
>> On 14 Jul 2017, at 8:34 PM, Wenchen Fan <cloud0...@gmail.com> wrote:
>>
>> Try to replace your UDF with Spark built-in expressions; it should be as
>> simple as `$"x" * (lit(1) - $"y")`.
>>
>> On 14 Jul 2017, at 5:46 PM, 163 <hewenting_...@163.com> wrote:
>>
>> I rewrote TPC-H query 5 with the DataFrame API:
>>
>> val forders = spark.read
>>   .parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/orders")
>>   .filter("o_orderdate < '1995-01-01' and o_orderdate >= '1994-01-01'")
>>   .select("o_custkey", "o_orderkey")
>> val flineitem = spark.read
>>   .parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/lineitem")
>> val fcustomer = spark.read
>>   .parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/customer")
>> val fsupplier = spark.read
>>   .parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/supplier")
>> val fregion = spark.read
>>   .parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/region")
>>   .where("r_name = 'ASIA'")
>>   .select($"r_regionkey")
>> val fnation = spark.read
>>   .parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/nation")
>>
>> val decrease = udf { (x: Double, y: Double) => x * (1 - y) }
>>
>> val res = flineitem.join(forders, $"l_orderkey" === forders("o_orderkey"))
>>   .join(fcustomer, $"o_custkey" === fcustomer("c_custkey"))
>>   .join(fsupplier, $"l_suppkey" === fsupplier("s_suppkey") &&
>>     $"c_nationkey" === fsupplier("s_nationkey"))
>>   .join(fnation, $"s_nationkey" === fnation("n_nationkey"))
>>   .join(fregion, $"n_regionkey" === fregion("r_regionkey"))
>>   .select($"n_name", decrease($"l_extendedprice", $"l_discount").as("value"))
>>   .groupBy($"n_name")
>>   .agg(sum($"value").as("revenue"))
>>   .sort($"revenue".desc)
>>   .show()
>>
>> My environment is one master (HDFS namenode) and four workers (HDFS
>> datanodes), each with 40 cores and 128 GB of memory. The TPC-H 100 GB data
>> set is stored on HDFS in Parquet format.
>>
>> The query takes about 1.5 minutes. I found that reading these 6 tables with
>> spark.read.parquet happens sequentially; how can I make the reads run in
>> parallel?
>>
>> I have already set data locality, spark.default.parallelism and
>> spark.serializer, and switched to the G1 garbage collector, but the runtime
>> has not gone down.
>>
>> Is there any other advice for tuning this performance?
>>
>> Thank you.
>>
>> Wenting He
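[Editor's note] A minimal sketch of Wenchen Fan's suggestion in the quoted thread above: compute the discounted price with built-in column expressions instead of the `decrease` UDF, so Catalyst can optimize and code-generate the expression. The variable name `joined` is hypothetical and stands for the DataFrame produced by the five joins in the quoted query; column names follow the same TPC-H schema.

// Sketch only: built-in expressions replacing the `decrease` UDF.
// `joined` is a placeholder for the joined DataFrame built in the quoted query.
import org.apache.spark.sql.functions.{lit, sum}
import spark.implicits._   // for the $"..." column syntax

val revenueByNation = joined
  .select(
    $"n_name",
    // equivalent of udf { (x: Double, y: Double) => x * (1 - y) }
    ($"l_extendedprice" * (lit(1) - $"l_discount")).as("value"))
  .groupBy($"n_name")
  .agg(sum($"value").as("revenue"))
  .sort($"revenue".desc)

revenueByNation.show()

Because the arithmetic is expressed with Column operators rather than an opaque Scala function, Spark can fold it into whole-stage code generation instead of serializing rows into JVM objects for the UDF call.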