2.1.1

On 07/17/2017 20:55, vaquar khan wrote:
Could you please let us know your Spark version?




Regards, 
vaquar khan 



On Jul 17, 2017 12:18 AM, "163" <hewenting_...@163.com> wrote:

I changed the UDF, but performance still seems slow. What else can I do?




On 14 Jul 2017, at 8:34 PM, Wenchen Fan <cloud0...@gmail.com> wrote:


Try replacing your UDF with Spark built-in expressions; it should be as simple 
as `$"x" * (lit(1) - $"y")`.
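
As a rough sketch of that suggestion, the snippet below applies the same arithmetic to a tiny in-memory DataFrame instead of the TPC-H tables. The object name, the helper `netValue`, the local master setting, and the sample rows are all made up for illustration; only the column names and the `$"x" * (lit(1) - $"y")` expression come from this thread.

```scala
import org.apache.spark.sql.{Column, SparkSession}
import org.apache.spark.sql.functions.{lit, sum}

object BuiltinExpressionSketch {
  // Same arithmetic as the UDF (x * (1 - y)), written with Column operators
  // so Spark sees an ordinary expression rather than an opaque function.
  def netValue(price: Column, discount: Column): Column =
    price * (lit(1) - discount)

  def main(args: Array[String]): Unit = {
    // Local session for illustration only (assumption); a cluster job
    // would get its master from spark-submit.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("builtin-expression-sketch")
      .getOrCreate()
    import spark.implicits._

    // Hypothetical stand-in for the joined lineitem/nation rows.
    val joined = Seq(
      ("CHINA", 100.0, 0.25),
      ("CHINA", 200.0, 0.5),
      ("JAPAN", 50.0, 0.0)
    ).toDF("n_name", "l_extendedprice", "l_discount")

    val revenue = joined
      .select($"n_name", netValue($"l_extendedprice", $"l_discount").as("value"))
      .groupBy($"n_name")
      .agg(sum($"value").as("revenue"))
      .sort($"revenue".desc)

    revenue.show()
    // Print the physical plan to compare against the UDF version.
    revenue.explain()
    spark.stop()
  }
}
```

Comparing the `explain()` output of this version with the UDF version should show whether the plan changes; whether it makes a measurable difference on the 100G data set would need to be checked on the actual cluster.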


On 14 Jul 2017, at 5:46 PM, 163 <hewenting_...@163.com> wrote:


I rewrote TPC-H query 5 using the DataFrame API:
// spark-shell already has these in scope; a standalone application needs them.
import org.apache.spark.sql.functions.{sum, udf}
import spark.implicits._

// Date literals must be quoted: an unquoted 1995-01-01 is parsed as the
// integer arithmetic 1995 - 1 - 1.
val forders = spark.read.parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/orders")
     .filter("o_orderdate < '1995-01-01' and o_orderdate >= '1994-01-01'")
     .select("o_custkey", "o_orderkey")
val flineitem = spark.read.parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/lineitem")
val fcustomer = spark.read.parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/customer")
val fsupplier = spark.read.parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/supplier")
val fregion = spark.read.parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/region")
     .where("r_name = 'ASIA'")
     .select($"r_regionkey")
val fnation = spark.read.parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/nation")

// Discounted price: extended price times (1 - discount).
val decrease = udf { (x: Double, y: Double) => x * (1 - y) }

val res = flineitem.join(forders, $"l_orderkey" === forders("o_orderkey"))
     .join(fcustomer, $"o_custkey" === fcustomer("c_custkey"))
     .join(fsupplier, $"l_suppkey" === fsupplier("s_suppkey") && $"c_nationkey" === fsupplier("s_nationkey"))
     .join(fnation, $"s_nationkey" === fnation("n_nationkey"))
     .join(fregion, $"n_regionkey" === fregion("r_regionkey"))
     .select($"n_name", decrease($"l_extendedprice", $"l_discount").as("value"))
     .groupBy($"n_name")
     .agg(sum($"value").as("revenue"))
     .sort($"revenue".desc)

// show() returns Unit, so keep the DataFrame in res and display it separately.
res.show()


My environment has one master (HDFS NameNode) and four workers (HDFS DataNodes), 
each with 40 cores and 128 GB of memory. The TPC-H 100 GB data set is stored on 
HDFS in Parquet format.
The query takes about 1.5 minutes. I found that the six tables are read 
sequentially with spark.read.parquet. How can I make these reads run in parallel?
I have already configured data locality, spark.default.parallelism, and 
spark.serializer, and I am using the G1 garbage collector, but the runtime has 
not decreased.
Do you have any other advice for tuning the performance?
Thank you.


Wenting He




