Hi
To read the files in parallel, you can use the code below:
import java.util.concurrent.{Callable, Executors, Future, TimeUnit}
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.{Dataset, Row, SparkSession}

// Each task reads one table; submitting the tasks from separate threads lets the reads run concurrently.
case class ReadData(fileName: String, spark: SparkSession) extends Callable[Dataset[Row]] {
  override def call(): Dataset[Row] = {
    spark.read.parquet(fileName)
    // or: spark.read.csv(fileName)
  }
}

val spark = SparkSession.builder()
  .appName("practice")
  .config("spark.scheduler.mode", "FAIR")
  .enableHiveSupport()
  .getOrCreate()

// One thread per table so all six reads are submitted at once.
val pool = Executors.newFixedThreadPool(6)
val futures = new java.util.ArrayList[Future[Dataset[Row]]]()
for (fileName <- "orders,lineitem,customer,supplier,region,nation".split(",")) {
  futures.add(pool.submit(ReadData(fileName, spark)))
}

// Collect the results; get() blocks until the corresponding read has finished.
val datasets = new ArrayBuffer[Dataset[Row]]()
for (i <- 0 until futures.size()) {
  datasets += futures.get(i).get()
}

pool.shutdown()
pool.awaitTermination(Long.MaxValue, TimeUnit.NANOSECONDS)

for (ds <- datasets) {
  ds.show()
}
This will read the data in parallel; the sequential reads are, I think, your main bottleneck.
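For reference, the same idea can also be sketched with plain Scala Futures instead of a Java thread pool (untested; it assumes the same table paths and the `spark` session from above):

import scala.concurrent.Await
import scala.concurrent.duration.Duration
import scala.concurrent.ExecutionContext.Implicits.global

// Kick off all six reads concurrently on the global execution context ...
val reads = "orders,lineitem,customer,supplier,region,nation".split(",")
  .map(t => scala.concurrent.Future(spark.read.parquet(t)))
// ... then block until each one has produced its DataFrame.
val dfs = reads.map(f => Await.result(f, Duration.Inf))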
Regards
Pralabh Kumar
On Mon, Jul 17, 2017 at 6:25 PM, vaquar khan <[email protected]> wrote:
> Could you please let us know your Spark version?
>
>
> Regards,
> vaquar khan
>
>
> On Jul 17, 2017 12:18 AM, "163" <[email protected]> wrote:
>
>> I changed the UDF, but the performance still seems slow. What else can I do?
>>
>>
>> On 14 Jul 2017, at 8:34 PM, Wenchen Fan <[email protected]> wrote:
>>
>> Try replacing your UDF with Spark built-in expressions; it should be as
>> simple as `$"x" * (lit(1) - $"y")`.
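>>
>> Concretely, for the query quoted below, the decrease($"l_extendedprice", $"l_discount")
>> call could be replaced by a plain column expression along these lines (a rough sketch;
>> it assumes import org.apache.spark.sql.functions.lit and import spark.implicits._ are
>> in scope):
>>
>> // built-in equivalent of the decrease UDF used in the select below
>> val value = $"l_extendedprice" * (lit(1) - $"l_discount")
>> // i.e. .select($"n_name", value.as("value")) instead of the UDF call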
>>
>> On 14 Jul 2017, at 5:46 PM, 163 <[email protected]> wrote:
>>
>> I rewrote TPC-H query 5 using the DataFrame API:
>>
>> val forders = spark.read.parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/orders")
>>   .filter("o_orderdate < '1995-01-01' and o_orderdate >= '1994-01-01'")
>>   .select("o_custkey", "o_orderkey")
>> val flineitem = spark.read.parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/lineitem")
>> val fcustomer = spark.read.parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/customer")
>> val fsupplier = spark.read.parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/supplier")
>> val fregion = spark.read.parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/region")
>>   .where("r_name = 'ASIA'")
>>   .select($"r_regionkey")
>> val fnation = spark.read.parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/nation")
>>
>> val decrease = udf { (x: Double, y: Double) => x * (1 - y) }
>>
>> val res = flineitem.join(forders, $"l_orderkey" === forders("o_orderkey"))
>>   .join(fcustomer, $"o_custkey" === fcustomer("c_custkey"))
>>   .join(fsupplier, $"l_suppkey" === fsupplier("s_suppkey") && $"c_nationkey" === fsupplier("s_nationkey"))
>>   .join(fnation, $"s_nationkey" === fnation("n_nationkey"))
>>   .join(fregion, $"n_regionkey" === fregion("r_regionkey"))
>>   .select($"n_name", decrease($"l_extendedprice", $"l_discount").as("value"))
>>   .groupBy($"n_name")
>>   .agg(sum($"value").as("revenue"))
>>   .sort($"revenue".desc)
>>   .show()
>>
>>
>> My environment is one master (HDFS namenode) and four workers (HDFS datanodes),
>> each with 40 cores and 128 GB of memory. The TPC-H 100 GB dataset is stored on
>> HDFS in Parquet format.
>>
>> The query executed in about 1.5 minutes, and I found that reading these six
>> tables with spark.read.parquet is sequential. How can I make the reads run in
>> parallel?
>>
>> I have already set data locality, spark.default.parallelism, and
>> spark.serializer, and I am using the G1 garbage collector, but the runtime has
>> not improved.
>>
>> Is there any other advice for tuning this performance?
>>
>> Thank you.
>>
>> Wenting He
>>