Hi,

I have a simple ETL application, where the data source schema needs to be 
sanitized. Column names might include special characters that need to be 
removed or replaced. For example, "some{column}" becomes "some_column".
Normally I'd just alias the columns, but in this case the schema can have 
thousands of deeply nested columns. Creating a new StructType feels more 
intuitive and simpler, but the only way I know of to apply the new schema is to 
create a new DataFrame:
spark.createDataFrame(old_df.rdd, new_schema). This makes the deserialization 
and re-serialization of the dataframe the most expensive operation in that 
"simple" ETL app.
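
For illustration, here is a minimal sketch of how a sanitized schema could be 
built by rewriting field names in the schema's JSON form (the dict returned by 
df.schema.jsonValue()). The helper names sanitize_name and sanitize_type, and 
the rule of keeping only letters, digits and underscores, are assumptions made 
for this example. Name collisions after sanitizing are not handled.

```python
import re

# Assumption: only ASCII letters, digits and underscores are allowed in names.
_INVALID = re.compile(r"[^0-9A-Za-z_]+")


def sanitize_name(name):
    # Split on runs of disallowed characters and rejoin the pieces with "_",
    # so "some{column}" becomes "some_column".
    return "_".join(part for part in _INVALID.split(name) if part)


def sanitize_type(dtype):
    # Primitive types are plain strings in the JSON form, e.g. "string".
    if not isinstance(dtype, dict):
        return dtype
    kind = dtype.get("type")
    if kind == "struct":
        fields = [
            {**f, "name": sanitize_name(f["name"]), "type": sanitize_type(f["type"])}
            for f in dtype["fields"]
        ]
        return {**dtype, "fields": fields}
    if kind == "array":
        return {**dtype, "elementType": sanitize_type(dtype["elementType"])}
    if kind == "map":
        return {
            **dtype,
            "keyType": sanitize_type(dtype["keyType"]),
            "valueType": sanitize_type(dtype["valueType"]),
        }
    # Anything else is returned unchanged.
    return dtype
```

With PySpark available, the result could then be turned back into a schema 
with StructType.fromJson(sanitize_type(df.schema.jsonValue())).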

To make things worse, since it's a PySpark application, the RDD is treated as 
a Python RDD and all the data moves from the JVM to Python and back, without 
any real transformation.
I worked around this by creating the new DF on the JVM only:

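from pyspark.sql import DataFrame

# Parse the sanitized schema on the JVM side
jschema = spark._sc._jvm.org.apache.spark.sql.types.DataType.fromJson(
    sanitized_schema.json())
# Re-apply it to the JVM RDD so rows never pass through Python
sanitized_df = DataFrame(
    spark._jsparkSession.createDataFrame(df._jdf.rdd(), jschema), spark)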

Is there another way to do a bulk rename operation? I'd like to avoid creating 
some uber "select" statement with aliases, or multiple withColumnRenamed 
operations, as much as possible, mainly for maintenance reasons.

Thanks
  • PySpark schema sanitization Shay Elbaz
