[ https://issues.apache.org/jira/browse/SPARK-10896?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Tamas Szuromi updated SPARK-10896:
----------------------------------
    Description: 
After writing DataFrames to Parquet and reading them back, the same join returns wrong results: the row count drops from 8 to 4, and the joined rows contain mismatched and null values.
How to reproduce:
{code:java}
import org.apache.spark.sql._
import org.apache.spark.sql.types._

// Two small DataFrames sharing the integer keys 0..7.
val arr1 = Array[Row](Row(0, 0), Row(1, 1), Row(2, 2), Row(3, 3),
  Row(4, 4), Row(5, 5), Row(6, 6), Row(7, 7))
val schema1 = StructType(
  StructField("id", IntegerType) ::
  StructField("value1", IntegerType) :: Nil)
val df1 = sqlContext.createDataFrame(sc.parallelize(arr1), schema1)

val arr2 = Array[Row](Row(0, 0), Row(1, 1), Row(2, 2), Row(3, 3),
  Row(4, 4), Row(5, 5), Row(6, 6), Row(7, 7))
val schema2 = StructType(
  StructField("otherId", IntegerType) ::
  StructField("value2", IntegerType) :: Nil)
val df2 = sqlContext.createDataFrame(sc.parallelize(arr2), schema2)

// Join the in-memory DataFrames: 8 matching rows, as expected.
val res = df1.join(df2, df1("id") === df2("otherId"))
df1.take(10)
df2.take(10)
res.count()
res.take(10)

// Write both DataFrames to HDFS as Parquet.
df1.write.format("parquet").save("hdfs:///tmp/df1")
df2.write.format("parquet").save("hdfs:///tmp/df2")

// Read them back (spark-shell allows re-binding df1/df2) and repeat
// the identical join.
val df1 = sqlContext.read.parquet("hdfs:///tmp/df1/*.parquet")
val df2 = sqlContext.read.parquet("hdfs:///tmp/df2/*.parquet")

val res = df1.join(df2, df1("id") === df2("otherId"))
df1.take(10)
df2.take(10)
res.count()  // returns 4 instead of 8
res.take(10) // mismatched rows, some with nulls
{code}
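
As a side check, the read-back step can also point at the output directories Spark wrote instead of globbing the part files. This is only a sketch (the df1Dir/df2Dir names are hypothetical) to help narrow down whether the *.parquet glob is a factor, not a confirmed workaround:

{code:java}
// Variant of the repro above: read the directories rather than the
// part-file glob, then re-run the same join.
val df1Dir = sqlContext.read.parquet("hdfs:///tmp/df1")
val df2Dir = sqlContext.read.parquet("hdfs:///tmp/df2")
val resDir = df1Dir.join(df2Dir, df1Dir("id") === df2Dir("otherId"))
resDir.count() // expected: 8
{code}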

Output before writing to Parquet:

{code:java}
Array[org.apache.spark.sql.Row] = Array([0,0], [1,1], [2,2], [3,3], [4,4], 
[5,5], [6,6], [7,7]) 
Array[org.apache.spark.sql.Row] = Array([0,0], [1,1], [2,2], [3,3], [4,4], 
[5,5], [6,6], [7,7]) 
Long = 8 
Array[org.apache.spark.sql.Row] = Array([0,0,0,0], [1,1,1,1], [2,2,2,2], 
[3,3,3,3], [4,4,4,4], [5,5,5,5], [6,6,6,6], [7,7,7,7]) 
{code}

Output after reading back from Parquet:

{code:java}
Array[org.apache.spark.sql.Row] = Array([0,0], [1,1], [2,2], [3,3], [4,4], 
[5,5], [6,6], [7,7]) 
Array[org.apache.spark.sql.Row] = Array([0,0], [1,1], [2,2], [3,3], [4,4], 
[5,5], [6,6], [7,7]) 
Long = 4 
Array[org.apache.spark.sql.Row] = Array([0,0,0,5], [2,2,2,null], [4,4,4,5], 
[6,6,6,null])
{code}
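
For diagnosing this, comparing the physical plans of the two joins may help; DataFrame.explain(extended) is available in Spark 1.5. A minimal sketch, run once before writing and once after reading back:

{code:java}
// Print the extended (logical + physical) plan so the join strategy and
// any pushed-down Parquet filters can be compared between the two runs.
res.explain(true)
{code}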

> Parquet join issue
> ------------------
>
>                 Key: SPARK-10896
>                 URL: https://issues.apache.org/jira/browse/SPARK-10896
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.5.0
>         Environment: spark-1.5.0-bin-hadoop2.6.tgz with HDP 2.3
>            Reporter: Tamas Szuromi
>              Labels: dataframe, hdfs, join, parquet, sql
>


