Re: What is the best way to organize a join within a foreach?

2023-04-27 Thread Amit Joshi
Hi Marco,

I am not sure you will get access to the data frame inside the foreach, as
the Spark context is not serializable, if I remember correctly.

One thing you can do:
use the cogroup operation on both datasets.
This will give you (key, iter(v1), iter(v2)).
Then use foreachPartition to perform your task of converting to
JSON and more.

Performance-wise, you can batch the records per user and also share
the same connection in each partition if needed.
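
A rough sketch of that idea in Scala (assuming two DataFrames usersDf and
ordersDf keyed on user_id; the names and sinks are illustrative, not your
actual code):

val usersByKey  = usersDf.rdd.keyBy(_.getAs[String]("user_id"))
val ordersByKey = ordersDf.rdd.keyBy(_.getAs[String]("user_id"))

usersByKey.cogroup(ordersByKey).foreachPartition { partition =>
  // open one connection (S3 client / Kafka producer) per partition here
  partition.foreach { case (userId, (userRows, orderRows)) =>
    // all orders of one user arrive together; build the JSON payload here
    val payload = s"""{"user":"$userId","orderCount":${orderRows.size}}"""
    // write the payload to S3 / send it to Kafka using the shared connection
  }
  // close the connection here
}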

Hope this will help.

Regards
Amit


On Wed, 26 Apr, 2023, 15:58 Marco Costantini, <
marco.costant...@rocketfncl.com> wrote:

> Thanks team,
> Email was just an example. The point was to illustrate that some actions
> could be chained using Spark's foreach. In reality, this is an S3 write and
> a Kafka message production, which I think is quite reasonable for spark to
> do.
>
> To answer Ayan's first question: yes, all of a user's orders, prepared for
> each and every user.
>
> Other than the remarks that email transmission is unwise (which, as I've
> now pointed out, is irrelevant), I am not seeing an alternative to using Spark's
> foreach. Unless, your proposal is for the Spark job to target 1 user, and
> just run the job 1000's of times taking the user_id as input. That doesn't
> sound attractive.
>
> Also, while we say that foreach is not optimal, I cannot find any evidence
> of it; neither here nor online. If there are any docs about the inner
> workings of this functionality, please pass them to me. I continue to
> search for them. Even late last night!
>
> Thanks for your help team,
> Marco.
>
> On Wed, Apr 26, 2023 at 6:21 AM Mich Talebzadeh 
> wrote:
>
>> Indeed very valid points by Ayan. How is email going to handle 1000s of
>> records? As a solution architect I tend to replace users by customers, and
>> for each order there must be products, a sort of many-to-many relationship. If
>> I were a customer I would also be interested in product details as
>> well. Sending via email sounds like a Jurassic Park solution 😗
>>
>> On Wed, 26 Apr 2023 at 10:24, ayan guha  wrote:
>>
>>> Adding to what Mitch said,
>>>
>>> 1. Are you trying to send statements of all orders to all users? Or the
>>> latest order only?
>>>
>>> 2. Sending email is not a good use of Spark. Instead, I suggest using a
>>> notification service or function. Spark should write to a queue (kafka,
>>> sqs...pick your choice here).
>>>
>>> Best regards
>>> Ayan
>>>
>>> On Wed, 26 Apr 2023 at 7:01 pm, Mich Talebzadeh <
>>> mich.talebza...@gmail.com> wrote:
>>>
>>>> Well, OK, in a nutshell you want the result set for every user prepared
>>>> and emailed to that user, right?
>>>>
>>>> This is a form of ETL where those result sets need to be posted
>>>> somewhere. Say you create a table based on the result set prepared for each
>>>> user. You may have many raw target tables at the end of the first ETL. How
>>>> does this differ from using forEach? Performance wise forEach may not be
>>>> optimal.
>>>>
>>>> Can you take the sample tables and try your method?
>>>>
>>>> HTH
>>>>
>>>> Mich Talebzadeh,
>>>> Lead Solutions Architect/Engineering Lead
>>>> Palantir Technologies Limited
>>>> London
>>>> United Kingdom
>>>>
>>>>
>>>>view my Linkedin profile
>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>
>>>>
>>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>>
>>>>
>>>>
>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>>> any loss, damage or destruction of data or any other property which may
>>>> arise from relying on this email's technical content is explicitly
>>>> disclaimed. The author will in no case be liable for any monetary damages
>>>> arising from such loss, damage or destruction.
>>>>
>>>>
>>>>
>>>>
>>>> On Wed, 26 Apr 2023 at 04:10, Marco Costantini <
>>>> marco.costant...@rocketfncl.com> wrote:
>>>>
>>>>> Hi Mich,
>>>>> First, thank you for that. Great effort put into helping.
>>>>>
>>>>> Second, I don't think this tackles the technical challenge here. I
>>>>> understand the windowing as it serves those ranks you created, but I don't
>>>>> see

[Spark SQL]: Spark 3.2 generates different results to query when columns name have mixed casing vs when they have same casing

2023-02-08 Thread Amit Singh Rathore
Hi Team,

I am running a query in Spark 3.2.

val df1 = sc.parallelize(List((1,2,3,4,5),(1,2,3,4,5)))
  .toDF("id", "col2", "col3", "col4", "col5")
val op_cols_same_case = List("id", "col2", "col3", "col4", "col5", "id")
val df2 = df1.select(op_cols_same_case.head, op_cols_same_case.tail: _*)
df2.select("id").show()

This query runs fine. But when I change the casing of the op_cols to have a
mix of upper & lower case ("id" & "ID"), it throws an ambiguous column
reference error.

val df1 = sc.parallelize(List((1,2,3,4,5),(1,2,3,4,5)))
  .toDF("id", "col2", "col3", "col4", "col5")
val op_cols_same_case = List("id", "col2", "col3", "col4", "col5", "ID")
val df2 = df1.select(op_cols_same_case.head, op_cols_same_case.tail: _*)
df2.select("id").show()

My question is: why is the behavior different when I have duplicate columns
with the same name ("id", "id") vs the same name in different cases ("id",
"ID")? Either both should fail or neither should fail, considering
spark.sql.caseSensitive is false by default in 3.2.

Note: I checked, and this issue is there in Spark 2.4 as well. It works for
both cases (mixed & single casing) in Spark 2.3.
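
(If you just need a workaround rather than an explanation, here is a small
sketch that drops duplicate names case-insensitively before the select; this
assumes the default spark.sql.caseSensitive=false and reuses df1 from above:)

val opCols = List("id", "col2", "col3", "col4", "col5", "ID")
val dedupedCols = opCols.foldLeft(Vector.empty[String]) { (acc, c) =>
  if (acc.exists(_.equalsIgnoreCase(c))) acc else acc :+ c  // keep the first occurrence of each name
}
val df2 = df1.select(dedupedCols.head, dedupedCols.tail: _*)
df2.select("id").show()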


Thanks
Spark user


Re: Splittable or not?

2022-09-14 Thread Amit Joshi
Hi Sid,

Snappy itself is not splittable. But the format that contains the actual
data, like Parquet (which is basically divided into row groups), can be
compressed using Snappy.
This works because the blocks (pages of the Parquet format) inside the Parquet
file can be compressed independently with Snappy.
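
A quick way to see this in practice (the path and sizes are just for
illustration):

val df = spark.range(0, 10000000).toDF("id")
df.repartition(8).write.mode("overwrite").parquet("/tmp/snappy_parquet_demo")  // snappy is the default parquet codec

val readBack = spark.read.parquet("/tmp/snappy_parquet_demo")
// for data larger than one split this is > 1: the part-*.snappy.parquet files
// (and their row groups) are still read in parallel
println(readBack.rdd.getNumPartitions)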

Thanks
Amit

On Wed, Sep 14, 2022 at 8:14 PM Sid  wrote:

> Hello experts,
>
> I know that Gzip and Snappy files are not splittable, i.e. data won't be
> distributed into multiple blocks; rather it would try to load the data in a
> single partition/block.
>
> So, my question is when I write the parquet data via spark it gets stored
> at the destination with something like *part*.snappy.parquet*
>
> So, when I read this data will it affect my performance?
>
> Please help me if there is any understanding gap.
>
> Thanks,
> Sid
>


Re: Spark streaming

2022-08-19 Thread Ajit Kumar Amit
https://github.com/allwefantasy/spark-binlog

Sent from my iPhone

> On 19 Aug 2022, at 5:45 PM, sandra sukumaran  
> wrote:
> 
> 
> Dear Sir,
> 
> 
> 
>  Is there any possible method to fetch MySQL database bin log, with the 
> help of spark streaming.
> Kafka streaming is not applicable in this case.
> 
> 
> 
> Thanks and regards
> Sandra


Re: Salting technique doubt

2022-07-31 Thread Amit Joshi
Hi Sid,

I am not sure I understood your question.
But the keys cannot be different post salting in both tables; this is
what I have shown in the explanation.
You salt Table A and then explode Table B to create all possible values.

In your case, I do not understand why Table B has x_8/x_9. It should have all
possible values of the salt you used.

I hope you understand.

Thanks



On Sun, Jul 31, 2022 at 10:02 AM Sid  wrote:

> Hi Amit,
>
> Thanks for your reply. However, your answer doesn't seem different from
> what I have explained.
>
> My question is after salting if the keys are different like in my example
> then post join there would be no results assuming the join type as inner
> join because even though the keys are segregated in different partitions
> based on unique keys they are not matching because x_1/x_2 !=x_8/x_9
>
> How do you ensure that the results are matched?
>
> Best,
> Sid
>
> On Sun, Jul 31, 2022 at 1:34 AM Amit Joshi 
> wrote:
>
>> Hi Sid,
>>
>> Salting is normally a technique to add random characters to existing
>> values.
>> In big data we can use salting to deal with the skewness.
>> Salting in a join can be used as:
>> * Table A-*
>> Col1, join_col , where join_col values are {x1, x2, x3}
>> x1
>> x1
>> x1
>> x2
>> x2
>> x3
>>
>> *Table B-*
>> join_col, Col3 , where join_col  value are {x1, x2}
>> x1
>> x2
>>
>> *Problem: *Let's say for table A, data is skewed on x1
>> Now salting goes like this.  *Salt value =2*
>> For
>> *table A, *create a new col with values by salting join col
>> *New_Join_Col*
>> x1_1
>> x1_2
>> x1_1
>> x2_1
>> x2_2
>> x3_1
>>
>> For *Table B,*
>> Change the join_col to all possible values of the salt.
>> join_col
>> x1_1
>> x1_2
>> x2_1
>> x2_2
>>
>> And then join it like
>> table1.join(table2, where tableA.new_join_col == tableB. join_col)
>>
>> Let me know if you have any questions.
>>
>> Regards
>> Amit Joshi
>>
>>
>> On Sat, Jul 30, 2022 at 7:16 PM Sid  wrote:
>>
>>> Hi Team,
>>>
>>> I was trying to understand the Salting technique for the column where
>>> there would be a huge load on a single partition because of the same keys.
>>>
>>> I referred to one youtube video with the below understanding:
>>>
>>> So, using the salting technique we can actually change the joining
>>> column values by appending some random number in a specified range.
>>>
>>> So, suppose I have these two values in a partition of two different
>>> tables:
>>>
>>> Table A:
>>> Partition1:
>>> x
>>> .
>>> .
>>> .
>>> x
>>>
>>> Table B:
>>> Partition1:
>>> x
>>> .
>>> .
>>> .
>>> x
>>>
>>> After Salting it would be something like the below:
>>>
>>> Table A:
>>> Partition1:
>>> x_1
>>>
>>> Partition 2:
>>> x_2
>>>
>>> Table B:
>>> Partition1:
>>> x_3
>>>
>>> Partition 2:
>>> x_8
>>>
>>> Now, when I inner join these two tables after salting in order to avoid
>>> data skewness problems, I won't get a match since the keys are different
>>> after applying salting techniques.
>>>
>>> So how does this resolve the data skewness issue, or is there some
>>> understanding gap?
>>>
>>> Could anyone help me in layman's terms?
>>>
>>> TIA,
>>> Sid
>>>
>>


Re: Salting technique doubt

2022-07-30 Thread Amit Joshi
Hi Sid,

Salting is normally a technique to add random characters to existing values.
In big data we can use salting to deal with the skewness.
Salting in a join can be used as:
* Table A-*
Col1, join_col , where join_col values are {x1, x2, x3}
x1
x1
x1
x2
x2
x3

*Table B-*
join_col, Col3 , where join_col  value are {x1, x2}
x1
x2

*Problem: *Let's say for table A, data is skewed on x1
Now salting goes like this.  *Salt value =2*
For
*table A, *create a new col with values by salting join col
*New_Join_Col*
x1_1
x1_2
x1_1
x2_1
x2_2
x3_1

For *Table B,*
Change the join_col to all possible values of the salt.
join_col
x1_1
x1_2
x2_1
x2_2

And then join it like
table1.join(table2, where tableA.new_join_col == tableB. join_col)
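
A rough sketch of the same thing in code (assuming DataFrames tableA and tableB
with a string column "join_col"; the salt factor of 2 matches the example):

import org.apache.spark.sql.functions._

val saltFactor = 2

// Table A: append a random suffix 0..saltFactor-1 to the (skewed) join key
val saltedA = tableA.withColumn("new_join_col",
  concat(col("join_col"), lit("_"), (rand() * saltFactor).cast("int").cast("string")))

// Table B: explode each key into every possible salted value so each salt finds a match
val saltedB = tableB.withColumn("salted_join_col",
  explode(array((0 until saltFactor).map(i => concat(col("join_col"), lit(s"_$i"))): _*)))

val joined = saltedA.join(saltedB, saltedA("new_join_col") === saltedB("salted_join_col"))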

Let me know if you have any questions.

Regards
Amit Joshi


On Sat, Jul 30, 2022 at 7:16 PM Sid  wrote:

> Hi Team,
>
> I was trying to understand the Salting technique for the column where
> there would be a huge load on a single partition because of the same keys.
>
> I referred to one youtube video with the below understanding:
>
> So, using the salting technique we can actually change the joining column
> values by appending some random number in a specified range.
>
> So, suppose I have these two values in a partition of two different tables:
>
> Table A:
> Partition1:
> x
> .
> .
> .
> x
>
> Table B:
> Partition1:
> x
> .
> .
> .
> x
>
> After Salting it would be something like the below:
>
> Table A:
> Partition1:
> x_1
>
> Partition 2:
> x_2
>
> Table B:
> Partition1:
> x_3
>
> Partition 2:
> x_8
>
> Now, when I inner join these two tables after salting in order to avoid
> data skewness problems, I won't get a match since the keys are different
> after applying salting techniques.
>
> So how does this resolve the data skewness issue, or is there some
> understanding gap?
>
> Could anyone help me in layman's terms?
>
> TIA,
> Sid
>


Re: Spark 3.1 Json4s-native jar compatibility

2022-02-04 Thread Amit Sharma
Thanks Sean/Martin, my bad: the Spark version was 3.0.1, so using json4s
3.6.6 fixed the issue.
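
For anyone hitting the same NoSuchMethodError, a minimal build.sbt sketch of the
fix (versions taken from this thread; treat it as a sketch and check the json4s
version bundled with your exact Spark release):

val sparkVersion = "3.0.1"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql" % sparkVersion % Provided,  // Spark itself comes from the cluster
  // pin json4s to the version shipped with this Spark release instead of a newer milestone
  "org.json4s" %% "json4s-native" % "3.6.6"
)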


Thanks
Amit

On Fri, Feb 4, 2022 at 3:37 PM Sean Owen  wrote:

> My guess is that something else you depend on is actually bringing in a
> different json4s, or you're otherwise mixing library/Spark versions. Use
> mvn dependency:tree or equivalent on your build to see what you actually
> build in. You probably do not need to include json4s at all as it is in
> Spark anway
>
> On Fri, Feb 4, 2022 at 2:35 PM Amit Sharma  wrote:
>
>> Martin Sean, changed it to  3.7.0-MS still getting the below error.
>> I am still getting the same issue
>> Exception in thread "streaming-job-executor-0"
>> java.lang.NoSuchMethodError:
>> org.json4s.ShortTypeHints$.apply$default$2()Ljava/lang/String;
>>
>>
>> Thanks
>> Amit
>>
>> On Fri, Feb 4, 2022 at 9:03 AM Martin Grigorov 
>> wrote:
>>
>>> Hi,
>>>
>>> Amit said that he uses Spark 3.1, so the link should be
>>> https://github.com/apache/spark/blob/branch-3.1/pom.xml#L879 (3.7.0-M5)
>>>
>>> @Amit: check your classpath. Maybe there are more jars of this
>>> dependency.
>>>
>>> On Thu, Feb 3, 2022 at 10:53 PM Sean Owen  wrote:
>>>
>>>> You can look it up:
>>>> https://github.com/apache/spark/blob/branch-3.2/pom.xml#L916
>>>> 3.7.0-M11
>>>>
>>>> On Thu, Feb 3, 2022 at 1:57 PM Amit Sharma 
>>>> wrote:
>>>>
>>>>> Hello, everyone. I am migrating my spark stream to spark version 3.1.
>>>>> I also upgraded  json version  as below
>>>>>
>>>>> libraryDependencies += "org.json4s" %% "json4s-native" % "3.7.0-M5"
>>>>>
>>>>>
>>>>> While running the job I getting an error for the below code where I am
>>>>> serializing the given inputs.
>>>>>
>>>>> implicit val formats = 
>>>>> Serialization.formats(ShortTypeHints(List(classOf[ForecastResponse], 
>>>>> classOf[OverlayRequest],
>>>>>   classOf[FTEResponseFromSpark], classOf[QuotaResponse], 
>>>>> classOf[CloneResponse]
>>>>>
>>>>> )))
>>>>>
>>>>>
>>>>> Exception in thread "streaming-job-executor-4" 
>>>>> java.lang.NoSuchMethodError: 
>>>>> org.json4s.ShortTypeHints$.apply$default$2()Ljava/lang/String;
>>>>>
>>>>> It seems to me jar issue, not sure which version of json4s-native should 
>>>>> I use with spark 3.1.
>>>>>
>>>>>


Re: Spark 3.1 Json4s-native jar compatibility

2022-02-04 Thread Amit Sharma
Martin, Sean: I changed it to 3.7.0-M5 but I am still getting the same
issue, the below error:
Exception in thread "streaming-job-executor-0" java.lang.NoSuchMethodError:
org.json4s.ShortTypeHints$.apply$default$2()Ljava/lang/String;


Thanks
Amit

On Fri, Feb 4, 2022 at 9:03 AM Martin Grigorov  wrote:

> Hi,
>
> Amit said that he uses Spark 3.1, so the link should be
> https://github.com/apache/spark/blob/branch-3.1/pom.xml#L879 (3.7.0-M5)
>
> @Amit: check your classpath. Maybe there are more jars of this dependency.
>
> On Thu, Feb 3, 2022 at 10:53 PM Sean Owen  wrote:
>
>> You can look it up:
>> https://github.com/apache/spark/blob/branch-3.2/pom.xml#L916
>> 3.7.0-M11
>>
>> On Thu, Feb 3, 2022 at 1:57 PM Amit Sharma  wrote:
>>
>>> Hello, everyone. I am migrating my spark stream to spark version 3.1. I
>>> also upgraded  json version  as below
>>>
>>> libraryDependencies += "org.json4s" %% "json4s-native" % "3.7.0-M5"
>>>
>>>
>>> While running the job I getting an error for the below code where I am
>>> serializing the given inputs.
>>>
>>> implicit val formats = 
>>> Serialization.formats(ShortTypeHints(List(classOf[ForecastResponse], 
>>> classOf[OverlayRequest],
>>>   classOf[FTEResponseFromSpark], classOf[QuotaResponse], 
>>> classOf[CloneResponse]
>>>
>>> )))
>>>
>>>
>>> Exception in thread "streaming-job-executor-4" java.lang.NoSuchMethodError: 
>>> org.json4s.ShortTypeHints$.apply$default$2()Ljava/lang/String;
>>>
>>> It seems to me jar issue, not sure which version of json4s-native should I 
>>> use with spark 3.1.
>>>
>>>


Spark 3.1 Json4s-native jar compatibility

2022-02-03 Thread Amit Sharma
Hello, everyone. I am migrating my Spark streaming job to Spark version 3.1. I
also upgraded the json4s version as below:

libraryDependencies += "org.json4s" %% "json4s-native" % "3.7.0-M5"


While running the job I am getting an error for the below code, where I am
serializing the given inputs.

implicit val formats = Serialization.formats(ShortTypeHints(List(
  classOf[ForecastResponse], classOf[OverlayRequest],
  classOf[FTEResponseFromSpark], classOf[QuotaResponse], classOf[CloneResponse]
)))


Exception in thread "streaming-job-executor-4"
java.lang.NoSuchMethodError:
org.json4s.ShortTypeHints$.apply$default$2()Ljava/lang/String;

It seems to me to be a jar issue; I am not sure which version of json4s-native
I should use with Spark 3.1.


Re: Kafka to spark streaming

2022-01-29 Thread Amit Sharma
Thanks Mich. The link you shared has only two options, Kafka and Socket.


Thanks
Amit

On Sat, Jan 29, 2022 at 3:49 AM Mich Talebzadeh 
wrote:

> So you have a classic architecture with Spark receiving events through a
> Kafka topic via the kafka-spark connector, doing something with them and sending
> data out to the consumer. Are you using Spark Structured Streaming here with
> batch streaming? Check
>
>
> https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#structured-streaming-programming-guide
>
> HTH
>
>view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Fri, 28 Jan 2022 at 22:14, Amit Sharma  wrote:
>
>> Hello everyone, we have spark streaming application. We send request to
>> stream through Akka actor using Kafka topic. We wait for response as it is
>> real time. Just want a suggestion is there any better option like Livy
>> where we can send and receive request to spark streaming.
>>
>>
>> Thanks
>> Amit
>>
>


Kafka to spark streaming

2022-01-28 Thread Amit Sharma
Hello everyone, we have a Spark streaming application. We send requests to
the stream through an Akka actor using a Kafka topic. We wait for the response
as it is real time. Just want a suggestion: is there any better option, like
Livy, where we can send requests to and receive responses from Spark streaming?


Thanks
Amit


Fwd: Cassandra driver upgrade

2022-01-24 Thread Amit Sharma
I am upgrading my Cassandra Java driver version to the latest, 4.13. I have
a Cassandra cluster using Cassandra version 3.11.11.
I am getting the below runtime error while connecting to Cassandra.
Before version 4.13 I was using version 3.9 and things were working fine.

 c.d.o.d.i.c.c.ControlConnection - [s0] Error connecting to Node(endPoint=/
127.0.0.1:9042, hostId=null, hashCode=5495a763), trying next node
(ConnectionInitException: [s0|control|connecting...] Protocol
initialization request, step 1 (OPTIONS): failed to send request
(io.netty.channel.StacklessClosedChannelException))


Please suggest; it has blocked my production release.


Thanks
Amit


Re: Spark 3.2.0 upgrade

2022-01-22 Thread Amit Sharma
Alex, please find the build.sbt below. I am using the assembly command to
create a fat jar. These things were working fine until I changed the Scala
version, the Spark Cassandra connector, and other dependent jars.
I tried to run my job in my local IntelliJ and I am getting the issue there as well.

Thanks
Amit


name := """cft-common"""

ThisBuild / version := "0.0.1-SNAPSHOT"

ThisBuild/scalaVersion := "2.12.15"
Test / fork := true
Test / envVars := Map("env" -> "qa")

val jacksonVersion = "2.13.1"
val AkkaVersion = "2.6.17"

val sparkVersion = "3.1.0"


libraryDependencies += "com.typesafe" % "config" % "1.3.3"
libraryDependencies += "com.typesafe.play" %% "play-json" % "2.9.2"
//2021-10-29
Chuck replaced cake solutions with apache recent version
libraryDependencies += "com.typesafe.play" %% "play-json-joda" %
"2.9.2" ///2021-12-06
Chuck Updated play json to latest to match
libraryDependencies += "org.json4s" %% "json4s-native" % "3.6.12"
libraryDependencies += "com.datastax.cassandra" % "java-driver-core" %
"3.9.0"
libraryDependencies += "io.getquill" %% "quill-cassandra" % "3.4.9"
libraryDependencies += "org.scalaj" %% "scalaj-http" % "2.4.2"
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "2.8.1"
//2021-10-29
Chuck replaced cake solutions with apache recent version
libraryDependencies += "com.typesafe.akka" %% "akka-stream-kafka" %
"2.1.1" //2021-11-11
Chuck replaced cake solutions with akka recent version
libraryDependencies += "com.typesafe.akka" %% "akka-slf4j" %
AkkaVersion //2021-11-11
Chuck replaced cake solutions with akka recent version
libraryDependencies += "com.typesafe.akka" %% "akka-stream" %
AkkaVersion //2021-11-11
Chuck replaced cake solutions with akka recent version
libraryDependencies += "com.typesafe.akka" %% "akka-actor" %
AkkaVersion //2021-11-11
Chuck replaced cake solutions with akka recent version
libraryDependencies += "org.apache.logging.log4j" % "log4j-api" % "2.17.0"
libraryDependencies += "org.apache.logging.log4j" % "log4j-core" % "2.17.0"

/** pin jackson libraries to specific sbt version */
val jacksonDeps = Seq(
"com.fasterxml.jackson.core" % "jackson-core" % jacksonVersion,
"com.fasterxml.jackson.core" % "jackson-annotations" % jacksonVersion,
"com.fasterxml.jackson.core" % "jackson-databind" % jacksonVersion,
"com.fasterxml.jackson.datatype" % "jackson-datatype-jdk8" % jacksonVersion,
"com.fasterxml.jackson.datatype" % "jackson-datatype-jsr310" %
jacksonVersion,
"com.fasterxml.jackson.module" %% "jackson-module-scala" % jacksonVersion,
"com.fasterxml.jackson.module" % "jackson-module-paranamer" % jacksonVersion
).filter(_.name != "jackson-annotations")
.map(_.exclude("com.fasterxml.jackson.core","jackson-annotations"))

def excludeJackson(fromDependency: ModuleID): ModuleID = {
jacksonDeps.foldLeft(fromDependency){ (libDep, jackSonDep) =>
libDep.exclude(jackSonDep.organization, jackSonDep.name)
}
}
// https://mvnrepository.com/artifact/com.google.guava/guava
//libraryDependencies += "com.google.guava" % "guava" % "16.0.1"



libraryDependencies ++= (
Seq(
"org.scalatest" %% "scalatest" % "3.0.1" % Test,
"org.scalacheck" %% "scalacheck" % "1.13.4" % Test)
).map(excludeJackson) ++
jacksonDeps

//dependencyOverrides += "com.fasterxml.jackson.core" % "jackson-databind"
% "2.8.9"
//dependencyOverrides += "com.fasterxml.jackson.core" % "jackson-core" %
"2.8.9"


// https://mvnrepository.com/artifact/org.julienrf/play-json-derived-codecs
libraryDependencies += "org.julienrf" %% "play-json-derived-codecs" %
"5.0.0"
libraryDependencies += "org.scalatestplus.play" %%
"scalatestplus-play" % "3.0.0"
% Test
libraryDependencies += "net.liftweb" %% "lift-json" % "3.4.2"

libraryDependencies += "com.fasterxml.jackson.core" %
"jackson-annotations" % jacksonVersion force()
libraryDependencies += "net.liftweb" %% "lift-json" % "3.4.2"


val sparkDependencies = Seq(
  "org.apache.spark" %% "spark-sql" % sparkVersion  ,
  "org.apache.spark" %% "spark-catalyst" % sparkVersion,
  "org.apache.spar

Re: Spark 3.2.0 upgrade

2022-01-21 Thread Amit Sharma
Hello, I tried using the Cassandra unshaded connector and the normal connector;
both give the same error at runtime while connecting to Cassandra.

"com.datastax.spark" %% "spark-cassandra-connector-unshaded" % "2.4.2"

Or

"com.datastax.spark" %% "spark-cassandra-connector" % "3.1.0"


Russ similar issue is reported here also but no solution

https://community.datastax.com/questions/3519/issue-with-spring-boot-starter-data-cassandra-and.html

Caused by: java.lang.ClassNotFoundException: com.codahale.metrics.JmxReporter
at 
java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
at 
java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:521)




On Thu, Jan 20, 2022 at 5:17 PM Amit Sharma  wrote:

> Hello, I am trying to upgrade my project from spark 2.3.3 to spark 3.2.0.
> While running the application locally I am getting below error.
>
> Could you please let me know which version of the cassandra connector I
> should use. I am using below shaded connector  but i think that causing the
> issue
>
> "com.datastax.spark" %% "spark-cassandra-connector-unshaded" % "2.4.2"
>
>
> Caused by: java.lang.ClassNotFoundException: com.codahale.metrics.JmxReporter
>   at 
> java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
>   at 
> java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
>   at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:521)
>
>
> Thanks
>
> Amit
>
>


Spark 3.2.0 upgrade

2022-01-20 Thread Amit Sharma
Hello, I am trying to upgrade my project from Spark 2.3.3 to Spark 3.2.0.
While running the application locally I am getting the below error.

Could you please let me know which version of the Cassandra connector I
should use. I am using the below shaded connector, but I think that is
causing the issue:

"com.datastax.spark" %% "spark-cassandra-connector-unshaded" % "2.4.2"


Caused by: java.lang.ClassNotFoundException: com.codahale.metrics.JmxReporter
at 
java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
at 
java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:521)


Thanks

Amit


Log4j2 upgrade

2022-01-12 Thread Amit Sharma
Hello, everyone. I am replacing log4j with log4j2 in my Spark streaming
application. When I deployed my application to the Spark cluster it gave
me the below error:

" ERROR StatusLogger Log4j2 could not find a logging implementation. Please
add log4j-core to the classpath. Using SimpleLogger to log to the console "

I am including log4j-core in my fat jar, and I verified the core jar is
present in the assembled jar. Although the application is running fine, I
suspect the logs are generated using log4j, not log4j2.
I am using sbt assembly to build the jar and also noticed the below messages in the build:
I am using sbt assembly jar and also noticed below  messages in the build

Fully-qualified classname does not match jar entry:
  jar entry: META-INF/versions/9/module-info.class


  class name: module-info.class
Omitting META-INF/versions/9/module-info.class.
Fully-qualified classname does not match jar entry:
  jar entry:
META-INF/versions/9/org/apache/logging/log4j/util/Base64Util.class
  class name: org/apache/logging/log4j/util/Base64Util.class
Omitting META-INF/versions/9/org/apache/logging/log4j/util/Base64Util.class.
Fully-qualified classname does not match jar entry:
  jar entry:
META-INF/versions/9/org/apache/logging/log4j/util/internal/DefaultObjectInputFilter.class


Any idea how to resolve these?
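
(For reference, one approach often used with sbt-assembly for these
multi-release jar entries is to discard the module-info classes in the merge
strategy. A sketch using the recent slash syntax; adjust to your plugin
version, and note this is an assumption, not a verified fix for this exact build:)

ThisBuild / assemblyMergeStrategy := {
  case PathList(ps @ _*) if ps.last == "module-info.class" => MergeStrategy.discard
  case x =>
    // fall back to the plugin's default strategy for everything else
    val oldStrategy = (ThisBuild / assemblyMergeStrategy).value
    oldStrategy(x)
}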


Thanks
Amit


Re: [Spark] Optimize spark join on different keys for same data frame

2021-10-04 Thread Amit Joshi
Hi spark users,

Can anyone please provide any views on the topic.


Regards
Amit Joshi

On Sunday, October 3, 2021, Amit Joshi  wrote:

> Hi Spark-Users,
>
> Hope you are doing good.
>
> I have been working on cases where a dataframe is joined with more than
> one data frame separately, on different cols, that too frequently.
> I was wondering how to optimize the join to make them faster.
> We can consider the dataset to be big in size so broadcast joins is not an
> option.
>
> For eg:
>
> schema_df1  = new StructType()
> .add(StructField("key1", StringType, true))
> .add(StructField("key2", StringType, true))
> .add(StructField("val", DoubleType, true))
>
>
> schema_df2  = new StructType()
> .add(StructField("key1", StringType, true))
> .add(StructField("val", DoubleType, true))
>
>
> schema_df3  = new StructType()
> .add(StructField("key2", StringType, true))
> .add(StructField("val", DoubleType, true))
>
> Now if we want to join
> join1 =  df1.join(df2,"key1")
> join2 =  df1.join(df3,"key2")
>
> I was thinking of bucketing as a solution to speed up the joins. But if I
> bucket df1 on the key1,then join2  may not benefit, and vice versa (if
> bucket on key2 for df1).
>
> or Should we bucket df1 twice, one with key1 and another with key2?
> Is there a strategy to make both the joins faster for both the joins?
>
>
> Regards
> Amit Joshi
>
>
>
>


[Spark] Optimize spark join on different keys for same data frame

2021-10-03 Thread Amit Joshi
Hi Spark-Users,

Hope you are doing good.

I have been working on cases where a dataframe is joined with more than one
other data frame separately, on different columns, and quite frequently.
I was wondering how to optimize the joins to make them faster.
We can consider the datasets to be big in size, so broadcast joins are not an
option.

For eg:

val schema_df1 = new StructType()
  .add(StructField("key1", StringType, true))
  .add(StructField("key2", StringType, true))
  .add(StructField("val", DoubleType, true))

val schema_df2 = new StructType()
  .add(StructField("key1", StringType, true))
  .add(StructField("val", DoubleType, true))

val schema_df3 = new StructType()
  .add(StructField("key2", StringType, true))
  .add(StructField("val", DoubleType, true))

Now if we want to join:
val join1 = df1.join(df2, "key1")
val join2 = df1.join(df3, "key2")

I was thinking of bucketing as a solution to speed up the joins. But if I
bucket df1 on key1, then join2 may not benefit, and vice versa (if I
bucket df1 on key2).

Or should we bucket df1 twice, once with key1 and once with key2?
Is there a strategy to make both joins faster?
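
(For reference, a sketch of the "bucket twice" option I am considering; the
table names and the bucket count of 64 are assumptions:)

df1.write.bucketBy(64, "key1").sortBy("key1").saveAsTable("df1_by_key1")
df2.write.bucketBy(64, "key1").sortBy("key1").saveAsTable("df2_by_key1")

df1.write.bucketBy(64, "key2").sortBy("key2").saveAsTable("df1_by_key2")
df3.write.bucketBy(64, "key2").sortBy("key2").saveAsTable("df3_by_key2")

// each join then reads the copy bucketed on its own key, avoiding a shuffle of
// df1 at join time, at the cost of storing df1 twice
val join1 = spark.table("df1_by_key1").join(spark.table("df2_by_key1"), "key1")
val join2 = spark.table("df1_by_key2").join(spark.table("df3_by_key2"), "key2")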


Regards
Amit Joshi


Kafka Sink Issue

2021-08-23 Thread Amit Sharma
$class.reportTimeTaken(ProgressReporter.scala:351)
at
org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166)
at
org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
at
org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160)
at org.apache.spark.sql.execution.streaming.StreamExecution.org
$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:281)
at
org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:193)

Can anyone help here and let me know about why it happened and what is
resolution for this.

-- 
Thanks & Regards,
Amit Sharma


Spark Null Pointer Exception

2021-06-30 Thread Amit Sharma
Hi , I am using spark 2.7 version with scala. I am calling a method as
below

1. val rddBacklog = spark.sparkContext.parallelize(MAs) // MA is list
of say city

2. rddBacklog.foreach(ma => doAlloc3Daily(ma, fteReview.forecastId,
startYear, endYear))

3.doAlloc3Daily method just doing a database call and doing some scala
calculation (no rdd or dataframe)


At line number 2 I am getting the below NullPointerException intermittently on
the cluster but never locally.

java.lang.NullPointerException
at 
sparkStreaming.CalculateFteReview.doAlloc3Daily(CalculateFteReview.scala:1307)
at 
sparkStreaming.CalculateFteReview$$anonfun$getNewDistribution$2.apply(CalculateFteReview.scala:1199)
at 
sparkStreaming.CalculateFteReview$$anonfun$getNewDistribution$2.apply(CalculateFteReview.scala:1199)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at 
org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
at 
org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:927)
at 
org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:927)
at 
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
at 
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)


Thanks

Amit


Re: Does Rollups work with spark structured streaming with state.

2021-06-17 Thread Amit Joshi
HI Mich,

Thanks for your email.
I have tried for the batch mode,
Still looking to try in streaming mode.
Will update you as per.


Regards
Amit Joshi

On Thu, Jun 17, 2021 at 1:07 PM Mich Talebzadeh 
wrote:

> OK let us start with the basic cube
>
> create a DF first
>
> scala> val df = Seq(
>  |   ("bar", 2L),
>  |   ("bar", 2L),
>  |   ("foo", 1L),
>  |   ("foo", 2L)
>  | ).toDF("word", "num")
> df: org.apache.spark.sql.DataFrame = [word: string, num: bigint]
>
>
> Now try cube on it
>
>
> scala> df.cube($"word", $"num").count.sort(asc("word"), asc("num")).show
>
> +----+----+-----+
> |word| num|count|
> +----+----+-----+
> |null|null|    4|  Total rows in df
> |null|   1|    1|  Count where num equals 1
> |null|   2|    3|  Count where num equals 2
> | bar|null|    2|  Where word equals bar
> | bar|   2|    2|  Where word equals bar and num equals 2
> | foo|null|    2|  Where word equals foo
> | foo|   1|    1|  Where word equals foo and num equals 1
> | foo|   2|    1|  Where word equals foo and num equals 2
> +----+----+-----+
>
>
> and rollup
>
>
> scala> df.rollup($"word",$"num").count.sort(asc("word"), asc("num")).show
>
>
> +----+----+-----+
> |word| num|count|
> +----+----+-----+
> |null|null|    4|  Count of all rows
> | bar|null|    2|  Count when word is bar
> | bar|   2|    2|  Count when word is bar and num is 2
> | foo|null|    2|  Count when word is foo
> | foo|   1|    1|  When word is foo and num is 1
> | foo|   2|    1|  When word is foo and num is 2
> +----+----+-----+
>
>
> So rollup() returns a subset of the rows returned by cube(). From the
> above, rollup returns 6 rows whereas cube returns 8 rows. Here are the
> missing rows.
>
> +----+----+-----+
> |word| num|count|
> +----+----+-----+
> |null|   1|    1|  Word is null and num is 1
> |null|   2|    3|  Word is null and num is 2
> +----+----+-----+
>
> Now back to Spark Structured Streaming (SSS), we have basic aggregations
>
>
> """
> We work out the window and the AVG(temperature) in the
> window's timeframe below
> This should return back the following Dataframe as struct
>
>  root
>  |-- window: struct (nullable = false)
>  ||-- start: timestamp (nullable = true)
>  ||-- end: timestamp (nullable = true)
>  |-- avg(temperature): double (nullable = true)
>
> """
> resultM = resultC. \
>  withWatermark("timestamp", "5 minutes"). \
>  groupBy(window(resultC.timestamp, "5 minutes", "5
> minutes")). \
>  avg('temperature')
>
> # We take the above Dataframe and flatten it to get the
> columns aliased as "startOfWindowFrame", "endOfWindowFrame" and
> "AVGTemperature"
> resultMF = resultM. \
>select( \
>
> F.col("window.start").alias("startOfWindowFrame") \
>   , F.col("window.end").alias("endOfWindowFrame") \
>   ,
> F.col("avg(temperature)").alias("AVGTemperature"))
>
> Now basic aggregation on singular columns can be done like
> avg('temperature'),max(),stddev() etc
>
>
> For cube() and rollup() I will require additional columns like location
> etc in my kafka topic. Personally I have not tried it but it will be
> interesting to see if it works.
>
>
> Have you tried cube() first?
>
>
> HTH
>
>
>view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Thu, 17 Jun 2021 at 07:44, Amit Joshi 
> wrote:
>
>> Hi Mich,
>>
>> Yes, you may think of cube rollups.
>> Let me try to give an example:
>> If we have a stream of data like (country,area,count, time), we would be
>> able to get the updated count with different combinations of keys.
>>
>>> As example -
>>>  (country - count)
>>>  (country , area - coun

Re: Does Rollups work with spark structured streaming with state.

2021-06-16 Thread Amit Joshi
Hi Mich,

Yes, you may think of cube rollups.
Let me try to give an example:
If we have a stream of data like (country,area,count, time), we would be
able to get the updated count with different combinations of keys.

> As example -
>  (country - count)
>  (country , area - count)


We may need to store the state to update the count, so Spark Structured
Streaming state will come into the picture.

As now with batch programming, we can do it with

> df.rollup(col1,col2).count


But if I try to use it with spark structured streaming state, will it store
the state of all the groups as well?
I hope I was able to make my point clear.

Regards
Amit Joshi

On Wed, Jun 16, 2021 at 11:36 PM Mich Talebzadeh 
wrote:

>
>
> Hi,
>
> Just to clarify
>
> Are we talking about* rollup* as a subset of a cube that computes
> hierarchical subtotals from left to right?
>
>
>
>
>
>view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Wed, 16 Jun 2021 at 16:37, Amit Joshi 
> wrote:
>
>> Appreciate if someone could give some pointers in the question below.
>>
>> -- Forwarded message -
>> From: Amit Joshi 
>> Date: Tue, Jun 15, 2021 at 12:19 PM
>> Subject: [Spark]Does Rollups work with spark structured streaming with
>> state.
>> To: spark-user 
>>
>>
>> Hi Spark-Users,
>>
>> Hope you are all doing well.
>> Recently I was looking into rollup operations in spark.
>>
>> As we know state based aggregation is supported in spark structured
>> streaming.
>> I was wondering if rollup operations are also supported?
>> Like the state of previous aggregation on the rollups are saved.
>>
>> If rollups are not supported, then what is the standard way to handle
>> this?
>>
>>
>> Regards
>> Amit Joshi
>>
>


Fwd: Does Rollups work with spark structured streaming with state.

2021-06-16 Thread Amit Joshi
Appreciate if someone could give some pointers in the question below.

-- Forwarded message -
From: Amit Joshi 
Date: Tue, Jun 15, 2021 at 12:19 PM
Subject: [Spark]Does Rollups work with spark structured streaming with
state.
To: spark-user 


Hi Spark-Users,

Hope you are all doing well.
Recently I was looking into rollup operations in spark.

As we know state based aggregation is supported in spark structured
streaming.
I was wondering if rollup operations are also supported?
Like the state of previous aggregation on the rollups are saved.

If rollups are not supported, then what is the standard way to handle this?


Regards
Amit Joshi


Does Rollups work with spark structured streaming with state.

2021-06-14 Thread Amit Joshi
Hi Spark-Users,

Hope you are all doing well.
Recently I was looking into rollup operations in spark.

As we know state based aggregation is supported in spark structured
streaming.
I was wondering if rollup operations are also supported?
Like the state of previous aggregation on the rollups are saved.

If rollups are not supported, then what is the standard way to handle this?


Regards
Amit Joshi


Re: multiple query with structured streaming in spark does not work

2021-05-21 Thread Amit Joshi
Hi Jian,

I found this link that could be useful.
https://spark.apache.org/docs/latest/job-scheduling.html#scheduling-within-an-application

By the way, you can first try giving enough resources to run both jobs
without defining the scheduler.
I mean run the queries with the default scheduler, but provide enough memory in
the Spark cluster to run both.


Regards
Amit Joshi



On Sat, May 22, 2021 at 5:41 AM  wrote:

> Hi Amit;
>
>
>
> Thank you for your prompt reply and kind help. Wonder how to set the
> scheduler to FAIR mode in python. Following code seems to me does not work
> out.
>
>
>
> conf = SparkConf().setMaster("local").setAppName("HSMSTest1")
>
> sc = SparkContext(conf=conf)
>
> sc.setLocalProperty('spark.scheduler.mode', 'FAIR')
>
> spark =
> SparkSession.builder.appName("HSMSStructedStreaming1").getOrCreate()
>
>
>
> By the way, as I am using nc -lk  to input the stream, could the cause be
> that the input stream can only be consumed by one query, as
> mentioned in the post below:
>
>
>
>
> https://stackoverflow.com/questions/45618489/executing-separate-streaming-queries-in-spark-structured-streaming
>
>
>
> appreciate your further help/support.
>
>
>
> Best Regards,
>
>
>
> Jian Xu
>
>
>
> *From:* Amit Joshi 
> *Sent:* Friday, May 21, 2021 12:52 PM
> *To:* jia...@xtronica.no
> *Cc:* user@spark.apache.org
> *Subject:* Re: multiple query with structured streaming in spark does not
> work
>
>
>
> Hi Jian,
>
>
>
> You have to use same spark session to run all the queries.
>
> And use the following to wait for termination.
>
>
>
> q1 = writestream.start
>
> q2 = writstream2.start
>
> spark.streams.awaitAnyTermination
>
>
>
> And also set the scheduler in the spark config to FAIR scheduler.
>
>
>
>
>
> Regards
>
> Amit Joshi
>
>
>
>
>
> On Saturday, May 22, 2021,  wrote:
>
> Hi There;
>
>
>
> I am new to spark. We are using spark to develop our app for data
> streaming with sensor readings.
>
>
>
> I am having trouble to get two queries with structured streaming working
> concurrently.
>
>
>
> Following is the code. It can only work with one of them. Wonder if there
> is any way to get it doing. Appreciate help from the team.
>
>
>
> Regards,
>
>
>
> Jian Xu
>
>
>
>
>
> hostName = 'localhost'
>
> portNumber= 
>
> wSize= '10 seconds'
>
> sSize ='2 seconds'
>
>
>
> def wnq_fb_func(batch_df, batch_id):
>
> print("batch is processed from time:{}".format(datetime.now()))
>
> print(batch_df.collect())
>
> batch_df.show(10,False,False)
>
>
>
> lines = spark.readStream.format('socket').option('host',
> hostName).option('port', portNumber).option('includeTimestamp', True).load()
>
>
>
> nSensors=3
>
>
>
> scols = split(lines.value, ',').cast(ArrayType(FloatType()))
>
> sensorCols = []
>
> for i in range(nSensors):
>
> sensorCols.append(scols.getItem(i).alias('sensor'+ str(i)))
>
>
>
> nlines=lines.select(lines.timestamp,lines.value, *sensorCols)
>
> nlines.printSchema()
>
>
>
> wnlines =nlines.select(window(nlines.timestamp, wSize,
> sSize).alias('TimeWindow'), *lines.columns)
>
> wnquery= wnlines.writeStream.trigger(processingTime=sSize)\
>
> .outputMode('append').foreachBatch(wnq_fb_func).start()
>
>
>
> nquery=nlines.writeStream.outputMode('append').format('console').start()
>
> nquery.awaitTermination()
>
> wnquery.awaitTermination()
>
>
>
>
>
>
>
>


Re: multiple query with structured streaming in spark does not work

2021-05-21 Thread Amit Joshi
Hi Jian,

You have to use the same Spark session to run all the queries,
and use the following to wait for termination:

q1 = writeStream1.start()
q2 = writeStream2.start()
spark.streams.awaitAnyTermination()

Also set the scheduler in the Spark config to the FAIR scheduler
(spark.scheduler.mode set to FAIR).


Regards
Amit Joshi



On Saturday, May 22, 2021,  wrote:

> Hi There;
>
>
>
> I am new to spark. We are using spark to develop our app for data
> streaming with sensor readings.
>
>
>
> I am having trouble to get two queries with structured streaming working
> concurrently.
>
>
>
> Following is the code. It can only work with one of them. Wonder if there
> is any way to get it doing. Appreciate help from the team.
>
>
>
> Regards,
>
>
>
> Jian Xu
>
>
>
>
>
> hostName = 'localhost'
>
> portNumber= 
>
> wSize= '10 seconds'
>
> sSize ='2 seconds'
>
>
>
> def wnq_fb_func(batch_df, batch_id):
>
> print("batch is processed from time:{}".format(datetime.now()))
>
> print(batch_df.collect())
>
> batch_df.show(10,False,False)
>
>
>
> lines = spark.readStream.format('socket').option('host',
> hostName).option('port', portNumber).option('includeTimestamp',
> True).load()
>
>
>
> nSensors=3
>
>
>
> scols = split(lines.value, ',').cast(ArrayType(FloatType()))
>
> sensorCols = []
>
> for i in range(nSensors):
>
> sensorCols.append(scols.getItem(i).alias('sensor'+ str(i)))
>
>
>
> nlines=lines.select(lines.timestamp,lines.value, *sensorCols)
>
> nlines.printSchema()
>
>
>
> wnlines =nlines.select(window(nlines.timestamp, wSize,
> sSize).alias('TimeWindow'), *lines.columns)
>
> wnquery= wnlines.writeStream.trigger(processingTime=sSize)\
>
> .outputMode('append').foreachBatch(wnq_fb_func).start()
>
>
>
> nquery=nlines.writeStream.outputMode('append').format('console').start()
>
> nquery.awaitTermination()
>
> wnquery.awaitTermination()
>
>
>
>
>
>
>


Re: [EXTERNAL] Urgent Help - Py Spark submit error

2021-05-14 Thread Amit Joshi
Hi KhajaAsmath,

Client vs Cluster: In client mode driver runs in the machine from where you
submit your job. Whereas in cluster mode driver runs in one of the worker
nodes.

I think you need to pass the conf file to your driver, as you are using it
in the driver code, which runs in one of the worker nodes.
Use this command to pass it to driver
*--files  /appl/common/ftp/conf.json  --conf
spark.driver.extraJavaOptions="-Dconfig.file=conf.json*

And make sure you are able to access the file location from worker nodes.


Regards
Amit Joshi

On Sat, May 15, 2021 at 5:14 AM KhajaAsmath Mohammed <
mdkhajaasm...@gmail.com> wrote:

> Here is my updated spark submit without any luck.,
>
> spark-submit --master yarn --deploy-mode cluster --files
> /appl/common/ftp/conf.json,/etc/hive/conf/hive-site.xml,/etc/hadoop/conf/core-site.xml,/etc/hadoop/conf/hdfs-site.xml
> --num-executors 6 --executor-cores 3 --driver-cores 3 --driver-memory 7g
> --executor-memory 7g /appl/common/ftp/ftp_event_data.py
> /appl/common/ftp/conf.json 2021-05-10 7
>
> On Fri, May 14, 2021 at 6:19 PM KhajaAsmath Mohammed <
> mdkhajaasm...@gmail.com> wrote:
>
>> Sorry my bad, it did not resolve the issue. I still have the same issue.
>> can anyone please guide me. I was still running as a client instead of a
>> cluster.
>>
>> On Fri, May 14, 2021 at 5:05 PM KhajaAsmath Mohammed <
>> mdkhajaasm...@gmail.com> wrote:
>>
>>> You are right. It worked but I still don't understand why I need to pass
>>> that to all executors.
>>>
>>> On Fri, May 14, 2021 at 5:03 PM KhajaAsmath Mohammed <
>>> mdkhajaasm...@gmail.com> wrote:
>>>
>>>> I am using json only to read properties before calling spark session. I
>>>> don't know why we need to pass that to all executors.
>>>>
>>>>
>>>> On Fri, May 14, 2021 at 5:01 PM Longjiang.Yang <
>>>> longjiang.y...@target.com> wrote:
>>>>
>>>>> Could you check whether this file is accessible in executors? (is it
>>>>> in HDFS or in the client local FS)
>>>>> /appl/common/ftp/conf.json
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> *From: *KhajaAsmath Mohammed 
>>>>> *Date: *Friday, May 14, 2021 at 4:50 PM
>>>>> *To: *"user @spark" 
>>>>> *Subject: *[EXTERNAL] Urgent Help - Py Spark submit error
>>>>>
>>>>>
>>>>>
>>>>> /appl/common/ftp/conf.json
>>>>>
>>>>


Re: jar incompatibility with Spark 3.1.1 for structured streaming with kafka

2021-04-07 Thread Amit Joshi
Hi Mich,

If I understood your problem correctly, it is that the spark-kafka jar is
shadowed by the installed Kafka client jar at run time.
I have been in that place before.
I can recommend resolving the issue using the shade plugin. The example I
am pasting here works for pom.xml;
I am sure you will find something similar for sbt as well.
This is a Maven shade plugin relocation that renames the classes while
packaging. This will produce an uber jar.
<relocations>
  <relocation>
    <pattern>org.apache.kafka</pattern>
    <shadedPattern>shade.org.apache.kafka</shadedPattern>
  </relocation>
</relocations>

Hope this helps.

Regards
Amit Joshi

On Wed, Apr 7, 2021 at 8:14 PM Mich Talebzadeh 
wrote:

>
> Did some tests. The concern is SSS job running under YARN
>
>
> *Scenario 1)*  use spark-sql-kafka-0-10_2.12-3.1.0.jar
>
>- Removed spark-sql-kafka-0-10_2.12-3.1.0.jar from anywhere on
>CLASSPATH including $SPARK_HOME/jars
>- Added the said jar file to spark-submit in client mode (the only
>mode available to PySpark) with --jars
>- spark-submit --master yarn --deploy-mode client --conf
>spark.pyspark.virtualenv.enabled=true .. bla bla..  --driver-memory 4G
>--executor-memory 4G --num-executors 2 --executor-cores 2 *--jars
>$HOME/jars/spark-sql-kafka-0-10_2.12-3.1.0.jar *xyz.py
>
> This works fine
>
>
> *Scenario 2)* use spark-sql-kafka-0-10_2.12-3.1.1.jar in spark-submit
>
>
>
>-  spark-submit --master yarn --deploy-mode client --conf
>spark.pyspark.virtualenv.enabled=true ..bla bla.. --driver-memory 4G
>--executor-memory 4G --num-executors 2 --executor-cores 2 *--jars
>$HOME/jars/spark-sql-kafka-0-10_2.12-*3.1.1*.jar *xyz.py
>
> it failed with
>
>
>
>- Caused by: java.lang.NoSuchMethodError:
>
> org.apache.spark.kafka010.KafkaTokenUtil$.needTokenUpdate(Ljava/util/Map;Lscala/Option;)Z
>
> Scenario 3) use the package as per Structured Streaming + Kafka
> Integration Guide (Kafka broker version 0.10.0 or higher) - Spark 3.1.1
> Documentation (apache.org)
> <https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#deploying>
>
>
>- spark-submit --master yarn --deploy-mode client --conf
>spark.pyspark.virtualenv.enabled=true ..bla bla.. --driver-memory 4G
>--executor-memory 4G --num-executors 2 --executor-cores 2 *--packages
>org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.1 *xyz.py
>
> it failed with
>
>- Caused by: java.lang.NoSuchMethodError:
>
> org.apache.spark.kafka010.KafkaTokenUtil$.needTokenUpdate(Ljava/util/Map;Lscala/Option;)Z
>
>
> HTH
>
>
>view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Wed, 7 Apr 2021 at 13:20, Gabor Somogyi 
> wrote:
>
>> +1 on Sean's opinion
>>
>> On Wed, Apr 7, 2021 at 2:17 PM Sean Owen  wrote:
>>
>>> You shouldn't be modifying your cluster install. You may at this point
>>> have conflicting, excess JARs in there somewhere. I'd start it over if you
>>> can.
>>>
>>> On Wed, Apr 7, 2021 at 7:15 AM Gabor Somogyi 
>>> wrote:
>>>
>>>> Not sure what you mean not working. You've added 3.1.1 to packages
>>>> which uses:
>>>> * 2.6.0 kafka-clients:
>>>> https://github.com/apache/spark/blob/1d550c4e90275ab418b9161925049239227f3dc9/pom.xml#L136
>>>> * 2.6.2 commons pool:
>>>> https://github.com/apache/spark/blob/1d550c4e90275ab418b9161925049239227f3dc9/pom.xml#L183
>>>>
>>>> I think it worth an end-to-end dep-tree analysis what is really
>>>> happening on the cluster...
>>>>
>>>> G
>>>>
>>>>
>>>> On Wed, Apr 7, 2021 at 11:11 AM Mich Talebzadeh <
>>>> mich.talebza...@gmail.com> wrote:
>>>>
>>>>> Hi Gabor et. al.,
>>>>>
>>>>> To be honest I am not convinced this package --packages
>>>>> org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.1 is really working!
>>>>>
>>>>> I know for definite that spark-sql-kafka-0-10_2.12-3.1.0.jar works
>>>>> fine. I reported the package working before because under $SPARK_HOME/jars
>>>>> on all nodes there was a copy 3.0.1 jar file. 

unit testing for spark code

2021-03-22 Thread Amit Sharma
Hi, can we write unit tests for Spark code? Is there any specific framework?
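
(For context, the kind of test I have in mind; a minimal sketch assuming
ScalaTest and a local-mode SparkSession, with illustrative names:)

import org.apache.spark.sql.SparkSession
import org.scalatest.funsuite.AnyFunSuite

class WordCountSpec extends AnyFunSuite {

  private lazy val spark: SparkSession = SparkSession.builder()
    .master("local[2]")        // local mode, so no cluster is needed for the test
    .appName("spark-unit-test")
    .getOrCreate()

  test("group-by count works") {
    import spark.implicits._
    val counts = Seq("a", "b", "a").toDF("word")
      .groupBy("word").count()
      .collect().map(r => r.getString(0) -> r.getLong(1)).toMap
    assert(counts == Map("a" -> 2L, "b" -> 1L))
  }
}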


Thanks
Amit


Re: [Spark Structured Streaming] Processing the data path coming from kafka.

2021-01-18 Thread Amit Joshi
Hi Boris,

Thanks for your code block.
I understood what you are trying to achieve in the code.

But the content of the files is JSON records separated by newlines.
And we have to make the dataframe out of them, as some processing has to be
done on it.

Regards
Amit
On Monday, January 18, 2021, Boris Litvak  wrote:

> HI Amit,
>
>
>
> I was thinking along the lines of (python):
>
>
>
>
> @udf(returnType=StringType())
> def reader_udf(filename: str) -> str:
>     with open(filename, "r") as f:
>         return f.read()
>
>
> def run_locally():
>     with utils.build_spark_session("Local", local=True) as spark:
>         df = spark.readStream.csv(r'testdata', schema=StructType([
>             StructField('filename', StringType(), True)]))
>         df = df.withColumn('content', reader_udf(col('filename')))
>         q = df.select('content').writeStream.queryName('test').format(
>             'console').start()
>         q.awaitTermination()
>
>
>
> Now each row contains the contents of the files, provided they are not
> large you can foreach() over the df/rdd and do whatever you want with it,
> such as json.loads()/etc.
>
> If you know the shema of the jsons, you can later explode() them into a
> flat DF, ala https://stackoverflow.com/questions/38243717/spark-
> explode-nested-json-with-array-in-scala
>
>
>
> Note that unless I am missing something you cannot access spark session
> from foreach as code is not running on the driver.
>
> Please say if it makes sense or did I miss anything.
>
>
>
> Boris
>
>
>
> *From:* Amit Joshi 
> *Sent:* Monday, 18 January 2021 17:10
> *To:* Boris Litvak 
> *Cc:* spark-user 
> *Subject:* Re: [Spark Structured Streaming] Processing the data path
> coming from kafka.
>
>
>
> Hi Boris,
>
>
>
> I need to do processing on the data present in the path.
>
> That is the reason I am trying to make the dataframe.
>
>
>
> Can you please provide the example of your solution?
>
>
>
> Regards
>
> Amit
>
>
>
> On Mon, Jan 18, 2021 at 7:15 PM Boris Litvak  wrote:
>
> Hi Amit,
>
>
>
> Why won’t you just map()/mapXXX() the kafkaDf with the mapping function
> that reads the paths?
>
> Also, do you really have to read the json into an additional dataframe?
>
>
>
> Thanks, Boris
>
>
>
> *From:* Amit Joshi 
> *Sent:* Monday, 18 January 2021 15:04
> *To:* spark-user 
> *Subject:* [Spark Structured Streaming] Processing the data path coming
> from kafka.
>
>
>
> Hi ,
>
>
>
> I have a use case where the file path of the json records stored in s3 are
> coming as a kafka
>
> message in kafka. I have to process the data using spark structured
> streaming.
>
>
>
> The design which I thought is as follows:
>
> 1. In kafka Spark structures streaming, read the message containing the
> data path.
>
> 2. Collect the message record in driver. (Messages are small in sizes)
>
> 3. Create the dataframe from the datalocation.
>
>
>
> *kafkaDf*.select(*$"value"*.cast(StringType))
>   .writeStream.foreachBatch((batchDf:DataFrame, batchId:Long) =>  {
>
> //rough code
>
> //collec to driver
>
> *val *records = batchDf.collect()
>
> //create dataframe and process
> records foreach((rec: Row) =>{
>   *println*(*"records:##"*,rec.toString())
>   val path = rec.getAs[String](*"data_path"*)
>
>   val dfToProcess =spark.read.json(path)
>
>   
>
> })
>
> }
>
> I would like to know the views, if this approach is fine? Specifically if 
> there is some problem with
>
> with creating the dataframe after calling collect.
>
> If there is any better approach, please let know the same.
>
>
>
> Regards
>
> Amit Joshi
>
>


Re: [Spark Structured Streaming] Processing the data path coming from kafka.

2021-01-18 Thread Amit Joshi
Hi Boris,

I need to do processing on the data present in the path.
That is the reason I am trying to make the dataframe.

Can you please provide the example of your solution?

Regards
Amit

On Mon, Jan 18, 2021 at 7:15 PM Boris Litvak  wrote:

> Hi Amit,
>
>
>
> Why won’t you just map()/mapXXX() the kafkaDf with the mapping function
> that reads the paths?
>
> Also, do you really have to read the json into an additional dataframe?
>
>
>
> Thanks, Boris
>
>
>
> *From:* Amit Joshi 
> *Sent:* Monday, 18 January 2021 15:04
> *To:* spark-user 
> *Subject:* [Spark Structured Streaming] Processing the data path coming
> from kafka.
>
>
>
> Hi ,
>
>
>
> I have a use case where the file path of the json records stored in s3 are
> coming as a kafka
>
> message in kafka. I have to process the data using spark structured
> streaming.
>
>
>
> The design which I thought is as follows:
>
> 1. In kafka Spark structures streaming, read the message containing the
> data path.
>
> 2. Collect the message record in driver. (Messages are small in sizes)
>
> 3. Create the dataframe from the datalocation.
>
>
>
> *kafkaDf*.select(*$"value"*.cast(StringType))
>   .writeStream.foreachBatch((batchDf:DataFrame, batchId:Long) =>  {
>
> //rough code
>
> //collec to driver
>
> *val *records = batchDf.collect()
>
> //create dataframe and process
> records foreach((rec: Row) =>{
>   *println*(*"records:##"*,rec.toString())
>   val path = rec.getAs[String](*"data_path"*)
>
>   val dfToProcess =spark.read.json(path)
>
>   
>
> })
>
> }
>
> I would like to know your views on whether this approach is fine, specifically
> whether there is some problem with creating the dataframe after calling collect.
>
> If there is a better approach, please let me know.
>
>
>
> Regards
>
> Amit Joshi
>
>


[Spark Structured Streaming] Processing the data path coming from kafka.

2021-01-18 Thread Amit Joshi
Hi ,

I have a use case where the file paths of JSON records stored in S3 arrive as
Kafka messages. I have to process the data using Spark Structured Streaming.

The design I have in mind is as follows:
1. In Spark Structured Streaming, read the Kafka message containing the data path.
2. Collect the message records on the driver (the messages are small in size).
3. Create the dataframe from the data location.

kafkaDf.select($"value".cast(StringType).as("data_path"))
  .writeStream
  .foreachBatch { (batchDf: DataFrame, batchId: Long) =>
    // rough code: collect the (small) batch of path messages to the driver
    val records = batchDf.collect()

    // create a dataframe per path and process it
    records.foreach { (rec: Row) =>
      println("records:##" + rec.toString)
      val path = rec.getAs[String]("data_path")
      val dfToProcess = spark.read.json(path)
      // ... further processing of dfToProcess ...
    }
  }
  .start()

I would like to know your views on whether this approach is fine, specifically
whether there is some problem with creating the dataframe after calling collect.

If there is a better approach, please let me know.


Regards

Amit Joshi
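
For reference, a self-contained sketch of the design described above, assuming
the Kafka message value is simply the S3 path of the JSON file to read; the
broker, topic, output and checkpoint paths are placeholders, not values from
this thread:

import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.StringType

object PathDrivenProcessing {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("path-driven-processing").getOrCreate()
    import spark.implicits._

    val kafkaDf = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker:9092")
      .option("subscribe", "data-paths")
      .load()

    val query = kafkaDf.select($"value".cast(StringType).as("data_path"))
      .writeStream
      .foreachBatch { (batchDf: DataFrame, batchId: Long) =>
        // The batch only holds small path messages, so collecting to the driver is cheap.
        batchDf.collect().foreach { (rec: Row) =>
          val path = rec.getAs[String]("data_path")
          val dfToProcess = spark.read.json(path)                          // batch read of the referenced file
          dfToProcess.write.mode("append").parquet("s3a://bucket/output/") // placeholder sink
        }
      }
      .option("checkpointLocation", "s3a://bucket/checkpoints/path-driven") // placeholder
      .start()

    query.awaitTermination()
  }
}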


Re: Understanding Executors UI

2021-01-09 Thread Amit Sharma
I believe it’s a Spark UI issue which does not display the correct value. I
believe it is resolved in Spark 3.0.

Thanks
Amit

On Fri, Jan 8, 2021 at 4:00 PM Luca Canali  wrote:

> You report 'Storage Memory': 3.3TB/ 598.5 GB -> The first number is the
> memory used for storage, the second one is the available memory (for
> storage) in the unified memory pool.
>
> The used memory shown in your webui snippet is indeed quite high (higher
> than the available memory!? ), you can probably profit by drilling down on
> that to understand better what is happening.
>
> For example look at the details per executor (the numbers you reported are
> aggregated values), then also look at the “storage tab” for a list of
> cached RDDs with details.
>
> In case, Spark 3.0 has improved memory instrumentation and improved
> instrumentation for streaming, so you can you profit from testing there too.
>
>
>
>
>
> *From:* Eric Beabes 
> *Sent:* Friday, January 8, 2021 04:23
> *To:* Luca Canali 
> *Cc:* spark-user 
> *Subject:* Re: Understanding Executors UI
>
>
>
> So when I see this for 'Storage Memory': *3.3TB/ 598.5 GB* *- it's
> telling me that Spark is using 3.3 TB of memory & 598.5 GB is used for
> caching data, correct?* What I am surprised about is that these numbers
> don't change at all throughout the day even though the load on the system
> is low after 5pm PST.
>
>
>
> I would expect the "Memory used" to be lower than 3.3Tb after 5pm PST.
>
>
>
> Does Spark 3.0 do a better job of memory management? Wondering if
> upgrading to Spark 3.0 would improve performance?
>
>
>
>
>
> On Wed, Jan 6, 2021 at 2:29 PM Luca Canali  wrote:
>
> Hi Eric,
>
>
>
> A few links, in case they can be useful for your troubleshooting:
>
>
>
> The Spark Web UI is documented in Spark 3.x documentation, although you
> can use most of it for Spark 2.4 too:
> https://spark.apache.org/docs/latest/web-ui.html
>
>
>
> Spark memory management is documented at
> https://spark.apache.org/docs/latest/tuning.html#memory-management-overview
>
>
> Additional resource: see also this diagram
> https://canali.web.cern.ch/docs/SparkExecutorMemory.png  and
> https://db-blog.web.cern.ch/blog/luca-canali/2020-08-spark3-memory-monitoring
>
>
>
> Best,
>
> Luca
>
>
>
> *From:* Eric Beabes 
> *Sent:* Wednesday, January 6, 2021 00:20
> *To:* spark-user 
> *Subject:* Understanding Executors UI
>
>
>
> [image: image.png]
>
>
>
>
>
> Not sure if this image will go through. (Never sent an email to this
> mailing list with an image).
>
>
>
> I am trying to understand this 'Executors' UI in Spark 2.4. I have a
> Stateful Structured Streaming job with 'State timeout' set to 10 minutes.
> When the load on the system is low a message gets written to Kafka
> immediately after the State times out BUT under heavy load it takes over 40
> minutes to get a message on the output topic. Trying to debug this issue &
> see if performance can be improved.
>
>
>
> Questions:
>
>
>
> 1) I am requesting 3.2 TB of memory but it seems the job keeps using only
> 598.5 GB as per the values in 'Storage Memory' as well as 'On Heap Storage
> Memory'. Wondering if this is a Cluster issue OR am I not setting values
> correctly?
>
> 2) Where can I find documentation to understand different 'Tabs' in the
> Spark UI? (Sorry, Googling didn't help. I will keep searching.)
>
>
>
> Any pointers would be appreciated. Thanks.
>
>
>
>
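
For reference, a minimal sketch of inspecting storage memory per executor
programmatically, to complement the "Storage Memory" column discussed above.
It relies on SparkContext.getExecutorMemoryStatus, which reports the maximum
memory available for caching and the amount still free, per executor:

import org.apache.spark.sql.SparkSession

object StorageMemoryReport {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("storage-memory-report").getOrCreate()

    spark.sparkContext.getExecutorMemoryStatus.foreach { case (executor, (maxMem, remaining)) =>
      val usedMb = (maxMem - remaining) / (1024 * 1024)
      val maxMb  = maxMem / (1024 * 1024)
      // mirrors the used/total numbers shown per executor in the web UI
      println(f"$executor%-40s storage memory used: $usedMb%6d MB of $maxMb%6d MB")
    }

    spark.stop()
  }
}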


Re: Missing required configuration "partition.assignment.strategy" [ Kafka + Spark Structured Streaming ]

2020-12-08 Thread Amit Joshi
Hi All,

Can someone please help with this?

Thanks

On Tuesday, December 8, 2020, Amit Joshi  wrote:

> Hi Gabor,
>
> Pls find the logs attached. These are truncated logs.
>
> Command used :
> spark-submit --verbose --packages
> org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1,com.typesafe:config:1.4.0 --master yarn
> --deploy-mode cluster --class com.stream.Main --num-executors 2
> --driver-memory 2g --executor-cores 1 --executor-memory 4g --files
> gs://x/jars_application.conf,gs://x/log4j.properties
> gs://x/a-synch-r-1.0-SNAPSHOT.jar
> For this I used a snapshot jar, not a fat jar.
>
>
> Regards
> Amit
>
> On Mon, Dec 7, 2020 at 10:15 PM Gabor Somogyi 
> wrote:
>
>> Well, I can't do miracle without cluster and logs access.
>> What I don't understand why you need fat jar?! Spark libraries normally
>> need provided scope because it must exist on all machines...
>> I would take a look at the driver and executor logs which contains the
>> consumer configs + I would take a look at the exact version of the consumer
>> (this is printed also in the same log)
>>
>> G
>>
>>
>> On Mon, Dec 7, 2020 at 5:07 PM Amit Joshi 
>> wrote:
>>
>>> Hi Gabor,
>>>
>>> The code is very simple Kafka consumption of data.
>>> I guess, it may be the cluster.
>>> Can you please point out the possible problem to look for in the cluster?
>>>
>>> Regards
>>> Amit
>>>
>>> On Monday, December 7, 2020, Gabor Somogyi 
>>> wrote:
>>>
>>>> + Adding back user list.
>>>>
>>>> I've had a look at the Spark code and it's not modifying 
>>>> "partition.assignment.strategy"
>>>> so the problem
>>>> must be either in your application or in your cluster setup.
>>>>
>>>> G
>>>>
>>>>
>>>> On Mon, Dec 7, 2020 at 12:31 PM Gabor Somogyi <
>>>> gabor.g.somo...@gmail.com> wrote:
>>>>
>>>>> It's super interesting because that field has default value:
>>>>> *org.apache.kafka.clients.consumer.RangeAssignor*
>>>>>
>>>>> On Mon, 7 Dec 2020, 10:51 Amit Joshi, 
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> Thnks for the reply.
>>>>>> I did tried removing the client version.
>>>>>> But got the same exception.
>>>>>>
>>>>>>
>>>>>> Thnks
>>>>>>
>>>>>> On Monday, December 7, 2020, Gabor Somogyi 
>>>>>> wrote:
>>>>>>
>>>>>>> +1 on the mentioned change, Spark uses the following kafka-clients
>>>>>>> library:
>>>>>>>
>>>>>>> 2.4.1
>>>>>>>
>>>>>>> G
>>>>>>>
>>>>>>>
>>>>>>> On Mon, Dec 7, 2020 at 9:30 AM German Schiavon <
>>>>>>> gschiavonsp...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> I think the issue is that you are overriding the kafka-clients that
>>>>>>>> comes with  spark-sql-kafka-0-10_2.12
>>>>>>>>
>>>>>>>>
>>>>>>>> I'd try removing the kafka-clients and see if it works
>>>>>>>>
>>>>>>>>
>>>>>>>> On Sun, 6 Dec 2020 at 08:01, Amit Joshi 
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi All,
>>>>>>>>>
>>>>>>>>> I am running the Spark Structured Streaming along with Kafka.
>>>>>>>>> Below is the pom.xml
>>>>>>>>>
>>>>>>>>> 
>>>>>>>>> 1.8
>>>>>>>>> 1.8
>>>>>>>>> UTF-8
>>>>>>>>> 
>>>>>>>>> 2.12.10
>>>>>>>>> 3.0.1
>>>>>>>>> 
>>>>>>>>>
>>>>>>>>> 
>>>>>>>>> org.apache.kafka
>>>>>>>>> kafka-clients
>>>>>>>>> 2.1.0
>>>>>>>>> 
>>>>>>>>>
>&

Re: Missing required configuration "partition.assignment.strategy" [ Kafka + Spark Structured Streaming ]

2020-12-07 Thread Amit Joshi
Hi Gabor,

Please find the logs attached. These are truncated logs.

Command used :
spark-submit --verbose --packages
org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1,com.typesafe:config:1.4.0
--master yarn --deploy-mode cluster --class com.stream.Main --num-executors
2 --driver-memory 2g --executor-cores 1 --executor-memory 4g --files
gs://x/jars_application.conf,gs://x/log4j.properties
gs://x/a-synch-r-1.0-SNAPSHOT.jar
For this I used a snapshot jar, not a fat jar.


Regards
Amit

On Mon, Dec 7, 2020 at 10:15 PM Gabor Somogyi 
wrote:

> Well, I can't do miracle without cluster and logs access.
> What I don't understand why you need fat jar?! Spark libraries normally
> need provided scope because it must exist on all machines...
> I would take a look at the driver and executor logs which contains the
> consumer configs + I would take a look at the exact version of the consumer
> (this is printed also in the same log)
>
> G
>
>
> On Mon, Dec 7, 2020 at 5:07 PM Amit Joshi 
> wrote:
>
>> Hi Gabor,
>>
>> The code is very simple Kafka consumption of data.
>> I guess, it may be the cluster.
>> Can you please point out the possible problem to look for in the cluster?
>>
>> Regards
>> Amit
>>
>> On Monday, December 7, 2020, Gabor Somogyi 
>> wrote:
>>
>>> + Adding back user list.
>>>
>>> I've had a look at the Spark code and it's not
>>> modifying "partition.assignment.strategy" so the problem
>>> must be either in your application or in your cluster setup.
>>>
>>> G
>>>
>>>
>>> On Mon, Dec 7, 2020 at 12:31 PM Gabor Somogyi 
>>> wrote:
>>>
>>>> It's super interesting because that field has default value:
>>>> *org.apache.kafka.clients.consumer.RangeAssignor*
>>>>
>>>> On Mon, 7 Dec 2020, 10:51 Amit Joshi, 
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> Thnks for the reply.
>>>>> I did tried removing the client version.
>>>>> But got the same exception.
>>>>>
>>>>>
>>>>> Thnks
>>>>>
>>>>> On Monday, December 7, 2020, Gabor Somogyi 
>>>>> wrote:
>>>>>
>>>>>> +1 on the mentioned change, Spark uses the following kafka-clients
>>>>>> library:
>>>>>>
>>>>>> 2.4.1
>>>>>>
>>>>>> G
>>>>>>
>>>>>>
>>>>>> On Mon, Dec 7, 2020 at 9:30 AM German Schiavon <
>>>>>> gschiavonsp...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I think the issue is that you are overriding the kafka-clients that
>>>>>>> comes with  spark-sql-kafka-0-10_2.12
>>>>>>>
>>>>>>>
>>>>>>> I'd try removing the kafka-clients and see if it works
>>>>>>>
>>>>>>>
>>>>>>> On Sun, 6 Dec 2020 at 08:01, Amit Joshi 
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi All,
>>>>>>>>
>>>>>>>> I am running the Spark Structured Streaming along with Kafka.
>>>>>>>> Below is the pom.xml
>>>>>>>>
>>>>>>>> 
>>>>>>>> 1.8
>>>>>>>> 1.8
>>>>>>>> UTF-8
>>>>>>>> 
>>>>>>>> 2.12.10
>>>>>>>> 3.0.1
>>>>>>>> 
>>>>>>>>
>>>>>>>> 
>>>>>>>> org.apache.kafka
>>>>>>>> kafka-clients
>>>>>>>> 2.1.0
>>>>>>>> 
>>>>>>>>
>>>>>>>> 
>>>>>>>> org.apache.spark
>>>>>>>> spark-core_2.12
>>>>>>>> ${sparkVersion}
>>>>>>>> provided
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> org.apache.spark
>>>>>>>> spark-sql_2.12
>>>>>>>> ${sparkVersion}
>>>>>>>>     provided
>>>>>>>> 
>>>>>

Re: Spark UI Storage Memory

2020-12-07 Thread Amit Sharma
Any suggestions, please?

Thanks
Amit

On Fri, Dec 4, 2020 at 2:27 PM Amit Sharma  wrote:

> Is there any memory leak in spark 2.3.3 version as mentioned in below
> Jira.
> https://issues.apache.org/jira/browse/SPARK-29055.
>
> Please let me know how to solve it.
>
> Thanks
> Amit
>
> On Fri, Dec 4, 2020 at 1:55 PM Amit Sharma  wrote:
>
>> Can someone help me on this please.
>>
>>
>> Thanks
>> Amit
>>
>> On Wed, Dec 2, 2020 at 11:52 AM Amit Sharma  wrote:
>>
>>> Hi , I have a spark streaming job. When I am checking the Excetors tab ,
>>> there is a Storage Memory column. It displays used memory  /total memory.
>>> What is used memory. Is it memory in  use or memory used so far. How would
>>> I know how much memory is unused at 1 point of time.
>>>
>>>
>>> Thanks
>>> Amit
>>>
>>


Re: Caching

2020-12-07 Thread Amit Sharma
Jayesh, but while building the logical plan Spark would know that the same DF
is used twice, so it will optimize the query.


Thanks
Amit

On Mon, Dec 7, 2020 at 1:16 PM Lalwani, Jayesh  wrote:

> Since DF2 is dependent on DF1, and DF3 is dependent on both DF1 and DF2,
> without caching,  Spark will read the CSV twice: Once to load it for DF1,
> and once to load it for DF2. When you add a cache on DF1 or DF2, it reads
> from CSV only once.
>
>
>
> You might want to look at doing a windowed  query on DF1 to avoid joining
> DF1 with DF2. This should give you better or similar  performance when
> compared to  cache because Spark will optimize for cache the data during
> shuffle.
>
>
>
> *From: *Amit Sharma 
> *Reply-To: *"resolve...@gmail.com" 
> *Date: *Monday, December 7, 2020 at 12:47 PM
> *To: *Theodoros Gkountouvas , "
> user@spark.apache.org" 
> *Subject: *RE: [EXTERNAL] Caching
>
>
>
>
>
>
> Thanks for the information. I am using  spark 2.3.3 There are few more
> questions
>
>
>
> 1. Yes I am using DF1 two times but at the end action is one on DF3. In
> that case action of DF1 should be just 1 or it depends how many times this
> dataframe is used in transformation.
>
>
>
> I believe even if we use a dataframe multiple times for transformation ,
> use caching should be based on actions. In my case action is one save call
> on DF3. Please correct me if i am wrong.
>
>
>
> Thanks
>
> Amit
>
>
>
> On Mon, Dec 7, 2020 at 11:54 AM Theodoros Gkountouvas <
> theo.gkountou...@futurewei.com> wrote:
>
> Hi Amit,
>
>
>
> One action might use the same DataFrame more than once. You can look at
> your LogicalPlan by executing DF3.explain (arguments different depending
> the version of Spark you are using) and see how many times you need to
> compute DF2 or DF1. Given the information you have provided I suspect that
> DF1 is used more than once (one time at  DF2 and another one at DF3). So,
> Spark is going to cache it the first time and it will load it from cache
> instead of running it again the second time.
>
>
>
> I hope this helped,
>
> Theo.
>
>
>
> *From:* Amit Sharma 
> *Sent:* Monday, December 7, 2020 11:32 AM
> *To:* user@spark.apache.org
> *Subject:* Caching
>
>
>
> Hi All, I am using caching in my code. I have a DF like
>
> val  DF1 = read csv.
>
> val DF2 = DF1.groupBy().agg().select(.)
>
>
>
> Val DF3 =  read csv .join(DF1).join(DF2)
>
>   DF3 .save.
>
>
>
> If I do not cache DF2 or Df1 it is taking longer time  . But i am doing 1
> action only why do I need to cache.
>
>
>
> Thanks
>
> Amit
>
>
>
>
>
>


Re: Caching

2020-12-07 Thread Amit Sharma
Sean, you mean that if a DF is used more than once in transformations then we
should use cache. But frankly that is also not clear-cut, because in many
places the DF gives the same result with caching and without caching.
How do we decide whether we should use cache or not?


Thanks
Amit

On Mon, Dec 7, 2020 at 1:01 PM Sean Owen  wrote:

> No, it's not true that one action means every DF is evaluated once. This
> is a good counterexample.
>
> On Mon, Dec 7, 2020 at 11:47 AM Amit Sharma  wrote:
>
>> Thanks for the information. I am using  spark 2.3.3 There are few more
>> questions
>>
>> 1. Yes I am using DF1 two times but at the end action is one on DF3. In
>> that case action of DF1 should be just 1 or it depends how many times this
>> dataframe is used in transformation.
>>
>> I believe even if we use a dataframe multiple times for transformation ,
>> use caching should be based on actions. In my case action is one save call
>> on DF3. Please correct me if i am wrong.
>>
>> Thanks
>> Amit
>>
>> On Mon, Dec 7, 2020 at 11:54 AM Theodoros Gkountouvas <
>> theo.gkountou...@futurewei.com> wrote:
>>
>>> Hi Amit,
>>>
>>>
>>>
>>> One action might use the same DataFrame more than once. You can look at
>>> your LogicalPlan by executing DF3.explain (arguments different depending
>>> the version of Spark you are using) and see how many times you need to
>>> compute DF2 or DF1. Given the information you have provided I suspect that
>>> DF1 is used more than once (one time at  DF2 and another one at DF3). So,
>>> Spark is going to cache it the first time and it will load it from cache
>>> instead of running it again the second time.
>>>
>>>
>>>
>>> I hope this helped,
>>>
>>> Theo.
>>>
>>>
>>>
>>> *From:* Amit Sharma 
>>> *Sent:* Monday, December 7, 2020 11:32 AM
>>> *To:* user@spark.apache.org
>>> *Subject:* Caching
>>>
>>>
>>>
>>> Hi All, I am using caching in my code. I have a DF like
>>>
>>> val  DF1 = read csv.
>>>
>>> val DF2 = DF1.groupBy().agg().select(.)
>>>
>>>
>>>
>>> Val DF3 =  read csv .join(DF1).join(DF2)
>>>
>>>   DF3 .save.
>>>
>>>
>>>
>>> If I do not cache DF2 or Df1 it is taking longer time  . But i am doing
>>> 1 action only why do I need to cache.
>>>
>>>
>>>
>>> Thanks
>>>
>>> Amit
>>>
>>>
>>>
>>>
>>>
>>


Re: Caching

2020-12-07 Thread Amit Sharma
Thanks for the information. I am using Spark 2.3.3. There are a few more
questions.

1. Yes, I am using DF1 two times, but in the end there is only one action, on
DF3. In that case, is DF1 evaluated just once, or does it depend on how many
times this dataframe is used in transformations?

I believe that even if we use a dataframe multiple times in transformations,
the decision to cache should be based on actions. In my case the only action
is one save call on DF3. Please correct me if I am wrong.

Thanks
Amit

On Mon, Dec 7, 2020 at 11:54 AM Theodoros Gkountouvas <
theo.gkountou...@futurewei.com> wrote:

> Hi Amit,
>
>
>
> One action might use the same DataFrame more than once. You can look at
> your LogicalPlan by executing DF3.explain (arguments different depending
> the version of Spark you are using) and see how many times you need to
> compute DF2 or DF1. Given the information you have provided I suspect that
> DF1 is used more than once (one time at  DF2 and another one at DF3). So,
> Spark is going to cache it the first time and it will load it from cache
> instead of running it again the second time.
>
>
>
> I hope this helped,
>
> Theo.
>
>
>
> *From:* Amit Sharma 
> *Sent:* Monday, December 7, 2020 11:32 AM
> *To:* user@spark.apache.org
> *Subject:* Caching
>
>
>
> Hi All, I am using caching in my code. I have a DF like
>
> val  DF1 = read csv.
>
> val DF2 = DF1.groupBy().agg().select(.)
>
>
>
> Val DF3 =  read csv .join(DF1).join(DF2)
>
>   DF3 .save.
>
>
>
> If I do not cache DF2 or Df1 it is taking longer time  . But i am doing 1
> action only why do I need to cache.
>
>
>
> Thanks
>
> Amit
>
>
>
>
>


Caching

2020-12-07 Thread Amit Sharma
Hi All, I am using caching in my code. I have DFs like this (pseudocode):
val DF1 = read csv
val DF2 = DF1.groupBy().agg().select(...)

val DF3 = read csv .join(DF1).join(DF2)
DF3.save

If I do not cache DF2 or DF1 it takes longer. But I am doing only one
action, so why do I need to cache?

Thanks
Amit
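
A runnable sketch of the pipeline above, with the cache and explain calls
discussed in the replies; the file paths and the join column "key" are
invented for illustration and are not from the original email:

import org.apache.spark.sql.SparkSession

object CachingExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("caching-example").getOrCreate()

    // DF1 is referenced by both DF2 and DF3, so caching it avoids re-reading the CSV.
    val df1 = spark.read.option("header", "true").csv("data/input1.csv").cache()

    val df2 = df1.groupBy("key").count()

    val df3 = spark.read.option("header", "true").csv("data/input2.csv")
      .join(df1, "key")
      .join(df2, "key")

    // The physical plan shows whether df1 is read once (InMemoryTableScan) or scanned twice.
    df3.explain()

    df3.write.mode("overwrite").parquet("data/output")
    spark.stop()
  }
}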


Re: Missing required configuration "partition.assignment.strategy" [ Kafka + Spark Structured Streaming ]

2020-12-07 Thread Amit Joshi
Hi Gabor,

The code is a very simple Kafka consumption of data.
I guess it may be the cluster.
Can you please point out the possible problem to look for in the cluster?

Regards
Amit

On Monday, December 7, 2020, Gabor Somogyi 
wrote:

> + Adding back user list.
>
> I've had a look at the Spark code and it's not modifying 
> "partition.assignment.strategy"
> so the problem
> must be either in your application or in your cluster setup.
>
> G
>
>
> On Mon, Dec 7, 2020 at 12:31 PM Gabor Somogyi 
> wrote:
>
>> It's super interesting because that field has default value:
>> *org.apache.kafka.clients.consumer.RangeAssignor*
>>
>> On Mon, 7 Dec 2020, 10:51 Amit Joshi,  wrote:
>>
>>> Hi,
>>>
>>> Thnks for the reply.
>>> I did tried removing the client version.
>>> But got the same exception.
>>>
>>>
>>> Thnks
>>>
>>> On Monday, December 7, 2020, Gabor Somogyi 
>>> wrote:
>>>
>>>> +1 on the mentioned change, Spark uses the following kafka-clients
>>>> library:
>>>>
>>>> 2.4.1
>>>>
>>>> G
>>>>
>>>>
>>>> On Mon, Dec 7, 2020 at 9:30 AM German Schiavon <
>>>> gschiavonsp...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I think the issue is that you are overriding the kafka-clients that
>>>>> comes with  spark-sql-kafka-0-10_2.12
>>>>>
>>>>>
>>>>> I'd try removing the kafka-clients and see if it works
>>>>>
>>>>>
>>>>> On Sun, 6 Dec 2020 at 08:01, Amit Joshi 
>>>>> wrote:
>>>>>
>>>>>> Hi All,
>>>>>>
>>>>>> I am running the Spark Structured Streaming along with Kafka.
>>>>>> Below is the pom.xml
>>>>>>
>>>>>> 
>>>>>> 1.8
>>>>>> 1.8
>>>>>> UTF-8
>>>>>> 
>>>>>> 2.12.10
>>>>>> 3.0.1
>>>>>> 
>>>>>>
>>>>>> 
>>>>>> org.apache.kafka
>>>>>> kafka-clients
>>>>>> 2.1.0
>>>>>> 
>>>>>>
>>>>>> 
>>>>>> org.apache.spark
>>>>>> spark-core_2.12
>>>>>> ${sparkVersion}
>>>>>> provided
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> org.apache.spark
>>>>>> spark-sql_2.12
>>>>>> ${sparkVersion}
>>>>>> provided
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> org.apache.spark
>>>>>> spark-sql-kafka-0-10_2.12
>>>>>> ${sparkVersion}
>>>>>> 
>>>>>>
>>>>>> Building the fat jar with shade plugin. The jar is running as expected 
>>>>>> in my local setup with the command
>>>>>>
>>>>>> *spark-submit --master local[*] --class com.stream.Main --num-executors 
>>>>>> 3 --driver-memory 2g --executor-cores 2 --executor-memory 3g 
>>>>>> prism-event-synch-rta.jar*
>>>>>>
>>>>>> But when I am trying to run same jar in spark cluster using yarn with 
>>>>>> command:
>>>>>>
>>>>>> *spark-submit --master yarn --deploy-mode cluster --class 
>>>>>> com.stream.Main --num-executors 4 --driver-memory 2g --executor-cores 1 
>>>>>> --executor-memory 4g  gs://jars/prism-event-synch-rta.jar*
>>>>>>
>>>>>> Getting the this exception:
>>>>>>
>>>>>>  
>>>>>>
>>>>>>
>>>>>> *at org.apache.spark.sql.execution.streaming.StreamExecution.org 
>>>>>> <http://org.apache.spark.sql.execution.streaming.StreamExecution.org>$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:355)
>>>>>> at 
>>>>>> org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:245)Caused
>>>>>>  by: org.apache.kafka.common.config.ConfigException: Missing required 
>>>>>> configuration "partition.assignment.strategy" which has no default 
>>>>>> value. at 
>>>>>> org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:124)*
>>>>>>
>>>>>> I have tried setting up the "partition.assignment.strategy", then also 
>>>>>> its not working.
>>>>>>
>>>>>> Please help.
>>>>>>
>>>>>>
>>>>>> Regards
>>>>>>
>>>>>> Amit Joshi
>>>>>>
>>>>>>


Re: Missing required configuration "partition.assignment.strategy" [ Kafka + Spark Structured Streaming ]

2020-12-07 Thread Amit Joshi
Hi All,

Thanks for the reply.
I did try removing the client version,
but got the same exception.

One point, though: there are some dependent artifacts I am using which
contain a reference to the Kafka client version.
I am trying to make an uber jar, which will choose the closest version.

Thanks


On Monday, December 7, 2020, Gabor Somogyi 
wrote:

> +1 on the mentioned change, Spark uses the following kafka-clients library:
>
> 2.4.1
>
> G
>
>
> On Mon, Dec 7, 2020 at 9:30 AM German Schiavon 
> wrote:
>
>> Hi,
>>
>> I think the issue is that you are overriding the kafka-clients that comes
>> with  spark-sql-kafka-0-10_2.12
>>
>>
>> I'd try removing the kafka-clients and see if it works
>>
>>
>> On Sun, 6 Dec 2020 at 08:01, Amit Joshi 
>> wrote:
>>
>>> Hi All,
>>>
>>> I am running the Spark Structured Streaming along with Kafka.
>>> Below is the pom.xml
>>>
>>> 
>>> 1.8
>>> 1.8
>>> UTF-8
>>> 
>>> 2.12.10
>>> 3.0.1
>>> 
>>>
>>> 
>>> org.apache.kafka
>>> kafka-clients
>>> 2.1.0
>>> 
>>>
>>> 
>>> org.apache.spark
>>> spark-core_2.12
>>> ${sparkVersion}
>>> provided
>>> 
>>> 
>>> 
>>> org.apache.spark
>>> spark-sql_2.12
>>> ${sparkVersion}
>>> provided
>>> 
>>> 
>>> 
>>> org.apache.spark
>>> spark-sql-kafka-0-10_2.12
>>> ${sparkVersion}
>>> 
>>>
>>> Building the fat jar with shade plugin. The jar is running as expected in 
>>> my local setup with the command
>>>
>>> *spark-submit --master local[*] --class com.stream.Main --num-executors 3 
>>> --driver-memory 2g --executor-cores 2 --executor-memory 3g 
>>> prism-event-synch-rta.jar*
>>>
>>> But when I am trying to run same jar in spark cluster using yarn with 
>>> command:
>>>
>>> *spark-submit --master yarn --deploy-mode cluster --class com.stream.Main 
>>> --num-executors 4 --driver-memory 2g --executor-cores 1 --executor-memory 
>>> 4g  gs://jars/prism-event-synch-rta.jar*
>>>
>>> Getting the this exception:
>>>
>>> 
>>>
>>>
>>> *at org.apache.spark.sql.execution.streaming.StreamExecution.org 
>>> <http://org.apache.spark.sql.execution.streaming.StreamExecution.org>$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:355)
>>>at 
>>> org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:245)Caused
>>>  by: org.apache.kafka.common.config.ConfigException: Missing required 
>>> configuration "partition.assignment.strategy" which has no default value. 
>>> at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:124)*
>>>
>>> I have tried setting up the "partition.assignment.strategy", then also its 
>>> not working.
>>>
>>> Please help.
>>>
>>>
>>> Regards
>>>
>>> Amit Joshi
>>>
>>>


Missing required configuration "partition.assignment.strategy" [ Kafka + Spark Structured Streaming ]

2020-12-05 Thread Amit Joshi
Hi All,

I am running the Spark Structured Streaming along with Kafka.
Below is the pom.xml


<properties>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <scala.version>2.12.10</scala.version>
    <sparkVersion>3.0.1</sparkVersion>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.1.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.12</artifactId>
        <version>${sparkVersion}</version>
        <scope>provided</scope>
    </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.12</artifactId>
        <version>${sparkVersion}</version>
        <scope>provided</scope>
    </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
        <version>${sparkVersion}</version>
    </dependency>
</dependencies>


I am building the fat jar with the shade plugin. The jar runs as expected
in my local setup with the command

*spark-submit --master local[*] --class com.stream.Main
--num-executors 3 --driver-memory 2g --executor-cores 2
--executor-memory 3g prism-event-synch-rta.jar*

But when I try to run the same jar on the Spark cluster using YARN with this command:

*spark-submit --master yarn --deploy-mode cluster --class
com.stream.Main --num-executors 4 --driver-memory 2g --executor-cores
1 --executor-memory 4g  gs://jars/prism-event-synch-rta.jar*

I get this exception:




*at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:355)
at
org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:245)Caused
by: org.apache.kafka.common.config.ConfigException: Missing required
configuration "partition.assignment.strategy" which has no default
value.  at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:124)*

I have tried setting "partition.assignment.strategy" explicitly, but it
still does not work.

Please help.


Regards

Amit Joshi
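
For reference, a hedged sbt sketch of the dependency setup suggested in the
replies above: Spark itself is marked provided, and no explicit kafka-clients
version is pinned, so the version pulled in transitively by
spark-sql-kafka-0-10 is the one that ends up in the fat jar. Versions follow
the thread (Scala 2.12 / Spark 3.0.1); this is illustrative, not the poster's
actual build file:

// build.sbt (illustrative only)
ThisBuild / scalaVersion := "2.12.10"

val sparkVersion = "3.0.1"

libraryDependencies ++= Seq(
  // Spark core/SQL exist on the cluster, so keep them out of the fat jar
  "org.apache.spark" %% "spark-core" % sparkVersion % "provided",
  "org.apache.spark" %% "spark-sql"  % sparkVersion % "provided",
  // Do not also declare an older kafka-clients here: it would override the
  // kafka-clients version this connector depends on and can lead to runtime
  // errors such as the missing "partition.assignment.strategy" configuration.
  "org.apache.spark" %% "spark-sql-kafka-0-10" % sparkVersion
)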


Re: Spark UI Storage Memory

2020-12-04 Thread Amit Sharma
Is there any memory leak in spark 2.3.3 version as mentioned in below Jira.
https://issues.apache.org/jira/browse/SPARK-29055.

Please let me know how to solve it.

Thanks
Amit

On Fri, Dec 4, 2020 at 1:55 PM Amit Sharma  wrote:

> Can someone help me on this please.
>
>
> Thanks
> Amit
>
> On Wed, Dec 2, 2020 at 11:52 AM Amit Sharma  wrote:
>
>> Hi , I have a spark streaming job. When I am checking the Excetors tab ,
>> there is a Storage Memory column. It displays used memory  /total memory.
>> What is used memory. Is it memory in  use or memory used so far. How would
>> I know how much memory is unused at 1 point of time.
>>
>>
>> Thanks
>> Amit
>>
>


Re: Spark UI Storage Memory

2020-12-04 Thread Amit Sharma
Can someone help me on this please.


Thanks
Amit

On Wed, Dec 2, 2020 at 11:52 AM Amit Sharma  wrote:

> Hi , I have a spark streaming job. When I am checking the Excetors tab ,
> there is a Storage Memory column. It displays used memory  /total memory.
> What is used memory. Is it memory in  use or memory used so far. How would
> I know how much memory is unused at 1 point of time.
>
>
> Thanks
> Amit
>


Spark UI Storage Memory

2020-12-02 Thread Amit Sharma
Hi, I have a Spark streaming job. When I check the Executors tab, there is a
Storage Memory column. It displays used memory / total memory. What is used
memory? Is it memory currently in use, or memory used so far? How would I know
how much memory is unused at one point in time?


Thanks
Amit


Re: Cache not getting cleaned.

2020-11-21 Thread Amit Sharma
Please find attached a screenshot showing no active tasks, but memory is still
used.

[image: image.png]

On Sat, Nov 21, 2020 at 4:25 PM Amit Sharma  wrote:

> I am using df.cache and also unpersisting it. But when I check spark Ui
> storage I still see cache memory usage. Do I need to do any thing else.
>
> Also in executor tab on spark Ui for each executor memory used/total
> memory always display some used memory not sure if no request on streaming
> job then usages should be 0.
>
> Thanks
> Amit
>


Cache not getting cleaned.

2020-11-21 Thread Amit Sharma
I am using df.cache and also unpersisting it. But when I check the Spark UI
Storage tab I still see cache memory usage. Do I need to do anything else?

Also, in the Executors tab of the Spark UI, the used/total memory for each
executor always displays some used memory. I am not sure why; if there are no
requests on the streaming job, shouldn't the usage be 0?

Thanks
Amit
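
A minimal sketch of explicitly releasing a cache and then checking what is
still held, which is one way to investigate the storage memory numbers
described above; the dataframe and the input path are placeholders:

import org.apache.spark.sql.SparkSession

object CacheCleanupCheck {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("cache-cleanup-check").getOrCreate()

    val df = spark.read.parquet("data/some-table").cache()   // placeholder path
    df.count()                                               // materialize the cache

    df.unpersist(blocking = true)                            // wait until the blocks are removed

    // Anything listed here is still occupying storage memory on the executors.
    spark.sparkContext.getPersistentRDDs.foreach { case (id, rdd) =>
      println(s"still cached: RDD $id storageLevel=${rdd.getStorageLevel}")
    }

    spark.stop()
  }
}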


Re: Spark Exception

2020-11-20 Thread Amit Sharma
Russell, I increased the RPC timeout to 240 seconds but I am still getting
this issue once in a while. After this issue my Spark streaming job gets stuck
and does not process any requests, so I need to restart it every time. Any
suggestions, please?


Thanks
Amit

On Wed, Nov 18, 2020 at 12:05 PM Amit Sharma  wrote:

> Hi, we are running a spark streaming  job and sometimes it throws below
> two exceptions . I am not understanding  what is the difference between
> these two exception for one timeout is 120 seconds and another is 600
> seconds. What could be the reason for these
>
>
>  Error running job streaming job 1605709968000 ms.0
> org.apache.spark.SparkException: Job aborted due to stage failure: Task
> serialization failed: org.apache.spark.rpc.RpcTimeoutException: Futures
> timed out after [120 seconds]. This timeout is controlled by
> spark.rpc.askTimeout
> org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [120
> seconds]. This timeout is controlled by spark.rpc.askTimeout
> at org.apache.spark.rpc.RpcTimeout.org
> $apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:47)
> at
> org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:62)
> at
> org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:58)
> at
> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
> at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:76)
> at
> org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:92)
> at
> org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:76)
> at
> org.apache.spark.storage.BlockManagerMaster.updateBlockInfo(BlockManagerMaster.scala:76)
> at org.apache.spark.storage.BlockManager.org
> $apache$spark$storage$BlockManager$$tryToReportBlockStatus(BlockManager.scala:466)
> at org.apache.spark.storage.BlockManager.org
> $apache$spark$storage$BlockManager$$reportBlockStatus(BlockManager.scala:445)
> at
> org.apache.spark.storage.BlockManager.removeBlockInternal(BlockManager.scala:1519)
> at
> org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1047)
>
>
>
>
>
> 2020-11-18 14:44:03 ERROR Utils:91 - Uncaught exception in thread
> heartbeat-receiver-event-loop-thread
> org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [600
> seconds]. This timeout is controlled by BlockManagerHeartbeat
> at org.apache.spark.rpc.RpcTimeout.org
> $apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:47)
> at
> org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:62)
> at
> org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:58)
> at
> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
> at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:76)
> at
> org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:92)
> at
> org.apache.spark.scheduler.DAGScheduler.executorHeartbeatReceived(DAGScheduler.scala:251)
> at
> org.apache.spark.scheduler.TaskSchedulerImpl.executorHeartbeatReceived(TaskSchedulerImpl.scala:455)
> at
> org.apache.spark.HeartbeatReceiver$$anonfun$receiveAndReply$1$$anon$2$$anonfun$run$2.apply$mcV$sp(HeartbeatReceiver.scala:129)
> at
> org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1361)
> at
> org.apache.spark.HeartbeatReceiver$$anonfun$receiveAndReply$1$$anon$2.run(HeartbeatReceiver.scala:128)
> at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
>


Re: Out of memory issue

2020-11-20 Thread Amit Sharma
please help.


Thanks
Amit

On Mon, Nov 9, 2020 at 4:18 PM Amit Sharma  wrote:

> Please find below the exact exception
>
> Exception in thread "streaming-job-executor-3" java.lang.OutOfMemoryError:
> Java heap space
> at java.util.Arrays.copyOf(Arrays.java:3332)
> at
> java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:124)
> at
> java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:448)
> at java.lang.StringBuilder.append(StringBuilder.java:136)
> at
> scala.StringContext.standardInterpolator(StringContext.scala:126)
> at scala.StringContext.s(StringContext.scala:95)
> at sparkStreaming.TRReview.getTRReviews(TRReview.scala:307)
> at
> sparkStreaming.KafkaListener$$anonfun$1$$anonfun$apply$1$$anonfun$3.apply(KafkaListener.scala:154)
> at
> sparkStreaming.KafkaListener$$anonfun$1$$anonfun$apply$1$$anonfun$3.apply(KafkaListener.scala:138)
> at scala.util.Success$$anonfun$map$1.apply(Try.scala:237)
> at scala.util.Try$.apply(Try.scala:192)
> at scala.util.Success.map(Try.scala:237)
>
> On Sun, Nov 8, 2020 at 1:35 PM Amit Sharma  wrote:
>
>> Hi , I am using 16 nodes spark cluster with below config
>> 1. Executor memory  8 GB
>> 2. 5 cores per executor
>> 3. Driver memory 12 GB.
>>
>>
>> We have streaming job. We do not see problem but sometimes we get
>> exception executor-1 heap memory issue. I am not understanding if data size
>> is same and this job receive a request and process it but suddenly it’s
>> start giving out of memory error . It will throw exception for 1 executor
>> then throw for other executor also and it stop processing the request.
>>
>> Thanks
>> Amit
>>
>


Re: Spark Exception

2020-11-20 Thread Amit Sharma
Please help.


Thanks
Amit

On Wed, Nov 18, 2020 at 12:05 PM Amit Sharma  wrote:

> Hi, we are running a spark streaming  job and sometimes it throws below
> two exceptions . I am not understanding  what is the difference between
> these two exception for one timeout is 120 seconds and another is 600
> seconds. What could be the reason for these
>
>
>  Error running job streaming job 1605709968000 ms.0
> org.apache.spark.SparkException: Job aborted due to stage failure: Task
> serialization failed: org.apache.spark.rpc.RpcTimeoutException: Futures
> timed out after [120 seconds]. This timeout is controlled by
> spark.rpc.askTimeout
> org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [120
> seconds]. This timeout is controlled by spark.rpc.askTimeout
> at org.apache.spark.rpc.RpcTimeout.org
> $apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:47)
> at
> org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:62)
> at
> org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:58)
> at
> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
> at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:76)
> at
> org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:92)
> at
> org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:76)
> at
> org.apache.spark.storage.BlockManagerMaster.updateBlockInfo(BlockManagerMaster.scala:76)
> at org.apache.spark.storage.BlockManager.org
> $apache$spark$storage$BlockManager$$tryToReportBlockStatus(BlockManager.scala:466)
> at org.apache.spark.storage.BlockManager.org
> $apache$spark$storage$BlockManager$$reportBlockStatus(BlockManager.scala:445)
> at
> org.apache.spark.storage.BlockManager.removeBlockInternal(BlockManager.scala:1519)
> at
> org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1047)
>
>
>
>
>
> 2020-11-18 14:44:03 ERROR Utils:91 - Uncaught exception in thread
> heartbeat-receiver-event-loop-thread
> org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [600
> seconds]. This timeout is controlled by BlockManagerHeartbeat
> at org.apache.spark.rpc.RpcTimeout.org
> $apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:47)
> at
> org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:62)
> at
> org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:58)
> at
> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
> at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:76)
> at
> org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:92)
> at
> org.apache.spark.scheduler.DAGScheduler.executorHeartbeatReceived(DAGScheduler.scala:251)
> at
> org.apache.spark.scheduler.TaskSchedulerImpl.executorHeartbeatReceived(TaskSchedulerImpl.scala:455)
> at
> org.apache.spark.HeartbeatReceiver$$anonfun$receiveAndReply$1$$anon$2$$anonfun$run$2.apply$mcV$sp(HeartbeatReceiver.scala:129)
> at
> org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1361)
> at
> org.apache.spark.HeartbeatReceiver$$anonfun$receiveAndReply$1$$anon$2.run(HeartbeatReceiver.scala:128)
> at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
>


Spark Exception

2020-11-18 Thread Amit Sharma
Hi, we are running a Spark streaming job and sometimes it throws the two
exceptions below. I do not understand the difference between these two
exceptions: for one the timeout is 120 seconds and for the other it is 600
seconds. What could be the reason for these?


 Error running job streaming job 1605709968000 ms.0
org.apache.spark.SparkException: Job aborted due to stage failure: Task
serialization failed: org.apache.spark.rpc.RpcTimeoutException: Futures
timed out after [120 seconds]. This timeout is controlled by
spark.rpc.askTimeout
org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [120
seconds]. This timeout is controlled by spark.rpc.askTimeout
at org.apache.spark.rpc.RpcTimeout.org
$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:47)
at
org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:62)
at
org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:58)
at
scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:76)
at
org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:92)
at
org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:76)
at
org.apache.spark.storage.BlockManagerMaster.updateBlockInfo(BlockManagerMaster.scala:76)
at org.apache.spark.storage.BlockManager.org
$apache$spark$storage$BlockManager$$tryToReportBlockStatus(BlockManager.scala:466)
at org.apache.spark.storage.BlockManager.org
$apache$spark$storage$BlockManager$$reportBlockStatus(BlockManager.scala:445)
at
org.apache.spark.storage.BlockManager.removeBlockInternal(BlockManager.scala:1519)
at
org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1047)





2020-11-18 14:44:03 ERROR Utils:91 - Uncaught exception in thread
heartbeat-receiver-event-loop-thread
org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [600
seconds]. This timeout is controlled by BlockManagerHeartbeat
at org.apache.spark.rpc.RpcTimeout.org
$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:47)
at
org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:62)
at
org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:58)
at
scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:76)
at
org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:92)
at
org.apache.spark.scheduler.DAGScheduler.executorHeartbeatReceived(DAGScheduler.scala:251)
at
org.apache.spark.scheduler.TaskSchedulerImpl.executorHeartbeatReceived(TaskSchedulerImpl.scala:455)
at
org.apache.spark.HeartbeatReceiver$$anonfun$receiveAndReply$1$$anon$2$$anonfun$run$2.apply$mcV$sp(HeartbeatReceiver.scala:129)
at
org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1361)
at
org.apache.spark.HeartbeatReceiver$$anonfun$receiveAndReply$1$$anon$2.run(HeartbeatReceiver.scala:128)
at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
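
A minimal sketch of raising the two timeouts named in the stack traces above;
the values are arbitrary examples, not recommendations, and raising them does
not address the underlying cause (typically an overloaded driver or long GC
pauses):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("streaming-job")
  .config("spark.rpc.askTimeout", "240s")    // governs the 120-second "Futures timed out" error
  .config("spark.network.timeout", "600s")   // default for several other network/heartbeat timeouts
  .getOrCreate()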


spark UI storage tab

2020-11-11 Thread Amit Sharma
Hi, I have a few questions as below.

1. The Spark UI Storage tab displays 'Storage Level', 'Size in Memory' and
'Size on Disk'. It displays RDD ID 16 with memory usage of 76 MB, and I am not
sure why it does not go back to 0 once a request on the Spark streaming job is
completed. I am caching some RDDs inside a method and uncaching them.

2. Similarly, the Executors tab displays 'Storage Memory' used and available.
Does "used" mean memory currently in use, or memory used on that executor at
some point in time (the maximum memory used so far)?




Thanks
Amit


Re: Out of memory issue

2020-11-09 Thread Amit Sharma
Please find below the exact exception

Exception in thread "streaming-job-executor-3" java.lang.OutOfMemoryError:
Java heap space
at java.util.Arrays.copyOf(Arrays.java:3332)
at
java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:124)
at
java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:448)
at java.lang.StringBuilder.append(StringBuilder.java:136)
at scala.StringContext.standardInterpolator(StringContext.scala:126)
at scala.StringContext.s(StringContext.scala:95)
at sparkStreaming.TRReview.getTRReviews(TRReview.scala:307)
at
sparkStreaming.KafkaListener$$anonfun$1$$anonfun$apply$1$$anonfun$3.apply(KafkaListener.scala:154)
at
sparkStreaming.KafkaListener$$anonfun$1$$anonfun$apply$1$$anonfun$3.apply(KafkaListener.scala:138)
at scala.util.Success$$anonfun$map$1.apply(Try.scala:237)
at scala.util.Try$.apply(Try.scala:192)
at scala.util.Success.map(Try.scala:237)

On Sun, Nov 8, 2020 at 1:35 PM Amit Sharma  wrote:

> Hi , I am using 16 nodes spark cluster with below config
> 1. Executor memory  8 GB
> 2. 5 cores per executor
> 3. Driver memory 12 GB.
>
>
> We have streaming job. We do not see problem but sometimes we get
> exception executor-1 heap memory issue. I am not understanding if data size
> is same and this job receive a request and process it but suddenly it’s
> start giving out of memory error . It will throw exception for 1 executor
> then throw for other executor also and it stop processing the request.
>
> Thanks
> Amit
>


Re: Out of memory issue

2020-11-09 Thread Amit Sharma
Can you please help.


Thanks
Amit

On Sun, Nov 8, 2020 at 1:35 PM Amit Sharma  wrote:

> Hi , I am using 16 nodes spark cluster with below config
> 1. Executor memory  8 GB
> 2. 5 cores per executor
> 3. Driver memory 12 GB.
>
>
> We have streaming job. We do not see problem but sometimes we get
> exception executor-1 heap memory issue. I am not understanding if data size
> is same and this job receive a request and process it but suddenly it’s
> start giving out of memory error . It will throw exception for 1 executor
> then throw for other executor also and it stop processing the request.
>
> Thanks
> Amit
>


Out of memory issue

2020-11-08 Thread Amit Sharma
Hi, I am using a 16-node Spark cluster with the below config:
1. Executor memory  8 GB
2. 5 cores per executor
3. Driver memory 12 GB.


We have a streaming job. Usually we do not see a problem, but sometimes we get
an executor heap memory exception. I do not understand this: the data size is
the same, and the job receives a request and processes it, but suddenly it
starts giving out-of-memory errors. It throws the exception for one executor,
then for other executors as well, and it stops processing requests.

Thanks
Amit


Spark reading from cassandra

2020-11-04 Thread Amit Sharma
Hi, I have a question: when reading from Cassandra, should we use only the
partition key in the where clause from a performance perspective, or does it
not matter from the Spark perspective because it always allows filtering?


Thanks
Amit
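
A minimal sketch of the difference in question, assuming the DataStax
spark-cassandra-connector is on the classpath; keyspace, table and column
names are invented. A filter on the partition key is pushed down so only one
Cassandra partition is read, whereas a filter on a non-key column falls back
to scanning, which is usually much slower on large tables:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("cassandra-read").getOrCreate()
import spark.implicits._

val orders = spark.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> "shop", "table" -> "orders"))   // placeholder names
  .load()

// Partition-key predicate: pushed down, reads a single Cassandra partition.
val oneCustomer = orders.filter($"customer_id" === "42")

// Non-key predicate: evaluated by scanning, much more expensive server-side.
val bigOrders = orders.filter($"order_total" > 100)

oneCustomer.show()
bigOrders.show()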


Re: [Spark SQL] does pyspark udf support spark.sql inside def

2020-09-30 Thread Amit Joshi
Can you please post the schema of both the tables?

On Wednesday, September 30, 2020, Lakshmi Nivedita 
wrote:

> Thank you for the clarification.I would like to how can I  proceed for
> this kind of scenario in pyspark
>
> I have a scenario subtracting the total number of days with the number of
> holidays in pyspark by using dataframes
>
> I have a table with dates  date1  date2 in one table and number of
> holidays in another table
> df1 = select date1,date2 ,ctry ,unixtimestamp(date2-date1)
> totalnumberofdays  - df2.holidays  from table A;
>
> df2 = select count(holiays)
> from table B
> where holidate >= 'date1'(table A)
> and holidate < = date2(table A)
> and country = A.ctry(table A)
>
> Except country no other column is not a unique key
>
>
>
>
> On Wed, Sep 30, 2020 at 6:05 PM Sean Owen  wrote:
>
>> No, you can't use the SparkSession from within a function executed by
>> Spark tasks.
>>
>> On Wed, Sep 30, 2020 at 7:29 AM Lakshmi Nivedita 
>> wrote:
>>
>>> Here is a spark udf structure as an example
>>>
>>> Def sampl_fn(x):
>>>Spark.sql(“select count(Id) from sample Where Id = x ”)
>>>
>>>
>>> Spark.udf.register(“sample_fn”, sample_fn)
>>>
>>> Spark.sql(“select id, sampl_fn(Id) from example”)
>>>
>>> Advance Thanks for the help
>>> --
>>> k.Lakshmi Nivedita
>>>
>>>
>>>
>>>
>>
>>
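
A hedged sketch of the join-based alternative to calling spark.sql inside a
UDF (which, as noted above, is not supported). Table and column names follow
the rough schema given in the thread (A: date1, date2, ctry; B: holidate,
country), and the two tables are assumed to be registered; it is shown in
Scala, but the same DataFrame API is available in PySpark:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, count, datediff}

val spark = SparkSession.builder().appName("holiday-join").getOrCreate()

val a = spark.table("A")          // columns: date1, date2, ctry
val b = spark.table("B")          // columns: holidate, country

// Count, per row of A, the holidays that fall between date1 and date2 in the
// same country, then subtract that count from the total number of days.
val result = a
  .join(b,
    b("country") === a("ctry") &&
      b("holidate") >= a("date1") &&
      b("holidate") <= a("date2"),
    "left")
  .groupBy(a("date1"), a("date2"), a("ctry"))
  .agg(count(b("holidate")).as("num_holidays"))
  .withColumn("working_days",
    datediff(col("date2"), col("date1")) - col("num_holidays"))

result.show()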


Re: Query around Spark Checkpoints

2020-09-27 Thread Amit Joshi
Hi,

As far as I know, it depends on whether you are using Spark Streaming or
Structured Streaming.
In Spark Streaming you can write your own code to checkpoint.
But in the case of Structured Streaming, it has to be a file location.
The main question, though, is why you want to checkpoint in
NoSQL, as it is eventually consistent.


Regards
Amit

On Sunday, September 27, 2020, Debabrata Ghosh 
wrote:

> Hi,
> I had a query around Spark checkpoints - Can I store the checkpoints
> in NoSQL or Kafka instead of Filesystem ?
>
> Regards,
>
> Debu
>
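
A minimal sketch of the point above: in Structured Streaming the checkpoint
location is a fault-tolerant file-system path supplied per query; the Kafka
settings and paths are placeholders:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("checkpoint-example").getOrCreate()

val stream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("subscribe", "events")
  .load()

val query = stream.writeStream
  .format("parquet")
  .option("path", "hdfs:///data/events")                      // output sink
  .option("checkpointLocation", "hdfs:///checkpoints/events") // HDFS/S3-like path, not NoSQL or Kafka
  .start()

query.awaitTermination()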


Re: [pyspark 2.4] broadcasting DataFrame throws error

2020-09-18 Thread Amit Joshi
Hi Rishi,

Maybe you have already done these steps.
Can you check the size of the dataframe you are trying to broadcast, using
logInfo(SizeEstimator.estimate(df)),
and adjust the driver memory accordingly?

There is one more issue which I found in Spark 2:
broadcast does not work on cached data. It is possible this is not the
issue here, but you can check for the same problem at your end.

https://github.com/apache/spark/blame/master/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala#L219

And can you please tell which issue was solved in Spark 3 that you are
referring to?

Regards
Amit


On Saturday, September 19, 2020, Rishi Shah 
wrote:

> Thanks Amit. I have tried increasing driver memory , also tried increasing
> max result size returned to the driver. Nothing works, I believe spark is
> not able to determine the fact that the result to be broadcasted is small
> enough because input data is huge? When I tried this in 2 stages, write out
> the grouped data and use that to join using broadcast, spark has no issues
> broadcasting this.
>
> When I was checking Spark 3 documentation, it seems like this issue may
> have been addressed in Spark 3 but not in earlier version?
>
> On Thu, Sep 17, 2020 at 11:35 PM Amit Joshi 
> wrote:
>
>> Hi,
>>
>> I think problem lies with driver memory. Broadcast in spark work by
>> collecting all the data to driver and then driver broadcasting to all the
>> executors. Different strategy could be employed for trasfer like bit
>> torrent though.
>>
>> Please try increasing the driver memory. See if it works.
>>
>> Regards,
>> Amit
>>
>>
>> On Thursday, September 17, 2020, Rishi Shah 
>> wrote:
>>
>>> Hello All,
>>>
>>> Hope this email finds you well. I have a dataframe of size 8TB (parquet
>>> snappy compressed), however I group it by a column and get a much smaller
>>> aggregated dataframe of size 700 rows (just two columns, key and count).
>>> When I use it like below to broadcast this aggregated result, it throws
>>> dataframe can not be broadcasted error.
>>>
>>> df_agg = df.groupBy('column1').count().cache()
>>> # df_agg.count()
>>> df_join = df.join(broadcast(df_agg), 'column1', 'left_outer')
>>> df_join.write.parquet('PATH')
>>>
>>> The same code works with input df size of 3TB without any modifications.
>>>
>>> Any suggestions?
>>>
>>> --
>>> Regards,
>>>
>>> Rishi Shah
>>>
>>
>
> --
> Regards,
>
> Rishi Shah
>
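
A hedged sketch of the size check suggested earlier in this thread:
SizeEstimator gives a rough in-memory size of the aggregated result, which
helps judge whether it is small enough to broadcast; the input path is a
placeholder:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast
import org.apache.spark.util.SizeEstimator

val spark = SparkSession.builder().appName("broadcast-size-check").getOrCreate()

val df = spark.read.parquet("s3a://bucket/big-table")   // placeholder input
val dfAgg = df.groupBy("column1").count()

// Rough estimate of what will actually be shipped to the driver and executors.
val collected = dfAgg.collect()                         // small: ~700 rows in the thread
println(s"estimated in-memory size: ${SizeEstimator.estimate(collected)} bytes")

// If the estimate is small, the broadcast-hinted join from the thread should be safe.
val dfJoin = df.join(broadcast(dfAgg), Seq("column1"), "left_outer")
dfJoin.write.mode("overwrite").parquet("s3a://bucket/output")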


Re: [pyspark 2.4] broadcasting DataFrame throws error

2020-09-17 Thread Amit Joshi
Hi,

I think the problem lies with driver memory. Broadcast in Spark works by
collecting all the data to the driver and then having the driver broadcast it
to all the executors. A different strategy could be employed for the transfer,
like BitTorrent, though.

Please try increasing the driver memory and see if it works.

Regards,
Amit


On Thursday, September 17, 2020, Rishi Shah 
wrote:

> Hello All,
>
> Hope this email finds you well. I have a dataframe of size 8TB (parquet
> snappy compressed), however I group it by a column and get a much smaller
> aggregated dataframe of size 700 rows (just two columns, key and count).
> When I use it like below to broadcast this aggregated result, it throws
> dataframe can not be broadcasted error.
>
> df_agg = df.groupBy('column1').count().cache()
> # df_agg.count()
> df_join = df.join(broadcast(df_agg), 'column1', 'left_outer')
> df_join.write.parquet('PATH')
>
> The same code works with input df size of 3TB without any modifications.
>
> Any suggestions?
>
> --
> Regards,
>
> Rishi Shah
>


Re: Submitting Spark Job thru REST API?

2020-09-02 Thread Amit Joshi
Hi,
There is another option, Apache Livy, which lets you submit the job using a
REST API.
Another option is to use AWS Data Pipeline to configure your job as an EMR
activity.
To activate the pipeline, you need the console or a program.

Regards
Amit

On Thursday, September 3, 2020, Eric Beabes 
wrote:

> Under Spark 2.4 is it possible to submit a Spark job thru REST API - just
> like the Flink job?
>
> Here's the use case: We need to submit a Spark Job to the EMR cluster but
> our security team is not allowing us to submit a job from the Master node
> or thru UI. They want us to create a "Docker Container" to submit a job.
>
> If it's possible to submit the Spark job thru REST then we don't need to
> install Spark/Hadoop JARs on the Container. If it's not possible to use
> REST API, can we do something like this?
>
> spark-2.4.6-bin-hadoop2.7/bin/spark-submit \
>  --class myclass --master "yarn url" --deploy-mode cluster \
>
> In other words, instead of --master yarn, specify a URL. Would this still
> work the same way?
>
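
A hedged sketch of the Livy option mentioned above: POSTing to Livy's
/batches endpoint submits a Spark job over REST, so nothing has to be launched
from the master node. The host, jar path and class name are invented
placeholders:

import java.net.{HttpURLConnection, URL}
import java.nio.charset.StandardCharsets

object LivySubmit {
  def main(args: Array[String]): Unit = {
    // Batch submission payload: the jar to run, its main class, and job arguments.
    val payload =
      """{"file": "s3://my-bucket/jars/my-spark-job.jar",
        |  "className": "com.example.MyJob",
        |  "args": ["2020-09-01"]}""".stripMargin

    val conn = new URL("http://livy-host:8998/batches")
      .openConnection().asInstanceOf[HttpURLConnection]
    conn.setRequestMethod("POST")
    conn.setRequestProperty("Content-Type", "application/json")
    conn.setDoOutput(true)
    conn.getOutputStream.write(payload.getBytes(StandardCharsets.UTF_8))
    conn.getOutputStream.close()

    println(s"Livy responded with HTTP ${conn.getResponseCode}")   // 201 means the batch was created
  }
}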


Re: [Spark Kafka Structured Streaming] Adding partition and topic to the kafka dynamically

2020-08-28 Thread Amit Joshi
Hi Jungtaek,

Thanks for the input. I did try it and it worked.
I got confused earlier after reading some blogs.

Regards
Amit

On Friday, August 28, 2020, Jungtaek Lim 
wrote:

> Hi Amit,
>
> if I remember correctly, you don't need to restart the query to reflect
> the newly added topic and partition, if your subscription covers the topic
> (like subscribe pattern). Please try it out.
>
> Hope this helps.
>
> Thanks,
> Jungtaek Lim (HeartSaVioR)
>
> On Fri, Aug 28, 2020 at 1:56 PM Amit Joshi 
> wrote:
>
>> Any pointers will be appreciated.
>>
>> On Thursday, August 27, 2020, Amit Joshi 
>> wrote:
>>
>>> Hi All,
>>>
>>> I am trying to understand the effect of adding topics and partitions to
>>> a topic in kafka, which is being consumed by spark structured streaming
>>> applications.
>>>
>>> Do we have to restart the spark structured streaming application to read
>>> from the newly added topic?
>>> Do we have to restart the spark structured streaming application to read
>>> from the newly added partition to a topic?
>>>
>>> Kafka consumers have a meta data refresh property that works without
>>> restarting.
>>>
>>> Thanks advance.
>>>
>>> Regards
>>> Amit Joshi
>>>
>>
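
A minimal sketch of the behaviour discussed in this thread: with
subscribePattern, the Kafka source picks up new topics matching the pattern,
and new partitions of existing topics, without restarting the query; the
broker, pattern and paths are placeholders:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("dynamic-topics").getOrCreate()

val stream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("subscribePattern", "events-.*")   // any topic matching this regex, now or added later
  .load()

stream.selectExpr("CAST(value AS STRING) AS value")
  .writeStream
  .format("parquet")
  .option("path", "hdfs:///data/events")
  .option("checkpointLocation", "hdfs:///checkpoints/dynamic-topics")
  .start()
  .awaitTermination()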


Re: [Spark Kafka Structured Streaming] Adding partition and topic to the kafka dynamically

2020-08-27 Thread Amit Joshi
Any pointers will be appreciated.

On Thursday, August 27, 2020, Amit Joshi  wrote:

> Hi All,
>
> I am trying to understand the effect of adding topics and partitions to a
> topic in kafka, which is being consumed by spark structured streaming
> applications.
>
> Do we have to restart the spark structured streaming application to read
> from the newly added topic?
> Do we have to restart the spark structured streaming application to read
> from the newly added partition to a topic?
>
> Kafka consumers have a meta data refresh property that works without
> restarting.
>
> Thanks advance.
>
> Regards
> Amit Joshi
>


[Spark Kafka Structured Streaming] Adding partition and topic to the kafka dynamically

2020-08-27 Thread Amit Joshi
Hi All,

I am trying to understand the effect of adding topics, and adding partitions
to a topic, in Kafka, when the topics are being consumed by Spark Structured
Streaming applications.

Do we have to restart the spark structured streaming application to read
from the newly added topic?
Do we have to restart the spark structured streaming application to read
from the newly added partition to a topic?

Kafka consumers have a meta data refresh property that works without
restarting.

Thanks in advance.

Regards
Amit Joshi


Driver Information

2020-08-17 Thread Amit Sharma
Hi, I have a 20-node cluster and I run multiple batch jobs. In the
spark-submit file, driver memory = 2g and executor memory = 4g, and I have
8 GB workers. I have the questions below.

1. Is there any way to know, for each batch job, which worker is the driver
node?
2. Will the driver node be part of one of the executors, or is it
independent?



Thanks
Amit


Re: help on use case - spark parquet processing

2020-08-13 Thread Amit Sharma
Can you keep the fields as Option in your case class?


Thanks
Amit

On Thu, Aug 13, 2020 at 12:47 PM manjay kumar 
wrote:

> Hi ,
>
> I have a use case,
>
> where I need to merge three datasets and build one wherever data is
> available.
>
> And my dataset is a complex object.
>
> Customer
> - name - string
> - accounts - List
>
> Account
> - type - String
> - Adressess - List
>
> Address
> -name - String
>
> 
>
> ---
>
>
> And it goes on.
>
> These file are in parquet ,
>
>
> All 3 input datasets have some details, which need to be merged.
>
> And build one dataset , which has all the information ( i know the files
> which need to merge )
>
>
> I want to know , how should I proceed on this  ??
>
> - my approach is to build a case class of the actual output and parse the three
> datasets.
>  ( but this is failing because the input responses do not have all the fields).
>
> So basically, what should be the approach to deal with this kind of problem ?
>
> 2nd, how can I convert a parquet dataframe to a dataset, considering the
> parquet struct does not have all the fields but the case class has all the
> fields? ( i am getting error no struct type found)
>
> Thanks
> Manjay Kumar
> 8320 120 839
>
>
>


[Spark-Kafka-Streaming] Verifying the approach for multiple queries

2020-08-09 Thread Amit Joshi
Hi,

I have a scenario where a kafka topic is being written with different types
of json records.
I have to regroup the records based on the type, then fetch the schema and
parse and write the records as parquet.
I tried Structured Streaming, but the dynamic schema is a constraint.
So I have used DStreams, though I know the approach I have taken may not
be good.
Please let me know if the approach will scale, and its possible pros
and cons.
I am collecting the grouped records and then forming a dataframe again
for each grouped record.
createKeyValue -> This is creating the key value pair with schema
information.

stream.foreachRDD { (rdd, time) =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  val result = rdd.map(createKeyValue).reduceByKey((x, y) => x ++ y).collect()
  result.foreach(x => println(x._1))
  result.map(x => {
    val spark =
      SparkSession.builder().config(rdd.sparkContext.getConf).getOrCreate()
    import spark.implicits._
    import org.apache.spark.sql.functions._
    val df = x._2.toDF("value")
    df.select(from_json($"value", x._1._2, Map.empty[String, String]).as("data"))
      .select($"data.*")
      //.withColumn("entity", lit("invoice"))
      .withColumn("year", year($"TimeUpdated"))
      .withColumn("month", month($"TimeUpdated"))
      .withColumn("day", dayofmonth($"TimeUpdated"))
      .write.partitionBy("name", "year", "month", "day").mode("append").parquet(path)
  })
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}


Re: Spark batch job chaining

2020-08-08 Thread Amit Sharma
Any help is appreciated. I have a spark batch job, and based on a condition I
would like to start another batch job by invoking a .sh file. I just want to
know whether we can achieve that.

Thanks
Amit

On Fri, Aug 7, 2020 at 3:58 PM Amit Sharma  wrote:

> Hi, I want to write a batch job which would call another batch job based
> on condition. Can I call one batch job through another in scala or I can do
> it just by python script. Example would be really helpful.
>
>
> Thanks
> Amit
>
>
>


Spark batch job chaining

2020-08-07 Thread Amit Sharma
Hi, I want to write a batch job which would call another batch job based on a
condition. Can I call one batch job from another in Scala, or can I only do it
via a Python script? An example would be really helpful.


Thanks
Amit
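
For reference (not from the thread): one way to launch a shell script from the Scala driver is scala.sys.process; whether this is appropriate depends on where the driver runs (client vs cluster mode). The condition and script path below are hypothetical:

import scala.sys.process._

// Sketch only: shouldRunNextJob and the script path are hypothetical.
if (shouldRunNextJob) {
  val exitCode = Seq("bash", "/path/to/next_job.sh").!   // blocks until the script exits
  if (exitCode != 0) sys.error(s"next_job.sh failed with exit code $exitCode")
}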


[SPARK-STRUCTURED-STREAMING] IllegalStateException: Race while writing batch 4

2020-08-07 Thread Amit Joshi
Hi,

I have 2 Spark Structured Streaming queries writing to the same output path in
object storage.
Once in a while I am getting the "IllegalStateException: Race while writing
batch 4".
I found that this error occurs because there are two writers writing to the
output path. The file streaming sink doesn't support multiple writers:
it assumes there is only one writer writing to the path, so each query needs
to use its own output directory.

Is there a way to write the output to the same path from both queries, as I
need the output at the same path?

Regards
Amit Joshi
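
For reference (not from the thread), the usual workaround is to give each query its own output and checkpoint directory under a common parent and point downstream readers at that parent; a minimal sketch with hypothetical paths, assuming df1/df2 are the two streaming DataFrames:

// Sketch only: paths are hypothetical.
val q1 = df1.writeStream
  .format("parquet")
  .option("path", "s3a://bucket/out/query1")
  .option("checkpointLocation", "s3a://bucket/chk/query1")
  .start()

val q2 = df2.writeStream
  .format("parquet")
  .option("path", "s3a://bucket/out/query2")
  .option("checkpointLocation", "s3a://bucket/chk/query2")
  .start()

// Downstream consumers can read both directories together:
val all = spark.read.parquet("s3a://bucket/out/query1", "s3a://bucket/out/query2")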


[SPARK-SQL] How to return GenericInternalRow from spark udf

2020-08-06 Thread Amit Joshi
Hi,

<https://stackoverflow.com/posts/63277463/timeline>

I have a spark udf written in Scala that takes a couple of columns, applies
some logic and outputs an InternalRow. A Spark schema (StructType) is also
present. But when I try to return the InternalRow from the UDF I get this
exception:

java.lang.UnsupportedOperationException: Schema for type
org.apache.spark.sql.catalyst.GenericInternalRow is not supported

val getData = (hash: String, `type`: String) => {
  val schema = hash match {
    case "people" => peopleSchema
    case "empl"   => emplSchema
  }
  getGenericInternalRow(schema)
}

val data = udf(getData)

Spark Version : 2.4.5


Please Help.


Regards

Amit Joshi
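
For reference (not a definitive answer): a UDF cannot return catalyst's InternalRow, and the result column must have a single fixed DataType. One common workaround is to return an external Row and pass the StructType to udf explicitly; a minimal sketch, assuming peopleSchema exists and the example values match its fields:

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.udf

// Sketch only: the returned values must line up with peopleSchema's fields.
val buildPerson = (hash: String) => Row("example-name", 42)
val personUdf   = udf(buildPerson, peopleSchema)   // output schema supplied explicitly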


how to copy from one cassandra cluster to another

2020-07-28 Thread Amit Sharma
Hi, I have table A in Cassandra cluster-1 in one data center. I have table B
in cluster-2 in another data center. I want to copy the data from one cluster
to the other using Spark. The problem I faced is that I cannot create two
Spark sessions, yet it seems we need a Spark session per cluster.
Please let me know if there is any way to use a Spark batch job to copy data
between two Cassandra clusters.


Thanks
Amit
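
For reference (not from the thread), a sketch of one approach, assuming the DataStax spark-cassandra-connector honours connection settings passed as per-read/per-write options (behaviour varies by connector version); host, keyspace and table names are hypothetical:

// Sketch only: one SparkSession, connection host overridden per operation.
val dfA = spark.read
  .format("org.apache.spark.sql.cassandra")
  .option("spark.cassandra.connection.host", "dc1.cassandra.host")
  .options(Map("keyspace" -> "ks", "table" -> "table_a"))
  .load()

dfA.write
  .format("org.apache.spark.sql.cassandra")
  .option("spark.cassandra.connection.host", "dc2.cassandra.host")
  .options(Map("keyspace" -> "ks", "table" -> "table_b"))
  .mode("append")
  .save()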


spark exception

2020-07-24 Thread Amit Sharma
Hi All, sometimes I get this error in the spark logs. I notice a few executors
are shown as dead in the Executors tab during this error, although my job
still succeeds. Please help me find the root cause of this issue. I have 3
workers with 30 cores each and 64 GB RAM each. My job uses 3 cores per
executor and uses a total of 63 cores and 4 GB RAM per executor.

Remote RPC client disassociated. Likely due to containers exceeding
thresholds, or network issues. Check driver logs for WARN messages


Re: Garbage collection issue

2020-07-20 Thread Amit Sharma
Please help on this.


Thanks
Amit

On Fri, Jul 17, 2020 at 2:34 PM Amit Sharma  wrote:

> Hi All, i am running the same batch job in my two separate spark clusters.
> In one of the clusters it is showing GC warning  on spark -ui  under
> executer tag. Garbage collection is taking longer time around 20 %  while
> in another cluster it is under 10 %. I am using the same configuration in
> my spark submit and using G1GC .
>
> Please let me know what could be the reason for GC slowness.
>
>
> Thanks
> Amit
>


Re: Future timeout

2020-07-20 Thread Amit Sharma
Please help on this.


Thanks
Amit

On Fri, Jul 17, 2020 at 9:10 AM Amit Sharma  wrote:

> Hi, sometimes my spark streaming job throw this exception  Futures timed
> out after [300 seconds].
> I am not sure where is the default timeout configuration. Can i increase
> it. Please help.
>
>
>
> Thanks
> Amit
>
>
>
> Caused by: java.util.concurrent.TimeoutException: Futures timed out after
> [300 seconds]
> at
> scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
> at
> scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
> at
> org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:201)
> at
> org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:136)
> at
> org.apache.spark.sql.execution.InputAdapter.doExecuteBroadcast(WholeStageCodegenExec.scala:372)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:144)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:140)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> at
> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
> at
> org.apache.spark.sql.execution.SparkPlan.executeBroadcast(SparkPlan.scala:140)
> at
> org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.prepareBroadcast(BroadcastHashJoinExec.scala:116)
> at
> org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.codegenOuter(BroadcastHashJoinExec.scala:257)
> at
> org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.doConsume(BroadcastHashJoinExec.scala:101)
> at
> org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:186)
> at
> org.apache.spark.sql.execution.ProjectExec.consume(basicPhysicalOperators.scala:35)
> at
> org.apache.spark.sql.execution.ProjectExec.doConsume(basicPhysicalOperators.scala:65)
> at
> org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:186)
> at
> org.apache.spark.sql.execution.SerializeFromObjectExec.consume(objects.scala:101)
> at
> org.apache.spark.sql.execution.SerializeFromObjectExec.doConsume(objects.scala:121)
> at
> org.apache.spark.sql.execution.CodegenSupport$class.constructDoConsumeFunction(WholeStageCodegenExec.scala:213)
> at
> org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:184)
> at
> org.apache.spark.sql.execution.MapElementsExec.consume(objects.scala:200)
> at
> org.apache.spark.sql.execution.MapElementsExec.doConsume(objects.scala:224)
> at
> org.apache.spark.sql.execution.CodegenSupport$class.constructDoConsumeFunction(WholeStageCodegenExec.scala:213)
> at
> org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:184)
> at
> org.apache.spark.sql.execution.DeserializeToObjectExec.consume(objects.scala:68)
>


Garbage collection issue

2020-07-17 Thread Amit Sharma
Hi All, I am running the same batch job in my two separate spark clusters.
In one of the clusters it shows a GC warning on the Spark UI under the
Executors tab: garbage collection is taking longer, around 20%, while
in the other cluster it is under 10%. I am using the same configuration in
my spark-submit and using G1GC.

Please let me know what could be the reason for GC slowness.


Thanks
Amit


Future timeout

2020-07-17 Thread Amit Sharma
Hi, sometimes my spark streaming job throws this exception: Futures timed
out after [300 seconds].
I am not sure where this default timeout is configured. Can I increase
it? Please help.



Thanks
Amit



Caused by: java.util.concurrent.TimeoutException: Futures timed out after
[300 seconds]
at
scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at
scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at
org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:201)
at
org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:136)
at
org.apache.spark.sql.execution.InputAdapter.doExecuteBroadcast(WholeStageCodegenExec.scala:372)
at
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:144)
at
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:140)
at
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at
org.apache.spark.sql.execution.SparkPlan.executeBroadcast(SparkPlan.scala:140)
at
org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.prepareBroadcast(BroadcastHashJoinExec.scala:116)
at
org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.codegenOuter(BroadcastHashJoinExec.scala:257)
at
org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.doConsume(BroadcastHashJoinExec.scala:101)
at
org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:186)
at
org.apache.spark.sql.execution.ProjectExec.consume(basicPhysicalOperators.scala:35)
at
org.apache.spark.sql.execution.ProjectExec.doConsume(basicPhysicalOperators.scala:65)
at
org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:186)
at
org.apache.spark.sql.execution.SerializeFromObjectExec.consume(objects.scala:101)
at
org.apache.spark.sql.execution.SerializeFromObjectExec.doConsume(objects.scala:121)
at
org.apache.spark.sql.execution.CodegenSupport$class.constructDoConsumeFunction(WholeStageCodegenExec.scala:213)
at
org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:184)
at
org.apache.spark.sql.execution.MapElementsExec.consume(objects.scala:200)
at
org.apache.spark.sql.execution.MapElementsExec.doConsume(objects.scala:224)
at
org.apache.spark.sql.execution.CodegenSupport$class.constructDoConsumeFunction(WholeStageCodegenExec.scala:213)
at
org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:184)
at
org.apache.spark.sql.execution.DeserializeToObjectExec.consume(objects.scala:68)
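
For reference (not from the thread): the wait in this stack trace happens inside BroadcastExchangeExec, and the 300 seconds corresponds to spark.sql.broadcastTimeout (default 300s). A minimal sketch of the two usual knobs, assuming an existing SparkSession `spark`:

// Sketch only: either raise the broadcast wait, or disable automatic broadcast joins.
spark.conf.set("spark.sql.broadcastTimeout", "600")              // seconds; default is 300
// spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")  // alternative: turn off auto-broadcast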


Cassandra raw deletion

2020-07-04 Thread Amit Sharma
Hi, I have to delete certain rows from Cassandra during my spark batch
process. Is there any way to delete rows using the Spark Cassandra connector?

Thanks
Amit
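
For reference (not from the thread), a sketch of one way to issue deletes from a batch job through the connector's own session, assuming the DataStax spark-cassandra-connector is on the classpath; keyspace, table and key column are hypothetical:

import com.datastax.spark.connector.cql.CassandraConnector

// Sketch only: keysToDelete is assumed to be a Dataset[String] of primary-key values.
val connector = CassandraConnector(spark.sparkContext.getConf)
keysToDelete.rdd.foreachPartition { keys =>
  connector.withSessionDo { session =>
    keys.foreach(id => session.execute(s"DELETE FROM ks.tbl WHERE id = '$id'"))
  }
}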


Truncate table

2020-07-01 Thread Amit Sharma
Hi, I have a scenario where I have to read certain rows from a table,
truncate the table, and store those rows back into the table. I am doing the
steps below:

1. reading certain rows into DF1 from cassandra table A.
2. saving into cassandra as overwrite in table A


The problem is that when I truncate the table at step 2 I lose the data in
DF1, as it shows empty.
I have two solutions:
1. Store DF1 in another temp table before truncating table A.
2. Cache DF1 before truncating.

Do we have any better solution ?


Thanks
Amit
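
For reference (not from the thread), a sketch of option 2, with the caveat that cache() alone is lazy, so the DataFrame must be materialised (e.g. with count()) before the table is overwritten; the confirm.truncate option is connector-specific, so verify it for your connector version. Keyspace, table and filter predicate are hypothetical:

import org.apache.spark.sql.functions.col

// Sketch only.
val df1 = spark.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> "ks", "table" -> "a"))
  .load()
  .filter(col("keep_flag") === true)   // hypothetical predicate for the rows to keep
  .cache()

df1.count()   // force materialisation before the table is truncated

df1.write
  .format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> "ks", "table" -> "a", "confirm.truncate" -> "true"))
  .mode("overwrite")
  .save()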


unsubscribe

2020-01-02 Thread Amit Jain
unsubscribe


Unsubscribe

2019-12-13 Thread Amit Jain
Unsubscribe


No of cores per executor.

2019-12-08 Thread Amit Sharma
I have set 5 cores per executor. Is there any formula to determine the best
combination of executors, cores, and memory per core for better
performance? Also, when I run a local spark instance in my web jar I am
getting better speed than when running on the cluster.



Thanks
Amit


issue with regexp_replace

2019-10-26 Thread amit kumar singh
Hi Team,


I am trying to use regexp_replace in spark sql and it is throwing an error:

expected , but found Scalar
 in 'reader', line 9, column 45:
 ... select translate(payload, '"', '"') as payload


I am trying to replace all occurrences of \\\" with "
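
Two notes, for reference: the "expected , but found Scalar" message looks like it comes from the YAML/config parser reading the embedded SQL rather than from Spark itself, so quoting the whole statement in that file may also help; and the escaping is easier to keep straight in the DataFrame API. A sketch with a hypothetical column name:

import org.apache.spark.sql.functions.{col, regexp_replace}

// Sketch only: replaces a literal \" with " in the payload column.
// The Scala literal "\\\\\"" is the regex \\" : an escaped backslash followed by a quote.
val cleaned = df.withColumn("payload", regexp_replace(col("payload"), "\\\\\"", "\""))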


Re: spark streaming exception

2019-10-17 Thread Amit Sharma
Please update me if anyone knows about it.


Thanks
Amit

On Thu, Oct 10, 2019 at 3:49 PM Amit Sharma  wrote:

> Hi , we have spark streaming job to which we send a request through our UI
> using kafka. It process and returned the response. We are getting below
> error and this stareming is not processing any request.
>
> Listener StreamingJobProgressListener threw an exception
> java.util.NoSuchElementException: key not found: 1570689515000 ms
> at scala.collection.MapLike$class.default(MapLike.scala:228)
> at scala.collection.AbstractMap.default(Map.scala:59)
> at scala.collection.mutable.HashMap.apply(HashMap.scala:65)
> at
> org.apache.spark.streaming.ui.StreamingJobProgressListener.onOutputOperationCompleted(StreamingJobProgressListener.scala:134)
> at
> org.apache.spark.streaming.scheduler.StreamingListenerBus.doPostEvent(StreamingListenerBus.scala:67)
> at
> org.apache.spark.streaming.scheduler.StreamingListenerBus.doPostEvent(StreamingListenerBus.scala:29).
>
> Please help me in find out the root cause of this issue.
>


convert json string in spark sql

2019-10-16 Thread amit kumar singh
Hi Team,

I have kafka messages where the json is coming in as a string. How can I
create a table after parsing the json string with spark sql?
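
For reference, a minimal sketch, assuming the JSON schema is known up front and kafkaDf is the DataFrame read from Kafka; column and field names are hypothetical:

import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{DoubleType, StringType, StructType}

// Sketch only.
val schema = new StructType()
  .add("id", StringType)
  .add("amount", DoubleType)

val parsed = kafkaDf
  .selectExpr("CAST(value AS STRING) AS json")
  .select(from_json(col("json"), schema).as("data"))
  .select("data.*")

parsed.createOrReplaceTempView("events")   // now queryable via spark.sql("SELECT ... FROM events")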


spark streaming exception

2019-10-10 Thread Amit Sharma
Hi, we have a spark streaming job to which we send requests from our UI
through kafka; it processes them and returns the response. We are getting the
below error, and the streaming job is not processing any requests.

Listener StreamingJobProgressListener threw an exception
java.util.NoSuchElementException: key not found: 1570689515000 ms
at scala.collection.MapLike$class.default(MapLike.scala:228)
at scala.collection.AbstractMap.default(Map.scala:59)
at scala.collection.mutable.HashMap.apply(HashMap.scala:65)
at
org.apache.spark.streaming.ui.StreamingJobProgressListener.onOutputOperationCompleted(StreamingJobProgressListener.scala:134)
at
org.apache.spark.streaming.scheduler.StreamingListenerBus.doPostEvent(StreamingListenerBus.scala:67)
at
org.apache.spark.streaming.scheduler.StreamingListenerBus.doPostEvent(StreamingListenerBus.scala:29).

Please help me find out the root cause of this issue.


Re: Driver vs master

2019-10-07 Thread Amit Sharma
Thanks Andrew, but I am asking specifically about driver memory, not executor
memory. We have just one master, and if each job's driver.memory=4g and the
master node's total memory is 16 GB, then we cannot execute more than 4 jobs
at a time.

On Monday, October 7, 2019, Andrew Melo  wrote:

> Hi Amit
>
> On Mon, Oct 7, 2019 at 18:33 Amit Sharma  wrote:
>
>> Can you please help me understand this. I believe driver programs runs on
>> master node
>
> If we are running 4 spark jobs and driver memory config is 4g then total 16
>> GB would be used on the master node.
>
>
> This depends on what master/deploy mode you're using: if it's "local"
> master and "client mode" then yes tasks execute in the same JVM as the
> driver. In this case though, the driver JVM uses whatever much space is
> allocated for the driver regardless of how many threads you have.
>
>
> So if we will run more jobs then we need more memory on master. Please
>> correct me if I am wrong.
>>
>
> This depends on your application, but in general more threads will require
> more memory.
>
>
>
>>
>> Thanks
>> Amit
>>
> --
> It's dark in this basement.
>


Driver vs master

2019-10-07 Thread Amit Sharma
Can you please help me understand this. I believe the driver program runs on
the master node. If we are running 4 spark jobs and the driver memory config is
4g, then a total of 16 GB would be used on the master node. So if we run more
jobs, we need more memory on the master. Please correct me if I am wrong.


Thanks
Amit


Re: Memory Limits error

2019-08-16 Thread Amit Sharma
Try increasing your driver memory to 12g.
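
The error itself also points at the executor side; for reference, a sketch of the two settings it mentions (values are examples, and the overhead key shown is the Spark 2.x name):

import org.apache.spark.sql.SparkSession

// Sketch only: example values.
val spark = SparkSession.builder()
  .config("spark.executor.memory", "8g")
  .config("spark.yarn.executor.memoryOverhead", "2g")   // Spark 3.x name: spark.executor.memoryOverhead
  .getOrCreate()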

On Thursday, August 15, 2019, Dennis Suhari 
wrote:

> Hi community,
>
> I am using Spark on Yarn. When submiting a job after a long time I get an
> error mesage and retry.
>
> It happens when I want to store the dataframe to a table.
>
> spark_df.write.option("path", "/nlb_datalake/golden_zone/
> webhose/sentiment").saveAsTable("news_summary_test", mode="overwrite")
>
> The error is (after long time):
>
>  Hive Session ID = be590d1b-ed5b-404b-bcb4-77cbb977a847 [Stage 2:> (0 +
> 16) / 16]19/08/15 15:42:08 WARN BlockManagerMasterEndpoint: No more
> replicas available for rdd_9_2 ! 19/08/15 15:42:08 WARN
> BlockManagerMasterEndpoint: No more replicas available for rdd_9_1 !
> 19/08/15 15:42:08 WARN BlockManagerMasterEndpoint: No more replicas
> available for rdd_9_4 ! 19/08/15 15:42:08 WARN BlockManagerMasterEndpoint:
> No more replicas available for rdd_9_6 ! 19/08/15 15:42:08 WARN
> BlockManagerMasterEndpoint: No more replicas available for rdd_9_7 !
> 19/08/15 15:42:08 WARN BlockManagerMasterEndpoint: No more replicas
> available for rdd_9_0 ! 19/08/15 15:42:08 WARN BlockManagerMasterEndpoint:
> No more replicas available for rdd_9_5 ! 19/08/15 15:42:08 WARN
> BlockManagerMasterEndpoint: No more replicas available for rdd_9_3 !
> 19/08/15 15:42:08 WARN YarnSchedulerBackend$YarnSchedulerEndpoint:
> Requesting driver to remove executor 2 for reason Container killed by YARN
> for exceeding memory limits. 9.1 GB of 9 GB physical memory used. Consider
> boosting spark.yarn.executor.memoryOverhead. 19/08/15 15:42:08 ERROR
> YarnScheduler: Lost executor 2 on nlb-srv-hd-08.i-lab.local: Container
> killed by YARN for exceeding memory limits. 9.1 GB of 9 GB physical memory
> used. Consider boosting spark.yarn.executor.memoryOverhead. 19/08/15
> 15:42:08 WARN TaskSetManager: Lost task 0.0 in stage 2.0 (TID 17,
> nlb-srv-hd-08.i-lab.local, executor 2): ExecutorLostFailure (executor 2
> exited caused by one of the running tasks) Reason: Container killed by YARN
> for exceeding memory limits. 9.1 GB of 9 GB physical memory used. Consider
> boosting spark.yarn.executor.memoryOverhead. 19/08/15 15:42:08 WARN
> TaskSetManager: Lost task 5.0 in stage 2.0 (TID 26,
> nlb-srv-hd-08.i-lab.local, executor 2): ExecutorLostFailure (executor 2
> exite
>
> Do you have a rough idea where to tweak ?
>
> Br,
>
> Dennis
>


Spark Streaming concurrent calls

2019-08-13 Thread Amit Sharma
I am using Kafka Spark streaming. My UI application sends requests to the
streaming job through kafka. The problem is that the streaming job handles one
request at a time, so if multiple users send requests at the same time they
have to wait until the earlier requests are done.
Is there any way it can handle multiple requests?


Thanks
Amit


spark job getting hang

2019-08-05 Thread Amit Sharma
I am running a spark job; sometimes it runs successfully, but most of the time
I get

 ERROR Dropping event from queue appStatus. This likely means one of the
listeners is too slow and cannot keep up with the rate at which tasks are
being started by the scheduler.
(org.apache.spark.scheduler.AsyncEventQueue).


Please suggest how to debug this issue.


Thanks
Amit
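
For reference (not from the thread): this warning usually means the listener-bus event queue filled up faster than the listeners (UI, event log) could drain it. The queue size is configurable; a sketch with an example value:

import org.apache.spark.sql.SparkSession

// Sketch only: must be set before the SparkContext starts; the default is 10000.
val spark = SparkSession.builder()
  .config("spark.scheduler.listenerbus.eventqueue.capacity", "20000")
  .getOrCreate()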


Core allocation is scattered

2019-07-25 Thread Amit Sharma
I have a cluster with 26 nodes, each with 16 cores. I am running a spark
job with 20 cores, but I do not understand why my application gets 1-2 cores
on a couple of machines. Why does it not just run on two nodes, like node1=16
cores and node2=4 cores? Instead, cores are allocated like node1=2, node2=1,
... node14=1. Is there any conf property I need to
change? I know with dynamic allocation we can use the setting below, but
without dynamic allocation is there anything?
--conf "spark.dynamicAllocation.maxExecutors=2"


Thanks
Amit


Re: spark dataset.cache is not thread safe

2019-07-22 Thread Amit Sharma
Please update me if anyone knows how to handle it.

On Sun, Jul 21, 2019 at 7:18 PM Amit Sharma  wrote:

> Hi , I wrote a code in future block which read data from dataset and cache
> it which is used later in the code. I faced a issue that data.cached() data
> will be replaced by concurrent running thread . Is there any way we can
> avoid this condition.
>
> val dailyData = callDetailsDS.collect.toList
> val adjustedData = dailyData.map(callDataPerDay => Future{
>
>
>
>   val data = callDetailsDS.filter((callDetailsDS(DateColumn) geq (some 
> conditional date ))
> data.cache()
>
> 
>
> }
>
>
>


spark dataset.cache is not thread safe

2019-07-21 Thread Amit Sharma
Hi, I wrote code in a Future block which reads data from a dataset and caches
it for use later in the code. I faced an issue where the cached data
gets replaced by a concurrently running thread. Is there any way we can
avoid this condition?

val dailyData = callDetailsDS.collect.toList
val adjustedData = dailyData.map(callDataPerDay => Future{



  val data = callDetailsDS.filter((callDetailsDS(DateColumn) geq (some
conditional date ))
data.cache()



}


Re: spark standalone mode problem about executor add and removed again and again!

2019-07-17 Thread Amit Sharma
Do you have dynamic resource allocation enabled?


On Wednesday, July 17, 2019, zenglong chen  wrote:

> Hi,all,
> My standalone mode has two slaves.When I submit my job,the
> localhost slave is working well,but second slave do add and remove executor
> action always!The log are below:
>2019-07-17 10:51:38,889 INFO client.StandaloneAppClient$ClientEndpoint:
> Executor updated: app-20190717105135-0008/2 is now EXITED (Command exited
> with code 1)
> 2019-07-17 10:51:38,890 INFO cluster.StandaloneSchedulerBackend: Executor
> app-20190717105135-0008/2 removed: Command exited with code 1
> 2019-07-17 10:51:38,890 INFO storage.BlockManagerMasterEndpoint: Trying
> to remove executor 2 from BlockManagerMaster.
> 2019-07-17 10:51:38,890 INFO storage.BlockManagerMaster: Removal of
> executor 2 requested
> 2019-07-17 10:51:38,891 INFO 
> cluster.CoarseGrainedSchedulerBackend$DriverEndpoint:
> Asked to remove non-existent executor 2
> 2019-07-17 10:51:38,892 INFO client.StandaloneAppClient$ClientEndpoint:
> Executor added: app-20190717105135-0008/3 on 
> worker-20190717093045-172.22.9.179-40573
> (172.22.9.179:40573) with 8 core(s)
> 2019-07-17 10:51:38,892 INFO cluster.StandaloneSchedulerBackend: Granted
> executor ID app-20190717105135-0008/3 on hostPort 172.22.9.179:40573 with
> 8 core(s), 12.0 GB RAM
> 2019-07-17 10:51:38,893 INFO client.StandaloneAppClient$ClientEndpoint:
> Executor updated: app-20190717105135-0008/3 is now RUNNING
> 2019-07-17 10:51:40,521 INFO client.StandaloneAppClient$ClientEndpoint:
> Executor updated: app-20190717105135-0008/3 is now EXITED (Command exited
> with code 1)
> 2019-07-17 10:51:40,521 INFO cluster.StandaloneSchedulerBackend: Executor
> app-20190717105135-0008/3 removed: Command exited with code 1
> 2019-07-17 10:51:40,521 INFO client.StandaloneAppClient$ClientEndpoint:
> Executor added: app-20190717105135-0008/4 on 
> worker-20190717093045-172.22.9.179-40573
> (172.22.9.179:40573) with 8 core(s)
> 2019-07-17 10:51:40,521 INFO storage.BlockManagerMaster: Removal of
> executor 3 requested
> 2019-07-17 10:51:40,521 INFO 
> cluster.CoarseGrainedSchedulerBackend$DriverEndpoint:
> Asked to remove non-existent executor 3
> 2019-07-17 10:51:40,521 INFO storage.BlockManagerMasterEndpoint: Trying
> to remove executor 3 from BlockManagerMaster.
> 2019-07-17 10:51:40,521 INFO cluster.StandaloneSchedulerBackend: Granted
> executor ID app-20190717105135-0008/4 on hostPort 172.22.9.179:40573 with
> 8 core(s), 12.0 GB RAM
> 2019-07-17 10:51:40,523 INFO client.StandaloneAppClient$ClientEndpoint:
> Executor updated: app-20190717105135-0008/4 is now RUNNING
>
>
> And the slave output are below:
>19/07/17 10:47:12 INFO ExecutorRunner: Launch command:
> "/home/ubuntu/data/jdk/jre/bin/java" "-cp" "/home/ubuntu/spark-2.4.3/
> conf/:/home/ubuntu/spark-2.4.3/jars/*" "-Xmx12288M"
> "-Dspark.driver.port=40335" 
> "org.apache.spark.executor.CoarseGrainedExecutorBackend"
> "--driver-url" "spark://CoarseGrainedScheduler@iZk1a7vdbutmi6eluaskecZ:40335"
> "--executor-id" "18" "--hostname" "172.22.9.179" "--cores" "8" "--app-id"
> "app-20190717104645-0007" "--worker-url" "spark://Worker@172.22.9.179:
> 40573"
> 19/07/17 10:47:13 INFO Worker: Executor app-20190717104645-0007/18
> finished with state EXITED message Command exited with code 1 exitStatus 1
> 19/07/17 10:47:13 INFO ExternalShuffleBlockResolver: Clean up non-shuffle
> files associated with the finished executor 18
> 19/07/17 10:47:13 INFO ExternalShuffleBlockResolver: Executor is not
> registered (appId=app-20190717104645-0007, execId=18)
> 19/07/17 10:47:13 INFO Worker: Asked to launch executor
> app-20190717104645-0007/19 for ph_user_pre_level
> 19/07/17 10:47:13 INFO SecurityManager: Changing view acls to: ubuntu
> 19/07/17 10:47:13 INFO SecurityManager: Changing modify acls to: ubuntu
> 19/07/17 10:47:13 INFO SecurityManager: Changing view acls groups to:
> 19/07/17 10:47:13 INFO SecurityManager: Changing modify acls groups to:
> 19/07/17 10:47:13 INFO SecurityManager: SecurityManager: authentication
> disabled; ui acls disabled; users  with view permissions: Set(ubuntu);
> groups with view permissions: Set(); users  with modify permissions:
> Set(ubuntu); groups with modify permissions: Set()
> 19/07/17 10:47:14 INFO ExecutorRunner: Launch command:
> "/home/ubuntu/data/jdk/jre/bin/java" "-cp" "/home/ubuntu/spark-2.4.3/
> conf/:/home/ubuntu/spark-2.4.3/jars/*" "-Xmx12288M"
> "-Dspark.driver.port=40335" 
> "org.apache.spark.executor.CoarseGrainedExecutorBackend"
> "--driver-url" "spark://CoarseGrainedScheduler@iZk1a7vdbutmi6eluaskecZ:40335"
> "--executor-id" "19" "--hostname" "172.22.9.179" "--cores" "8" "--app-id"
> "app-20190717104645-0007" "--worker-url" "spark://Worker@172.22.9.179:
> 40573"
>
> I guest that  may be  "Dspark.driver.port=40335" problem.
> Any suggests will help me a lot!
>


Dynamic allocation not working

2019-07-08 Thread Amit Sharma
Hi All, I have set the dynamic allocation property = true in my script file,
and also the shuffle property in the script as well as in the spark-env file on
all worker nodes. I am using spark kafka streaming. I checked that as a request
comes in the number of allocated cores increases, but even after the request is
completed the cores are not getting released. Please help me to know what
property I missed other than those 2 properties.
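
For reference (not from the thread), the settings usually involved, as application conf; the external shuffle service itself must also be running on the workers, executors are only released after sitting idle past the timeout, and by default executors holding cached data are never released. Values are examples:

import org.apache.spark.sql.SparkSession

// Sketch only: example values.
val spark = SparkSession.builder()
  .config("spark.dynamicAllocation.enabled", "true")
  .config("spark.shuffle.service.enabled", "true")
  .config("spark.dynamicAllocation.executorIdleTimeout", "60s")
  .config("spark.dynamicAllocation.cachedExecutorIdleTimeout", "120s")  // default: infinity
  .getOrCreate()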


Spark-cluster slowness

2019-06-20 Thread Amit Sharma
I have a spark cluster in each of two data centers. Spark cluster B
is 6 times slower than cluster A: I ran the same job on both clusters and the
time difference is a factor of 6. I used the same config and am using spark
2.3.3. I checked that the spark UI displays the slave nodes, and when I check
under the Executors tab I see all the nodes there, but I do not see active
tasks even while the task status is active. Please help me find the root cause.

Thanks
Amit


Spark Kafka Streaming stopped

2019-06-14 Thread Amit Sharma
We are using spark kafka streaming. We have 6 nodes in the kafka cluster; if
any of the nodes goes down we get the below exception and streaming
stops.
ERROR DirectKafkaInputDStream:70 -
ArrayBuffer(kafka.common.NotLeaderForPartitionException,
kafka.common.NotLeaderForPartitionException,
org.apache.spark.SparkException: Couldn't find leader offsets for
Set([techops-prod2,4],
[techops-prod2,0]))

Please let me know if we missed any setting, so that streaming does not
stop even if a couple of Kafka nodes are down.


Thanks
Amit


Re: Spark kafka streaming job stopped

2019-06-11 Thread Amit Sharma
Please provide an update if anyone knows.

On Monday, June 10, 2019, Amit Sharma  wrote:

>
> We have spark kafka sreaming job running on standalone spark cluster. We
> have below kafka architecture
>
> 1. Two cluster running on two data centers.
> 2. There is LTM on top on each data center (load balance)
> 3. There is GSLB on top of LTM.
>
> I observed when ever any of the node in kafka cluster is down  our spark
> stream job stopped. We are using GLSB url in our code to connect to Kafka
> not the IP address. Please let me know is it expected behavior if not then
> what config we need to change.
>
> Thanks
> Amit
>

