Re: Fwd: Re: DataFrame equality does not work in 1.5.1

2015-11-06 Thread Seongduk Cheon
Hi, Michael

It works fine.

scala> sqlContext.sql("SET
spark.sql.inMemoryColumnarStorage.partitionPruning=false")
res28: org.apache.spark.sql.DataFrame = [key: string, value: string]

scala> eventDF.filter($"entityType" ===
"user").select("entityId").distinct.count
res29: Long = 2091

Thank you so much for helping me.
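
For reference, the same flag can also be set programmatically instead of via a
SQL statement. A minimal sketch, assuming the spark-shell's built-in sqlContext:

// Equivalent to the SET command above: disables batch pruning for the
// in-memory columnar cache.
sqlContext.setConf("spark.sql.inMemoryColumnarStorage.partitionPruning", "false")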


Re: Fwd: Re: DataFrame equality does not work in 1.5.1

2015-11-06 Thread Seongduk Cheon
Hi Yanal!

Yes, exactly. I read from a CSV file and convert it to a DataFrame with column
names.

It simply looks like this:

val eventDF = sc.textFile(eventFile).map(_.split(",")).filter(_.size >= 6)
  .map { e => // do some things with the fields
  }.toDF(eventTableColumns: _*).cache()
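
(A slightly fuller sketch of the same pipeline, with illustrative column names
and field handling of my own; note that split(",", -1) keeps trailing empty
fields, so rows with empty trailing columns are not silently dropped by the
size filter:)

val eventDF = sc.textFile(eventFile)
  .map(_.split(",", -1))      // -1 keeps trailing empty fields
  .filter(_.size >= 6)
  .map(e => (e(0), e(1), e(2), e(3), e(4), e(5)))  // first six fields, for illustration
  .toDF("id", "event", "entityType", "entityId", "targetEntityType", "targetEntityId")
  .cache()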


The result of the <=> function is:

scala> eventDF.filter($"entityType" <=>
"user").select("entityId").distinct.count
res25: Long = 2091

As you mentioned, it seems related to nullable columns.
Using a case class works as expected. It is the best workaround so far.
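
(For reference, === and <=> differ only in how they treat nulls: === yields
null when either side is null, while <=> is null-safe. A small illustration
with made-up data, assuming the spark-shell implicits are in scope; the
differing counts in this thread come from the cached columnar scan, not from
these semantics:)

val df = Seq(("a", "user"), ("b", null.asInstanceOf[String]))
  .toDF("entityId", "entityType")
df.filter($"entityType" === "user").count  // 1: the null row compares to null and is dropped
df.filter($"entityType" <=> "user").count  // 1: same here, since "user" is not null
df.filter($"entityType" <=> null).count    // 1: null-safe equality matches the null row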



2015-11-06 19:01 GMT+09:00 Yanal Wazaefi <yanal.waza...@kelkoo.com>:

> Hi Seongduk,
>
> Are you converting an event RDD using the toDF("id", "event", "entityType",
> "entityId", "targetEntityType", "targetEntityId", "properties") function to
> get your eventDF?
>
> Does <=> give you the correct result too (2091)?
> eventDF.filter($"entityType" <=> "user").select("entityId").distinct.count
>
> I had the same problem with DataFrame equality when using the toDF("col1",
> "col2", ...) function.
>
> To resolve this problem (bug?), I used a *case class* and then applied the
> toDF() function.
> Something like this in your case:
>
> case class Event(id: String, event: String, entityType: String,
>   entityId: String, targetEntityType: String, targetEntityId: String,
>   properties: String)
>
> eventRDD.map { case (id, event, entityType, entityId, targetEntityType,
>   targetEntityId, properties) =>
>   Event(id, event, entityType, entityId, targetEntityType, targetEntityId,
>     properties)
> }.toDF()
>
> The comparison === should work in this case.
>
> The problem (I think) comes from some null values in the columns that come
> before the column you filter on (e.g. id or event).
>
> Yanal
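
(A minimal, hypothetical reproduction of Yanal's hypothesis above: a null in an
earlier column, a tuple-based toDF, and a cache materialized before filtering.
Whether this alone triggers the miscount depends on the exact conditions of the
bug, so treat it as a sketch, not a confirmed repro. Assumes spark-shell:)

val df = sc.parallelize(Seq(
    (null.asInstanceOf[String], "view", "user", "e1"),
    ("id2", "view", "user", "e2"),
    ("id3", "view", "ib_user", "e3")
  )).toDF("id", "event", "entityType", "entityId").cache()
df.count()  // materialize the cached columnar batches
df.filter($"entityType" === "user").select("entityId").distinct.count  // expected: 2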

Re: Fwd: Re: DataFrame equality does not work in 1.5.1

2015-11-06 Thread Michael Armbrust
In particular, this sounds like:
https://issues.apache.org/jira/browse/SPARK-10859


Re: Fwd: Re: DataFrame equality does not work in 1.5.1

2015-11-06 Thread Michael Armbrust
It would be great if you could try sql("SET
spark.sql.inMemoryColumnarStorage.partitionPruning=false"). Also, try Spark
1.5.2-RC2
<http://people.apache.org/~pwendell/spark-releases/spark-1.5.2-rc2-bin/>


DataFrame equality does not work in 1.5.1

2015-11-05 Thread 千成徳
Hi All,

I have a data frame like this.

The equality expression does not work in 1.5.1 but works as expected in 1.4.0.
What is the difference?

scala> eventDF.printSchema()
root
 |-- id: string (nullable = true)
 |-- event: string (nullable = true)
 |-- entityType: string (nullable = true)
 |-- entityId: string (nullable = true)
 |-- targetEntityType: string (nullable = true)
 |-- targetEntityId: string (nullable = true)
 |-- properties: string (nullable = true)

scala> eventDF.groupBy("entityType").agg(countDistinct("entityId")).show
+----------+------------------------+
|entityType|COUNT(DISTINCT entityId)|
+----------+------------------------+
|   ib_user|                    4751|
|      user|                    2091|
+----------+------------------------+


- does not work (bug?)
scala> eventDF.filter($"entityType" ===
"user").select("entityId").distinct.count
res151: Long = 1219

scala> eventDF.filter(eventDF("entityType") ===
"user").select("entityId").distinct.count
res153: Long = 1219

scala> eventDF.filter($"entityType" equalTo
"user").select("entityId").distinct.count
res149: Long = 1219

- works as expected
scala> eventDF.map{ e => (e.getAs[String]("entityId"),
e.getAs[String]("entityType")) }.filter(x => x._2 ==
"user").map(_._1).distinct.count
res150: Long = 2091

scala> eventDF.filter($"entityType" in
"user").select("entityId").distinct.count
warning: there were 1 deprecation warning(s); re-run with -deprecation for
details
res155: Long = 2091

scala> eventDF.filter($"entityType" !==
"ib_user").select("entityId").distinct.count
res152: Long = 2091


But all of the above code works in 1.4.0.

Thanks.
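
(One quick check, offered as a hedged suggestion: eventDF is cached in the
pipeline shown elsewhere in this thread, and if the wrong count comes from the
in-memory columnar scan, dropping the cache should bring back 2091:)

eventDF.unpersist()  // drop the cached columnar batches
eventDF.filter($"entityType" === "user").select("entityId").distinct.count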


Re: DataFrame equality does not work in 1.5.1

2015-11-05 Thread Yin Huai
Can you attach the result of eventDF.filter($"entityType" ===
"user").select("entityId").distinct.explain(true)?

Thanks,

Yin



Re: DataFrame equality does not work in 1.5.1

2015-11-05 Thread Seongduk Cheon
Hi, Yin

Thanks for your time. This is the result.

--
scala> eventDF.filter($"entityType" ===
"user").select("entityId").distinct.explain(true)
== Parsed Logical Plan ==
Aggregate [entityId#16], [entityId#16]
 Project [entityId#16]
  Filter (entityType#15 = user)
   Project [_1#0 AS id#13,_2#1 AS event#14,_3#2 AS entityType#15,_4#3 AS
entityId#16,_5#4 AS targetEntityType#17,_6#5 AS targetEntityId#18,_7#6 AS
properties#19,_8#7 AS eventTime#20,_9#8 AS eventTimeZone#21,_10#9 AS
tags#22,_11#10 AS prId#23,_12#11 AS creationTime#24,_13#12 AS
creationTimeZone#25]
    LogicalRDD
[_1#0,_2#1,_3#2,_4#3,_5#4,_6#5,_7#6,_8#7,_9#8,_10#9,_11#10,_12#11,_13#12],
MapPartitionsRDD[6] at rddToDataFrameHolder at <console>:61

== Analyzed Logical Plan ==
entityId: string
Aggregate [entityId#16], [entityId#16]
 Project [entityId#16]
  Filter (entityType#15 = user)
   Project [_1#0 AS id#13,_2#1 AS event#14,_3#2 AS entityType#15,_4#3 AS
entityId#16,_5#4 AS targetEntityType#17,_6#5 AS targetEntityId#18,_7#6 AS
properties#19,_8#7 AS eventTime#20,_9#8 AS eventTimeZone#21,_10#9 AS
tags#22,_11#10 AS prId#23,_12#11 AS creationTime#24,_13#12 AS
creationTimeZone#25]
    LogicalRDD
[_1#0,_2#1,_3#2,_4#3,_5#4,_6#5,_7#6,_8#7,_9#8,_10#9,_11#10,_12#11,_13#12],
MapPartitionsRDD[6] at rddToDataFrameHolder at <console>:61

== Optimized Logical Plan ==
Aggregate [entityId#16], [entityId#16]
 Project [entityId#16]
  Filter (entityType#15 = user)
   InMemoryRelation
[id#13,event#14,entityType#15,entityId#16,targetEntityType#17,targetEntityId#18,properties#19,eventTime#20,eventTimeZone#21,tags#22,prId#23,creationTime#24,creationTimeZone#25],
true, 1, StorageLevel(true, true, false, true, 1), (TungstenProject
[_1#0 AS id#13,_2#1 AS event#14,_3#2 AS entityType#15,_4#3 AS
entityId#16,_5#4 AS targetEntityType#17,_6#5 AS targetEntityId#18,_7#6 AS
properties#19,_8#7 AS eventTime#20,_9#8 AS eventTimeZone#21,_10#9 AS
tags#22,_11#10 AS prId#23,_12#11 AS creationTime#24,_13#12 AS
creationTimeZone#25]), None

== Physical Plan ==
TungstenAggregate(key=[entityId#16], functions=[], output=[entityId#16])
 TungstenExchange hashpartitioning(entityId#16)
  TungstenAggregate(key=[entityId#16], functions=[], output=[entityId#16])
   Project [entityId#16]
    Filter (entityType#15 = user)
     InMemoryColumnarTableScan [entityId#16,entityType#15], [(entityType#15
= user)], (InMemoryRelation
[id#13,event#14,entityType#15,entityId#16,targetEntityType#17,targetEntityId#18,properties#19,eventTime#20,eventTimeZone#21,tags#22,prId#23,creationTime#24,creationTimeZone#25],
true, 1, StorageLevel(true, true, false, true, 1), (TungstenProject
[_1#0 AS id#13,_2#1 AS event#14,_3#2 AS entityType#15,_4#3 AS
entityId#16,_5#4 AS targetEntityType#17,_6#5 AS targetEntityId#18,_7#6 AS
properties#19,_8#7 AS eventTime#20,_9#8 AS eventTimeZone#21,_10#9 AS
tags#22,_11#10 AS prId#23,_12#11 AS creationTime#24,_13#12 AS
creationTimeZone#25]), None)

Code Generation: true

scala>
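
(The physical plan above shows the filter pushed into InMemoryColumnarTableScan
as a batch-pruning predicate, [(entityType#15 = user)], which is exactly where
the workaround discussed earlier in this digest applies. A sketch of
re-checking with pruning disabled:)

sqlContext.sql("SET spark.sql.inMemoryColumnarStorage.partitionPruning=false")
eventDF.filter($"entityType" === "user").select("entityId").distinct.count
// returns 2091 with pruning off, as confirmed at the top of this thread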


