It's running in local mode for testing now. I tried with 0.5.0-SNAPSHOT and got the same error:
> 17/01/20 12:01:30 ERROR LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerBlockUpdated(BlockUpdatedInfo(BlockManagerId(driver, localhost, 59305),rdd_30_0,StorageLevel(false, true, false, true, 1),11616,0,0))
> 17/01/20 12:01:30 INFO BlockManagerInfo: Added rdd_31_0 in memory on localhost:59305 (size: 1340.0 B, free: 1124.5 MB)
> 17/01/20 12:01:30 ERROR LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerBlockUpdated(BlockUpdatedInfo(BlockManagerId(driver, localhost, 59305),rdd_31_0,StorageLevel(false, true, false, false, 1),1340,0,0))
> 17/01/20 12:01:30 INFO Executor: Finished task 0.0 in stage 15.0 (TID 19). 2956 bytes result sent to driver
> 17/01/20 12:01:30 INFO DAGScheduler: ResultStage 15 (DStream at SparkUnboundedSource.java:154) failed in 0.564 s
> 17/01/20 12:01:30 ERROR LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerStageCompleted(org.apache.spark.scheduler.StageInfo@43f0ada9)
> 17/01/20 12:01:30 ERROR LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerJobEnd(3,1484942490827,JobFailed(org.apache.spark.SparkException: Job 3 cancelled because SparkContext was shut down))

Btw, is there a runnable example for Spark streaming that I can refer to?

Thanks!
Mingmin

On Fri, Jan 20, 2017 at 11:45 AM, Amit Sela <amitsel...@gmail.com> wrote:

> The WakeupException is being logged and not thrown (it is OK since the reader was closed due to end-of-microbatch), so I wonder what causes "ERROR StreamingListenerBus: StreamingListenerBus has already stopped".
>
> Are you running in local mode ("local[*]"), or over YARN?
> Are you using any specific options?
> Would you mind trying the Beam snapshot, 0.5.0-SNAPSHOT?
>
> Amit.
>
> On Fri, Jan 20, 2017 at 9:20 PM Xu Mingmin <mmxu1...@gmail.com> wrote:
>
>> Hello all,
>>
>> I'm working on a streaming POC project, which is written with the Beam API and runs on both the FlinkRunner and the SparkRunner. It works well on Flink; however, I cannot run it on the SparkRunner.
>>
>> Currently I run it locally, and I get this exception:
>>
>> 17/01/20 11:13:49 WARN KafkaIO: Reader-0: exception while fetching latest offsets. ignored.
>> org.apache.kafka.common.errors.WakeupException
>> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.maybeTriggerWakeup(ConsumerNetworkClient.java:404)
>> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:245)
>> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:209)
>> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:148)
>> at org.apache.kafka.clients.consumer.internals.Fetcher.getOffsetsByTimes(Fetcher.java:374)
>> at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffset(Fetcher.java:341)
>> at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffsetsIfNeeded(Fetcher.java:197)
>> at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:1524)
>> at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1242)
>> at com.ebay.dss.beam.common.kafka.KafkaIO$UnboundedKafkaReader.updateLatestOffsets(KafkaIO.java:1059)
>> at com.ebay.dss.beam.common.kafka.KafkaIO$UnboundedKafkaReader.access$3(KafkaIO.java:1055)
>> at com.ebay.dss.beam.common.kafka.KafkaIO$UnboundedKafkaReader$3.run(KafkaIO.java:966)
>> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>> at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>> at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>> at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>> at java.lang.Thread.run(Thread.java:745)
>>
>> 17/01/20 11:13:50 ERROR StreamingListenerBus: StreamingListenerBus has already stopped! Dropping event StreamingListenerOutputOperationCompleted(OutputOperationInfo(1484939618000 ms,0,foreachRDD at UnboundedDataset.java:102,org.apache.spark.streaming.api.java.AbstractJavaDStreamLike.foreachRDD(JavaDStreamLike.scala:42)
>> org.apache.beam.runners.spark.translation.streaming.UnboundedDataset.action(UnboundedDataset.java:102)
>> org.apache.beam.runners.spark.translation.EvaluationContext.computeOutputs(EvaluationContext.java:164)
>> org.apache.beam.runners.spark.translation.streaming.SparkRunnerStreamingContextFactory.create(SparkRunnerStreamingContextFactory.java:78)
>> org.apache.spark.streaming.api.java.JavaStreamingContext$$anonfun$7.apply(JavaStreamingContext.scala:706)
>> org.apache.spark.streaming.api.java.JavaStreamingContext$$anonfun$7.apply(JavaStreamingContext.scala:705)
>> scala.Option.getOrElse(Option.scala:120)
>> org.apache.spark.streaming.StreamingContext$.getOrCreate(StreamingContext.scala:864)
>> org.apache.spark.streaming.api.java.JavaStreamingContext$.getOrCreate(JavaStreamingContext.scala:705)
>> org.apache.spark.streaming.api.java.JavaStreamingContext.getOrCreate(JavaStreamingContext.scala)
>> org.apache.beam.runners.spark.SparkRunner.run(SparkRunner.java:159)
>> org.apache.beam.runners.spark.SparkRunner.run(SparkRunner.java:81)
>> org.apache.beam.sdk.Pipeline.run(Pipeline.java:176)
>> com.ebay.dss.beam.spark.streaming.KafkaInKafkaOut.main(KafkaInKafkaOut.java:132),Some(1484939629500),Some(1484939630152),None))
>>
>> Here's my code:
>>
>> SparkPipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
>>     .as(SparkPipelineOptions.class); // PipelineOptionsFactory.create();
>>
>> options.setRunner(SparkRunner.class);
>> options.setStreaming(true);
>>
>> Pipeline pipeline = Pipeline.create(options);
>>
>> PCollection<String> inStream = pipeline.apply("source",
>>     KafkaIO.read().withBootstrapServers(KAFKA_BROKER_IN).withTopics(Arrays.asList(KAFKA_IN_TOPIC))
>>         .withKeyCoder(ByteArrayCoder.of())
>>         .withValueCoder(StringUtf8Coder.of())
>>         // .withMaxNumRecords(5)
>>         .withoutMetadata()
>>     )...
>>
>> Environment:
>>
>> Apache Beam: 0.4.0
>> Kafka: 0.9 / 0.10 (tested both)
>> Spark: 1.6.3
>>
>> I can run it by adding withMaxNumRecords(), but then it runs as a one-time batch job instead of a stream.
>>
>> Any suggestions?
>>
>> Thanks!
>> Mingmin
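For a runnable reference, here is a minimal sketch of the quoted pipeline. It assumes the coder-based KafkaIO builder shown above, but uses the stock Beam 0.4.0-era org.apache.beam.sdk.io.kafka.KafkaIO module rather than the copied com.ebay.dss.beam.common.kafka.KafkaIO class; the class name, broker/topic values, and the trailing Values.create() step are placeholders and not part of the original code.

import java.util.Arrays;

import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.runners.spark.SparkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.values.PCollection;

public class KafkaOnSparkStreamingExample {  // hypothetical class name

  public static void main(String[] args) {
    // Placeholder broker/topic values -- substitute your own.
    final String bootstrapServers = "localhost:9092";
    final String inputTopic = "beam-input";

    SparkPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(SparkPipelineOptions.class);
    options.setRunner(SparkRunner.class);
    options.setStreaming(true);  // run the SparkRunner in streaming (unbounded) mode

    Pipeline pipeline = Pipeline.create(options);

    PCollection<String> values = pipeline
        .apply("source",
            KafkaIO.read()
                .withBootstrapServers(bootstrapServers)
                .withTopics(Arrays.asList(inputTopic))
                .withKeyCoder(ByteArrayCoder.of())
                .withValueCoder(StringUtf8Coder.of())
                // .withMaxNumRecords(5)  // uncommenting bounds the read, so the
                //                        // job runs once as a batch, not a stream
                .withoutMetadata())       // yields KV<byte[], String> without Kafka metadata
        .apply(Values.<String>create());  // keep only the record values

    // ... apply further transforms / a Kafka sink here, then run the pipeline.
    pipeline.run();
  }
}

Leaving withMaxNumRecords() out keeps the source unbounded, which is what drives the streaming translation on the SparkRunner (Spark Streaming micro-batches); adding it bounds the source, giving the one-time batch behaviour described above.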