I changed the UDF, but the performance still seems slow. What else can I do?
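(For reference, a minimal sketch of that change — assuming spark.implicits._ and org.apache.spark.sql.functions._ are in scope, as they are in the spark-shell session used for the query quoted below — replaces the decrease UDF with plain column arithmetic that Catalyst can optimize and code-generate:

  // instead of: decrease($"l_extendedprice", $"l_discount").as("value")
  val value = ($"l_extendedprice" * (lit(1) - $"l_discount")).as("value")
  // ... then use .select($"n_name", value) in the same query as before
)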

> On 14 Jul 2017, at 8:34 PM, Wenchen Fan <cloud0...@gmail.com> wrote:
> 
> Try to replace your UDF with Spark built-in expressions; it should be as 
> simple as `$"x" * (lit(1) - $"y")`.
> 
>> On 14 Jul 2017, at 5:46 PM, 163 <hewenting_...@163.com> wrote:
>> 
>> I rewrote TPC-H Query 5 with the DataFrame API:
>> 
>> val forders = spark.read.parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/orders")
>>   .filter("o_orderdate < '1995-01-01' and o_orderdate >= '1994-01-01'")
>>   .select("o_custkey", "o_orderkey")
>> val flineitem = spark.read.parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/lineitem")
>> val fcustomer = spark.read.parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/customer")
>> val fsupplier = spark.read.parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/supplier")
>> val fregion = spark.read.parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/region")
>>   .where("r_name = 'ASIA'")
>>   .select($"r_regionkey")
>> val fnation = spark.read.parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/nation")
>> val decrease = udf { (x: Double, y: Double) => x * (1 - y) }
>> val res = flineitem.join(forders, $"l_orderkey" === forders("o_orderkey"))
>>   .join(fcustomer, $"o_custkey" === fcustomer("c_custkey"))
>>   .join(fsupplier, $"l_suppkey" === fsupplier("s_suppkey") && $"c_nationkey" === fsupplier("s_nationkey"))
>>   .join(fnation, $"s_nationkey" === fnation("n_nationkey"))
>>   .join(fregion, $"n_regionkey" === fregion("r_regionkey"))
>>   .select($"n_name", decrease($"l_extendedprice", $"l_discount").as("value"))
>>   .groupBy($"n_name")
>>   .agg(sum($"value").as("revenue"))
>>   .sort($"revenue".desc)
>>   .show()
>> 
>> My environment is one master (HDFS namenode) and four workers (HDFS datanodes), 
>> each with 40 cores and 128 GB of memory. The TPC-H 100 GB dataset is stored on 
>> HDFS in Parquet format.
>> The query takes about 1.5 minutes. I found that reading these 6 tables with 
>> spark.read.parquet happens sequentially; how can I make the reads run in parallel?
>> I've already set data locality and spark.default.parallelism, set spark.serializer, 
>> and switched to G1 GC, but the runtime is still not reduced.
>> Is there any other advice for tuning this performance?
>> Thank you.
>> 
>> Wenting He
>> 
> 

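On the question above about reading the six tables in parallel: one commonly suggested approach (a sketch only, not something from this thread) is to issue the reads from separate threads with Scala Futures, so the driver-side file listing and footer/schema reads for the tables overlap instead of running one after another. The variable names below are just for illustration; the base path and table names are the ones from the quoted query, and spark is assumed to be the shell's SparkSession.

  import scala.concurrent.{Await, Future}
  import scala.concurrent.ExecutionContext.Implicits.global
  import scala.concurrent.duration._

  val base = "hdfs://dell127:20500/SparkParquetDoubleTimestamp100G"

  // submit all six reads at once; each Future performs the driver-side
  // file listing and schema discovery for one table
  val reads = Seq("orders", "lineitem", "customer", "supplier", "region", "nation")
    .map(t => t -> Future(spark.read.parquet(s"$base/$t"))).toMap

  // block until every DataFrame handle is ready, then build the query as before,
  // applying the original filters on orders and region afterwards
  val dfs = reads.map { case (t, f) => t -> Await.result(f, 10.minutes) }
  val flineitem = dfs("lineitem")
  // ... and so on for the other tables

Whether this helps depends on where the time actually goes: if most of the 1.5 minutes is spent in the joins and aggregation rather than in setting up the table reads, parallelizing the reads will not change much, so it is worth checking the per-stage times in the Spark UI first.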