Re: [SQL] Syntax "case when" doesn't be supported in JOIN

2017-07-14 Thread Liang-Chi Hsieh

A possible workaround is to add the rand column into tbl1 with a projection
before the join.

SELECT a.col1
FROM (
  SELECT col1,
         CASE
           WHEN col2 IS NULL THEN cast(rand(9) * 1000 - 99 as string)
           ELSE col2
         END AS col2
  FROM tbl1) a
LEFT OUTER JOIN tbl2 b
  ON a.col2 = b.col3;
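
For the DataFrame API, a rough equivalent of the same trick (a sketch only; it assumes tbl1 and tbl2 are registered tables and reuses the column names and salt range from the SQL above):

import org.apache.spark.sql.functions.{col, rand, when}

val tbl1 = spark.table("tbl1")
val tbl2 = spark.table("tbl2")

// Compute the salted key in a projection on tbl1, so the non-deterministic
// rand() lives in a Project node rather than in the join condition itself.
val salted = tbl1.withColumn(
  "col2_salted",
  when(col("col2").isNull, (rand(9) * 1000 - 99).cast("string"))
    .otherwise(col("col2")))

salted
  .join(tbl2, salted("col2_salted") === tbl2("col3"), "left_outer")
  .select(salted("col1"))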



Chang Chen wrote
> Hi Wenchen
> 
> Yes. We also found that this error is caused by rand(). However, this is a
> classic way to handle data skew in Hive. Is there any equivalent way in Spark?
> 
> Thanks
> Chang
> 
> On Thu, Jul 13, 2017 at 8:25 PM, Wenchen Fan <cloud0fan@...> wrote:
> 
>> It's not about CASE WHEN, but about rand(): non-deterministic expressions
>> are not allowed in join conditions.
>>
>> > On 13 Jul 2017, at 6:43 PM, wangshuang <cn_wss@...> wrote:
>> >
>> > I'm trying to execute Hive SQL on Spark SQL (also on the Spark Thrift
>> > server). To mitigate data skew, we use CASE WHEN to handle NULL keys.
>> > A simplified query:
>> >
>> > SELECT a.col1
>> > FROM tbl1 a
>> > LEFT OUTER JOIN tbl2 b
>> >   ON CASE
>> >        WHEN a.col2 IS NULL THEN cast(rand(9) * 1000 - 99 as string)
>> >        ELSE a.col2
>> >      END = b.col3;
>> >
>> >
>> > But I get the error:
>> >
>> > == Physical Plan ==
>> > org.apache.spark.sql.AnalysisException: nondeterministic expressions are only allowed in Project, Filter, Aggregate or Window, found:
>> > (((CASE WHEN (a.`nav_tcdt` IS NULL) THEN CAST(((rand(9) * CAST(1000 AS DOUBLE)) - CAST(99L AS DOUBLE)) AS STRING) ELSE a.`nav_tcdt` END = c.`site_categ_id`) AND (CAST(a.`nav_tcd` AS INT) = 9)) AND (c.`cur_flag` = 1))
>> > in operator Join LeftOuter, (((CASE WHEN isnull(nav_tcdt#25) THEN cast(((rand(9) * cast(1000 as double)) - cast(99 as double)) as string) ELSE nav_tcdt#25 END = site_categ_id#80) && (cast(nav_tcd#26 as int) = 9)) && (cur_flag#77 = 1))
>> > ;;
>> > GlobalLimit 10
>> > +- LocalLimit 10
>> >    +- Aggregate [date_id#7, CASE WHEN (cast(city_id#10 as string) IN (cast(19596 as string),cast(20134 as string),cast(10997 as string)) && nav_tcdt#25 RLIKE ^[0-9]+$) THEN city_id#10 ELSE nav_tpa_id#21 END], [date_id#7]
>> >       +- Filter (date_id#7 = 2017-07-12)
>> >          +- Join LeftOuter, (((CASE WHEN isnull(nav_tcdt#25) THEN cast(((rand(9) * cast(1000 as double)) - cast(99 as double)) as string) ELSE nav_tcdt#25 END = site_categ_id#80) && (cast(nav_tcd#26 as int) = 9)) && (cur_flag#77 = 1))
>> >             :- SubqueryAlias a
>> >             :  +- SubqueryAlias tmp_lifan_trfc_tpa_hive
>> >             :     +- CatalogRelation `tmp`.`tmp_lifan_trfc_tpa_hive`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [date_id#7, chanl_id#8L, pltfm_id#9, city_id#10, sessn_id#11, gu_id#12, nav_refer_page_type_id#13, nav_refer_page_value#14, nav_refer_tpa#15, nav_refer_tpa_id#16, nav_refer_tpc#17, nav_refer_tpi#18, nav_page_type_id#19, nav_page_value#20, nav_tpa_id#21, nav_tpa#22, nav_tpc#23, nav_tpi#24, nav_tcdt#25, nav_tcd#26, nav_tci#27, nav_tce#28, detl_refer_page_type_id#29, detl_refer_page_value#30, ... 33 more fields]
>> >             +- SubqueryAlias c
>> >                +- SubqueryAlias dim_site_categ_ext
>> >                   +- CatalogRelation `dw`.`dim_site_categ_ext`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [site_categ_skid#64L, site_categ_type#65, site_categ_code#66, site_categ_name#67, site_categ_parnt_skid#68L, site_categ_kywrd#69, leaf_flg#70L, sort_seq#71L, site_categ_srch_name#72, vsbl_flg#73, delet_flag#74, etl_batch_id#75L, updt_time#76, cur_flag#77, bkgrnd_categ_skid#78L, bkgrnd_categ_id#79L, site_categ_id#80, site_categ_parnt_id#81]
>> >
>> > Does Spark SQL not support CASE WHEN in a JOIN condition? Also, my Spark
>> > version is 2.2.0.
>> > Any help would be greatly appreciated.

-
Liang-Chi Hsieh | @viirya 
Spark Technology Center 
http://www.spark.tc/ 

---

How to tune the performance of Tpch query5 within Spark

2017-07-14 Thread 163
I modified the TPC-H query 5 to use the DataFrame API:

val forders = spark.read.parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/orders")
  .filter("o_orderdate < 1995-01-01 and o_orderdate >= 1994-01-01")
  .select("o_custkey", "o_orderkey")
val flineitem = spark.read.parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/lineitem")
val fcustomer = spark.read.parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/customer")
val fsupplier = spark.read.parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/supplier")
val fregion = spark.read.parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/region")
  .where("r_name = 'ASIA'")
  .select($"r_regionkey")
val fnation = spark.read.parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/nation")
val decrease = udf { (x: Double, y: Double) => x * (1 - y) }
val res = flineitem.join(forders, $"l_orderkey" === forders("o_orderkey"))
  .join(fcustomer, $"o_custkey" === fcustomer("c_custkey"))
  .join(fsupplier, $"l_suppkey" === fsupplier("s_suppkey") && $"c_nationkey" === fsupplier("s_nationkey"))
  .join(fnation, $"s_nationkey" === fnation("n_nationkey"))
  .join(fregion, $"n_regionkey" === fregion("r_regionkey"))
  .select($"n_name", decrease($"l_extendedprice", $"l_discount").as("value"))
  .groupBy($"n_name")
  .agg(sum($"value").as("revenue"))
  .sort($"revenue".desc)
  .show()

My environment is one master (HDFS NameNode) and four workers (HDFS DataNodes),
each with 40 cores and 128 GB of memory. The data set is TPC-H at 100 GB, stored
on HDFS in Parquet format.
The query takes about 1.5 minutes. I found that reading these six tables with
spark.read.parquet is sequential. How can I make the reads run in parallel?
I have already set data locality, spark.default.parallelism and spark.serializer,
and I use the G1 GC, but the runtime has not improved.
Is there any other advice for tuning this query?
Thank you.

Wenting He
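
One common way to overlap independent scans like these is to submit them as separate Spark jobs from separate threads; Spark's scheduler can run concurrent jobs within one application. A rough sketch, assuming the f* DataFrames above and that caching all six tables fits in memory; whether it actually shortens the wall-clock time depends on free cluster resources, since the query itself is lazy and normally scans the tables inside the final job:

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

// Kick off one job per table from its own thread so the Parquet scans can
// overlap; count() forces each cached table to be materialized.
val tables = Seq(forders, flineitem, fcustomer, fsupplier, fregion, fnation)
val warmed = tables.map(df => Future { df.cache(); df.count() })
warmed.foreach(f => Await.result(f, Duration.Inf))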



Re: How to tune the performance of Tpch query5 within Spark

2017-07-14 Thread Wenchen Fan
Try to replace your UDF with Spark built-in expressions; it should be as simple
as `$"x" * (lit(1) - $"y")`.
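
In the query above that amounts to dropping the decrease UDF and computing the value column with Column arithmetic (a sketch; only the changed lines are shown, and it assumes spark.implicits._ and org.apache.spark.sql.functions.lit are in scope):

// was: val decrease = udf { (x: Double, y: Double) => x * (1 - y) }
//      .select($"n_name", decrease($"l_extendedprice", $"l_discount").as("value"))
.select($"n_name", ($"l_extendedprice" * (lit(1) - $"l_discount")).as("value"))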

> On 14 Jul 2017, at 5:46 PM, 163  wrote:
> 
> I modified the TPC-H query 5 to use the DataFrame API:
>
> val forders = spark.read.parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/orders")
>   .filter("o_orderdate < 1995-01-01 and o_orderdate >= 1994-01-01")
>   .select("o_custkey", "o_orderkey")
> val flineitem = spark.read.parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/lineitem")
> val fcustomer = spark.read.parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/customer")
> val fsupplier = spark.read.parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/supplier")
> val fregion = spark.read.parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/region")
>   .where("r_name = 'ASIA'")
>   .select($"r_regionkey")
> val fnation = spark.read.parquet("hdfs://dell127:20500/SparkParquetDoubleTimestamp100G/nation")
> val decrease = udf { (x: Double, y: Double) => x * (1 - y) }
> val res = flineitem.join(forders, $"l_orderkey" === forders("o_orderkey"))
>   .join(fcustomer, $"o_custkey" === fcustomer("c_custkey"))
>   .join(fsupplier, $"l_suppkey" === fsupplier("s_suppkey") && $"c_nationkey" === fsupplier("s_nationkey"))
>   .join(fnation, $"s_nationkey" === fnation("n_nationkey"))
>   .join(fregion, $"n_regionkey" === fregion("r_regionkey"))
>   .select($"n_name", decrease($"l_extendedprice", $"l_discount").as("value"))
>   .groupBy($"n_name")
>   .agg(sum($"value").as("revenue"))
>   .sort($"revenue".desc)
>   .show()
>
> My environment is one master (HDFS NameNode) and four workers (HDFS DataNodes),
> each with 40 cores and 128 GB of memory. The data set is TPC-H at 100 GB, stored
> on HDFS in Parquet format.
> The query takes about 1.5 minutes. I found that reading these six tables with
> spark.read.parquet is sequential. How can I make the reads run in parallel?
> I have already set data locality, spark.default.parallelism and spark.serializer,
> and I use the G1 GC, but the runtime has not improved.
> Is there any other advice for tuning this query?
> Thank you.
>
> Wenting He



Fwd: Testing Apache Spark with JDK 9 Early Access builds

2017-07-14 Thread Matei Zaharia
FYI, the JDK group at Oracle is reaching out to see whether anyone
wants to test with JDK 9 and give them feedback. Just contact them
directly if you'd like to.


-- Forwarded message --
From: dalibor topic 
Date: Wed, Jul 12, 2017 at 3:16 AM
Subject: Testing Apache Spark with JDK 9 Early Access builds
To: ma...@cs.stanford.edu
Cc: Rory O'Donnell 


Hi Matei,

As part of evaluating how far along various popular open source
projects are regarding testing with upcoming JDK releases, I thought
I'd reach out to you about adding your projects to the Quality
Outreach [1][2] effort that Rory (CC:ed, as the OpenJDK Quality Group
Lead) leads.

Through that effort, we're trying to encourage more community testing
of JDK Early Access (EA) builds, and assist those projects that
participate in filing, tracking and (hopefully) resolving issues they
find along the way. Currently, about 80 FOSS projects participate in
the effort.

I'm curious whether you have had a chance to test your projects with JDK 9,
whether you have run into any showstopper issues, and if so, whether you have
filed any issues against the JDK discovered while testing with JDK 9 (or an
earlier release).

Last but not least, I'd be curious if you'd be interested in joining
the Quality Outreach effort with your projects. Rory can fill you in
on the details of how it all works.

cheers,
dalibor topic

[1] https://wiki.openjdk.java.net/display/quality/Quality+Outreach
[2] 
https://wiki.openjdk.java.net/download/attachments/21430310/TheWisdomOfCrowdTestingOpenJDK.pdf
--
 Dalibor Topic | Principal Product Manager
Phone: +494089091214  | Mobile: +491737185961


ORACLE Deutschland B.V. & Co. KG | Kühnehöfe 5 | 22761 Hamburg

ORACLE Deutschland B.V. & Co. KG
Hauptverwaltung: Riesstr. 25, D-80992 München
Registergericht: Amtsgericht München, HRA 95603

Komplementärin: ORACLE Deutschland Verwaltung B.V.
Hertogswetering 163/167, 3543 AS Utrecht, Niederlande
Handelsregister der Handelskammer Midden-Niederlande, Nr. 30143697
Geschäftsführer: Alexander van der Ven, Jan Schultheiss, Val Maher

 Oracle is committed to developing
practices and products that help protect the environment




Re: Testing Apache Spark with JDK 9 Early Access builds

2017-07-14 Thread Sean Owen
IIRC Scala 2.11 doesn't work on Java 9, and full support will come in 2.13.
I think that may be the biggest gating factor for Spark. At least, we can
get going on 2.12 support now that 2.10 support is dropped.

On Fri, Jul 14, 2017 at 5:23 PM Matei Zaharia wrote:

> FYI, the JDK group at Oracle is reaching out to see whether anyone
> wants to test with JDK 9 and give them feedback. Just contact them
> directly if you'd like to.
>
>
> -- Forwarded message --
> From: dalibor topic 
> Date: Wed, Jul 12, 2017 at 3:16 AM
> Subject: Testing Apache Spark with JDK 9 Early Access builds
> To: ma...@cs.stanford.edu
> Cc: Rory O'Donnell 
>
>
> Hi Matei,
>
> As part of evaluating how far along various popular open source
> projects are regarding testing with upcoming JDK releases, I thought
> I'd reach out to you about adding your projects to the Quality
> Outreach [1][2] effort that Rory (CC:ed, as the OpenJDK Quality Group
> Lead) leads.
>
> Through that effort, we're trying to encourage more community testing
> of JDK Early Access (EA) builds, and assist those projects that
> participate in filing, tracking and (hopefully) resolving issues they
> find along the way. Currently, about 80 FOSS projects participate in
> the effort.
>
> I'm curious whether you have had a chance to test your projects with JDK 9,
> whether you have run into any showstopper issues, and if so, whether you have
> filed any issues against the JDK discovered while testing with JDK 9 (or an
> earlier release).
>
> Last but not least, I'd be curious if you'd be interested in joining
> the Quality Outreach effort with your projects. Rory can fill you in
> on the details of how it all works.
>
> cheers,
> dalibor topic
>
> [1] https://wiki.openjdk.java.net/display/quality/Quality+Outreach
> [2]
> https://wiki.openjdk.java.net/download/attachments/21430310/TheWisdomOfCrowdTestingOpenJDK.pdf
> --
>  Dalibor Topic | Principal Product Manager
> Phone: +494089091214 | Mobile: +491737185961
>
> ORACLE Deutschland B.V. & Co. KG | Kühnehöfe 5 | 22761 Hamburg
>
> ORACLE Deutschland B.V. & Co. KG
> Hauptverwaltung: Riesstr. 25, D-80992 München
> Registergericht: Amtsgericht München, HRA 95603
>
> Komplementärin: ORACLE Deutschland Verwaltung B.V.
> Hertogswetering 163/167, 3543 AS Utrecht, Niederlande
> Handelsregister der Handelskammer Midden-Niederlande, Nr. 30143697
> Geschäftsführer: Alexander van der Ven, Jan Schultheiss, Val Maher
>
>  Oracle is committed to developing
> practices and products that help protect the environment
>