[jira] [Commented] (SPARK-10309) Some tasks failed with Unable to acquire memory
[ https://issues.apache.org/jira/browse/SPARK-10309?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14996818#comment-14996818 ]

Tamas Szuromi commented on SPARK-10309:
---------------------------------------

The current 1.6 branch also looks good.

> Some tasks failed with Unable to acquire memory
> -----------------------------------------------
>
>                 Key: SPARK-10309
>                 URL: https://issues.apache.org/jira/browse/SPARK-10309
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.5.0
>            Reporter: Davies Liu
>            Assignee: Davies Liu
>
> *=== Update ===*
> This is caused by a mismatch between `Runtime.getRuntime.availableProcessors()`
> and the number of active tasks in `ShuffleMemoryManager`. A quick reproduction
> is the following:
> {code}
> // My machine only has 8 cores
> $ bin/spark-shell --master local[32]
> scala> val df = sc.parallelize(Seq((1, 1), (2, 2))).toDF("a", "b")
> scala> df.as("x").join(df.as("y"), $"x.a" === $"y.a").count()
> Caused by: java.io.IOException: Unable to acquire 2097152 bytes of memory
>         at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPage(UnsafeExternalSorter.java:351)
>         at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.<init>(UnsafeExternalSorter.java:138)
>         at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.create(UnsafeExternalSorter.java:106)
>         at org.apache.spark.sql.execution.UnsafeExternalRowSorter.<init>(UnsafeExternalRowSorter.java:68)
>         at org.apache.spark.sql.execution.TungstenSort.org$apache$spark$sql$execution$TungstenSort$$preparePartition$1(sort.scala:120)
>         at org.apache.spark.sql.execution.TungstenSort$$anonfun$doExecute$2.apply(sort.scala:143)
>         at org.apache.spark.sql.execution.TungstenSort$$anonfun$doExecute$2.apply(sort.scala:143)
>         at org.apache.spark.rdd.MapPartitionsWithPreparationRDD.prepare(MapPartitionsWithPreparationRDD.scala:50)
> {code}
> *=== Original ===*
> While running Q53 of TPCDS (scale = 1500) on a 24-node cluster (12 GB of memory per executor):
> {code}
> java.io.IOException: Unable to acquire 33554432 bytes of memory
>         at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPage(UnsafeExternalSorter.java:368)
>         at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.<init>(UnsafeExternalSorter.java:138)
>         at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.create(UnsafeExternalSorter.java:106)
>         at org.apache.spark.sql.execution.UnsafeExternalRowSorter.<init>(UnsafeExternalRowSorter.java:68)
>         at org.apache.spark.sql.execution.TungstenSort.org$apache$spark$sql$execution$TungstenSort$$preparePartition$1(sort.scala:146)
>         at org.apache.spark.sql.execution.TungstenSort$$anonfun$doExecute$3.apply(sort.scala:169)
>         at org.apache.spark.sql.execution.TungstenSort$$anonfun$doExecute$3.apply(sort.scala:169)
>         at org.apache.spark.rdd.MapPartitionsWithPreparationRDD.compute(MapPartitionsWithPreparationRDD.scala:45)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>         at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>         at org.apache.spark.scheduler.Task.run(Task.scala:88)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:745)
> {code}
> The task could finish after a retry.
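To make the mismatch concrete: in 1.5.x the shuffle pool guarantees each of N concurrently active tasks only 1/(2*N) of its memory, while the unsafe page size was derived from the machine's core count. The sketch below is illustrative arithmetic under those assumptions; the page-size formula is a stand-in, not Spark's actual code. It shows why local[32] on an 8-core machine can fail where local[8] would not.

{code:scala}
// Back-of-the-envelope sketch of the SPARK-10309 mismatch. The formulas
// below paraphrase the assumed 1.5.x behaviour; they are not Spark's code.
object PageMathSketch extends App {
  val poolBytes = 256L << 20        // shuffle memory pool, say 256 MB
  val cores     = 8                 // Runtime.getRuntime.availableProcessors()

  // Hypothetical: page size derived from the physical core count (8 MB here).
  val pageBytes = poolBytes / (4L * cores)

  // Fairness guarantees each of N active tasks only poolBytes / (2 * N).
  def guaranteed(activeTasks: Int): Long = poolBytes / (2L * activeTasks)

  for (n <- Seq(cores, 32)) {       // local[8] vs local[32] on an 8-core box
    val ok = guaranteed(n) >= pageBytes
    println(s"activeTasks=$n guaranteedBytes=${guaranteed(n)} pageBytes=$pageBytes ok=$ok")
  }
  // activeTasks=8  -> guaranteed 16 MB >= 8 MB page: acquire succeeds
  // activeTasks=32 -> guaranteed  4 MB <  8 MB page: "Unable to acquire"
}
{code}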
[jira] [Commented] (SPARK-10309) Some tasks failed with Unable to acquire memory
[ https://issues.apache.org/jira/browse/SPARK-10309?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14996813#comment-14996813 ]

Tamas Szuromi commented on SPARK-10309:
---------------------------------------

It's working for me.
[jira] [Commented] (SPARK-10309) Some tasks failed with Unable to acquire memory
[ https://issues.apache.org/jira/browse/SPARK-10309?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14974381#comment-14974381 ]

Tamas Szuromi commented on SPARK-10309:
---------------------------------------

I guess it's the same issue here as well.

{code}
15/10/26 15:11:33 INFO UnsafeExternalSorter: Thread 4524 spilling sort data of 64.0 KB to disk (0 time so far)
15/10/26 15:11:33 INFO Executor: Executor is trying to kill task 135.0 in stage 394.0 (TID 11069)
15/10/26 15:11:33 INFO UnsafeExternalSorter: Thread 4607 spilling sort data of 64.0 KB to disk (0 time so far)
15/10/26 15:11:33 ERROR Executor: Managed memory leak detected; size = 67108864 bytes, TID = 11149
15/10/26 15:11:33 ERROR Executor: Exception in task 92.3 in stage 394.0 (TID 11149)
java.io.IOException: Unable to acquire 67108864 bytes of memory
        at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPage(UnsafeExternalSorter.java:351)
        at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.<init>(UnsafeExternalSorter.java:138)
        at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.create(UnsafeExternalSorter.java:106)
        at org.apache.spark.sql.execution.UnsafeExternalRowSorter.<init>(UnsafeExternalRowSorter.java:68)
        at org.apache.spark.sql.execution.TungstenSort.org$apache$spark$sql$execution$TungstenSort$$preparePartition$1(sort.scala:146)
        at org.apache.spark.sql.execution.TungstenSort$$anonfun$doExecute$3.apply(sort.scala:169)
        at org.apache.spark.sql.execution.TungstenSort$$anonfun$doExecute$3.apply(sort.scala:169)
        at org.apache.spark.rdd.MapPartitionsWithPreparationRDD.prepare(MapPartitionsWithPreparationRDD.scala:50)
        at org.apache.spark.rdd.ZippedPartitionsBaseRDD$$anonfun$tryPrepareParents$1.applyOrElse(ZippedPartitionsRDD.scala:83)
        at org.apache.spark.rdd.ZippedPartitionsBaseRDD$$anonfun$tryPrepareParents$1.applyOrElse(ZippedPartitionsRDD.scala:82)
        at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33)
        at scala.collection.TraversableLike$$anonfun$collect$1.apply(TraversableLike.scala:278)
        at scala.collection.immutable.List.foreach(List.scala:318)
        at scala.collection.TraversableLike$class.collect(TraversableLike.scala:278)
        at scala.collection.AbstractTraversable.collect(Traversable.scala:105)
        at org.apache.spark.rdd.ZippedPartitionsBaseRDD.tryPrepareParents(ZippedPartitionsRDD.scala:82)
        at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:97)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
        at org.apache.spark.rdd.MapPartitionsWithPreparationRDD.compute(MapPartitionsWithPreparationRDD.scala:63)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:88)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
{code}
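For readers hitting this on 1.5.x before the fix, the configuration below is a hedged sketch of commonly suggested mitigations for this class of failure, not an official workaround verified on this ticket: keep the number of task slots close to the physical core count, shrink the unsafe page size so each task's minimum share can still hold one page, and give the shuffle pool more headroom.

{code:scala}
import org.apache.spark.{SparkConf, SparkContext}

// Hedged mitigation sketch (assumptions, not a verified fix for SPARK-10309),
// e.g. in a standalone application's main():
val conf = new SparkConf()
  .setAppName("mitigation-sketch")
  .setMaster("local[8]")                      // match the real core count instead of oversubscribing
  .set("spark.buffer.pageSize", "2m")         // smaller unsafe pages, so acquireNewPage asks for less
  .set("spark.shuffle.memoryFraction", "0.4") // more shuffle memory to carve pages from

val sc = new SparkContext(conf)
{code}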
[jira] [Commented] (SPARK-10896) Parquet join issue
[ https://issues.apache.org/jira/browse/SPARK-10896?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14944862#comment-14944862 ]

Tamas Szuromi commented on SPARK-10896:
---------------------------------------

It was an environmental issue.
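For anyone triaging a similar report, it helps to first separate a broken join from a broken round-trip. Below is a minimal sketch, assuming the df1/df2 and the /tmp paths from the reproduction later in this digest; it is not part of the original report.

{code:scala}
// Sanity check: did the Parquet round-trip preserve the data at all?
val df1Back = sqlContext.read.parquet("hdfs:///tmp/df1")
val df2Back = sqlContext.read.parquet("hdfs:///tmp/df2")

// Zero for both counts means every row survived the round-trip, so a wrong
// join result would implicate the join; non-zero counts point at the storage
// or cluster environment instead.
println(df1.except(df1Back).count())
println(df2.except(df2Back).count())
{code}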
[jira] [Closed] (SPARK-10896) Parquet join issue
[ https://issues.apache.org/jira/browse/SPARK-10896?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Tamas Szuromi closed SPARK-10896.
---------------------------------
    Resolution: Not A Problem
[jira] [Updated] (SPARK-10896) Parquet join issue
[ https://issues.apache.org/jira/browse/SPARK-10896?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Tamas Szuromi updated SPARK-10896:
----------------------------------
    Fix Version/s: 1.6.0
                   1.5.1
[jira] [Updated] (SPARK-10896) Parquet join issue
[ https://issues.apache.org/jira/browse/SPARK-10896?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Tamas Szuromi updated SPARK-10896:
----------------------------------
    Fix Version/s: (was: 1.5.1)
                   (was: 1.6.0)
[jira] [Updated] (SPARK-10896) Parquet join issue
[ https://issues.apache.org/jira/browse/SPARK-10896?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Tamas Szuromi updated SPARK-10896:
----------------------------------
    Affects Version/s: 1.5.1
[jira] [Created] (SPARK-10896) Parquet join issue
Tamas Szuromi created SPARK-10896:
-------------------------------------

             Summary: Parquet join issue
                 Key: SPARK-10896
                 URL: https://issues.apache.org/jira/browse/SPARK-10896
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 1.5.0
         Environment: spark-1.5.0-bin-hadoop2.6.tgz with HDP 2.3
            Reporter: Tamas Szuromi


After loading Parquet files, the join is not working correctly.

How to reproduce:

import org.apache.spark.sql._
import org.apache.spark.sql.types._

val arr1 = Array[Row](Row.apply(0, 0), Row.apply(1, 1), Row.apply(2, 2), Row.apply(3, 3), Row.apply(4, 4), Row.apply(5, 5), Row.apply(6, 6), Row.apply(7, 7))
val schema1 = StructType(
  StructField("id", IntegerType) ::
  StructField("value1", IntegerType) :: Nil)
val df1 = sqlContext.createDataFrame(sc.parallelize(arr1), schema1)

val arr2 = Array[Row](Row.apply(0, 0), Row.apply(1, 1), Row.apply(2, 2), Row.apply(3, 3), Row.apply(4, 4), Row.apply(5, 5), Row.apply(6, 6), Row.apply(7, 7))
val schema2 = StructType(
  StructField("otherId", IntegerType) ::
  StructField("value2", IntegerType) :: Nil)
val df2 = sqlContext.createDataFrame(sc.parallelize(arr2), schema2)

val res = df1.join(df2, df1("id") === df2("otherId"))
df1.take(10)
df2.take(10)
res.count()
res.take(10)

df1.write.format("parquet").save("hdfs://10.1.1.235/tmp/df1")
df2.write.format("parquet").save("hdfs://10.1.1.235/tmp/df2")

val df1 = sqlContext.read.parquet("hdfs://10.1.1.235/tmp/df1/*.parquet")
val df2 = sqlContext.read.parquet("hdfs://10.1.1.235/tmp/df2/*.parquet")

val res = df1.join(df2, df1("id") === df2("otherId"))
df1.take(10)
df2.take(10)
res.count()
res.take(10)

### Output

Array[org.apache.spark.sql.Row] = Array([0,0], [1,1], [2,2], [3,3], [4,4], [5,5], [6,6], [7,7])
Array[org.apache.spark.sql.Row] = Array([0,0], [1,1], [2,2], [3,3], [4,4], [5,5], [6,6], [7,7])
Long = 8
Array[org.apache.spark.sql.Row] = Array([0,0,0,0], [1,1,1,1], [2,2,2,2], [3,3,3,3], [4,4,4,4], [5,5,5,5], [6,6,6,6], [7,7,7,7])

After reading back:

Array[org.apache.spark.sql.Row] = Array([0,0], [1,1], [2,2], [3,3], [4,4], [5,5], [6,6], [7,7])
Array[org.apache.spark.sql.Row] = Array([0,0], [1,1], [2,2], [3,3], [4,4], [5,5], [6,6], [7,7])
Long = 4
Array[org.apache.spark.sql.Row] = Array([0,0,0,5], [2,2,2,null], [4,4,4,5], [6,6,6,null])
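One variable worth isolating in a reproduction like this, as an editor's suggestion rather than something tried in the report: read back the directories Spark wrote instead of a *.parquet glob, so any summary or metadata files are resolved the usual way, then re-check the join.

{code:scala}
// Hypothetical variation on the reproduction above (paths as in the report).
val df1Dir = sqlContext.read.parquet("hdfs://10.1.1.235/tmp/df1")
val df2Dir = sqlContext.read.parquet("hdfs://10.1.1.235/tmp/df2")

val resDir = df1Dir.join(df2Dir, df1Dir("id") === df2Dir("otherId"))
resDir.count()   // expected: 8, matching the pre-write result
{code}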
[jira] [Updated] (SPARK-10896) Parquet join issue
[ https://issues.apache.org/jira/browse/SPARK-10896?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Tamas Szuromi updated SPARK-10896:
----------------------------------
    Description:

After loading Parquet files, the join is not working correctly.

How to reproduce:

{code:scala}
import org.apache.spark.sql._
import org.apache.spark.sql.types._

val arr1 = Array[Row](Row.apply(0, 0), Row.apply(1, 1), Row.apply(2, 2), Row.apply(3, 3), Row.apply(4, 4), Row.apply(5, 5), Row.apply(6, 6), Row.apply(7, 7))
val schema1 = StructType(
  StructField("id", IntegerType) ::
  StructField("value1", IntegerType) :: Nil)
val df1 = sqlContext.createDataFrame(sc.parallelize(arr1), schema1)

val arr2 = Array[Row](Row.apply(0, 0), Row.apply(1, 1), Row.apply(2, 2), Row.apply(3, 3), Row.apply(4, 4), Row.apply(5, 5), Row.apply(6, 6), Row.apply(7, 7))
val schema2 = StructType(
  StructField("otherId", IntegerType) ::
  StructField("value2", IntegerType) :: Nil)
val df2 = sqlContext.createDataFrame(sc.parallelize(arr2), schema2)

val res = df1.join(df2, df1("id") === df2("otherId"))
df1.take(10)
df2.take(10)
res.count()
res.take(10)

df1.write.format("parquet").save("hdfs://10.1.1.235/tmp/df1")
df2.write.format("parquet").save("hdfs://10.1.1.235/tmp/df2")

val df1 = sqlContext.read.parquet("hdfs://10.1.1.235/tmp/df1/*.parquet")
val df2 = sqlContext.read.parquet("hdfs://10.1.1.235/tmp/df2/*.parquet")

val res = df1.join(df2, df1("id") === df2("otherId"))
df1.take(10)
df2.take(10)
res.count()
res.take(10)
{code}

Output

{code:scala}
Array[org.apache.spark.sql.Row] = Array([0,0], [1,1], [2,2], [3,3], [4,4], [5,5], [6,6], [7,7])
Array[org.apache.spark.sql.Row] = Array([0,0], [1,1], [2,2], [3,3], [4,4], [5,5], [6,6], [7,7])
Long = 8
Array[org.apache.spark.sql.Row] = Array([0,0,0,0], [1,1,1,1], [2,2,2,2], [3,3,3,3], [4,4,4,4], [5,5,5,5], [6,6,6,6], [7,7,7,7])
{code}

After reading back:

{code:scala}
Array[org.apache.spark.sql.Row] = Array([0,0], [1,1], [2,2], [3,3], [4,4], [5,5], [6,6], [7,7])
Array[org.apache.spark.sql.Row] = Array([0,0], [1,1], [2,2], [3,3], [4,4], [5,5], [6,6], [7,7])
Long = 4
Array[org.apache.spark.sql.Row] = Array([0,0,0,5], [2,2,2,null], [4,4,4,5], [6,6,6,null])
{code}

    was: (the original plain-text description; see the [jira] [Created] notification below)
[jira] [Updated] (SPARK-10896) Parquet join issue
[ https://issues.apache.org/jira/browse/SPARK-10896?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Tamas Szuromi updated SPARK-10896:
----------------------------------
    Description:

After loading Parquet files, the join is not working correctly.

How to reproduce:

{code:java}
import org.apache.spark.sql._
import org.apache.spark.sql.types._

val arr1 = Array[Row](Row.apply(0, 0), Row.apply(1, 1), Row.apply(2, 2), Row.apply(3, 3), Row.apply(4, 4), Row.apply(5, 5), Row.apply(6, 6), Row.apply(7, 7))
val schema1 = StructType(
  StructField("id", IntegerType) ::
  StructField("value1", IntegerType) :: Nil)
val df1 = sqlContext.createDataFrame(sc.parallelize(arr1), schema1)

val arr2 = Array[Row](Row.apply(0, 0), Row.apply(1, 1), Row.apply(2, 2), Row.apply(3, 3), Row.apply(4, 4), Row.apply(5, 5), Row.apply(6, 6), Row.apply(7, 7))
val schema2 = StructType(
  StructField("otherId", IntegerType) ::
  StructField("value2", IntegerType) :: Nil)
val df2 = sqlContext.createDataFrame(sc.parallelize(arr2), schema2)

val res = df1.join(df2, df1("id") === df2("otherId"))
df1.take(10)
df2.take(10)
res.count()
res.take(10)

df1.write.format("parquet").save("hdfs:///tmp/df1")
df2.write.format("parquet").save("hdfs:///tmp/df2")

val df1 = sqlContext.read.parquet("hdfs:///tmp/df1/*.parquet")
val df2 = sqlContext.read.parquet("hdfs:///tmp/df2/*.parquet")

val res = df1.join(df2, df1("id") === df2("otherId"))
df1.take(10)
df2.take(10)
res.count()
res.take(10)
{code}

Output

{code:java}
Array[org.apache.spark.sql.Row] = Array([0,0], [1,1], [2,2], [3,3], [4,4], [5,5], [6,6], [7,7])
Array[org.apache.spark.sql.Row] = Array([0,0], [1,1], [2,2], [3,3], [4,4], [5,5], [6,6], [7,7])
Long = 8
Array[org.apache.spark.sql.Row] = Array([0,0,0,0], [1,1,1,1], [2,2,2,2], [3,3,3,3], [4,4,4,4], [5,5,5,5], [6,6,6,6], [7,7,7,7])
{code}

After reading back:

{code:java}
Array[org.apache.spark.sql.Row] = Array([0,0], [1,1], [2,2], [3,3], [4,4], [5,5], [6,6], [7,7])
Array[org.apache.spark.sql.Row] = Array([0,0], [1,1], [2,2], [3,3], [4,4], [5,5], [6,6], [7,7])
Long = 4
Array[org.apache.spark.sql.Row] = Array([0,0,0,5], [2,2,2,null], [4,4,4,5], [6,6,6,null])
{code}

    was: (the same reproduction with explicit namenode addresses; see the following [jira] [Updated] notification)
[jira] [Updated] (SPARK-10896) Parquet join issue
[ https://issues.apache.org/jira/browse/SPARK-10896?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Tamas Szuromi updated SPARK-10896:
----------------------------------
    Description:

After loading Parquet files, the join is not working correctly.

How to reproduce:

{code:java}
import org.apache.spark.sql._
import org.apache.spark.sql.types._

val arr1 = Array[Row](Row.apply(0, 0), Row.apply(1, 1), Row.apply(2, 2), Row.apply(3, 3), Row.apply(4, 4), Row.apply(5, 5), Row.apply(6, 6), Row.apply(7, 7))
val schema1 = StructType(
  StructField("id", IntegerType) ::
  StructField("value1", IntegerType) :: Nil)
val df1 = sqlContext.createDataFrame(sc.parallelize(arr1), schema1)

val arr2 = Array[Row](Row.apply(0, 0), Row.apply(1, 1), Row.apply(2, 2), Row.apply(3, 3), Row.apply(4, 4), Row.apply(5, 5), Row.apply(6, 6), Row.apply(7, 7))
val schema2 = StructType(
  StructField("otherId", IntegerType) ::
  StructField("value2", IntegerType) :: Nil)
val df2 = sqlContext.createDataFrame(sc.parallelize(arr2), schema2)

val res = df1.join(df2, df1("id") === df2("otherId"))
df1.take(10)
df2.take(10)
res.count()
res.take(10)

df1.write.format("parquet").save("hdfs://10.1.1.235/tmp/df1")
df2.write.format("parquet").save("hdfs://10.1.1.235/tmp/df2")

val df1 = sqlContext.read.parquet("hdfs://10.1.1.235/tmp/df1/*.parquet")
val df2 = sqlContext.read.parquet("hdfs://10.1.1.235/tmp/df2/*.parquet")

val res = df1.join(df2, df1("id") === df2("otherId"))
df1.take(10)
df2.take(10)
res.count()
res.take(10)
{code}

Output

{code:java}
Array[org.apache.spark.sql.Row] = Array([0,0], [1,1], [2,2], [3,3], [4,4], [5,5], [6,6], [7,7])
Array[org.apache.spark.sql.Row] = Array([0,0], [1,1], [2,2], [3,3], [4,4], [5,5], [6,6], [7,7])
Long = 8
Array[org.apache.spark.sql.Row] = Array([0,0,0,0], [1,1,1,1], [2,2,2,2], [3,3,3,3], [4,4,4,4], [5,5,5,5], [6,6,6,6], [7,7,7,7])
{code}

After reading back:

{code:java}
Array[org.apache.spark.sql.Row] = Array([0,0], [1,1], [2,2], [3,3], [4,4], [5,5], [6,6], [7,7])
Array[org.apache.spark.sql.Row] = Array([0,0], [1,1], [2,2], [3,3], [4,4], [5,5], [6,6], [7,7])
Long = 4
Array[org.apache.spark.sql.Row] = Array([0,0,0,5], [2,2,2,null], [4,4,4,5], [6,6,6,null])
{code}

    was: (the same reproduction tagged as Scala rather than Java; see the earlier [jira] [Updated] notification above)
[jira] [Commented] (SPARK-10896) Parquet join issue
[ https://issues.apache.org/jira/browse/SPARK-10896?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14939938#comment-14939938 ]

Tamas Szuromi commented on SPARK-10896:
---------------------------------------

It works with file:/// paths and with Spark 1.4.1.
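Below is a sketch of the cross-check that comment describes, with illustrative paths and the df1/df2 from the reproduction above: repeating the round-trip through the local filesystem takes HDFS out of the picture, which is what pointed at the environment here.

{code:scala}
// Illustrative cross-check, not verbatim from the report: redo the
// round-trip through the local filesystem to rule HDFS in or out.
df1.write.format("parquet").save("file:///tmp/df1_local")
df2.write.format("parquet").save("file:///tmp/df2_local")

val l1 = sqlContext.read.parquet("file:///tmp/df1_local")
val l2 = sqlContext.read.parquet("file:///tmp/df2_local")

// Expected: 8. Getting 8 here but 4 via HDFS implicates the cluster
// environment rather than Spark's join.
l1.join(l2, l1("id") === l2("otherId")).count()
{code}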