[ https://issues.apache.org/jira/browse/SPARK-17211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15453541#comment-15453541 ]
gurmukh singh edited comment on SPARK-17211 at 8/31/16 10:24 PM:
-----------------------------------------------------------------

Hi, I can see this in Apache Spark 2.0 as well, running with the same node configuration as mentioned above.

Apache Hadoop 2.7.2, Spark 2.0:
{noformat}
[hadoop@sp1 ~]$ hadoop version
Hadoop 2.7.2
Subversion Unknown -r Unknown
Compiled by root on 2016-05-16T03:56Z
Compiled with protoc 2.5.0
From source with checksum d0fda26633fa762bff87ec759ebe689c
This command was run using /opt/cluster/hadoop-2.7.2/share/hadoop/common/hadoop-common-2.7.2.jar

[hadoop@sp1 ~]$ spark-shell --version
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.0.0
      /_/

Branch
Compiled by user jenkins on 2016-07-19T21:16:09Z
Revision

[hadoop@sp1 hadoop]$ spark-shell --master yarn --deploy-mode client \
  --num-executors 20 --executor-cores 2 \
  --executor-memory 12g --driver-memory 48g \
  --conf spark.yarn.executor.memoryOverhead=4096 \
  --conf spark.sql.shuffle.partitions=1024 \
  --conf spark.yarn.maxAppAttempts=1
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel).
16/08/31 04:29:48 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/08/31 04:29:49 WARN yarn.Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
16/08/31 04:30:19 WARN spark.SparkContext: Use an existing SparkContext, some configuration may not take effect.
Spark context Web UI available at http://10.0.0.227:4040
Spark context available as 'sc' (master = yarn, app id = application_1472617754154_0001).
Spark session available as 'spark'.
{noformat}
{noformat}
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.0.0
      /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_101)
Type in expressions to have them evaluated.
Type :help for more information.

scala> val a1 = Array((123,1),(234,2),(432,5))
a1: Array[(Int, Int)] = Array((123,1), (234,2), (432,5))

scala> val a2 = Array(("abc",1),("bcd",2),("dcb",5))
a2: Array[(String, Int)] = Array((abc,1), (bcd,2), (dcb,5))

scala> val df1 = sc.parallelize(a1).toDF("gid","id")
df1: org.apache.spark.sql.DataFrame = [gid: int, id: int]

scala> val df2 = sc.parallelize(a2).toDF("gname","id")
df2: org.apache.spark.sql.DataFrame = [gname: string, id: int]

scala> df1.join(df2,"id").show()
+---+---+-----+
| id|gid|gname|
+---+---+-----+
|  1|123|  abc|
|  2|234|  bcd|
|  5|432|  dcb|
+---+---+-----+

scala> df1.join(broadcast(df2),"id").show()
+---+---+-----+
| id|gid|gname|
+---+---+-----+
|  1|123| null|
|  2|234| null|
|  5|432| null|
+---+---+-----+

scala> broadcast(df1).join(df2,"id").show()
+---+---+-----+
| id|gid|gname|
+---+---+-----+
|  0|  1|  abc|
|  0|  2|  bcd|
|  0|  5|  dcb|
+---+---+-----+
{noformat}

If I reduce the driver memory, this works as well, and it works on Apache Spark 1.6. Since a lot of things have changed in Spark 2.0, this needs to be looked into; it should throw an error or OOM instead of returning NULL or zero values.

[~himanish] That said, it would be interesting to understand the use case: on a node with 61 GB, running with driver memory = 48 GB leaves just 12 GB for everything else, on top of the other overheads on the system. On Databricks, are you running with the same parameters?
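For reference, the semantics the broadcast join above is supposed to have can be simulated with a plain-Python hash join over the same rows (a minimal sketch, independent of Spark): build a lookup table on the small (broadcast) side keyed by id, then probe it with the other side. Every probe key is present, so no row should come back with a null gname.

```python
# Minimal broadcast-hash-join sketch (plain Python, no Spark) over the same
# rows as the Scala session above, showing what the join *should* return.
a1 = [(123, 1), (234, 2), (432, 5)]        # (gid, id) rows of df1
a2 = [("abc", 1), ("bcd", 2), ("dcb", 5)]  # (gname, id) rows of df2

def broadcast_hash_join(build_rows, probe_rows):
    """Hash the (small) build side on id, then probe it with the other side."""
    table = {id_: gname for gname, id_ in build_rows}
    # Emit (id, gid, gname); a missing key would surface as None (SQL NULL).
    return [(id_, gid, table.get(id_)) for gid, id_ in probe_rows]

for row in broadcast_hash_join(a2, a1):
    print(row)
# (1, 123, 'abc')
# (2, 234, 'bcd')
# (5, 432, 'dcb')
```

Both null gname values and zeroed id values, as in the transcripts above, mean the probe or build side was corrupted somewhere, not that the keys failed to match.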
> Broadcast join produces incorrect results on EMR with large driver memory
> -------------------------------------------------------------------------
>
>                 Key: SPARK-17211
>                 URL: https://issues.apache.org/jira/browse/SPARK-17211
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 2.0.0
>            Reporter: Jarno Seppanen
>
> Broadcast join produces incorrect columns in join result, see below for an
> example. The same join but without using broadcast gives the correct columns.
> Running PySpark on YARN on Amazon EMR 5.0.0.
> {noformat}
> import pyspark.sql.functions as func
>
> keys = [
>     (54000000, 0),
>     (54000001, 1),
>     (54000002, 2),
> ]
> keys_df = spark.createDataFrame(keys, ['key_id', 'value']).coalesce(1)
> keys_df.show()
> # +--------+-----+
> # |  key_id|value|
> # +--------+-----+
> # |54000000|    0|
> # |54000001|    1|
> # |54000002|    2|
> # +--------+-----+
>
> data = [
>     (54000002, 1),
>     (54000000, 2),
>     (54000001, 3),
> ]
> data_df = spark.createDataFrame(data, ['key_id', 'foo'])
> data_df.show()
> # +--------+---+
> # |  key_id|foo|
> # +--------+---+
> # |54000002|  1|
> # |54000000|  2|
> # |54000001|  3|
> # +--------+---+
>
> ### INCORRECT ###
> data_df.join(func.broadcast(keys_df), 'key_id').show()
> # +--------+---+--------+
> # |  key_id|foo|   value|
> # +--------+---+--------+
> # |54000002|  1|54000002|
> # |54000000|  2|54000000|
> # |54000001|  3|54000001|
> # +--------+---+--------+
>
> ### CORRECT ###
> data_df.join(keys_df, 'key_id').show()
> # +--------+---+-----+
> # |  key_id|foo|value|
> # +--------+---+-----+
> # |54000000|  2|    0|
> # |54000001|  3|    1|
> # |54000002|  1|    2|
> # +--------+---+-----+
> {noformat}

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
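The CORRECT table in the PySpark report above can likewise be checked without Spark (a plain-Python sketch over the same rows): joining data (key_id, foo) against keys (key_id, value) on key_id should attach the small value column, whereas the INCORRECT output echoes key_id back in place of value.

```python
# Plain-Python reproduction of the expected result of the PySpark join above.
keys = [(54000000, 0), (54000001, 1), (54000002, 2)]  # (key_id, value)
data = [(54000002, 1), (54000000, 2), (54000001, 3)]  # (key_id, foo)

# The small ("broadcast") side as a lookup table: key_id -> value.
value_by_key = dict(keys)

# Join on key_id and sort by key, matching the CORRECT .show() output.
joined = sorted((k, foo, value_by_key[k]) for k, foo in data)
for row in joined:
    print(row)
# (54000000, 2, 0)
# (54000001, 3, 1)
# (54000002, 1, 2)
```

The value column holds 0/1/2 from keys, never the 54-million key_id values, which is what makes the INCORRECT broadcast output immediately recognizable as corrupted.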