Try replacing your UDF with Spark built-in expressions; it should be as simple as `$"x" * (lit(1) - $"y")`.
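For example, the decrease UDF in the query quoted below can be dropped in favor of a plain column expression. A minimal sketch, assuming spark.implicits._ is in scope for the $"..." syntax:

    import org.apache.spark.sql.functions.lit

    // Equivalent of: val decrease = udf { (x: Double, y: Double) => x * (1 - y) }
    // As a built-in expression, Catalyst can see into it and optimize /
    // code-generate it, which an opaque UDF prevents.
    val value = ($"l_extendedprice" * (lit(1) - $"l_discount")).as("value")

The select step then becomes .select($"n_name", value) and the rest of the query is unchanged.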
> On 14 Jul 2017, at 5:46 PM, 163 <hewenting_...@163.com> wrote:
>
> I rewrote TPCH query 5 with the DataFrame API:
>
> val forders = spark.read.parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/orders")
>   .filter("o_orderdate < '1995-01-01' and o_orderdate >= '1994-01-01'")
>   .select("o_custkey", "o_orderkey")
> val flineitem = spark.read.parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/lineitem")
> val fcustomer = spark.read.parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/customer")
> val fsupplier = spark.read.parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/supplier")
> val fregion = spark.read.parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/region")
>   .where("r_name = 'ASIA'")
>   .select($"r_regionkey")
> val fnation = spark.read.parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/nation")
> val decrease = udf { (x: Double, y: Double) => x * (1 - y) }
> val res = flineitem.join(forders, $"l_orderkey" === forders("o_orderkey"))
>   .join(fcustomer, $"o_custkey" === fcustomer("c_custkey"))
>   .join(fsupplier, $"l_suppkey" === fsupplier("s_suppkey") &&
>     $"c_nationkey" === fsupplier("s_nationkey"))
>   .join(fnation, $"s_nationkey" === fnation("n_nationkey"))
>   .join(fregion, $"n_regionkey" === fregion("r_regionkey"))
>   .select($"n_name", decrease($"l_extendedprice", $"l_discount").as("value"))
>   .groupBy($"n_name")
>   .agg(sum($"value").as("revenue"))
>   .sort($"revenue".desc).show()
>
> My environment is one master (HDFS namenode) and four workers (HDFS datanodes), each with 40 cores and 128 GB of memory. The TPCH 100 GB data set is stored on HDFS in Parquet format.
> The query took about 1.5 minutes. I found that reading these 6 tables with spark.read.parquet is sequential; how can I make the reads run in parallel?
> I have already set data locality, spark.default.parallelism, and spark.serializer, and I am using G1, but the runtime is still not reduced.
> Is there any advice for tuning this performance?
> Thank you.
>
> Wenting He
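On the sequential reads: spark.read.parquet is lazy, so only the driver-side file listing and schema/footer reads happen eagerly; the actual scans are distributed across the cluster once an action runs. If that eager driver-side part is what you see running one table at a time, one option is to issue the reads from concurrent threads. An untested sketch using Scala Futures (the base/names helpers are mine, not from your code):

    import scala.concurrent.{Await, Future}
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.concurrent.duration.Duration

    // Kick off all six reads at once so the driver-side metadata work
    // overlaps; apply your filter/select/where on the results as before.
    val base = "hdfs://dell127:20500/SparkParquetDoubleTimestamp100G"
    val names = Seq("orders", "lineitem", "customer", "supplier", "region", "nation")
    val futures = names.map(t => Future(spark.read.parquet(s"$base/$t")))
    val Seq(orders, lineitem, customer, supplier, region, nation) =
      Await.result(Future.sequence(futures), Duration.Inf)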