I started using the FP-Growth implementation in MLlib (http://spark.apache.org/docs/latest/mllib-frequent-pattern-mining.html#fp-growth) from Python. It was really easy to get the frequent itemsets. Unfortunately association rule mining is not implemented in the Python API, so I am porting that part to Java.
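For reference, here is roughly the Java API I am porting to, since the Scala/Java side does expose association rules. This is only a sketch to show the calls involved: the toy transactions, the minSupport/minConfidence values, and the JavaSparkContext jsc are placeholders, not my real code.

import java.util.Arrays;
import java.util.List;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.fpm.AssociationRules;
import org.apache.spark.mllib.fpm.FPGrowth;
import org.apache.spark.mllib.fpm.FPGrowthModel;

// toy transactions; in my app each transaction is a deduped list of ids
JavaRDD<List<Long>> transactions = jsc.parallelize(Arrays.asList(
    Arrays.asList(1L, 2L, 3L),
    Arrays.asList(1L, 2L),
    Arrays.asList(2L, 4L)));

FPGrowthModel<Long> model = new FPGrowth()
    .setMinSupport(0.5)   // example value only
    .setNumPartitions(4)  // example value only
    .run(transactions);

// frequent itemsets, same as what I already get from Python
for (FPGrowth.FreqItemset<Long> itemset : model.freqItemsets().toJavaRDD().collect()) {
    System.out.println(itemset.javaItems() + " freq=" + itemset.freq());
}

// association rules, which the Python API does not expose
for (AssociationRules.Rule<Long> rule
         : model.generateAssociationRules(0.8).toJavaRDD().collect()) {
    System.out.println(rule.javaAntecedent() + " => " + rule.javaConsequent()
        + " conf=" + rule.confidence());
}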
Here is my Python code. It works great:

rawJsonRDD = jsonToPythonDictionaries(sc, inputURL, coalesceInputToNumPartions)
idsRDD = (rawJsonRDD
          # fetch the list of ids, the items are of type int
          .map(lambda r: r['ids'])
          # make sure ids are unique
          .map(lambda ids: list(set(ids))))

My Java code generates java.io.NotSerializableException: org.apache.spark.sql.types.LongType. It has something to do with the UDF I wrote to make sure the ids are unique. Any idea what my bug is? I guess instead of data frames I could try to implement this using RDDs (rough sketch at the end of this mail), but I expect I'll run into a similar problem.

Thanks in advance

Andy

df.printSchema();

root
 |-- ids: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- updated: long (nullable = true)
 |-- userId: long (nullable = true)

16/04/21 16:26:50 INFO FrequentItems: expr: UniqIdsUDF(ids) as uniqueIds

UniqIdsUDF.register(sqlContext);
DataFrame df2 = df.selectExpr(inputColName, expr);

import java.io.Serializable;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;

import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.LongType;

import scala.collection.JavaConversions;
import scala.collection.mutable.WrappedArray;

/**
 * This is based on some test code I wrote that takes in a list of
 * strings and returns a list of strings.
 */
public class UniqIdsUDF implements UDF1<WrappedArray<Long>, Long[]>, Serializable {
    private static final long serialVersionUID = 1L;
    public static final String udfName = "UniqIdsUDF";

    public static void register(SQLContext ssc) {
        // TODO probably need to be careful about registering multiple times
        UniqIdsUDF udf = new UniqIdsUDF();
        DataType elementType = new LongType();
        DataType returnType = DataTypes.createArrayType(elementType);
        ssc.udf().register(udfName, udf, returnType);
    }

    @Override
    public Long[] call(WrappedArray<Long> idsArg) throws Exception {
        // copy into a HashSet to drop duplicates, then back out to an array
        List<Long> ids = JavaConversions.asJavaList(idsArg);
        HashSet<Long> hs = new HashSet<Long>(ids);
        Iterator<Long> it = hs.iterator();
        int size = hs.size();
        Long[] ret = new Long[size];
        for (int i = 0; i < size; i++) {
            ret[i] = it.next();
        }
        return ret;
    }
}

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:707)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:706)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
    at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:706)
    at org.apache.spark.sql.execution.ConvertToSafe.doExecute(rowFormatConverters.scala:56)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:187)
    at org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:165)
    at org.apache.spark.sql.execution.SparkPlan.executeCollectPublic(SparkPlan.scala:174)
    at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1499)
    at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1499)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
    at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:2086)
    at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$execute$1(DataFrame.scala:1498)
    at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$collect(DataFrame.scala:1505)
    at org.apache.spark.sql.DataFrame$$anonfun$head$1.apply(DataFrame.scala:1375)
    at org.apache.spark.sql.DataFrame$$anonfun$head$1.apply(DataFrame.scala:1374)
    at org.apache.spark.sql.DataFrame.withCallback(DataFrame.scala:2099)
    at org.apache.spark.sql.DataFrame.head(DataFrame.scala:1374)
    at org.apache.spark.sql.DataFrame.take(DataFrame.scala:1456)
    at org.apache.spark.sql.DataFrame.showString(DataFrame.scala:170)
    at org.apache.spark.sql.DataFrame.show(DataFrame.scala:350)
    at org.apache.spark.sql.DataFrame.show(DataFrame.scala:311)
    at com.pws.sparkStreaming.ml.FrequentItems.run(FrequentItems.java:89)
    at com.pws.sparkStreaming.ml.FrequentItems.main(FrequentItems.java:39)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: org.apache.spark.sql.types.LongType
Serialization stack:
    - object not serializable (class: org.apache.spark.sql.types.LongType, value: org.apache.spark.sql.types.LongType@67328bcb)
    - field (class: org.apache.spark.sql.catalyst.CatalystTypeConverters$ArrayConverter, name: elementType, type: class org.apache.spark.sql.types.DataType)
    - object (class org.apache.spark.sql.catalyst.CatalystTypeConverters$ArrayConverter, ArrayConverter(org.apache.spark.sql.types.LongType@67328bcb))
    - field (class: org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$2, name: eta$0$1$1, type: class org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter)
    - object (class org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$2, <function1>)
    - field (class: org.apache.spark.sql.catalyst.expressions.ScalaUDF, name: converter, type: interface scala.Function1)
    - object (class org.apache.spark.sql.catalyst.expressions.ScalaUDF, UDF(friendsIds#0))
    - field (class: org.apache.spark.sql.catalyst.expressions.Alias, name: child, type: class org.apache.spark.sql.catalyst.expressions.Expression)
    - object (class org.apache.spark.sql.catalyst.expressions.Alias, UDF(friendsIds#0) AS uniqueIds#3)
    - element of array (index: 1)
    - array (class [Ljava.lang.Object;, size 2)
    - field (class: scala.collection.mutable.ArrayBuffer, name: array, type: class [Ljava.lang.Object;)
    - object (class scala.collection.mutable.ArrayBuffer, ArrayBuffer(friendsIds#0, UDF(friendsIds#0) AS uniqueIds#3))
    - field (class: org.apache.spark.sql.execution.Project, name: projectList, type: interface scala.collection.Seq)
    - object (class org.apache.spark.sql.execution.Project, Project [friendsIds#0,UDF(friendsIds#0) AS uniqueFriendsIds#3]
        +- Scan JSONRelation[friendsIds#0] InputPaths: file:/Users/a/workSpace/BigPWS/sparkApps/data/part-00000.csv.friendsList.json)
    - field (class: org.apache.spark.sql.execution.ConvertToSafe, name: child, type: class org.apache.spark.sql.execution.SparkPlan)
    - object (class org.apache.spark.sql.execution.ConvertToSafe, ConvertToSafe
        +- Project [friendsIds#0,UDF(friendsIds#0) AS uniqueFriendsIds#3]
           +- Scan JSONRelation[friendsIds#0] InputPaths: file:/Users/a/workSpace/BigPWS/sparkApps/data/part-00000.csv.friendsList.json)
    - field (class: org.apache.spark.sql.execution.ConvertToSafe$$anonfun$2, name: $outer, type: class org.apache.spark.sql.execution.ConvertToSafe)
    - object (class org.apache.spark.sql.execution.ConvertToSafe$$anonfun$2, <function1>)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
    ... 42 more
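P.S. Here is a rough sketch of the RDD-based approach I mentioned above, in case that turns out to be the easier route. It is untested; inputURL stands for the same JSON path as before, it assumes the "ids" column from the schema above, and it sidesteps UDF registration entirely by deduping in a plain map().

import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Row;

// pull out just the ids column and dedupe each row's list
JavaRDD<Row> rows = sqlContext.read().json(inputURL).select("ids").javaRDD();
JavaRDD<List<Long>> uniqueIds = rows.map(row -> {
    List<Long> ids = row.getList(0);  // element type is long per printSchema()
    return new ArrayList<Long>(new LinkedHashSet<Long>(ids));
});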