The WakeupException is being logged rather than thrown (which is fine, since the reader was closed at the end of the microbatch), so I wonder what causes "ERROR StreamingListenerBus: StreamingListenerBus has already stopped".
Are you running in local mode ("local[*]") or over YARN? Are there any specific options you're using? Would you mind trying the Beam snapshot, 0.5.0-SNAPSHOT?

Amit.

On Fri, Jan 20, 2017 at 9:20 PM Xu Mingmin <mmxu1...@gmail.com> wrote:

> Hello all,
>
> I'm working on a streaming POC project, which is written with the Beam
> API and runs on both FlinkRunner and SparkRunner. It works well on
> Flink; however, I cannot run it on SparkRunner.
>
> Currently I run it locally, and I get this exception:
>
> 17/01/20 11:13:49 WARN KafkaIO: Reader-0: exception while fetching latest offsets. ignored.
> org.apache.kafka.common.errors.WakeupException
>         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.maybeTriggerWakeup(ConsumerNetworkClient.java:404)
>         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:245)
>         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:209)
>         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:148)
>         at org.apache.kafka.clients.consumer.internals.Fetcher.getOffsetsByTimes(Fetcher.java:374)
>         at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffset(Fetcher.java:341)
>         at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffsetsIfNeeded(Fetcher.java:197)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:1524)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1242)
>         at com.ebay.dss.beam.common.kafka.KafkaIO$UnboundedKafkaReader.updateLatestOffsets(KafkaIO.java:1059)
>         at com.ebay.dss.beam.common.kafka.KafkaIO$UnboundedKafkaReader.access$3(KafkaIO.java:1055)
>         at com.ebay.dss.beam.common.kafka.KafkaIO$UnboundedKafkaReader$3.run(KafkaIO.java:966)
>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>         at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         at java.lang.Thread.run(Thread.java:745)
>
> 17/01/20 11:13:50 ERROR StreamingListenerBus: StreamingListenerBus has already stopped! Dropping event StreamingListenerOutputOperationCompleted(OutputOperationInfo(1484939618000 ms,0,foreachRDD at UnboundedDataset.java:102,org.apache.spark.streaming.api.java.AbstractJavaDStreamLike.foreachRDD(JavaDStreamLike.scala:42)
> org.apache.beam.runners.spark.translation.streaming.UnboundedDataset.action(UnboundedDataset.java:102)
> org.apache.beam.runners.spark.translation.EvaluationContext.computeOutputs(EvaluationContext.java:164)
> org.apache.beam.runners.spark.translation.streaming.SparkRunnerStreamingContextFactory.create(SparkRunnerStreamingContextFactory.java:78)
> org.apache.spark.streaming.api.java.JavaStreamingContext$$anonfun$7.apply(JavaStreamingContext.scala:706)
> org.apache.spark.streaming.api.java.JavaStreamingContext$$anonfun$7.apply(JavaStreamingContext.scala:705)
> scala.Option.getOrElse(Option.scala:120)
> org.apache.spark.streaming.StreamingContext$.getOrCreate(StreamingContext.scala:864)
> org.apache.spark.streaming.api.java.JavaStreamingContext$.getOrCreate(JavaStreamingContext.scala:705)
> org.apache.spark.streaming.api.java.JavaStreamingContext.getOrCreate(JavaStreamingContext.scala)
> org.apache.beam.runners.spark.SparkRunner.run(SparkRunner.java:159)
> org.apache.beam.runners.spark.SparkRunner.run(SparkRunner.java:81)
> org.apache.beam.sdk.Pipeline.run(Pipeline.java:176)
> com.ebay.dss.beam.spark.streaming.KafkaInKafkaOut.main(KafkaInKafkaOut.java:132),Some(1484939629500),Some(1484939630152),None))
>
> Here's my code:
>
> SparkPipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
>         .as(SparkPipelineOptions.class); // PipelineOptionsFactory.create();
>
> options.setRunner(SparkRunner.class);
> options.setStreaming(true);
>
> Pipeline pipeline = Pipeline.create(options);
>
> PCollection<String> inStream = pipeline.apply("source",
>         KafkaIO.read().withBootstrapServers(KAFKA_BROKER_IN)
>                 .withTopics(Arrays.asList(KAFKA_IN_TOPIC))
>                 .withKeyCoder(ByteArrayCoder.of())
>                 .withValueCoder(StringUtf8Coder.of())
>                 // .withMaxNumRecords(5)
>                 .withoutMetadata()
>         )...
>
> Environment:
>
> Apache Beam: 0.4.0
> Kafka: 0.9/0.10 (tested both)
> Spark: 1.6.3
>
> I can run it by adding withMaxNumRecords(), but then it runs as a
> one-time batch rather than a stream.
>
> Any suggestions?
>
> Thanks!
> Mingmin
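
For anyone following along: trying the suggested 0.5.0-SNAPSHOT generally means pointing the build at the Apache snapshots repository and bumping the version. A minimal Maven sketch of what that could look like (repository id and the beam-runners-spark artifact are assumptions based on the Beam 0.4.0 artifact layout; verify against your own pom):

    <repositories>
      <repository>
        <id>apache.snapshots</id>
        <url>https://repository.apache.org/content/repositories/snapshots</url>
      </repository>
    </repositories>

    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-runners-spark</artifactId>
      <version>0.5.0-SNAPSHOT</version>
    </dependency>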