scala> def getRootRdd( rdd:RDD[_] ): RDD[_] = { if (rdd.dependencies.size == 0) rdd else getRootRdd(rdd.dependencies(0).rdd)}
getRootRdd: (rdd: org.apache.spark.rdd.RDD[_])org.apache.spark.rdd.RDD[_]

scala> val rdd = spark.read.parquet("/Users/russellspitzer/Temp/local").rdd
rdd: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[38] at rdd at <console>:24

scala> val scan = getRootRdd(rdd)
scan: org.apache.spark.rdd.RDD[_] = FileScanRDD[33] at rdd at <console>:24

scala> scan.partitions.map(scan.preferredLocations)
res8: Array[Seq[String]] = Array(WrappedArray(), WrappedArray(), WrappedArray(), WrappedArray(), WrappedArray(), WrappedArray(), WrappedArray(), WrappedArray(), WrappedArray(), WrappedArray(), WrappedArray())

I define a quick traversal to get the source RDD of the DataFrame operation. I create the DataFrame with the read and get the RDD out of it. I traverse the RDD's dependencies to get to the FileScanRDD. I then apply the scan's preferredLocations method to each partition. As you can see, none of my partitions has a preferred location, so they will all be run at locality level "ANY". This is because I'm reading from my local file system, which never reports a preferred location; so even though the scheduler reports "ANY" in this case, the reads are actually node local.

> On Apr 13, 2021, at 8:37 AM, Mohamadreza Rostami <mohamadrezarosta...@gmail.com> wrote:
>
> Thanks for your response.
> I think my HDFS-Spark cluster is co-located, because I have one Spark worker
> per datanode; in other words, I installed the Spark workers on the datanodes.
> That's exactly why I don't understand why this simple query on a co-located
> HDFS-Spark cluster runs at the "ANY" locality level.
> Is there any way to find out which IP address or hostname of each datanode
> the namenode returns to Spark? Or can you suggest a debugging approach?
>
>> On Farvardin 24, 1400 AP, at 17:45, Russell Spitzer <russell.spit...@gmail.com> wrote:
>>
>> Data locality can only occur if the Spark executor's IP address string matches
>> the preferred location returned by the file system.
>> So this job would only have local tasks if the datanode replicas for the
>> files in question had the same IP address as the Spark executors you are
>> using. If they don't, the scheduler falls back to assigning read tasks to the
>> first available executor with locality level "ANY".
>>
>> So unless you have that HDFS-Spark cluster co-location, I wouldn't expect
>> this job to run at any locality level other than ANY.
>>
>>> On Apr 13, 2021, at 3:47 AM, Mohamadreza Rostami <mohamadrezarosta...@gmail.com> wrote:
>>>
>>> I have a Hadoop cluster that uses Apache Spark to query Parquet files stored
>>> in HDFS. For example, I use the following PySpark code to find a word in
>>> the Parquet files:
>>>
>>>     df = spark.read.parquet("hdfs://test/parquets/*")
>>>     df.filter(df['word'] == "jhon").show()
>>>
>>> After running this code, I go to the Spark application UI, Stages tab, and
>>> I see that the locality level summary is set to ANY. However, given the
>>> nature of this query, I would expect it to run locally, at NODE_LOCAL
>>> locality level at least. When I check the cluster's network I/O while the
>>> query runs, I find that it does use the network (network I/O increases while
>>> the query is running). The strange part is that the numbers shown in the
>>> Spark UI's shuffle section are very small.
>>> How can I find the root cause of this problem and fix it?
>>> Stack Overflow link:
>>> https://stackoverflow.com/questions/66612906/problem-with-data-locality-when-running-spark-query-with-local-nature-on-apache
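The matching rule described in this thread (a task is only local if the preferred location string returned by the file system matches the executor's address string) can be illustrated with a toy model. This is a minimal sketch under that assumption, not Spark's actual scheduler code: it ignores the other locality levels (PROCESS_LOCAL, RACK_LOCAL) and delay scheduling (`spark.locality.wait`). `LocalityModel`, `locality`, and all host names and IP addresses below are hypothetical, chosen only to show how a datanode reported by hostname can fail to match an executor registered by IP address.

```scala
// Toy model of the locality rule discussed in this thread, NOT Spark's scheduler.
// Assumption: a read task is NODE_LOCAL only when one of its partition's
// preferred-location strings exactly equals an executor's address string;
// otherwise it falls back to ANY. Other levels and delay scheduling are ignored.
object LocalityModel {
  sealed trait Locality
  case object NodeLocal extends Locality
  case object AnyLocality extends Locality

  // Exact string comparison: "dn1.cluster.local" and "10.0.0.11" never match
  // in this model, even if they name the same machine.
  def locality(preferred: Seq[String], executorHosts: Set[String]): Locality =
    if (preferred.exists(executorHosts.contains)) NodeLocal else AnyLocality

  def main(args: Array[String]): Unit = {
    // Hypothetical executor addresses: executors registered by IP address.
    val executorHosts = Set("10.0.0.11", "10.0.0.12")
    // Hypothetical replica locations as the file system might report them.
    val reportedByHostname = Seq("dn1.cluster.local", "dn2.cluster.local")
    val reportedByIp = Seq("10.0.0.11", "10.0.0.13")

    println(locality(reportedByHostname, executorHosts)) // prints AnyLocality
    println(locality(reportedByIp, executorHosts))       // prints NodeLocal
  }
}
```

Under this model, a debugging step would be to compare the output of `scan.partitions.map(scan.preferredLocations)` on the HDFS table with the executor addresses listed on the Executors tab of the Spark UI; if the two use different forms (hostnames vs. IP addresses), no task can be scheduled as local.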