Re: Spark 3.1 Json4s-native jar compatibility
Thanks Sean/Martin, my bad: the Spark version was 3.0.1, so using json4s 3.6.6 fixed the issue. Thanks Amit On Fri, Feb 4, 2022 at 3:37 PM Sean Owen wrote: > My guess is that something else you depend on is actually bringing in a > different json4s, or you're otherwise mixing library/Spark versions. Use > mvn dependency:tree or equivalent on your build to see what you actually > build in. You probably do not need to include json4s at all, as it is in > Spark anyway > > On Fri, Feb 4, 2022 at 2:35 PM Amit Sharma wrote: > >> Martin, Sean, I changed it to 3.7.0-M5 and am still getting the same error: >> Exception in thread "streaming-job-executor-0" >> java.lang.NoSuchMethodError: >> org.json4s.ShortTypeHints$.apply$default$2()Ljava/lang/String; >> >> >> Thanks >> Amit >> >> On Fri, Feb 4, 2022 at 9:03 AM Martin Grigorov >> wrote: >> >>> Hi, >>> >>> Amit said that he uses Spark 3.1, so the link should be >>> https://github.com/apache/spark/blob/branch-3.1/pom.xml#L879 (3.7.0-M5) >>> >>> @Amit: check your classpath. Maybe there are more jars of this >>> dependency. >>> >>> On Thu, Feb 3, 2022 at 10:53 PM Sean Owen wrote: >>> >>>> You can look it up: >>>> https://github.com/apache/spark/blob/branch-3.2/pom.xml#L916 >>>> 3.7.0-M11 >>>> >>>> On Thu, Feb 3, 2022 at 1:57 PM Amit Sharma >>>> wrote: >>>> >>>>> Hello, everyone. I am migrating my Spark streaming job to Spark 3.1. >>>>> I also upgraded the json4s version as below: >>>>> >>>>> libraryDependencies += "org.json4s" %% "json4s-native" % "3.7.0-M5" >>>>> >>>>> >>>>> While running the job I am getting an error for the code below, where I am >>>>> serializing the given inputs. >>>>> >>>>> implicit val formats = >>>>> Serialization.formats(ShortTypeHints(List(classOf[ForecastResponse], >>>>> classOf[OverlayRequest], >>>>> classOf[FTEResponseFromSpark], classOf[QuotaResponse], >>>>> classOf[CloneResponse] >>>>> >>>>> ))) >>>>> >>>>> >>>>> Exception in thread "streaming-job-executor-4" >>>>> java.lang.NoSuchMethodError: >>>>> org.json4s.ShortTypeHints$.apply$default$2()Ljava/lang/String; >>>>> >>>>> It seems to me to be a jar issue; I am not sure which version of json4s-native >>>>> I should use with Spark 3.1. >>>>>
Re: Spark 3.1 Json4s-native jar compatibility
Martin, Sean, I changed it to 3.7.0-M5 and am still getting the same error: Exception in thread "streaming-job-executor-0" java.lang.NoSuchMethodError: org.json4s.ShortTypeHints$.apply$default$2()Ljava/lang/String; Thanks Amit On Fri, Feb 4, 2022 at 9:03 AM Martin Grigorov wrote: > Hi, > > Amit said that he uses Spark 3.1, so the link should be > https://github.com/apache/spark/blob/branch-3.1/pom.xml#L879 (3.7.0-M5) > > @Amit: check your classpath. Maybe there are more jars of this dependency. > > On Thu, Feb 3, 2022 at 10:53 PM Sean Owen wrote: > >> You can look it up: >> https://github.com/apache/spark/blob/branch-3.2/pom.xml#L916 >> 3.7.0-M11 >> >> On Thu, Feb 3, 2022 at 1:57 PM Amit Sharma wrote: >> >>> Hello, everyone. I am migrating my Spark streaming job to Spark 3.1. I >>> also upgraded the json4s version as below: >>> >>> libraryDependencies += "org.json4s" %% "json4s-native" % "3.7.0-M5" >>> >>> >>> While running the job I am getting an error for the code below, where I am >>> serializing the given inputs. >>> >>> implicit val formats = >>> Serialization.formats(ShortTypeHints(List(classOf[ForecastResponse], >>> classOf[OverlayRequest], >>> classOf[FTEResponseFromSpark], classOf[QuotaResponse], >>> classOf[CloneResponse] >>> >>> ))) >>> >>> >>> Exception in thread "streaming-job-executor-4" java.lang.NoSuchMethodError: >>> org.json4s.ShortTypeHints$.apply$default$2()Ljava/lang/String; >>> >>> It seems to me to be a jar issue; I am not sure which version of json4s-native I >>> should use with Spark 3.1. >>> >>>
Spark 3.1 Json4s-native jar compatibility
Hello, everyone. I am migrating my Spark streaming job to Spark 3.1. I also upgraded the json4s version as below: libraryDependencies += "org.json4s" %% "json4s-native" % "3.7.0-M5" While running the job I am getting an error for the code below, where I am serializing the given inputs. implicit val formats = Serialization.formats(ShortTypeHints(List(classOf[ForecastResponse], classOf[OverlayRequest], classOf[FTEResponseFromSpark], classOf[QuotaResponse], classOf[CloneResponse] ))) Exception in thread "streaming-job-executor-4" java.lang.NoSuchMethodError: org.json4s.ShortTypeHints$.apply$default$2()Ljava/lang/String; It seems to me to be a jar issue; I am not sure which version of json4s-native I should use with Spark 3.1.
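As the replies in this thread note, the NoSuchMethodError almost always means two json4s versions are on the classpath, or the application's json4s does not match the one bundled with the cluster's Spark. A minimal build.sbt sketch of one way to keep them aligned; the Spark version and the 3.7.0-M5 pin are assumptions to verify against your Spark release's pom.xml:

// build.sbt (sketch): let the cluster's json4s win by marking it "provided",
// and force a single version if a transitive dependency drags in another one.
val sparkVersion = "3.1.2" // assumption: match your cluster

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion % "provided",
  "org.json4s" %% "json4s-native" % "3.7.0-M5" % "provided" // 3.6.6 for Spark 3.0.x
)

dependencyOverrides += "org.json4s" %% "json4s-native" % "3.7.0-M5"

Running sbt dependencyTree (built into sbt 1.4+) or mvn dependency:tree shows which artifact pulls in a conflicting json4s.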
Re: Kafka to spark streaming
Thanks Mich. The link you shared has only two options, Kafka and socket. Thanks Amit On Sat, Jan 29, 2022 at 3:49 AM Mich Talebzadeh wrote: > So you have a classic architecture with Spark receiving events through a > Kafka topic via the kafka-spark-connector, doing something with them, and sending data > out to the consumer. Are you using Spark structured streaming here with > batch streaming? Check > > > https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#structured-streaming-programming-guide > > HTH > > On Fri, 28 Jan 2022 at 22:14, Amit Sharma wrote: > >> Hello everyone, we have a Spark streaming application. We send requests to >> the stream through an Akka actor using a Kafka topic. We wait for the response, as it is >> real time. I just want a suggestion: is there any better option, like Livy, >> through which we can send and receive requests to Spark streaming. >> >> >> Thanks >> Amit >> >
Kafka to spark streaming
Hello everyone, we have a Spark streaming application. We send requests to the stream through an Akka actor using a Kafka topic. We wait for the response, as it is real time. I just want a suggestion: is there any better option, like Livy, through which we can send and receive requests to Spark streaming? Thanks Amit
Fwd: Cassandra driver upgrade
I am upgrading my Cassandra Java driver version to the latest, 4.13. I have a Cassandra cluster using Cassandra version 3.11.11. I am getting the below runtime error while connecting to Cassandra. Before 4.13 I was using driver version 3.9 and things were working fine. c.d.o.d.i.c.c.ControlConnection - [s0] Error connecting to Node(endPoint=/ 127.0.0.1:9042, hostId=null, hashCode=5495a763), trying next node (ConnectionInitException: [s0|control|connecting...] Protocol initialization request, step 1 (OPTIONS): failed to send request (io.netty.channel.StacklessClosedChannelException)) Please suggest; it has blocked my production release. Thanks Amit
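A closed channel during the OPTIONS handshake can have several causes (wrong port, SSL mismatch, protocol negotiation), but the first thing to rule out after a 3.x to 4.x upgrade is the changed connection API: driver 4.x replaced Cluster/Session with CqlSession and requires the local datacenter to be named explicitly when contact points are given. A minimal connection sketch; the contact point and datacenter name are assumptions (check nodetool status for the real DC name):

import java.net.InetSocketAddress
import com.datastax.oss.driver.api.core.CqlSession

// Driver 4.x: CqlSession replaces Cluster/Session from the 3.x API
val session = CqlSession.builder()
  .addContactPoint(new InetSocketAddress("127.0.0.1", 9042)) // assumption: your node
  .withLocalDatacenter("datacenter1")                        // assumption: your DC name
  .build()

try {
  val row = session.execute("SELECT release_version FROM system.local").one()
  println(row.getString("release_version")) // 3.11.11 if the handshake succeeds
} finally {
  session.close()
}

If this minimal session also fails, the problem is environmental (firewall, SSL, or the node rejecting the connection) rather than in the application code.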
Re: Spark 3.2.0 upgrade
k" %% "spark-streaming" % sparkVersion , "org.apache.spark" %% "spark-mllib" % sparkVersion , "com.datastax.spark" %% "spark-cassandra-connector" % "3.1.0", // this includes cassandra-driver "org.apache.spark" %% "spark-hive" % sparkVersion, "org.apache.spark" %% "spark-streaming-kafka-0-10" % sparkVersion //2021-11-11 updating streaming from .8 to .10 -- replaced cake solutions with akka recent version ).map(_.exclude("org.slf4j","slf4j-api")) .map(_.exclude("org.slf4j" , "jul-to-slf4j")) .map(_.exclude("org.slf4j","jcl-over-slf4j")) .map(_.exclude("org.slf4j","slf4j-log4j12")) //.map(_.exclude("log4j","log4j")) .map(_.exclude("org.apache.kafka", "kafka-clients")) libraryDependencies ++= sparkDependencies .map(excludeJackson) ++ jacksonDeps assembly / assemblyShadeRules := Seq( ShadeRule.rename("com.google.**" -> "shade.com.google.@1" ).inAll ) On Sat, Jan 22, 2022 at 7:55 AM Alex Ott wrote: > Show how do you execute your code - either you didn't pack it as uberjar, > or didn't provide all necessary dependencies, if you're using `--jars` > option. You may try `-assembly` variant when submitting your application > > Amit Sharma at "Fri, 21 Jan 2022 11:17:38 -0500" wrote: > AS> Hello, I tried using a cassandra unshaded connector or normal > connector both are giving the same error at runtime while > AS> connecting to cassandra. > > AS> "com.datastax.spark" %% "spark-cassandra-connector-unshaded" % "2.4.2" > > AS> Or > > AS> "com.datastax.spark" %% "spark-cassandra-connector" % "3.1.0" > > AS> Russ similar issue is reported here also but no solution > > AS> > https://community.datastax.com/questions/3519/issue-with-spring-boot-starter-data-cassandra-and.html > > AS> Caused by: java.lang.ClassNotFoundException: > com.codahale.metrics.JmxReporter > AS> at > java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581) > AS> at > java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178) > AS> at > java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:521) > > AS> On Thu, Jan 20, 2022 at 5:17 PM Amit Sharma > wrote: > > AS> Hello, I am trying to upgrade my project from spark 2.3.3 to > spark 3.2.0. While running the application locally I am getting > AS> below error. > AS> > AS> Could you please let me know which version of the cassandra > connector I should use. I am using below shaded connector but i > AS> think that causing the issue > > AS> "com.datastax.spark" %% "spark-cassandra-connector-unshaded" % > "2.4.2" > > AS> Caused by: java.lang.ClassNotFoundException: > com.codahale.metrics.JmxReporter > AS> at > java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581) > AS> at > java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178) > AS> at > java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:521) > > AS> Thanks > AS> > AS> Amit > > > -- > With best wishes,Alex Ott > http://alexott.net/ > Twitter: alexott_en (English), alexott (Russian) >
Re: Spark 3.2.0 upgrade
Hello, I tried using the Cassandra unshaded connector and the normal connector; both give the same error at runtime while connecting to Cassandra. "com.datastax.spark" %% "spark-cassandra-connector-unshaded" % "2.4.2" Or "com.datastax.spark" %% "spark-cassandra-connector" % "3.1.0" Russ, a similar issue is reported here as well, but with no solution: https://community.datastax.com/questions/3519/issue-with-spring-boot-starter-data-cassandra-and.html Caused by: java.lang.ClassNotFoundException: com.codahale.metrics.JmxReporter at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581) at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178) at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:521) On Thu, Jan 20, 2022 at 5:17 PM Amit Sharma wrote: > Hello, I am trying to upgrade my project from Spark 2.3.3 to Spark 3.2.0. > While running the application locally I am getting the below error. > > Could you please let me know which version of the Cassandra connector I > should use. I am using the below shaded connector, but I think that is causing the > issue: > > "com.datastax.spark" %% "spark-cassandra-connector-unshaded" % "2.4.2" > > > Caused by: java.lang.ClassNotFoundException: com.codahale.metrics.JmxReporter > at > java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581) > at > java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178) > at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:521) > > > Thanks > > Amit > >
Spark 3.2.0 upgrade
Hello, I am trying to upgrade my project from Spark 2.3.3 to Spark 3.2.0. While running the application locally I am getting the below error. Could you please let me know which version of the Cassandra connector I should use. I am using the below shaded connector, but I think that is causing the issue: "com.datastax.spark" %% "spark-cassandra-connector-unshaded" % "2.4.2" Caused by: java.lang.ClassNotFoundException: com.codahale.metrics.JmxReporter at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581) at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178) at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:521) Thanks Amit
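For context on this particular ClassNotFoundException: Dropwizard Metrics 4 moved JmxReporter from com.codahale.metrics to com.codahale.metrics.jmx, and Spark 3.x ships Metrics 4, so a connector built against the old driver (which looks up the Metrics 3 location) fails exactly like this. The usual remedy is to drop the 2.4.x unshaded connector and depend only on a connector line built for Spark 3. A build.sbt sketch; the exact version is an assumption to match your Spark 3.2 release:

// build.sbt (sketch): one connector, matched to Spark 3.x
libraryDependencies ++= Seq(
  // remove "spark-cassandra-connector-unshaded" % "2.4.2" entirely:
  // its embedded Java driver 3.x expects the Metrics 3 JmxReporter location
  "com.datastax.spark" %% "spark-cassandra-connector" % "3.1.0"
)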
Log4j2 upgrade
Hello, everyone. I am replacing log4j with log4j2 in my Spark streaming application. When I deployed my application to the Spark cluster, it gave me the below error: " ERROR StatusLogger Log4j2 could not find a logging implementation. Please add log4j-core to the classpath. Using SimpleLogger to log to the console " I am including log4j-core in my fat jar. Although the application is running fine, I suspect the logs are being generated using log4j, not log4j2. I am using the sbt assembly jar and also noticed the below messages in the build: Fully-qualified classname does not match jar entry: jar entry: META-INF/versions/9/module-info.class class name: module-info.class Omitting META-INF/versions/9/module-info.class. Fully-qualified classname does not match jar entry: jar entry: META-INF/versions/9/org/apache/logging/log4j/util/Base64Util.class class name: org/apache/logging/log4j/util/Base64Util.class Omitting META-INF/versions/9/org/apache/logging/log4j/util/Base64Util.class. Fully-qualified classname does not match jar entry: jar entry: META-INF/versions/9/org/apache/logging/log4j/util/internal/DefaultObjectInputFilter.class Any idea how to resolve these? Thanks Amit
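The "Fully-qualified classname does not match jar entry" messages come from sbt-assembly flattening a multi-release jar (log4j-api ships Java 9 class variants under META-INF/versions) and are usually harmless. Two things worth checking, though: log4j2 discovers log4j-core through a plugin cache file that naive jar merging can clobber, and Spark itself (before 3.3) logs through log4j 1.x, so Spark's own log lines will still come from log4j even when your application logs via log4j2. A merge-strategy sketch, assuming sbt-assembly 1.x; the rules are assumptions to adapt:

// build.sbt (sketch)
assembly / assemblyMergeStrategy := {
  // multi-release jar metadata: safe to drop the duplicated module descriptors
  case PathList("META-INF", "versions", xs @ _*) => MergeStrategy.discard
  case PathList("module-info.class")             => MergeStrategy.discard
  // log4j2's plugin registry; if this file is lost, log4j-core is not found at
  // runtime. MergeStrategy.first is only safe when a single jar provides it.
  case PathList("META-INF", "org", "apache", "logging", "log4j", "core", "config", "plugins", "Log4j2Plugins.dat") =>
    MergeStrategy.first
  case x =>
    val oldStrategy = (assembly / assemblyMergeStrategy).value
    oldStrategy(x)
}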
Kafka Sink Issue
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351) at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166) at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160) at org.apache.spark.sql.execution.streaming.StreamExecution.org $apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:281) at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:193) Can anyone help here and let me know why this happened and what the resolution for it is? -- Thanks & Regards, Amit Sharma
Spark Null Pointer Exception
Hi, I am using Spark version 2.7 with Scala. I am calling a method as below: 1. val rddBacklog = spark.sparkContext.parallelize(MAs) // MAs is a list of, say, cities 2. rddBacklog.foreach(ma => doAlloc3Daily(ma, fteReview.forecastId, startYear, endYear)) 3. The doAlloc3Daily method just makes a database call and does some Scala calculation (no RDD or DataFrame). At line 2 I intermittently get the NullPointerException below on the cluster, but never locally. java.lang.NullPointerException at sparkStreaming.CalculateFteReview.doAlloc3Daily(CalculateFteReview.scala:1307) at sparkStreaming.CalculateFteReview$$anonfun$getNewDistribution$2.apply(CalculateFteReview.scala:1199) at sparkStreaming.CalculateFteReview$$anonfun$getNewDistribution$2.apply(CalculateFteReview.scala:1199) at scala.collection.Iterator$class.foreach(Iterator.scala:893) at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28) at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:927) at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:927) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) at org.apache.spark.scheduler.Task.run(Task.scala:109) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Thanks Amit
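A cluster-only NPE inside a foreach usually means the closure captured something that only exists on the driver: a field of the enclosing class, a lazily initialized connection, or a companion-object value that is null after deserialization on the executor (in local mode everything shares one JVM, so it never fires). A hedged sketch of the defensive pattern; the names mirror the mail above:

// Copy everything the closure needs into local vals on the driver, so the
// closure captures plain serializable values instead of `this`.
val forecastId = fteReview.forecastId
val sYear      = startYear
val eYear      = endYear

val rddBacklog = spark.sparkContext.parallelize(MAs)
rddBacklog.foreach { ma =>
  // doAlloc3Daily must not touch driver-side state; in particular, any
  // database connection should be opened inside the task, not on the driver
  doAlloc3Daily(ma, forecastId, sYear, eYear)
}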
unit testing for spark code
Hi, can we write unit tests for Spark code? Is there any specific framework? Thanks Amit
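Yes: plain ScalaTest (or MUnit/Specs2) with a local-mode SparkSession is the common approach, and libraries such as spark-testing-base add helpers on top. A minimal sketch, assuming ScalaTest 3.x on the test classpath:

import org.apache.spark.sql.SparkSession
import org.scalatest.funsuite.AnyFunSuite

class WordCountSpec extends AnyFunSuite {

  // local[2] runs Spark inside the test JVM; no cluster needed
  private lazy val spark: SparkSession = SparkSession.builder()
    .master("local[2]")
    .appName("unit-test")
    .getOrCreate()

  test("counts words") {
    import spark.implicits._
    val words  = Seq("a", "b", "a").toDS()
    val counts = words.groupByKey(identity).count().collect().toMap
    assert(counts == Map("a" -> 2L, "b" -> 1L))
  }
}

The usual pattern is to keep business logic in functions that take and return Datasets/DataFrames, so each function can be tested in isolation against small hand-built inputs.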
Re: Understanding Executors UI
I believe it's a Spark UI issue that does not display the correct value. I believe it is resolved in Spark 3.0. Thanks Amit On Fri, Jan 8, 2021 at 4:00 PM Luca Canali wrote: > You report 'Storage Memory': 3.3TB/ 598.5 GB -> The first number is the > memory used for storage, the second one is the available memory (for > storage) in the unified memory pool. > > The used memory shown in your webui snippet is indeed quite high (higher > than the available memory!? ), you can probably profit by drilling down on > that to understand better what is happening. > > For example look at the details per executor (the numbers you reported are > aggregated values), then also look at the "storage tab" for a list of > cached RDDs with details. > > In case, Spark 3.0 has improved memory instrumentation and improved > instrumentation for streaming, so you can profit from testing there too. > > > > > > *From:* Eric Beabes > *Sent:* Friday, January 8, 2021 04:23 > *To:* Luca Canali > *Cc:* spark-user > *Subject:* Re: Understanding Executors UI > > > > So when I see this for 'Storage Memory': *3.3TB/ 598.5 GB* *- it's > telling me that Spark is using 3.3 TB of memory & 598.5 GB is used for > caching data, correct?* What I am surprised about is that these numbers > don't change at all throughout the day even though the load on the system > is low after 5pm PST. > > > > I would expect the "Memory used" to be lower than 3.3Tb after 5pm PST. > > > > Does Spark 3.0 do a better job of memory management? Wondering if > upgrading to Spark 3.0 would improve performance? > > > > > > On Wed, Jan 6, 2021 at 2:29 PM Luca Canali wrote: > > Hi Eric, > > > > A few links, in case they can be useful for your troubleshooting: > > > > The Spark Web UI is documented in Spark 3.x documentation, although you > can use most of it for Spark 2.4 too: > https://spark.apache.org/docs/latest/web-ui.html > > > > Spark memory management is documented at > https://spark.apache.org/docs/latest/tuning.html#memory-management-overview > > > Additional resource: see also this diagram > https://canali.web.cern.ch/docs/SparkExecutorMemory.png and > https://db-blog.web.cern.ch/blog/luca-canali/2020-08-spark3-memory-monitoring > > > > Best, > > Luca > > > > *From:* Eric Beabes > *Sent:* Wednesday, January 6, 2021 00:20 > *To:* spark-user > *Subject:* Understanding Executors UI > > > > [image: image.png] > > > > > > Not sure if this image will go through. (Never sent an email to this > mailing list with an image). > > > > I am trying to understand the 'Executors' UI in Spark 2.4. I have a > Stateful Structured Streaming job with 'State timeout' set to 10 minutes. > When the load on the system is low a message gets written to Kafka > immediately after the State times out, BUT under heavy load it takes over 40 > minutes to get a message on the output topic. Trying to debug this issue & > see if performance can be improved. > > > > Questions: > > > > 1) I am requesting 3.2 TB of memory but it seems the job keeps using only > 598.5 GB as per the values in 'Storage Memory' as well as 'On Heap Storage > Memory'. Wondering if this is a Cluster issue OR am I not setting values > correctly? > > 2) Where can I find documentation to understand different 'Tabs' in the > Spark UI? (Sorry, Googling didn't help. I will keep searching.) > > > > Any pointers would be appreciated. Thanks. > > > >
Re: Spark UI Storage Memory
Any suggestion, please? Thanks Amit On Fri, Dec 4, 2020 at 2:27 PM Amit Sharma wrote: > Is there any memory leak in Spark 2.3.3, as mentioned in the Jira below? > https://issues.apache.org/jira/browse/SPARK-29055. > > Please let me know how to solve it. > > Thanks > Amit > > On Fri, Dec 4, 2020 at 1:55 PM Amit Sharma wrote: > >> Can someone help me on this, please? >> >> >> Thanks >> Amit >> >> On Wed, Dec 2, 2020 at 11:52 AM Amit Sharma wrote: >> >>> Hi, I have a Spark streaming job. When I check the Executors tab, >>> there is a Storage Memory column. It displays used memory / total memory. >>> What is used memory? Is it memory in use, or memory used so far? How would >>> I know how much memory is unused at a given point in time? >>> >>> >>> Thanks >>> Amit >>>
Re: Caching
Jayesh, but while building the logical plan Spark would know that the same DF is used twice, so it should optimize the query. Thanks Amit On Mon, Dec 7, 2020 at 1:16 PM Lalwani, Jayesh wrote: > Since DF2 is dependent on DF1, and DF3 is dependent on both DF1 and DF2, > without caching, Spark will read the CSV twice: Once to load it for DF1, > and once to load it for DF2. When you add a cache on DF1 or DF2, it reads > from CSV only once. > > > > You might want to look at doing a windowed query on DF1 to avoid joining > DF1 with DF2. This should give you better or similar performance when > compared to cache, because Spark will cache the data during the shuffle. > > > > *From: *Amit Sharma > *Reply-To: *"resolve...@gmail.com" > *Date: *Monday, December 7, 2020 at 12:47 PM > *To: *Theodoros Gkountouvas , " > user@spark.apache.org" > *Subject: *RE: [EXTERNAL] Caching > > > > Thanks for the information. I am using Spark 2.3.3. There are a few more > questions. > > > > 1. Yes, I am using DF1 two times, but at the end there is one action, on DF3. In > that case should the action on DF1 be just one, or does it depend on how many times this > dataframe is used in a transformation? > > > > I believe even if we use a dataframe multiple times for transformations, > caching should be based on actions. In my case the action is one save call, > on DF3. Please correct me if I am wrong. > > > > Thanks > > Amit > > > > On Mon, Dec 7, 2020 at 11:54 AM Theodoros Gkountouvas < > theo.gkountou...@futurewei.com> wrote: > > Hi Amit, > > > > One action might use the same DataFrame more than once. You can look at > your LogicalPlan by executing DF3.explain (arguments differ depending on > the version of Spark you are using) and see how many times you need to > compute DF2 or DF1. Given the information you have provided I suspect that > DF1 is used more than once (one time at DF2 and another one at DF3). So, > Spark is going to cache it the first time and it will load it from cache > instead of running it again the second time. > > > > I hope this helped, > > Theo. > > > > *From:* Amit Sharma > *Sent:* Monday, December 7, 2020 11:32 AM > *To:* user@spark.apache.org > *Subject:* Caching > > > > Hi All, I am using caching in my code. I have DFs like: > > val DF1 = read csv > > val DF2 = DF1.groupBy().agg().select(.) > > > > val DF3 = read csv .join(DF1).join(DF2) > > DF3.save > > > > If I do not cache DF1 or DF2 it takes longer. But I am doing only one > action, so why do I need to cache? > > > > Thanks > > Amit > > > > > >
Re: Caching
Sean, you mean that if a DF is used more than once in transformations then we should use cache. But frankly that is also not always true, because in many places, with caching and without caching, it gives the same result. How do we decide whether to use cache or not? Thanks Amit On Mon, Dec 7, 2020 at 1:01 PM Sean Owen wrote: > No, it's not true that one action means every DF is evaluated once. This > is a good counterexample. > > On Mon, Dec 7, 2020 at 11:47 AM Amit Sharma wrote: > >> Thanks for the information. I am using Spark 2.3.3. There are a few more >> questions. >> >> 1. Yes, I am using DF1 two times, but at the end there is one action, on DF3. In >> that case should the action on DF1 be just one, or does it depend on how many times this >> dataframe is used in a transformation? >> >> I believe even if we use a dataframe multiple times for transformations, >> caching should be based on actions. In my case the action is one save call >> on DF3. Please correct me if I am wrong. >> >> Thanks >> Amit >> >> On Mon, Dec 7, 2020 at 11:54 AM Theodoros Gkountouvas < >> theo.gkountou...@futurewei.com> wrote: >> >>> Hi Amit, >>> >>> >>> >>> One action might use the same DataFrame more than once. You can look at >>> your LogicalPlan by executing DF3.explain (arguments differ depending on >>> the version of Spark you are using) and see how many times you need to >>> compute DF2 or DF1. Given the information you have provided I suspect that >>> DF1 is used more than once (one time at DF2 and another one at DF3). So, >>> Spark is going to cache it the first time and it will load it from cache >>> instead of running it again the second time. >>> >>> >>> >>> I hope this helped, >>> >>> Theo. >>> >>> >>> >>> *From:* Amit Sharma >>> *Sent:* Monday, December 7, 2020 11:32 AM >>> *To:* user@spark.apache.org >>> *Subject:* Caching >>> >>> >>> >>> Hi All, I am using caching in my code. I have DFs like: >>> >>> val DF1 = read csv >>> >>> val DF2 = DF1.groupBy().agg().select(.) >>> >>> >>> >>> val DF3 = read csv .join(DF1).join(DF2) >>> >>> DF3.save >>> >>> >>> >>> If I do not cache DF1 or DF2 it takes longer. But I am doing >>> only one action, so why do I need to cache? >>> >>> >>> >>> Thanks >>> >>> Amit >>> >>> >>> >>> >>> >>
Re: Caching
Thanks for the information. I am using Spark 2.3.3. There are a few more questions. 1. Yes, I am using DF1 two times, but at the end there is one action, on DF3. In that case should the action on DF1 be just one, or does it depend on how many times this dataframe is used in a transformation? I believe even if we use a dataframe multiple times for transformations, caching should be based on actions. In my case the action is one save call, on DF3. Please correct me if I am wrong. Thanks Amit On Mon, Dec 7, 2020 at 11:54 AM Theodoros Gkountouvas < theo.gkountou...@futurewei.com> wrote: > Hi Amit, > > > > One action might use the same DataFrame more than once. You can look at > your LogicalPlan by executing DF3.explain (arguments differ depending on > the version of Spark you are using) and see how many times you need to > compute DF2 or DF1. Given the information you have provided I suspect that > DF1 is used more than once (one time at DF2 and another one at DF3). So, > Spark is going to cache it the first time and it will load it from cache > instead of running it again the second time. > > > > I hope this helped, > > Theo. > > > > *From:* Amit Sharma > *Sent:* Monday, December 7, 2020 11:32 AM > *To:* user@spark.apache.org > *Subject:* Caching > > > > Hi All, I am using caching in my code. I have DFs like: > > val DF1 = read csv > > val DF2 = DF1.groupBy().agg().select(.) > > > > val DF3 = read csv .join(DF1).join(DF2) > > DF3.save > > > > If I do not cache DF1 or DF2 it takes longer. But I am doing only one > action, so why do I need to cache? > > > > Thanks > > Amit > > > > >
Caching
Hi All, I am using caching in my code. I have DFs like: val DF1 = read csv val DF2 = DF1.groupBy().agg().select(.) val DF3 = read csv .join(DF1).join(DF2) DF3.save If I do not cache DF1 or DF2 it takes longer. But I am doing only one action, so why do I need to cache? Thanks Amit
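As the replies explain, the one action on DF3 still evaluates DF1's lineage twice (once through DF2 and once through the join), so the CSV is scanned twice unless DF1 is cached: caching decisions hinge on how many times a DataFrame is recomputed, not on how many actions are called. A runnable sketch of the pattern under discussion; the paths and join column are assumptions:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

val df1 = spark.read.option("header", "true").csv("/data/in.csv").cache() // reused twice below
val df2 = df1.groupBy($"key").count()

val df3 = spark.read.option("header", "true").csv("/data/other.csv")
  .join(df1, "key")
  .join(df2, "key")

df3.explain()                    // without cache, the CSV scan appears more than once
df3.write.parquet("/data/out")   // the single action

df1.unpersist()                  // release the cached blocks once done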
Re: Spark UI Storage Memory
Is there any memory leak in Spark 2.3.3, as mentioned in the Jira below? https://issues.apache.org/jira/browse/SPARK-29055. Please let me know how to solve it. Thanks Amit On Fri, Dec 4, 2020 at 1:55 PM Amit Sharma wrote: > Can someone help me on this, please? > > > Thanks > Amit > > On Wed, Dec 2, 2020 at 11:52 AM Amit Sharma wrote: > >> Hi, I have a Spark streaming job. When I check the Executors tab, >> there is a Storage Memory column. It displays used memory / total memory. >> What is used memory? Is it memory in use, or memory used so far? How would >> I know how much memory is unused at a given point in time? >> >> >> Thanks >> Amit >> >
Re: Spark UI Storage Memory
Can someone help me on this, please? Thanks Amit On Wed, Dec 2, 2020 at 11:52 AM Amit Sharma wrote: > Hi, I have a Spark streaming job. When I check the Executors tab, > there is a Storage Memory column. It displays used memory / total memory. > What is used memory? Is it memory in use, or memory used so far? How would > I know how much memory is unused at a given point in time? > > > Thanks > Amit >
Spark UI Storage Memory
Hi, I have a Spark streaming job. When I check the Executors tab, there is a Storage Memory column. It displays used memory / total memory. What is used memory? Is it memory in use, or memory used so far? How would I know how much memory is unused at a given point in time? Thanks Amit
Re: Cache not getting cleaned.
Please find attached a screenshot showing no active tasks but memory still in use. [image: image.png] On Sat, Nov 21, 2020 at 4:25 PM Amit Sharma wrote: > I am using df.cache and also unpersisting it. But when I check the Spark UI > Storage tab I still see cache memory usage. Do I need to do anything else? > > Also, in the Executors tab on the Spark UI, each executor's used/total > memory always displays some used memory; I am not sure why, if there are no > requests on the streaming job, the usage is not 0. > > Thanks > Amit >
Cache not getting cleaned.
I am using df.cache and also unpersisting it. But when I check the Spark UI Storage tab I still see cache memory usage. Do I need to do anything else? Also, in the Executors tab on the Spark UI, each executor's used/total memory always displays some used memory; I am not sure why, if there are no requests on the streaming job, the usage is not 0. Thanks Amit
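One detail that often explains lingering entries on the Storage tab: unpersist is asynchronous by default, so blocks can remain visible for a while after the call. The blocking variant waits until the executors have actually dropped them. A small sketch, assuming a SparkSession named spark:

val df = spark.read.parquet("/data/in").cache() // assumption: some cached input
df.count()                                      // materialize the cache

// blocking = true returns only after every executor has dropped its blocks,
// so the Storage tab should be empty right after this call
df.unpersist(blocking = true)

Also note that "used" storage memory on the Executors tab includes Spark's internal blocks (for example broadcast variables), so a small non-zero value at idle is normal.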
Re: Spark Exception
Russell, I increased the RPC timeout to 240 seconds, but I am still getting this issue once in a while, and after it occurs my Spark streaming job gets stuck and does not process any requests, so I have to restart it every time. Any suggestions, please? Thanks Amit On Wed, Nov 18, 2020 at 12:05 PM Amit Sharma wrote: > Hi, we are running a Spark streaming job and sometimes it throws the two > exceptions below. I do not understand the difference between > these two exceptions: for one the timeout is 120 seconds and for the other it is 600 > seconds. What could be the reason for these? > > > Error running job streaming job 1605709968000 ms.0 > org.apache.spark.SparkException: Job aborted due to stage failure: Task > serialization failed: org.apache.spark.rpc.RpcTimeoutException: Futures > timed out after [120 seconds]. This timeout is controlled by > spark.rpc.askTimeout > org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [120 > seconds]. This timeout is controlled by spark.rpc.askTimeout > at org.apache.spark.rpc.RpcTimeout.org > $apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:47) > at > org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:62) > at > org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:58) > at > scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36) > at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:76) > at > org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:92) > at > org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:76) > at > org.apache.spark.storage.BlockManagerMaster.updateBlockInfo(BlockManagerMaster.scala:76) > at org.apache.spark.storage.BlockManager.org > $apache$spark$storage$BlockManager$$tryToReportBlockStatus(BlockManager.scala:466) > at org.apache.spark.storage.BlockManager.org > $apache$spark$storage$BlockManager$$reportBlockStatus(BlockManager.scala:445) > at > org.apache.spark.storage.BlockManager.removeBlockInternal(BlockManager.scala:1519) > at > org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1047) > > > > > > 2020-11-18 14:44:03 ERROR Utils:91 - Uncaught exception in thread > heartbeat-receiver-event-loop-thread > org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [600 > seconds]. This timeout is controlled by BlockManagerHeartbeat > at org.apache.spark.rpc.RpcTimeout.org > $apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:47) > at > org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:62) > at > org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:58) > at > scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36) > at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:76) > at > org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:92) > at > org.apache.spark.scheduler.DAGScheduler.executorHeartbeatReceived(DAGScheduler.scala:251) > at > org.apache.spark.scheduler.TaskSchedulerImpl.executorHeartbeatReceived(TaskSchedulerImpl.scala:455) > at > org.apache.spark.HeartbeatReceiver$$anonfun$receiveAndReply$1$$anon$2$$anonfun$run$2.apply$mcV$sp(HeartbeatReceiver.scala:129) > at > org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1361) > at > org.apache.spark.HeartbeatReceiver$$anonfun$receiveAndReply$1$$anon$2.run(HeartbeatReceiver.scala:128) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) >
Re: Out of memory issue
Please help. Thanks Amit On Mon, Nov 9, 2020 at 4:18 PM Amit Sharma wrote: > Please find below the exact exception > > Exception in thread "streaming-job-executor-3" java.lang.OutOfMemoryError: > Java heap space > at java.util.Arrays.copyOf(Arrays.java:3332) > at > java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:124) > at > java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:448) > at java.lang.StringBuilder.append(StringBuilder.java:136) > at > scala.StringContext.standardInterpolator(StringContext.scala:126) > at scala.StringContext.s(StringContext.scala:95) > at sparkStreaming.TRReview.getTRReviews(TRReview.scala:307) > at > sparkStreaming.KafkaListener$$anonfun$1$$anonfun$apply$1$$anonfun$3.apply(KafkaListener.scala:154) > at > sparkStreaming.KafkaListener$$anonfun$1$$anonfun$apply$1$$anonfun$3.apply(KafkaListener.scala:138) > at scala.util.Success$$anonfun$map$1.apply(Try.scala:237) > at scala.util.Try$.apply(Try.scala:192) > at scala.util.Success.map(Try.scala:237) > > On Sun, Nov 8, 2020 at 1:35 PM Amit Sharma wrote: > >> Hi, I am using a 16-node Spark cluster with the below config: >> 1. Executor memory 8 GB >> 2. 5 cores per executor >> 3. Driver memory 12 GB. >> >> >> We have a streaming job. We do not usually see problems, but sometimes we get >> a heap memory exception on one executor. I do not understand why, when the data size >> is the same and the job normally receives a request and processes it, it suddenly >> starts giving out-of-memory errors. It throws the exception for one executor, >> then for other executors as well, and then it stops processing requests. >> >> Thanks >> Amit >>
Re: Spark Exception
Please help. Thanks Amit On Wed, Nov 18, 2020 at 12:05 PM Amit Sharma wrote: > Hi, we are running a Spark streaming job and sometimes it throws the two > exceptions below. I do not understand the difference between > these two exceptions: for one the timeout is 120 seconds and for the other it is 600 > seconds. What could be the reason for these? > > > Error running job streaming job 1605709968000 ms.0 > org.apache.spark.SparkException: Job aborted due to stage failure: Task > serialization failed: org.apache.spark.rpc.RpcTimeoutException: Futures > timed out after [120 seconds]. This timeout is controlled by > spark.rpc.askTimeout > org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [120 > seconds]. This timeout is controlled by spark.rpc.askTimeout > at org.apache.spark.rpc.RpcTimeout.org > $apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:47) > at > org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:62) > at > org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:58) > at > scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36) > at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:76) > at > org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:92) > at > org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:76) > at > org.apache.spark.storage.BlockManagerMaster.updateBlockInfo(BlockManagerMaster.scala:76) > at org.apache.spark.storage.BlockManager.org > $apache$spark$storage$BlockManager$$tryToReportBlockStatus(BlockManager.scala:466) > at org.apache.spark.storage.BlockManager.org > $apache$spark$storage$BlockManager$$reportBlockStatus(BlockManager.scala:445) > at > org.apache.spark.storage.BlockManager.removeBlockInternal(BlockManager.scala:1519) > at > org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1047) > > > > > > 2020-11-18 14:44:03 ERROR Utils:91 - Uncaught exception in thread > heartbeat-receiver-event-loop-thread > org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [600 > seconds]. This timeout is controlled by BlockManagerHeartbeat > at org.apache.spark.rpc.RpcTimeout.org > $apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:47) > at > org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:62) > at > org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:58) > at > scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36) > at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:76) > at > org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:92) > at > org.apache.spark.scheduler.DAGScheduler.executorHeartbeatReceived(DAGScheduler.scala:251) > at > org.apache.spark.scheduler.TaskSchedulerImpl.executorHeartbeatReceived(TaskSchedulerImpl.scala:455) > at > org.apache.spark.HeartbeatReceiver$$anonfun$receiveAndReply$1$$anon$2$$anonfun$run$2.apply$mcV$sp(HeartbeatReceiver.scala:129) > at > org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1361) > at > org.apache.spark.HeartbeatReceiver$$anonfun$receiveAndReply$1$$anon$2.run(HeartbeatReceiver.scala:128) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) >
Spark Exception
Hi, we are running a Spark streaming job and sometimes it throws the two exceptions below. I do not understand the difference between these two exceptions: for one the timeout is 120 seconds and for the other it is 600 seconds. What could be the reason for these? Error running job streaming job 1605709968000 ms.0 org.apache.spark.SparkException: Job aborted due to stage failure: Task serialization failed: org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [120 seconds]. This timeout is controlled by spark.rpc.askTimeout org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [120 seconds]. This timeout is controlled by spark.rpc.askTimeout at org.apache.spark.rpc.RpcTimeout.org $apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:47) at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:62) at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:58) at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36) at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:76) at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:92) at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:76) at org.apache.spark.storage.BlockManagerMaster.updateBlockInfo(BlockManagerMaster.scala:76) at org.apache.spark.storage.BlockManager.org $apache$spark$storage$BlockManager$$tryToReportBlockStatus(BlockManager.scala:466) at org.apache.spark.storage.BlockManager.org $apache$spark$storage$BlockManager$$reportBlockStatus(BlockManager.scala:445) at org.apache.spark.storage.BlockManager.removeBlockInternal(BlockManager.scala:1519) at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1047) 2020-11-18 14:44:03 ERROR Utils:91 - Uncaught exception in thread heartbeat-receiver-event-loop-thread org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [600 seconds]. This timeout is controlled by BlockManagerHeartbeat at org.apache.spark.rpc.RpcTimeout.org $apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:47) at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:62) at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:58) at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36) at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:76) at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:92) at org.apache.spark.scheduler.DAGScheduler.executorHeartbeatReceived(DAGScheduler.scala:251) at org.apache.spark.scheduler.TaskSchedulerImpl.executorHeartbeatReceived(TaskSchedulerImpl.scala:455) at org.apache.spark.HeartbeatReceiver$$anonfun$receiveAndReply$1$$anon$2$$anonfun$run$2.apply$mcV$sp(HeartbeatReceiver.scala:129) at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1361) at org.apache.spark.HeartbeatReceiver$$anonfun$receiveAndReply$1$$anon$2.run(HeartbeatReceiver.scala:128) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748)
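Both messages are driver-side RPC timeouts, just raised on different paths: the 120-second one is spark.rpc.askTimeout (which defaults to spark.network.timeout) on a block-manager status update, while the 600-second one fires when executor heartbeat processing stalls. Raising the settings can buy headroom, though timeouts this long usually point at the real problem, such as long GC pauses or an overloaded driver. A sketch; the values are assumptions to tune:

import org.apache.spark.SparkConf

// Raise the general RPC/network timeouts (also settable via --conf on spark-submit)
val conf = new SparkConf()
  .set("spark.rpc.askTimeout", "240s")
  .set("spark.network.timeout", "600s")

If the timeouts coincide with memory pressure, checking GC logs on the driver and executors is usually more productive than raising them further.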
spark UI storage tab
Hi, I have a few questions, as below: 1. The Spark UI Storage tab displays 'Storage Level', 'Size in Memory' and 'Size on Disk'. It displays RDD ID 16 with memory usage 76 MB, and I am not sure why it does not go back to 0 once a Spark streaming request is completed. I am caching some RDDs inside a method and uncaching them. 2. Similarly, the Executors tab displays 'Storage Memory' used and available. Does "used" mean currently in use, or memory used on that executor at some point in time (the maximum memory used so far)? Thanks Amit
Re: Out of memory issue
Please find below the exact exception Exception in thread "streaming-job-executor-3" java.lang.OutOfMemoryError: Java heap space at java.util.Arrays.copyOf(Arrays.java:3332) at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:124) at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:448) at java.lang.StringBuilder.append(StringBuilder.java:136) at scala.StringContext.standardInterpolator(StringContext.scala:126) at scala.StringContext.s(StringContext.scala:95) at sparkStreaming.TRReview.getTRReviews(TRReview.scala:307) at sparkStreaming.KafkaListener$$anonfun$1$$anonfun$apply$1$$anonfun$3.apply(KafkaListener.scala:154) at sparkStreaming.KafkaListener$$anonfun$1$$anonfun$apply$1$$anonfun$3.apply(KafkaListener.scala:138) at scala.util.Success$$anonfun$map$1.apply(Try.scala:237) at scala.util.Try$.apply(Try.scala:192) at scala.util.Success.map(Try.scala:237) On Sun, Nov 8, 2020 at 1:35 PM Amit Sharma wrote: > Hi , I am using 16 nodes spark cluster with below config > 1. Executor memory 8 GB > 2. 5 cores per executor > 3. Driver memory 12 GB. > > > We have streaming job. We do not see problem but sometimes we get > exception executor-1 heap memory issue. I am not understanding if data size > is same and this job receive a request and process it but suddenly it’s > start giving out of memory error . It will throw exception for 1 executor > then throw for other executor also and it stop processing the request. > > Thanks > Amit >
Re: Out of memory issue
Can you please help? Thanks Amit On Sun, Nov 8, 2020 at 1:35 PM Amit Sharma wrote: > Hi, I am using a 16-node Spark cluster with the below config: > 1. Executor memory 8 GB > 2. 5 cores per executor > 3. Driver memory 12 GB. > > > We have a streaming job. We do not usually see problems, but sometimes we get > a heap memory exception on one executor. I do not understand why, when the data size > is the same and the job normally receives a request and processes it, it suddenly > starts giving out-of-memory errors. It throws the exception for one executor, > then for other executors as well, and then it stops processing requests. > > Thanks > Amit >
Out of memory issue
Hi, I am using a 16-node Spark cluster with the below config: 1. Executor memory 8 GB 2. 5 cores per executor 3. Driver memory 12 GB. We have a streaming job. We do not usually see problems, but sometimes we get a heap memory exception on one executor. I do not understand why, when the data size is the same and the job normally receives a request and processes it, it suddenly starts giving out-of-memory errors. It throws the exception for one executor, then for other executors as well, and then it stops processing requests. Thanks Amit
Spark reading from cassandra
Hi, I have a question: when we are reading from Cassandra, should we use only the partition key in the where clause from a performance perspective, or does it not matter from Spark's perspective because it always allows filtering? Thanks Amit
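It does matter: a filter on the partition key can be pushed down to Cassandra as an efficient single-partition query, while a filter on a non-key column either makes Spark scan the table and filter itself or relies on ALLOW FILTERING server-side, both of which are far slower. explain shows what was actually pushed down. A sketch; keyspace, table, and column names are assumptions:

// Spark Cassandra Connector, DataFrame API
val orders = spark.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> "shop", "table" -> "orders")) // assumptions
  .load()

// partition-key filter: pushed down, touches a single partition
val fast = orders.filter(orders("customer_id") === "c-42")

// non-key filter: a full scan (or ALLOW FILTERING) behind the scenes
val slow = orders.filter(orders("status") === "OPEN")

fast.explain() // look for PushedFilters to confirm what reached Cassandra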
Driver Information
Hi, I have a 20-node cluster. I run multiple batch jobs. In the spark-submit file, driver memory = 2g and executor memory = 4g, and I have 8 GB workers. I have the below questions: 1. Is there any way to know, for each batch job, which worker is the driver node? 2. Will the driver node be part of one of the executors, or is it independent? Thanks Amit
Re: help on use case - spark parquet processing
Can you keep Option fields in your case class? Thanks Amit On Thu, Aug 13, 2020 at 12:47 PM manjay kumar wrote: > Hi, > > I have a use case > > where I need to merge three data sets and build one wherever data is > available. > > And my dataset is a complex object. > > Customer > - name - string > - accounts - List > > Account > - type - String > - Addresses - List > > Address > - name - String > > > > --- > > > And it goes on. > > These files are in Parquet, > > > and all 3 input datasets have some details which need to be merged > to build one dataset that has all the information (I know the files > which need to be merged). > > > I want to know how I should proceed on this. > > - My approach is to build a case class of the actual output and parse the three > datasets > (but this is failing because the input responses do not have all the fields). > > So basically, what should be the approach to deal with this kind of problem? > > 2nd, how can I convert a Parquet DataFrame to a Dataset, considering the > Parquet struct does not have all the fields but the case class has all the > fields? (I am getting the error "no struct type found".) > > Thanks > Manjay Kumar > 8320 120 839 > >
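Concretely, making every field that any source may lack an Option lets one case class cover all three Parquet inputs. One caveat: Option absorbs null values, but a column that is entirely absent from the Parquet schema still fails .as[...], so missing columns have to be added as typed nulls first. A sketch; the class shape and path are assumptions based on the mail above:

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.types.StringType

// Hypothetical target shape: optional wherever a source may lack data
case class Address(name: Option[String])
case class Account(accountType: Option[String], addresses: Option[Seq[Address]])
case class Customer(name: Option[String], accounts: Option[Seq[Account]])

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

// Add a flat string column as a typed null when a source omits it entirely;
// missing nested structs need a cast to the full StructType instead.
def withStringColumnIfMissing(df: DataFrame, name: String): DataFrame =
  if (df.columns.contains(name)) df
  else df.withColumn(name, lit(null).cast(StringType))

val customers = withStringColumnIfMissing(spark.read.parquet("/data/src1"), "name")
  .as[Customer]

Once the three sources are aligned to the same shape, the merge itself can be a union plus a groupBy on the customer identity, coalescing fields.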
Re: Spark batch job chaining
Any help is appreciated. I have a Spark batch job, and based on a condition I would like to start another batch job by invoking a .sh file. I just want to know: can we achieve that? Thanks Amit On Fri, Aug 7, 2020 at 3:58 PM Amit Sharma wrote: > Hi, I want to write a batch job which would call another batch job based > on a condition. Can I call one batch job from another in Scala, or can I do > it only with a Python script? An example would be really helpful. > > > Thanks > Amit > > >
Spark batch job chaining
Hi, I want to write a batch job which would call another batch job based on a condition. Can I call one batch job from another in Scala, or can I do it only with a Python script? An example would be really helpful. Thanks Amit
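Yes: the driver program is ordinary Scala, so it can shell out with scala.sys.process and launch a script conditionally (for real pipelines, an orchestrator such as Airflow or Oozie is the more robust choice). A sketch; the script path and condition are assumptions:

import scala.sys.process._

// Assumption: some condition computed by the first batch job
val rowCount: Long = spark.read.parquet("/data/stage1").count()

if (rowCount > 0) {
  // Runs on the driver; ! returns the process exit code (0 = success)
  val exitCode = Seq("/bin/bash", "/jobs/run_second_job.sh").!
  require(exitCode == 0, s"second job failed with exit code $exitCode")
}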
how to copy from one cassandra cluster to another
Hi, I have table A in a Cassandra cluster (cluster 1) in one data center. I have table B in cluster 2 in another data center. I want to copy the data from one cluster to another using Spark. I ran into the problem that I cannot create two Spark sessions, as we need one Spark session per cluster. Please let me know if there is any way to use a Spark batch job to copy data between two Cassandra clusters. Thanks Amit
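With the DataFrame API two SparkSessions are not needed: the connector accepts connection settings per read/write, so a single job can read from cluster 1 and write to cluster 2. A sketch; the hosts, keyspace, and table names are assumptions, and the per-operation connection.host option follows the connector's documented support for multiple clusters:

// One SparkSession; the target cluster is chosen per read/write via options
val fromClusterA = spark.read
  .format("org.apache.spark.sql.cassandra")
  .option("spark.cassandra.connection.host", "dc1-node1") // assumption
  .options(Map("keyspace" -> "ks", "table" -> "table_a"))
  .load()

fromClusterA.write
  .format("org.apache.spark.sql.cassandra")
  .option("spark.cassandra.connection.host", "dc2-node1") // assumption
  .options(Map("keyspace" -> "ks", "table" -> "table_b"))
  .mode("append")
  .save()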
spark exception
Hi All, sometimes I get this error in the Spark logs. I notice a few executors are shown as dead in the Executors tab during this error, although my job succeeds. Please help me find the root cause of this issue. I have 3 workers with 30 cores each and 64 GB RAM each. My job uses 3 cores per executor, a total of 63 cores, and 4 GB RAM per executor. Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages
Re: Garbage collection issue
Please help on this. Thanks Amit On Fri, Jul 17, 2020 at 2:34 PM Amit Sharma wrote: > Hi All, I am running the same batch job in my two separate Spark clusters. > In one of the clusters it shows a GC warning on the Spark UI under the > Executors tab: garbage collection is taking around 20% of time, while > in the other cluster it is under 10%. I am using the same configuration in > my spark-submit and using G1GC. > > Please let me know what could be the reason for the GC slowness. > > > Thanks > Amit >
Re: Future timeout
Please help on this. Thanks Amit On Fri, Jul 17, 2020 at 9:10 AM Amit Sharma wrote: > Hi, sometimes my spark streaming job throw this exception Futures timed > out after [300 seconds]. > I am not sure where is the default timeout configuration. Can i increase > it. Please help. > > > > Thanks > Amit > > > > Caused by: java.util.concurrent.TimeoutException: Futures timed out after > [300 seconds] > at > scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) > at > scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) > at > org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:201) > at > org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:136) > at > org.apache.spark.sql.execution.InputAdapter.doExecuteBroadcast(WholeStageCodegenExec.scala:372) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:144) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:140) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155) > at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) > at > org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152) > at > org.apache.spark.sql.execution.SparkPlan.executeBroadcast(SparkPlan.scala:140) > at > org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.prepareBroadcast(BroadcastHashJoinExec.scala:116) > at > org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.codegenOuter(BroadcastHashJoinExec.scala:257) > at > org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.doConsume(BroadcastHashJoinExec.scala:101) > at > org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:186) > at > org.apache.spark.sql.execution.ProjectExec.consume(basicPhysicalOperators.scala:35) > at > org.apache.spark.sql.execution.ProjectExec.doConsume(basicPhysicalOperators.scala:65) > at > org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:186) > at > org.apache.spark.sql.execution.SerializeFromObjectExec.consume(objects.scala:101) > at > org.apache.spark.sql.execution.SerializeFromObjectExec.doConsume(objects.scala:121) > at > org.apache.spark.sql.execution.CodegenSupport$class.constructDoConsumeFunction(WholeStageCodegenExec.scala:213) > at > org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:184) > at > org.apache.spark.sql.execution.MapElementsExec.consume(objects.scala:200) > at > org.apache.spark.sql.execution.MapElementsExec.doConsume(objects.scala:224) > at > org.apache.spark.sql.execution.CodegenSupport$class.constructDoConsumeFunction(WholeStageCodegenExec.scala:213) > at > org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:184) > at > org.apache.spark.sql.execution.DeserializeToObjectExec.consume(objects.scala:68) >
Garbage collection issue
Hi All, I am running the same batch job in my two separate Spark clusters. In one of the clusters it shows a GC warning on the Spark UI under the Executors tab: garbage collection is taking around 20% of time, while in the other cluster it is under 10%. I am using the same configuration in my spark-submit and using G1GC. Please let me know what could be the reason for the GC slowness. Thanks Amit
Future timeout
Hi, sometimes my Spark streaming job throws this exception: Futures timed out after [300 seconds]. I am not sure where the default timeout configuration is. Can I increase it? Please help. Thanks Amit Caused by: java.util.concurrent.TimeoutException: Futures timed out after [300 seconds] at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:201) at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:136) at org.apache.spark.sql.execution.InputAdapter.doExecuteBroadcast(WholeStageCodegenExec.scala:372) at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:144) at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:140) at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152) at org.apache.spark.sql.execution.SparkPlan.executeBroadcast(SparkPlan.scala:140) at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.prepareBroadcast(BroadcastHashJoinExec.scala:116) at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.codegenOuter(BroadcastHashJoinExec.scala:257) at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.doConsume(BroadcastHashJoinExec.scala:101) at org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:186) at org.apache.spark.sql.execution.ProjectExec.consume(basicPhysicalOperators.scala:35) at org.apache.spark.sql.execution.ProjectExec.doConsume(basicPhysicalOperators.scala:65) at org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:186) at org.apache.spark.sql.execution.SerializeFromObjectExec.consume(objects.scala:101) at org.apache.spark.sql.execution.SerializeFromObjectExec.doConsume(objects.scala:121) at org.apache.spark.sql.execution.CodegenSupport$class.constructDoConsumeFunction(WholeStageCodegenExec.scala:213) at org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:184) at org.apache.spark.sql.execution.MapElementsExec.consume(objects.scala:200) at org.apache.spark.sql.execution.MapElementsExec.doConsume(objects.scala:224) at org.apache.spark.sql.execution.CodegenSupport$class.constructDoConsumeFunction(WholeStageCodegenExec.scala:213) at org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:184) at org.apache.spark.sql.execution.DeserializeToObjectExec.consume(objects.scala:68)
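The 300-second default in this particular stack trace comes from the broadcast-join exchange (BroadcastExchangeExec), which is governed by spark.sql.broadcastTimeout. It can be raised, but a broadcast taking five minutes usually means the broadcast side of the join is too large, in which case limiting or disabling automatic broadcast is the better fix. A sketch:

// Raise the broadcast-exchange timeout (in seconds)
spark.conf.set("spark.sql.broadcastTimeout", "600")

// Or stop Spark from auto-broadcasting large tables in the first place
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")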
Cassandra raw deletion
Hi, I have to delete certain rows from Cassandra during my Spark batch process. Is there any way to delete rows using the Spark Cassandra connector? Thanks Amit
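The connector's RDD API does expose deletes: deleteFromCassandra issues a delete per element of the RDD, matched by the key columns carried in the elements. A sketch; the keyspace, table, and key column are assumptions, and the SparkContext is assumed to carry spark.cassandra.connection.host:

import com.datastax.spark.connector._

// Assumption: table shop.orders has partition key order_id
case class OrderKey(order_id: String)

val expiredKeys = spark.sparkContext.parallelize(Seq(OrderKey("o-1"), OrderKey("o-2")))

// Issues DELETE statements keyed by the columns present in the RDD elements
expiredKeys.deleteFromCassandra("shop", "orders")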
Truncate table
Hi, I have a scenario where I have to read certain rows from a table, truncate the table, and store those rows back into the table. I am doing the below steps: 1. Reading certain rows into DF1 from Cassandra table A. 2. Saving DF1 back into Cassandra table A in overwrite mode. The problem is that when I truncate the table at step 2, I lose the data in DF1, as it shows up empty. I have two solutions: 1. Store DF1 in another temp table before truncating table A. 2. Cache DF1 before truncating. Do we have any better solution? Thanks Amit
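If you take the caching route, the cache must actually be materialized with an action before the truncate; otherwise the lazy plan re-reads the now-empty table. Even then, a lost executor can drop cached blocks and trigger recomputation against the truncated table, so the temp-table option is the safer of the two. A sketch; the names and filter are assumptions:

import org.apache.spark.storage.StorageLevel

val keep = spark.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> "ks", "table" -> "table_a")) // assumptions
  .load()
  .filter("status = 'KEEP'")                              // assumption: rows to retain
  .persist(StorageLevel.MEMORY_AND_DISK)

keep.count() // force materialization BEFORE truncating table A

// truncate table A here (e.g. a CQL TRUNCATE through the driver), then:
keep.write
  .format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> "ks", "table" -> "table_a"))
  .mode("append")
  .save()

keep.unpersist()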
No of cores per executor.
I have set 5 cores per executor. Is there any formula to determine the best combination of executors, cores, and memory per executor for better performance? Also, when I run a local spark instance from my web jar I get better speed than running in the cluster. Thanks Amit
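There is no exact formula, but a common rule of thumb is roughly 5 cores per executor, leaving a core and some memory per node for the OS and daemons. A sketch with made-up cluster numbers (10 nodes x 16 cores x 64 GB):

import org.apache.spark.sql.SparkSession

// Hypothetical cluster: 10 nodes x 16 cores x 64 GB each.
// Leave 1 core and ~1 GB per node for OS/daemons:
//   (16 - 1) / 5 = 3 executors per node -> 30 executors total,
//   (64 - 1) / 3 ~= 21 GB per executor, minus ~10% overhead -> ~19g.
val spark = SparkSession.builder()
  .appName("sizing-sketch")
  .config("spark.executor.cores", "5")
  .config("spark.executor.memory", "19g")
  .config("spark.executor.instances", "30")
  .getOrCreate()

Local mode beating the cluster usually points at small data (where shuffle and network overhead dominate) rather than at the core counts themselves.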
Re: spark streaming exception
Please update me if anyone knows about it. Thanks Amit On Thu, Oct 10, 2019 at 3:49 PM Amit Sharma wrote:
> Hi, we have a spark streaming job to which we send requests through our UI using kafka. It processes them and returns the response. We are getting the below error and the streaming job is not processing any requests.
>
> Listener StreamingJobProgressListener threw an exception
> java.util.NoSuchElementException: key not found: 1570689515000 ms
> at scala.collection.MapLike$class.default(MapLike.scala:228)
> at scala.collection.AbstractMap.default(Map.scala:59)
> at scala.collection.mutable.HashMap.apply(HashMap.scala:65)
> at org.apache.spark.streaming.ui.StreamingJobProgressListener.onOutputOperationCompleted(StreamingJobProgressListener.scala:134)
> at org.apache.spark.streaming.scheduler.StreamingListenerBus.doPostEvent(StreamingListenerBus.scala:67)
> at org.apache.spark.streaming.scheduler.StreamingListenerBus.doPostEvent(StreamingListenerBus.scala:29)
>
> Please help me find the root cause of this issue.
spark streaming exception
Hi, we have a spark streaming job to which we send requests through our UI using kafka. It processes them and returns the response. We are getting the below error and the streaming job is not processing any requests.
Listener StreamingJobProgressListener threw an exception
java.util.NoSuchElementException: key not found: 1570689515000 ms
at scala.collection.MapLike$class.default(MapLike.scala:228)
at scala.collection.AbstractMap.default(Map.scala:59)
at scala.collection.mutable.HashMap.apply(HashMap.scala:65)
at org.apache.spark.streaming.ui.StreamingJobProgressListener.onOutputOperationCompleted(StreamingJobProgressListener.scala:134)
at org.apache.spark.streaming.scheduler.StreamingListenerBus.doPostEvent(StreamingListenerBus.scala:67)
at org.apache.spark.streaming.scheduler.StreamingListenerBus.doPostEvent(StreamingListenerBus.scala:29)
Please help me find the root cause of this issue.
Re: Driver vs master
Thanks Andrew, but I am asking specifically about driver memory, not executor memory. We have just one master, and if each job's driver.memory=4g and the master node's total memory is 16 GB, then we cannot execute more than 4 jobs at a time. On Monday, October 7, 2019, Andrew Melo wrote:
> Hi Amit
>
> On Mon, Oct 7, 2019 at 18:33 Amit Sharma wrote:
>
>> Can you please help me understand this. I believe the driver program runs on the master node.
>> If we are running 4 spark jobs and the driver memory config is 4g then a total of 16 GB would be used on the master node.
>
> This depends on what master/deploy mode you're using: if it's "local" master and "client mode" then yes tasks execute in the same JVM as the driver. In this case though, the driver JVM uses however much space is allocated for the driver regardless of how many threads you have.
>
>> So if we will run more jobs then we need more memory on the master. Please correct me if I am wrong.
>
> This depends on your application, but in general more threads will require more memory.
>
>> Thanks
>> Amit
>
> --
> It's dark in this basement.
Driver vs master
Can you please help me understand this? I believe the driver program runs on the master node. If we are running 4 spark jobs and the driver memory config is 4g, then a total of 16 GB would be used on the master node. So if we run more jobs then we need more memory on the master. Please correct me if I am wrong. Thanks Amit
Re: Memory Limits error
Try increasing your driver memory to 12g. On Thursday, August 15, 2019, Dennis Suhari wrote:
> Hi community,
>
> I am using Spark on YARN. When submitting a job, after a long time I get an error message and a retry.
>
> It happens when I want to store the dataframe to a table.
>
> spark_df.write.option("path", "/nlb_datalake/golden_zone/webhose/sentiment").saveAsTable("news_summary_test", mode="overwrite")
>
> The error is (after a long time):
>
> Hive Session ID = be590d1b-ed5b-404b-bcb4-77cbb977a847 [Stage 2:> (0 + 16) / 16]
> 19/08/15 15:42:08 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_9_2 !
> 19/08/15 15:42:08 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_9_1 !
> 19/08/15 15:42:08 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_9_4 !
> 19/08/15 15:42:08 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_9_6 !
> 19/08/15 15:42:08 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_9_7 !
> 19/08/15 15:42:08 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_9_0 !
> 19/08/15 15:42:08 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_9_5 !
> 19/08/15 15:42:08 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_9_3 !
> 19/08/15 15:42:08 WARN YarnSchedulerBackend$YarnSchedulerEndpoint: Requesting driver to remove executor 2 for reason Container killed by YARN for exceeding memory limits. 9.1 GB of 9 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
> 19/08/15 15:42:08 ERROR YarnScheduler: Lost executor 2 on nlb-srv-hd-08.i-lab.local: Container killed by YARN for exceeding memory limits. 9.1 GB of 9 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
> 19/08/15 15:42:08 WARN TaskSetManager: Lost task 0.0 in stage 2.0 (TID 17, nlb-srv-hd-08.i-lab.local, executor 2): ExecutorLostFailure (executor 2 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits. 9.1 GB of 9 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
> 19/08/15 15:42:08 WARN TaskSetManager: Lost task 5.0 in stage 2.0 (TID 26, nlb-srv-hd-08.i-lab.local, executor 2): ExecutorLostFailure (executor 2 exite
>
> Do you have a rough idea where to tweak?
>
> Br,
>
> Dennis
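The YARN log quoted above already names the setting to raise; a sketch of doing so (the value is an assumption, in MB):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("memory-overhead")
  // Off-heap headroom that YARN counts against the container.
  // On Spark 2.3+ the preferred name is spark.executor.memoryOverhead.
  .config("spark.yarn.executor.memoryOverhead", "2048")
  .getOrCreate()

Note the container being killed here is an executor, so raising executor memoryOverhead (rather than driver memory) addresses the quoted message directly.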
Spark Streaming concurrent calls
I am using kafka spark streaming. My UI application sends requests to the streaming job through kafka. The problem is that the streaming job handles one request at a time, so if multiple users send requests at the same time they have to wait until the earlier requests are done. Is there any way it can handle multiple requests? Thanks Amit
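One knob sometimes mentioned for this is spark.streaming.concurrentJobs, an undocumented setting that lets the streaming scheduler run more than one output job at a time. This is a use-with-care sketch, not an official recommendation, since it can break per-batch ordering guarantees:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("concurrent-streaming")
  // Undocumented: number of streaming jobs run concurrently (default 1).
  // Test carefully; batches may no longer complete in order.
  .set("spark.streaming.concurrentJobs", "4")
val ssc = new StreamingContext(conf, Seconds(10))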
spark job getting hung
I am running a spark job; sometimes it runs successfully, but most of the time I get: ERROR Dropping event from queue appStatus. This likely means one of the listeners is too slow and cannot keep up with the rate at which tasks are being started by the scheduler. (org.apache.spark.scheduler.AsyncEventQueue). Please suggest how to debug this issue. Thanks Amit
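The appStatus queue is the listener bus that feeds the UI/status store; when it overflows, events are dropped. A first mitigation to try (a sketch; the capacity value is an assumption, the default is 10000):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("listener-queue")
  // Larger queue trades driver memory for headroom when tasks are
  // launched faster than listeners can consume events.
  .config("spark.scheduler.listenerbus.eventqueue.capacity", "30000")
  .getOrCreate()

If the drops persist, look for a slow custom SparkListener or very large numbers of short tasks.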
Core allocation is scattered
I have a cluster with 26 nodes, each having 16 cores. I am running a spark job with 20 cores, but I do not understand why my application gets 1-2 cores on each of several machines rather than just running on two nodes, like node1=16 cores and node2=4 cores. Instead the cores are allocated like node1=2, node2=1 ... node14=1. Is there any conf property I need to change? I know with dynamic allocation we can use the below, but without dynamic allocation is there anything? --conf "spark.dynamicAllocation.maxExecutors=2" Thanks Amit
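If this is the standalone master, the scattering is its default placement policy: spark.deploy.spreadOut defaults to true, which spreads an application's cores across as many workers as possible. Setting it to false consolidates cores onto as few workers as possible. A sketch (note this is a master-side, cluster-wide setting, not a spark-submit flag):

# In conf/spark-defaults.conf on the standalone master:
spark.deploy.spreadOut   false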
Re: spark dataset.cache is not thread safe
please update me if anyone knows how to handle it. On Sun, Jul 21, 2019 at 7:18 PM Amit Sharma wrote:
> Hi, I wrote code in a future block which reads data from a dataset and caches it for use later in the code. I hit an issue where the data.cache() data gets replaced by a concurrently running thread. Is there any way we can avoid this condition?
>
> val dailyData = callDetailsDS.collect.toList
> val adjustedData = dailyData.map(callDataPerDay => Future {
>   val data = callDetailsDS.filter(callDetailsDS(DateColumn) geq someConditionalDate)
>   data.cache()
> })
spark dataset.cache is not thread safe
Hi, I wrote code in a future block which reads data from a dataset and caches it for use later in the code. I hit an issue where the data.cache() data gets replaced by a concurrently running thread. Is there any way we can avoid this condition?

val dailyData = callDetailsDS.collect.toList
val adjustedData = dailyData.map(callDataPerDay => Future {
  val data = callDetailsDS.filter(callDetailsDS(DateColumn) geq someConditionalDate)
  data.cache()
})
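One pattern that avoids the shared-cache contention (a sketch, with a hypothetical column name): give each future its own eagerly materialized slice via localCheckpoint (available on Spark 2.3+) instead of calling cache() on slices of the same shared dataset:

import scala.concurrent.{ExecutionContext, Future}
import org.apache.spark.sql.DataFrame

implicit val ec: ExecutionContext = ExecutionContext.global

def perDaySlice(callDetailsDS: DataFrame, day: java.sql.Date): Future[Long] = Future {
  // localCheckpoint materializes this slice on its own, so concurrent
  // futures do not overwrite one another's cached blocks.
  val data = callDetailsDS
    .filter(callDetailsDS("call_date") geq day) // hypothetical column name
    .localCheckpoint()
  data.count()
}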
Re: spark standalone mode problem about executor add and removed again and again!
Do you have dynamic resource allocation enabled? On Wednesday, July 17, 2019, zenglong chen wrote:
> Hi all,
> My standalone mode has two slaves. When I submit my job, the localhost slave works well, but the second slave adds and removes executors again and again! The logs are below:
> 2019-07-17 10:51:38,889 INFO client.StandaloneAppClient$ClientEndpoint: Executor updated: app-20190717105135-0008/2 is now EXITED (Command exited with code 1)
> 2019-07-17 10:51:38,890 INFO cluster.StandaloneSchedulerBackend: Executor app-20190717105135-0008/2 removed: Command exited with code 1
> 2019-07-17 10:51:38,890 INFO storage.BlockManagerMasterEndpoint: Trying to remove executor 2 from BlockManagerMaster.
> 2019-07-17 10:51:38,890 INFO storage.BlockManagerMaster: Removal of executor 2 requested
> 2019-07-17 10:51:38,891 INFO cluster.CoarseGrainedSchedulerBackend$DriverEndpoint: Asked to remove non-existent executor 2
> 2019-07-17 10:51:38,892 INFO client.StandaloneAppClient$ClientEndpoint: Executor added: app-20190717105135-0008/3 on worker-20190717093045-172.22.9.179-40573 (172.22.9.179:40573) with 8 core(s)
> 2019-07-17 10:51:38,892 INFO cluster.StandaloneSchedulerBackend: Granted executor ID app-20190717105135-0008/3 on hostPort 172.22.9.179:40573 with 8 core(s), 12.0 GB RAM
> 2019-07-17 10:51:38,893 INFO client.StandaloneAppClient$ClientEndpoint: Executor updated: app-20190717105135-0008/3 is now RUNNING
> 2019-07-17 10:51:40,521 INFO client.StandaloneAppClient$ClientEndpoint: Executor updated: app-20190717105135-0008/3 is now EXITED (Command exited with code 1)
> 2019-07-17 10:51:40,521 INFO cluster.StandaloneSchedulerBackend: Executor app-20190717105135-0008/3 removed: Command exited with code 1
> 2019-07-17 10:51:40,521 INFO client.StandaloneAppClient$ClientEndpoint: Executor added: app-20190717105135-0008/4 on worker-20190717093045-172.22.9.179-40573 (172.22.9.179:40573) with 8 core(s)
> 2019-07-17 10:51:40,521 INFO storage.BlockManagerMaster: Removal of executor 3 requested
> 2019-07-17 10:51:40,521 INFO cluster.CoarseGrainedSchedulerBackend$DriverEndpoint: Asked to remove non-existent executor 3
> 2019-07-17 10:51:40,521 INFO storage.BlockManagerMasterEndpoint: Trying to remove executor 3 from BlockManagerMaster.
> 2019-07-17 10:51:40,521 INFO cluster.StandaloneSchedulerBackend: Granted executor ID app-20190717105135-0008/4 on hostPort 172.22.9.179:40573 with 8 core(s), 12.0 GB RAM
> 2019-07-17 10:51:40,523 INFO client.StandaloneAppClient$ClientEndpoint: Executor updated: app-20190717105135-0008/4 is now RUNNING
>
> And the slave output is below:
> 19/07/17 10:47:12 INFO ExecutorRunner: Launch command: "/home/ubuntu/data/jdk/jre/bin/java" "-cp" "/home/ubuntu/spark-2.4.3/conf/:/home/ubuntu/spark-2.4.3/jars/*" "-Xmx12288M" "-Dspark.driver.port=40335" "org.apache.spark.executor.CoarseGrainedExecutorBackend" "--driver-url" "spark://CoarseGrainedScheduler@iZk1a7vdbutmi6eluaskecZ:40335" "--executor-id" "18" "--hostname" "172.22.9.179" "--cores" "8" "--app-id" "app-20190717104645-0007" "--worker-url" "spark://Worker@172.22.9.179:40573"
> 19/07/17 10:47:13 INFO Worker: Executor app-20190717104645-0007/18 finished with state EXITED message Command exited with code 1 exitStatus 1
> 19/07/17 10:47:13 INFO ExternalShuffleBlockResolver: Clean up non-shuffle files associated with the finished executor 18
> 19/07/17 10:47:13 INFO ExternalShuffleBlockResolver: Executor is not registered (appId=app-20190717104645-0007, execId=18)
> 19/07/17 10:47:13 INFO Worker: Asked to launch executor app-20190717104645-0007/19 for ph_user_pre_level
> 19/07/17 10:47:13 INFO SecurityManager: Changing view acls to: ubuntu
> 19/07/17 10:47:13 INFO SecurityManager: Changing modify acls to: ubuntu
> 19/07/17 10:47:13 INFO SecurityManager: Changing view acls groups to:
> 19/07/17 10:47:13 INFO SecurityManager: Changing modify acls groups to:
> 19/07/17 10:47:13 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(ubuntu); groups with view permissions: Set(); users with modify permissions: Set(ubuntu); groups with modify permissions: Set()
> 19/07/17 10:47:14 INFO ExecutorRunner: Launch command: "/home/ubuntu/data/jdk/jre/bin/java" "-cp" "/home/ubuntu/spark-2.4.3/conf/:/home/ubuntu/spark-2.4.3/jars/*" "-Xmx12288M" "-Dspark.driver.port=40335" "org.apache.spark.executor.CoarseGrainedExecutorBackend" "--driver-url" "spark://CoarseGrainedScheduler@iZk1a7vdbutmi6eluaskecZ:40335" "--executor-id" "19" "--hostname" "172.22.9.179" "--cores" "8" "--app-id" "app-20190717104645-0007" "--worker-url" "spark://Worker@172.22.9.179:40573"
>
> I guess that may be a "-Dspark.driver.port=40335" problem.
> Any suggestions will help me a lot!
Dynamic allocation not working
Hi All, I have set the dynamic allocation property = true in my script file, and also the shuffle property in the script as well as in the spark-env file on all worker nodes. I am using spark kafka streaming. I checked that as a request comes in, the number of allocated cores increases, but even after the request is completed the cores are not released. Please help me find what property I missed other than those 2 properties.
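For reference, the settings usually involved are sketched below. Note that executors holding cached blocks are never released by default (spark.dynamicAllocation.cachedExecutorIdleTimeout defaults to infinity), which matches the symptom of cores not being returned. The timeout values here are assumptions, not recommendations:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.shuffle.service.enabled", "true")
  // Release idle executors after 60s (this is also the default).
  .set("spark.dynamicAllocation.executorIdleTimeout", "60s")
  // By default executors with cached blocks are kept forever;
  // give them a finite idle timeout too.
  .set("spark.dynamicAllocation.cachedExecutorIdleTimeout", "120s")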
Spark-cluster slowness
I have a spark cluster in each of two data centers. Spark cluster B is 6 times slower than cluster A: I ran the same job on both clusters and the time difference is 6x. I used the same config and spark 2.3.3. On the spark UI the slave nodes are displayed, and under the Executors tab I see all the nodes, but I do not see active tasks while the task status is active. Please help me find the root cause. Thanks Amit
Spark Kafka Streaming stopped
We are using spark kafka streaming. We have 6 nodes in the kafka cluster; if any of the nodes goes down, we get the below exception and streaming stops.
ERROR DirectKafkaInputDStream:70 - ArrayBuffer(kafka.common.NotLeaderForPartitionException, kafka.common.NotLeaderForPartitionException, org.apache.spark.SparkException: Couldn't find leader offsets for Set([techops-prod2,4], [techops-prod2,0]))
Please let me know if we missed any setting so that streaming does not stop even if a couple of Kafka nodes are down. Thanks Amit
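For the direct stream, the driver retries leader-offset lookups only once by default, so a leader election in progress can fail the batch. Two knobs that may help (values are assumptions; verify them against your Spark and Kafka versions):

import org.apache.spark.SparkConf

// Driver-side retries when finding leader offsets (default 1).
val conf = new SparkConf()
  .set("spark.streaming.kafka.maxRetries", "5")

// Kafka 0.8 consumer property: back off longer before refreshing
// leader metadata after a NotLeaderForPartition error.
val kafkaParams = Map(
  "metadata.broker.list" -> "broker1:9092,broker2:9092", // hypothetical
  "refresh.leader.backoff.ms" -> "2000"
)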
Re: Spark kafka streaming job stopped
Please provide an update if anyone knows. On Monday, June 10, 2019, Amit Sharma wrote:
> We have a spark kafka streaming job running on a standalone spark cluster. We have the below kafka architecture:
>
> 1. Two clusters running in two data centers.
> 2. There is an LTM (load balancer) on top of each data center.
> 3. There is a GSLB on top of the LTMs.
>
> I observed that whenever any node in the kafka cluster goes down, our spark streaming job stops. We are using the GSLB url in our code to connect to Kafka, not the IP address. Please let me know if this is expected behavior, and if not, what config we need to change.
>
> Thanks
> Amit
Fwd: Spark kafka streaming job stopped
We have a spark kafka streaming job running on a standalone spark cluster. We have the below kafka architecture:
1. Two clusters running in two data centers.
2. There is an LTM (load balancer) on top of each data center.
3. There is a GSLB on top of the LTMs.
I observed that whenever any node in the kafka cluster goes down, our spark streaming job stops. We are using the GSLB url in our code to connect to Kafka, not the IP address. Please let me know if this is expected behavior, and if not, what config we need to change. Thanks Amit