scala> def getRootRdd(rdd: RDD[_]): RDD[_] =
     |   if (rdd.dependencies.isEmpty) rdd else getRootRdd(rdd.dependencies(0).rdd)
getRootRdd: (rdd: org.apache.spark.rdd.RDD[_])org.apache.spark.rdd.RDD[_]

scala> val rdd = spark.read.parquet("/Users/russellspitzer/Temp/local").rdd
rdd: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[38] 
at rdd at <console>:24

scala> val scan = getRootRdd(rdd)
scan: org.apache.spark.rdd.RDD[_] = FileScanRDD[33] at rdd at <console>:24

scala> scan.partitions.map(scan.preferredLocations)
res8: Array[Seq[String]] = Array(WrappedArray(), WrappedArray(), 
WrappedArray(), WrappedArray(), WrappedArray(), WrappedArray(), WrappedArray(), 
WrappedArray(), WrappedArray(), WrappedArray(), WrappedArray())

I define a quick traversal to get the source RDD for the DataFrame 
operation, make the read DataFrame, and pull the RDD out of it. I then 
traverse the RDD's dependencies to get the FileScanRDD and apply the 
scan's preferredLocations method to each partition. You can see from the 
result that none of my partitions have a preferred location, so they will 
all run at "ANY". This is because I'm reading from my local file system, 
which never reports a preferred location; so even though the scheduler 
reports "ANY" in this case, the tasks are actually node local.
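
If you want to check whether the scheduler could ever schedule a local 
task, you can compare the scan's preferred locations against the hosts 
your executors registered with. A minimal sketch along the same lines, 
reusing the getRootRdd helper above (the hdfs:// path is a placeholder 
for your own):

// Placeholder HDFS path; on HDFS the preferred locations should come
// back as datanode hostnames rather than the empty lists the local
// file system returns.
val hdfsScan = getRootRdd(spark.read.parquet("hdfs://namenode/test/parquets").rdd)
val preferred = hdfsScan.partitions.flatMap(hdfsScan.preferredLocations).toSet

// Executor registrations are keyed as "host:port" (the driver's block
// manager is included too); drop the port so the host strings can be
// compared with the preferred locations.
val executorHosts = spark.sparkContext.getExecutorMemoryStatus.keys
  .map(_.split(":")(0)).toSet

// Tasks can only run NODE_LOCAL on hosts that appear in both sets.
println(s"Preferred locations: $preferred")
println(s"Executor hosts:      $executorHosts")
println(s"Overlap:             ${preferred.intersect(executorHosts)}")

If one side reports hostnames while the other registered with raw IPs 
(or vice versa), the overlap will be empty even on a co-located cluster, 
which is exactly the string mismatch described in the reply quoted below.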


> On Apr 13, 2021, at 8:37 AM, Mohamadreza Rostami 
> <mohamadrezarosta...@gmail.com> wrote:
> 
> Thanks for your response.
> I think my HDFS-Spark cluster is co-localized because I have a Spark 
> worker on each datanode; in other words, I installed the Spark workers 
> on the datanodes. So why does this simple query on a co-localized 
> HDFS-Spark cluster run at the "Any" locality level?
> Is there any way to see which datanode IPs or hostnames the namenode 
> returns to Spark? Or can you suggest a debugging approach?
> 
>> On Farvardin 24, 1400 AP, at 17:45, Russell Spitzer 
>> <russell.spit...@gmail.com> wrote:
>> 
>> Data locality can only occur if the Spark executor IP address string 
>> matches the preferred location returned by the file system. So this job 
>> would only have local tasks if the datanode replicas for the files in 
>> question had the same IP address as the Spark executors you are using. 
>> If they don't, the scheduler falls back to assigning read tasks to the 
>> first available executor at locality level "ANY".
>> 
>> So unless you have that HDFS-Spark cluster co-localization, I wouldn't 
>> expect this job to run at any locality level other than ANY.
>> 
>>> On Apr 13, 2021, at 3:47 AM, Mohamadreza Rostami 
>>> <mohamadrezarosta...@gmail.com> wrote:
>>> 
>>> I have a Hadoop cluster that uses Apache Spark to query parquet files 
>>> saved on Hadoop. For example, I use the following PySpark code to find 
>>> a word in the parquet files:
>>> df = spark.read.parquet("hdfs://test/parquets/*")
>>> df.filter(df['word'] == "jhon").show()
>>> After running this code, I go to the Spark application UI, stages tab, 
>>> and see that the locality level summary is set to "Any". In contrast, 
>>> given the nature of this query, it should run at least at the 
>>> NODE_LOCAL locality level. When I check the cluster's network IO while 
>>> the query runs, I find that the query uses the network (network IO 
>>> increases while the query is running). The strange part is that the 
>>> number shown in the Spark UI's shuffle section is very small.
>>> How can I find the root cause of this problem and solve it?
>>> Stack Overflow link: 
>>> https://stackoverflow.com/questions/66612906/problem-with-data-locality-when-running-spark-query-with-local-nature-on-apache
> 
