I am working on a modified Spark core and have a Broadcast variable which I deserialize to obtain an RDD along with its set of dependencies, as is done in ShuffleMapTask, as following:
val taskBinary: Broadcast[Array[Byte]]var (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])]( ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader) However, I want to wrap this rdd by a ShuffledRDD because I need to apply a custom partitioner to it ,and I am doing this by: var wrappedRDD = new ShuffledRDD[_ ,_, _](rdd[_ <: Product2[Any, Any]], context.getCustomPartitioner()) but it results in an error: Error:unbound wildcard type rdd = new ShuffledRDD[_ ,_, _ ](rdd[_ <: Product2[Any, Any]], context.getCustomPartitioner()) ..................................^ The problem is that I don't know how to replace these wildcards with any inferred type as I its supposed to be dynamic and I have no idea what would be the inferred type of the original rdd. Any idea how I could resolved this?