It seems to be because of this issue:
https://issues.apache.org/jira/browse/SPARK-10925

I added a checkpoint, as suggested, to break the lineage and it worked.
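
For anyone else hitting SPARK-10925, here is a minimal sketch of the
workaround (column names taken from the plan below; the checkpoint
directory is just an example):

    import org.apache.spark.sql.functions.{count, lit}

    // checkpoint() materializes dfAgg and truncates its lineage, so the
    // join no longer resolves against attributes of the pre-aggregation plan
    spark.sparkContext.setCheckpointDir("/tmp/spark-checkpoints")

    val dfAgg = df.groupBy("S_ID").agg(count(lit(1)).as("infectedStreet"))
    val dfRes = df.join(dfAgg.checkpoint(), Seq("S_ID"), "left_outer")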

Best regards,

2017-07-04 17:26 GMT+02:00 Bernard Jesop <bernard.je...@gmail.com>:

> Thanks Didac,
>
> My bad, actually that code was incomplete; it should have been:
> dfAgg = df.groupBy("S_ID").agg(...)
>
> I want to access the aggregated values (of dfAgg) for each row of df,
> which is why I do a left outer join.
>
>
> Also, regarding the second parameter, I am using this signature of join:
> (Dataset, Seq[String], String) => Dataset.
>
> It is a sequence of the column names to use as keys for the join, a kind
> of syntactic sugar for (df1("key1") === df2("key1") && df1("key2") ===
> df2("key2") && ...), except that it will not duplicate the columns used
> as keys.
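>
> A quick sketch to illustrate, using df and dfAgg from below:
>
>   // single S_ID column in the result, like SQL's USING clause
>   val dfRes = df.join(dfAgg, Seq("S_ID"), "left_outer")
>
>   // equivalent condition join, except it keeps both S_ID columns
>   val dfRes2 = df.join(dfAgg, df("S_ID") === dfAgg("S_ID"), "left_outer")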
>
> 2017-07-03 12:39 GMT+02:00 Didac Gil <didacgil9...@gmail.com>:
>
>> With the left join, you are joining two tables.
>>
>> In your case, df is the left table and dfAgg is the right one.
>> The second parameter should be the joining condition, right?
>> For instance:
>>
>>  dfRes = df.join(dfAgg, $"userName" === $"name", "left_outer")
>>
>> assuming a field in df called userName, and another in dfAgg called
>> "name".
>>
>> However, what kind of query do you want to make? dfAgg is already the
>> df table grouped by S_ID.
>>
>> I guess that you are looking for something more like the following
>> example:
>>
>> dfAgg = df.groupBy("S_ID")
>>   .agg(
>>     org.apache.spark.sql.functions.count("userName").as("usersCount"),
>>     org.apache.spark.sql.functions.collect_set("city").as("ListofCities"),
>>     org.apache.spark.sql.functions.max("age").as("oldest")
>>   )
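>>
>> (agg takes all the aggregate expressions in a single call; a second
>> .agg chained after it would aggregate over the result of the first,
>> without the grouping.)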
>>
>> On 3 Jul 2017, at 11:55, Bernard Jesop <bernard.je...@gmail.com> wrote:
>>
>> Hello, I don't understand my error message.
>>
>> Basically, all I am doing is:
>> - dfAgg = df.groupBy("S_ID")
>> - dfRes = df.join(dfAgg, Seq("S_ID"), "left_outer")
>>
>> However I get this AnalysisException:
>>
>> Exception in thread "main" org.apache.spark.sql.AnalysisException:
>> resolved attribute(s) S_ID#1903L missing from
>> Dummy_ID#740,sex#37L,PERSONAL_STATUS#726L,W_DEP_CODE#736,W_SIZE#739L,
>> POSTAL_CODE#735,COUNTRY_CODE#730,ID#724L,Dummy_ID_1#741,DEP_CODE#729,
>> HOUSEHOLD_TYPE#733L,HOUSEHOLD_SIZE#734L,AGE#727L,W_ID#738L,H_ID#732L,
>> AGE_TYPE#728,S_ID#57L,NATIONALITY#731 in operator !Project [ID#724L,
>> sex#37L, PERSONAL_STATUS#726L, AGE#727L, AGE_TYPE#728, DEP_CODE#729,
>> COUNTRY_CODE#730, NATIONALITY#731 AS Nationality#77, H_ID#732L,
>> HOUSEHOLD_TYPE#733L, HOUSEHOLD_SIZE#734L, POSTAL_CODE#735,
>> W_DEP_CODE#736, S_ID#1903L, W_ID#738L, W_SIZE#739L, Dummy_ID#740,
>> Dummy_ID_1#741];;
>>
>> What I don't understand is that it says S_ID#1903L is missing,
>> but everything seems fine in the logical plan:
>>
>> +- Join LeftOuter, (S_ID#57L = S_ID#1903L)
>>    :- Project [W_ID#14L, H_ID#8L, ID#0L, sex#37L, category#97L, AGE#3L,
>>       AGE_TYPE#4, DEP_CODE#5, COUNTRY_CODE#6, Nationality#77,
>>       HOUSEHOLD_TYPE#9L, familySize#117L, POSTAL_CODE#11, W_DEP_CODE#12,
>>       S_ID#57L, workplaceSize#137L, Dummy_ID#16, Dummy_ID_1#17,
>>       Inc_period#157, Time_inf#1064, Time_inc#200, Health#1014,
>>       Inf_period#1039, infectedFamily#1355L, infectedWorker#1385L]
>>    +- Aggregate [S_ID#1903L], [S_ID#1903L, count(1) AS infectedStreet#1415L]
>>
>> Does anyone have a clue about it?
>> Thanks,
>>
>>
>>
>>
>> Didac Gil de la Iglesia
>> PhD in Computer Science
>> didacg...@gmail.com
>> Spain:  +34 696 285 544
>> Sweden: +46 (0)730229737
>> Skype: didac.gil.de.la.iglesia
>>
>>
>
