Re: spark on k8s - can driver and executor have separate checkpoint location?

2020-05-16 Thread Ali Gouta
Hello,

I am wondering: if you do so, then all your executor pods would have to run
on the same Kubernetes worker node, since you mount a single volume with a
ReadWriteOnce access mode. By design that does not seem good, I assume. You
would need a ReadWriteMany access mode on the volume instead, and can then
add pod anti-affinity to make sure the pods are not scheduled on the same
node. You can achieve this by running an NFS file system and creating a
PV/PVC that mounts that shared file system. The persistentVolumeClaim
defined in your YAML should then reference the PVC you created.
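
For example, here is a rough, untested sketch of an NFS-backed PV/PVC with
ReadWriteMany; the server address, export path, names and sizes are only
placeholders:

apiVersion: v1
kind: PersistentVolume
metadata:
  name: checkpoint-nfs-pv
spec:
  capacity:
    storage: 10Gi
  accessModes:
    - ReadWriteMany
  nfs:
    server: nfs.example.internal      # placeholder NFS server
    path: /exports/spark-checkpoints  # placeholder export path
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: checkpoint-pvc
spec:
  accessModes:
    - ReadWriteMany
  storageClassName: ""                # bind to the static PV above, not a dynamic class
  resources:
    requests:
      storage: 10Gi

The driver and executor specs would then both mount a volume backed by
checkpoint-pvc, and pod anti-affinity, if you want it, goes through the
pods' standard affinity settings.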

Best regards,
Ali Gouta.

On Sat, May 16, 2020 at 6:06 AM wzhan  wrote:

> Hi guys,
>
> I'm running Spark applications on Kubernetes. According to the Spark
> documentation
>
> https://spark.apache.org/docs/latest/streaming-programming-guide.html#checkpointing
> Spark needs a distributed file system to store its checkpoint data so
> that, in case of failure, it can recover from the checkpoint directory.
>
> My question is: can the driver and executors have separate checkpoint
> locations? I'm asking because the driver and executors might be deployed
> on different nodes. A shared checkpoint location would require the
> ReadWriteMany access mode. Since I only have a storage class that supports
> ReadWriteOnce, I'm trying to find a workaround.
>
>
> The Spark Streaming guide mentions that a "Failed driver can be restarted
> from checkpoint information" and that when an executor fails, "Tasks and
> receivers restarted by Spark automatically, no config needed".
>
> Given this, I tried configuring the checkpoint location only for the
> driver pod. It immediately failed with the exception below:
>
> 2020-05-15T14:20:17.142 [stream execution thread for baseline_windower
> [id = b14190ed-0fb2-4d0e-82d3-b3a3bf101712, runId =
> f246774e-5a20-4bfb-b997-4d06c344bb0f]] ERROR
> org.apache.spark.sql.execution.streaming.MicroBatchExecution - Query
> baseline_windower [id = b14190ed-0fb2-4d0e-82d3-b3a3bf101712, runId =
> f246774e-5a20-4bfb-b997-4d06c344bb0f] terminated with error
> org.apache.spark.SparkException: Writing job aborted.
>   at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.doExecute(WriteToDataSourceV2Exec.scala:92)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
>   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>   ...
>   at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:193)
> Caused by: org.apache.spark.SparkException: Job aborted due to stage
> failure: Task 0 in stage 11.0 failed 4 times, most recent failure: Lost
> task 0.3 in stage 11.0 (TID 420, 10.233.124.162, executor 1):
> java.io.IOException: mkdir of
> file:/opt/window-data/baseline_windower/state/0/0 failed
>
>
> So I tried giving separate checkpoint locations to the driver and
> executors.
>
> In the Spark application Helm chart I have a checkpointLocation
> configuration:
>
> spec:
>   sparkConf:
>     "spark.sql.streaming.checkpointLocation": "file:///opt/checkpoint-data"
>
> I created two checkpoint PVCs and mounted a volume for the driver and
> executor pods:
>
>   volumes:
>     - name: checkpoint-driver-volume
>       persistentVolumeClaim:
>         claimName: checkpoint-driver-pvc
>     - name: checkpoint-executor-volume
>       persistentVolumeClaim:
>         claimName: checkpoint-executor-pvc
>
>   driver:
>     volumeMounts:
>       - name: checkpoint-driver-volume
>         mountPath: "/opt/checkpoint-data"
>     ...
>   executor:
>     volumeMounts:
>       - name: checkpoint-executor-volume
>         mountPath: "/opt/checkpoint-data"
>
> After deployment it seems to be working. I tried restarting the driver pod
> and it did recover from the checkpoint directory. But I'm not sure whether
> this is actually supported by design.
>
> Thanks,
> wzhan
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: How to change Dataframe schema

2020-05-16 Thread Adi Polak
Hi Manjunath,
Can you share the data example?
From the information shared above, it seems you will need to apply a map
with custom logic to the rows of your RDD, to make them consistent, before
you can apply the schema.

I recommend reading about the mapping functionality here:
https://data-flair.training/blogs/apache-spark-map-vs-flatmap/
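
Alternatively, if the column names already match and only the ordering (or
types) differ, you could select the columns by name in the order of the
target schema instead of relying on positional mapping. A rough, untested
sketch in Java, using the same DataFrame API as your snippet (here `schema`
is assumed to be the target schema JSON string, and every column of the
target schema is assumed to exist in the DataFrame):

import java.util.Arrays;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.StructType;

// Parse the target schema and build the column list in its declared order,
// casting each column to the type the schema expects.
StructType target = (StructType) DataType.fromJson(schema);
Column[] cols = Arrays.stream(target.fields())
    .map(f -> df.col(f.name()).cast(f.dataType()))
    .toArray(Column[]::new);

// Select by name, so a reordering of the source columns no longer matters.
DataFrame dfNew = df.select(cols);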

I hope it helps!

-Adi

On Sat, 16 May 2020 at 17:50, Manjunath Shetty H 
wrote:

> Hi,
>
> I have a DataFrame with some columns and data that is fetched from JDBC.
> As I have to keep the schema consistent in the ORC file, I have to apply a
> different schema to that DataFrame. The column names will be the same, but
> the data or the schema may contain some extra columns.
>
> Is there any way I can apply the schema on top of the existing DataFrame?
> In most cases the schema change is just a reordering of the columns.
>
> I have tried this:
>
> DataFrame dfNew = hc.createDataFrame(df.rdd(), (StructType) DataType.fromJson(schema));
>
> But this maps the columns by index, so it fails when the columns are
> reordered.
>
> Any pointers will be helpful.
>
> Thanks and Regards
> Manjunath Shetty
>


unsubscribe

2020-05-16 Thread Punna Yenumala



Re: unsubscribe

2020-05-16 Thread Hichame El Khalfi


From: Basavaraj 
Sent: Friday, May 15, 2020 9:12:01 PM
To: spark users
Subject: unsubscribe




How to change Dataframe schema

2020-05-16 Thread Manjunath Shetty H
Hi,

I have a DataFrame with some columns and data that is fetched from JDBC. As I
have to keep the schema consistent in the ORC file, I have to apply a
different schema to that DataFrame. The column names will be the same, but the
data or the schema may contain some extra columns.

Is there any way I can apply the schema on top of the existing DataFrame? In
most cases the schema change is just a reordering of the columns.

I have tried this:

DataFrame dfNew = hc.createDataFrame(df.rdd(), (StructType) DataType.fromJson(schema));

But this maps the columns by index, so it fails when the columns are
reordered.

Any pointers will be helpful.

Thanks and Regards
Manjunath Shetty