Is Spark rdd.toDF() thread-safe?

2021-03-17 Thread yujhe.li
Hi, 

I have an application that runs on a Spark 2.4.4 cluster; it transforms two
RDDs to DataFrames with `rdd.toDF()` and then writes them out to files.

To make better use of worker resources, the application runs the two jobs in
separate threads. The code snippet looks like this:
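
(A minimal sketch of the pattern, assuming two stand-in case classes `A` and
`B` and two pre-built RDDs `rddA` and `rddB`; the real classes had many more
fields:)

    import scala.concurrent.{Await, Future}
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.concurrent.duration.Duration

    case class A(a1: String, a2: Int)   // stand-ins for the real
    case class B(b1: String, b2: Long)  // case classes

    import spark.implicits._  // spark is the SparkSession

    // Submit both toDF() + write jobs concurrently from the driver.
    val jobA = Future { rddA.toDF().write.parquet("/output/a") }
    val jobB = Future { rddB.toDF().write.parquet("/output/b") }
    Await.result(jobA, Duration.Inf)
    Await.result(jobB, Duration.Inf)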

And I found that `toDF()` is not thread-safe: the application sometimes failed
with `java.lang.UnsupportedOperationException`. You can reproduce it with the
following kind of code (it fails roughly 1% of the time, and more easily when
the case class has a large number of fields):
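
(A minimal sketch, using a hypothetical case class `A` defined at top level
and a parallel collection to call `toDF()` from many threads at once:)

    // A must be top-level (or in an object) so toDF() can find its TypeTag.
    case class A(f1: Int, f2: Int, f3: Int, f4: Int, f5: Int,
                 f6: Int, f7: Int, f8: Int, f9: Int, f10: Int)

    import spark.implicits._

    // Hammer toDF() from many threads; a small fraction of iterations fail
    // with java.lang.UnsupportedOperationException.
    (1 to 1000).par.foreach { i =>
      spark.sparkContext.parallelize(Seq(A(i, i, i, i, i, i, i, i, i, i)))
        .toDF().count()
    }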

You may get an exception message similar to: `Schema for type A is not
supported`.

From the error message, the failure was caused by
`ScalaReflection.schemaFor()`. I looked at the code: Spark uses Scala runtime
reflection to derive the data type, and as far as I know Scala runtime
reflection has known concurrency issues.

See SPARK-26555 and the Stack Overflow question
"thread-safety-in-scala-reflection-with-type-matching".

Should we fix this? I cannot find any documentation about thread safety when
creating DataFrames.

I worked around this by taking a lock when transforming an RDD to a DataFrame.
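
(A sketch of the idea, using a hypothetical global lock object so that only
one thread derives a schema at a time while the writes stay concurrent; same
Future imports as above:)

    object ToDFLock  // any shared object works as a monitor

    val jobA = Future {
      // Serialize only the reflective schema derivation...
      val df = ToDFLock.synchronized { rddA.toDF() }
      // ...the actual Spark job can still run in parallel.
      df.write.parquet("/output/a")
    }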

Re: Reading RDD by (key, data) from s3

2019-04-16 Thread yujhe.li
You can't: SparkContext is a singleton object that only exists on the driver.
You have to use the Hadoop library or an AWS client to read files on S3 from
the executors.
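
For example, a sketch using the Hadoop FileSystem API inside mapPartitions
(the bucket name, paths, and `keysRdd` are illustrative):

    import java.net.URI
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}
    import scala.io.Source

    keysRdd.mapPartitions { keys =>
      // Created on the executor; SparkContext is never touched here.
      val fs = FileSystem.get(new URI("s3a://my-bucket"), new Configuration())
      keys.map { key =>
        val in = fs.open(new Path(s"s3a://my-bucket/$key"))
        try (key, Source.fromInputStream(in).mkString) finally in.close()
      }
    }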

Re: read snappy compressed files in spark

2018-09-01 Thread yujhe.li
What's your Spark version?
Have you added the Hadoop native library to the library path? For example:
"spark.executor.extraJavaOptions -Djava.library.path=/hadoop-native/" in
spark-defaults.conf.
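
The same thing on the command line (the /hadoop-native/ path is illustrative;
point it at wherever libhadoop and libsnappy live on your nodes):

    spark-submit \
      --conf "spark.driver.extraJavaOptions=-Djava.library.path=/hadoop-native/" \
      --conf "spark.executor.extraJavaOptions=-Djava.library.path=/hadoop-native/" \
      your-app.jar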

Re: Pass config file through spark-submit

2018-08-16 Thread yujhe.li
So can you read the file on the executor side?
I think a file passed with `--files my.app.conf` is added to the executor's
working directory (which is on the classpath), so you can read it directly.
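
A sketch of reading it on the executors via SparkFiles (the file name and the
RDD are illustrative):

    import org.apache.spark.SparkFiles
    import scala.io.Source

    // launched with: spark-submit --files my.app.conf ...
    rdd.mapPartitions { iter =>
      // SparkFiles.get resolves the local path of a file shipped via --files.
      val path = SparkFiles.get("my.app.conf")
      val conf = Source.fromFile(path).getLines().toList
      iter.map(line => (line, conf.size))
    }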

Re: Repartition not working on a csv file

2018-06-30 Thread yujhe.li
Abdeali Kothari wrote
> My entire CSV is less than 20KB.
> By somewhere in between, I do a broadcast join with 3500 records in
> another
> file.
> After the broadcast join I have a lot of processing to do. Overall, the
> time to process a single record goes up-to 5mins on 1 executor
> 
> I'm trying to increase the partitions that my data is in so that I have at
> maximum 1 record per executor (currently it sets 2 tasks, and hence 2
> executors... I want it to split it into at least 100 tasks at a time so I
> get 5 records per task => ~20min per task)

Maybe you can try `repartition(100)` after the broadcast join; the task count
should change to 100 for your later transformations.
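
A sketch of the idea (the DataFrame and column names are illustrative):

    import org.apache.spark.sql.functions.broadcast

    val joined = bigDf.join(broadcast(smallDf), Seq("id"))
    val spread = joined.repartition(100)  // later stages now run as 100 tasks
    spread.write.parquet("/output")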

Re: Repartition not working on a csv file

2018-06-30 Thread yujhe.li
Abdeali Kothari wrote
> I am using Spark 2.3.0 and trying to read a CSV file which has 500
> records.
> When I try to read it, spark says that it has two stages: 10, 11 and then
> they join into stage 12.

What's your CSV size per file? I think the Spark optimizer may pack several
small files into a single task when reading.
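
If so, you can lower the file-packing thresholds or repartition right after
the read; a sketch (the values are illustrative):

    // Smaller split size / open cost means tiny files are no longer packed
    // into a single task (defaults: 128 MB and 4 MB).
    spark.conf.set("spark.sql.files.maxPartitionBytes", "4096")
    spark.conf.set("spark.sql.files.openCostInBytes", "0")

    val df = spark.read.option("header", "true").csv("/path/to/file.csv")
    println(df.rdd.getNumPartitions)  // should now be much higher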

Re: Setting log level to DEBUG while keeping httpclient.wire on WARN

2018-06-30 Thread yujhe.li
Daniel Haviv wrote
> Hi,
> I'm trying to debug an issue with Spark so I've set log level to DEBUG but
> at the same time I'd like to avoid the httpclient.wire's verbose output by
> setting it to WARN.
> 
> I tried the following log4j.properties config but I'm still getting DEBUG
> outputs for httpclient.wire:

Can you paste your DEBUG messages? And how do you pass `log4j.properties` to
the Spark application?
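
For reference, something along these lines in log4j.properties should keep
the wire logging quiet (logger names vary across httpclient versions, so both
spellings are shown):

    log4j.rootCategory=DEBUG, console
    log4j.appender.console=org.apache.log4j.ConsoleAppender
    log4j.appender.console.layout=org.apache.log4j.PatternLayout
    log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c: %m%n
    log4j.logger.httpclient.wire=WARN
    log4j.logger.org.apache.http.wire=WARN

And it has to actually reach the driver and executors, e.g.:

    spark-submit --files log4j.properties \
      --conf "spark.driver.extraJavaOptions=-Dlog4j.configuration=log4j.properties" \
      --conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=log4j.properties" \
      your-app.jar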
