Late arriving updates to fact tables

2023-02-25 Thread rajat kumar
Hi Users,

We are getting updates in a Kafka topic (through CDC). Can you please tell me how
to correct/replay/reprocess the late-arriving records in the data lake?

Thanks
Rajat
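One common way to handle this, assuming the lake tables are stored in a format that
supports upserts such as Delta Lake (not something stated in this thread), is to land
the raw CDC stream and apply it to the fact table with a MERGE; late or replayed
records are then just another merge over the affected keys. A minimal sketch, with
hypothetical paths, key, and op column:

import io.delta.tables.DeltaTable
import org.apache.spark.sql.{DataFrame, SparkSession}

// Sketch only: applies a batch of CDC records (including late arrivals)
// to a Delta fact table keyed on `id`. Path, key, and `op` column are hypothetical.
def applyCdcBatch(spark: SparkSession, cdcBatch: DataFrame): Unit = {
  val target = DeltaTable.forPath(spark, "/lake/fact_orders")

  target.alias("t")
    .merge(cdcBatch.alias("s"), "t.id = s.id")
    .whenMatched("s.op = 'D'").delete()          // late delete
    .whenMatched("s.op != 'D'").updateAll()      // late update/correction
    .whenNotMatched("s.op != 'D'").insertAll()   // insert or replayed record
    .execute()
}

Reprocessing then amounts to re-reading the affected offsets (or date range) from
Kafka and re-running the same merge, which is largely idempotent per key.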


Re: Profiling data quality with Spark

2022-12-28 Thread rajat kumar
Thanks for the input folks.

Hi Vaquar ,

I saw that we have various types of checks in GE and Deequ. Could you
please suggest what types of checks you used for metric-based columns?


Regards
Rajat

On Wed, Dec 28, 2022 at 12:15 PM vaquar khan  wrote:

> I would suggest Deequ; I have implemented it many times, and it is easy and effective.
>
>
> Regards,
> Vaquar khan
>
> On Tue, Dec 27, 2022, 10:30 PM ayan guha  wrote:
>
>> The way I would approach it is to evaluate GE, Deequ (there is a Python
>> binding called pydeequ) and others like Delta Live tables with expectations
>> from Data Quality feature perspective. All these tools have their pros and
>> cons, and all of them are compatible with spark as a compute engine.
>>
>> Also, you may want to look at dbt based DQ toolsets if sql is your thing.
>>
>> On Wed, 28 Dec 2022 at 3:14 pm, Sean Owen  wrote:
>>
>>> I think this is kind of mixed up. Data warehouses are simple SQL
>>> creatures; Spark is (also) a distributed compute framework. Kind of like
>>> comparing maybe a web server to Java.
>>> Are you thinking of Spark SQL? Then, sure, you may well find it
>>> more complicated, but it's also just a data-warehousey SQL surface.
>>>
>>> But none of that relates to the question of data quality tools. You
>>> could use GE with Redshift, or indeed with Spark - are you familiar with
>>> it? It's probably one of the most common tools people use with Spark for
>>> this in fact. It's just a Python lib at heart and you can apply it with
>>> Spark, but _not_ with a data warehouse, so I'm not sure what you're getting
>>> at.
>>>
>>> Deequ is also commonly seen. It's actually built on Spark, so again,
>>> confused about this "use Redshift or Snowflake not Spark".
>>>
>>> On Tue, Dec 27, 2022 at 9:55 PM Gourav Sengupta <
>>> gourav.sengu...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> SPARK is just another querying engine with a lot of hype.
>>>>
>>>> I would highly suggest using Redshift (storage and compute decoupled
>>>> mode) or Snowflake without all this super complicated understanding of
>>>> containers/ disk-space, mind numbing variables, rocket science tuning, hair
>>>> splitting failure scenarios, etc. After that try to choose solutions like
>>>> Athena, or Trino/ Presto, and then come to SPARK.
>>>>
>>>> Try out solutions like  "great expectations" if you are looking for
>>>> data quality and not entirely sucked into the world of SPARK and want to
>>>> keep your options open.
>>>>
>>>> Don't get me wrong, SPARK used to be great in 2016-2017, but there are
>>>> superb alternatives now and the industry, in this recession, should focus
>>>> on getting more value for every single dollar they spend.
>>>>
>>>> Best of luck.
>>>>
>>>> Regards,
>>>> Gourav Sengupta
>>>>
>>>> On Tue, Dec 27, 2022 at 7:30 PM Mich Talebzadeh <
>>>> mich.talebza...@gmail.com> wrote:
>>>>
>>>>> Well, you need to qualify your statement on data quality. Are you
>>>>> talking about data lineage here?
>>>>>
>>>>> HTH
>>>>>
>>>>>
>>>>> On Tue, 27 Dec 2022 at 19:25, rajat kumar 
>>>>> wrote:
>>>>>
>>>>>> Hi Folks
>>>>>> Hoping you are doing well, I want to implement data quality to detect
>>>>>> issues in data in advance. I have heard about few frameworks like 
>>>>>> GE/Deequ.
>>>>>> Can anyone pls suggest which one is good and how do I get started on it?
>>>>>>
>>>>>> Regards
>>>>>> Rajat
>>>>>>
>>>>> --
>> Best Regards,
>> Ayan Guha
>>
>


Profiling data quality with Spark

2022-12-27 Thread rajat kumar
Hi Folks
Hope you are doing well. I want to implement data quality checks to detect
issues in data in advance. I have heard about a few frameworks like GE/Deequ.
Can anyone please suggest which one is good and how to get started with it?

Regards
Rajat
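For reference, a minimal Deequ sketch (assuming the com.amazon.deequ dependency is on
the classpath; the table and column names are made up) showing the kind of
completeness and metric-style checks discussed in the replies above:

import com.amazon.deequ.VerificationSuite
import com.amazon.deequ.checks.{Check, CheckLevel, CheckStatus}
import org.apache.spark.sql.DataFrame

// Sketch only: column names are hypothetical.
def runChecks(df: DataFrame): Unit = {
  val result = VerificationSuite()
    .onData(df)
    .addCheck(
      Check(CheckLevel.Error, "basic checks")
        .isComplete("order_id")                      // no nulls allowed
        .isUnique("order_id")
        .isNonNegative("amount")                     // numeric/metric column check
        .hasCompleteness("customer_id", _ >= 0.95))  // at least 95% populated
    .run()

  if (result.status != CheckStatus.Success)
    println(s"Data quality checks failed: ${result.status}")
}

Great Expectations covers similar ground from the Python side; which one fits better
mostly depends on whether the pipeline is Scala-first or Python-first.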


Re: Kryo Serializer not getting set : Spark3

2022-09-23 Thread rajat kumar
Thanks Qian and Vinod for the response.

Yes, the legacy way works the same as for other properties; I was wondering if
there is any other way.

Thanks
Rajat

On Fri, Sep 23, 2022 at 7:43 AM Qian SUN  wrote:

> Hi rajat
>
> I’m guessing you are setting the configuration at runtime, and correct me
> if I’m wrong.
> Only a certain subset of Spark SQL properties (prefixed with spark.sql) can
> be set at runtime; please refer to SparkConf.scala
> <https://github.com/apache/spark/blob/500f3097111a6bf024acf41400660c199a150350/core/src/main/scala/org/apache/spark/SparkConf.scala#L51-L52>
>
> Once a SparkConf object is passed to Spark, it is cloned and can no longer
> be modified by the user. Spark does not support modifying the configuration
> at runtime.
>
> So, the remaining options have to be set before the SparkContext is initialized.
>
> val spark = SparkSession.builder.config("spark.serializer", 
> "org.apache.spark.serializer.KryoSerializer"").getOrCreate
>
>
> On Friday, September 23, 2022 at 05:58, rajat kumar wrote:
>
>> Hello Users,
>>
>> While using below setting getting exception
>>   spark.conf.set("spark.serializer",
>> "org.apache.spark.serializer.KryoSerializer")
>>
>> User class threw exception: org.apache.spark.sql.AnalysisException:
>> Cannot modify the value of a Spark config: spark.serializer at
>> org.apache.spark.sql.errors.QueryCompilationErrors$.cannotModifyValueOfSparkConfigError(QueryCompilationErrors.scala:2322)
>>
>> Can we safely skip setting it or is there any changed way?
>>
>> Thanks
>> Rajat
>>
>>
>>
>
> --
> Best!
> Qian SUN
>


Kryo Serializer not getting set : Spark3

2022-09-22 Thread rajat kumar
Hello Users,

While using the below setting, I am getting an exception:
  spark.conf.set("spark.serializer",
"org.apache.spark.serializer.KryoSerializer")

User class threw exception: org.apache.spark.sql.AnalysisException: Cannot
modify the value of a Spark config: spark.serializer at
org.apache.spark.sql.errors.QueryCompilationErrors$.cannotModifyValueOfSparkConfigError(QueryCompilationErrors.scala:2322)

Can we safely skip setting it, or is there a different way to set it now?

Thanks
Rajat
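As the reply above points out, spark.serializer is a core (non-spark.sql) property,
so it has to be fixed before the SparkContext exists, either on the builder or via
--conf on spark-submit. A minimal sketch:

import org.apache.spark.sql.SparkSession

// spark.serializer must be supplied when the session/context is built;
// spark.conf.set(...) after startup is rejected, as the error above shows.
val spark = SparkSession.builder()
  .appName("kryo-example")
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .getOrCreate()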


Re: NoClassDefError and SparkSession should only be created and accessed on the driver.

2022-09-20 Thread rajat kumar
Hi Alton, it's in the same Scala class only. Is there any change in Spark 3 in
how it is serialized?

Regards
Rajat

On Tue, Sep 20, 2022, 13:35 Xiao, Alton  wrote:

> Can you show us your code?
>
> Your UDF wasn't serialized by Spark. In my opinion, was it defined outside of
> the Spark application code?
>
>
>
> *From:* rajat kumar
> *Date:* Tuesday, September 20, 2022, 15:58
> *To:* user @spark
> *Subject:* NoClassDefError and SparkSession should only be created and
> accessed on the driver.
>
> Hello ,
>
> I am using Spark3 where there are some UDFs along . I am using Dataframe
> APIs to write parquet using spark. I am getting NoClassDefError along with
> below error.
>
> If I comment out all UDFs , it is working fine.
>
> Could someone suggest what could be wrong. It was working fine in Spark2.4
>
> 22/09/20 06:33:17 WARN TaskSetManager: Lost task 9.0 in stage 1.0 (TID 10)
> (vm-36408481 executor 2): java.lang.ExceptionInInitializerError
>
> *at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)*
>
> *at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)*
>
> *at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)*
>
> *at java.lang.reflect.Method.invoke(Method.java:498)*
>
> *at
> java.lang.invoke.SerializedLambda.readResolve(SerializedLambda.java:230)*
>
> *at sun.reflect.GeneratedMethodAccessor20.invoke(Unknown Source)*
>
> *at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)*
>
> *at java.lang.reflect.Method.invoke(Method.java:498)*
>
> *at
> java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1274)*
>
> *at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:)*
>
> *at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1669)*
>
> *at java.io.ObjectInputStream.readArray(ObjectInputStream.java:2119)*
>
> *at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1657)*
>
> *at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2431)*
>
> *at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2355)*
>
> *at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2213)*
>
> *at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1669)*
>
> *at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2431)*
>
> *at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2355)*
>
> *at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2213)*
>
> *at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1669)*
>
> *at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2431)*
>
> *at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2355)*
>
> *at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2213)*
>
> *at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1669)*
>
> *at java.io.ObjectInputStream.readArray(ObjectInputStream.java:2119)*
>
> *at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1657)*
>
> *at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2431)*
>
> *at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2355)*
>
> *at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2213)*
>
> *at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1669)*
>
> *at java.io.ObjectInputStream.readArray(ObjectInputStream.java:2119)*
>
> *at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1657)*
>
> *at java.io.ObjectInputStream.readArray(ObjectInputStream.java:2119)*
>
> *at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1657)*
>
> *at java.io.ObjectInputStream.readArray(ObjectInputStream.java:2119)*
>
> *at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1657)*
>
> *at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2431)*
>
> *at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2355)*
>
> *at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2213)*
>
> *at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1669)*
>
> *at java.io.ObjectInputStream.readArray(ObjectInputStream.java:2119)*
>
> *at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1657)*
>
> *at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2431)*
>
> *at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2355)*
>
> *at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java

NoClassDefError and SparkSession should only be created and accessed on the driver.

2022-09-20 Thread rajat kumar
Hello ,

I am using Spark 3 with some UDFs. I am using the DataFrame API to write
Parquet with Spark, and I am getting a NoClassDefFoundError along with the
error below.

If I comment out all the UDFs, it works fine.

Could someone suggest what could be wrong? It was working fine in Spark 2.4.

22/09/20 06:33:17 WARN TaskSetManager: Lost task 9.0 in stage 1.0 (TID 10)
(vm-36408481 executor 2): java.lang.ExceptionInInitializerError
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.lang.invoke.SerializedLambda.readResolve(SerializedLambda.java:230)
at sun.reflect.GeneratedMethodAccessor20.invoke(Unknown Source)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1274)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1669)
at java.io.ObjectInputStream.readArray(ObjectInputStream.java:2119)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1657)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2431)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2355)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2213)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1669)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2431)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2355)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2213)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1669)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2431)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2355)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2213)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1669)
at java.io.ObjectInputStream.readArray(ObjectInputStream.java:2119)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1657)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2431)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2355)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2213)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1669)
at java.io.ObjectInputStream.readArray(ObjectInputStream.java:2119)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1657)
at java.io.ObjectInputStream.readArray(ObjectInputStream.java:2119)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1657)
at java.io.ObjectInputStream.readArray(ObjectInputStream.java:2119)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1657)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2431)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2355)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2213)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1669)
at java.io.ObjectInputStream.readArray(ObjectInputStream.java:2119)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1657)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2431)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2355)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2213)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1669)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2431)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2355)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2213)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1669)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2431)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2355)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2213)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1669)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461)
at
org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
at
org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:115)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:83)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
at 
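A common cause of an ExceptionInInitializerError during task deserialization like the
one above (offered as a guess, since the UDF code is not shown) is a UDF whose
enclosing object does driver-only work in its initializer, or whose closure captures
non-serializable state. A minimal sketch of the safer shape, with made-up names and
paths:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf

object CleanUdfs {
  // Keep the lambda self-contained: it must not touch SparkSession,
  // SparkContext, or any heavyweight object initialized on the driver.
  val normalize = udf((s: String) => if (s == null) null else s.trim.toLowerCase)
}

object JobMain {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("udf-example").getOrCreate()
    val df = spark.read.parquet("/data/input")           // hypothetical path
    df.withColumn("name_clean", CleanUdfs.normalize(df("name")))
      .write.mode("overwrite").parquet("/data/output")   // hypothetical path
  }
}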

Long running task in spark

2022-09-11 Thread rajat kumar
Hello Users,

Two of my tasks are running forever, and one of them gave a Java heap space
error. I have 10 joins and all of the tables are big, so I understand this is
data skewness. Apart from changes at the code level, is there any property
which can be used in the Spark config?


I am using Spark 2, hence AQE cannot be used.


Thanks
Rajat
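Without AQE, configuration alone rarely fixes a skewed join, but two knobs commonly
tried (values below are purely illustrative, and bigFact/smallDim are hypothetical
DataFrames) are raising the shuffle parallelism and forcing a broadcast for any table
small enough to fit in executor memory:

import org.apache.spark.sql.functions.broadcast

// spark is an existing SparkSession; values are illustrative, not recommendations.
spark.conf.set("spark.sql.shuffle.partitions", "2000")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", (200L * 1024 * 1024).toString)

// Explicitly broadcast a table that fits in memory so its join never has to
// shuffle the skewed key at all.
val joined = bigFact.join(broadcast(smallDim), Seq("join_key"), "left_outer")

For two genuinely large tables sharing a skewed key, salting the key (see the sketch
under "handling skewness issues" further down this archive) is the usual Spark 2
workaround.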


Data Type Issue while upgrading to Spark3

2022-09-02 Thread rajat kumar
Hello Users

Can someone suggest what could be causing the below error?


java.lang.RuntimeException: Error while decoding:
java.lang.NullPointerException: Null value appeared in non-nullable field:
- array element class: "scala.Long"
- root class: "scala.collection.Seq"
If the schema is inferred from a Scala tuple/case class, or a Java bean,
please try to use scala.Option[_] or other nullable types (e.g.
java.lang.Integer instead of int/scala.Int).
mapobjects(lambdavariable(MapObject, LongType, true, -1),
assertnotnull(lambdavariable(MapObject, LongType, true, -1)), input[0,
array, true], Some(interface scala.collection.Seq))


Regards
Rajat
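The message above usually means the data contains nulls inside an array whose target
Scala element type is the primitive Long, which cannot hold them. A minimal sketch of
the adjustment the error text itself suggests, with a made-up case class and path:

import org.apache.spark.sql.SparkSession

// Before: case class Event(id: String, values: Seq[Long])  -- Seq[Long] cannot hold nulls.
// After: use a nullable element type so null array elements decode cleanly.
case class Event(id: String, values: Seq[Option[Long]])

object DecodeExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("nullable-example").getOrCreate()
    import spark.implicits._
    val ds = spark.read.parquet("/data/events").as[Event]   // hypothetical path
    ds.show(5)
  }
}

java.lang.Long in place of scala.Long works as well; the other option is to filter or
fill the null elements before converting to a Dataset.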


Moving to Spark 3x from Spark2

2022-09-01 Thread rajat kumar
Hello Members,

We want to move to Spark 3 from Spark 2.4.

Are there any changes we need to make at the code level, or anything that can
break the existing code?

Will it work by simply changing the versions of Spark and Scala?

Regards
Rajat


deciding Spark tasks & optimization resource

2022-08-29 Thread rajat kumar
Hello Members,

I have a query about Spark stages:

Why does every stage have a different number of tasks/partitions in Spark, and
how is it determined?

Moreover, where can I see the improvements made in Spark 3+?


Thanks in advance
Rajat
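Roughly: the number of tasks in a stage equals the number of partitions of that
stage's RDD, which comes from the input splits (file sizes and
spark.sql.files.maxPartitionBytes) for scan stages, and from
spark.sql.shuffle.partitions, or an explicit repartition, for post-shuffle stages.
A small sketch for inspecting this, assuming an existing SparkSession named spark and
made-up paths and column names:

// Scan stage: partition count driven by the input splits.
val df = spark.read.parquet("/data/input")
println(s"input partitions = ${df.rdd.getNumPartitions}")

// Shuffle stage: partition (and task) count driven by this setting in Spark SQL.
spark.conf.set("spark.sql.shuffle.partitions", "400")
val agg = df.groupBy("key").count()
println(s"post-shuffle partitions = ${agg.rdd.getNumPartitions}")

The Spark 3+ improvements asked about (adaptive query execution, automatic coalescing
of shuffle partitions, skew-join handling) are listed in the Spark 3.0 release notes
and SQL migration guide.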


Re: Spark with GPU

2022-08-13 Thread rajat kumar
Thanks Sean.

Also, I observed that lots of things are not supported on the GPU by NVIDIA,
e.g. nested types, decimal types, UDFs, etc.
So, will it automatically fall back to the CPU for those tasks which require
nested types, or will it run on the GPU and fail?

Thanks
Rajat

On Sat, Aug 13, 2022, 18:54 Sean Owen  wrote:

> Spark does not use GPUs itself, but tasks you run on Spark can.
> The only 'support' there is is for requesting GPUs as resources for tasks,
> so it's just a question of resource management. That's in OSS.
>
> On Sat, Aug 13, 2022 at 8:16 AM rajat kumar 
> wrote:
>
>> Hello,
>>
>> I have been hearing about GPU in spark3.
>>
>> For batch jobs , will it help to improve GPU performance. Also is GPU
>> support available only on Databricks or on cloud based Spark clusters ?
>>
>> I am new , if anyone can share insight , it will help
>>
>> Thanks
>> Rajat
>>
>
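For completeness, the resource-scheduling configuration Sean refers to looks roughly
like this in Spark 3 (values are illustrative and the discovery-script path is
hypothetical); whether a given operator actually runs on the GPU, or quietly falls
back to the CPU for things like nested types, is decided by the separate NVIDIA
RAPIDS plugin rather than by Spark itself:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("gpu-example")
  .config("spark.executor.resource.gpu.amount", "1")
  .config("spark.task.resource.gpu.amount", "0.25")   // e.g. 4 tasks share one GPU
  .config("spark.executor.resource.gpu.discoveryScript", "/opt/spark/getGpus.sh")
  .getOrCreate()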


Spark with GPU

2022-08-13 Thread rajat kumar
Hello,

I have been hearing about GPU support in Spark 3.

For batch jobs, will it help to improve performance? Also, is GPU support
available only on Databricks, or also on cloud-based Spark clusters?

I am new to this; if anyone can share insight, it will help.

Thanks
Rajat


Re: Dependencies issue in spark

2022-07-20 Thread rajat kumar
:2187)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461)
at 
scala.collection.immutable.List$SerializationProxy.readObject(List.scala:488)
at sun.reflect.GeneratedMethodAccessor15.invoke(Unknown Source)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1184)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2296)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461)
at 
org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
at 
org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:115)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:85)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
at org.apache.spark.scheduler.Task.doRunTask(Task.scala:144)
at org.apache.spark.scheduler.Task.run(Task.scala:117)
at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$9(Executor.scala:655)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1592)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:658)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: A master URL must be set
in your configuration
at org.apache.spark.SparkContext.(SparkContext.scala:434)
at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2834)
at 
org.apache.spark.sql.SparkSession$Builder.$anonfun$getOrCreate$2(SparkSession.scala:1016)
at scala.Option.getOrElse(Option.scala:189)
at 
org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:1010)




22/07/20 11:27:02 INFO TaskSetManager: Lost task 87.0 in stage 6.0
(TID 291) on 10.139.64.16, executor 9: java.lang.NoClassDefFoundError
(Could not initialize class com.raw.test$) [duplicate 12]



On Wed, Jul 20, 2022 at 10:36 PM rajat kumar 
wrote:

> I did not set it explicitly while running on the cluster, and other jobs are
> also running fine. I have seen this conflict while reading a JSON file. It
> shows the below issue along with a NoClassDefFoundError.
>
> On Wed, Jul 20, 2022, 22:34 Sean Owen  wrote:
>
>> I think that's straightforward - did you do this?
>>
>> On Wed, Jul 20, 2022 at 9:41 AM rajat kumar 
>> wrote:
>>
>>> Hey Sean,
>>>
>>> Yes, I agree, but I could not figure out which library is causing the
>>> conflict. I got the below error in the "Caused by":
>>>
>>> Caused by: org.apache.spark.SparkException: A master URL must be set in 
>>> your configuration
>>> at org.apache.spark.SparkContext.(SparkContext.scala:434)
>>> at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2834)
>>> at 
>>> org.apache.spark.sql.SparkSession$Builder.$anonfun$getOrCreate$2(SparkSession.scala:1016)
>>> at scala.Option.getOrElse(Option.scala:189)
>>> at org.apache.spark.sql.SparkSession$Builder.getOrCreate
>>>
>>>
>>> On Wed, Jul 20, 2022 at 6:29 PM Sean Owen  wrote:
>>>
>>&g

Dependencies issue in spark

2022-07-20 Thread rajat kumar
Hello, I am using Maven with Spark. After upgrading Scala from 2.11 to 2.12,


I am getting the below error, and I have observed it while reading Avro.
I appreciate any help.

ShuffleMapStage 6 (save at Calling.scala:81) failed in 0.633 s due to Job
aborted due to stage failure: Task 83 in stage 6.0 failed 4 times, most
recent failure: Lost task 83.3 in stage 6.0 (TID 331, 10.139.64.19,
executor 0): java.lang.NoClassDefFoundError:
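The "Caused by: A master URL must be set in your configuration" shown in the replies
above indicates that a SparkSession is being constructed while a task deserializes the
class (here com.raw.test$), i.e. on an executor where no master is configured. A
hedged sketch of the usual fix, with made-up names: build the session once in main on
the driver and keep executor-side code free of SparkSession references:

import org.apache.spark.sql.{DataFrame, SparkSession}

object RawJob {
  // Don't do this: a SparkSession built in an object initializer is re-run
  // the first time the object is touched on an executor, where it fails.
  // val spark = SparkSession.builder().appName("raw").getOrCreate()

  def main(args: Array[String]): Unit = {
    // Build the session only on the driver, inside main.
    val spark = SparkSession.builder().appName("raw").getOrCreate()
    val df = spark.read.format("avro").load("/data/raw")   // hypothetical path
    process(df).write.parquet("/data/out")                 // hypothetical path
  }

  // Transformation logic takes DataFrames/values in and never touches the session.
  def process(df: DataFrame): DataFrame = df.filter("value is not null")
}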


Re: Issue while building spark project

2022-07-20 Thread rajat kumar
Thanks a lot Sean

On Mon, Jul 18, 2022, 21:58 Sean Owen  wrote:

> Increase the stack size for the JVM when Maven / SBT run. The build sets
> this but you may still need something like "-Xss4m" in your MAVEN_OPTS
>
> On Mon, Jul 18, 2022 at 11:18 AM rajat kumar 
> wrote:
>
>> Hello ,
>>
>> Can anyone please help me with the below error? It is a Maven project, and
>> the error occurs while building it.
>>
>> [ERROR] error: java.lang.StackOverflowError
>> [INFO] at
>> scala.tools.nsc.typechecker.Typers$Typer.typedApply$1(Typers.scala:4885)
>>
>


Issue while building spark project

2022-07-18 Thread rajat kumar
Hello ,

Can anyone please help me with the below error? It is a Maven project, and the
error occurs while building it.

[ERROR] error: java.lang.StackOverflowError
[INFO] at
scala.tools.nsc.typechecker.Typers$Typer.typedApply$1(Typers.scala:4885)


Spark job failing and not giving error to do diagnosis

2022-04-23 Thread rajat kumar
Hello All

I am not getting anything in the logs, and the history URL is also not opening.

Has someone faced this issue?

Application failed 1 times (global limit =5; local limit is =1) due to
ApplicationMaster for attempt timed out. Failing the application.



Thanks
Rajat


Re: Executorlost failure

2022-04-07 Thread rajat kumar
With autoscaling, we can have any number of executors.

Thanks

On Fri, Apr 8, 2022, 08:27 Wes Peng  wrote:

> I once had a file which is 100+GB getting computed in 3 nodes, each node
> has 24GB memory only. And the job could be done well. So from my
> experience spark cluster seems to work correctly for big files larger
> than memory by swapping them to disk.
>
> Thanks
>
> rajat kumar wrote:
> > Tested this with executors of size 5 cores, 17GB memory. Data vol is
> > really high around 1TB
>
>
>


Re: Executorlost failure

2022-04-07 Thread rajat kumar
Tested this with executors of 5 cores and 17 GB memory each. The data volume
is really high, around 1 TB.

Thanks
Rajat

On Thu, Apr 7, 2022, 23:43 rajat kumar  wrote:

> Hello Users,
>
> I got following error, tried increasing executor memory and memory
> overhead that also did not help .
>
> ExecutorLost Failure(executor1 exited caused by one of the following
> tasks) Reason: container from a bad node:
>
> java.lang.OutOfMemoryError: enough memory for aggregation
>
>
> Can someone please suggest ?
>
> Thanks
> Rajat
>


Executorlost failure

2022-04-07 Thread rajat kumar
Hello Users,

I got the following error; I tried increasing the executor memory and memory
overhead, but that also did not help.

ExecutorLost Failure(executor1 exited caused by one of the following tasks)
Reason: container from a bad node:

java.lang.OutOfMemoryError: enough memory for aggregation


Can someone please suggest ?

Thanks
Rajat


Issue while creating spark app

2022-02-26 Thread rajat kumar
Hello Users,

I am trying to create a Spark application using Scala (IntelliJ).
I have installed the Scala plugin in IntelliJ but am still getting the below error:

Cannot find project Scala library 2.12.12 for module SparkSimpleApp


Could anyone please tell me what I am doing wrong?

Thanks
Rajat


SparkSQL vs Dataframe vs Dataset

2021-12-06 Thread rajat kumar
Hi Users,

Is there any use case where we should prefer SQL vs DataFrame vs Dataset?

Is there any recommended approach, or any advantage/performance gain of one
over the others?

Thanks
Rajat


Moving millions of files using spark

2021-06-16 Thread rajat kumar
Hello ,

I know this might not be a typical use case for Spark, but I have millions of
files in a single folder. The file names follow a pattern, and based on the
pattern I want to move them to different directories.

Can you please suggest what can be done?

Thanks
rajat
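One approach, sketched under the assumption that the files sit on HDFS or another
Hadoop-compatible filesystem (paths and the routing pattern below are made up), is to
list the names once on the driver, parallelize them, and do the renames on executors
with the Hadoop FileSystem API:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("file-mover").getOrCreate()
val sc = spark.sparkContext

val srcDir = new Path("/data/inbox")   // hypothetical source folder
val names = FileSystem.get(sc.hadoopConfiguration)
  .listStatus(srcDir).map(_.getPath.getName).toSeq

sc.parallelize(names, numSlices = 200).foreachPartition { files =>
  // Recreate the FileSystem handle on the executor; it is not serializable.
  val fs = FileSystem.get(new Configuration())
  files.foreach { name =>
    // Hypothetical pattern: route by the file-name prefix.
    val target = if (name.startsWith("a")) "/data/out/a" else "/data/out/other"
    fs.rename(new Path(s"/data/inbox/$name"), new Path(s"$target/$name"))
  }
}

For object stores without a cheap rename, a plain bulk-copy tool is usually a better
fit than Spark.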


Re: Issue while calling foreach in Pyspark

2021-05-08 Thread rajat kumar
, gateway=gateway, conf=conf)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/context.py", line 316, in
_ensure_initialized
SparkContext._gateway = gateway or launch_gateway(conf)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/java_gateway.py", line
46, in launch_gateway
return _launch_gateway(conf)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/java_gateway.py", line
108, in _launch_gateway
raise Exception("Java gateway process exited before sending its port
number")
Exception: Java gateway process exited before sending its port number

at
org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
at
org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:592)
at
org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:575)
at
org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
at
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at
org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
at
scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
at
org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
at
org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
at
org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:945)
at
org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:945)
at
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
at
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at
org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more


End Main...

Thanks
Rajat

On Sat, May 8, 2021 at 3:02 AM Mich Talebzadeh 
wrote:

> By YARN mode I meant dealing with issues raised cluster-wide.
>
> From personal experience, I find it easier to trace these sorts of errors
> when I run the code in local mode as it could be related to the set-up and
> easier to track where things go wrong when one is dealing with local mode.
>
> This line
>
> py4j.protocol.Py4JJavaError: An error occurred while calling
> z:org.apache.spark.api.python.PythonRDD.collectAndServe.
>
>
> may be related to the following stackoverflow error
>
> py4j.protocol.Py4JJavaError occurred while calling
> z:org.apache.spark.api.python.PythonRDD.collectAndServe - Stack Overflow
> <https://stackoverflow.com/questions/50064646/py4j-protocol-py4jjavaerror-occurred-while-calling-zorg-apache-spark-api-python>
>
>
> HTH
>
>
> On Fri, 7 May 2021 at 22:10, Sean Owen  wrote:
>
>> I don't see any reason to think this is related to YARN.
>> You haven't shown the actual error @rajat so not sure there is
>> anything to say.
>>
>> On Fri, May 7, 2021 at 3:08 PM Mich Talebzadeh 
>> wrote:
>>
>>> I have suspicion that this may be caused by your cluster as it appears
>>> that you are running this in YARN mode like below
>>>
>>> spark-submit --master yarn --deploy-mode client xyx.py
>>>
>>> What happens if you try running it in local mode?
>>>
>>> spark-submit --master local[2] xyx.py
>>>
>>> Is this run in a managed cluster like GCP dataproc?
>>>
>>> HTH
>>>
>>>
>

Re: Issue while calling foreach in Pyspark

2021-05-07 Thread rajat kumar
Thanks Mich and Sean for the response. Yes, Sean is right; this is a batch
job.

I have only 10 records in the DataFrame, and it is still giving this
exception.

Following are the full logs.

File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line
584, in foreach
self.rdd.foreach(f)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 789, in
foreach
self.mapPartitions(processPartition).count()  # Force evaluation
  File "/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1055, in
count
return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
  File "/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1046, in sum
return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 917, in fold
vals = self.mapPartitions(func).collect()
  File "/opt/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 816, in
collect
sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
  File "/opt/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py",
line 1257, in __call__
answer, self.gateway_client, self.target_id, self.name)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63,
in deco
return f(*a, **kw)
  File "/opt/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line
328, in get_return_value
format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling
z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1
in stage 3.0 failed 4 times, most recent failure: Lost task 1.3 in stage
3.0 (TID 10, 10.244.158.5, executor 1):
org.apache.spark.api.python.PythonException: Traceback (most recent call
last):
  File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 364, in
main
func, profiler, deserializer, serializer = read_command(pickleSer,
infile)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 69, in
read_command
command = serializer._read_with_length(file)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/serializers.py", line
172, in _read_with_length
return self.loads(obj)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/serializers.py", line
580, in loads
return pickle.loads(obj, encoding=encoding)
  File
"/opt/dataflow/python/lib/python3.6/site-packages/module/read_data.py",
line 10, in 
spark = SparkSession.builder.appName("test").getOrCreate()
  File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/session.py", line
173, in getOrCreate
sc = SparkContext.getOrCreate(sparkConf)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/context.py", line 367, in
getOrCreate
SparkContext(conf=conf or SparkConf())
  File "/opt/spark/python/lib/pyspark.zip/pyspark/context.py", line 133, in
__init__
SparkContext._ensure_initialized(self, gateway=gateway, conf=conf)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/context.py", line 316, in
_ensure_initialized
SparkContext._gateway = gateway or launch_gateway(conf)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/java_gateway.py", line
46, in launch_gateway
return _launch_gateway(conf)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/java_gateway.py", line
108, in _launch_gateway
raise Exception("Java gateway process exited before sending its port
number")
Exception: Java gateway process exited before sending its port number

On Fri, May 7, 2021 at 9:35 PM Sean Owen  wrote:

> foreach definitely works :)
> This is not a streaming question.
> The error says that the JVM worker died for some reason. You'd have to
> look at its logs to see why.
>
> On Fri, May 7, 2021 at 11:03 AM Mich Talebzadeh 
> wrote:
>
>> Hi,
>>
>> I am not convinced foreach works even in 3.1.1
>> Try doing the same with foreachBatch
>>
>>  foreachBatch(sendToSink). \
>> trigger(processingTime='2 seconds'). \
>>
>> and see it works
>>
>> HTH
>>
>>
>>
>> On Fri, 7 May 2021 at 16:07, rajat kumar 
>> wrote:
>>
&

Issue while calling foreach in Pyspark

2021-05-07 Thread rajat kumar
Hi Team,

I am using Spark 2.4.4 with Python

While using the below line:

dataframe.foreach(lambda record : process_logs(record))


My use case is that process_logs will download a file from cloud storage
using Python code and then save the processed data.

I am getting the following error

  File "/opt/spark/python/lib/pyspark.zip/pyspark/java_gateway.py", line
46, in launch_gateway
return _launch_gateway(conf)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/java_gateway.py", line
108, in _launch_gateway
raise Exception("Java gateway process exited before sending its port
number")
Exception: Java gateway process exited before sending its port number

Can anyone please suggest what can be done?

Thanks
Rajat


Yaml for google spark kubernetes configmap

2021-03-03 Thread rajat kumar
Hi

Has anyone used Kubernetes with Spark together with a ConfigMap?
My Spark job is not able to find the ConfigMap.

Can someone please share the YAML if you have used a ConfigMap on Google
Kubernetes (GKE)?

Thanks
Rajat


Re: Thread spilling sort issue with single task

2021-01-26 Thread rajat kumar
Hi ,

Yes, I understand it is a skew-based problem, but how can it be avoided? Could
you please suggest an approach?

I am on Spark 2.4.

Thanks
Rajat

On Tue, Jan 26, 2021 at 3:58 PM German Schiavon 
wrote:

> Hi,
>
> One word : SKEW
>
> It seems the classic skew problem, you would have to apply skew techniques
> to repartition your data properly or if you are in spark 3.0+ try the
> skewJoin optimization.
>
> On Tue, 26 Jan 2021 at 11:20, rajat kumar 
> wrote:
>
>> Hi Everyone,
>>
>> I am running a spark application where I have applied 2 left joins. 1st
>> join in Broadcast and another one is normal.
>> Out of 200 tasks , last 1 task is stuck . It is running at "ANY" Locality
>> level. It seems data skewness issue.
>> It is doing too much spill and shuffle write is too much. Following error
>> is coming in executor logs:
>>
>> INFO UnsafeExternalSorter: Thread spilling sort data of 10.4 GB to disk
>> (10  times so far)
>>
>>
>> Can anyone please suggest what can be wrong?
>>
>> Thanks
>> Rajat
>>
>


Thread spilling sort issue with single task

2021-01-26 Thread rajat kumar
Hi Everyone,

I am running a Spark application where I have applied two left joins. The
first join is a broadcast join and the other one is a normal join.
Out of 200 tasks, the last task is stuck. It is running at the "ANY" locality
level, so it seems to be a data skewness issue.
It is spilling too much, and the shuffle write is too large. The following
error is coming in the executor logs:

INFO UnsafeExternalSorter: Thread spilling sort data of 10.4 GB to disk (10
 times so far)


Can anyone please suggest what can be wrong?

Thanks
Rajat


Process each kafka record for structured streaming

2021-01-20 Thread rajat kumar
Hi,

I want to apply custom logic to each row of data I am getting through Kafka,
and I want to do it with micro-batches.
When I run it, it does not progress.


kafka_stream_df \
.writeStream \
.foreach(process_records) \
.outputMode("append") \
.option("checkpointLocation", "checkpt") \
.trigger(continuous="5 seconds").start()

Regards

Rajat
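One thing worth checking, offered only as a guess: trigger(continuous=...) selects
continuous processing, which supports a very narrow set of operations, while the
stated goal is micro-batching. The micro-batch shape looks roughly like this (sketched
in Scala here, with made-up broker, topic, and checkpoint path; the PySpark API is
analogous, using foreachBatch and trigger(processingTime=...)):

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder().appName("kafka-microbatch").getOrCreate()

val kafkaStreamDf = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")   // hypothetical broker
  .option("subscribe", "events")                      // hypothetical topic
  .load()

// Custom per-batch logic goes here.
val processBatch: (DataFrame, Long) => Unit = (batch, batchId) =>
  batch.selectExpr("CAST(value AS STRING)").show(5, truncate = false)

kafkaStreamDf.writeStream
  .foreachBatch(processBatch)
  .option("checkpointLocation", "/tmp/checkpt")       // hypothetical path
  .trigger(Trigger.ProcessingTime("5 seconds"))       // micro-batch, not continuous
  .start()
  .awaitTermination()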


Re: Running pyspark job from virtual environment

2021-01-17 Thread rajat kumar
Hi Mich,

Thanks for the response. I am running it through the CLI (on the cluster).

Since this will be a scheduled job, I do not want to activate the environment
manually. It should automatically take the path of the virtual environment to
run the job.

For that, I saw the three properties I mentioned. I think setting some of them
to point to the environment's binary will help to run the job from the venv.

PYTHONPATH
PYSPARK_DRIVER_PYTHON
PYSPARK_PYTHON

Also, does it have to be set in spark-env.sh or in the bashrc file? What is
the difference between spark-env.sh and bashrc?

Thanks
Rajat



On Sun, Jan 17, 2021 at 10:32 PM Mich Talebzadeh 
wrote:

> Hi Rajat,
>
> Are you running this through an IDE like PyCharm or on CLI?
>
> If you already have a Python Virtual environment, then just activate it
>
> The only env variable you need to set is export PYTHONPATH that you can do
> it in your startup shell script .bashrc etc.
>
> Once you are in virtual environment, then you run:
>
> $SPARK_HOME/bin/spark-submit 
> Alternatively you can chmod +x  to the file
>
> #! /usr/bin/env python3
>
> and then you can run it as.
>
> ./
>
> HTH
>
>
> On Sun, 17 Jan 2021 at 13:41, rajat kumar 
> wrote:
>
>> Hello,
>>
>> Can anyone confirm here please?
>>
>> Regards
>> Rajat
>>
>> On Sat, Jan 16, 2021 at 11:46 PM rajat kumar 
>> wrote:
>>
>>> Hey Users,
>>>
>>> I want to run spark job from virtual environment using Python.
>>>
>>> Please note I am creating virtual env (using python3 -m venv env)
>>>
>>> I see that there are 3 variables for PYTHON which we have to set:
>>> PYTHONPATH
>>> PYSPARK_DRIVER_PYTHON
>>> PYSPARK_PYTHON
>>>
>>> I have 2 doubts:
>>> 1. If i want to use Virtual env, do I need to point python path of
>>> virtual environment to all these variables?
>>> 2. Should I set these variables in spark-env.sh or should I set them
>>> using export statements.
>>>
>>> Regards
>>> Rajat
>>>
>>>
>>>


Re: Running pyspark job from virtual environment

2021-01-17 Thread rajat kumar
Hello,

Can anyone confirm here please?

Regards
Rajat

On Sat, Jan 16, 2021 at 11:46 PM rajat kumar 
wrote:

> Hey Users,
>
> I want to run spark job from virtual environment using Python.
>
> Please note I am creating virtual env (using python3 -m venv env)
>
> I see that there are 3 variables for PYTHON which we have to set:
> PYTHONPATH
> PYSPARK_DRIVER_PYTHON
> PYSPARK_PYTHON
>
> I have 2 doubts:
> 1. If i want to use Virtual env, do I need to point python path of virtual
> environment to all these variables?
> 2. Should I set these variables in spark-env.sh or should I set them using
> export statements.
>
> Regards
> Rajat
>
>
>


Running pyspark job from virtual environment

2021-01-16 Thread rajat kumar
Hey Users,

I want to run a Spark job from a virtual environment using Python.

Please note I am creating the virtual env (using python3 -m venv env).

I see that there are 3 variables for PYTHON which we have to set:
PYTHONPATH
PYSPARK_DRIVER_PYTHON
PYSPARK_PYTHON

I have 2 doubts:
1. If I want to use the virtual env, do I need to point the Python path of the
virtual environment to all these variables?
2. Should I set these variables in spark-env.sh, or should I set them using
export statements?

Regards
Rajat


Kubernetes spark insufficient cpu error

2020-12-21 Thread rajat kumar
Hey All

I am facing this error while running Spark on Kubernetes; can anyone
suggest what can be corrected here?

I am using Minikube and Spark 2.4 to run a spark-submit in cluster mode.

default-scheduler  0/1 nodes are available: 1 Insufficient cpu.

Regards
Rajat


Spark Streaming Job is stuck

2020-10-18 Thread rajat kumar
Hello Everyone,

My Spark Streaming job is running too slowly: it has a batch interval of 15
seconds, but each batch takes 20-22 seconds to complete. It was fine until the
first week of October, but suddenly it started behaving this way. I know
changing the batch interval can help, but other than that, any idea what can
be done?

Please note I am using the direct stream approach.

Thanks
Rajat Sharma


Call Oracle Sequence using Spark

2019-08-15 Thread rajat kumar
Hi All,

I have to call an Oracle sequence using Spark. Can you please tell me the way
to do that?

Thanks
Rajat
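Spark has no sequence API of its own, so a hedged sketch of the simplest route is a
plain JDBC call from the driver (connection details, the driver jar on the classpath,
and the sequence name are all placeholders); if a value is needed per row, the usual
workaround is to reserve a block of sequence values this way and assign them in Spark
rather than calling NEXTVAL row by row:

import java.sql.DriverManager

// Placeholders for connection details and the sequence name.
val conn = DriverManager.getConnection(
  "jdbc:oracle:thin:@//dbhost:1521/ORCLPDB1", "app_user", "secret")
try {
  val rs = conn.createStatement().executeQuery("SELECT my_seq.NEXTVAL FROM dual")
  rs.next()
  val nextVal = rs.getLong(1)
  println(s"next sequence value = $nextVal")
} finally {
  conn.close()
}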


write files of a specific size

2019-05-05 Thread rajat kumar
Hi All,
My Spark SQL job produces output according to the default partitioning and
creates N files.
I want each file in the final result to be about 100 MB in size.

How can I do it?

thanks
rajat
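Spark 2.x has no direct target-file-size setting, so a common approximation, sketched
here with made-up paths, is to estimate the number of output partitions from the input
size and repartition before the write (spark.sql.files.maxRecordsPerFile is the other
lever, capping rows rather than bytes per file). The output size also depends on
compression, so the result is approximate:

import org.apache.hadoop.fs.{FileSystem, Path}

// spark is an existing SparkSession; paths are hypothetical.
val inputPath = new Path("/data/input")
val targetFileBytes = 100L * 1024 * 1024   // ~100 MB per file

val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
val totalBytes = fs.getContentSummary(inputPath).getLength
val numFiles = math.max(1, (totalBytes / targetFileBytes).toInt)

spark.read.parquet(inputPath.toString)
  .repartition(numFiles)                      // roughly 100 MB per output file
  .write.mode("overwrite").parquet("/data/output")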


common logging in spark

2019-05-01 Thread rajat kumar
Hi All,

I have heard that log4j will not be able to work properly. I have been told
to use a logger in the Scala code.

Is there any pointer for that?

Thanks for help in advance
rajat
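A small sketch of the usual pattern: take a named log4j logger, and for anything whose
instances may be serialized to executors, make the logger @transient lazy so it is
re-created on each JVM instead of being shipped (class and message text are made up):

import org.apache.log4j.{LogManager, Logger}

// Mix this into classes whose instances may be sent to executors.
trait Logging extends Serializable {
  @transient protected lazy val log: Logger = LogManager.getLogger(getClass.getName)
}

object MyJob extends Logging {
  def main(args: Array[String]): Unit = {
    log.info("job starting")   // appears in the driver log
  }
}

Messages logged inside executor-side closures end up in the executor logs (YARN
container logs or the executor tab of the UI), not in the driver output.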


handling skewness issues

2019-04-29 Thread rajat kumar
Hi All,

How do we overcome skewness issues in Spark?

I read that we can add some randomness to the key column before the join and
remove that random part after the join.

Is there any better way? The above method seems to be a workaround.

thanks
rajat
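Salting is indeed the standard pre-AQE approach rather than just a hack. A minimal
sketch (bigDf, smallDf, the column name "key", and a salt factor of 10 are all made
up) that salts the skewed side randomly and replicates the smaller side across every
salt value so each salted key still finds its match:

import org.apache.spark.sql.functions._

val saltBuckets = 10   // illustrative

// Large, skewed side: append a random salt 0..9 to the join key.
val saltedBig = bigDf
  .withColumn("salt", (rand() * saltBuckets).cast("int").cast("string"))
  .withColumn("salted_key", concat_ws("_", col("key"), col("salt")))

// Small side: replicate each row once per salt value so every salted key matches.
val saltedSmall = smallDf
  .withColumn("salt", explode(array((0 until saltBuckets).map(i => lit(i.toString)): _*)))
  .withColumn("salted_key", concat_ws("_", col("key"), col("salt")))
  .drop("key", "salt")

val joined = saltedBig.join(saltedSmall, Seq("salted_key"), "left_outer")
  .drop("salted_key", "salt")   // remove the synthetic columns after the join

The cost is replicating the smaller side saltBuckets times, which is why the skew-join
handling in Spark 3's AQE is the better option when upgrading is possible.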


Re: repartition in df vs partitionBy in df

2019-04-24 Thread rajat kumar
Hello,
Thanks for the quick reply.
Got it: partitionBy is to create something like Hive partitions.
But when do we actually use repartition?
How do we decide whether to repartition or not, given that in development we
only get sample data?
Also, what number should I give when repartitioning?

thanks

On Thu, 25 Apr 2019, 10:31 moqi wrote:

> Hello, I think you can refer to this link; I hope it helps you.
>
>
> https://stackoverflow.com/questions/40416357/spark-sql-difference-between-df-repartition-and-dataframewriter-partitionby/40417992
>
>
>
>
>
>
>
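To make the distinction concrete, a small sketch (paths and the column name are made
up): repartition controls how many in-memory partitions, and therefore tasks and
output files, the job uses, while partitionBy controls the Hive-style directory layout
of the data being written:

// spark is an existing SparkSession; paths are hypothetical.
val df = spark.read.parquet("/data/events")

// repartition: shuffle into 200 in-memory partitions -> roughly 200 output files.
df.repartition(200)
  .write.mode("overwrite").parquet("/data/out_flat")

// partitionBy: one directory per event_date value (Hive-style layout),
// so later reads that filter on event_date can prune whole directories.
df.write.mode("overwrite")
  .partitionBy("event_date")
  .parquet("/data/out_partitioned")

As for the number to pass to repartition, a common rule of thumb is total data size
divided by a target partition size of roughly 100-200 MB, which sample data in
development will naturally underestimate.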


Re: repartition in df vs partitionBy in df

2019-04-24 Thread rajat kumar
Hi All,
Can anyone explain?

thanks
rajat

On Sun, 21 Apr 2019, 00:18 kumar.rajat20del wrote:

> Hi Spark Users,
>
> repartition and partitionBy seem to be very similar for a DataFrame.
> In which scenario do we use each one?
>
> As per my understanding, repartition is a very expensive operation as it
> needs a full shuffle, so when do we use repartition?
>
> Thanks
> Rajat
>
>
>
>
>


Re: Spark job running for long time

2019-04-21 Thread rajat kumar
Hi Yeikel,

I cannot copy anything from the system,
but I have seen the explain output.

It was doing a sortMergeJoin for all tables.
There are 10 tables, all of them joined with a left outer join.

Out of the 10 tables, one table is 50 MB and a second is 200 MB. The rest are
big tables.

Also, the data is in Avro format.

I am using Spark 2.2.

I suspect a broadcast join can help, but I am not sure, because broadcast by
default only applies to smaller tables of around 10 MB.

Thanks
Rajat

On Wed, 17 Apr 2019, 23:53 Yeikel wrote:

> Can you share the output of df.explain()?
>
>
>
>
>


Re: --jars vs --spark.executor.extraClassPath vs --spark.driver.extraClassPath

2019-04-20 Thread rajat kumar
Hi,

Can anyone please explain?

On Mon, 15 Apr 2019, 09:31 rajat kumar wrote:

> Hi All,
>
> I came across different parameters in spark submit
>
> --jars , --spark.executor.extraClassPath , --spark.driver.extraClassPath
>
> What are the differences between them? When to use which one? Will it
> differ
> if I use following:
>
> --master yarn --deploy-mode client
> --master yarn --deploy-mode cluster
>
>
> Thanks
> Rajat
>


Re: Spark job running for long time

2019-04-17 Thread rajat kumar
Hi ,

Thanks for the response!

We are doing 12 left outer joins. Also, I see GC is colored red in the Spark
UI, so it seems GC is also taking time.
We have tried using Kryo serialization and tried giving more memory to the
executor as well as the driver, but it didn't work.





On Wed, 17 Apr 2019, 23:35 Yeikel wrote:

> We need more information about your job to be able to help you. Please
> share
> some snippets or the overall idea of what you are doing
>
>
>
>
>


Spark job running for long time

2019-04-17 Thread rajat kumar
Hi All,

One of my containers has been running for a long time.
The logs show "Thread 240 spilling sort data of 10.4 GB to disk".
This is happening every minute.


Thanks
Rajat


--jars vs --spark.executor.extraClassPath vs --spark.driver.extraClassPath

2019-04-14 Thread rajat kumar
Hi All,

I came across different parameters in spark submit

--jars , --spark.executor.extraClassPath , --spark.driver.extraClassPath

What are the differences between them? When should we use which one? Will it
differ if I use the following:

--master yarn --deploy-mode client
--master yarn --deploy-mode cluster


Thanks
Rajat


spark rdd grouping

2015-11-30 Thread Rajat Kumar
Hi

I have a JavaPairRDD rdd1. I want to group rdd1 by keys but preserve the
partitions of the original RDD, to avoid a shuffle, since I know all records
with the same key are already in the same partition.

The PairRDD is basically constructed using the Kafka Streaming low-level
consumer, which has all records with the same key already in the same
partition. Can I group them together while avoiding a shuffle?

Thanks
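If the guarantee really is that every key's records sit in a single partition, the
shuffle can be avoided by grouping inside each partition instead of calling
groupByKey. A hedged sketch in Scala (the original question uses a JavaPairRDD, where
mapPartitionsToPair is the analogous call, and the key/value types below are
illustrative):

import org.apache.spark.rdd.RDD

// Groups by key within each partition only; assumes all records for a key are
// already co-located, as with the Kafka layout described above.
// Note: it materializes one whole partition at a time in memory.
def groupWithinPartitions(pairRdd: RDD[(String, String)]): RDD[(String, Seq[String])] =
  pairRdd.mapPartitions(
    iter => iter.toSeq.groupBy(_._1).iterator.map { case (k, vs) => (k, vs.map(_._2)) },
    preservesPartitioning = true)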