Thanks, that is what I was missing. I added a cache() before the action, and
the 2nd processing is now avoided.
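
In case it helps anyone else hitting this, here is a minimal sketch of where
the cache() call went, assuming the topology described in the message below.
The class name, the queueStream stand-in for the Kafka direct stream, and the
filter predicates are hypothetical placeholders, not my actual app code:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class CacheBeforeUnion {
  public static void main(String[] args) throws InterruptedException {
    SparkConf conf = new SparkConf()
        .setAppName("cache-before-union").setMaster("local[2]");
    JavaStreamingContext jssc =
        new JavaStreamingContext(conf, Durations.seconds(5));

    // Stand-in for KafkaUtils.createDirectStream(...) followed by the
    // flatMapToPair of process(1); a queue stream keeps the sketch runnable.
    JavaDStream<String> processed =
        jssc.queueStream(new java.util.LinkedList<JavaRDD<String>>());

    // The fix: cache before the stream fans out into the two filter branches.
    // Without this, each action downstream of the union recomputes the lineage
    // all the way back to the Kafka fetch, which is the duplicate fetch seen
    // in the logs below.
    processed.cache();

    JavaDStream<String> streamA =
        processed.filter(e -> e.startsWith("A"));   // hypothetical predicate
    JavaDStream<String> streamB =
        processed.filter(e -> e.startsWith("B"))    // hypothetical predicate
                 .map(String::toLowerCase);         // stands in for process(2)

    // process(3): the union feeds a single output operation.
    streamA.union(streamB).print();

    jssc.start();
    jssc.awaitTermination();
  }
}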

2016-09-10 5:10 GMT-07:00 Cody Koeninger <c...@koeninger.org>:

> Hard to say without seeing the code, but if you do multiple actions on an
> Rdd without caching, the Rdd will be computed multiple times.
>
> On Sep 10, 2016 2:43 AM, "Cheng Yi" <phillipchen...@gmail.com> wrote:
>
> After some investigation, the problem I see is likely caused by a filter
> and union of the DStream.
> If I just do kafka-stream -- process -- output operation, there is no
> problem: each event is fetched only once.
> If I do
>
> kafka-stream -- process(1) --+-- filter stream A ------------------------+
>                              |                                           |
>                              +-- filter stream B -- process(2) ----------+-- A union B -- output process(3)
>
> the event is fetched 2 times; the duplicate processing starts at the end
> of process(1), see the following traces:
>
> 16/09/10 00:11:00 INFO CachedKafkaConsumer: Initial fetch for
> spark-executor-testgid log-analysis-topic 2 1 *(fetch EVENT 1st time)*
>
> 16/09/10 00:11:00 INFO AbstractCoordinator: Discovered coordinator
> 192.168.2.6:9092 (id: 2147483647 rack: null) for group
> spark-executor-testgid.
>
> log of processing (1) for event 1
>
> 16/09/10 00:11:03 INFO Executor: Finished task 0.0 in stage 9.0 (TID 36).
> 1401 bytes result sent to driver
>
> 16/09/10 00:11:03 INFO TaskSetManager: Finished task 0.0 in stage 9.0 (TID
> 36) in 3494 ms on localhost (3/3)
>
> 16/09/10 00:11:03 INFO TaskSchedulerImpl: Removed TaskSet 9.0, whose tasks
> have all completed, from pool
>
> 16/09/10 00:11:03 INFO DAGScheduler: ShuffleMapStage 9 (flatMapToPair
> (*processing (1)*) at SparkAppDriver.java:136) finished in 3.506 s
>
> 16/09/10 00:11:03 INFO DAGScheduler: looking for newly runnable stages
>
> 16/09/10 00:11:03 INFO DAGScheduler: running: Set()
>
> 16/09/10 00:11:03 INFO DAGScheduler: waiting: Set(ShuffleMapStage 10,
> ResultStage 11)
>
> 16/09/10 00:11:03 INFO DAGScheduler: failed: Set()
>
> 16/09/10 00:11:03 INFO DAGScheduler: Submitting ShuffleMapStage 10
> (UnionRDD[41] at union (*process (3)*) at SparkAppDriver.java:155), which
> has no missing parents
>
> 16/09/10 00:11:03 INFO DAGScheduler: Submitting 6 missing tasks from
> ShuffleMapStage 10 (UnionRDD[41] at union at SparkAppDriver.java:155)
>
> 16/09/10 00:11:03 INFO KafkaRDD: Computing topic log-analysis-topic,
> partition 2 offsets 1 -> 2
>
> 16/09/10 00:11:03 INFO CachedKafkaConsumer: Initial fetch for
> spark-executor-testgid log-analysis-topic 2 1 *(fetch the same EVENT 2nd
> time)*
>
> 16/09/10 00:11:10 INFO JobScheduler: Finished job streaming job
> 1473491460000 ms.0 from job set of time 1473491460000 ms
>
> 16/09/10 00:11:10 INFO JobScheduler: Total delay: 10.920 s for time
> 1473491460000 ms (execution: 10.874 s) *(EVENT 1st time processing cost
> 10.874 s)*
>
> 16/09/10 00:11:10 INFO JobScheduler: Finished job streaming job
> 1473491465000 ms.0 from job set of time 1473491465000 ms
>
> 16/09/10 00:11:10 INFO JobScheduler: Total delay: 5.986 s for time
> 1473491465000 ms (execution: 0.066 s) *(EVENT 2nd time processing cost
> 0.066 s)*
>
> and the 2nd processing of the event finished without actually doing the
> work.
>
> Help is hugely appreciated.
>