Hi, I have a streaming job (Spark 2.4.4) in which the memory usage keeps increasing over time.
Periodically (20-25) mins the executors fall over (org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 6987) due to out of memory. In the UI, I can see that the memory keeps increasing batch by batch, although I do not keep more data in memory (I keep unpersisting, checkpointing and caching new data frames though), the storage tabs shows only the expected 4 objects overtime. I wish I just missed a parameter in the spark configuration (like garbage collection, reference tracking, etc) that would solve my issue. I have seen a few JIRA tickets around memory leak (SPARK-19644 <https://issues.apache.org/jira/browse/SPARK-19644>, SPARK-29055 <https://issues.apache.org/jira/browse/SPARK-29055>, SPARK-29321 <https://issues.apache.org/jira/browse/SPARK-29321>) it might be the same issue? ("spark.cleaner.referenceTracking.cleanCheckpoints", "true"), ('spark.cleaner.periodicGC.interval', '1min'), ('spark.cleaner.referenceTracking','true'), ('spark.cleaner.referenceTracking.blocking.shuffle','true'), ('spark.sql.streaming.minBatchesToRetain', '2'), ('spark.sql.streaming.maxBatchesToRetainInMemory', '5'), ('spark.ui.retainedJobs','50' ), ('spark.ui.retainedStages','50'), ('spark.ui.retainedTasks','500'), ('spark.worker.ui.retainedExecutors','50'), ('spark.worker.ui.retainedDrivers','50'), ('spark.sql.ui.retainedExecutions','50'), ('spark.streaming.ui.retainedBatches','1440'), ('spark.executor.JavaOptions','-XX:+UseG1GC -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps') I've tried lowering the spark.streaming.ui.retainedBatches to 8, did not help. The application works fine apart from the fact that the processing some batches take longer (when the executors fall over). [image: image.png] [image: image.png] Any ideas? I've attached my code. Thanks, Andras
import sys from datetime import datetime, timedelta import numpy as np import time from pathlib import Path import json from subprocess import PIPE, run import pandas as pd import pyarrow import gc from pyspark import SparkContext from pyspark.sql.functions import pandas_udf, PandasUDFType from pyspark.streaming.kafka import KafkaUtils from pyspark.streaming import StreamingContext from pyspark.sql.session import SparkSession from pyspark.sql.types import ArrayType, IntegerType, DoubleType, StringType, BooleanType from pyspark.sql.functions import col, row_number, udf, collect_list, udf, array, struct, lit, log, exp from pyspark.sql import DataFrame, Window code_version = "03" environment = "PREPRD" def_config = spark.sparkContext._conf.getAll() conf = spark.sparkContext._conf.setAll( [('spark.app.name', f'application_Streaming_{environment}_v{code_version}'), ('spark.executor.memory', '3500M'), ('spark.executor.cores', '2'), ('spark.cores.max', '4'), ('spark.driver.memory', '8g'), ('archives', 'hdfs://node-master:9000/data/application/deployment/env/application_scoring_v1.tar.gz#application_scoring_v1'), ("spark.cleaner.referenceTracking.cleanCheckpoints", "true"), ('spark.cleaner.periodicGC.interval', '1min'), ('spark.cleaner.referenceTracking','true'), ('spark.cleaner.referenceTracking.blocking.shuffle','true'), ('spark.sql.streaming.minBatchesToRetain', '2'), ('spark.sql.streaming.maxBatchesToRetainInMemory', '5'), ('spark.ui.retainedJobs','50' ), ('spark.ui.retainedStages','50'), ('spark.ui.retainedTasks','500'), ('spark.worker.ui.retainedExecutors','50'), ('spark.worker.ui.retainedDrivers','50'), ('spark.sql.ui.retainedExecutions','50'), ('spark.streaming.ui.retainedBatches','1440'), ('spark.executor.JavaOptions','-XX:+UseG1GC -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps') ]) spark.sparkContext.stop() os.environ['LOG_DIRS'] = '/application/logs/application_logs' spark = SparkSession \ .builder \ .config(conf=conf) \ .getOrCreate() sc = spark.sparkContext spark.conf.set("spark.sql.execution.arrow.enabled", "false") spark.conf.set("spark.sql.execution.arrow.fallback.enabled", "true") spark.conf.set("spark.streaming.stopGracefullyOnShutdown", "true") sc.addFile('hdfs:///data/application/deployment/shared_utils.tar') sc.addPyFile('hdfs:///data/application/deployment/application_payload.py') sc.addPyFile('hdfs:///data/application/deployment/spark_logging.py') sc.setLogLevel("ERROR") from hdfs import InsecureClient from application_payload import get_channel_game_info, get_application_output, get_predictions_pure from shared_utils.KafkaUtils.kafka_utils import KafkaConnect import spark_logging as logging import spark_misc as sm logger = logging.getLogger("driver", logfile=f"{sc._jsc.sc().applicationId()}.log") init_handler = logging.StreamHandler() init_handler.setLevel(logging.DEBUG) logger.addHandler(init_handler) logger.setLevel("DEBUG") def getSparkSessionInstance(sparkConf): if ("sparkSessionSingletonInstance" not in globals()): globals()["sparkSessionSingletonInstance"] = SparkSession \ .builder \ .config(conf=sparkConf) \ .getOrCreate() return globals()["sparkSessionSingletonInstance"] def application_streaming_distributed(batch_local, lines): global logger, executor_count, stream_data_output_folder, stream_data_input_folder, save_stream_input global hdfs_nn, hdfs_user, kafka_hosts, topic_produce global dwh_base_folder, working_directory, latest_dwh_folder logger.debug(f"batch init") try: in_sc = lines.context spark = getSparkSessionInstance(in_sc.getConf()) batch_utc = sm.get_batch_utc(dt=batch_local) batch_date = batch_utc.strftime("%Y-%m-%d") process_utc = datetime.utcfromtimestamp(time.time()) output_path = stream_data_output_folder output_partition = batch_utc.strftime("%Y_%m_%d_%H_%M_%S") logger.debug(f"check if model refreshed") in_hdfs_client = InsecureClient(hdfs_nn, hdfs_user) run_str = f"LOCAL {str(batch_local)}, UTC {batch_utc.strftime('%Y-%m-%d %H:%M:%S')}, Process {process_utc.strftime('%Y-%m-%d %H:%M:%S')}" logger.info(f"========= BATCH STARTED. {run_str}. =========") event1 = lines.filter(lambda x: 'event1' in x) event2 = lines.filter(lambda x: 'event2' in x) event3 = lines.filter(lambda x: 'event3' in x) event4 = lines.filter(lambda x: 'event4' in x) event1_df = spark.read.json(event1).repartition(executor_count).cache() event2_df = spark.read.json(event2).repartition(executor_count).cache() event3_df = spark.read.json(event3).repartition(executor_count).cache() event4_df = spark.read.json(event4).repartition(executor_count).cache() logger.info( f"========= BATCH INFO: event1: {event1_df.count()}, event2: {event2_df.count()}, event3: {event3_df.count()}, event4: {event4_df.count()}") incoming_events = {'event1': event1_df, 'event2': event2_df, 'event3': event3_df, 'event4': event4_df } if save_stream_input: for event_name, event_df in incoming_events.items(): if event_df.count() > 0: stream_output_location = f"{stream_data_input_folder}/{event_name}/{batch_date}/{output_partition}" sm.save_event_data(event_df=event_df, event_name=event_name, stream_output_location=stream_output_location, executor_count=executor_count) # Get Global Variables item_mapping_df = globals()['item_mapping_df'] user_item_agg = globals()['user_item_agg'] user_mapping_df = globals()['user_mapping_df'] currency_exchange = globals()['currency_exchange'] user_sparse_update = False kafka_update = False user_mapping_update = False event1_df_party = event1_df.filter((event1_df.brandId == 'PARTY') | (event1_df.brandId == 'PARTYPOKER')) if event1_df_party.count() > 0: ################## event1########################### logger.debug(f"Processing event1....") event1_agg_df_item_user, new_user_mapping_df = sm.calculate_metric(event1_df=event1_df_party, user_mapping_df=user_mapping_df, item_mapping_df=item_mapping_df, currency_exchange=currency_exchange) " returned cached event1_agg_df_item_user" if isinstance(new_user_mapping_df, DataFrame): logger.debug(f"New User overwritten.") user_mapping_df = new_user_mapping_df.repartition(executor_count).checkpoint() globals()['user_mapping_df'].unpersist() globals()['user_mapping_df'] = user_mapping_df globals()['user_mapping_df'].cache() new_user_mapping_df.unpersist() ################## UPDATE MAIN USER-ITEM ########################### logger.debug(f"No of rows for aggevent4ated event1: {event1_agg_df_item_user.count()}") new_user_item_agg = sm.union_user_item(old_user_item=user_item_agg, new_user_item=event1_agg_df_item_user) new_user_item_agg = new_user_item_agg.repartition(executor_count).checkpoint() logger.debug("Checkpoint written") new_user_item_agg.cache() new_user_item_agg.count() event1_agg_df_item_user.unpersist() logger.debug("new_user_item_agg cached") user_sparse_update = True # Active accounts check for updates ############################################################################################################################# gc = sm.df_safe_rename_cols(event2_df, ["accountName"], ["event3_name_txt"]).dropDuplicates() log = sm.df_safe_rename_cols(event3_df, ["accountName"], ["event3_name_txt"]).dropDuplicates() event4 = sm.df_safe_rename_cols(event4_df, ["accountName"], ["event3_name_txt"]).dropDuplicates() gc = gc.select("event3_name_txt") log = log.select("event3_name_txt") event4 = event4.select("event3_name_txt") active_accounts = gc.union(log).union(event4).dropDuplicates() active_accounts = active_accounts.join(user_mapping_df, active_accounts.event3_name_txt == user_mapping_df.event3_name_txt, 'inner') \ .select(active_accounts["event3_name_txt"], user_mapping_df["user_idx"]).repartition(executor_count).cache() active_accounts_cnt = active_accounts.count() if active_accounts_cnt > 0: logger.info(f"Active accounts {active_accounts_cnt}'") user_item_agg_collect = user_item_agg \ .join(active_accounts, user_item_agg.user_idx == active_accounts.user_idx, 'inner') \ .select(user_item_agg["*"]) \ .groupBy('user_idx').agg(collect_list(array("item_idx", "metric")).alias('item_ci')) \ .repartition(executor_count) user_item_agg_collect.cache() c = user_item_agg_collect.count() if c > 0: logger.debug(f"Processing {c} to check their predictions.. ") user_preference_udf = globals()['user_preference_udf'] if user_sparse_update: new_user_item_agg_collect = new_user_item_agg \ .join(active_accounts, new_user_item_agg.user_idx == active_accounts.user_idx, 'inner') \ .select(new_user_item_agg["*"]) \ .groupBy('user_idx').agg(collect_list(array("item_idx", "metric")).alias('item_ci')) \ .repartition(executor_count) new_predictions = new_user_item_agg_collect \ .select("user_idx", "item_ci", user_preference_udf("item_ci").alias("item_model")) updated_accounts = new_predictions \ .select(new_predictions['user_idx'], new_predictions['item_ci'], new_predictions["item_model"]) else: # should not happen in real scenario - we should always have some event1 updated_accounts = user_item_agg_collect \ .select("user_idx", "item_ci", user_preference_udf("item_ci").alias("item_model")) \ .repartition(executor_count) updated_accounts.cache() kafka_update_cnt = updated_accounts.count() kafka_update = kafka_update_cnt > 0 # If there is actual updates, run it through if kafka_update: logger.info(f"Predictions have changed for {kafka_update_cnt} users") application_output_udf = globals()['application_output_udf'] application_output_df = updated_accounts \ .join(user_mapping_df, updated_accounts.user_idx == user_mapping_df.user_idx, 'inner') \ .select(updated_accounts["*"], user_mapping_df["event3_name_txt"]) application_output_df = application_output_df.withColumn("cas_output", application_output_udf(col('event3_name_txt'), col('item_model'))) # LOCAL Kafka Produce cas_stream_out = [row['cas_output'] for row in application_output_df.select('cas_output').collect()] logger.info(f"Producing to kafka in the driver. Number of messages {len(cas_stream_out)}") if len(cas_stream_out) > 0: ku = KafkaConnect(kafka_hosts=kafka_hosts, logger=logger) ku.connect_kafka_producer(json_string=True) ku.publish_data_to_kafka(topic=topic_produce, data=cas_stream_out) else: logger.info(f"No kafka messages to produce") else: logger.info(f"No accounts to update") if (user_sparse_update + kafka_update + user_mapping_update) > 0: info_msg = "" if user_sparse_update: event1_agg_df_item_user.unpersist() t = time.time() user_item_agg.unpersist() globals()['user_item_agg'] = new_user_item_agg m = f"user_item_agg overwritten {round(time.time() - t, 2)}" info_msg += m if kafka_update: t = time.time() application_output_df \ .write \ .option("header", "true") \ .mode("overwrite") \ .parquet(f'{output_path}/application_output_df/{batch_date}/{output_partition}') m = f"Saved application_to_platform {round(time.time() - t, 2)}" info_msg += ", " + m logger.info(m) else: logger.info(f"No update in this batch at all") um = user_mapping_df.count() um2 = user_mapping_df.agg({'user_idx': 'max'}).collect() ui = globals()['user_item_agg'].count() ui2 = globals()['user_item_agg'].agg({'metric': 'sum'}).collect() im = item_mapping_df.agg({'item_idx': 'max'}).collect() logger.info(f"user_mapping_df ({um}, {um2}), user_item_agg ({ui}), {ui2}, item_mapping_df ({im})") except Exception: if (time.time() - globals()['last_error']) > 60*15: # email error every 15 mins only globals()['last_error'] = time.time() cfg = globals()['mail_log_info'] logging.add_mail_handler(logger, **cfg) logger.info("handler added") else: logger.info("handler not added") logger.error("========= BATCH ERROR. {run_str}. =========", exc_info=True) logging.remove_mail_handler(logger) raise finally: vars_ = ['event2_df', 'event3_df', 'event4_df', 'event1_df', 'event1_agg_df_item_user', 'user_item_agg_collect', 'updated_accounts', 'active_accounts'] for var in vars_: try: locals()[var].unpersist(blocking=True) logger.debug(f"Unpersisted {var} variable") except: pass logger.warning(f"Could not upersist {var} variable", exc_info=True) cas_stream_out = None cas_stream_out = None in_hdfs_client = None user_item_agg_collect = None updated_accounts = None gc.collect() logger.info(f"========= BATCH FINISHED. {run_str}. =========") def set_udfs(): """ Setting UDFS - which needs to be repeated each time the broadcast variables change due to serialization """ from application_payload import get_application_output, get_predictions_pure def user_preference_func(ls_user_items): """ Return predictions and user factors """ bc_item_factors = item_factors.value bc_event4ularization = event4ularization.value bc_YtY = YtY.value bc_factors = factors.value ls_preference_items = [] for user_items in ls_user_items: preference_items, uf = get_predictions_pure(user_features={'user_items': list(user_items)}, item_factors=bc_item_factors, event4ularization=bc_event4ularization, YtY=bc_YtY, n_factors=bc_factors, with_scores=True) item_idxs, attr = zip(*preference_items) scores, played_negative_positive = zip(*attr) m = np.array(item_idxs) + 0.1 played_item_idxs_positive = np.multiply(m, played_negative_positive) ls_preference_items.append(list(played_item_idxs_positive)) return pd.Series(ls_preference_items) def application_output_func(ls_event3_name_txt, ls_preference_items): msgs = [] bc_game_info = game_info.value for event3_name_txt, played_item_idxs_positive in zip(ls_event3_name_txt, ls_preference_items): cas_user_msg = get_application_output(event3_name_txt=event3_name_txt, played_item_idxs_positive=played_item_idxs_positive, game_info=bc_game_info, items_to_output=15) msg = json.dumps(cas_user_msg) msgs.append(msg) return pd.Series(msgs) globals()['user_preference_udf'] = pandas_udf(user_preference_func, returnType=ArrayType(DoubleType())) globals()['application_output_udf'] = pandas_udf(application_output_func, returnType=StringType()) ######################################################################################################################## def create_context(): set_udfs() logger.info("Creating new context - not from checkpoint") ssc = StreamingContext(sc, 60) opts = {"metadata.broker.list": globals()['kafka_hosts'], "auto.offset.reset": "largest", "group.id": "application_v1"} kvs = KafkaUtils.createDirectStream(ssc, [topic_listen], opts) kvs.checkpoint(120) lines = kvs.map(lambda row: row[1]) lines.foreachRDD(application_streaming_distributed) ssc.checkpoint(checkpoint) return ssc ################################################################################################## run_type = f"{environment}_v{code_version}" checkpoint = f"hdfs:///data/application/data/checkpoints/checkpoint_{run_type}" executor_count = len(spark.sparkContext._jsc.sc().statusTracker().getExecutorInfos()) - 1 cores_per_executor = int(spark.sparkContext.getConf().get('spark.executor.cores', '1')) total_cores = executor_count * cores_per_executor topic_listen = "platform.to.application.destination" mail_log_info = {'MAILHOST': "smtp", 'FROM': 'application-al...@domain.com', 'TO': ['exam...@domain.com'], 'SUBJECT': f'application Error Alert - {run_type}'} last_error = 0 if environment == "TST": topic_produce = 'casino.application.to.platform.destination' kafka_hosts = "kafka1:9092,kafka2:9092,kafka3:9092" hdfs_nn = "http://1.1.1.1:50070" hdfs_user = "hdfs_user" save_stream_input = False elif environment == "PREPRD": topic_produce = 'test.application.to.platform.destination' kafka_hosts = "kafka1:9092,kafka2:9092,kafka3:9092" hdfs_nn = "http://1.1.1.1:50070" hdfs_user = "hdfs_user" save_stream_input = True elif environment == "PRD": topic_produce = 'casino.application.to.platform.destination' kafka_hosts = "kafka1:9092,kafka2:9092,kafka3:9092" hdfs_nn = "http://1.1.1.1:50070" hdfs_user = "hdfs_user" save_stream_input = True else: raise ValueError("Wrong Environment given") data_folder = '/data/application/data/' dwh_base_folder = data_folder + 'batch/' stream_data_input_folder = data_folder + f"stream/input/{run_type}/" stream_data_output_folder = data_folder + f"stream/output/{run_type}/" working_directory = data_folder + f"WORKDIR/{run_type}/" hdfs_client = InsecureClient(hdfs_nn, hdfs_user) latest_folder = sm.get_latest_success_hdfs(client=hdfs_client, root_folder=dwh_base_folder) latest_dt = datetime.utcfromtimestamp((int(latest_folder) / 10 ** 6)) latest_dwh_folder = f"{dwh_base_folder}/{latest_folder}/" sm.clear_folder(path=working_directory + "*") logger.info(f"clearing prev working directory {working_directory} (HDFS)") sm.mkdir_if_not_exists(path=working_directory) sm.copy_within_hdfs(frompath=latest_dwh_folder + "*", topath=working_directory) logger.info(f"copied files from {latest_dwh_folder} to {working_directory} (HDFS)") logger.info("data init start") user_mapping_df, item_mapping_df, user_item_agg, game_info_df, model_parameters, currency_exchange = \ sm.get_spark_dataframes(spark, working_directory, executor_count, run_type) item_factors, YtY, event4ularization, factors, game_info = \ sm.get_and_broadcast_vars(sc, model_parameters, game_info_df) logger.info("data init end") clear_checkpoint = True logger.handlers = [h for h in logger.handlers if isinstance(h, logging.FileHandler)] # delete console logging clear_checkpoint = True if clear_checkpoint: hdfs_client = InsecureClient(hdfs_nn, user=hdfs_user) hdfs_client.delete(checkpoint.replace("hdfs://", ""), recursive=True) logger.info(f"checkpoint ({checkpoint.replace('hdfs://', '')}) cleared") ssc = StreamingContext.getOrCreate(checkpoint, lambda: create_context()) ssc.start() # ssc.awaitTermination()
--------------------------------------------------------------------- To unsubscribe e-mail: user-unsubscr...@spark.apache.org