I started using http://spark.apache.org/docs/latest/mllib-frequent-pattern-mining.html#fp-growth in Python. It was really easy to get the frequent item sets. Unfortunately, association rules are not implemented in the Python API.

Here is my Python code. It works great:

rawJsonRDD = jsonToPythonDictionaries(sc, inputURL, coalesceInputToNumPartitions)

idsRDD = (rawJsonRDD
                 # fetch the list of ids, the items are of type int
                 .map(lambda r: r['ids'])
                 # make sure ids are unique
                 .map(lambda ids: list(set(ids)))
                )
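
Since the association rules API is Scala/Java only, I am porting this to Java. For reference, here is the shape of what I am trying to end up with, based on the FP-growth example in the docs (the transactions data, minSupport, numPartitions, and the confidence value are all placeholders):

import java.util.Arrays;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.mllib.fpm.AssociationRules;
import org.apache.spark.mllib.fpm.FPGrowth;
import org.apache.spark.mllib.fpm.FPGrowthModel;

SparkConf conf = new SparkConf().setAppName("FPGrowthSketch").setMaster("local[2]");
JavaSparkContext jsc = new JavaSparkContext(conf);

// stand-in data: one list of unique ids per user
JavaRDD<List<Long>> transactions = jsc.parallelize(Arrays.asList(
        Arrays.asList(1L, 2L, 3L),
        Arrays.asList(1L, 2L),
        Arrays.asList(1L, 4L)));

FPGrowth fpg = new FPGrowth()
        .setMinSupport(0.2)     // placeholder
        .setNumPartitions(10);  // placeholder

FPGrowthModel<Long> model = fpg.run(transactions);

// frequent item sets -- this part works from Python too
for (FPGrowth.FreqItemset<Long> itemset : model.freqItemsets().toJavaRDD().collect()) {
    System.out.println(itemset.javaItems() + ", " + itemset.freq());
}

// association rules -- Scala/Java only as of 1.6
for (AssociationRules.Rule<Long> rule
        : model.generateAssociationRules(0.8).toJavaRDD().collect()) {
    System.out.println(rule.javaAntecedent() + " => "
            + rule.javaConsequent() + ", " + rule.confidence());
}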



My Java code throws java.io.NotSerializableException: org.apache.spark.sql.types.LongType. It has something to do with the UDF I wrote to make sure the ids are unique.

Any idea what my bug is? I guess instead of DataFrames I could try to implement this using RDDs, as in the sketch below, but I expect I'll run into a similar problem.

Thanks in advance

Andy

 df.printSchema();

root
 |-- ids: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- updated: long (nullable = true)
 |-- userId: long (nullable = true)

16/04/21 16:26:50 INFO FrequentItems: expr: UniqIdsUDF(Ids) as uniqueIds



UniqIdsUDF.register(sqlContext);

DataFrame df2 = df.selectExpr(inputColName, expr);
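
For context, expr is just a string built from the UDF name; based on the INFO log line above it is effectively something like:

// reconstruction for context; inputColName names the ids array column
String expr = UniqIdsUDF.udfName + "(" + inputColName + ") as uniqueIds";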



import java.io.Serializable;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;

import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.LongType;

import scala.collection.JavaConversions;
import scala.collection.mutable.WrappedArray;

/**
 * This is based on some test code I wrote that
 * takes in a list of strings and returns a list of strings.
 */
public class UniqIdsUDF implements UDF1<WrappedArray<Long>, Long[]>, Serializable {

    private static final long serialVersionUID = 1L;

    public static final String udfName = "UniqIdsUDF";

    public static void register(SQLContext ssc) {
        // TODO probably need to be careful about registering multiple times
        UniqIdsUDF udf = new UniqIdsUDF();
        DataType elementType = new LongType();
        DataType returnType = DataTypes.createArrayType(elementType);
        ssc.udf().register(udfName, udf, returnType);
    }

    @Override
    public Long[] call(WrappedArray<Long> idsArg) throws Exception {
        // convert the Scala array to a Java list, then dedup via a HashSet
        List<Long> ids = JavaConversions.asJavaList(idsArg);
        HashSet<Long> hs = new HashSet<Long>(ids);
        Iterator<Long> it = hs.iterator();
        int size = hs.size();
        Long[] ret = new Long[size];
        for (int i = 0; i < size; i++) {
            ret[i] = it.next();
        }
        return ret;
    }
}
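
One guess I keep coming back to (untested): register() constructs its own LongType instead of using the singleton the rest of Spark SQL uses. DataTypes.LongType is the shared, serializable instance exposed for Java, so the registration would become:

// untested guess: the hand-constructed LongType instance is what fails to
// serialize; DataTypes.LongType is the shared singleton exposed for Java
DataType returnType = DataTypes.createArrayType(DataTypes.LongType);
ssc.udf().register(udfName, udf, returnType);

Does that sound right?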



Exception in thread "main" org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:707)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:706)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:706)
at org.apache.spark.sql.execution.ConvertToSafe.doExecute(rowFormatConverters.scala:56)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:187)
at org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:165)
at org.apache.spark.sql.execution.SparkPlan.executeCollectPublic(SparkPlan.scala:174)
at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1499)
at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1499)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:2086)
at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$execute$1(DataFrame.scala:1498)
at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$collect(DataFrame.scala:1505)
at org.apache.spark.sql.DataFrame$$anonfun$head$1.apply(DataFrame.scala:1375)
at org.apache.spark.sql.DataFrame$$anonfun$head$1.apply(DataFrame.scala:1374)
at org.apache.spark.sql.DataFrame.withCallback(DataFrame.scala:2099)
at org.apache.spark.sql.DataFrame.head(DataFrame.scala:1374)
at org.apache.spark.sql.DataFrame.take(DataFrame.scala:1456)
at org.apache.spark.sql.DataFrame.showString(DataFrame.scala:170)
at org.apache.spark.sql.DataFrame.show(DataFrame.scala:350)
at org.apache.spark.sql.DataFrame.show(DataFrame.scala:311)
at com.pws.sparkStreaming.ml.FrequentItems.run(FrequentItems.java:89)
at com.pws.sparkStreaming.ml.FrequentItems.main(FrequentItems.java:39)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: org.apache.spark.sql.types.LongType
Serialization stack:
- object not serializable (class: org.apache.spark.sql.types.LongType, value: org.apache.spark.sql.types.LongType@67328bcb)
- field (class: org.apache.spark.sql.catalyst.CatalystTypeConverters$ArrayConverter, name: elementType, type: class org.apache.spark.sql.types.DataType)
- object (class org.apache.spark.sql.catalyst.CatalystTypeConverters$ArrayConverter, ArrayConverter(org.apache.spark.sql.types.LongType@67328bcb))
- field (class: org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$2, name: eta$0$1$1, type: class org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter)
- object (class org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$2, <function1>)
- field (class: org.apache.spark.sql.catalyst.expressions.ScalaUDF, name: converter, type: interface scala.Function1)
- object (class org.apache.spark.sql.catalyst.expressions.ScalaUDF, UDF(friendsIds#0))
- field (class: org.apache.spark.sql.catalyst.expressions.Alias, name: child, type: class org.apache.spark.sql.catalyst.expressions.Expression)
- object (class org.apache.spark.sql.catalyst.expressions.Alias, UDF(friendsIds#0) AS uniqueIds#3)
- element of array (index: 1)
- array (class [Ljava.lang.Object;, size 2)
- field (class: scala.collection.mutable.ArrayBuffer, name: array, type: class [Ljava.lang.Object;)
- object (class scala.collection.mutable.ArrayBuffer, ArrayBuffer(friendsIds#0, UDF(friendsIds#0) AS uniqueIds#3))
- field (class: org.apache.spark.sql.execution.Project, name: projectList, type: interface scala.collection.Seq)
- object (class org.apache.spark.sql.execution.Project, Project [friendsIds#0,UDF(friendsIds#0) AS uniqueFriendsIds#3]
+- Scan JSONRelation[friendsIds#0] InputPaths: file:/Users/a/workSpace/BigPWS/sparkApps/data/part-00000.csv.friendsList.json
)
- field (class: org.apache.spark.sql.execution.ConvertToSafe, name: child, type: class org.apache.spark.sql.execution.SparkPlan)
- object (class org.apache.spark.sql.execution.ConvertToSafe, ConvertToSafe
+- Project [friendsIds#0,UDF(friendsIds#0) AS uniqueFriendsIds#3]
   +- Scan JSONRelation[friendsIds#0] InputPaths: file:/Users/a/workSpace/BigPWS/sparkApps/data/part-00000.csv.friendsList.json
)
- field (class: org.apache.spark.sql.execution.ConvertToSafe$$anonfun$2, name: $outer, type: class org.apache.spark.sql.execution.ConvertToSafe)
- object (class org.apache.spark.sql.execution.ConvertToSafe$$anonfun$2, <function1>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
... 42 more


