The WakeupException is logged rather than thrown (which is expected, since
the reader was closed at the end of the micro-batch), so I wonder what
causes "ERROR StreamingListenerBus: StreamingListenerBus has already stopped".
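
For context, that WARN is the normal shutdown handshake: closing the reader
calls consumer.wakeup(), and whichever thread is blocked fetching offsets
gets a WakeupException that is caught and logged. A paraphrased sketch of
the pattern (not the actual KafkaIO source; the class and names here are
illustrative only):

    import java.util.Collections;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.errors.WakeupException;

    // Illustrative sketch of the close/wakeup handshake behind the WARN.
    class OffsetFetcherSketch {
      private final KafkaConsumer<byte[], byte[]> consumer;
      private final TopicPartition partition;

      OffsetFetcherSketch(KafkaConsumer<byte[], byte[]> consumer,
                          TopicPartition partition) {
        this.consumer = consumer;
        this.partition = partition;
        consumer.assign(Collections.singletonList(partition));
      }

      // Runs periodically on a background thread, analogous to
      // KafkaIO's updateLatestOffsets().
      void updateLatestOffsets() {
        try {
          long latest = consumer.position(partition); // may block on the network
          System.out.println("latest offset: " + latest);
        } catch (WakeupException e) {
          // Expected when close() wakes the consumer mid-fetch; log and move on.
          System.out.println("exception while fetching latest offsets. ignored.");
        }
      }

      // Called at the end of the micro-batch.
      void close() {
        consumer.wakeup(); // makes any in-flight consumer call throw WakeupException
      }
    }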

Are you running in local mode ("local[*]"), or over YARN?
Are you using any specific options?
Would you mind trying the Beam snapshot, 0.5.0-SNAPSHOT?
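
If you're setting things up programmatically, here is a minimal sketch of
pinning the master explicitly so we know which mode is in play (this assumes
setSparkMaster is exposed on SparkPipelineOptions in your version; the same
can be passed as --sparkMaster=local[*] on the command line). The snapshot
artifacts, for that matter, come from the Apache snapshots repository
(https://repository.apache.org/content/repositories/snapshots/).

    import org.apache.beam.runners.spark.SparkPipelineOptions;
    import org.apache.beam.runners.spark.SparkRunner;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    // Sketch: make the Spark master explicit instead of relying on defaults.
    SparkPipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(SparkPipelineOptions.class);
    options.setRunner(SparkRunner.class);
    options.setStreaming(true);
    // "local[*]" runs in-process with all cores; a YARN deployment would
    // normally go through spark-submit rather than a master set here.
    options.setSparkMaster("local[*]");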

Amit.

On Fri, Jan 20, 2017 at 9:20 PM Xu Mingmin <mmxu1...@gmail.com> wrote:

> Hello all,
>
> I'm working on a streaming POC project written with the Beam API, meant
> to run on both FlinkRunner and SparkRunner. It works well on Flink;
> however, I cannot run it on SparkRunner.
>
> Currently I run it locally and get this exception:
>
> 17/01/20 11:13:49 WARN KafkaIO: Reader-0: exception while fetching latest
> offsets. ignored.
> org.apache.kafka.common.errors.WakeupException
>     at
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.maybeTriggerWakeup(ConsumerNetworkClient.java:404)
>     at
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:245)
>     at
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:209)
>     at
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:148)
>     at
> org.apache.kafka.clients.consumer.internals.Fetcher.getOffsetsByTimes(Fetcher.java:374)
>     at
> org.apache.kafka.clients.consumer.internals.Fetcher.resetOffset(Fetcher.java:341)
>     at
> org.apache.kafka.clients.consumer.internals.Fetcher.resetOffsetsIfNeeded(Fetcher.java:197)
>     at
> org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:1524)
>     at
> org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1242)
>     at
> com.ebay.dss.beam.common.kafka.KafkaIO$UnboundedKafkaReader.updateLatestOffsets(KafkaIO.java:1059)
>     at
> com.ebay.dss.beam.common.kafka.KafkaIO$UnboundedKafkaReader.access$3(KafkaIO.java:1055)
>     at
> com.ebay.dss.beam.common.kafka.KafkaIO$UnboundedKafkaReader$3.run(KafkaIO.java:966)
>     at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>     at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>     at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>     at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
>
> 17/01/20 11:13:50 ERROR StreamingListenerBus: StreamingListenerBus has
> already stopped! Dropping event
> StreamingListenerOutputOperationCompleted(OutputOperationInfo(1484939618000
> ms,0,foreachRDD at
> UnboundedDataset.java:102,org.apache.spark.streaming.api.java.AbstractJavaDStreamLike.foreachRDD(JavaDStreamLike.scala:42)
>
> org.apache.beam.runners.spark.translation.streaming.UnboundedDataset.action(UnboundedDataset.java:102)
>
> org.apache.beam.runners.spark.translation.EvaluationContext.computeOutputs(EvaluationContext.java:164)
>
> org.apache.beam.runners.spark.translation.streaming.SparkRunnerStreamingContextFactory.create(SparkRunnerStreamingContextFactory.java:78)
>
> org.apache.spark.streaming.api.java.JavaStreamingContext$$anonfun$7.apply(JavaStreamingContext.scala:706)
>
> org.apache.spark.streaming.api.java.JavaStreamingContext$$anonfun$7.apply(JavaStreamingContext.scala:705)
> scala.Option.getOrElse(Option.scala:120)
>
> org.apache.spark.streaming.StreamingContext$.getOrCreate(StreamingContext.scala:864)
>
> org.apache.spark.streaming.api.java.JavaStreamingContext$.getOrCreate(JavaStreamingContext.scala:705)
>
> org.apache.spark.streaming.api.java.JavaStreamingContext.getOrCreate(JavaStreamingContext.scala)
> org.apache.beam.runners.spark.SparkRunner.run(SparkRunner.java:159)
> org.apache.beam.runners.spark.SparkRunner.run(SparkRunner.java:81)
> org.apache.beam.sdk.Pipeline.run(Pipeline.java:176)
>
> com.ebay.dss.beam.spark.streaming.KafkaInKafkaOut.main(KafkaInKafkaOut.java:132),Some(1484939629500),Some(1484939630152),None))
>
>
> Here's my code:
>
> SparkPipelineOptions options = PipelineOptionsFactory.fromArgs(args)
>         .withValidation()
>         .as(SparkPipelineOptions.class); // PipelineOptionsFactory.create();
>
> options.setRunner(SparkRunner.class);
> options.setStreaming(true);
>
> Pipeline pipeline = Pipeline.create(options);
>
> PCollection<String> inStream = pipeline.apply("source",
>         KafkaIO.read()
>                 .withBootstrapServers(KAFKA_BROKER_IN)
>                 .withTopics(Arrays.asList(KAFKA_IN_TOPIC))
>                 .withKeyCoder(ByteArrayCoder.of())
>                 .withValueCoder(StringUtf8Coder.of())
> //              .withMaxNumRecords(5)
>                 .withoutMetadata()
>         )...
>
>
> Environment:
>
> Apache-Beam : 0.4.0
> Kafka: 0.9/0.10 (test both)
> Spark: 1.6.3
>
>
> I can run it by adding withMaxNumRecords(); however, the pipeline then
> executes once as a bounded (batch) read rather than streaming.
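>
> (To be concrete, the bounded variant is just the commented-out line above
> put back; withMaxNumRecords(n) wraps the unbounded Kafka source in a
> bounded read that stops after n records, e.g.:
>
> KafkaIO.read()
>         .withBootstrapServers(KAFKA_BROKER_IN)
>         .withTopics(Arrays.asList(KAFKA_IN_TOPIC))
>         .withKeyCoder(ByteArrayCoder.of())
>         .withValueCoder(StringUtf8Coder.of())
>         .withMaxNumRecords(5) // bounds the read; remove for streaming
>         .withoutMetadata()
> )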
>
> Any suggestion?
>
> Thanks!
> Mingmin
>
