GitHub user jerryshao commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2562#discussion_r18129623

    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala ---
    @@ -602,6 +602,18 @@ abstract class DStream[T: ClassTag] (
       }

       /**
    +   * Return a new UpdateDStream in which each RDD is used to update the original rdd by
    +   * applying a function on each RDD of 'this' DStream.
    +   */
    +  def updateRDD[U: ClassTag, V: ClassTag](
    +      rdd: RDD[V],
    +      updateFunc: (Option[RDD[T]], RDD[V]) => RDD[U]
    +    ): DStream[T] = {
    +    val cleanF = ssc.sparkContext.clean(updateFunc)
    +    new UpdateDStream[U, T, V](this, cleanF, rdd).register()
    --- End diff --

    Hi @uncleGen, I'm not sure why you need to register this DStream. Your `updateRDD` operator looks like a transformation DStream, not an action (output) DStream, so I don't think you need to call `register()`. That call is only for output DStreams like `ForEachDStream`.
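
    To illustrate the distinction, here is a minimal toy sketch in plain Scala. It is not Spark's implementation: `RegisterSketch`, `Graph`, `ToyStream`, `Source`, `Mapped`, `ForEach` and `demo` are all hypothetical names made up for this example. It only assumes the general idea that `register()` adds a stream to the graph's list of output streams, and that each batch is driven from that list, so a transformation is computed only when some registered output stream depends on it.

```scala
import scala.collection.mutable.ArrayBuffer

// Toy model only: every class here is a hypothetical stand-in, not Spark's API.
object RegisterSketch {
  // Stand-in for the streaming graph, which tracks registered output streams.
  final class Graph {
    val outputStreams = ArrayBuffer.empty[ToyStream[_]]
    // Each batch is driven only by the registered output streams.
    def runBatch(): Unit = outputStreams.foreach(_.compute())
  }

  abstract class ToyStream[T](val graph: Graph) {
    def compute(): Seq[T]
    // Transformation: builds a new stream and does NOT register it.
    def map[U](f: T => U): ToyStream[U] = new Mapped(this, f)
    // Output operation: wraps this stream and registers the wrapper.
    def foreach(f: Seq[T] => Unit): Unit = { new ForEach(this, f).register(); () }
    def register(): this.type = { graph.outputStreams += this; this }
  }

  final class Source[T](g: Graph, data: Seq[T]) extends ToyStream[T](g) {
    var computeCount = 0 // counts how often a batch actually reached the source
    def compute(): Seq[T] = { computeCount += 1; data }
  }

  final class Mapped[T, U](parent: ToyStream[T], f: T => U)
      extends ToyStream[U](parent.graph) {
    def compute(): Seq[U] = parent.compute().map(f)
  }

  final class ForEach[T](parent: ToyStream[T], f: Seq[T] => Unit)
      extends ToyStream[Unit](parent.graph) {
    def compute(): Seq[Unit] = { f(parent.compute()); Seq.empty }
  }

  // Returns (source computations before the output op, after it, last batch seen).
  def demo(): (Int, Int, Seq[Int]) = {
    val graph = new Graph
    val source = new Source(graph, Seq(1, 2, 3))
    val doubled = source.map(_ * 2) // transformation only, nothing registered
    graph.runBatch()
    val before = source.computeCount // nothing pulled on `doubled` yet
    var seen = Seq.empty[Int]
    doubled.foreach(batch => seen = batch) // output op registers itself
    graph.runBatch()
    (before, source.computeCount, seen)
  }
}
```

    In this sketch `map` plays the role of a transformation such as `updateRDD`: it is evaluated through the `ForEach` stream that depends on it, without being registered itself. Registering it as well would make the graph treat it as an extra output and compute it a second time per batch.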