Re: Fwd: Re: DataFrame equality does not working in 1.5.1

2015-11-06 Thread Seongduk Cheon
Hi, Michael

It works fine.

scala> sqlContext.sql("SET
spark.sql.inMemoryColumnarStorage.partitionPruning=false")
res28: org.apache.spark.sql.DataFrame = [key: string, value: string]

scala> eventDF.filter($"entityType" ===
"user").select("entityId").distinct.count
res29: Long = 2091

Thank you so much for helping me.
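For readers skimming the thread: the difference between Spark's `===` and the null-safe `<=>` operator discussed in the quoted replies below can be modeled in plain Scala. This is a sketch of the SQL semantics with `Option` standing in for nullable columns, not Spark's implementation:

```scala
// Plain-Scala model of Spark SQL's two equality operators; None plays SQL NULL.

// `===` follows SQL three-valued logic: if either side is NULL, the result
// is unknown (None), and a Filter then drops the row.
def sqlEq(a: Option[String], b: Option[String]): Option[Boolean] =
  for (x <- a; y <- b) yield x == y

// `<=>` is null-safe equality: NULL <=> NULL is true, NULL <=> value is false.
def nullSafeEq(a: Option[String], b: Option[String]): Boolean =
  (a, b) match {
    case (None, None)       => true
    case (Some(x), Some(y)) => x == y
    case _                  => false
  }
```

In the thread, `<=>` returned the expected 2091 while `===` did not, which at first suggested null handling; per Michael's follow-up, the actual culprit was batch pruning in the cached scan (SPARK-10859).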

2015-11-07 6:13 GMT+09:00 Michael Armbrust :

> In particular this is sounding like:
> https://issues.apache.org/jira/browse/SPARK-10859
>
> On Fri, Nov 6, 2015 at 1:05 PM, Michael Armbrust 
> wrote:
>
>> It would be great if you could try sql("SET
>> spark.sql.inMemoryColumnarStorage.partitionPruning=false"). Also, try Spark
>> 1.5.2-RC2
>> 
>>
>> On Fri, Nov 6, 2015 at 4:49 AM, Seongduk Cheon  wrote:
>>
>>> Hi Yanal!
>>>
>>> Yes, exactly. I read from a CSV file and convert it to a DataFrame with
>>> column names.
>>>
>>> It simply looks like this.
>>>
>>> val eventDF = sc.textFile(eventFile).map(_.split(",")).filter(_.size >= 6)
>>>   .map { e =>  // do something with the fields
>>>   }.toDF(eventTableColumns:_*).cache()
>>>
>>>
>>> The result of <=> function is
>>>
>>> scala> eventDF.filter($"entityType" <=>
>>> "user").select("entityId").distinct.count
>>> res25: Long = 2091
>>>
>>> As you mentioned, it seems related to a nullable column.
>>> Using a case class works as expected. It is the best workaround so far.
>>>
>>>
>>>
>>> 2015-11-06 19:01 GMT+09:00 Yanal Wazaefi :
>>>
 Hi Seongduk,

 Are you converting an event RDD using the toDF("id", "event", "entityType",
 "entityId", "targetEntityType", "targetEntityId", "properties")
 function to get your eventDF?

 Does <=> give you the correct result too (2091)?
 eventDF.filter($"entityType" <=>
 "user").select("entityId").distinct.count

 I had the same problem with the DataFrame equality, using toDF("col1",
 "col2", ...) function.

 To resolve this problem (bug?), I used a *case class* and then
 applied the toDF() function.
 Something like this in your case:

 case class Event(id: String, event: String, entityType: String,
 entityId: String, targetEntityType: String, targetEntityId: String,
 properties: String)
 eventRDD.map { case (id, event, entityType, entityId, targetEntityType,
   targetEntityId, properties) =>
   Event(id, event, entityType, entityId, targetEntityType, targetEntityId,
     properties)
 }.toDF()

 The comparison === should work in this case.

 The problem (I think) comes from null values in the columns that come
 before the entityType column (e.g. id or event).

 Yanal

 Subject: Re: DataFrame equality does not working in 1.5.1 Date: Fri, 6
 Nov 2015 02:14:18 +0100 From: Seongduk Cheon 
  To: Yin Huai 
  CC: user 
 


 Hi, Yin

 Thanks for your time. This is the result.

 --
 scala> eventDF.filter($"entityType" ===
 "user").select("entityId").distinct.explain(true)
 == Parsed Logical Plan ==
 Aggregate [entityId#16], [entityId#16]
  Project [entityId#16]
   Filter (entityType#15 = user)
Project [_1#0 AS id#13,_2#1 AS event#14,_3#2 AS entityType#15,_4#3
 AS entityId#16,_5#4 AS targetEntityType#17,_6#5 AS targetEntityId#18,_7#6
 AS properties#19,_8#7 AS eventTime#20,_9#8 AS eventTimeZone#21,_10#9 AS
 tags#22,_11#10 AS prId#23,_12#11 AS creationTime#24,_13#12 AS
 creationTimeZone#25]
 LogicalRDD
 [_1#0,_2#1,_3#2,_4#3,_5#4,_6#5,_7#6,_8#7,_9#8,_10#9,_11#10,_12#11,_13#12],
 MapPartitionsRDD[6] at rddToDataFrameHolder at <console>:61

 == Analyzed Logical Plan ==
 entityId: string
 Aggregate [entityId#16], [entityId#16]
  Project [entityId#16]
   Filter (entityType#15 = user)
Project [_1#0 AS id#13,_2#1 AS event#14,_3#2 AS entityType#15,_4#3
 AS entityId#16,_5#4 AS targetEntityType#17,_6#5 AS targetEntityId#18,_7#6
 AS properties#19,_8#7 AS eventTime#20,_9#8 AS eventTimeZone#21,_10#9 AS
 tags#22,_11#10 AS prId#23,_12#11 AS creationTime#24,_13#12 AS
 creationTimeZone#25]
 LogicalRDD
 [_1#0,_2#1,_3#2,_4#3,_5#4,_6#5,_7#6,_8#7,_9#8,_10#9,_11#10,_12#11,_13#12],
 MapPartitionsRDD[6] at rddToDataFrameHolder at <console>:61

 == Optimized Logical Plan ==
 Aggregate [entityId#16], [entityId#16]
  Project [entityId#16]
   Filter (entityType#15 = user)
InMemoryRelation
 [id#13,event#14,entityType#15,entityId#16,targetEntityType#17,targetEntityId#18,properties#19,eventTime#20,eventTimeZone#21,tags#22,prId#23,creationTime#24,creationTimeZone#25],
 true, 1, StorageLevel(true, true, false, true, 1), (TungstenProject
 [_1#0 AS id#13,_2#1 AS 
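Yanal's case-class workaround quoted above can be sketched in isolation, with plain collections standing in for the RDD, the field list trimmed for brevity, and hypothetical sample data:

```scala
// Standalone sketch of the case-class mapping (not the full 13-column Event).
case class Event(id: String, event: String, entityType: String, entityId: String)

val rows = Seq(
  ("1", "view", "user", "u1"),
  ("2", "view", "item", "i1")
)

// Tuples -> typed records, as in the quoted eventRDD.map { case ... } step.
val events = rows.map { case (id, event, entityType, entityId) =>
  Event(id, event, entityType, entityId)
}
// In Spark, events.toDF() would then derive the schema (column names and
// nullability) from the Event class rather than from toDF(names: _*).
```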

Re: Fwd: Re: DataFrame equality does not working in 1.5.1

2015-11-06 Thread Seongduk Cheon
Hi Yanal!

Yes, exactly. I read from a CSV file and convert it to a DataFrame with column names.

It simply looks like this.

val eventDF = sc.textFile(eventFile).map(_.split(",")).filter(_.size >= 6)
  .map { e =>  // do something with the fields
  }.toDF(eventTableColumns:_*).cache()


The result of <=> function is

scala> eventDF.filter($"entityType" <=>
"user").select("entityId").distinct.count
res25: Long = 2091

As you mentioned, it seems related to a nullable column.
Using a case class works as expected. It is the best workaround so far.



2015-11-06 19:01 GMT+09:00 Yanal Wazaefi :

> Hi Seongduk,
>
> Are you converting an event RDD using the toDF("id", "event", "entityType",
> "entityId", "targetEntityType", "targetEntityId", "properties") function
> to get your eventDF?
>
> Does <=> give you the correct result too (2091)?
> eventDF.filter($"entityType" <=> "user").select("entityId").distinct.count
>
> I had the same problem with the DataFrame equality, using toDF("col1",
> "col2", ...) function.
>
> To resolve this problem (bug?), I used a *case class* and then applied
> the toDF() function.
> Something like this in your case:
>
> case class Event(id: String, event: String, entityType: String, entityId:
> String, targetEntityType: String, targetEntityId: String, properties:
> String)
> eventRDD.map { case (id, event, entityType, entityId, targetEntityType,
>   targetEntityId, properties) =>
>   Event(id, event, entityType, entityId, targetEntityType, targetEntityId,
>     properties)
> }.toDF()
>
> The comparison === should work in this case.
>
> The problem (I think) comes from null values in the columns that come
> before the entityType column (e.g. id or event).
>
> Yanal
>
> Subject: Re: DataFrame equality does not working in 1.5.1 Date: Fri, 6
> Nov 2015 02:14:18 +0100 From: Seongduk Cheon 
>  To: Yin Huai 
>  CC: user 
> 
>
>
> Hi, Yin
>
> Thanks for your time. This is the result.
>
> --
> scala> eventDF.filter($"entityType" ===
> "user").select("entityId").distinct.explain(true)
> == Parsed Logical Plan ==
> Aggregate [entityId#16], [entityId#16]
>  Project [entityId#16]
>   Filter (entityType#15 = user)
>Project [_1#0 AS id#13,_2#1 AS event#14,_3#2 AS entityType#15,_4#3 AS
> entityId#16,_5#4 AS targetEntityType#17,_6#5 AS targetEntityId#18,_7#6 AS
> properties#19,_8#7 AS eventTime#20,_9#8 AS eventTimeZone#21,_10#9 AS
> tags#22,_11#10 AS prId#23,_12#11 AS creationTime#24,_13#12 AS
> creationTimeZone#25]
> LogicalRDD
> [_1#0,_2#1,_3#2,_4#3,_5#4,_6#5,_7#6,_8#7,_9#8,_10#9,_11#10,_12#11,_13#12],
> MapPartitionsRDD[6] at rddToDataFrameHolder at <console>:61
>
> == Analyzed Logical Plan ==
> entityId: string
> Aggregate [entityId#16], [entityId#16]
>  Project [entityId#16]
>   Filter (entityType#15 = user)
>Project [_1#0 AS id#13,_2#1 AS event#14,_3#2 AS entityType#15,_4#3 AS
> entityId#16,_5#4 AS targetEntityType#17,_6#5 AS targetEntityId#18,_7#6 AS
> properties#19,_8#7 AS eventTime#20,_9#8 AS eventTimeZone#21,_10#9 AS
> tags#22,_11#10 AS prId#23,_12#11 AS creationTime#24,_13#12 AS
> creationTimeZone#25]
> LogicalRDD
> [_1#0,_2#1,_3#2,_4#3,_5#4,_6#5,_7#6,_8#7,_9#8,_10#9,_11#10,_12#11,_13#12],
> MapPartitionsRDD[6] at rddToDataFrameHolder at <console>:61
>
> == Optimized Logical Plan ==
> Aggregate [entityId#16], [entityId#16]
>  Project [entityId#16]
>   Filter (entityType#15 = user)
>InMemoryRelation
> [id#13,event#14,entityType#15,entityId#16,targetEntityType#17,targetEntityId#18,properties#19,eventTime#20,eventTimeZone#21,tags#22,prId#23,creationTime#24,creationTimeZone#25],
> true, 1, StorageLevel(true, true, false, true, 1), (TungstenProject
> [_1#0 AS id#13,_2#1 AS event#14,_3#2 AS entityType#15,_4#3 AS
> entityId#16,_5#4 AS targetEntityType#17,_6#5 AS targetEntityId#18,_7#6 AS
> properties#19,_8#7 AS eventTime#20,_9#8 AS eventTimeZone#21,_10#9 AS
> tags#22,_11#10 AS prId#23,_12#11 AS creationTime#24,_13#12 AS
> creationTimeZone#25]), None
>
> == Physical Plan ==
> TungstenAggregate(key=[entityId#16], functions=[], output=[entityId#16])
>  TungstenExchange hashpartitioning(entityId#16)
>   TungstenAggregate(key=[entityId#16], functions=[], output=[entityId#16])
>Project [entityId#16]
> Filter (entityType#15 = user)
>  InMemoryColumnarTableScan [entityId#16,entityType#15],
> [(entityType#15 = user)], (InMemoryRelation
> [id#13,event#14,entityType#15,entityId#16,targetEntityType#17,targetEntityId#18,properties#19,eventTime#20,eventTimeZone#21,tags#22,prId#23,creationTime#24,creationTimeZone#25],
> true, 1, StorageLevel(true, true, false, true, 1), (TungstenProject
> [_1#0 AS id#13,_2#1 AS event#14,_3#2 AS entityType#15,_4#3 AS
> entityId#16,_5#4 AS targetEntityType#17,_6#5 AS targetEntityId#18,_7#6 AS
> properties#19,_8#7 AS eventTime#20,_9#8 AS eventTimeZone#21,_10#9 AS
> tags#22,_11#10 AS prId#23,_12#11 AS creationTime#24,_13#12 AS
> 
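The setting Michael suggests turns off batch-level pruning in the cached columnar scan (the InMemoryColumnarTableScan in the physical plan above). The mechanism can be illustrated with a toy model — hypothetical names, nothing from Spark's actual code: each cached batch carries per-column statistics, and the scan skips batches whose statistics cannot match the filter. SPARK-10859 was a bug in exactly this skipping, which is why disabling it restored the correct count:

```scala
// Toy model of stats-based batch pruning in an in-memory columnar cache.
case class BatchStats(min: String, max: String)

// A batch can only contain `value` if it falls within the batch's min/max.
def mightContain(stats: BatchStats, value: String): Boolean =
  stats.min <= value && value <= stats.max

// Scan that skips (prunes) batches whose stats exclude the filter value.
def prunedScan(batches: Seq[(BatchStats, Seq[String])], value: String): Seq[String] =
  batches
    .filter { case (stats, _) => mightContain(stats, value) } // pruning step
    .flatMap { case (_, rows) => rows.filter(_ == value) }    // per-row filter

val batches = Seq(
  (BatchStats("item", "item"), Seq("item", "item")),          // safely skipped for "user"
  (BatchStats("item", "user"), Seq("item", "user", "user"))
)
```

If the recorded statistics are wrong (the essence of the bug), whole batches of matching rows silently vanish from the count; the SET command simply makes the scan read every batch.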
