I have 400k JSON files, each between 10 kB and 500 kB in size.
They don't all share the same schema, so I have to loop over them one at a time.
This works, but it's very slow: the whole process takes 5 days!
So now I have tried to run these functions in a ThreadPool, but it doesn't seem to work.
*Start local Spark. The system has 16 cores and 64 GB of RAM.*
import multiprocessing
import os

from pyspark import SparkConf
from pyspark.sql import SparkSession

number_cores = int(multiprocessing.cpu_count())
mem_bytes = os.sysconf('SC_PAGE_SIZE') * os.sysconf('SC_PHYS_PAGES')  # e.g. 4015976448
memory_gb = int(mem_bytes / (1024. ** 3))  # e.g. 3.74

def get_spark_session(app_name: str, conf: SparkConf):
    conf.setMaster('local[{}]'.format(number_cores))
    conf \
        .set('spark.driver.memory', '{}g'.format(memory_gb)) \
        .set("spark.sql.repl.eagerEval.enabled", "True") \
        .set("spark.sql.adaptive.enabled", "True") \
        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
        .set("spark.sql.repl.eagerEval.maxNumRows", "10000")
    return SparkSession.builder.appName(app_name).config(conf=conf).getOrCreate()

spark = get_spark_session("Falk", SparkConf())
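As a quick sanity check (just reading back the settings set above), I confirm that the local session picked up the intended core count and driver memory; the values in the comments are only examples for my machine:

print(spark.sparkContext.master)                                 # e.g. local[16]
print(spark.sparkContext.getConf().get("spark.driver.memory"))   # e.g. 62g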
*Function to rename columns that contain special characters.*
from copy import copy

from pyspark.sql import DataFrame
from pyspark.sql.types import ArrayType, DataType, StructField, StructType

# We take a dataframe and return a new one with the required changes.
def cleanDataFrame(df: DataFrame) -> DataFrame:
    # Returns a new sanitized field name (this function can be anything really).
    def sanitizeFieldName(s: str) -> str:
        return s.replace("-", "_").replace("&", "_").replace("\"", "_") \
            .replace("[", "_").replace("]", "_").replace(".", "_")

    # We call this on all fields to create a copy and to perform any changes
    # we might want to do to the field.
    def sanitizeField(field: StructField) -> StructField:
        field = copy(field)
        field.name = sanitizeFieldName(field.name)
        # We recursively call cleanSchema on all types.
        field.dataType = cleanSchema(field.dataType)
        return field

    def cleanSchema(dataType: DataType) -> DataType:
        dataType = copy(dataType)
        # If the type is a StructType we need to recurse; otherwise we can
        # return, since we've reached a leaf node.
        if isinstance(dataType, StructType):
            # We call our sanitizer for all top-level fields.
            dataType.fields = [sanitizeField(f) for f in dataType.fields]
        elif isinstance(dataType, ArrayType):
            dataType.elementType = cleanSchema(dataType.elementType)
        return dataType

    # Now that we have the new schema, we can create a new DataFrame by using
    # the old frame's RDD as the data and the new schema as the schema for
    # that data.
    return spark.createDataFrame(df.rdd, cleanSchema(df.schema))
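To illustrate what this does, here is a minimal sketch (the column names are just made-up examples, not from my real data) of how cleanDataFrame renames problematic field names:

df_example = spark.createDataFrame([("a", 1)], ["first-name", "count.total"])
cleanDataFrame(df_example).printSchema()
# root
#  |-- first_name: string (nullable = true)
#  |-- count_total: long (nullable = true)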
*Function to flatten out a nested dataframe.*
from pyspark.sql.types import *
from pyspark.sql.functions import *
def flatten_test(df, sep="_"):
    """Returns a flattened dataframe.

    .. versionadded:: x.X.X

    Parameters
    ----------
    sep : str
        Delimiter for flattened columns. Default `_`.

    Notes
    -----
    Don't use `.` as `sep`:
    it won't work on nested data frames with more than one level,
    and you will have to use `columns.name`.
    Flattening MapType columns has to find every key in the column,
    which can be slow.

    Examples
    --------
    data_mixed = [
        {
            "state": "Florida",
            "shortname": "FL",
            "info": {"governor": "Rick Scott"},
            "counties": [
                {"name": "Dade", "population": 12345},
                {"name": "Broward", "population": 40000},
                {"name": "Palm Beach", "population": 60000},
            ],
        },
        {
            "state": "Ohio",
            "shortname": "OH",
            "info": {"governor": "John Kasich"},
            "counties": [
                {"name": "Summit", "population": 1234},
                {"name": "Cuyahoga", "population": 1337},
            ],
        },
    ]

    data_mixed = spark.createDataFrame(data=data_mixed)
    data_mixed.printSchema()
    root
     |-- counties: array (nullable = true)
     |    |-- element: map (containsNull = true)
     |    |    |-- key: string
     |    |    |-- value: string (valueContainsNull = true)
     |-- info: map (nullable = true)
     |    |-- key: string
     |    |-- value: string (valueContainsNull = true)
     |-- shortname: string (nullable = true)
     |-- state: string (nullable = true)

    data_mixed_flat = flatten_test(data_mixed, sep=":")
    data_mixed_flat.printSchema()
    root
     |-- shortname: string (nullable = true)
     |-- state: string (nullable = true)
     |-- counties:name: string (nullable = true)
     |-- counties:population: string (nullable = true)
     |-- info:governor: string (nullable = true)

    data = [
        {
            "id": 1,
            "name": "Cole Volk",
            "fitness": {"height": 130, "weight": 60},
        },
        {"name": "Mark Reg", "fitness": {"height": 130, "weight": 60}},
        {
            "id": 2,
            "name": "Faye Raker",
            "fitness": {"height": 130, "weight": 60},
        },
    ]

    df = spark.createDataFrame(data=data)
    df.printSchema()
    root
     |-- fitness: map (nullable = true)
     |    |-- key: string
     |    |-- value: long (valueContainsNull = true)
     |-- id: long (nullable = true)
     |-- name: string (nullable = true)

    df_flat = flatten_test(df, sep=":")
    df_flat.printSchema()
    root
     |-- id: long (nullable = true)
     |-- name: string (nullable = true)
     |-- fitness:height: long (nullable = true)
     |-- fitness:weight: long (nullable = true)

    data_struct = [
        (("James", None, "Smith"), "OH", "M"),
        (("Anna", "Rose", ""), "NY", "F"),
        (("Julia", "", "Williams"), "OH", "F"),
        (("Maria", "Anne", "Jones"), "NY", "M"),
        (("Jen", "Mary", "Brown"), "NY", "M"),
        (("Mike", "Mary", "Williams"), "OH", "M")
    ]

    schema = StructType([
        StructField('name', StructType([
            StructField('firstname', StringType(), True),
            StructField('middlename', StringType(), True),
            StructField('lastname', StringType(), True)
        ])),
        StructField('state', StringType(), True),
        StructField('gender', StringType(), True)
    ])

    df_struct = spark.createDataFrame(data=data_struct, schema=schema)
    df_struct.printSchema()
    root
     |-- name: struct (nullable = true)
     |    |-- firstname: string (nullable = true)
     |    |-- middlename: string (nullable = true)
     |    |-- lastname: string (nullable = true)
     |-- state: string (nullable = true)
     |-- gender: string (nullable = true)

    df_struct_flat = flatten_test(df_struct, sep=":")
    df_struct_flat.printSchema()
    root
     |-- state: string (nullable = true)
     |-- gender: string (nullable = true)
     |-- name:firstname: string (nullable = true)
     |-- name:middlename: string (nullable = true)
     |-- name:lastname: string (nullable = true)
    """
    # compute complex fields (Arrays, Structs and MapTypes) in the schema
    complex_fields = dict([(field.name, field.dataType)
                           for field in df.schema.fields
                           if type(field.dataType) == ArrayType
                           or type(field.dataType) == StructType
                           or type(field.dataType) == MapType])

    while len(complex_fields) != 0:
        col_name = list(complex_fields.keys())[0]
        # print("Processing :" + col_name + " Type : " + str(type(complex_fields[col_name])))

        # if StructType then convert all sub-elements to columns,
        # i.e. flatten structs
        if type(complex_fields[col_name]) == StructType:
            expanded = [col(col_name + '.' + k).alias(col_name + sep + k)
                        for k in [n.name for n in complex_fields[col_name]]]
            df = df.select("*", *expanded).drop(col_name)

        # if ArrayType then add the array elements as rows using the
        # explode function, i.e. explode arrays
        elif type(complex_fields[col_name]) == ArrayType:
            df = df.withColumn(col_name, explode_outer(col_name))

        # if MapType then convert all sub-elements to columns,
        # i.e. flatten maps
        elif type(complex_fields[col_name]) == MapType:
            keys_df = df.select(explode_outer(map_keys(col(col_name)))).distinct()
            keys = list(map(lambda row: row[0], keys_df.collect()))
            key_cols = list(map(lambda f: col(col_name).getItem(f)
                                .alias(str(col_name + sep + f)), keys))
            drop_column_list = [col_name]
            df = df.select([col_name for col_name in df.columns
                            if col_name not in drop_column_list] + key_cols)

        # recompute remaining complex fields in the schema
        complex_fields = dict([(field.name, field.dataType)
                               for field in df.schema.fields
                               if type(field.dataType) == ArrayType
                               or type(field.dataType) == StructType
                               or type(field.dataType) == MapType])

    return df
*Function to read each file, apply the functions above, and save the result as JSON.*
import json
import os

def json_to_norm_with_null(dir_path, path_to_save):
    path = dir_path

    for filename in os.listdir(path):
        if not filename.endswith('._stript_list.json'):
            continue

        fullname = os.path.join(path, filename)
        with open(fullname) as json_file:
            jsonstr = json.load(json_file)

        df = spark.read.json(fullname)
        df = cleanDataFrame(df)
        df = flatten_test(df, sep=":")
        df.write.mode('append') \
            .option('compression', 'snappy') \
            .option("ignoreNullFields", "false") \
            .json(path_to_save)
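For reference, this is how a single sequential call looks (same paths as in the ThreadPool attempt below); this is the version that works today but is far too slow:

json_to_norm_with_null("/home/jovyan/notebooks/falk/data/form_version/F02",
                       "/home/jovyan/notebooks/falk/F02.json")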
*Code to start everything off, hopefully with 10 processes.*
from multiprocessing.pool import ThreadPool

tpool = ThreadPool(processes=10)

tpool.map(json_to_norm_with_null("/home/jovyan/notebooks/falk/data/form_version/F02",
                                 '/home/jovyan/notebooks/falk/F02.json'))
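My understanding is that ThreadPool.map (or starmap) wants a function plus an iterable of arguments, so what I am aiming for is roughly the sketch below, with one folder per task; the list of folders is only a placeholder for my real directory layout:

from multiprocessing.pool import ThreadPool
import os

# Placeholder list of input folders; in reality there are many form_version directories.
dirs = ["/home/jovyan/notebooks/falk/data/form_version/F01",
        "/home/jovyan/notebooks/falk/data/form_version/F02"]

with ThreadPool(processes=10) as tpool:
    # one output path per input folder, derived from the folder name
    tpool.starmap(json_to_norm_with_null,
                  [(d, '/home/jovyan/notebooks/falk/' + os.path.basename(d) + '.json')
                   for d in dirs])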
--
Bjørn Jørgensen
Vestre Aspehaug 4, 6010 Ålesund
Norge
+47 480 94 297