It would be great if you could also try sql("SET
spark.sql.inMemoryColumnarStorage.partitionPruning=false"). Also, try Spark
1.5.2-RC2:
<http://people.apache.org/~pwendell/spark-releases/spark-1.5.2-rc2-bin/>

On Fri, Nov 6, 2015 at 4:49 AM, Seongduk Cheon <s.c...@opt.ne.jp> wrote:

> Hi Yanal!
>
> Yes, exactly. I read from a CSV file and convert it to a DataFrame with
> column names.
>
> It looks roughly like this:
>
> val eventDF = sc.textFile(eventFile).map(_.split(",")).filter(_.size >= 6)
>   .map { e => .... // do something with each row
>   }.toDF(eventTableColumns:_*).cache()
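As an aside (unrelated to the equality question, but relevant when parsing CSV by hand as above): String.split with a plain separator drops trailing empty fields, which is one way rows can fall below the `_.size >= 6` filter. A minimal plain-Scala illustration, no Spark needed; the sample row here is made up:

```scala
// String.split(sep) discards trailing empty strings, so a CSV row ending in
// empty columns parses to fewer fields than expected. Passing a negative
// limit keeps the trailing empties.
val row = "id1,view,user,u1,,"
val lossy = row.split(",")     // trailing empty fields dropped: 4 elements
val exact = row.split(",", -1) // all 6 fields kept, last two are ""
```

If rows with empty trailing columns should survive the size filter, the negative-limit form is the safer choice.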
>
>
> The result of <=> function is
>
> scala> eventDF.filter($"entityType" <=>
> "user").select("entityId").distinct.count
> res25: Long = 2091
>
> As you mentioned, it seems related to the nullable columns.
> Using a case class works as expected. It is the best workaround so
> far.
>
>
>
> 2015-11-06 19:01 GMT+09:00 Yanal Wazaefi <yanal.waza...@kelkoo.com>:
>
>> Hi Sondoku,
>>
>> Are you converting an event RDD using the toDF("id", "event", "entityType",
>> "entityId", "targetEntityType", "targetEntityId", "properties") function
>> to get your eventDF?
>>
>> Does <=> also give you the correct result (2091)?
>> eventDF.filter($"entityType" <=>
>> "user").select("entityId").distinct.count
>>
>> I had the same problem with DataFrame equality, using the toDF("col1",
>> "col2", ...) function.
>>
>> To resolve this problem (a bug?), I used a *case class* and then applied
>> the toDF() function.
>> Something like this in your case:
>>
>> case class Event(id: String, event: String, entityType: String,
>>   entityId: String, targetEntityType: String, targetEntityId: String,
>>   properties: String)
>>
>> eventRDD.map { case (id, event, entityType, entityId, targetEntityType,
>>   targetEntityId, properties) =>
>>     Event(id, event, entityType, entityId, targetEntityType,
>>       targetEntityId, properties)
>> }.toDF()
>>
>> The === comparison should then work in this case.
>>
>> The problem (I think) comes from null values in the columns that come
>> before the entityType column (e.g. id or event).
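For background on why === and <=> can in principle disagree on nullable columns, here is a simplified plain-Scala model of the SQL semantics (this sketches the operator definitions only, not Spark's actual code path; the count discrepancy in this thread involves a non-null comparison value, so it points at a caching bug rather than these semantics):

```scala
// SQL equality (=, Spark's ===) is three-valued: it yields NULL when either
// operand is NULL, and a filter drops rows whose predicate is NULL.
def sqlEq(a: Option[String], b: Option[String]): Option[Boolean] =
  for (x <- a; y <- b) yield x == y

// Null-safe equality (<=>) always yields true/false: NULL <=> NULL is true,
// and NULL <=> non-null is false.
def nullSafeEq(a: Option[String], b: Option[String]): Boolean =
  (a, b) match {
    case (Some(x), Some(y)) => x == y
    case (None, None)       => true
    case _                  => false
  }

val col = Seq(Some("user"), Some("ib_user"), None)
// A filter keeps a row only when the predicate is definitely true.
val keptBySqlEq    = col.filter(v => sqlEq(v, Some("user")).getOrElse(false))
val keptByNullSafe = col.filter(v => nullSafeEq(v, Some("user")))
// Both keep only Some("user") here; they differ only when comparing to NULL.
```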
>>
>> Yanal
>>
>> Subject: Re: DataFrame equality does not working in 1.5.1
>> Date: Fri, 6 Nov 2015 02:14:18 +0100
>> From: Seongduk Cheon <s.c...@opt.ne.jp>
>> To: Yin Huai <yh...@databricks.com>
>> CC: user <user@spark.apache.org>
>>
>>
>> Hi, Yin
>>
>> Thanks for your time. This is the result.
>>
>> ------------------
>> scala> eventDF.filter($"entityType" ===
>> "user").select("entityId").distinct.explain(true)
>> == Parsed Logical Plan ==
>> Aggregate [entityId#16], [entityId#16]
>>  Project [entityId#16]
>>   Filter (entityType#15 = user)
>>    Project [_1#0 AS id#13,_2#1 AS event#14,_3#2 AS entityType#15,_4#3 AS
>> entityId#16,_5#4 AS targetEntityType#17,_6#5 AS targetEntityId#18,_7#6 AS
>> properties#19,_8#7 AS eventTime#20,_9#8 AS eventTimeZone#21,_10#9 AS
>> tags#22,_11#10 AS prId#23,_12#11 AS creationTime#24,_13#12 AS
>> creationTimeZone#25]
>>     LogicalRDD
>> [_1#0,_2#1,_3#2,_4#3,_5#4,_6#5,_7#6,_8#7,_9#8,_10#9,_11#10,_12#11,_13#12],
>> MapPartitionsRDD[6] at rddToDataFrameHolder at <console>:61
>>
>> == Analyzed Logical Plan ==
>> entityId: string
>> Aggregate [entityId#16], [entityId#16]
>>  Project [entityId#16]
>>   Filter (entityType#15 = user)
>>    Project [_1#0 AS id#13,_2#1 AS event#14,_3#2 AS entityType#15,_4#3 AS
>> entityId#16,_5#4 AS targetEntityType#17,_6#5 AS targetEntityId#18,_7#6 AS
>> properties#19,_8#7 AS eventTime#20,_9#8 AS eventTimeZone#21,_10#9 AS
>> tags#22,_11#10 AS prId#23,_12#11 AS creationTime#24,_13#12 AS
>> creationTimeZone#25]
>>     LogicalRDD
>> [_1#0,_2#1,_3#2,_4#3,_5#4,_6#5,_7#6,_8#7,_9#8,_10#9,_11#10,_12#11,_13#12],
>> MapPartitionsRDD[6] at rddToDataFrameHolder at <console>:61
>>
>> == Optimized Logical Plan ==
>> Aggregate [entityId#16], [entityId#16]
>>  Project [entityId#16]
>>   Filter (entityType#15 = user)
>>    InMemoryRelation
>> [id#13,event#14,entityType#15,entityId#16,targetEntityType#17,targetEntityId#18,properties#19,eventTime#20,eventTimeZone#21,tags#22,prId#23,creationTime#24,creationTimeZone#25],
>> true, 10000, StorageLevel(true, true, false, true, 1), (TungstenProject
>> [_1#0 AS id#13,_2#1 AS event#14,_3#2 AS entityType#15,_4#3 AS
>> entityId#16,_5#4 AS targetEntityType#17,_6#5 AS targetEntityId#18,_7#6 AS
>> properties#19,_8#7 AS eventTime#20,_9#8 AS eventTimeZone#21,_10#9 AS
>> tags#22,_11#10 AS prId#23,_12#11 AS creationTime#24,_13#12 AS
>> creationTimeZone#25]), None
>>
>> == Physical Plan ==
>> TungstenAggregate(key=[entityId#16], functions=[], output=[entityId#16])
>>  TungstenExchange hashpartitioning(entityId#16)
>>   TungstenAggregate(key=[entityId#16], functions=[], output=[entityId#16])
>>    Project [entityId#16]
>>     Filter (entityType#15 = user)
>>      InMemoryColumnarTableScan [entityId#16,entityType#15],
>> [(entityType#15 = user)], (InMemoryRelation
>> [id#13,event#14,entityType#15,entityId#16,targetEntityType#17,targetEntityId#18,properties#19,eventTime#20,eventTimeZone#21,tags#22,prId#23,creationTime#24,creationTimeZone#25],
>> true, 10000, StorageLevel(true, true, false, true, 1), (TungstenProject
>> [_1#0 AS id#13,_2#1 AS event#14,_3#2 AS entityType#15,_4#3 AS
>> entityId#16,_5#4 AS targetEntityType#17,_6#5 AS targetEntityId#18,_7#6 AS
>> properties#19,_8#7 AS eventTime#20,_9#8 AS eventTimeZone#21,_10#9 AS
>> tags#22,_11#10 AS prId#23,_12#11 AS creationTime#24,_13#12 AS
>> creationTimeZone#25]), None)
>>
>> Code Generation: true
>>
>> scala>
>>
>>
>>
>> 2015-11-06 5:27 GMT+09:00 Yin Huai <yh...@databricks.com>:
>>
>>> Can you attach the result of eventDF.filter($"entityType" ===
>>> "user").select("entityId").distinct.explain(true)?
>>>
>>> Thanks,
>>>
>>> Yin
>>>
>>> On Thu, Nov 5, 2015 at 1:12 AM, 千成徳 <s.c...@opt.ne.jp> wrote:
>>>
>>>> Hi All,
>>>>
>>>> I have a DataFrame like this.
>>>>
>>>> The equality expression does not work in 1.5.1 but works as expected in
>>>> 1.4.0.
>>>> What is the difference?
>>>>
>>>> scala> eventDF.printSchema()
>>>> root
>>>>  |-- id: string (nullable = true)
>>>>  |-- event: string (nullable = true)
>>>>  |-- entityType: string (nullable = true)
>>>>  |-- entityId: string (nullable = true)
>>>>  |-- targetEntityType: string (nullable = true)
>>>>  |-- targetEntityId: string (nullable = true)
>>>>  |-- properties: string (nullable = true)
>>>>
>>>> scala> eventDF.groupBy("entityType").agg(countDistinct("entityId")).show
>>>> +----------+------------------------+
>>>> |entityType|COUNT(DISTINCT entityId)|
>>>> +----------+------------------------+
>>>> |   ib_user|                    4751|
>>>> |      user|                    2091|
>>>> +----------+------------------------+
>>>>
>>>>
>>>> ----- does not work (a bug?)
>>>> scala> eventDF.filter($"entityType" ===
>>>> "user").select("entityId").distinct.count
>>>> res151: Long = 1219
>>>>
>>>> scala> eventDF.filter(eventDF("entityType") ===
>>>> "user").select("entityId").distinct.count
>>>> res153: Long = 1219
>>>>
>>>> scala> eventDF.filter($"entityType" equalTo
>>>> "user").select("entityId").distinct.count
>>>> res149: Long = 1219
>>>>
>>>> ----- works as expected
>>>> scala> eventDF.map{ e => (e.getAs[String]("entityId"),
>>>> e.getAs[String]("entityType")) }.filter(x => x._2 ==
>>>> "user").map(_._1).distinct.count
>>>> res150: Long = 2091
>>>>
>>>> scala> eventDF.filter($"entityType" in
>>>> "user").select("entityId").distinct.count
>>>> warning: there were 1 deprecation warning(s); re-run with -deprecation
>>>> for details
>>>> res155: Long = 2091
>>>>
>>>> scala> eventDF.filter($"entityType" !==
>>>> "ib_user").select("entityId").distinct.count
>>>> res152: Long = 2091
>>>>
>>>>
>>>> But all of the above code works in 1.4.0.
>>>>
>>>> Thanks.
>>>>
>>>>
>>>
>>
>>
>> --
>> -------------------------------------------------------
>> 千 成徳 (Sondoku Chon)
>>
>> Opt Holding, Inc. http://www.opt.ne.jp/holding/
>> Data Science Lab https://datasciencelab.jp/
>> Big Data Architect
>>
>> Tokyu Bancho Bldg., 6 Yonbancho, Chiyoda-ku, Tokyo 102-0081
>> Tel: 080-4581-9708
>> Fax: 050-3156-7346
>> -------------------------------------------------------
>>
>>
>>
>>
>> ------------------------------
>> Kelkoo SAS
>> A simplified joint-stock company (Société par Actions Simplifiée)
>> Share capital: €4,168,964.30
>> Registered office: 158 Ter Rue du Temple, 75003 Paris
>> 425 093 069 RCS Paris
>>
>> This message and its attachments are confidential and intended solely
>> for their addressees. If you are not the intended recipient, please
>> delete this message and notify the sender.
>>
>
>
>
> --
> -------------------------------------------------------
> 千 成徳 (Sondoku Chon)
>
> Opt Holding, Inc. http://www.opt.ne.jp/holding/
> Data Science Lab https://datasciencelab.jp/
> Big Data Architect
>
> Tokyu Bancho Bldg., 6 Yonbancho, Chiyoda-ku, Tokyo 102-0081
> Tel: 080-4581-9708
> Fax: 050-3156-7346
> -------------------------------------------------------
>
