Re: Spark streaming with Confluent Kafka
The error is clear:

Caused by: java.lang.IllegalArgumentException: No serviceName defined in either JAAS or Kafka config

On Fri, 3 Jul 2020, 15:40 dwgw wrote:

> Hi
>
> I am trying to stream a Confluent Kafka topic in the Spark shell. For that I
> have invoked the Spark shell using the following command:
>
> # spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0 \
>     --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=/home/spark/kafka_jaas.conf" \
>     --driver-java-options "-Djava.security.auth.login.config=/home/spark/kafka_jaas.conf" \
>     --files /home/spark/kafka_jaas.conf
>
> kafka_jaas.conf
> ---------------
>
> KafkaClient {
>   org.apache.kafka.common.security.plain.PlainLoginModule required
>   username="XXX"
>   password="XXX";
> };
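For context: the reader in the original post passes kafka.sasl.mechanisms (plural), which is the librdkafka/C-client property name; the Java client used by Spark expects sasl.mechanism (singular) and, with no mechanism set, falls back to GSSAPI, which is what requires a Kerberos serviceName and yields the error above. Below is a minimal sketch of the reader with the singular property name and the JAAS entry passed inline via kafka.sasl.jaas.config; broker, topic, and credentials are the placeholders from the original post, and a spark-shell session (where spark and the $-column syntax are already available) is assumed.

// Passing the JAAS entry inline avoids the external kafka_jaas.conf file.
// Broker address, topic, and credentials are placeholders.
val jaas = """org.apache.kafka.common.security.plain.PlainLoginModule required username="XXX" password="XXX";"""

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "pkc-XXX.cloud:9092")
  .option("subscribe", "INTERNAL_VIS_R1227_CDC_ICX_SESSIONS")
  .option("kafka.security.protocol", "SASL_SSL")
  .option("kafka.sasl.mechanism", "PLAIN")   // singular: Java-client property name
  .option("kafka.sasl.jaas.config", jaas)
  .option("startingOffsets", "earliest")
  .load()
  .select($"value".cast("string").alias("value"))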
Spark streaming with Confluent Kafka
Hi

I am trying to stream a Confluent Kafka topic in the Spark shell. For that I have
invoked the Spark shell using the following command:

# spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0 \
    --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=/home/spark/kafka_jaas.conf" \
    --driver-java-options "-Djava.security.auth.login.config=/home/spark/kafka_jaas.conf" \
    --files /home/spark/kafka_jaas.conf

kafka_jaas.conf
---------------

KafkaClient {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="XXX"
  password="XXX";
};

Readstream
----------

scala> val df = spark.
     |   readStream.
     |   format("kafka").
     |   option("kafka.bootstrap.servers", "pkc-XXX.cloud:9092").
     |   option("subscribe", "INTERNAL_VIS_R1227_CDC_ICX_SESSIONS").
     |   option("kafka.sasl.mechanisms", "PLAIN").
     |   option("kafka.security.protocol", "SASL_SSL").
     |   option("startingOffsets", "earliest").
     |   load.
     |   select($"value".cast("string").alias("value"))
df: org.apache.spark.sql.DataFrame = [value: string]

Writestream
-----------

scala> df.writeStream.
     |   format("console").
     |   outputMode("append").
     |   trigger(Trigger.ProcessingTime("20 seconds")).
     |   start
2020-07-03 04:07:48,366 WARN streaming.StreamingQueryManager: Temporary checkpoint
location created which is deleted normally when the query didn't fail:
/tmp/temporary-897996c3-a86a-4b7d-ac19-62168a14d279. If it's required to delete it
under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation
to true. Important to know deleting temp checkpoint folder is best effort.
res0: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@324a5741

scala> 2020-07-03 04:07:49,139 WARN kafka010.KafkaOffsetReader: Error in attempt 1 getting Kafka offsets:
org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:820)
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:631)
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:612)
        at org.apache.spark.sql.kafka010.SubscribeStrategy.createConsumer(ConsumerStrategy.scala:76)
        at org.apache.spark.sql.kafka010.KafkaOffsetReader.consumer(KafkaOffsetReader.scala:88)
        at org.apache.spark.sql.kafka010.KafkaOffsetReader.$anonfun$partitionsAssignedToConsumer$2(KafkaOffsetReader.scala:538)
        at org.apache.spark.sql.kafka010.KafkaOffsetReader.$anonfun$withRetriesWithoutInterrupt$1(KafkaOffsetReader.scala:600)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:77)
        at org.apache.spark.sql.kafka010.KafkaOffsetReader.withRetriesWithoutInterrupt(KafkaOffsetReader.scala:599)
        at org.apache.spark.sql.kafka010.KafkaOffsetReader.$anonfun$partitionsAssignedToConsumer$1(KafkaOffsetReader.scala:536)
        at org.apache.spark.sql.kafka010.KafkaOffsetReader.runUninterruptibly(KafkaOffsetReader.scala:567)
        at org.apache.spark.sql.kafka010.KafkaOffsetReader.partitionsAssignedToConsumer(KafkaOffsetReader.scala:536)
        at org.apache.spark.sql.kafka010.KafkaOffsetReader.fetchEarliestOffsets(KafkaOffsetReader.scala:298)
        at org.apache.spark.sql.kafka010.KafkaMicroBatchStream.$anonfun$getOrCreateInitialPartitionOffsets$1(KafkaMicroBatchStream.scala:151)
        at scala.Option.getOrElse(Option.scala:189)
        at org.apache.spark.sql.kafka010.KafkaMicroBatchStream.getOrCreateInitialPartitionOffsets(KafkaMicroBatchStream.scala:148)
        at org.apache.spark.sql.kafka010.KafkaMicroBatchStream.initialOffset(KafkaMicroBatchStream.scala:76)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$5(MicroBatchExecution.scala:378)
        at scala.Option.getOrElse(Option.scala:189)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$3(MicroBatchExecution.scala:378)
        at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:352)
        at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:350)
        at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$2(MicroBatchExecution.scala:371)
        at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
        at scala.collection.immutable.Map$Map1.foreach(Map.scala:128)
        at scala.collection.TraversableLike.map(TraversableLike.scala:238)
        at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
        at scala.collection.AbstractTraversable.map(Traversable.scala:108)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$1(MicroBatchExecution.scala:368)
        at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
        at org.a
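Regarding the temporary-checkpoint warning above: a minimal sketch of the same writeStream with an explicit, durable checkpoint location. The path below is a placeholder; any location reachable by the cluster (HDFS, S3, or a shared filesystem) works, and df is assumed to be the streaming DataFrame built above.

import org.apache.spark.sql.streaming.Trigger

// The checkpoint path is a placeholder; pick a durable location the cluster can reach.
val query = df.writeStream
  .format("console")
  .outputMode("append")
  .option("checkpointLocation", "/path/to/checkpoints/vis_sessions")
  .trigger(Trigger.ProcessingTime("20 seconds"))
  .start()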
Re: REST Structured Streaming Sink
Hi Folks,

Great discussion! I will take into account rate limiting and make it configurable
for the HTTP request as well. All I was wondering is whether there is anything I
might have missed that would make it technically impossible to do, or at least
difficult enough to not warrant the effort. Is there anything I might have
overlooked? Also, would this be useful to people?

My thinking, from a business perspective, is: why are we making them wait till the
next scheduled batch run for data that is already available from an API? You could
run a job every minute/hour, but that in itself sounds like a streaming use case.

Thoughts?

Regards
Sam

On Thu, Jul 2, 2020 at 3:31 AM Burak Yavuz wrote:

> Well, the difference is, a technical user writes the UDF and a non-technical user
> may use this built-in thing (misconfigure it) and shoot themselves in the foot.
>
> On Wed, Jul 1, 2020, 6:40 PM Andrew Melo wrote:
>
>> On Wed, Jul 1, 2020 at 8:13 PM Burak Yavuz wrote:
>> >
>> > I'm not sure having a built-in sink that allows you to DDOS servers is the
>> > best idea either. foreachWriter is typically used for such use cases, not
>> > foreachBatch. It's also pretty hard to guarantee exactly-once, rate limiting,
>> > etc.
>>
>> If you control the machines and can run arbitrary code, you can DDOS whatever
>> you want. What's the difference between this proposal and writing a UDF that
>> opens 1,000 connections to a target machine?
>>
>> > Best,
>> > Burak
>> >
>> > On Wed, Jul 1, 2020 at 5:54 PM Holden Karau wrote:
>> >>
>> >> I think adding something like this (if it doesn't already exist) could help
>> >> make structured streaming easier to use; foreachBatch is not the best API.
>> >>
>> >> On Wed, Jul 1, 2020 at 2:21 PM Jungtaek Lim <kabhwan.opensou...@gmail.com> wrote:
>> >>>
>> >>> I guess the method, query parameter, header, and the payload would all be
>> >>> different for almost every use case - that makes it hard to generalize and
>> >>> requires the implementation to be pretty complicated to be flexible enough.
>> >>>
>> >>> I'm not aware of any custom sink implementing REST, so your best bet would
>> >>> be simply implementing your own with foreachBatch, but someone might jump
>> >>> in and provide a pointer if there is something in the Spark ecosystem.
>> >>>
>> >>> Thanks,
>> >>> Jungtaek Lim (HeartSaVioR)
>> >>>
>> >>> On Thu, Jul 2, 2020 at 3:21 AM Sam Elamin wrote:
>> >>>>
>> >>>> Hi All,
>> >>>>
>> >>>> We ingest a lot of RESTful APIs into our lake and I'm wondering if it is
>> >>>> at all possible to create a REST sink in structured streaming.
>> >>>>
>> >>>> For now I'm only focusing on RESTful services that have an incremental ID,
>> >>>> so my sink can just poll for new data and then ingest it.
>> >>>>
>> >>>> I can't seem to find a connector that does this, and my gut instinct tells
>> >>>> me it's probably because it isn't possible due to something completely
>> >>>> obvious that I am missing.
>> >>>>
>> >>>> I know some RESTful APIs obfuscate the IDs into a hash of strings and that
>> >>>> could be a problem, but since I'm planning on focusing on just numerical
>> >>>> IDs that get incremented, I think I won't be facing that issue.
>> >>>>
>> >>>> Can anyone let me know if this sounds like a daft idea? Will I need
>> >>>> something like Kafka or Kinesis as a buffer and redundancy, or am I
>> >>>> overthinking this?
>> >>>>
>> >>>> I would love to bounce ideas with people who run structured streaming jobs
>> >>>> in production.
>> >>>>
>> >>>> Kind regards
>> >>>> Sam
>> >>
>> >> --
>> >> Twitter: https://twitter.com/holdenkarau
>> >> Books (Learning Spark, High Performance Spark, etc.): https://amzn.to/2MaRAG9
>> >> YouTube Live Streams: https://www.youtube.com/user/holdenkarau
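As a starting point for the foreachBatch route Jungtaek mentions, here is a minimal sketch assuming a hypothetical endpoint https://example.com/ingest that accepts a JSON array per POST. It gives at-least-once delivery only (a retried micro-batch may POST the same records twice), collects each micro-batch on the driver (fine only for small batches), and uses the trigger interval as a crude rate limit; df is assumed to be the streaming DataFrame to ship.

import java.net.{HttpURLConnection, URL}
import java.nio.charset.StandardCharsets

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.Trigger

// Hypothetical endpoint; replace with the real ingest URL.
val endpoint = "https://example.com/ingest"

val postBatch: (DataFrame, Long) => Unit = (batchDF, batchId) => {
  // Serialize the micro-batch as a JSON array on the driver -- small batches only.
  val payload = batchDF.toJSON.collect().mkString("[", ",", "]")
  val conn = new URL(endpoint).openConnection().asInstanceOf[HttpURLConnection]
  try {
    conn.setRequestMethod("POST")
    conn.setRequestProperty("Content-Type", "application/json")
    conn.setDoOutput(true)
    conn.getOutputStream.write(payload.getBytes(StandardCharsets.UTF_8))
    // Failing on non-2xx makes Spark retry the batch, so the endpoint must tolerate duplicates.
    val code = conn.getResponseCode
    require(code >= 200 && code < 300, s"HTTP $code posting batch $batchId")
  } finally conn.disconnect()
}

val query = df.writeStream
  .foreachBatch(postBatch)
  .trigger(Trigger.ProcessingTime("1 minute"))  // trigger interval doubles as a crude rate limit
  .start()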