[ https://issues.apache.org/jira/browse/SPARK-4817?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14242177#comment-14242177 ]
宿荣全 edited comment on SPARK-4817 at 12/11/14 5:57 AM:
------------------------------------------------------

[~srowen] You can always call foreachRDD, operate on the whole RDD, and then call take on it to get a few elements to print. That achieves the effect, but it is more complicated. For example:

```
// stream: DStream[(String, String)], the result of stream.map -> filter -> ...
// (the upstream transformations are elided here; ssc is the StreamingContext).

// 1. Collect the whole RDD to the driver, then print the first 5 elements.
stream.foreachRDD { rdd =>
  val array = rdd.collect()
  array.take(5).foreach(println) // take(5) already copes with fewer than 5 elements
}

// 2. Run a job over every partition, concatenate the per-partition arrays,
//    then print the first 5 elements.
stream.foreachRDD { rdd =>
  val rddArray = ssc.sparkContext.runJob(rdd, (iter: Iterator[(String, String)]) => iter.toArray)
  val array = Array.concat(rddArray: _*)
  array.take(5).foreach(println)
}
```

Both samples achieve the effect. From a design perspective, however, having streaming code manipulate the RDD directly is not a good design, and I think foreachRDD is generally not called directly in application code. Streaming output actions are generally registered through the following methods, all of which call foreachRDD internally:

1. DStream.foreach
2. DStream.saveAsObjectFiles
3. DStream.saveAsTextFiles
4. PairDStreamFunctions.saveAsHadoopFiles
5. PairDStreamFunctions.saveAsNewAPIHadoopFiles

> [streaming]Print the specified number of data and handle all of the elements
> in RDD
> -----------------------------------------------------------------------------------
>
> Key: SPARK-4817
> URL: https://issues.apache.org/jira/browse/SPARK-4817
> Project: Spark
> Issue Type: New Feature
> Components: Streaming
> Reporter: 宿荣全
> Priority: Minor
>
> DStream.print prints 10 elements but handles only 11 elements.
> This issue proposes a new function based on DStream.print that prints the
> specified number of elements and handles all of the elements in the RDD.
> Use case:
> val dstream = stream.map->filter->mapPartitions->print
> The data after filter must be written to a database in mapPartitions, but
> there is no need to print every element; printing the first 20 is enough to
> check the data processing.
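The "prints 10, handles 11" behavior in the issue description comes from laziness: in the Spark versions discussed here, DStream.print calls take(11) on each batch's RDD, and take only pulls as many elements as it needs through lazy transformations such as map (or a mapPartitions function that consumes its iterator lazily). The following is a plain-Scala analogy, not Spark code: an RDD is modeled as a Seq of partitions, and printLike and processAllThenPrint are hypothetical names used only for illustration. printLike mimics print's lazy take(num + 1); processAllThenPrint mimics the requested behavior by forcing the transformation over every element before printing the first num results.

```scala
// Plain-Scala analogy of SPARK-4817 (no Spark dependency).
// An "RDD" is modeled as a Seq of partitions; printLike and
// processAllThenPrint are hypothetical names used only for illustration.

// Mimics DStream.print: pull at most num + 1 elements through the lazy
// transformation f, print the first num, and print "..." if there were more.
def printLike[T, U](partitions: Seq[Seq[T]], num: Int)(f: T => U): Seq[U] = {
  val pulled = partitions.iterator.flatMap(_.iterator).map(f).take(num + 1).toList
  val shown = pulled.take(num)
  shown.foreach(println)
  if (pulled.size > num) println("...")
  shown
}

// Mimics the requested behavior: apply f to every element of every
// partition first, then print only the first num results.
def processAllThenPrint[T, U](partitions: Seq[Seq[T]], num: Int)(f: T => U): Seq[U] = {
  val all = partitions.flatMap(_.map(f)).toVector // forces f on every element
  val shown = all.take(num)
  shown.foreach(println)
  if (all.size > num) println("...")
  shown
}

// Usage: count how often the side-effecting transformation actually runs.
val partitions = Seq(Seq(1, 2, 3, 4, 5), Seq(6, 7, 8, 9, 10), Seq(11, 12, 13, 14, 15))

var printCalls = 0
val printed = printLike(partitions, 10) { x => printCalls += 1; x * 2 }

var allCalls = 0
val printedAll = processAllThenPrint(partitions, 10) { x => allCalls += 1; x * 2 }
```

Both helpers print the same 10 values, but the side effect in f runs 11 times with printLike and 15 times with processAllThenPrint. In real Spark Streaming code, the "handle everything" half would still need an action that touches every partition (for example, a count or a foreachPartition inside foreachRDD) in addition to the take used for printing; which action fits best depends on the job.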
-- This message was sent by Atlassian JIRA (v6.3.4#6332)