Reading too many files

2022-10-03 Thread Sachit Murarka
Hello,

I am reading a very large number of Parquet files (more than 25,000) in Spark
3.2. There is no error in the logs, but after spark.read.parquet the job is not
able to proceed further.
Can anyone please suggest whether there is any property to improve the
parallelism of the reads?
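For reference, a hedged sketch (not from the original post; the path is a placeholder and the values shown are the defaults) of the session-level settings that usually govern how Spark splits a large set of Parquet files into read tasks:

from pyspark.sql import SparkSession

spark = (SparkSession.builder
    # max bytes packed into a single read task; lowering this creates more, smaller tasks
    .config("spark.sql.files.maxPartitionBytes", "128MB")
    # assumed cost of opening one file; it controls how many small files get packed into one task
    .config("spark.sql.files.openCostInBytes", "4MB")
    .getOrCreate())

df = spark.read.parquet("/path/to/parquet/dir")  # placeholder path for the 25k+ files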

Kind Regards,
Sachit Murarka


Re: Query regarding Proleptic Gregorian Calendar Spark3

2022-09-20 Thread Sachit Murarka
Reposting once.
Kind Regards,
Sachit Murarka


On Tue, Sep 20, 2022 at 6:56 PM Sachit Murarka 
wrote:

> Hi All,
>
> I am getting the below error. I read the documentation and understood that
> we need to set two properties:
> spark.conf.set("spark.sql.parquet.int96RebaseModeInRead","CORRECTED")
> spark.conf.set("spark.sql.parquet.int96RebaseModeInWrite","CORRECTED")
>
> Is this the only way, or is there another way to handle this behaviour?
>
> Caused by: org.apache.spark.SparkUpgradeException: You may get a different
> result due to the upgrading of Spark 3.0:
> writing dates before 1582-10-15 or timestamps before 1900-01-01T00:00:00Z
> into Parquet INT96
> files can be dangerous, as the files may be read by Spark 2.x or legacy
> versions of Hive
> later, which uses a legacy hybrid calendar that is different from Spark
> 3.0+'s Proleptic
> Gregorian calendar. See more details in SPARK-31404. You can set
> spark.sql.parquet.int96RebaseModeInWrite to 'LEGACY' to
> rebase the datetime values w.r.t. the calendar difference during writing,
> to get maximum
> interoperability. Or set spark.sql.parquet.int96RebaseModeInWrite to
> 'CORRECTED' to write the datetime values as it is,
> if you are 100% sure that the written files will only be read by Spark
> 3.0+ or other
> systems that use Proleptic Gregorian calendar.
>
>
> Kind Regards,
> Sachit Murarka
>


Query regarding Proleptic Gregorian Calendar Spark3

2022-09-20 Thread Sachit Murarka
Hi All,

I am getting the below error. I read the documentation and understood that we
need to set two properties:
spark.conf.set("spark.sql.parquet.int96RebaseModeInRead","CORRECTED")
spark.conf.set("spark.sql.parquet.int96RebaseModeInWrite","CORRECTED")

Is this the only way, or is there another way to handle this behaviour?

Caused by: org.apache.spark.SparkUpgradeException: You may get a different
result due to the upgrading of Spark 3.0:
writing dates before 1582-10-15 or timestamps before 1900-01-01T00:00:00Z
into Parquet INT96
files can be dangerous, as the files may be read by Spark 2.x or legacy
versions of Hive
later, which uses a legacy hybrid calendar that is different from Spark
3.0+'s Proleptic
Gregorian calendar. See more details in SPARK-31404. You can set
spark.sql.parquet.int96RebaseModeInWrite to 'LEGACY' to
rebase the datetime values w.r.t. the calendar difference during writing,
to get maximum
interoperability. Or set spark.sql.parquet.int96RebaseModeInWrite to
'CORRECTED' to write the datetime values as it is,
if you are 100% sure that the written files will only be read by Spark 3.0+
or other
systems that use Proleptic Gregorian calendar.
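Those two properties do not have to be set through spark.conf.set at runtime; a hedged sketch of supplying the same values when the session is built, or on spark-submit (your_job.py is a placeholder name):

from pyspark.sql import SparkSession

spark = (SparkSession.builder
    .config("spark.sql.parquet.int96RebaseModeInRead", "CORRECTED")
    .config("spark.sql.parquet.int96RebaseModeInWrite", "CORRECTED")
    .getOrCreate())

# or, equivalently, on submission:
# spark-submit \
#   --conf spark.sql.parquet.int96RebaseModeInRead=CORRECTED \
#   --conf spark.sql.parquet.int96RebaseModeInWrite=CORRECTED \
#   your_job.py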


Kind Regards,
Sachit Murarka


Re: EXT: Network time out property is not getting set in Spark

2022-09-13 Thread Sachit Murarka
On Tue, Sep 13, 2022, 21:23 Sachit Murarka  wrote:

> Hi Vibhor,
>
> Thanks for your response!
>
> Some properties, such as shuffle partitions, can be set after the Spark
> session is created without changing the flag
> "spark.sql.legacy.setCommandRejectsSparkCoreConfs". Any idea why that is?
>
> Kind Regards,
> Sachit Murarka
>
>
> On Tue, Sep 13, 2022 at 7:14 PM Vibhor Gupta 
> wrote:
>
>> Hi Sachit,
>>
>> Check the migration guide.
>>
>> https://spark.apache.org/docs/latest/sql-migration-guide.html#:~:text=Spark%202.4%20and%20below%3A%20the,legacy.setCommandRejectsSparkCoreConfs%20to%20false.
>> Also I think it is better to set this property before starting the
>> SparkContext.
>>
>> Regards,
>> Vibhor
>>
>> --
>> *From:* Sachit Murarka 
>> *Sent:* Tuesday, September 13, 2022 5:14 PM
>> *To:* spark users 
>> *Subject:* EXT: Network time out property is not getting set in Spark
>>
>> *EXTERNAL: *Report suspicious emails to *Email Abuse.*
>>
>> Hello Everyone,
>>
>> I am trying to set network timeout property , it used to work in
>> Spark2.X , but in Spark 3 , it is giving following error:-
>>
>> Could you please suggest if it is due to any bug in Spark3 or do we need
>> any other property because as per spark official doc ,this is the unchanged
>> property.
>>
>> spark.conf.set("spark.network.timeout", "1200s")
>>
>> org.apache.spark.sql.AnalysisException: Cannot modify the value of a
>> Spark config: spark.network.timeout
>>
>>   at
>> org.apache.spark.sql.errors.QueryCompilationErrors$.cannotModifyValueOfSparkConfigError(QueryCompilationErrors.scala:2322)
>>
>>   at
>> org.apache.spark.sql.RuntimeConfig.requireNonStaticConf(RuntimeConfig.scala:157)
>>
>>   at org.apache.spark.sql.RuntimeConfig.set(RuntimeConfig.scala:41)
>>
>>
>> Kind Regards,
>> Sachit Murarka
>>
>


Network time out property is not getting set in Spark

2022-09-13 Thread Sachit Murarka
Hello Everyone,

I am trying to set the network timeout property. It used to work in Spark 2.x,
but in Spark 3 it gives the following error.

Could you please suggest whether this is due to a bug in Spark 3, or whether we
need a different property? As per the official Spark documentation, this
property is unchanged.

spark.conf.set("spark.network.timeout", "1200s")

org.apache.spark.sql.AnalysisException: Cannot modify the value of a Spark
config: spark.network.timeout

  at
org.apache.spark.sql.errors.QueryCompilationErrors$.cannotModifyValueOfSparkConfigError(QueryCompilationErrors.scala:2322)

  at
org.apache.spark.sql.RuntimeConfig.requireNonStaticConf(RuntimeConfig.scala:157)

  at org.apache.spark.sql.RuntimeConfig.set(RuntimeConfig.scala:41)
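As the reply thread above notes, spark.network.timeout is a Spark core setting, so in Spark 3 it has to be supplied before the SparkContext is created rather than through spark.conf.set; a hedged sketch (your_job.py is a placeholder name):

from pyspark.sql import SparkSession

spark = (SparkSession.builder
    .config("spark.network.timeout", "1200s")  # supplied before the SparkContext starts
    .getOrCreate())

# or on submission:
# spark-submit --conf spark.network.timeout=1200s your_job.py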


Kind Regards,
Sachit Murarka


Re: Issue while creating spark app

2022-02-26 Thread Sachit Murarka
Hello,

Thanks for replying. I have already installed the Scala plugin in IntelliJ, but
it still gives the same error:

Cannot find project Scala library 2.12.12 for module SparkSimpleApp

Thanks
Rajat

On Sun, Feb 27, 2022, 00:52 Bitfox  wrote:

> You need to install scala first, the current version for spark is 2.12.15
> I would suggest you install scala by sdk which works great.
>
> Thanks
>
> On Sun, Feb 27, 2022 at 12:10 AM rajat kumar 
> wrote:
>
>> Hello Users,
>>
>> I am trying to create spark application using Scala(Intellij).
>> I have installed Scala plugin in intelliJ still getting below error:-
>>
>> Cannot find project Scala library 2.12.12 for module SparkSimpleApp
>>
>>
>> Could anyone please help what I am doing wrong?
>>
>> Thanks
>>
>> Rajat
>>
>


Collecting list of errors across executors

2021-08-03 Thread Sachit Murarka
Hi Team,

We are using rdd.foreach(lambda x: do_something(x)).

Our use case requires collecting, in a list, the error messages raised in the
exception block of do_something.
Since this runs on the executors, a global list won't work here. As the state
needs to be shared among the executors, I thought of using an accumulator, but
the default accumulator supports only numeric values.

Can someone please suggest how to collect the errors from all records of the
RDD into a single list?
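A hedged sketch of one way to do this with a custom AccumulatorParam that merges lists; do_something and the RDD below are placeholders standing in for the real processing:

from pyspark.sql import SparkSession
from pyspark.accumulators import AccumulatorParam

class ListParam(AccumulatorParam):
    """Accumulator parameter that merges Python lists instead of numbers."""
    def zero(self, initial_value):
        return []
    def addInPlace(self, acc, other):
        acc.extend(other)
        return acc

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
errors = sc.accumulator([], ListParam())

def do_something(x):          # stand-in for the real per-record processing
    try:
        if x % 3 == 0:
            raise ValueError("bad record %s" % x)
    except Exception as e:
        errors.add([str(e)])  # always add a list so addInPlace can extend it

rdd = sc.parallelize(range(10))   # placeholder RDD
rdd.foreach(do_something)
print(errors.value)               # the collected error messages, on the driver

Since foreach is an action, the Spark documentation guarantees each task's accumulator update is applied only once, so retried tasks should not duplicate entries.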

Thanks,
Sachit Murarka


Re: Usage of DropDuplicate in Spark

2021-06-22 Thread Sachit Murarka
Hi Chetan,

You can subtract the DataFrames or use the except operation:
the first DF contains all rows,
the second DF contains the unique rows (after removing duplicates);
subtracting the second from the first gives the discarded records, which you
can then log. A sketch follows below.

Hope this helps!
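A minimal PySpark sketch of that approach, assuming duplicates are defined over all columns; df stands for the original DataFrame and the table name is a placeholder:

deduped = df.dropDuplicates()
discarded = df.exceptAll(deduped)   # keeps multiplicity: exactly the copies that were dropped
discarded.write.mode("append").saveAsTable("duplicate_audit_log")   # placeholder table name

exceptAll (Spark 2.4+) keeps duplicate multiplicity, whereas subtract/except would collapse the result to distinct rows.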

Thanks
Sachit

On Tue, Jun 22, 2021, 22:23 Chetan Khatri 
wrote:

> Hi Spark Users,
>
> I want to use DropDuplicate, but those records which I discard. I
> would like to log to the instrumental table.
>
> What would be the best approach to do that?
>
> Thanks
>


Small file problem

2021-06-16 Thread Sachit Murarka
Hello Spark Users,

We are receiving a very large number of small files, about 3 million. Just
reading them with spark.read takes a long time, and the job does not proceed
further.

Is there any way to speed this up?
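One common mitigation, sketched here under the assumption that the files share a schema; the paths, file count, and Parquet format are placeholders, not from the original post. The idea is to compact the small files once into fewer, larger files and point the job at the compacted copy:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.read.parquet("/data/landing/small_files/")   # the slow, one-time read of the small files
(df.repartition(200)                                    # aim for a few hundred larger files; tune to data volume
   .write.mode("overwrite")
   .parquet("/data/compacted/"))

compacted = spark.read.parquet("/data/compacted/")      # later reads list far fewer, larger files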

Regards
Sachit Murarka


How to handle auto-restart in Kubernetes Spark application

2021-05-02 Thread Sachit Murarka
Hi All,

I am using Spark on Kubernetes. Can anyone please tell me how I can handle
restarting failed Spark jobs?

I have used the following property, but it is not working:

  restartPolicy:
    type: OnFailure

Kind Regards,
Sachit Murarka


How to gracefully shutdown spark job on kubernetes

2021-03-29 Thread Sachit Murarka
Hi All,

I am using Spark 3.0.1 with Python 3.8.

I am calling spark.stop() at the end to gracefully shut down the job once the
processing is done, but my job keeps running and throws the following exception
every 5 minutes. Can someone please help with this?

21/03/29 17:46:39 WARN WatchConnectionManager: Exec Failure
java.io.EOFException
at okio.RealBufferedSource.require(RealBufferedSource.java:61)
at okio.RealBufferedSource.readByte(RealBufferedSource.java:74)
at
okhttp3.internal.ws.WebSocketReader.readHeader(WebSocketReader.java:117)
at
okhttp3.internal.ws.WebSocketReader.processNextFrame(WebSocketReader.java:101)
at
okhttp3.internal.ws.RealWebSocket.loopReader(RealWebSocket.java:274)
at
okhttp3.internal.ws.RealWebSocket$2.onResponse(RealWebSocket.java:214)
at okhttp3.RealCall$AsyncCall.execute(RealCall.java:203)
at okhttp3.internal.NamedRunnable.run(NamedRunnable.java:32)
at
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)

Kind Regards,
Sachit Murarka


Re: Issue while consuming message in kafka using structured streaming

2021-03-23 Thread Sachit Murarka
Hi Team,

I am facing this issue again.
I am using Spark 3.0.1 with Python.

Could you please suggest why it reports the error below?

Current Committed Offsets: {KafkaV2[Subscribe[my-topic]]: {"my-topic":{"1":1498,"0":1410}}}
Current Available Offsets: {KafkaV2[Subscribe[my-topic]]: {"my-topic":{"1":1499,"0":1410}}}
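Gabor's reply quoted below suggests switching off the deprecated offset fetching; a hedged sketch of setting that flag when the session is created (note his follow-up that the driver-side fix landed in 3.1.0, so this applies to Spark 3.1+):

from pyspark.sql import SparkSession

spark = (SparkSession.builder
    # use the newer offset-fetching path instead of the deprecated consumer-based one
    .config("spark.sql.streaming.kafka.useDeprecatedOffsetFetching", "false")
    .getOrCreate())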


Kind Regards,
Sachit Murarka


On Fri, Mar 12, 2021 at 5:44 PM Gabor Somogyi 
wrote:

> Please see that driver side for example resolved in 3.1.0...
>
> G
>
>
> On Fri, Mar 12, 2021 at 1:03 PM Sachit Murarka 
> wrote:
>
>> Hi Gabor,
>>
>> Thanks a lot for the response. I am using Spark 3.0.1 and this is spark
>> structured streaming.
>>
>> Kind Regards,
>> Sachit Murarka
>>
>>
>> On Fri, Mar 12, 2021 at 5:30 PM Gabor Somogyi 
>> wrote:
>>
>>> Since you've not provided any version I guess you're using 2.x and
>>> you're hitting this issue:
>>> https://issues.apache.org/jira/browse/SPARK-28367
>>> The executor side must be resolved out of the box in the latest Spark
>>> version however on driver side one must set "
>>> spark.sql.streaming.kafka.useDeprecatedOffsetFetching=false" to use the
>>> new way of fetching.
>>>
>>> If it doesn't solve your problem then Kafka side must be checked why
>>> it's not returning...
>>>
>>> Hope this helps!
>>>
>>> G
>>>
>>>
>>> On Fri, Mar 12, 2021 at 12:29 PM Sachit Murarka 
>>> wrote:
>>>
>>>> Hi All,
>>>>
>>>> I am getting following error in spark structured streaming while
>>>> connecting to Kakfa
>>>>
>>>> Main issue from logs::
>>>> Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout of
>>>> 6ms expired before the position for partition my-topic-1 could be
>>>> determined
>>>>
>>>> Current Committed Offsets: {KafkaV2[Subscribe[my-topic]]:
>>>> {“my-topic”:{“1":1498,“0”:1410}}}
>>>> Current Available Offsets: {KafkaV2[Subscribe[my-topic]]:
>>>> {“my-topic”:{“1”:1499,“0":1410}}}
>>>>
>>>>
>>>> Full logs::
>>>>
>>>> 21/03/12 11:04:35 ERROR TaskSetManager: Task 0 in stage 0.0 failed 4
>>>> times; aborting job
>>>> 21/03/12 11:04:35 ERROR WriteToDataSourceV2Exec: Data source write
>>>> support
>>>> org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@1eff441c
>>>> is aborting.
>>>> 21/03/12 11:04:35 ERROR WriteToDataSourceV2Exec: Data source write
>>>> support
>>>> org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@1eff441c
>>>> aborted.
>>>> 21/03/12 11:04:35 ERROR MicroBatchExecution: Query [id =
>>>> 2d788a3a-f0ee-4903-9679-0d13bc401e12, runId =
>>>> 1b387c28-c8e3-4336-9c9f-57db16aa8132] terminated with error
>>>> org.apache.spark.SparkException: Writing job aborted.
>>>> at
>>>> org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:413)
>>>> at
>>>> org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2$(WriteToDataSourceV2Exec.scala:361)
>>>> at
>>>> org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.writeWithV2(WriteToDataSourceV2Exec.scala:322)
>>>> at
>>>> org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.run(WriteToDataSourceV2Exec.scala:329)
>>>> at
>>>> org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:39)
>>>> at
>>>> org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:39)
>>>> at
>>>> org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:45)
>>>> at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3627)
>>>> at org.apache.spark.sql.Dataset.$anonfun$collect$1(Dataset.scala:2940)
>>>> at
>>>> org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3618)
>>>> at
>>>> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
>>>> at
>>>> org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
>>>> at
>>>> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecut

Re: Issue while consuming message in kafka using structured streaming

2021-03-12 Thread Sachit Murarka
Hi Gabor,

Thanks a lot for the response. I am using Spark 3.0.1 and this is spark
structured streaming.

Kind Regards,
Sachit Murarka


On Fri, Mar 12, 2021 at 5:30 PM Gabor Somogyi 
wrote:

> Since you've not provided any version I guess you're using 2.x and you're
> hitting this issue: https://issues.apache.org/jira/browse/SPARK-28367
> The executor side must be resolved out of the box in the latest Spark
> version however on driver side one must set "
> spark.sql.streaming.kafka.useDeprecatedOffsetFetching=false" to use the
> new way of fetching.
>
> If it doesn't solve your problem then Kafka side must be checked why it's
> not returning...
>
> Hope this helps!
>
> G
>
>
> On Fri, Mar 12, 2021 at 12:29 PM Sachit Murarka 
> wrote:
>
>> Hi All,
>>
>> I am getting following error in spark structured streaming while
>> connecting to Kakfa
>>
>> Main issue from logs::
>> Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout of
>> 6ms expired before the position for partition my-topic-1 could be
>> determined
>>
>> Current Committed Offsets: {KafkaV2[Subscribe[my-topic]]:
>> {“my-topic”:{“1":1498,“0”:1410}}}
>> Current Available Offsets: {KafkaV2[Subscribe[my-topic]]:
>> {“my-topic”:{“1”:1499,“0":1410}}}
>>
>>
>> Full logs::
>>
>> 21/03/12 11:04:35 ERROR TaskSetManager: Task 0 in stage 0.0 failed 4
>> times; aborting job
>> 21/03/12 11:04:35 ERROR WriteToDataSourceV2Exec: Data source write
>> support
>> org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@1eff441c
>> is aborting.
>> 21/03/12 11:04:35 ERROR WriteToDataSourceV2Exec: Data source write
>> support
>> org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@1eff441c
>> aborted.
>> 21/03/12 11:04:35 ERROR MicroBatchExecution: Query [id =
>> 2d788a3a-f0ee-4903-9679-0d13bc401e12, runId =
>> 1b387c28-c8e3-4336-9c9f-57db16aa8132] terminated with error
>> org.apache.spark.SparkException: Writing job aborted.
>> at
>> org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:413)
>> at
>> org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2$(WriteToDataSourceV2Exec.scala:361)
>> at
>> org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.writeWithV2(WriteToDataSourceV2Exec.scala:322)
>> at
>> org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.run(WriteToDataSourceV2Exec.scala:329)
>> at
>> org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:39)
>> at
>> org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:39)
>> at
>> org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:45)
>> at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3627)
>> at org.apache.spark.sql.Dataset.$anonfun$collect$1(Dataset.scala:2940)
>> at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3618)
>> at
>> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
>> at
>> org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
>> at
>> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
>> at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
>> at
>> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
>> at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3616)
>> at org.apache.spark.sql.Dataset.collect(Dataset.scala:2940)
>> at
>> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:575)
>> at
>> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
>> at
>> org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
>> at
>> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
>> at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
>> at
>> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
>> at
>> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$15(MicroBatchExecution.scala:570)
>> at
>> org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:352)
>> at

Issue while consuming message in kafka using structured streaming

2021-03-12 Thread Sachit Murarka
ost recent failure: Lost task
0.3 in stage 0.0 (TID 3, 10.244.2.68, executor 1):
org.apache.kafka.common.errors.TimeoutException: Timeout of 6ms expired
before the position for partition my-topic-1 could be determined

Driver stacktrace:
at
org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2059)
at
org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2008)
at
org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2007)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2007)
at
org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:973)
at
org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:973)
at scala.Option.foreach(Option.scala:407)
at
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:973)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2239)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2188)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2177)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:775)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
at
org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:382)
... 37 more
Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout of
6ms expired before the position for partition my-topic-1 could be
determined

Current Committed Offsets: {KafkaV2[Subscribe[my-topic]]:
{"my-topic":{"1":1498,"0":1410}}}
Current Available Offsets: {KafkaV2[Subscribe[my-topic]]:
{"my-topic":{"1":1499,"0":1410}}}

Kind Regards,
Sachit Murarka


Re: Single executor processing all tasks in spark structured streaming kafka

2021-03-11 Thread Sachit Murarka
Hi Kapil,

Thanks for the suggestion. Yes, it worked.

Regards
Sachit

On Tue, 9 Mar 2021, 00:19 Kapil Garg,  wrote:

> Hi Sachit,
> What do you mean by "spark is running only 1 executor with 1 task" ?
> Did you submit the spark application with multiple executors but only 1 is
> being used and rest are idle ?
> If that's the case, then it might happen due to spark.locality.wait
> setting which is by default set to 3s. This will enable spark to wait for
> 3s for the tasks to finish on the executor before submitting the next batch
> on another executors. This happens due to spark's preference for cached
> kafka consumers.
>
> And regarding having 1 task doing all the processing. Please check if your
> kafka topic has only 1 partition. Spark draws the parallelism from the
> number of partitions in the kafka topic. Once you have loaded the data from
> partitions, you can choose to repartition the batch so it is processed by
> multiple tasks.
>
> On Mon, Mar 8, 2021 at 10:57 PM Sachit Murarka 
> wrote:
>
>> Hi All,
>>
>> I am using Spark 3.0.1 Structuring streaming with Pyspark.
>>
>> The problem is spark is running only 1 executor with 1 task. Following is
>> the summary of what I am doing.
>>
>> Can anyone help on why my executor is 1 only?
>>
>> def process_events(event):
>> fetch_actual_data()
>> #many more steps
>>
>> def fetch_actual_data():
>> #applying operation on actual data
>>
>> df = spark.readStream.format("kafka") \
>> .option("kafka.bootstrap.servers", KAFKA_URL) \
>> .option("subscribe", KAFKA_TOPICS) \
>> .option("startingOffsets",
>> START_OFFSET).load() .selectExpr("CAST(value AS STRING)")
>>
>>
>> query =
>> df.writeStream.foreach(process_events).option("checkpointLocation",
>> "/opt/checkpoint").trigger(processingTime="30 seconds").start()
>>
>>
>>
>> Kind Regards,
>> Sachit Murarka
>>
>
>
> --
> Regards
> Kapil Garg
>


Single executor processing all tasks in spark structured streaming kafka

2021-03-08 Thread Sachit Murarka
Hi All,

I am using Spark 3.0.1 Structured Streaming with PySpark.

The problem is that Spark is running only one executor with one task. Below is
a summary of what I am doing.

Can anyone help explain why only one executor is used?

def process_events(event):
    fetch_actual_data()
    # many more steps

def fetch_actual_data():
    # applying operation on actual data
    pass

df = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", KAFKA_URL) \
    .option("subscribe", KAFKA_TOPICS) \
    .option("startingOffsets", START_OFFSET) \
    .load() \
    .selectExpr("CAST(value AS STRING)")

query = df.writeStream.foreach(process_events) \
    .option("checkpointLocation", "/opt/checkpoint") \
    .trigger(processingTime="30 seconds") \
    .start()



Kind Regards,
Sachit Murarka


Re: com.esotericsoftware.kryo.KryoException: java.io.IOException: No space left on device\n\t

2021-03-08 Thread Sachit Murarka
Thanks Sean.

Kind Regards,
Sachit Murarka


On Mon, Mar 8, 2021 at 6:23 PM Sean Owen  wrote:

> It's there in the error: No space left on device
> You ran out of disk space (local disk) on one of your machines.
>
> On Mon, Mar 8, 2021 at 2:02 AM Sachit Murarka 
> wrote:
>
>> Hi All,
>>
>> I am getting the following error in my spark job.
>>
>> Can someone please have a look ?
>>
>> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0
>> in stage 41.0 failed 4 times, most recent failure: Lost task 0.3 in stage
>> 41.0 (TID 80817, executor 193): com.esotericsoftware.kryo.KryoException:
>> java.io.IOException: No space left on device\n\tat
>> com.esotericsoftware.kryo.io.Output.flush(Output.java:188)\n\tat
>> com.esotericsoftware.kryo.io.Output.require(Output.java:164)\n\tat
>> com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:251)\n\tat
>> com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:237)\n\tat
>> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:49)\n\tat
>> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:38)\n\tat
>> com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651)\n\tat
>> org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:245)\n\tat
>> org.apache.spark.serializer.SerializationStream.writeValue(Serializer.scala:134)\n\tat
>> org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:241)\n\tat
>> org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:151)\n\tat
>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)\n\tat
>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)\n\tat
>> org.apache.spark.scheduler.Task.run(Task.scala:123)\n\tat
>> org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)\n\tat
>> org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)\n\tat
>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)\n\tat
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat
>> java.lang.Thread.run(Thread.java:748)\nCaused by: java.io.IOException: No
>> space left on device\n\tat java.io.FileOutputStream.writeBytes(Native
>> Method)\n\tat
>> java.io.FileOutputStream.write(FileOutputStream.java:326)\n\tat
>> org.apache.spark.storage.TimeTrackingOutputStream.write(TimeTrackingOutputStream.java:58)\n\tat
>> java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)\n\tat
>> java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)\n\tat
>> net.jpountz.lz4.LZ4BlockOutputStream.flush(LZ4BlockOutputStream.java:240)\n\tat
>> com.esotericsoftware.kryo.io.Output.flush(Output.java:186)\n\t... 19
>> more\n\nDriver stacktrace:\n\tat
>> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)\n\tat
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)\n\tat
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)\n\tat
>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)\n\tat
>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)\n\tat
>> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)\n\tat
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)\n\tat
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)\n\tat
>> scala.Option.foreach(Option.scala:257)\n\tat
>> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)\n\tat
>> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)\n\tat
>> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)\n\tat
>> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)\n\tat
>> org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)\n\tat
>> org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)\n\tat
>> org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)\n\tat
>> org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)\n\tat
>> org.apache.spark.SparkContex

Re: com.esotericsoftware.kryo.KryoException: java.io.IOException: No space left on device\n\t

2021-03-08 Thread Sachit Murarka
Hi Gourav,

I am using PySpark, Spark version 2.4.4.
I have checked that it is not a space issue. Also, I am using a mounted
directory for storing the temp files.
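If the scratch space is what fills up, one hedged option is to point Spark's local directory at a volume with more room before the context starts; the mount path below is a placeholder:

from pyspark.sql import SparkSession

spark = (SparkSession.builder
    # shuffle spill and block files go here; the path must exist with enough space on every node
    .config("spark.local.dir", "/mnt/large-volume/spark-tmp")
    .getOrCreate())

Note that on cluster managers such as YARN the manager-provided local directories (e.g. SPARK_LOCAL_DIRS / LOCAL_DIRS) take precedence over spark.local.dir.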

Thanks
Sachit

On Mon, 8 Mar 2021, 13:53 Gourav Sengupta, 
wrote:

> Hi,
>
> it will be much help if you could at least format the message before
> asking people to go through it. Also I am pretty sure that the error is
> mentioned in the first line itself.
>
> Any ideas regarding the SPARK version, and environment that you are using?
>
>
> Thanks and Regards,
> Gourav Sengupta
>
> On Mon, Mar 8, 2021 at 8:02 AM Sachit Murarka 
> wrote:
>
>> Hi All,
>>
>> I am getting the following error in my spark job.
>>
>> Can someone please have a look ?
>>
>> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0
>> in stage 41.0 failed 4 times, most recent failure: Lost task 0.3 in stage
>> 41.0 (TID 80817, executor 193): com.esotericsoftware.kryo.KryoException:
>> java.io.IOException: No space left on device\n\tat
>> com.esotericsoftware.kryo.io.Output.flush(Output.java:188)\n\tat
>> com.esotericsoftware.kryo.io.Output.require(Output.java:164)\n\tat
>> com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:251)\n\tat
>> com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:237)\n\tat
>> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:49)\n\tat
>> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:38)\n\tat
>> com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651)\n\tat
>> org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:245)\n\tat
>> org.apache.spark.serializer.SerializationStream.writeValue(Serializer.scala:134)\n\tat
>> org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:241)\n\tat
>> org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:151)\n\tat
>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)\n\tat
>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)\n\tat
>> org.apache.spark.scheduler.Task.run(Task.scala:123)\n\tat
>> org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)\n\tat
>> org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)\n\tat
>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)\n\tat
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat
>> java.lang.Thread.run(Thread.java:748)\nCaused by: java.io.IOException: No
>> space left on device\n\tat java.io.FileOutputStream.writeBytes(Native
>> Method)\n\tat
>> java.io.FileOutputStream.write(FileOutputStream.java:326)\n\tat
>> org.apache.spark.storage.TimeTrackingOutputStream.write(TimeTrackingOutputStream.java:58)\n\tat
>> java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)\n\tat
>> java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)\n\tat
>> net.jpountz.lz4.LZ4BlockOutputStream.flush(LZ4BlockOutputStream.java:240)\n\tat
>> com.esotericsoftware.kryo.io.Output.flush(Output.java:186)\n\t... 19
>> more\n\nDriver stacktrace:\n\tat
>> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)\n\tat
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)\n\tat
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)\n\tat
>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)\n\tat
>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)\n\tat
>> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)\n\tat
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)\n\tat
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)\n\tat
>> scala.Option.foreach(Option.scala:257)\n\tat
>> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)\n\tat
>> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)\n\tat
>> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)\n\tat
>> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)\n\tat
>>

com.esotericsoftware.kryo.KryoException: java.io.IOException: No space left on device\n\t

2021-03-08 Thread Sachit Murarka
(DelegatingMethodAccessorImpl.java:43)\n\tat
java.lang.reflect.Method.invoke(Method.java:498)\n\tat
py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)\n\tat
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)\n\tat
py4j.Gateway.invoke(Gateway.java:282)\n\tat
py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)\n\tat
py4j.commands.CallCommand.execute(CallCommand.java:79)\n\tat
py4j.GatewayConnection.run(GatewayConnection.java:238)\n\tat
java.lang.Thread.run(Thread.java:748)\nCaused by:
com.esotericsoftware.kryo.KryoException: java.io.IOException: No space left
on device\n\tat
com.esotericsoftware.kryo.io.Output.flush(Output.java:188)\n\tat
com.esotericsoftware.kryo.io.Output.require(Output.java:164)\n\tat
com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:251)\n\tat
com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:237)\n\tat
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:49)\n\tat
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:38)\n\tat
com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651)\n\tat
org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:245)\n\tat
org.apache.spark.serializer.SerializationStream.writeValue(Serializer.scala:134)\n\tat
org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:241)\n\tat
org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:151)\n\tat
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)\n\tat
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)\n\tat
org.apache.spark.scheduler.Task.run(Task.scala:123)\n\tat
org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)\n\tat
org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)\n\tat
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)\n\tat
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\t...
1 more\nCaused by: java.io.IOException: No space left on device\n\tat
java.io.FileOutputStream.writeBytes(Native Method)\n\tat
java.io.FileOutputStream.write(FileOutputStream.java:326)\n\tat
org.apache.spark.storage.TimeTrackingOutputStream.write(TimeTrackingOutputStream.java:58)\n\tat
java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)\n\tat
java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)\n\tat
net.jpountz.lz4.LZ4BlockOutputStream.flush(LZ4BlockOutputStream.java:240)\n\tat
com.esotericsoftware.kryo.io.Output.flush(Output.java:186)\n\t... 19
more\n\n"}


Kind Regards,
Sachit Murarka


Re: Structured Streaming With Kafka - processing each event

2021-03-02 Thread Sachit Murarka
Hi Mich,

Thanks for the reply. I will check this out.

Kind Regards,
Sachit Murarka


On Fri, Feb 26, 2021 at 2:14 AM Mich Talebzadeh 
wrote:

> Hi Sachit,
>
> I managed to make mine work using the *foreachBatch function *in
> writeStream.
>
> "foreach" performs custom write logic on each row and "foreachBatch"
> performs custom write logic on each micro-batch through SendToBigQuery
> function here
>  foreachBatch(SendToBigQuery) expects 2 parameters, first: micro-batch as
> DataFrame or Dataset and second: unique id for each batch
>  Using foreachBatch, we write each micro batch to storage defined in our
> custom logic. In this case, we store the output of our streaming
> application to Google BigQuery table.
>  Note that we are appending data and column "rowkey" is defined as UUID so
> it can be used as the primary key. batchId is just the counter
> (monolithically increasing number).
>
> This is my code:
>
>
> from __future__ import print_function
> from config import config
> import sys
> from sparkutils import sparkstuff as s
> from pyspark.sql import *
> from pyspark.sql.functions import *
> from pyspark.sql.types import StructType, StringType,IntegerType,
> FloatType, TimestampType
> from google.cloud import bigquery
>
>
> def SendToBigQuery(df, batchId):
>
> """
> Below uses standard Spark-BigQuery API to write to the table
> Additional transformation logic will be performed here
> """
> s.writeTableToBQ(df, "append",
> config['MDVariables']['targetDataset'],config['MDVariables']['targetTable'])
>
> class MDStreaming:
> def __init__(self, spark_session,spark_context):
> self.spark = spark_session
> self.sc = spark_context
> self.config = config
>
> def fetch_data(self):
> self.sc.setLogLevel("ERROR")
> #{"rowkey":"c9289c6e-77f5-4a65-9dfb-d6b675d67cff","ticker":"MSFT",
> "timeissued":"2021-02-23T08:42:23", "price":31.12}
> schema = StructType().add("rowkey", StringType()).add("ticker",
> StringType()).add("timeissued", TimestampType()).add("price", FloatType())
> try:
> # construct a streaming dataframe streamingDataFrame that
> subscribes to topic config['MDVariables']['topic']) -> md (market data)
> streamingDataFrame = self.spark \
> .readStream \
> .format("kafka") \
> .option("kafka.bootstrap.servers",
> config['MDVariables']['bootstrapServers'],) \
> .option("schema.registry.url",
> config['MDVariables']['schemaRegistryURL']) \
> .option("group.id", config['common']['appName']) \
> .option("zookeeper.connection.timeout.ms",
> config['MDVariables']['zookeeperConnectionTimeoutMs']) \
> .option("rebalance.backoff.ms",
> config['MDVariables']['rebalanceBackoffMS']) \
> .option("zookeeper.session.timeout.ms",
> config['MDVariables']['zookeeperSessionTimeOutMs']) \
> .option("auto.commit.interval.ms",
> config['MDVariables']['autoCommitIntervalMS']) \
> .option("subscribe", config['MDVariables']['topic']) \
> .option("failOnDataLoss", "false") \
> .option("includeHeaders", "true") \
> .option("startingOffsets", "latest") \
> .load() \
> .select(from_json(col("value").cast("string"),
> schema).alias("parsed_value"))
>
> #streamingDataFrame.printSchema()
>
> """
>"foreach" performs custom write logic on each row and
> "foreachBatch" performs custom write logic on each micro-batch through
> SendToBigQuery function
> foreachBatch(SendToBigQuery) expects 2 parameters, first:
> micro-batch as DataFrame or Dataset and second: unique id for each batch
>Using foreachBatch, we write each micro batch to storage
> defined in our custom logic. In this case, we store the output of our
> streaming application to Google BigQuery table.
>Note that we are appending data and column "rowkey" is
> defined as UUID so it can be used as the primary key
> """
> result = streamingDataFrame.select( \
>  col("parsed_value.rowkey"

Re: Spark job crashing - Spark Structured Streaming with Kafka

2021-03-02 Thread Sachit Murarka
.internal.ws.WebSocketReader.readHeader(WebSocketReader.java:117)
at
okhttp3.internal.ws.WebSocketReader.processNextFrame(WebSocketReader.java:101)
at
okhttp3.internal.ws.RealWebSocket.loopReader(RealWebSocket.java:274)
at
okhttp3.internal.ws.RealWebSocket$2.onResponse(RealWebSocket.java:214)
at okhttp3.RealCall$AsyncCall.execute(RealCall.java:203)
at okhttp3.internal.NamedRunnable.run(NamedRunnable.java:32)
at
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
21/03/02 07:05:36 WARN WatchConnectionManager: Exec Failure
java.io.EOFException
at okio.RealBufferedSource.require(RealBufferedSource.java:61)
at okio.RealBufferedSource.readByte(RealBufferedSource.java:74)
at
okhttp3.internal.ws.WebSocketReader.readHeader(WebSocketReader.java:117)
at
okhttp3.internal.ws.WebSocketReader.processNextFrame(WebSocketReader.java:101)
at
okhttp3.internal.ws.RealWebSocket.loopReader(RealWebSocket.java:274)
at
okhttp3.internal.ws.RealWebSocket$2.onResponse(RealWebSocket.java:214)
at okhttp3.RealCall$AsyncCall.execute(RealCall.java:203)
at okhttp3.internal.NamedRunnable.run(NamedRunnable.java:32)
at
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
21/03/02 07:10:37 WARN WatchConnectionManager: Exec Failure
java.io.EOFException
at okio.RealBufferedSource.require(RealBufferedSource.java:61)
at okio.RealBufferedSource.readByte(RealBufferedSource.java:74)
at
okhttp3.internal.ws.WebSocketReader.readHeader(WebSocketReader.java:117)
at
okhttp3.internal.ws.WebSocketReader.processNextFrame(WebSocketReader.java:101)
at
okhttp3.internal.ws.RealWebSocket.loopReader(RealWebSocket.java:274)
at
okhttp3.internal.ws.RealWebSocket$2.onResponse(RealWebSocket.java:214)
at okhttp3.RealCall$AsyncCall.execute(RealCall.java:203)
at okhttp3.internal.NamedRunnable.run(NamedRunnable.java:32)
at
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
xecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
 [2021-03-02 07:25:35,740] -  - [ERROR] - StreamingQueryException Exception
while calling run_data Traceback (most recent call last):
  File "/opt/app/file.py", line 69, in process_streams
query.awaitTermination()
  File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/streaming.py", line
103, in awaitTermination
return self._jsq.awaitTermination()
  File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py",
line 1304, in __call__
return_value = get_return_value(
  File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 134,
in deco
raise_from(converted)
  File "", line 3, in raise_from
pyspark.sql.utils.StreamingQueryException: Writing job aborted.
=== Streaming Query ===
Identifier: [id = 4ee71252-c480-4d00-866b-0fbd88e9520e, runId =
8f1f1756-da8d-4983-9f76-dc1af626ad84]
Current Committed Offsets: {}
Current Available Offsets: {KafkaV2[Subscribe[test-topic]]:
{"test-topic":{"0":4628}}}
Current State: ACTIVE
Thread State: RUNNABLE
Logical Plan:
WriteToMicroBatchDataSource
org.apache.spark.sql.execution.streaming.sources.ForeachWriterTable$$anon$1$$anon$2@30f2abbb
+- Project [cast(value#8 as string) AS value#21]
   +- StreamingDataSourceV2Relation [key#7, value#8, topic#9, partition#10,
offset#11L, timestamp#12, timestampType#13],
org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan@433a9c3b,
KafkaV2[Subscribe[test-topic]]

Kind Regards,
Sachit Murarka


On Tue, Mar 2, 2021 at 2:53 PM Jungtaek Lim 
wrote:

> I feel this quite lacks information. Full stack traces from
> driver/executors are essential at least to determine what was happening.
>
> On Tue, Mar 2, 2021 at 5:26 PM Sachit Murarka 
> wrote:
>
>> Hi All,
>>
>> My spark job is crashing (Structured stream) . Can anyone help please. I
>> am using spark 3.0.1 with kubernetes.
>>
>> [ERROR] - StreamingQueryException Exception in  query.awaitTermination()
>>   File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/streaming.py", line
>> 103, in awaitTermination
>> return self._jsq.awaitTermination()
>>   File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py",
>> line 1304, in __call__
>> return_value = get_return_value(
>>   File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line
>> 134, in deco
>> raise_from(converted)
>>
>> Kind Regards,
>> Sachit Murarka
>>
>


Spark job crashing - Spark Structured Streaming with Kafka

2021-03-02 Thread Sachit Murarka
Hi All,

My Spark job (Structured Streaming) is crashing. Can anyone please help? I am
using Spark 3.0.1 with Kubernetes.

[ERROR] - StreamingQueryException Exception in  query.awaitTermination()
  File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/streaming.py", line
103, in awaitTermination
return self._jsq.awaitTermination()
  File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py",
line 1304, in __call__
return_value = get_return_value(
  File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 134,
in deco
raise_from(converted)

Kind Regards,
Sachit Murarka


Structured Streaming With Kafka - processing each event

2021-02-24 Thread Sachit Murarka
Hello Users,

I am using Spark 3.0.1 Structured Streaming with PySpark.

My use case:
I receive a large number of records in Kafka (essentially metadata containing
the location of the actual data). I have to take that metadata from Kafka and
apply some processing.
Processing includes reading the data location from the metadata, fetching the
actual data, and applying some operation on it.

What I have tried::

def process_events(event):
    fetch_actual_data()
    # many more steps

def fetch_actual_data():
    # applying operation on actual data
    pass

df = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", KAFKA_URL) \
    .option("subscribe", KAFKA_TOPICS) \
    .option("startingOffsets", START_OFFSET) \
    .load() \
    .selectExpr("CAST(value AS STRING)")

query = df.writeStream.foreach(process_events) \
    .option("checkpointLocation", "/opt/checkpoint") \
    .trigger(processingTime="30 seconds") \
    .start()


My Queries:

1. Will this foreach run across different executor processes? Generally in
Spark, foreach means it runs on a single executor.

2. I receive too many records in Kafka, and the above code will run separately
for every single message. If I change it to foreachBatch, will that optimize
it?


Kind Regards,
Sachit Murarka


Re: EOF Exception Spark Structured Streams - Kubernetes

2021-02-01 Thread Sachit Murarka
Hi Prashant,

Yes, I understand that the fix will be available from Spark 3.0.2, but I am
using Spark 3.0.1.

Could you please suggest any way to mitigate this exception?

Will this exception cause any issues in the Spark application?

Thanks
Sachit

On Mon, 1 Feb 2021, 13:57 Prashant Sharma,  wrote:

> Hi Sachit,
>
> The fix verison on that JIRA says 3.0.2, so this fix is not yet released.
> Soon, there will be a 3.1.1 release, in the meantime you can try out the
> 3.1.1-rc which also has the fix and let us know your findings.
>
> Thanks,
>
>
> On Mon, Feb 1, 2021 at 10:24 AM Sachit Murarka 
> wrote:
>
>> Following is the related JIRA , Can someone pls check
>>
>> https://issues.apache.org/jira/browse/SPARK-24266
>>
>> I am using 3.0.1 , It says fixed in 3.0.0 and 3.1.0 . Could you please
>> suggest what can be done to avoid this?
>>
>> Kind Regards,
>> Sachit Murarka
>>
>>
>> On Sun, Jan 31, 2021 at 6:38 PM Sachit Murarka 
>> wrote:
>>
>>> Hi Users,
>>>
>>> I am running Spark application on Kubernetes and getting the following
>>> exception in the driver pod. Though it is not affecting the output.
>>>
>>> This exception is coming every 5 minutes and this is a structured
>>> streaming job.
>>>
>>> Could anyone please advise ?
>>>
>>> 21/01/29 06:33:15 WARN WatchConnectionManager: Exec
>>> Failurejava.io.EOFException at
>>> okio.RealBufferedSource.require(RealBufferedSource.java:61) at
>>> okio.RealBufferedSource.readByte(RealBufferedSource.java:74) at
>>> okhttp3.internal.ws.WebSocketReader.readHeader(WebSocketReader.java:117) at
>>> okhttp3.internal.ws.WebSocketReader.processNextFrame(WebSocketReader.java:101)
>>> at okhttp3.internal.ws.RealWebSocket.loopReader(RealWebSocket.java:274) at
>>> okhttp3.internal.ws.RealWebSocket$2.onResponse(RealWebSocket.java:214) at
>>> okhttp3.RealCall$AsyncCall.execute(RealCall.java:203) at
>>> okhttp3.internal.NamedRunnable.run(NamedRunnable.java:32) at
>>> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>>> at
>>> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>>> at java.base/java.lang.Thread.run(Thread.java:834)21/01/29 06:38:16 WARN
>>> WatchConnectionManager: Exec Failure
>>>
>>>
>>> Kind Regards,
>>> Sachit Murarka
>>>
>>


Re: Spark SQL query

2021-02-01 Thread Sachit Murarka
It won't show that mapping per application as such.
You can try to correlate it with the explain plan output using some filters or
attributes.

Otherwise, if you do not have too many queries in the history, just take the
queries, find the plans of those queries, and match them with what is shown in
the UI.

I know that is a tedious task, but I don't think there is another way.

Thanks
Sachit

On Mon, 1 Feb 2021, 22:32 Arpan Bhandari,  wrote:

> Sachit,
>
> That is showing all the queries that got executed, but how it would get
> mapped to specific application Id it was associated with ?
>
> Thanks,
> Arpan Bhandari
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Spark SQL query

2021-02-01 Thread Sachit Murarka
Hi Arpan,

When you type :history in the spark shell,
is it still not showing anything?

Thanks
Sachit

On Mon, 1 Feb 2021, 21:13 Arpan Bhandari,  wrote:

> Hey Sachit,
>
> It shows the query plan, which is difficult to diagnose out and depict the
> actual query.
>
>
> Thanks,
> Arpan Bhandari
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: EOF Exception Spark Structured Streams - Kubernetes

2021-01-31 Thread Sachit Murarka
Following is the related JIRA; can someone please check:

https://issues.apache.org/jira/browse/SPARK-24266

I am using 3.0.1, and it says fixed in 3.0.0 and 3.1.0. Could you please
suggest what can be done to avoid this?

Kind Regards,
Sachit Murarka


On Sun, Jan 31, 2021 at 6:38 PM Sachit Murarka 
wrote:

> Hi Users,
>
> I am running Spark application on Kubernetes and getting the following
> exception in the driver pod. Though it is not affecting the output.
>
> This exception is coming every 5 minutes and this is a structured
> streaming job.
>
> Could anyone please advise ?
>
> 21/01/29 06:33:15 WARN WatchConnectionManager: Exec
> Failurejava.io.EOFException at
> okio.RealBufferedSource.require(RealBufferedSource.java:61) at
> okio.RealBufferedSource.readByte(RealBufferedSource.java:74) at
> okhttp3.internal.ws.WebSocketReader.readHeader(WebSocketReader.java:117) at
> okhttp3.internal.ws.WebSocketReader.processNextFrame(WebSocketReader.java:101)
> at okhttp3.internal.ws.RealWebSocket.loopReader(RealWebSocket.java:274) at
> okhttp3.internal.ws.RealWebSocket$2.onResponse(RealWebSocket.java:214) at
> okhttp3.RealCall$AsyncCall.execute(RealCall.java:203) at
> okhttp3.internal.NamedRunnable.run(NamedRunnable.java:32) at
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> at
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> at java.base/java.lang.Thread.run(Thread.java:834)21/01/29 06:38:16 WARN
> WatchConnectionManager: Exec Failure
>
>
> Kind Regards,
> Sachit Murarka
>


EOF Exception Spark Structured Streams - Kubernetes

2021-01-31 Thread Sachit Murarka
Hi Users,

I am running a Spark application on Kubernetes and getting the following
exception in the driver pod, though it is not affecting the output.

This exception appears every 5 minutes, and this is a Structured Streaming
job.

Could anyone please advise?

21/01/29 06:33:15 WARN WatchConnectionManager: Exec
Failurejava.io.EOFException at
okio.RealBufferedSource.require(RealBufferedSource.java:61) at
okio.RealBufferedSource.readByte(RealBufferedSource.java:74) at
okhttp3.internal.ws.WebSocketReader.readHeader(WebSocketReader.java:117) at
okhttp3.internal.ws.WebSocketReader.processNextFrame(WebSocketReader.java:101)
at okhttp3.internal.ws.RealWebSocket.loopReader(RealWebSocket.java:274) at
okhttp3.internal.ws.RealWebSocket$2.onResponse(RealWebSocket.java:214) at
okhttp3.RealCall$AsyncCall.execute(RealCall.java:203) at
okhttp3.internal.NamedRunnable.run(NamedRunnable.java:32) at
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)21/01/29 06:38:16 WARN
WatchConnectionManager: Exec Failure


Kind Regards,
Sachit Murarka


Re: Spark SQL query

2021-01-31 Thread Sachit Murarka
Hi Arpan,

Launch the spark shell and type ":history" in the shell; you will see the
queries that were executed.

In the Spark UI, under the SQL tab, you can see the query plan when you click
the details button (though it won't show you the complete query). By looking
at the plan you can reconstruct your query.

Hope this helps!


Kind Regards,
Sachit Murarka


On Fri, Jan 29, 2021 at 9:33 PM Arpan Bhandari  wrote:

> Hi Sachit,
>
> Yes it was executed using spark shell, history is already enabled. already
> checked sql tab but it is not showing the query. My spark version is 2.4.5
>
> Thanks,
> Arpan Bhandari
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Spark SQL query

2021-01-29 Thread Sachit Murarka
Hi Arpan,

Was it executed using spark shell?
If yes type :history

Do u have history server enabled?
If yes , go to the history and go to the SQL tab in History UI.

Thanks
Sachit

On Fri, 29 Jan 2021, 19:19 Arpan Bhandari,  wrote:

> Hi ,
>
> Is there a way to track back spark sql after it has been already run i.e.
> query has been already submitted by a person and i have to back trace what
> query actually got submitted.
>
>
> Appreciate any help on this.
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Query on entrypoint.sh Kubernetes spark

2021-01-21 Thread Sachit Murarka
Hi All,

To run Spark on Kubernetes, I see the following lines in the entrypoint.sh
script:

case "$1" in
  driver)
shift 1
CMD=(
  "$SPARK_HOME/bin/spark-submit"
  --conf "spark.driver.bindAddress=$SPARK_DRIVER_BIND_ADDRESS"
  --deploy-mode client

Could you please suggest why deploy-mode client is mentioned in entrypoint.sh?
I am running spark-submit with deploy-mode cluster, but inside entrypoint.sh
it is specified like that.


Kind Regards,
Sachit Murarka


Re: Issue with executer

2021-01-20 Thread Sachit Murarka
Hi Vikas

1. Are you running in local mode? The master is set to local[*].
2. Please mask the IP or other confidential info while sharing logs.

Thanks
Sachit

On Wed, 20 Jan 2021, 17:35 Vikas Garg,  wrote:

> Hi,
>
> I am facing issue with spark executor. I am struggling with this issue
> since last many days and unable to resolve the issue.
>
> Below is the configuration I have given.
>
>   val spark = SparkSession.builder()
> .appName("Spark Job")
> .master("local[*]")
> .config("spark.dynamicAllocation.enabled", true)
> .config("spark.shuffle.service.enabled", true)
> .config("spark.driver.maxResultSize", "8g")
> .config("spark.driver.memory", "8g")
> .config("spark.executor.memory", "8g")
> .config("spark.network.timeout", "3600s")
> .getOrCreate()
>
> 1/01/20 17:06:57 ERROR RetryingBlockFetcher: Exception while beginning
> fetch of 1 outstanding blocks
>
> *java.io.IOException*: Failed to connect to
> del1-lhp-n9.synapse.com/192.168.166.213:51348
>
> at
> org.apache.spark.network.client.TransportClientFactory.createClient(
> *TransportClientFactory.java:253*)
>
> at
> org.apache.spark.network.client.TransportClientFactory.createClient(
> *TransportClientFactory.java:195*)
>
> at
> org.apache.spark.network.netty.NettyBlockTransferService$$anon$2.createAndStart(
> *NettyBlockTransferService.scala:122*)
>
> at
> org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(
> *RetryingBlockFetcher.java:141*)
>
> at org.apache.spark.network.shuffle.RetryingBlockFetcher.start(
> *RetryingBlockFetcher.java:121*)
>
> at
> org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(
> *NettyBlockTransferService.scala:143*)
>
> at
> org.apache.spark.network.BlockTransferService.fetchBlockSync(
> *BlockTransferService.scala:103*)
>
> at
> org.apache.spark.storage.BlockManager.fetchRemoteManagedBuffer(
> *BlockManager.scala:1010*)
>
> at
> org.apache.spark.storage.BlockManager.$anonfun$getRemoteBlock$8(
> *BlockManager.scala:954*)
>
> at scala.Option.orElse(*Option.scala:289*)
>
> at org.apache.spark.storage.BlockManager.getRemoteBlock(
> *BlockManager.scala:954*)
>
> at org.apache.spark.storage.BlockManager.getRemoteBytes(
> *BlockManager.scala:1092*)
>
> at
> org.apache.spark.scheduler.TaskResultGetter$$anon$3.$anonfun$run$1(
> *TaskResultGetter.scala:88*)
>
> at
> scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
>
> at org.apache.spark.util.Utils$.logUncaughtExceptions(
> *Utils.scala:1932*)
>
> at org.apache.spark.scheduler.TaskResultGetter$$anon$3.run(
> *TaskResultGetter.scala:63*)
>
> at java.util.concurrent.ThreadPoolExecutor.runWorker(
> *ThreadPoolExecutor.java:1149*)
>
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(
> *ThreadPoolExecutor.java:624*)
>
> at java.lang.Thread.run(*Thread.java:748*)
>
> Caused by: *io.netty.channel.AbstractChannel$AnnotatedSocketException*:
> Permission denied: no further information:
> del1-lhp-n9.synapse.com/192.168.166.213:51348
>
> Caused by: *java.net.SocketException*: Permission denied: no further
> information
>
> at sun.nio.ch.SocketChannelImpl.checkConnect(*Native Method*)
>
> at sun.nio.ch.SocketChannelImpl.finishConnect(
> *SocketChannelImpl.java:715*)
>
> at
> io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(
> *NioSocketChannel.java:330*)
>
> at
> io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(
> *AbstractNioChannel.java:334*)
>
> at io.netty.channel.nio.NioEventLoop.processSelectedKey(
> *NioEventLoop.java:702*)
>
> at
> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(
> *NioEventLoop.java:650*)
>
> at io.netty.channel.nio.NioEventLoop.processSelectedKeys(
> *NioEventLoop.java:576*)
>
> at io.netty.channel.nio.NioEventLoop.run(
> *NioEventLoop.java:493*)
>
> at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(
> *SingleThreadEventExecutor.java:989*)
>
> at io.netty.util.internal.ThreadExecutorMap$2.run(
> *ThreadExecutorMap.java:74*)
>
> at io.netty.util.concurrent.FastThreadLocalRunnable.run(
> *FastThreadLocalRunnable.java:30*)
>
> at java.lang.Thread.run(*Thread.java:748*)
>
> 21/01/20 17:06:57 ERROR RetryingBlockFetcher: Exception while beginning
> fetch of 1 outstanding blocks
>
> *java.io.IOException*: Failed to connect to
> del1-lhp-n9.synapse.com/192.168.166.213:51348
>
> at
> org.apache.spark.network.client.TransportClientFactory.createClient(
> *TransportClientFactory.java:253*)
>
> at
> org.apache.spark.network.client.TransportClientFactory.createClient(
> *TransportClientFactory.java:195*)
>
> at
> 

Re: Spark 3.0.1 giving warning while running with Java 11

2021-01-15 Thread Sachit Murarka
Sure Sean. Thanks for confirmation.

On Fri, 15 Jan 2021, 10:57 Sean Owen,  wrote:

> You can ignore that. Spark 3.x works with Java 11 but it will generate
> some warnings that are safe to disregard.
>
> On Thu, Jan 14, 2021 at 11:26 PM Sachit Murarka 
> wrote:
>
>> Hi All,
>>
>> Getting warning while running spark3.0.1 with Java11 .
>>
>>
>> WARNING: An illegal reflective access operation has occurred
>> WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (
>> file:/opt/spark/jars/spark-unsafe_2.12-3.0.1.jar) to constructor
>> java.nio.DirectByteBuffer(long,int)
>> WARNING: Please consider reporting this to the maintainers of
>> org.apache.spark.unsafe.Platform
>> WARNING: Use --illegal-access=warn to enable warnings of further illegal
>> reflective access operations
>> WARNING: All illegal access operations will be denied in a future release
>>
>>
>>
>> Kind Regards,
>> Sachit Murarka
>>
>


Spark 3.0.1 giving warning while running with Java 11

2021-01-14 Thread Sachit Murarka
Hi All,

Getting warning while running spark3.0.1 with Java11 .


WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (
file:/opt/spark/jars/spark-unsafe_2.12-3.0.1.jar) to constructor
java.nio.DirectByteBuffer(long,int)
WARNING: Please consider reporting this to the maintainers of
org.apache.spark.unsafe.Platform
WARNING: Use --illegal-access=warn to enable warnings of further illegal
reflective access operations
WARNING: All illegal access operations will be denied in a future release


Kind Regards,
Sachit Murarka


Re: HPA - Kubernetes for Spark

2021-01-10 Thread Sachit Murarka
Hi ,

Yes, I know that by enabling the shuffle tracking property we can use DRA.
But it is marked as experimental. Is it advisable to use?

Also, regarding HPA: there is no separate HPA mechanism for Spark as such,
right?

Kind Regards,
Sachit Murarka
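
A minimal sketch of the dynamic allocation settings being discussed, assuming
Spark 3.0+ on Kubernetes (the app name and the min/max values are just
placeholders):

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("dra-on-k8s")  # hypothetical app name
         # shuffle tracking lets dynamic allocation work without an external shuffle service
         .config("spark.dynamicAllocation.enabled", "true")
         .config("spark.dynamicAllocation.shuffleTracking.enabled", "true")
         .config("spark.dynamicAllocation.minExecutors", "1")
         .config("spark.dynamicAllocation.maxExecutors", "10")
         .getOrCreate())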


On Mon, Jan 11, 2021 at 2:17 AM Sandish Kumar HN 
wrote:

> Sachit,
>
> K8S based spark dynamic allocation is only available on Spark 3.0.X+ and
> that too without External Shuffling Service.
>
>
> https://github.com/GoogleCloudPlatform/spark-on-k8s-operator/blob/master/docs/user-guide.md#dynamic-allocation
> http://spark.apache.org/docs/latest/running-on-kubernetes.html#future-work
>
> On Sun, 10 Jan 2021 at 13:23, Sachit Murarka 
> wrote:
>
>> Hi All,
>>
>> I have read about HPA  Horizontal Pod Autoscaling(for pod scaling).
>>
>> I understand it can be achieved by setting the request and limit for
>> resources in yaml:
>> kubectl autoscale deploy/application-cpu --cpu-percent=95 --min=1
>> --max=10  // example command.
>>
>> But does Kubernetes actually work with Spark for HPA? Since 1 pod is used
>> to launch 1 unique executor. Here in spark ideally pod can be scaled by
>> dynamical allocation of executors(which in turn is a pod) instead of HPA.
>> But Dynamic allocation is not supported as shuffle service is not there
>> till Spark 3 release.
>>
>> Could any one suggest how can I proceed achieving pod scaling in Spark?
>>
>> Please note : I am using Kubernetes with Spark operator.
>>
>>
>> Kind Regards,
>> Sachit Murarka
>>
>
>
> --
>
> Thanks,
> Regards,
> SandishKumar HN
>


HPA - Kubernetes for Spark

2021-01-10 Thread Sachit Murarka
Hi All,

I have read about HPA, Horizontal Pod Autoscaling (for pod scaling).

I understand it can be achieved by setting the request and limit for
resources in YAML:
kubectl autoscale deploy/application-cpu --cpu-percent=95 --min=1 --max=10
// example command.

But does Kubernetes actually work with Spark for HPA? One pod is used to
launch one unique executor, so in Spark, pods would ideally be scaled by
dynamic allocation of executors (each of which is a pod) instead of HPA.
But dynamic allocation is not supported, since there is no shuffle service
until the Spark 3 release.

Could anyone suggest how I can proceed with achieving pod scaling in Spark?

Please note : I am using Kubernetes with Spark operator.


Kind Regards,
Sachit Murarka


Re: Suggestion on Spark 2.4.7 vs Spark 3 for Kubernetes

2021-01-05 Thread Sachit Murarka
Thanks for the link Prashant.

Regards
Sachit

On Tue, 5 Jan 2021, 15:08 Prashant Sharma,  wrote:

>  A lot of developers may have already moved to 3.0.x, FYI 3.1.0 is just
> around the corner hopefully(in a few days) and has a lot of improvements to
> spark on K8s, including it will be transitioning from experimental to GA in
> this release.
>
> See: https://issues.apache.org/jira/browse/SPARK-33005
>
> Thanks,
>
> On Tue, Jan 5, 2021 at 12:41 AM Sachit Murarka 
> wrote:
>
>> Hi Users,
>>
>> Could you please tell which Spark version have you used in Production for
>> Kubernetes.
>> Which is a recommended version for Production provided that both
>> Streaming and core apis have to be used using Pyspark.
>>
>> Thanks !
>>
>> Kind Regards,
>> Sachit Murarka
>>
>


Suggestion on Spark 2.4.7 vs Spark 3 for Kubernetes

2021-01-04 Thread Sachit Murarka
Hi Users,

Could you please tell me which Spark version you have used in production for
Kubernetes?
Which version is recommended for production, given that both streaming and
core APIs have to be used from PySpark?

Thanks !

Kind Regards,
Sachit Murarka


Re: Error while running Spark on K8s

2021-01-04 Thread Sachit Murarka
Hi Prashant

Thanks for the response!

I created the service account with the required permissions, and the
following is the command:

spark-submit --deploy-mode cluster --master k8s://http://ip:port --name
"sachit"   --conf spark.kubernetes.pyspark.pythonVersion=3
--conf spark.kubernetes.namespace=spark-test --conf
spark.executor.instances=5 --conf
spark.kubernetes.authenticate.driver.serviceAccountName=spark-sa  --conf
spark.kubernetes.container.image=sparkpy local:///opt/spark/da/main.py

Kind Regards,
Sachit Murarka


On Mon, Jan 4, 2021 at 5:46 PM Prashant Sharma  wrote:

> Hi Sachit,
>
> Can you give more details on how did you run? i.e. spark submit command.
> My guess is, a service account with sufficient privilege is not provided.
> Please see:
> http://spark.apache.org/docs/latest/running-on-kubernetes.html#rbac
>
> Thanks,
>
> On Mon, Jan 4, 2021 at 5:27 PM Sachit Murarka 
> wrote:
>
>> Hi All,
>> I am getting the below error when I am trying to run the spark job on
>> Kubernetes, I am running it in cluster mode.
>>
>> Exception in thread "main"
>> io.fabric8.kubernetes.client.KubernetesClientException: Operation:
>> [create] for kind: [Pod] with name: [null] in namespace:
>> [spark-test] failed.
>> at
>> io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:64)
>> at
>> io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:72)
>>
>>
>> I saw there was an JIRA opened already. SPARK-31786
>> I tried the parameter mentioned in JIRA
>> too(spark.kubernetes.driverEnv.HTTP2_DISABLE=true), that also did not work.
>> Can anyone suggest what can be done?
>>
>> Kind Regards,
>> Sachit Murarka
>>
>


Error while running Spark on K8s

2021-01-04 Thread Sachit Murarka
Hi All,
I am getting the below error when I try to run the Spark job on Kubernetes.
I am running it in cluster mode.

Exception in thread "main"
io.fabric8.kubernetes.client.KubernetesClientException: Operation:
[create] for kind: [Pod] with name: [null] in namespace:
[spark-test] failed.
at
io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:64)
at
io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:72)


I saw there was a JIRA opened already: SPARK-31786.
I tried the parameter mentioned in the JIRA too
(spark.kubernetes.driverEnv.HTTP2_DISABLE=true); that also did not work.
Can anyone suggest what can be done?

Kind Regards,
Sachit Murarka


Re: Issue while installing dependencies Python Spark

2020-12-18 Thread Sachit Murarka
Hi Patrick/Users,

I am exploring wheel-file-based packaging for this, as it seems simple:

https://bytes.grubhub.com/managing-dependencies-and-artifacts-in-pyspark-7641aa89ddb7

However, I am facing another issue: I am using pandas, which needs numpy, and
numpy is giving an error!


ImportError: Unable to import required dependencies:
numpy:

IMPORTANT: PLEASE READ THIS FOR ADVICE ON HOW TO SOLVE THIS ISSUE!

Importing the numpy C-extensions failed. This error can happen for
many reasons, often due to issues with your setup or how NumPy was
installed.

We have compiled some common reasons and troubleshooting tips at:

https://numpy.org/devdocs/user/troubleshooting-importerror.html

Please note and check the following:

  * The Python version is: Python3.7 from "/usr/bin/python3"
  * The NumPy version is: "1.19.4"

and make sure that they are the versions you expect.
Please carefully study the documentation linked above for further help.

Original error was: No module named 'numpy.core._multiarray_umath'



Kind Regards,
Sachit Murarka
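
A minimal sketch of the archive-based approach described in the reply quoted
below, assuming YARN and hypothetical paths: a pre-built environment is
shipped as a zip so that compiled dependencies such as numpy arrive intact
instead of being resolved on the cluster:

import os
from pyspark.sql import SparkSession

# point the workers at the Python interpreter inside the unpacked archive
os.environ["PYSPARK_PYTHON"] = "./py37_env/py37/bin/python"

spark = (SparkSession.builder
         .appName("packed-env-job")  # hypothetical app name
         # zip of a pre-built conda/virtualenv; '#py37_env' is the unpack directory name
         .config("spark.yarn.dist.archives", "hdfs:///user/me/envs/py37.zip#py37_env")
         .getOrCreate())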


On Thu, Dec 17, 2020 at 9:24 PM Patrick McCarthy 
wrote:

> I'm not very familiar with the environments on cloud clusters, but in
> general I'd be reluctant to lean on setuptools or other python install
> mechanisms. In the worst case, you might encounter /usr/bin/pip not having
> permissions to install new packages, or even if you do a package might
> require something you can't change like a libc dependency.
>
> Perhaps you can install the .whl and its dependencies to the virtualenv on
> a local machine, and then *after* the install process, package that venv?
>
> If possible, I like conda for this approach over a vanilla venv because it
> will contain all the non-python dependencies (like libc) if they're needed.
>
>
> Another thing - I think there are several ways to do this, but I've had
> the most success including the .zip containing my environment in
> `spark.yarn.dist.archives` and then using a relative path:
>
> os.environ['PYSPARK_PYTHON'] = './py37minimal_env/py37minimal/bin/python'
>
> dist_archives =
> 'hdfs:///user/pmccarthy/conda/py37minimal.zip#py37minimal_env'
>
> SparkSession.builder.
> ...
>  .config('spark.yarn.dist.archives', dist_archives)
>
>
> On Thu, Dec 17, 2020 at 10:32 AM Sachit Murarka 
> wrote:
>
>> Hi Users
>>
>> I have a wheel file , while creating it I have mentioned dependencies in
>> setup.py file.
>> Now I have 2 virtual envs, 1 was already there . another one I created
>> just now.
>>
>> I have switched to new virtual env, I want spark to download the
>> dependencies while doing spark-submit using wheel.
>>
>> Could you please help me on this?
>>
>> It is not downloading dependencies , instead it is pointing to older
>> version of  virtual env and proceeding with the execution of spark job.
>>
>> Please note I have tried setting the env variables also.
>> Also I have tried following options as well in spark submit
>>
>> --conf spark.pyspark.virtualenv.enabled=true  --conf
>> spark.pyspark.virtualenv.type=native --conf
>> spark.pyspark.virtualenv.requirements=requirements.txt  --conf
>> spark.pyspark.python= /path/to/venv/bin/python3 --conf
>> spark.pyspark.driver.python=/path/to/venv/bin/python3
>>
>> This did not help too..
>>
>> Kind Regards,
>> Sachit Murarka
>>
>
>
> --
>
>
> *Patrick McCarthy  *
>
> Senior Data Scientist, Machine Learning Engineering
>
> Dstillery
>
> 470 Park Ave South, 17th Floor, NYC 10016
>


Issue while installing dependencies Python Spark

2020-12-17 Thread Sachit Murarka
Hi Users

I have a wheel file; while creating it, I specified the dependencies in the
setup.py file. Now I have 2 virtual envs: one was already there, and another
one I created just now.

I have switched to the new virtual env, and I want Spark to download the
dependencies when doing spark-submit with the wheel.

Could you please help me with this?

It is not downloading the dependencies; instead it points to the older
virtual env and proceeds with the execution of the Spark job.

Please note I have tried setting the env variables also.
Also I have tried following options as well in spark submit

--conf spark.pyspark.virtualenv.enabled=true  --conf
spark.pyspark.virtualenv.type=native --conf
spark.pyspark.virtualenv.requirements=requirements.txt  --conf
spark.pyspark.python= /path/to/venv/bin/python3 --conf
spark.pyspark.driver.python=/path/to/venv/bin/python3

This did not help too..

Kind Regards,
Sachit Murarka


Running spark code using wheel file

2020-12-16 Thread Sachit Murarka
Hi All,

I have created a wheel file and I am using the following command to run the
spark job:

spark-submit --py-files application.whl main_flow.py

My application is unable to reference the modules. Do I need to do the pip
install of the wheel first?

Kind Regards,
Sachit Murarka
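
For reference, a minimal sketch of what --py-files provides: the wheel is
added to the PYTHONPATH of the driver and executors, so its modules can be
imported directly without a pip install, but dependencies declared in
setup.py are not resolved and have to be shipped separately. The package and
function names below are hypothetical:

from pyspark.sql import SparkSession

# 'application' is a hypothetical package contained in application.whl
from application.jobs import run_job

spark = SparkSession.builder.appName("main-flow").getOrCreate()
run_job(spark)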


Streaming job taking all executors

2020-12-13 Thread Sachit Murarka
Hi All,

I am using Standalone Spark.

I am using dynamic allocation. Despite setting max executors, min executors
and initial executors, my streaming job is taking all the executors available
in the cluster. Could anyone please suggest what can be wrong here?

Please note source is Kafka.

I feel this can be avoided by setting max cores per application. But why is
this happening if max executors is set, and what are other good ways to avoid
it?

Thanks,
Sachit
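
A minimal sketch of the caps being discussed, assuming standalone mode (the
values are just examples):

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("capped-streaming-job")  # hypothetical app name
         .config("spark.dynamicAllocation.enabled", "true")
         .config("spark.dynamicAllocation.maxExecutors", "4")  # hard cap on executor count
         .config("spark.cores.max", "8")  # standalone-mode cap on total cores per application
         .getOrCreate())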


Re: Regexp_extract not giving correct output

2020-12-02 Thread Sachit Murarka
Hi Sean,

Thanks for quick response!

I have tried with the string literal prefix 'r'; that also gave an empty
result:
spark.sql(r"select regexp_extract('[11] [22]
[33]','(^\[OrderID:\s)?(?(1).*\]\s\[UniqueID:\s([a-z0-9A-Z]*)\].*|\[.*\]\s\[([a-z0-9A-Z]*)\].*)',1)
as anyid").show()

And, as I mentioned, when I use 2 backslashes it gives an exception as
follows:
  : java.util.regex.PatternSyntaxException: Unknown inline modifier near
index 21
(^\[OrderID:\s)?(?(1).*\]\s\[UniqueID:\s([a-z0-9A-Z]*)\].*|\[.*\]\s\[([a-z0-9A-Z]*)\].*)

Kind Regards,
Sachit Murarka
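
The "Unknown inline modifier" error comes from the conditional group
(?(1)...), which Java's regex engine does not support, so the pattern has to
be rewritten without it. A minimal sketch that tries the two shapes
separately and keeps whichever matches (the fallback pattern is an assumption
about the intended input format):

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("[1234] [] [] [66]",), ("abcd",)], ["stringValue"])

order_pat = r"\[UniqueID:\s([a-zA-Z0-9]+)\]"          # shape with a UniqueID tag
plain_pat = r"^\[[a-zA-Z0-9]*\]\s\[([a-zA-Z0-9]*)\]"  # plain shape, id in the 2nd bracket

result = df.withColumn(
    "extracted_value",
    F.when(F.regexp_extract("stringValue", order_pat, 1) != "",
           F.regexp_extract("stringValue", order_pat, 1))
     .otherwise(F.regexp_extract("stringValue", plain_pat, 1)),
)
result.show(truncate=False)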


On Wed, Dec 2, 2020 at 9:07 PM Sean Owen  wrote:

> As in Java/Scala, in Python you'll need to escape the backslashes with \\.
> "\[" means just "[" in a string. I think you could also prefix the string
> literal with 'r' to disable Python's handling of escapes.
>
> On Wed, Dec 2, 2020 at 9:34 AM Sachit Murarka 
> wrote:
>
>> Hi All,
>>
>> I am using Pyspark to get the value from a column on basis of regex.
>>
>> Following is the regex which I am using:
>>
>> (^\[OrderID:\s)?(?(1).*\]\s\[UniqueID:\s([a-z0-9A-Z]*)\].*|\[.*\]\s\[([a-z0-9A-Z]*)\].*)
>>
>> df = spark.createDataFrame([("[1234] [] [] [66]",),
>> ("abcd",)],["stringValue"])
>>
>> result = df.withColumn('extracted value',
>> F.regexp_extract(F.col('stringValue'),
>> '(^\[OrderID:\s)?(?(1).*\]\s\[UniqueID:\s([a-z0-9A-Z]*)\].*|\[.*\]\s\[([a-z0-9A-Z]*)\].*)',
>> 1))
>>
>> I have tried with spark.sql as well. It is giving empty output.
>>
>> I have tested this regex , it is working fine on an online regextester .
>> But it is not working in spark . I know spark needs Java based regex ,
>> hence I tried escaping also , that gave exception:
>> : java.util.regex.PatternSyntaxException: Unknown inline modifier near
>> index 21
>>
>> (^\[OrderID:\s)?(?(1).*\]\s\[UniqueID:\s([a-z0-9A-Z]*)\].*|\[.*\]\s\[([a-z0-9A-Z]*)\].*)
>>
>>
>> Can you please help here?
>>
>> Kind Regards,
>> Sachit Murarka
>>
>


Regexp_extract not giving correct output

2020-12-02 Thread Sachit Murarka
Hi All,

I am using PySpark to get a value from a column on the basis of a regex.

Following is the regex which I am using:
(^\[OrderID:\s)?(?(1).*\]\s\[UniqueID:\s([a-z0-9A-Z]*)\].*|\[.*\]\s\[([a-z0-9A-Z]*)\].*)

df = spark.createDataFrame([("[1234] [] [] [66]",),
("abcd",)],["stringValue"])

result = df.withColumn('extracted value',
F.regexp_extract(F.col('stringValue'),
'(^\[OrderID:\s)?(?(1).*\]\s\[UniqueID:\s([a-z0-9A-Z]*)\].*|\[.*\]\s\[([a-z0-9A-Z]*)\].*)',
1))

I have tried with spark.sql as well. It is giving empty output.

I have tested this regex and it works fine on an online regex tester, but it
is not working in Spark. I know Spark needs a Java-based regex, hence I tried
escaping as well; that gave an exception:
: java.util.regex.PatternSyntaxException: Unknown inline modifier near
index 21
(^\[OrderID:\s)?(?(1).*\]\s\[UniqueID:\s([a-z0-9A-Z]*)\].*|\[.*\]\s\[([a-z0-9A-Z]*)\].*)


Can you please help here?

Kind Regards,
Sachit Murarka


Need Unit test complete reference for Pyspark

2020-11-18 Thread Sachit Murarka
Hi Users,

I have to write unit test cases for PySpark.
I think pytest-spark and "spark-testing-base" are good test libraries.

Can anyone please provide a full reference for writing test cases in
Python using these?

Kind Regards,
Sachit Murarka
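
A minimal sketch of a plain pytest setup with a session-scoped SparkSession
fixture (no extra plugin required); this is roughly what pytest-spark gives
you as its spark_session fixture:

import pytest
from pyspark.sql import SparkSession, functions as F

@pytest.fixture(scope="session")
def spark():
    # a small local session shared by all tests in the run
    session = (SparkSession.builder
               .master("local[2]")
               .appName("unit-tests")
               .getOrCreate())
    yield session
    session.stop()

def test_upper_case(spark):
    df = spark.createDataFrame([("a",), ("b",)], ["letter"])
    result = [r.letter for r in df.select(F.upper("letter").alias("letter")).collect()]
    assert result == ["A", "B"]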


Need help on Calling Pyspark code using Wheel

2020-10-23 Thread Sachit Murarka
Hi Users,

I have created a wheel file using Poetry. I tried running the following
commands to run the Spark job using the wheel, but it is not working. Can
anyone please let me know the invocation step for the wheel file?

spark-submit --py-files /path/to/wheel
spark-submit --files /path/to/wheel

Thanks
Sachit

Kind Regards,
Sachit Murarka


Re: Multiple applications being spawned

2020-10-13 Thread Sachit Murarka
Hi Jayesh,

It's not the executor process. The application (the job itself) is getting
called multiple times, like a recursion. The problem seems to be mainly in
zipWithIndex.

Thanks
Sachit

On Tue, 13 Oct 2020, 22:40 Lalwani, Jayesh,  wrote:

> Where are you running your Spark cluster? Can you post the command line
> that you are using to run your application?
>
>
>
> Spark is designed to process a lot of data by distributing work to a
> cluster of a machines. When you submit a job, it starts executor processes
> on the cluster. So, what you are seeing is somewhat expected, (although 25
> processes on a single node seem too high)
>
>
>
> *From: *Sachit Murarka 
> *Date: *Tuesday, October 13, 2020 at 8:15 AM
> *To: *spark users 
> *Subject: *RE: [EXTERNAL] Multiple applications being spawned
>
>
>
> *CAUTION*: This email originated from outside of the organization. Do not
> click links or open attachments unless you can confirm the sender and know
> the content is safe.
>
>
>
> Adding Logs.
>
>
>
> When it launches the multiple applications , following logs get generated
> on the terminal
> Also it retries the task always:
>
> 20/10/13 12:04:30 WARN TaskSetManager: Lost task XX in stage XX (TID XX,
> executor 5): java.net.SocketException: Broken pipe (Write failed)
> at java.net.SocketOutputStream.socketWrite0(Native Method)
> at
> java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:111)
> at java.net.SocketOutputStream.write(SocketOutputStream.java:155)
> at
> java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)
> at java.io.DataOutputStream.write(DataOutputStream.java:107)
> at java.io.FilterOutputStream.write(FilterOutputStream.java:97)
> at
> org.apache.spark.api.python.PythonRDD$.org$apache$spark$api$python$PythonRDD$$write$1(PythonRDD.scala:212)
> at
> org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:224)
> at
> org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:224)
> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> at
> org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:224)
> at
> org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:561)
> at
> org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:346)
> at
> org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1945)
>     at
> org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:195)
>
>
>
> Kind Regards,
> Sachit Murarka
>
>
>
>
>
> On Tue, Oct 13, 2020 at 4:02 PM Sachit Murarka 
> wrote:
>
> Hi Users,
>
> When action(I am using count and write) gets executed in my spark job , it
> launches many more application instances(around 25 more apps).
>
> In my spark code ,  I am running the transformations through Dataframes
> then converting dataframe to rdd then applying zipwithindex , then
> converting it back to dataframe and then applying 2 actions(Count & Write).
>
>
>
> Please note : This was working fine till the previous week, it has started
> giving this issue since yesterday.
>
>
> Could you please tell what can be the reason for this behavior?
>
>
> Kind Regards,
> Sachit Murarka
>
>


Re: Multiple applications being spawned

2020-10-13 Thread Sachit Murarka
Adding logs.

When it launches the multiple applications, the following logs are generated
on the terminal. Also, it always retries the task:

20/10/13 12:04:30 WARN TaskSetManager: Lost task XX in stage XX (TID XX,
executor 5): java.net.SocketException: Broken pipe (Write failed)
at java.net.SocketOutputStream.socketWrite0(Native Method)
at
java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:111)
at java.net.SocketOutputStream.write(SocketOutputStream.java:155)
at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)
at java.io.DataOutputStream.write(DataOutputStream.java:107)
at java.io.FilterOutputStream.write(FilterOutputStream.java:97)
at
org.apache.spark.api.python.PythonRDD$.org$apache$spark$api$python$PythonRDD$$write$1(PythonRDD.scala:212)
at
org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:224)
at
org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:224)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at
org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:224)
at
org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:561)
at
org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:346)
at
org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1945)
at
org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:195)

Kind Regards,
Sachit Murarka


On Tue, Oct 13, 2020 at 4:02 PM Sachit Murarka 
wrote:

> Hi Users,
>
> When action(I am using count and write) gets executed in my spark job , it
> launches many more application instances(around 25 more apps).
>
> In my spark code ,  I am running the transformations through Dataframes
> then converting dataframe to rdd then applying zipwithindex , then
> converting it back to dataframe and then applying 2 actions(Count & Write).
>
> Please note : This was working fine till the previous week, it has started
> giving this issue since yesterday.
>
> Could you please tell what can be the reason for this behavior?
>
> Kind Regards,
> Sachit Murarka
>


Multiple applications being spawned

2020-10-13 Thread Sachit Murarka
Hi Users,

When an action (I am using count and write) gets executed in my Spark job, it
launches many more application instances (around 25 more apps).

In my Spark code, I run the transformations through DataFrames, then convert
the DataFrame to an RDD, apply zipWithIndex, convert it back to a DataFrame,
and then apply 2 actions (count & write).

Please note: this was working fine until the previous week; it has started
giving this issue since yesterday.

Could you please tell what can be the reason for this behavior?

Kind Regards,
Sachit Murarka


Re: Job is not able to perform Broadcast Join

2020-10-06 Thread Sachit Murarka
Yes, I tried the same thing first. Then I moved to the join method, because
shuffle spill was happening: row_number() without a partition runs on a
single task. Instead of processing the entire dataframe on a single task, I
have broken it down into df1 and df2 and am joining them, because df2 has a
much smaller data set since it has only 2 columns.

Thanks
Sachit
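
For reference, a minimal sketch of the zipWithIndex route (the approach
behind the article linked below), which assigns consecutive ids without
pulling the whole dataframe onto one task and avoids the broadcast join
altogether:

from pyspark.sql import Row

def with_sequential_id(df, col_name="row_id"):
    # zipWithIndex numbers rows using per-partition offsets, so no
    # single-task window or global sort is needed
    return (df.rdd
              .zipWithIndex()
              .map(lambda pair: Row(**pair[0].asDict(), **{col_name: pair[1] + 1}))
              .toDF())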

On Wed, 7 Oct 2020, 01:04 Eve Liao,  wrote:

> Try to avoid broadcast. Thought this:
> https://towardsdatascience.com/adding-sequential-ids-to-a-spark-dataframe-fa0df5566ff6
> could be helpful.
>
> On Tue, Oct 6, 2020 at 12:18 PM Sachit Murarka 
> wrote:
>
>> Thanks Eve for response.
>>
>> Yes I know we can use broadcast for smaller datasets,I increased the
>> threshold (4Gb) for the same then also it did not work. and the df3 is
>> somewhat greater than 2gb.
>>
>> Trying by removing broadcast as well.. Job is running since 1 hour. Will
>> let you know.
>>
>>
>> Thanks
>> Sachit
>>
>> On Wed, 7 Oct 2020, 00:41 Eve Liao,  wrote:
>>
>>> How many rows does df3 have? Broadcast joins are a great way to append
>>> data stored in relatively *small* single source of truth data files to
>>> large DataFrames. DataFrames up to 2GB can be broadcasted so a data file
>>> with tens or even hundreds of thousands of rows is a broadcast candidate.
>>> Your broadcast variable is probably too large.
>>>
>>> On Tue, Oct 6, 2020 at 11:37 AM Sachit Murarka 
>>> wrote:
>>>
>>>> Hello Users,
>>>>
>>>> I am facing an issue in spark job where I am doing row number() without
>>>> partition by clause because I need to add sequential increasing IDs.
>>>> But to avoid the large spill I am not doing row number() over the
>>>> complete data frame.
>>>>
>>>> Instead I am applying monotically_increasing id on actual data set ,
>>>> then create a new data frame from original data frame which will have
>>>> just monotically_increasing id.
>>>>
>>>> So DF1 = All columns + monotically_increasing_id
>>>> DF2 = Monotically_increasingID
>>>>
>>>> Now I am applying row number() on DF2 since this is a smaller
>>>> dataframe.
>>>>
>>>> DF3 = Monotically_increasingID + Row_Number_ID
>>>>
>>>> Df.join(broadcast(DF3))
>>>>
>>>> This will give me sequential increment id in the original Dataframe.
>>>>
>>>> But below is the stack trace.
>>>>
>>>> py4j.protocol.Py4JJavaError: An error occurred while calling
>>>> o180.parquet.
>>>> : org.apache.spark.SparkException: Job aborted.
>>>> at
>>>> org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:198)
>>>> at
>>>> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:159)
>>>> at
>>>> org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
>>>> at
>>>> org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
>>>> at
>>>> org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
>>>> at
>>>> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>>>> at
>>>> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>>>> at
>>>> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
>>>> at
>>>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>>> at
>>>> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>>>> at
>>>> org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>>>> at
>>>> org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
>>>> at
>>>> org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
>>>> at
>>>> org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
>>>> at
>>>> org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)

Re: Job is not able to perform Broadcast Join

2020-10-06 Thread Sachit Murarka
Thanks, Eve, for the response.

Yes, I know we can use broadcast for smaller datasets. I increased the
threshold (4 GB) for it, but that also did not work, and df3 is somewhat
larger than 2 GB.

I am trying it without the broadcast as well; the job has been running for
an hour. Will let you know.


Thanks
Sachit

On Wed, 7 Oct 2020, 00:41 Eve Liao,  wrote:

> How many rows does df3 have? Broadcast joins are a great way to append
> data stored in relatively *small* single source of truth data files to
> large DataFrames. DataFrames up to 2GB can be broadcasted so a data file
> with tens or even hundreds of thousands of rows is a broadcast candidate.
> Your broadcast variable is probably too large.
>
> On Tue, Oct 6, 2020 at 11:37 AM Sachit Murarka 
> wrote:
>
>> Hello Users,
>>
>> I am facing an issue in spark job where I am doing row number() without
>> partition by clause because I need to add sequential increasing IDs.
>> But to avoid the large spill I am not doing row number() over the
>> complete data frame.
>>
>> Instead I am applying monotically_increasing id on actual data set ,
>> then create a new data frame from original data frame which will have
>> just monotically_increasing id.
>>
>> So DF1 = All columns + monotically_increasing_id
>> DF2 = Monotically_increasingID
>>
>> Now I am applying row number() on DF2 since this is a smaller dataframe.
>>
>> DF3 = Monotically_increasingID + Row_Number_ID
>>
>> Df.join(broadcast(DF3))
>>
>> This will give me sequential increment id in the original Dataframe.
>>
>> But below is the stack trace.
>>
>> py4j.protocol.Py4JJavaError: An error occurred while calling o180.parquet.
>> : org.apache.spark.SparkException: Job aborted.
>> at
>> org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:198)
>> at
>> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:159)
>> at
>> org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
>> at
>> org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
>> at
>> org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
>> at
>> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>> at
>> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>> at
>> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
>> at
>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>> at
>> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>> at
>> org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>> at
>> org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
>> at
>> org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
>> at
>> org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
>> at
>> org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
>> at
>> org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
>> at
>> org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
>> at
>> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
>> at
>> org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
>> at
>> org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
>> at
>> org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
>> at
>> org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
>> at
>> org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:566)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:498)
>> at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
&

Job is not able to perform Broadcast Join

2020-10-06 Thread Sachit Murarka
Hello Users,

I am facing an issue in a Spark job where I am doing row_number() without a
partition-by clause, because I need to add sequentially increasing IDs.
But to avoid a large spill I am not doing row_number() over the complete
data frame.

Instead I apply monotonically_increasing_id on the actual data set, then
create a new data frame from the original data frame which has just the
monotonically_increasing_id column.

So DF1 = all columns + monotonically_increasing_id
DF2 = monotonically_increasing_id

Now I am applying row_number() on DF2 since this is a smaller dataframe.

DF3 = monotonically_increasing_id + row_number_id

Df.join(broadcast(DF3))

This will give me a sequentially incrementing id in the original dataframe.

But below is the stack trace.

py4j.protocol.Py4JJavaError: An error occurred while calling o180.parquet.
: org.apache.spark.SparkException: Job aborted.
at
org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:198)
at
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:159)
at
org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
at
org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
at
org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
at
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at
org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at
org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
at
org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
at
org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
at
org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
at
org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
at
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
at
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
at
org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
at
org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
at
org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
at
org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
at
org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:566)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at
py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Could not execute broadcast in
1000 secs. You can increase the timeout for broadcasts via
spark.sql.broadcastTimeout or disable broadcast join by setting
spark.sql.autoBroadcastJoinThreshold to -1

Initially this threshold was 300 seconds; I already increased it.


Kind Regards,
Sachit Murarka


Executor Lost Spark Issue

2020-10-06 Thread Sachit Murarka
Hello,

I have to write the aggregated output stored in a DataFrame (about 3 GB) to a
single file; for that I have tried using repartition(1) as well as
coalesce(1).

My job is failing with the following exception:


ExecutorLostFailure (executor 32 exited caused by one of the running tasks)
Reason: Remote RPC client disassociated. Likely due to containers exceeding
thresholds, or network issues. Check driver logs for WARN messages.

java.lang.OutOfMemoryError
at 
java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123)
at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117)
at 
java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
at 
net.jpountz.lz4.LZ4BlockOutputStream.flushBufferedData(LZ4BlockOutputStream.java:220)
at 
net.jpountz.lz4.LZ4BlockOutputStream.write(LZ4BlockOutputStream.java:173)
at java.io.DataOutputStream.write(DataOutputStream.java:107)
at 
org.apache.spark.sql.catalyst.expressions.UnsafeRow.writeToStream(UnsafeRow.java:554)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:258)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at 
org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)



Could you please suggest something? I have sufficient memory in executors
and the driver as well.

Kind Regards,
Sachit Murarka
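
One hedged alternative to coalesce(1), assuming a text-based output format
(CSV here) and an HDFS target (the path is hypothetical): let Spark write
with its natural parallelism so no single task has to hold the full 3 GB, and
concatenate the part files afterwards:

(df.write
   .mode("overwrite")
   .option("header", "false")  # avoid repeated headers when the parts are concatenated later
   .csv("hdfs:///tmp/aggregated_output"))  # hypothetical output path
# the part files can then be merged outside Spark, e.g. with `hdfs dfs -getmerge`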


Re: PGP Encrypt using spark Scala

2019-08-26 Thread Sachit Murarka
Hi Deepak,

Thanks for reply.

Yes. That is the option I am considering now because even apache camel
needs data in local.  I might need to copy data from hdfs to local if I
want apache camel ( to get rid of shell).

Thanks
Sachit
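
A minimal sketch of the shell-script route driven from the driver, assuming
the hdfs and gpg CLIs are on the PATH and the recipient's public key is
already imported (the paths and recipient address are hypothetical):

import subprocess

hdfs_in = "/data/in/file.txt"   # hypothetical HDFS source path
local = "/tmp/file.txt"         # scratch space on the driver host

# pull the file down, encrypt it with GPG, and push the encrypted copy back
subprocess.run(["hdfs", "dfs", "-get", hdfs_in, local], check=True)
subprocess.run(["gpg", "--encrypt", "--recipient", "data-team@example.com", local], check=True)
subprocess.run(["hdfs", "dfs", "-put", "-f", local + ".gpg", hdfs_in + ".gpg"], check=True)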

On Mon, 26 Aug 2019, 21:11 Deepak Sharma,  wrote:

> Hi Schit
> PGP Encrypt is something that is not inbuilt with spark.
> I would suggest writing a shell script that would do pgp encrypt and use
> it in spark scala program , which would run from driver.
>
> Thanks
> Deepak
>
> On Mon, Aug 26, 2019 at 8:10 PM Sachit Murarka 
> wrote:
>
>> Hi All,
>>
>> I want to encrypt my files available at HDFS location using PGP Encryption
>> How can I do it in spark. I saw Apache Camel  but it seems camel is used
>> when source files are in Local location rather than HDFS.
>>
>> Kind Regards,
>> Sachit Murarka
>>
>
>
> --
> Thanks
> Deepak
> www.bigdatabig.com
> www.keosha.net
>


PGP Encrypt using spark Scala

2019-08-26 Thread Sachit Murarka
Hi All,

I want to encrypt my files available at an HDFS location using PGP
encryption. How can I do it in Spark? I looked at Apache Camel, but it seems
Camel is used when the source files are in a local location rather than HDFS.

Kind Regards,
Sachit Murarka


Re: Implementing Upsert logic Through Streaming

2019-06-30 Thread Sachit Murarka
Hi Chris,

I have to make sure my DB has the updated value for any record at a given
point in time.
Say the following is the data; I have to take the 4th row for EmpId 2.
Also, if any Emp details are already there in Oracle, I have to update them
with the latest value from the stream.

EmpId,  salary,  timestamp
1, 1000 , 1234
2, 2000, 2234
3, 2000,3234
2, 2100,4234

Thanks
Sachit
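
A minimal sketch of the dedup step within a (micro-)batch, using the column
names from the sample above; the actual MERGE/upsert into Oracle would still
have to go over JDBC and is not shown:

from pyspark.sql import Window, functions as F

# keep only the most recent record per EmpId, based on the timestamp column
w = Window.partitionBy("EmpId").orderBy(F.col("timestamp").desc())

latest_per_key = (df.withColumn("rn", F.row_number().over(w))
                    .filter(F.col("rn") == 1)
                    .drop("rn"))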

On Mon, 1 Jul 2019, 01:46 Chris Teoh,  wrote:

> Just thinking on this, if your needs can be addressed using batch instead
> of streaming, I think this is a viable solution. Using a lambda
> architecture approach seems like a possible solution.
>
> On Sun., 30 Jun. 2019, 9:54 am Chris Teoh,  wrote:
>
>> Not sure what your needs are here.
>>
>> If you can afford to wait, increase your micro batch windows to a long
>> period of time, aggregate your data by key every micro batch and then apply
>> those changes to the Oracle database.
>>
>> Since you're using text file to stream, there's no way to pre partition
>> your stream. If you're using Kafka, you could partition by record key and
>> do the summarisation that way before applying the changes to Oracle.
>>
>> I hope that helps.
>>
>> On Tue., 25 Jun. 2019, 9:43 pm Sachit Murarka, 
>> wrote:
>>
>>> Hi All,
>>>
>>> I will get records continously in text file form(Streaming). It will
>>> have timestamp as field also.
>>>
>>> Target is Oracle Database.
>>>
>>> My Goal is to maintain latest record for a key in Oracle. Could you
>>> please suggest how this can be implemented efficiently?
>>>
>>> Kind Regards,
>>> Sachit Murarka
>>>
>>


Implementing Upsert logic Through Streaming

2019-06-25 Thread Sachit Murarka
Hi All,

I will get records continuously in text file form (streaming). They will
also have a timestamp field.

The target is an Oracle database.

My goal is to maintain the latest record per key in Oracle. Could you please
suggest how this can be implemented efficiently?

Kind Regards,
Sachit Murarka


1 task per executor

2019-05-28 Thread Sachit Murarka
Hi All,

I am using Spark 2.2.
I have enabled Spark dynamic allocation with executor cores 4, driver cores
4, executor memory 12 GB and driver memory 10 GB.

In the Spark UI, I see only 1 task launched per executor.

Could anyone please help on this?

Kind Regards,
Sachit Murarka
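
For context, the number of concurrent tasks per executor is
spark.executor.cores divided by spark.task.cpus, so with executor cores 4 you
would normally expect up to 4 tasks per executor; seeing only 1 usually means
spark.task.cpus was raised or the stage simply has too few partitions. A
minimal sketch of the relevant settings (values are examples):

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("slot-check")  # hypothetical app name
         .config("spark.dynamicAllocation.enabled", "true")
         .config("spark.executor.cores", "4")
         .config("spark.task.cpus", "1")  # default; setting this to 4 would give 1 task per executor
         .getOrCreate())

# too few input partitions also caps concurrency; check and repartition if needed
print(spark.sparkContext.defaultParallelism)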


NoClassDefFoundError

2019-05-21 Thread Sachit Murarka
Hi All,

I have simply added exception handling to my Scala code and am now getting a
NoClassDefFoundError. Any leads would be appreciated.

Thanks

Kind Regards,
Sachit Murarka