It's running in local mode for testing now. I tried with 0.5.0-SNAPSHOT and hit the same error (a rough sketch of my local-mode setup follows the log):

> 17/01/20 12:01:30 ERROR LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerBlockUpdated(BlockUpdatedInfo(BlockManagerId(driver, localhost, 59305),rdd_30_0,StorageLevel(false, true, false, true, 1),11616,0,0))
> 17/01/20 12:01:30 INFO BlockManagerInfo: Added rdd_31_0 in memory on localhost:59305 (size: 1340.0 B, free: 1124.5 MB)
> 17/01/20 12:01:30 ERROR LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerBlockUpdated(BlockUpdatedInfo(BlockManagerId(driver, localhost, 59305),rdd_31_0,StorageLevel(false, true, false, false, 1),1340,0,0))
> 17/01/20 12:01:30 INFO Executor: Finished task 0.0 in stage 15.0 (TID 19). 2956 bytes result sent to driver
> 17/01/20 12:01:30 INFO DAGScheduler: ResultStage 15 (DStream at SparkUnboundedSource.java:154) failed in 0.564 s
> 17/01/20 12:01:30 ERROR LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerStageCompleted(org.apache.spark.scheduler.StageInfo@43f0ada9)
> 17/01/20 12:01:30 ERROR LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerJobEnd(3,1484942490827,JobFailed(org.apache.spark.SparkException: Job 3 cancelled because SparkContext was shut down))
>
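
Roughly how I set it up for the local test (just a sketch; the explicit sparkMaster line is only there to make local mode explicit, the rest mirrors the code quoted further down):

SparkPipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation().as(SparkPipelineOptions.class);
options.setRunner(SparkRunner.class);
options.setSparkMaster("local[*]"); // illustrative: makes the local master explicit for this test run
options.setStreaming(true);
Pipeline pipeline = Pipeline.create(options);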

Btw, is there a runnable example for Spark streaming that I can refer to?

Thanks!
Mingmin

On Fri, Jan 20, 2017 at 11:45 AM, Amit Sela <amitsel...@gmail.com> wrote:

> The WakeupException is being logged and not thrown (it is OK since the
> reader was closed due to end-of-microbatch), so I wonder what causes "ERROR
> StreamingListenerBus: StreamingListenerBus has already stopped".
>
> Are you running in local mode ("local[*]"), or on YARN?
> Are you using any specific options?
> Would you mind trying the Beam snapshot, 0.5.0-SNAPSHOT?
>
> Amit.
>
> On Fri, Jan 20, 2017 at 9:20 PM Xu Mingmin <mmxu1...@gmail.com> wrote:
>
>> Hello all,
>>
>> I'm working on a streaming POC project which is written with the Beam API and runs on both the FlinkRunner and the SparkRunner. It works well on Flink; however, I cannot get it to run with the SparkRunner.
>>
>> Currently I run it locally and get this exception:
>>
>> 17/01/20 11:13:49 WARN KafkaIO: Reader-0: exception while fetching latest offsets. ignored.
>> org.apache.kafka.common.errors.WakeupException
>>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.maybeTriggerWakeup(ConsumerNetworkClient.java:404)
>>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:245)
>>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:209)
>>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:148)
>>     at org.apache.kafka.clients.consumer.internals.Fetcher.getOffsetsByTimes(Fetcher.java:374)
>>     at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffset(Fetcher.java:341)
>>     at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffsetsIfNeeded(Fetcher.java:197)
>>     at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:1524)
>>     at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1242)
>>     at com.ebay.dss.beam.common.kafka.KafkaIO$UnboundedKafkaReader.updateLatestOffsets(KafkaIO.java:1059)
>>     at com.ebay.dss.beam.common.kafka.KafkaIO$UnboundedKafkaReader.access$3(KafkaIO.java:1055)
>>     at com.ebay.dss.beam.common.kafka.KafkaIO$UnboundedKafkaReader$3.run(KafkaIO.java:966)
>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>     at java.lang.Thread.run(Thread.java:745)
>>
>> 17/01/20 11:13:50 ERROR StreamingListenerBus: StreamingListenerBus has already stopped! Dropping event StreamingListenerOutputOperationCompleted(OutputOperationInfo(1484939618000 ms,0,foreachRDD at UnboundedDataset.java:102,org.apache.spark.streaming.api.java.AbstractJavaDStreamLike.foreachRDD(JavaDStreamLike.scala:42)
>> org.apache.beam.runners.spark.translation.streaming.UnboundedDataset.action(UnboundedDataset.java:102)
>> org.apache.beam.runners.spark.translation.EvaluationContext.computeOutputs(EvaluationContext.java:164)
>> org.apache.beam.runners.spark.translation.streaming.SparkRunnerStreamingContextFactory.create(SparkRunnerStreamingContextFactory.java:78)
>> org.apache.spark.streaming.api.java.JavaStreamingContext$$anonfun$7.apply(JavaStreamingContext.scala:706)
>> org.apache.spark.streaming.api.java.JavaStreamingContext$$anonfun$7.apply(JavaStreamingContext.scala:705)
>> scala.Option.getOrElse(Option.scala:120)
>> org.apache.spark.streaming.StreamingContext$.getOrCreate(StreamingContext.scala:864)
>> org.apache.spark.streaming.api.java.JavaStreamingContext$.getOrCreate(JavaStreamingContext.scala:705)
>> org.apache.spark.streaming.api.java.JavaStreamingContext.getOrCreate(JavaStreamingContext.scala)
>> org.apache.beam.runners.spark.SparkRunner.run(SparkRunner.java:159)
>> org.apache.beam.runners.spark.SparkRunner.run(SparkRunner.java:81)
>> org.apache.beam.sdk.Pipeline.run(Pipeline.java:176)
>> com.ebay.dss.beam.spark.streaming.KafkaInKafkaOut.main(KafkaInKafkaOut.java:132),Some(1484939629500),Some(1484939630152),None))
>>
>>
>> Here's my code:
>>
>> SparkPipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
>>         .as(SparkPipelineOptions.class); // PipelineOptionsFactory.create();
>>
>> options.setRunner(SparkRunner.class);
>> options.setStreaming(true);
>>
>> Pipeline pipeline = Pipeline.create(options);
>>
>> PCollection<String> inStream = pipeline.apply("source",
>>         KafkaIO.read().withBootstrapServers(KAFKA_BROKER_IN).withTopics(Arrays.asList(KAFKA_IN_TOPIC))
>>                 .withKeyCoder(ByteArrayCoder.of())
>>                 .withValueCoder(StringUtf8Coder.of())
>> //                .withMaxNumRecords(5)
>>                 .withoutMetadata()
>>                 )...
>>
>>
>> Environment:
>>
>> Apache-Beam : 0.4.0
>> Kafka: 0.9/0.10 (test both)
>> Spark: 1.6.3
>>
>>
>> I can run it by adding withMaxNumRecords(); however, it then runs as a one-time batch job rather than streaming.
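>>
>> For reference, the bounded variant is just the same read with the commented-out line enabled (sketch only; the record count of 5 is an arbitrary illustration):
>>
>> pipeline.apply("source",
>>         KafkaIO.read().withBootstrapServers(KAFKA_BROKER_IN).withTopics(Arrays.asList(KAFKA_IN_TOPIC))
>>                 .withKeyCoder(ByteArrayCoder.of())
>>                 .withValueCoder(StringUtf8Coder.of())
>>                 .withMaxNumRecords(5) // bounds the source, so the pipeline runs once and exits
>>                 .withoutMetadata());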
>>
>> Any suggestion?
>>
>> Thanks!
>> Mingmin
>>
>
