There are lots of dependencies in the use cases, including token/auth, decoder schema loading, ... which results in a long initialization time. A connection pool is a good idea; I'm testing on my side as well to apply it on the KafkaIO side.
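Roughly, the kind of lazy-init pool being discussed could look like the sketch below; the class and method names are made up for illustration, not actual Beam or KafkaIO API. The idea is a per-executor-JVM holder (in the Spark runner it could be handed to readers via a broadcast variable) that creates the consumer on first use and reuses it across micro-batches, so the expensive setup (auth, schema loading, ...) is paid once:

    import java.util.Map;
    import java.util.Properties;
    import java.util.concurrent.ConcurrentHashMap;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    // Sketch only: a lazily initialized, per-JVM consumer pool that survives
    // across micro-batches. Names are placeholders, not real KafkaIO/Beam API.
    public final class LazyConsumerPool {
      private static final Map<String, KafkaConsumer<byte[], byte[]>> POOL =
          new ConcurrentHashMap<>();

      private LazyConsumerPool() {}

      // KafkaConsumer is not thread-safe, so each reader should request its own
      // key (e.g. a reader id) and not share the returned instance across threads.
      public static KafkaConsumer<byte[], byte[]> getOrCreate(
          String readerKey, Properties config) {
        // consumer is created only on first use, then reused by later micro-batches
        return POOL.computeIfAbsent(readerKey, k -> new KafkaConsumer<>(config));
      }
    }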
On Sat, Jan 21, 2017 at 1:05 PM, Amit Sela <amitsel...@gmail.com> wrote:

> You're right that in the current implementation the Kafka consumer is being initialized for each micro-batch, which is no different from direct Kafka in native Spark (it is so because RDDs and their execution context live only as long as the batch is being processed; an implementation based on receivers would be long-lived, but has resilience issues).
>
> I wonder why you would require 25s with Kafka 0.10 + external auth - is the authentication taking long?
> Setting auth aside, I did notice a funny behaviour where, although a new consumer is created for each micro-batch, the first one takes far longer than the following ones, which is actually good.
>
> One way to improve init in case of a large overhead could be to replace the creation of Readers with a lazy-init connection pool via broadcast variables - filed BEAM-1294 <https://issues.apache.org/jira/browse/BEAM-1294>.
>
> Amit.
>
> On Sat, Jan 21, 2017 at 10:41 PM Xu Mingmin <mmxu1...@gmail.com> wrote:
>
> Thanks @Amit for the detailed design doc, that helps a lot to understand how it works in the backend.
>
> I think my error is probably caused by the read from Kafka taking too long. By increasing *readTimePercentage*, it could run with a smaller *batchIntervalMillis*.
>
> With some debug work on KafkaIO, it looks like *KafkaIO.UnboundedKafkaReader.start()* is called by every micro-batch. That explains why I need such a large interval time (larger than 25s for another test with Kafka 0.10 + external auth). I suppose start() should be invoked only once, before the first micro-batch, and the reader kept alive. Please correct me if I've misunderstood anything.
>
> Mingmin
>
> On Sat, Jan 21, 2017 at 1:35 AM, Amit Sela <amitsel...@gmail.com> wrote:
>
> Not sure why this would cause the application to crash, but I can give some background about how the Spark runner reads micro-batches from Kafka (and UnboundedSources in general):
>
> With each micro-batch having a pre-set interval time, the Spark runner applies bounds on the read from the source (Kafka).
> By default, a time bound is set (10% of the interval, but no less than 200 msec - those knobs are available in PipelineOptions), and a bound on the number of records can also be set (explicitly, or by enabling Spark's backpressure mechanism).
> Those bounds are meant to avoid reading forever but never processing: Spark is a micro-batch engine, so the readers have to stop at some point and "release" the read RDD as part of the DStream.
> This approach is somewhat similar to the integration with Spark's direct Kafka in that it generates the stream's initial RDDs by itself (unlike Receivers), but it differs because of the bounds, as it tries to avoid (too long) reads that would cause delays.
> If you're interested in more of the internals of the design of Spark's read from UnboundedSources, you can find it here <https://docs.google.com/document/d/12BzHbETDt7ICIF7vc8zzCeLllmIpvvaVDIdBlcIwE1M/edit?usp=sharing>.
>
> Back to your issue: since your pipeline only reads/writes and doesn't apply any time-consuming computations or shuffles, you could increase the read time percentage ("--readTimePercentage": range (0, 1), defaults to 0.1).
>
> Let me know if this helps.
>
> Amit
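For reference, a minimal sketch of how the knobs mentioned above can be set on SparkPipelineOptions; the flag names come from this thread, but the values are purely illustrative:

    import org.apache.beam.runners.spark.SparkPipelineOptions;
    import org.apache.beam.runners.spark.SparkRunner;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    // Sketch only: flags are the SparkPipelineOptions knobs discussed in this
    // thread; the values are examples, not recommendations.
    public class SparkOptionsSketch {
      public static void main(String[] args) {
        String[] flags = new String[] {
            "--sparkMaster=local[*]",      // use all local threads instead of the default 4
            "--batchIntervalMillis=3000",  // micro-batch interval (default 1000)
            "--readTimePercentage=0.3",    // share of the interval spent reading (default 0.1)
            "--maxRecordsPerBatch=100000"  // optional record bound (default -1, unbounded)
        };
        SparkPipelineOptions options = PipelineOptionsFactory.fromArgs(flags)
            .withValidation()
            .as(SparkPipelineOptions.class);
        options.setRunner(SparkRunner.class);
        options.setStreaming(true);
        // ... then Pipeline.create(options), as in the code further down the thread.
      }
    }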
> On Sat, Jan 21, 2017 at 12:07 AM Xu Mingmin <mmxu1...@gmail.com> wrote:
>
> Thanks @Amit.
>
> I tried the parameters mentioned; it seems the core setting is '--batchIntervalMillis'. I can run the simple KafkaInKafkaOut job only with a value >= 3000, no matter the value of --maxRecordsPerBatch, or --sparkMaster=local[2] / --sparkMaster=local[*].
>
> Here's the Kafka consumer configuration, not sure why >= 3000 ms works.
>
> 17/01/20 13:27:54 INFO ConsumerConfig: ConsumerConfig values:
>   metric.reporters = []
>   metadata.max.age.ms = 300000
>   partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
>   reconnect.backoff.ms = 50
>   sasl.kerberos.ticket.renew.window.factor = 0.8
>   max.partition.fetch.bytes = 1048576
>   bootstrap.servers = [***:9092]
>   ssl.keystore.type = JKS
>   enable.auto.commit = false
>   sasl.mechanism = GSSAPI
>   interceptor.classes = null
>   exclude.internal.topics = true
>   ssl.truststore.password = null
>   client.id = consumer-6
>   ssl.endpoint.identification.algorithm = null
>   max.poll.records = 2147483647
>   check.crcs = true
>   request.timeout.ms = 40000
>   heartbeat.interval.ms = 3000
>   auto.commit.interval.ms = 5000
>   receive.buffer.bytes = 524288
>   sasl.login.class = null
>   ssl.truststore.type = JKS
>   ssl.truststore.location = null
>   ssl.keystore.password = null
>   fetch.min.bytes = 1
>   send.buffer.bytes = 131072
>   value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
>   group.id = Reader-0_offset_consumer_676510629_none
>   retry.backoff.ms = 100
>   sasl.kerberos.kinit.cmd = /usr/bin/kinit
>   sasl.kerberos.service.name = null
>   sasl.kerberos.ticket.renew.jitter = 0.05
>   ssl.trustmanager.algorithm = PKIX
>   ssl.key.password = null
>   fetch.max.wait.ms = 500
>   sasl.kerberos.min.time.before.relogin = 60000
>   connections.max.idle.ms = 540000
>   session.timeout.ms = 30000
>   metrics.num.samples = 2
>   key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
>   ssl.protocol = TLS
>   ssl.provider = null
>   ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>   ssl.keystore.location = null
>   ssl.cipher.suites = null
>   security.protocol = PLAINTEXT
>   ssl.keymanager.algorithm = SunX509
>   metrics.sample.window.ms = 30000
>   sasl.callback.handler.class = null
>   auto.offset.reset = latest
>
> On Fri, Jan 20, 2017 at 12:19 PM, Amit Sela <amitsel...@gmail.com> wrote:
>
> Beam streaming pipelines over the Spark runner are still experimental, so there's nothing official, but at PayPal we're running a version which is very similar to the Apache branch, especially if you're only executing a Kafka-in-Kafka-out pipeline.
>
> From my experience with Spark, this usually means that something caused Spark to stop, and if there was no clear exception failing the job, it might indicate insufficient resources.
>
> Have you tried setting the option "--sparkMaster local[*]", which will use the number of threads (Spark believes) your machine can provide instead of the default of 4 set in SparkPipelineOptions?
> If resources are indeed the problem, you could also try increasing the batch interval (--batchIntervalMillis, default: 1000) and even bounding the records read in each micro-batch (--maxRecordsPerBatch, default: -1).
>
> On Fri, Jan 20, 2017 at 10:08 PM Xu Mingmin <mmxu1...@gmail.com> wrote:
>
> It's running in local mode for testing now. I tried with 0.5.0-SNAPSHOT, with the same error:
>
> 17/01/20 12:01:30 ERROR LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerBlockUpdated(BlockUpdatedInfo(BlockManagerId(driver, localhost, 59305),rdd_30_0,StorageLevel(false, true, false, true, 1),11616,0,0))
> 17/01/20 12:01:30 INFO BlockManagerInfo: Added rdd_31_0 in memory on localhost:59305 (size: 1340.0 B, free: 1124.5 MB)
> 17/01/20 12:01:30 ERROR LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerBlockUpdated(BlockUpdatedInfo(BlockManagerId(driver, localhost, 59305),rdd_31_0,StorageLevel(false, true, false, false, 1),1340,0,0))
> 17/01/20 12:01:30 INFO Executor: Finished task 0.0 in stage 15.0 (TID 19). 2956 bytes result sent to driver
> 17/01/20 12:01:30 INFO DAGScheduler: ResultStage 15 (DStream at SparkUnboundedSource.java:154) failed in 0.564 s
> 17/01/20 12:01:30 ERROR LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerStageCompleted(org.apache.spark.scheduler.StageInfo@43f0ada9)
> 17/01/20 12:01:30 ERROR LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerJobEnd(3,1484942490827,JobFailed(org.apache.spark.SparkException: Job 3 cancelled because SparkContext was shut down))
>
> Btw, is there a runnable example for Spark streaming that I can refer to?
>
> Thanks!
> Mingmin
>
> On Fri, Jan 20, 2017 at 11:45 AM, Amit Sela <amitsel...@gmail.com> wrote:
>
> The WakeupException is being logged and not thrown (it is OK since the reader was closed due to end-of-micro-batch), so I wonder what causes "ERROR StreamingListenerBus: StreamingListenerBus has already stopped".
>
> Are you running in local mode ("local[*]") or over YARN? Any specific options you're using? Would you mind trying the Beam snapshot, 0.5.0-SNAPSHOT?
>
> Amit.
>
> On Fri, Jan 20, 2017 at 9:20 PM Xu Mingmin <mmxu1...@gmail.com> wrote:
>
> Hello all,
>
> I'm working on a streaming POC project which is written with the Beam API and runs on both FlinkRunner and SparkRunner. It works well on Flink; however, I cannot run it on SparkRunner.
>
> Currently I run it locally, and get this exception:
>
> 17/01/20 11:13:49 WARN KafkaIO: Reader-0: exception while fetching latest offsets. ignored.
> org.apache.kafka.common.errors.WakeupException
>   at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.maybeTriggerWakeup(ConsumerNetworkClient.java:404)
>   at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:245)
>   at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:209)
>   at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:148)
>   at org.apache.kafka.clients.consumer.internals.Fetcher.getOffsetsByTimes(Fetcher.java:374)
>   at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffset(Fetcher.java:341)
>   at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffsetsIfNeeded(Fetcher.java:197)
>   at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:1524)
>   at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1242)
>   at com.ebay.dss.beam.common.kafka.KafkaIO$UnboundedKafkaReader.updateLatestOffsets(KafkaIO.java:1059)
>   at com.ebay.dss.beam.common.kafka.KafkaIO$UnboundedKafkaReader.access$3(KafkaIO.java:1055)
>   at com.ebay.dss.beam.common.kafka.KafkaIO$UnboundedKafkaReader$3.run(KafkaIO.java:966)
>   at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>   at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>   at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
>
> 17/01/20 11:13:50 ERROR StreamingListenerBus: StreamingListenerBus has already stopped! Dropping event StreamingListenerOutputOperationCompleted(OutputOperationInfo(1484939618000 ms,0,foreachRDD at UnboundedDataset.java:102,org.apache.spark.streaming.api.java.AbstractJavaDStreamLike.foreachRDD(JavaDStreamLike.scala:42)
> org.apache.beam.runners.spark.translation.streaming.UnboundedDataset.action(UnboundedDataset.java:102)
> org.apache.beam.runners.spark.translation.EvaluationContext.computeOutputs(EvaluationContext.java:164)
> org.apache.beam.runners.spark.translation.streaming.SparkRunnerStreamingContextFactory.create(SparkRunnerStreamingContextFactory.java:78)
> org.apache.spark.streaming.api.java.JavaStreamingContext$$anonfun$7.apply(JavaStreamingContext.scala:706)
> org.apache.spark.streaming.api.java.JavaStreamingContext$$anonfun$7.apply(JavaStreamingContext.scala:705)
> scala.Option.getOrElse(Option.scala:120)
> org.apache.spark.streaming.StreamingContext$.getOrCreate(StreamingContext.scala:864)
> org.apache.spark.streaming.api.java.JavaStreamingContext$.getOrCreate(JavaStreamingContext.scala:705)
> org.apache.spark.streaming.api.java.JavaStreamingContext.getOrCreate(JavaStreamingContext.scala)
> org.apache.beam.runners.spark.SparkRunner.run(SparkRunner.java:159)
> org.apache.beam.runners.spark.SparkRunner.run(SparkRunner.java:81)
> org.apache.beam.sdk.Pipeline.run(Pipeline.java:176)
> com.ebay.dss.beam.spark.streaming.KafkaInKafkaOut.main(KafkaInKafkaOut.java:132),Some(1484939629500),Some(1484939630152),None))
>
> Here's my code:
>
>   SparkPipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
>       .as(SparkPipelineOptions.class); // PipelineOptionsFactory.create();
>   options.setRunner(SparkRunner.class);
>   options.setStreaming(true);
>
>   Pipeline pipeline = Pipeline.create(options);
>
>   PCollection<String> inStream = pipeline.apply("source",
>       KafkaIO.read().withBootstrapServers(KAFKA_BROKER_IN)
>           .withTopics(Arrays.asList(KAFKA_IN_TOPIC))
>           .withKeyCoder(ByteArrayCoder.of())
>           .withValueCoder(StringUtf8Coder.of())
>           // .withMaxNumRecords(5)
>           .withoutMetadata()
>       )...
>
> Environment:
>
>   Apache Beam: 0.4.0
>   Kafka: 0.9 / 0.10 (tested both)
>   Spark: 1.6.3
>
> I can run it by adding withMaxNumRecords(), however it's a one-time batch run then.
>
> Any suggestion?
>
> Thanks!
> Mingmin
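A short sketch of the bounded vs. unbounded distinction mentioned at the end of the thread, based on the same code; KAFKA_BROKER_IN and KAFKA_IN_TOPIC are the placeholders from the snippet above, and the only change is un-commenting withMaxNumRecords():

    // Sketch only, reusing the snippet above. With withMaxNumRecords() the Kafka
    // source becomes bounded, so the pipeline reads a fixed number of records and
    // then finishes (the "one-time batch" behaviour); dropping it gives the
    // unbounded streaming read that needs options.setStreaming(true) and the
    // SparkPipelineOptions knobs discussed earlier in the thread.
    pipeline.apply("bounded-source",
        KafkaIO.read().withBootstrapServers(KAFKA_BROKER_IN)
            .withTopics(Arrays.asList(KAFKA_IN_TOPIC))
            .withKeyCoder(ByteArrayCoder.of())
            .withValueCoder(StringUtf8Coder.of())
            .withMaxNumRecords(5)   // bounded: stop after 5 records
            .withoutMetadata());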