There are a lot of dependencies in our use cases, including token/auth,
decoder schema loading, etc., which results in a long initialization time.

A connection pool is a good idea; I'm testing on my side as well to see how
to apply it in KafkaIO.
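
For reference, here's a minimal sketch of the kind of per-JVM consumer cache
I have in mind. This is just an illustration, not actual KafkaIO code, and
the class and method names are made up:

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;

// Hypothetical sketch: keep one consumer per consumer config within an
// executor JVM, so that auth and decoder schema loading happen once
// instead of on every micro-batch. Key/value deserializers are expected
// to be set in the config.
public final class ConsumerCache {

  // Keyed by something derived from the consumer config,
  // e.g. bootstrap.servers + group.id.
  private static final Map<String, KafkaConsumer<byte[], byte[]>> CACHE =
      new HashMap<String, KafkaConsumer<byte[], byte[]>>();

  private ConsumerCache() {}

  public static synchronized KafkaConsumer<byte[], byte[]> getOrCreate(
      String key, Properties config) {
    KafkaConsumer<byte[], byte[]> consumer = CACHE.get(key);
    if (consumer == null) {
      // Created lazily, once per JVM; note that KafkaConsumer itself is not
      // thread-safe, so callers still need to serialize access per consumer.
      consumer = new KafkaConsumer<byte[], byte[]>(config);
      CACHE.put(key, consumer);
    }
    return consumer;
  }
}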


On Sat, Jan 21, 2017 at 1:05 PM, Amit Sela <amitsel...@gmail.com> wrote:

> You're right that in the current implementation the Kafka consumer is
> initialized for each micro-batch, which is no different from direct Kafka
> in native Spark (this is because RDDs and their execution context live only
> as long as the batch is processed; an implementation based on receivers
> would be long-lived, but has resilience issues).
>
> I wonder why you would require 25s with Kafka 0.10 + external auth - is
> the authentication taking that long?
> Setting auth aside, I did notice a curious behaviour: although a new
> consumer is created each micro-batch, the first one takes far longer than
> the following ones, which is actually good.
>
> One way to improve initialization when the overhead is large could be to
> replace the per-batch creation of Readers with a lazily initialized
> connection pool shared via broadcast variables - filed BEAM-1294
> <https://issues.apache.org/jira/browse/BEAM-1294>.
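>
> Very roughly, the idea would be something like the sketch below - this is
> not runner code, just an illustration of lazy initialization on the
> executor side, and the class name and use of KafkaConsumer here are my own
> assumptions:
>
> import java.io.Serializable;
> import java.util.Properties;
>
> import org.apache.kafka.clients.consumer.KafkaConsumer;
>
> // Hypothetical sketch: a Serializable holder that could be placed in a
> // Spark broadcast variable. Only the config is serialized; the consumer is
> // created lazily on the executor the first time it is needed and then
> // reused across micro-batches for as long as the executor JVM lives.
> class LazyConsumerHolder implements Serializable {
>
>   private final Properties config;  // shipped with the broadcast variable
>   private transient KafkaConsumer<byte[], byte[]> consumer;  // created lazily per executor
>
>   LazyConsumerHolder(Properties config) {
>     this.config = config;
>   }
>
>   synchronized KafkaConsumer<byte[], byte[]> get() {
>     if (consumer == null) {
>       consumer = new KafkaConsumer<byte[], byte[]>(config);
>     }
>     return consumer;
>   }
> }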
>
> Amit.
>
> On Sat, Jan 21, 2017 at 10:41 PM Xu Mingmin <mmxu1...@gmail.com> wrote:
>
> Thanks @Amit for the detailed design doc, it helps a lot in understanding
> how things work in the backend.
>
> I think my error is probably caused by it taking too long to read from
> Kafka. By increasing *readTimePercentage*, it can run with a smaller
> *batchIntervalMillis*.
>
> With some debugging on KafkaIO, it looks like
> *KafkaIO.UnboundedKafkaReader.start()* is called on every micro-batch.
> That explains why I need such a large interval time (larger than 25s for
> another test with Kafka 0.10 + external auth). I would expect start() to
> be invoked only once before the first micro-batch and then kept alive.
> Please correct me if I'm misunderstanding anything.
>
> Mingmin
>
> On Sat, Jan 21, 2017 at 1:35 AM, Amit Sela <amitsel...@gmail.com> wrote:
>
> Not sure why this would cause the application to crash, but I can give
> some background about how the Spark runner reads microbatches from Kafka
> (and generally UnboundedSources):
>
> With each microbatch having a pre-set interval time, the Spark runner
> applies bounds on the read from the source (Kafka).
> By default, a time bound is set (10% of the interval, but no less than 200
> msec - those knobs are available in PipelineOptions), and a bound on the
> number of records can also be set (explicitly or by enabling Spark's
> backpressure mechanism).
> Those bounds are meant to avoid reading-forever-but-never-processing as
> Spark is a microbatch engine and so the readers have to stop at some point
> and "release" the read RDD as part of the DStream.
> This approach is somewhat similar to Spark's direct Kafka integration in
> that the runner generates the stream's RDDs itself (unlike Receivers), but
> it differs because of the bounds, which try to avoid (too long) reads that
> would cause delays.
> If you're interested in more details on the design of the Spark runner's
> read from UnboundedSources, you can find it here
> <https://docs.google.com/document/d/12BzHbETDt7ICIF7vc8zzCeLllmIpvvaVDIdBlcIwE1M/edit?usp=sharing>
> .
>
> Back to your issue: since your pipeline only reads/writes and doesn't
> apply any time-consuming computations or shuffles, you could increase the
> read time percentage ("--readTimePercentage", a value in (0, 1) that
> defaults to 0.1).
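>
> Something along these lines, for example - just a sketch, assuming the
> flags above map to the usual SparkPipelineOptions setters, with placeholder
> values to tune for your job:
>
> import org.apache.beam.runners.spark.SparkPipelineOptions;
> import org.apache.beam.sdk.options.PipelineOptionsFactory;
>
> SparkPipelineOptions options = PipelineOptionsFactory.fromArgs(args)
>     .withValidation()
>     .as(SparkPipelineOptions.class);
> options.setBatchIntervalMillis(3000L); // micro-batch interval, in millis
> options.setReadTimePercentage(0.5);    // spend up to ~50% of the interval reading from Kafka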
>
> Let me know if this helps.
>
> Amit
>
> On Sat, Jan 21, 2017 at 12:07 AM Xu Mingmin <mmxu1...@gmail.com> wrote:
>
> Thanks @Amit.
>
> I tried the parameters you mentioned; it seems the key setting is
> '--batchIntervalMillis'. I can only run the simple KafkaInKafkaOut job with
> a value >= 3000, regardless of the value of --maxRecordsPerBatch, or
> whether I use --sparkMaster=local[2] or --sparkMaster=local[*].
>
> Here's the Kafka consumer configuration; I'm not sure why only >=3000ms works.
>
> 17/01/20 13:27:54 INFO ConsumerConfig: ConsumerConfig values:
>     metric.reporters = []
>     metadata.max.age.ms = 300000
>     partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
>     reconnect.backoff.ms = 50
>     sasl.kerberos.ticket.renew.window.factor = 0.8
>     max.partition.fetch.bytes = 1048576
>     bootstrap.servers = [***:9092]
>     ssl.keystore.type = JKS
>     enable.auto.commit = false
>     sasl.mechanism = GSSAPI
>     interceptor.classes = null
>     exclude.internal.topics = true
>     ssl.truststore.password = null
>     client.id = consumer-6
>     ssl.endpoint.identification.algorithm = null
>     max.poll.records = 2147483647
>     check.crcs = true
>     request.timeout.ms = 40000
>     heartbeat.interval.ms = 3000
>     auto.commit.interval.ms = 5000
>     receive.buffer.bytes = 524288
>     sasl.login.class = null
>     ssl.truststore.type = JKS
>     ssl.truststore.location = null
>     ssl.keystore.password = null
>     fetch.min.bytes = 1
>     send.buffer.bytes = 131072
>     value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
>     group.id = Reader-0_offset_consumer_676510629_none
>     retry.backoff.ms = 100
>     sasl.kerberos.kinit.cmd = /usr/bin/kinit
>     sasl.kerberos.service.name = null
>     sasl.kerberos.ticket.renew.jitter = 0.05
>     ssl.trustmanager.algorithm = PKIX
>     ssl.key.password = null
>     fetch.max.wait.ms = 500
>     sasl.kerberos.min.time.before.relogin = 60000
>     connections.max.idle.ms = 540000
>     session.timeout.ms = 30000
>     metrics.num.samples = 2
>     key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
>     ssl.protocol = TLS
>     ssl.provider = null
>     ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>     ssl.keystore.location = null
>     ssl.cipher.suites = null
>     security.protocol = PLAINTEXT
>     ssl.keymanager.algorithm = SunX509
>     metrics.sample.window.ms = 30000
>     sasl.callback.handler.class = null
>     auto.offset.reset = latest
>
>
>
>
> On Fri, Jan 20, 2017 at 12:19 PM, Amit Sela <amitsel...@gmail.com> wrote:
>
> Beam streaming pipelines on the Spark runner are still experimental, so
> there's nothing official, but at PayPal we're running a version which is
> very similar to the Apache branch, especially for a simple
> Kafka-in-Kafka-out pipeline.
>
> From my experience with Spark, this usually means that something caused
> Spark to stop, and if there was no clear exception failing the job it
> might indicate insufficient resources.
>
> Have you tried setting the option "--sparkMaster=local[*]", which will use
> as many threads as (Spark believes) your machine can provide, instead of
> the default of 4 set in SparkPipelineOptions?
> If resources are indeed the problem, you could also try increasing the
> batch interval ("--batchIntervalMillis", default: 1000) and even bounding
> the number of records read in each microbatch ("--maxRecordsPerBatch",
> default: -1).
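>
> For example, a hypothetical set of program arguments to experiment with
> (the values are just placeholders):
>
> String[] args = new String[] {
>     "--runner=SparkRunner",
>     "--streaming=true",
>     "--sparkMaster=local[*]",     // as many threads as the machine provides
>     "--batchIntervalMillis=3000", // larger micro-batch interval
>     "--maxRecordsPerBatch=1000"   // bound the records read per micro-batch
> };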
>
> On Fri, Jan 20, 2017 at 10:08 PM Xu Mingmin <mmxu1...@gmail.com> wrote:
>
> It's running in local mode for testing now. I tried with 0.5.0-SNAPSHOT and
> got the same error:
>
> 17/01/20 12:01:30 ERROR LiveListenerBus: SparkListenerBus has already
> stopped! Dropping event 
> SparkListenerBlockUpdated(BlockUpdatedInfo(BlockManagerId(driver,
> localhost, 59305),rdd_30_0,StorageLevel(false, true, false, true,
> 1),11616,0,0))
> 17/01/20 12:01:30 INFO BlockManagerInfo: Added rdd_31_0 in memory on
> localhost:59305 (size: 1340.0 B, free: 1124.5 MB)
> 17/01/20 12:01:30 ERROR LiveListenerBus: SparkListenerBus has already
> stopped! Dropping event 
> SparkListenerBlockUpdated(BlockUpdatedInfo(BlockManagerId(driver,
> localhost, 59305),rdd_31_0,StorageLevel(false, true, false, false,
> 1),1340,0,0))
> 17/01/20 12:01:30 INFO Executor: Finished task 0.0 in stage 15.0 (TID 19).
> 2956 bytes result sent to driver
> 17/01/20 12:01:30 INFO DAGScheduler: ResultStage 15 (DStream at
> SparkUnboundedSource.java:154) failed in 0.564 s
> 17/01/20 12:01:30 ERROR LiveListenerBus: SparkListenerBus has already
> stopped! Dropping event SparkListenerStageCompleted(
> org.apache.spark.scheduler.StageInfo@43f0ada9)
> 17/01/20 12:01:30 ERROR LiveListenerBus: SparkListenerBus has already
> stopped! Dropping event 
> SparkListenerJobEnd(3,1484942490827,JobFailed(org.apache.spark.SparkException:
> Job 3 cancelled because SparkContext was shut down))
>
>
> Btw, is there a runnable example for Spark streaming that I can refer to?
>
> Thanks!
> Mingmin
>
> On Fri, Jan 20, 2017 at 11:45 AM, Amit Sela <amitsel...@gmail.com> wrote:
>
> The WakeupException is being logged and not thrown (it is OK since the
> reader was closed due to end-of-microbatch), so I wonder what causes "ERROR
> StreamingListenerBus: StreamingListenerBus has already stopped".
>
> Are you running in local mode ("local[*]") or over YARN?
> Any specific options you're using?
> Would you mind trying the Beam snapshot (0.5.0-SNAPSHOT)?
>
> Amit.
>
> On Fri, Jan 20, 2017 at 9:20 PM Xu Mingmin <mmxu1...@gmail.com> wrote:
>
> Hello all,
>
> I'm working on a streaming POC project, which is written with the Beam API
> and runs on both FlinkRunner and SparkRunner. It works well on Flink,
> however I cannot get it to run on SparkRunner.
>
> Currently I'm running it locally, and I get this exception:
>
> 17/01/20 11:13:49 WARN KafkaIO: Reader-0: exception while fetching latest
> offsets. ignored.
> org.apache.kafka.common.errors.WakeupException
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.
> maybeTriggerWakeup(ConsumerNetworkClient.java:404)
>     at org.apache.kafka.clients.consumer.internals.
> ConsumerNetworkClient.poll(ConsumerNetworkClient.java:245)
>     at org.apache.kafka.clients.consumer.internals.
> ConsumerNetworkClient.poll(ConsumerNetworkClient.java:209)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.
> awaitMetadataUpdate(ConsumerNetworkClient.java:148)
>     at org.apache.kafka.clients.consumer.internals.Fetcher.
> getOffsetsByTimes(Fetcher.java:374)
>     at org.apache.kafka.clients.consumer.internals.Fetcher.
> resetOffset(Fetcher.java:341)
>     at org.apache.kafka.clients.consumer.internals.Fetcher.
> resetOffsetsIfNeeded(Fetcher.java:197)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.
> updateFetchPositions(KafkaConsumer.java:1524)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.
> position(KafkaConsumer.java:1242)
>     at com.ebay.dss.beam.common.kafka.KafkaIO$UnboundedKafkaReader.
> updateLatestOffsets(KafkaIO.java:1059)
>     at com.ebay.dss.beam.common.kafka.KafkaIO$
> UnboundedKafkaReader.access$3(KafkaIO.java:1055)
>     at com.ebay.dss.beam.common.kafka.KafkaIO$UnboundedKafkaReader$3.run(
> KafkaIO.java:966)
>     at java.util.concurrent.Executors$RunnableAdapter.
> call(Executors.java:511)
>     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$
> ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$
> ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(
> ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(
> ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
>
> 17/01/20 11:13:50 ERROR StreamingListenerBus: StreamingListenerBus has
> already stopped! Dropping event StreamingListenerOutputOperationCompleted(
> OutputOperationInfo(1484939618000 ms,0,foreachRDD at
> UnboundedDataset.java:102,org.apache.spark.streaming.api.
> java.AbstractJavaDStreamLike.foreachRDD(JavaDStreamLike.scala:42)
> org.apache.beam.runners.spark.translation.streaming.
> UnboundedDataset.action(UnboundedDataset.java:102)
> org.apache.beam.runners.spark.translation.EvaluationContext.
> computeOutputs(EvaluationContext.java:164)
> org.apache.beam.runners.spark.translation.streaming.
> SparkRunnerStreamingContextFactory.create(SparkRunnerStreamingContextFac
> tory.java:78)
> org.apache.spark.streaming.api.java.JavaStreamingContext$$anonfun$7.apply(
> JavaStreamingContext.scala:706)
> org.apache.spark.streaming.api.java.JavaStreamingContext$$anonfun$7.apply(
> JavaStreamingContext.scala:705)
> scala.Option.getOrElse(Option.scala:120)
> org.apache.spark.streaming.StreamingContext$.getOrCreate(
> StreamingContext.scala:864)
> org.apache.spark.streaming.api.java.JavaStreamingContext$.getOrCreate(
> JavaStreamingContext.scala:705)
> org.apache.spark.streaming.api.java.JavaStreamingContext.getOrCreate(
> JavaStreamingContext.scala)
> org.apache.beam.runners.spark.SparkRunner.run(SparkRunner.java:159)
> org.apache.beam.runners.spark.SparkRunner.run(SparkRunner.java:81)
> org.apache.beam.sdk.Pipeline.run(Pipeline.java:176)
> com.ebay.dss.beam.spark.streaming.KafkaInKafkaOut.
> main(KafkaInKafkaOut.java:132),Some(1484939629500),Some(
> 1484939630152),None))
>
>
> Here's my code:
>
> SparkPipelineOptions options = PipelineOptionsFactory.fromArgs(args)
>         .withValidation()
>         .as(SparkPipelineOptions.class); // PipelineOptionsFactory.create();
>
> options.setRunner(SparkRunner.class);
> options.setStreaming(true);
>
> Pipeline pipeline = Pipeline.create(options);
>
> PCollection<String> inStream = pipeline.apply("source",
>         KafkaIO.read()
>                 .withBootstrapServers(KAFKA_BROKER_IN)
>                 .withTopics(Arrays.asList(KAFKA_IN_TOPIC))
>                 .withKeyCoder(ByteArrayCoder.of())
>                 .withValueCoder(StringUtf8Coder.of())
>                 // .withMaxNumRecords(5)
>                 .withoutMetadata()
>                 )...
>
>
> Environment:
>
> Apache-Beam : 0.4.0
> Kafka: 0.9/0.10 (test both)
> Spark: 1.6.3
>
>
> I can run it by adding withMaxNumRecords(), however that turns it into a
> one-time batch run.
>
> Any suggestion?
>
> Thanks!
> Mingmin
>
>
>
>
>
