[ https://issues.apache.org/jira/browse/SPARK-36950?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17641320#comment-17641320 ]
Bjørn Jørgensen commented on SPARK-36950:
-----------------------------------------

[~hyukjin.kwon] I have updated the post now. This is a problem that is a bit beyond my knowledge to implement in the core code. I hope that someone else can have a look at it and get it into the code base.

> Normalize semi-structured data into a flat table.
> -------------------------------------------------
>
>                 Key: SPARK-36950
>                 URL: https://issues.apache.org/jira/browse/SPARK-36950
>             Project: Spark
>          Issue Type: Improvement
>          Components: PySpark
>    Affects Versions: 3.4.0
>            Reporter: Bjørn Jørgensen
>            Priority: Major
>
> Many users get semi-nested data from JSON or XML.
> There are some problems with querying this data when there are nested fields.
> In pandas there is a [json_normalize|https://github.com/pandas-dev/pandas/blob/v1.3.3/pandas/io/json/_normalize.py#L112-L353] function that flattens nested dicts into columns.
> Here are some examples of this kind of flattening: [Flatten Complex Nested JSON (PYSPARK)|https://stackoverflow.com/questions/73599398/flatten-complex-nested-json-pyspark/73666330#73666330] and [Unable to load jsonl nested file into a flattened dataframe|https://stackoverflow.com/questions/73546452/unable-to-load-jsonl-nested-file-into-a-flattened-dataframe/73594355#73594355]
> With pandas, users can use this function:
> {code:java}
> import json
>
> import pandas as pd
>
>
> def flatten_pandas(df_):
>     # The same as flatten but for pandas.
>     # Find the columns that still contain lists or dicts.
>     have_list = df_.columns[df_.applymap(lambda x: isinstance(x, list)).any()].tolist()
>     have_dict = df_.columns[df_.applymap(lambda x: isinstance(x, dict)).any()].tolist()
>     have_nested = len(have_list) + len(have_dict)
>
>     while have_nested != 0:
>         if len(have_list) != 0:
>             # Explode list columns into one row per element.
>             for _ in have_list:
>                 df_ = df_.explode(_)
>
>         elif len(have_dict) != 0:
>             # Round-trip through JSON so json_normalize can flatten dict columns.
>             df_ = pd.json_normalize(
>                 json.loads(df_.to_json(force_ascii=False, orient="records")), sep=":"
>             )
>
>         have_list = df_.columns[df_.applymap(lambda x: isinstance(x, list)).any()].tolist()
>         have_dict = df_.columns[df_.applymap(lambda x: isinstance(x, dict)).any()].tolist()
>         have_nested = len(have_list) + len(have_dict)
>
>     return df_
> {code}
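>
> As a quick, hypothetical sketch of what flatten_pandas produces (the sample record here is made up, and exact column order can vary with the pandas version):
> {code:java}
> import pandas as pd
>
> # One dict column ("info") and one list column ("tags").
> raw = pd.DataFrame(
>     [{"id": 1, "info": {"governor": "Rick Scott"}, "tags": ["a", "b"]}]
> )
>
> flat = flatten_pandas(raw)
> print(flat.columns.tolist())
> # The list column is exploded to one row per tag and the dict column
> # becomes a flat "info:governor" column, e.g.:
> # ['id', 'tags', 'info:governor']
> {code}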
>
> With pyspark or the pandas API on Spark we don't have a function implemented for turning dicts into columns.
> These are the functions that I'm using to do the same in pyspark.
> {code:java}
> from pyspark.sql.functions import *
> from pyspark.sql.types import *
>
>
> def flatten_test(df, sep="_"):
>     """Returns a flattened dataframe.
>
>     .. versionadded:: x.X.X
>
>     Parameters
>     ----------
>     sep : str
>         Delimiter for flattened columns. Default `_`.
>
>     Notes
>     -----
>     Don't use `.` as `sep`.
>     It won't work on nested data frames with more than one level.
>     And you will have to use `columns.name`.
>     Flattening Map Types will have to find every key in the column.
>     This can be slow.
>
>     Examples
>     --------
>     data_mixed = [
>         {
>             "state": "Florida",
>             "shortname": "FL",
>             "info": {"governor": "Rick Scott"},
>             "counties": [
>                 {"name": "Dade", "population": 12345},
>                 {"name": "Broward", "population": 40000},
>                 {"name": "Palm Beach", "population": 60000},
>             ],
>         },
>         {
>             "state": "Ohio",
>             "shortname": "OH",
>             "info": {"governor": "John Kasich"},
>             "counties": [
>                 {"name": "Summit", "population": 1234},
>                 {"name": "Cuyahoga", "population": 1337},
>             ],
>         },
>     ]
>
>     data_mixed = spark.createDataFrame(data=data_mixed)
>     data_mixed.printSchema()
>     root
>      |-- counties: array (nullable = true)
>      |    |-- element: map (containsNull = true)
>      |    |    |-- key: string
>      |    |    |-- value: string (valueContainsNull = true)
>      |-- info: map (nullable = true)
>      |    |-- key: string
>      |    |-- value: string (valueContainsNull = true)
>      |-- shortname: string (nullable = true)
>      |-- state: string (nullable = true)
>
>     data_mixed_flat = flatten_test(data_mixed, sep=":")
>     data_mixed_flat.printSchema()
>     root
>      |-- shortname: string (nullable = true)
>      |-- state: string (nullable = true)
>      |-- counties:name: string (nullable = true)
>      |-- counties:population: string (nullable = true)
>      |-- info:governor: string (nullable = true)
>
>     data = [
>         {
>             "id": 1,
>             "name": "Cole Volk",
>             "fitness": {"height": 130, "weight": 60},
>         },
>         {"name": "Mark Reg", "fitness": {"height": 130, "weight": 60}},
>         {
>             "id": 2,
>             "name": "Faye Raker",
>             "fitness": {"height": 130, "weight": 60},
>         },
>     ]
>
>     df = spark.createDataFrame(data=data)
>     df.printSchema()
>     root
>      |-- fitness: map (nullable = true)
>      |    |-- key: string
>      |    |-- value: long (valueContainsNull = true)
>      |-- id: long (nullable = true)
>      |-- name: string (nullable = true)
>
>     df_flat = flatten_test(df, sep=":")
>     df_flat.printSchema()
>     root
>      |-- id: long (nullable = true)
>      |-- name: string (nullable = true)
>      |-- fitness:height: long (nullable = true)
>      |-- fitness:weight: long (nullable = true)
>
>     data_struct = [
>         (("James", None, "Smith"), "OH", "M"),
>         (("Anna", "Rose", ""), "NY", "F"),
>         (("Julia", "", "Williams"), "OH", "F"),
>         (("Maria", "Anne", "Jones"), "NY", "M"),
>         (("Jen", "Mary", "Brown"), "NY", "M"),
>         (("Mike", "Mary", "Williams"), "OH", "M"),
>     ]
>
>     schema = StructType([
>         StructField('name', StructType([
>             StructField('firstname', StringType(), True),
>             StructField('middlename', StringType(), True),
>             StructField('lastname', StringType(), True)
>         ])),
>         StructField('state', StringType(), True),
>         StructField('gender', StringType(), True)
>     ])
>
>     df_struct = spark.createDataFrame(data=data_struct, schema=schema)
>     df_struct.printSchema()
>     root
>      |-- name: struct (nullable = true)
>      |    |-- firstname: string (nullable = true)
>      |    |-- middlename: string (nullable = true)
>      |    |-- lastname: string (nullable = true)
>      |-- state: string (nullable = true)
>      |-- gender: string (nullable = true)
>
>     df_struct_flat = flatten_test(df_struct, sep=":")
>     df_struct_flat.printSchema()
>     root
>      |-- state: string (nullable = true)
>      |-- gender: string (nullable = true)
>      |-- name:firstname: string (nullable = true)
>      |-- name:middlename: string (nullable = true)
>      |-- name:lastname: string (nullable = true)
>     """
>     # compute Complex Fields (Arrays, Structs and MapTypes) in Schema
>     complex_fields = dict(
>         [
>             (field.name, field.dataType)
>             for field in df.schema.fields
>             if type(field.dataType) == ArrayType
>             or type(field.dataType) == StructType
>             or type(field.dataType) == MapType
>         ]
>     )
>
>     while len(complex_fields) != 0:
>         col_name = list(complex_fields.keys())[0]
>         # print("Processing :" + col_name + " Type : " + str(type(complex_fields[col_name])))
>
>         # if StructType then convert all sub elements to columns.
>         # i.e. flatten structs
>         if type(complex_fields[col_name]) == StructType:
>             expanded = [
>                 col(col_name + "." + k).alias(col_name + sep + k)
>                 for k in [n.name for n in complex_fields[col_name]]
>             ]
>             df = df.select("*", *expanded).drop(col_name)
>
>         # if ArrayType then add the Array Elements as Rows using the explode function
>         # i.e. explode Arrays
>         elif type(complex_fields[col_name]) == ArrayType:
>             df = df.withColumn(col_name, explode_outer(col_name))
>
>         # if MapType then convert all sub elements to columns.
>         # i.e. flatten maps
>         elif type(complex_fields[col_name]) == MapType:
>             keys_df = df.select(explode_outer(map_keys(col(col_name)))).distinct()
>             keys = list(map(lambda row: row[0], keys_df.collect()))
>             key_cols = list(
>                 map(
>                     lambda f: col(col_name).getItem(f).alias(str(col_name + sep + f)),
>                     keys,
>                 )
>             )
>             drop_column_list = [col_name]
>             df = df.select(
>                 [
>                     col_name
>                     for col_name in df.columns
>                     if col_name not in drop_column_list
>                 ]
>                 + key_cols
>             )
>
>         # recompute remaining Complex Fields in Schema
>         complex_fields = dict(
>             [
>                 (field.name, field.dataType)
>                 for field in df.schema.fields
>                 if type(field.dataType) == ArrayType
>                 or type(field.dataType) == StructType
>                 or type(field.dataType) == MapType
>             ]
>         )
>
>     return df
> {code}
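>
> A minimal end-to-end sketch of how flatten_test is meant to be called, assuming an active SparkSession named spark (the row of data is taken from the struct example in the docstring):
> {code:java}
> df_struct = spark.createDataFrame(
>     [(("James", None, "Smith"), "OH", "M")],
>     schema="name struct<firstname:string,middlename:string,lastname:string>, "
>            "state string, gender string",
> )
>
> flat = flatten_test(df_struct, sep=":")
> flat.printSchema()
> # The name struct is replaced by name:firstname, name:middlename and
> # name:lastname columns, next to state and gender.
> {code}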
>
> {code:java}
> from copy import copy
>
> from pyspark.sql import DataFrame
> from pyspark.sql.types import ArrayType, DataType, StructField, StructType
>
>
> # We take a dataframe and return a new one with required changes
> def cleanDataFrame(df: DataFrame) -> DataFrame:
>     # Returns a new sanitized field name (this function can be anything really)
>     def sanitizeFieldName(s: str) -> str:
>         return (
>             s.replace("-", "_")
>             .replace("&", "_")
>             .replace('"', "_")
>             .replace("[", "_")
>             .replace("]", "_")
>             .replace(".", "_")
>         )
>
>     # We call this on all fields to create a copy and to perform any changes we
>     # might want to do to the field.
>     def sanitizeField(field: StructField) -> StructField:
>         field = copy(field)
>         field.name = sanitizeFieldName(field.name)
>         # We recursively call cleanSchema on all types
>         field.dataType = cleanSchema(field.dataType)
>         return field
>
>     def cleanSchema(dataType: DataType) -> DataType:
>         dataType = copy(dataType)
>         # If the type is a StructType we need to recurse, otherwise we can
>         # return since we've reached a leaf node
>         if isinstance(dataType, StructType):
>             # We call our sanitizer for all top level fields
>             dataType.fields = [sanitizeField(f) for f in dataType.fields]
>         elif isinstance(dataType, ArrayType):
>             dataType.elementType = cleanSchema(dataType.elementType)
>         return dataType
>
>     # Now that we have the new schema we can create a new DataFrame by using the
>     # old frame's RDD as data and the new schema as the schema for the data
>     return spark.createDataFrame(df.rdd, cleanSchema(df.schema))
> {code}
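>
> A small, hypothetical sketch of what cleanDataFrame does to problematic field names, again assuming an active SparkSession named spark:
> {code:java}
> df = spark.createDataFrame([(1, 2)], ["user-id", "a.b"])
>
> cleaned = cleanDataFrame(df)
> print(cleaned.columns)
> # Both the dash and the dot are replaced: ['user_id', 'a_b']
> {code}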
>
> {code:java}
> import json
> import os
>
>
> def json_to_norm_with_null(dir_path, path_to_save):
>     path = dir_path
>     for filename in os.listdir(path):
>         if not filename.endswith(".json"):
>             continue
>
>         fullname = os.path.join(path, filename)
>         with open(fullname) as json_file:
>             # jsonstr is loaded here but not used below.
>             jsonstr = json.load(json_file)
>
>         df = spark.read.json(fullname)
>         df = cleanDataFrame(df)
>         df = flatten_test(df, sep=":")
>         df.write.mode("append").option("ignoreNullFields", "false").json(path_to_save)
> {code}
>
> def cleanDataFrame is taken from a post at Stack Overflow.
> In def flatten_test, the first two parts, for arrays and structs, are taken from [this GitHub gist|https://gist.github.com/nmukerje/e65cde41be85470e4b8dfd9a2d6aed50].
> The examples in def flatten_test are taken from the pandas json_normalize function.
> There is one problem with def flatten_test: it has to load the dataframes one at a time if their schemas are different.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org