Thank you. The reason for using Spark local is to test the code; as in this case, I find the bottlenecks and fix them before I spin up a K8S cluster.
I did test it now with 16 cores and 10 files:

import time

tic = time.perf_counter()
json_to_norm_with_null("/home/jovyan/notebooks/falk/test", '/home/jovyan/notebooks/falk/test/test.json')
toc = time.perf_counter()
print(f"Func run in {toc - tic:0.4f} seconds")

Func run in 30.3695 seconds

Then I stop Spark and start it with setMaster('local[1]'), and now:

Func run in 30.8168 seconds

Which means that it doesn't matter if I run this code on one core or on a K8S cluster with 100 cores.

So I tested the same with

from multiprocessing.pool import ThreadPool
import multiprocessing as mp

if __name__ == "__main__":
    tic = time.perf_counter()
    pool = ThreadPool(mp.cpu_count())
    opt = pool.map(json_to_norm_with_null("/home/jovyan/notebooks/falk/test", '/home/jovyan/notebooks/falk/test/test.json'))
    toc = time.perf_counter()
    print(f"Func run in {toc - tic:0.4f} seconds")

I get the same files and they are OK. But I also get this error:

TypeError                                 Traceback (most recent call last)
Input In [33], in <cell line: 5>()
      6 tic = time.perf_counter()
      7 pool = ThreadPool(mp.cpu_count())
----> 8 opt = pool.map(json_to_norm_with_null("/home/jovyan/notebooks/falk/test", '/home/jovyan/notebooks/falk/test/test.json'))
      9 toc = time.perf_counter()
     10 print(f"Func run in {toc - tic:0.4f} seconds")

TypeError: Pool.map() missing 1 required positional argument: 'iterable'

So any hints on what to change? :)

Spark has the pandas-on-Spark API, and that is really great. I prefer the pandas-on-Spark API and PySpark over pandas.
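About the TypeError: my guess is that Pool.map wants a function plus an iterable, while my call above runs json_to_norm_with_null right away and passes its return value to map. So maybe something like this, where the per-file work is pulled out into a helper that takes a single path and the list of files is the iterable? (process_one_file is just a name I made up for this sketch, it is untested, and it assumes the spark session, cleanDataFrame and flatten_test from the code below.)

import os
import time
from multiprocessing.pool import ThreadPool
import multiprocessing as mp

def process_one_file(fullname, path_to_save="/home/jovyan/notebooks/falk/test/test.json"):
    # Same body as json_to_norm_with_null, but for one file only.
    # Assumes spark, cleanDataFrame and flatten_test from the quoted code below.
    df = spark.read.json(fullname)
    df = cleanDataFrame(df)
    df = flatten_test(df, sep=":")
    df.write.mode('append').option('compression', 'snappy') \
        .option("ignoreNullFields", "false").json(path_to_save)

if __name__ == "__main__":
    path = "/home/jovyan/notebooks/falk/test"
    files = [os.path.join(path, f) for f in os.listdir(path)
             if f.endswith('._stript_list.json')]

    tic = time.perf_counter()
    with ThreadPool(mp.cpu_count()) as pool:
        # Pass the function itself plus the iterable; don't call the function here.
        pool.map(process_one_file, files)
    toc = time.perf_counter()
    print(f"Func run in {toc - tic:0.4f} seconds")

I am not sure whether several threads appending to the same output path at the same time is safe, so that part may need adjusting.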
On Thu, 21 Jul 2022 at 09:18, Khalid Mammadov <khalidmammad...@gmail.com> wrote:

> One quick observation is that you allocate all your local CPUs to Spark
> and then execute that app with 10 threads, i.e. 10 Spark apps, so you will
> need 160 cores in total as each will need 16 CPUs IMHO. Wouldn't that
> create a CPU bottleneck?
>
> Also, on a side note, why do you need Spark if you use it locally only?
> Spark's power can only be (mainly) observed in a cluster env.
> I have achieved great parallelism using pandas and pools on a local
> machine in the past.
>
> On Wed, 20 Jul 2022, 21:39 Bjørn Jørgensen, <bjornjorgen...@gmail.com> wrote:
>
>> I have 400k JSON files, which are between 10 kb and 500 kb in size.
>> They don't have the same schema, so I have to loop over them one at a
>> time.
>>
>> This works, but it's very slow. This process takes 5 days!
>>
>> So now I have tried to run these functions in a ThreadPool. But it
>> doesn't seem to work.
>>
>> *Start local Spark. The system has 16 cores and 64 GB.*
>>
>> number_cores = int(multiprocessing.cpu_count())
>>
>> mem_bytes = os.sysconf('SC_PAGE_SIZE') * os.sysconf('SC_PHYS_PAGES')  # e.g. 4015976448
>> memory_gb = int(mem_bytes/(1024.**3))  # e.g. 3.74
>>
>> def get_spark_session(app_name: str, conf: SparkConf):
>>     conf.setMaster('local[{}]'.format(number_cores))
>>     conf \
>>         .set('spark.driver.memory', '{}g'.format(memory_gb)) \
>>         .set("spark.sql.repl.eagerEval.enabled", "True") \
>>         .set("spark.sql.adaptive.enabled", "True") \
>>         .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
>>         .set("spark.sql.repl.eagerEval.maxNumRows", "10000")
>>
>>     return SparkSession.builder.appName(app_name).config(conf=conf).getOrCreate()
>>
>> spark = get_spark_session("Falk", SparkConf())
>>
>> *Function to rename columns with \\ *
>>
>> # We take a dataframe and return a new one with required changes
>> def cleanDataFrame(df: DataFrame) -> DataFrame:
>>     # Returns a new sanitized field name (this function can be anything really)
>>     def sanitizeFieldName(s: str) -> str:
>>         return s.replace("-", "_").replace("&", "_").replace("\"", "_")\
>>             .replace("[", "_").replace("]", "_").replace(".", "_")
>>
>>     # We call this on all fields to create a copy and to perform any changes we might
>>     # want to do to the field.
>>     def sanitizeField(field: StructField) -> StructField:
>>         field = copy(field)
>>         field.name = sanitizeFieldName(field.name)
>>         # We recursively call cleanSchema on all types
>>         field.dataType = cleanSchema(field.dataType)
>>         return field
>>
>>     def cleanSchema(dataType: [DataType]) -> [DataType]:
>>         dataType = copy(dataType)
>>         # If the type is a StructType we need to recurse, otherwise we can return since
>>         # we've reached the leaf node
>>         if isinstance(dataType, StructType):
>>             # We call our sanitizer for all top level fields
>>             dataType.fields = [sanitizeField(f) for f in dataType.fields]
>>         elif isinstance(dataType, ArrayType):
>>             dataType.elementType = cleanSchema(dataType.elementType)
>>         return dataType
>>
>>     # Now since we have the new schema we can create a new DataFrame by using the old
>>     # Frame's RDD as data and the new schema as the schema for the data
>>     return spark.createDataFrame(df.rdd, cleanSchema(df.schema))
>>
>> *Function to flatten out a nested dataframe.*
>>
>> from pyspark.sql.types import *
>> from pyspark.sql.functions import *
>>
>> def flatten_test(df, sep="_"):
>>     """Returns a flattened dataframe.
>>     .. versionadded:: x.X.X
>>
>>     Parameters
>>     ----------
>>     sep : str
>>         Delimiter for flattened columns. Default `_`
>>
>>     Notes
>>     -----
>>     Don't use `.` as `sep`
>>     It won't work on nested data frames with more than one level.
>>     And you will have to use `columns.name`.
>>
>>     Flattening Map Types will have to find every key in the column.
>>     This can be slow.
>>
>>     Examples
>>     --------
>>
>>     data_mixed = [
>>         {
>>             "state": "Florida",
>>             "shortname": "FL",
>>             "info": {"governor": "Rick Scott"},
>>             "counties": [
>>                 {"name": "Dade", "population": 12345},
>>                 {"name": "Broward", "population": 40000},
>>                 {"name": "Palm Beach", "population": 60000},
>>             ],
>>         },
>>         {
>>             "state": "Ohio",
>>             "shortname": "OH",
>>             "info": {"governor": "John Kasich"},
>>             "counties": [
>>                 {"name": "Summit", "population": 1234},
>>                 {"name": "Cuyahoga", "population": 1337},
>>             ],
>>         },
>>     ]
>>
>>     data_mixed = spark.createDataFrame(data=data_mixed)
>>
>>     data_mixed.printSchema()
>>
>>     root
>>     |-- counties: array (nullable = true)
>>     |    |-- element: map (containsNull = true)
>>     |    |    |-- key: string
>>     |    |    |-- value: string (valueContainsNull = true)
>>     |-- info: map (nullable = true)
>>     |    |-- key: string
>>     |    |-- value: string (valueContainsNull = true)
>>     |-- shortname: string (nullable = true)
>>     |-- state: string (nullable = true)
>>
>>     data_mixed_flat = flatten_test(df, sep=":")
>>     data_mixed_flat.printSchema()
>>
>>     root
>>     |-- shortname: string (nullable = true)
>>     |-- state: string (nullable = true)
>>     |-- counties:name: string (nullable = true)
>>     |-- counties:population: string (nullable = true)
>>     |-- info:governor: string (nullable = true)
>>
>>     data = [
>>         {
>>             "id": 1,
>>             "name": "Cole Volk",
>>             "fitness": {"height": 130, "weight": 60},
>>         },
>>         {"name": "Mark Reg", "fitness": {"height": 130, "weight": 60}},
>>         {
>>             "id": 2,
>>             "name": "Faye Raker",
>>             "fitness": {"height": 130, "weight": 60},
>>         },
>>     ]
>>
>>     df = spark.createDataFrame(data=data)
>>
>>     df.printSchema()
>>
>>     root
>>     |-- fitness: map (nullable = true)
>>     |    |-- key: string
>>     |    |-- value: long (valueContainsNull = true)
>>     |-- id: long (nullable = true)
>>     |-- name: string (nullable = true)
>>
>>     df_flat = flatten_test(df, sep=":")
>>
>>     df_flat.printSchema()
>>
>>     root
>>     |-- id: long (nullable = true)
>>     |-- name: string (nullable = true)
>>     |-- fitness:height: long (nullable = true)
>>     |-- fitness:weight: long (nullable = true)
>>
>>     data_struct = [
>>         (("James", None, "Smith"), "OH", "M"),
>>         (("Anna", "Rose", ""), "NY", "F"),
>>         (("Julia", "", "Williams"), "OH", "F"),
>>         (("Maria", "Anne", "Jones"), "NY", "M"),
>>         (("Jen", "Mary", "Brown"), "NY", "M"),
>>         (("Mike", "Mary", "Williams"), "OH", "M")
>>     ]
>>
>>     schema = StructType([
>>         StructField('name', StructType([
>>             StructField('firstname', StringType(), True),
>>             StructField('middlename', StringType(), True),
>>             StructField('lastname', StringType(), True)
>>         ])),
>>         StructField('state', StringType(), True),
>>         StructField('gender', StringType(), True)
>>     ])
>>
>>     df_struct = spark.createDataFrame(data=data_struct, schema=schema)
>>
>>     df_struct.printSchema()
>>
>>     root
>>     |-- name: struct (nullable = true)
>>     |    |-- firstname: string (nullable = true)
>>     |    |-- middlename: string (nullable = true)
>>     |    |-- lastname: string (nullable = true)
>>     |-- state: string (nullable = true)
>>     |-- gender: string (nullable = true)
>>
>>     df_struct_flat = flatten_test(df_struct, sep=":")
>>
>>     df_struct_flat.printSchema()
>>
>>     root
>>     |-- state: string (nullable = true)
>>     |-- gender: string (nullable = true)
>>     |-- name:firstname: string (nullable = true)
>>     |-- name:middlename: string (nullable = true)
>>     |-- name:lastname: string (nullable = true)
>>     """
>>     # compute Complex Fields (Arrays, Structs and MapTypes) in Schema
>>     complex_fields = dict([(field.name, field.dataType)
>>                            for field in df.schema.fields
>>                            if type(field.dataType) == ArrayType
>>                            or type(field.dataType) == StructType
>>                            or type(field.dataType) == MapType])
>>
>>     while len(complex_fields) != 0:
>>         col_name = list(complex_fields.keys())[0]
>>         # print("Processing :" + col_name + " Type : " + str(type(complex_fields[col_name])))
>>
>>         # if StructType then convert all sub elements to columns.
>>         # i.e. flatten structs
>>         if (type(complex_fields[col_name]) == StructType):
>>             expanded = [col(col_name + '.' + k).alias(col_name + sep + k)
>>                         for k in [n.name for n in complex_fields[col_name]]]
>>             df = df.select("*", *expanded).drop(col_name)
>>
>>         # if ArrayType then add the Array Elements as Rows using the explode function
>>         # i.e. explode Arrays
>>         elif (type(complex_fields[col_name]) == ArrayType):
>>             df = df.withColumn(col_name, explode_outer(col_name))
>>
>>         # if MapType then convert all sub elements to columns.
>>         # i.e. flatten
>>         elif (type(complex_fields[col_name]) == MapType):
>>             keys_df = df.select(explode_outer(map_keys(col(col_name)))).distinct()
>>             keys = list(map(lambda row: row[0], keys_df.collect()))
>>             key_cols = list(map(lambda f: col(col_name).getItem(f)
>>                                 .alias(str(col_name + sep + f)), keys))
>>             drop_column_list = [col_name]
>>             df = df.select([col_name for col_name in df.columns
>>                             if col_name not in drop_column_list] + key_cols)
>>
>>         # recompute remaining Complex Fields in Schema
>>         complex_fields = dict([(field.name, field.dataType)
>>                                for field in df.schema.fields
>>                                if type(field.dataType) == ArrayType
>>                                or type(field.dataType) == StructType
>>                                or type(field.dataType) == MapType])
>>
>>     return df
>>
>> *Function to read each file, apply the functions, and save each file as JSON.*
>>
>> def json_to_norm_with_null(dir_path, path_to_save):
>>     path = dir_path
>>
>>     for filename in os.listdir(path):
>>         if not filename.endswith('._stript_list.json'):
>>             continue
>>
>>         fullname = os.path.join(path, filename)
>>         with open(fullname) as json_file:
>>             jsonstr = json.load(json_file)
>>
>>         df = spark.read.json(fullname)
>>         df = cleanDataFrame(df)
>>         df = flatten_test(df, sep=":")
>>         df.write.mode('append').option('compression', 'snappy').option("ignoreNullFields", "false").json(path_to_save)
>>
>> *Function to start everything off. With hopefully 10 processes.*
>>
>> from multiprocessing.pool import ThreadPool
>> tpool = ThreadPool(processes=10)
>>
>> tpool.map(json_to_norm_with_null("/home/jovyan/notebooks/falk/data/form_version/F02", '/home/jovyan/notebooks/falk/F02.json'))
>>
>> --
>> Bjørn Jørgensen
>> Vestre Aspehaug 4, 6010 Ålesund
>> Norge
>>
>> +47 480 94 297
>

--
Bjørn Jørgensen
Vestre Aspehaug 4, 6010 Ålesund
Norge

+47 480 94 297