Hi,

I have a streaming job (Spark 2.4.4) in which the memory usage keeps
increasing over time.

Every 20-25 minutes the executors fall over
(org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output
location for shuffle 6987) because they run out of memory. In the UI I can see
that memory usage keeps increasing batch by batch, although I do not keep more
data in memory: I keep unpersisting the old data frames and checkpointing and
caching the new ones, and the Storage tab shows only the expected 4 objects
over time.

I hope I have simply missed a parameter in the Spark configuration (garbage
collection, reference tracking, etc.) that would solve my issue. I have seen a
few JIRA tickets around memory leaks (SPARK-19644
<https://issues.apache.org/jira/browse/SPARK-19644>, SPARK-29055
<https://issues.apache.org/jira/browse/SPARK-29055>, SPARK-29321
<https://issues.apache.org/jira/browse/SPARK-29321>); could it be the same
issue?

     ("spark.cleaner.referenceTracking.cleanCheckpoints", "true"),
     ('spark.cleaner.periodicGC.interval', '1min'),
     ('spark.cleaner.referenceTracking','true'),
     ('spark.cleaner.referenceTracking.blocking.shuffle','true'),
     ('spark.sql.streaming.minBatchesToRetain', '2'),
     ('spark.sql.streaming.maxBatchesToRetainInMemory', '5'),
     ('spark.ui.retainedJobs','50' ),
     ('spark.ui.retainedStages','50'),
     ('spark.ui.retainedTasks','500'),
     ('spark.worker.ui.retainedExecutors','50'),
     ('spark.worker.ui.retainedDrivers','50'),
     ('spark.sql.ui.retainedExecutions','50'),
     ('spark.streaming.ui.retainedBatches','1440'),
     ('spark.executor.extraJavaOptions','-XX:+UseG1GC -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps')

I've tried lowering spark.streaming.ui.retainedBatches to 8, but it did not
help.

The application works fine apart from the fact that some batches take longer
to process (when the executors fall over).

[image: image.png]
[image: image.png]


Any ideas?

I've attached my code.


Thanks,
Andras
import sys
from datetime import datetime, timedelta
import numpy as np
import time
from pathlib import Path
import json
from subprocess import PIPE, run
import pandas as pd
import pyarrow
import os
import gc

from pyspark import SparkContext
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.streaming.kafka import KafkaUtils
from pyspark.streaming import StreamingContext
from pyspark.sql.session import SparkSession
from pyspark.sql.types import ArrayType, IntegerType, DoubleType, StringType, BooleanType
from pyspark.sql.functions import col, row_number, udf, collect_list, array, struct, lit, log, exp
from pyspark.sql import DataFrame, Window

code_version = "03"
environment = "PREPRD"

def_config = spark.sparkContext._conf.getAll()
conf = spark.sparkContext._conf.setAll(
    [('spark.app.name', f'application_Streaming_{environment}_v{code_version}'),
     ('spark.executor.memory', '3500M'),
     ('spark.executor.cores', '2'),
     ('spark.cores.max', '4'),
     ('spark.driver.memory', '8g'),
     ('archives', 'hdfs://node-master:9000/data/application/deployment/env/application_scoring_v1.tar.gz#application_scoring_v1'),
     ("spark.cleaner.referenceTracking.cleanCheckpoints", "true"),
     ('spark.cleaner.periodicGC.interval', '1min'),
     ('spark.cleaner.referenceTracking','true'),
     ('spark.cleaner.referenceTracking.blocking.shuffle','true'),
     ('spark.sql.streaming.minBatchesToRetain', '2'),
     ('spark.sql.streaming.maxBatchesToRetainInMemory', '5'),
     ('spark.ui.retainedJobs','50' ),
     ('spark.ui.retainedStages','50'),
     ('spark.ui.retainedTasks','500'),
     ('spark.worker.ui.retainedExecutors','50'),
     ('spark.worker.ui.retainedDrivers','50'),
     ('spark.sql.ui.retainedExecutions','50'),
     ('spark.streaming.ui.retainedBatches','1440'),
     ('spark.executor.extraJavaOptions','-XX:+UseG1GC -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps')
     ])

spark.sparkContext.stop()

os.environ['LOG_DIRS'] = '/application/logs/application_logs'

spark = SparkSession \
    .builder \
    .config(conf=conf) \
    .getOrCreate()
sc = spark.sparkContext
spark.conf.set("spark.sql.execution.arrow.enabled", "false")
spark.conf.set("spark.sql.execution.arrow.fallback.enabled", "true")
spark.conf.set("spark.streaming.stopGracefullyOnShutdown", "true")

sc.addFile('hdfs:///data/application/deployment/shared_utils.tar')
sc.addPyFile('hdfs:///data/application/deployment/application_payload.py')
sc.addPyFile('hdfs:///data/application/deployment/spark_logging.py')
sc.setLogLevel("ERROR")

from hdfs import InsecureClient
from application_payload import get_channel_game_info, get_application_output, get_predictions_pure
from shared_utils.KafkaUtils.kafka_utils import KafkaConnect
import spark_logging as logging
import spark_misc as sm

logger = logging.getLogger("driver", logfile=f"{sc._jsc.sc().applicationId()}.log")
init_handler = logging.StreamHandler()
init_handler.setLevel(logging.DEBUG)
logger.addHandler(init_handler)
logger.setLevel("DEBUG")


def getSparkSessionInstance(sparkConf):
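    """Get or create the singleton SparkSession, for reuse inside foreachRDD callbacks."""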
    if ("sparkSessionSingletonInstance" not in globals()):
        globals()["sparkSessionSingletonInstance"] = SparkSession \
            .builder \
            .config(conf=sparkConf) \
            .getOrCreate()
    return globals()["sparkSessionSingletonInstance"]


def application_streaming_distributed(batch_local, lines):
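    """Per-batch handler: parse the Kafka lines into event data frames, update the
    user-item aggregate, score the active accounts and publish predictions to Kafka."""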
    global logger, executor_count, stream_data_output_folder, stream_data_input_folder, save_stream_input
    global hdfs_nn, hdfs_user, kafka_hosts, topic_produce
    global dwh_base_folder, working_directory, latest_dwh_folder

    logger.debug(f"batch init")

    try:
        in_sc = lines.context
        spark = getSparkSessionInstance(in_sc.getConf())

        batch_utc = sm.get_batch_utc(dt=batch_local)
        batch_date = batch_utc.strftime("%Y-%m-%d")
        process_utc = datetime.utcfromtimestamp(time.time())

        output_path = stream_data_output_folder
        output_partition = batch_utc.strftime("%Y_%m_%d_%H_%M_%S")

        logger.debug(f"check if model refreshed")
        in_hdfs_client = InsecureClient(hdfs_nn, hdfs_user)

        run_str = f"LOCAL {str(batch_local)}, UTC {batch_utc.strftime('%Y-%m-%d 
%H:%M:%S')}, Process {process_utc.strftime('%Y-%m-%d %H:%M:%S')}"

        logger.info(f"========= BATCH STARTED. {run_str}.  =========")
        event1 = lines.filter(lambda x: 'event1' in x)
        event2 = lines.filter(lambda x: 'event2' in x)
        event3 = lines.filter(lambda x: 'event3' in x)
        event4 = lines.filter(lambda x: 'event4' in x)

        event1_df = spark.read.json(event1).repartition(executor_count).cache()
        event2_df = spark.read.json(event2).repartition(executor_count).cache()
        event3_df = spark.read.json(event3).repartition(executor_count).cache()
        event4_df = spark.read.json(event4).repartition(executor_count).cache()

        logger.info(f"========= BATCH INFO: event1: {event1_df.count()}, event2: {event2_df.count()}, event3: {event3_df.count()}, event4: {event4_df.count()}")

        incoming_events = {'event1': event1_df,
                           'event2': event2_df,
                           'event3': event3_df,
                           'event4': event4_df
                           }

        if save_stream_input:
            for event_name, event_df in incoming_events.items():
                if event_df.count() > 0:
                    stream_output_location = f"{stream_data_input_folder}/{event_name}/{batch_date}/{output_partition}"
                    sm.save_event_data(event_df=event_df, event_name=event_name,
                                       stream_output_location=stream_output_location, executor_count=executor_count)

        # Get Global Variables
        item_mapping_df = globals()['item_mapping_df']
        user_item_agg = globals()['user_item_agg']
        user_mapping_df = globals()['user_mapping_df']
        currency_exchange = globals()['currency_exchange']

        user_sparse_update = False
        kafka_update = False
        user_mapping_update = False

        event1_df_party = event1_df.filter((event1_df.brandId == 'PARTY') | (event1_df.brandId == 'PARTYPOKER'))
        if event1_df_party.count() > 0:
            ################## event1###########################
            logger.debug(f"Processing event1....")
            event1_agg_df_item_user, new_user_mapping_df = sm.calculate_metric(event1_df=event1_df_party,
                                                                               user_mapping_df=user_mapping_df,
                                                                               item_mapping_df=item_mapping_df,
                                                                               currency_exchange=currency_exchange)
            # calculate_metric returns a cached event1_agg_df_item_user
            if isinstance(new_user_mapping_df, DataFrame):
                logger.debug(f"New User overwritten.")
                user_mapping_df = new_user_mapping_df.repartition(executor_count).checkpoint()
                globals()['user_mapping_df'].unpersist()
                globals()['user_mapping_df'] = user_mapping_df
                globals()['user_mapping_df'].cache()
                new_user_mapping_df.unpersist()

            ################## UPDATE MAIN USER-ITEM ###########################
            logger.debug(f"No of rows for aggevent4ated event1: 
{event1_agg_df_item_user.count()}")
            new_user_item_agg = sm.union_user_item(old_user_item=user_item_agg, new_user_item=event1_agg_df_item_user)
            new_user_item_agg = new_user_item_agg.repartition(executor_count).checkpoint()
            logger.debug("Checkpoint written")
            new_user_item_agg.cache()
            new_user_item_agg.count()
            event1_agg_df_item_user.unpersist()
            logger.debug("new_user_item_agg cached")
            user_sparse_update = True

        # Active accounts check for updates
        #############################################################################################################################

        # distinct local names keep the imported gc module (used in the finally block) and pyspark's log() usable
        gc_df = sm.df_safe_rename_cols(event2_df, ["accountName"], ["event3_name_txt"]).dropDuplicates()
        log_df = sm.df_safe_rename_cols(event3_df, ["accountName"], ["event3_name_txt"]).dropDuplicates()
        event4 = sm.df_safe_rename_cols(event4_df, ["accountName"], ["event3_name_txt"]).dropDuplicates()

        gc_df = gc_df.select("event3_name_txt")
        log_df = log_df.select("event3_name_txt")
        event4 = event4.select("event3_name_txt")

        active_accounts = gc_df.union(log_df).union(event4).dropDuplicates()
        active_accounts = active_accounts.join(user_mapping_df,
                                               active_accounts.event3_name_txt == user_mapping_df.event3_name_txt,
                                               'inner') \
            .select(active_accounts["event3_name_txt"], user_mapping_df["user_idx"]).repartition(executor_count).cache()

        active_accounts_cnt = active_accounts.count()

        if active_accounts_cnt > 0:

            logger.info(f"Active accounts {active_accounts_cnt}'")

            user_item_agg_collect = user_item_agg \
                .join(active_accounts, user_item_agg.user_idx == active_accounts.user_idx, 'inner') \
                .select(user_item_agg["*"]) \
                .groupBy('user_idx').agg(collect_list(array("item_idx", "metric")).alias('item_ci')) \
                .repartition(executor_count)

            user_item_agg_collect.cache()

            c = user_item_agg_collect.count()
            if c > 0:
                logger.debug(f"Processing {c} to check their predictions.. ")
                user_preference_udf = globals()['user_preference_udf']
                if user_sparse_update:
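                    # use the freshly updated aggregate when scoring the active accounts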

                    new_user_item_agg_collect = new_user_item_agg \
                        .join(active_accounts, new_user_item_agg.user_idx == active_accounts.user_idx, 'inner') \
                        .select(new_user_item_agg["*"]) \
                        .groupBy('user_idx').agg(collect_list(array("item_idx", "metric")).alias('item_ci')) \
                        .repartition(executor_count)

                    new_predictions = new_user_item_agg_collect \
                        .select("user_idx", "item_ci", user_preference_udf("item_ci").alias("item_model"))

                    updated_accounts = new_predictions \
                        .select(new_predictions['user_idx'],
                                new_predictions['item_ci'],
                                new_predictions["item_model"])

                else:
                    # should not happen in a real scenario - we should always have some event1
                    updated_accounts = user_item_agg_collect \
                        .select("user_idx", "item_ci", user_preference_udf("item_ci").alias("item_model")) \
                        .repartition(executor_count)

                updated_accounts.cache()
                kafka_update_cnt = updated_accounts.count()
                kafka_update = kafka_update_cnt > 0

                # If there is actual updates, run it through
                if kafka_update:
                    logger.info(f"Predictions have changed for 
{kafka_update_cnt} users")
                    application_output_udf = globals()['application_output_udf']
                    application_output_df = updated_accounts \
                        .join(user_mapping_df, updated_accounts.user_idx == user_mapping_df.user_idx, 'inner') \
                        .select(updated_accounts["*"], user_mapping_df["event3_name_txt"])
                    application_output_df = application_output_df.withColumn(
                        "cas_output", application_output_udf(col('event3_name_txt'), col('item_model')))

                    # LOCAL Kafka Produce
                    cas_stream_out = [row['cas_output'] for row in application_output_df.select('cas_output').collect()]
                    logger.info(f"Producing to kafka in the driver. Number of messages {len(cas_stream_out)}")
                    if len(cas_stream_out) > 0:
                        ku = KafkaConnect(kafka_hosts=kafka_hosts, logger=logger)
                        ku.connect_kafka_producer(json_string=True)
                        ku.publish_data_to_kafka(topic=topic_produce, data=cas_stream_out)

                else:
                    logger.info(f"No kafka messages to produce")
            else:
                logger.info(f"No accounts to update")

        if (user_sparse_update + kafka_update + user_mapping_update) > 0:
            info_msg = ""
            if user_sparse_update:
                event1_agg_df_item_user.unpersist()
                t = time.time()
                user_item_agg.unpersist()
                globals()['user_item_agg'] = new_user_item_agg
                m = f"user_item_agg overwritten {round(time.time() - t, 2)}"
                info_msg += m

            if kafka_update:
                t = time.time()
                application_output_df \
                    .write \
                    .option("header", "true") \
                    .mode("overwrite") \
                    .parquet(f'{output_path}/application_output_df/{batch_date}/{output_partition}')
                m = f"Saved application_to_platform {round(time.time() - t, 2)}"
                info_msg += ", " + m

            logger.info(info_msg)
        else:
            logger.info(f"No update in this batch at all")

        um = user_mapping_df.count()
        um2 = user_mapping_df.agg({'user_idx': 'max'}).collect()
        ui = globals()['user_item_agg'].count()
        ui2 = globals()['user_item_agg'].agg({'metric': 'sum'}).collect()
        im = item_mapping_df.agg({'item_idx': 'max'}).collect()
        logger.info(f"user_mapping_df ({um}, {um2}), user_item_agg ({ui}), 
{ui2}, item_mapping_df ({im})")
        
    except Exception:

        if (time.time() - globals()['last_error']) > 60 * 15:  # email error every 15 mins only
            globals()['last_error'] = time.time()
            cfg = globals()['mail_log_info']
            logging.add_mail_handler(logger, **cfg)
            logger.info("handler added")
        else:
            logger.info("handler not added")
        logger.error("========= BATCH ERROR. {run_str}.  =========", 
exc_info=True)

        logging.remove_mail_handler(logger)

        raise
    finally:
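        # best-effort cleanup: unpersist the per-batch data frames and release driver-side references before the next batch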
        vars_ = ['event2_df', 'event3_df', 'event4_df', 'event1_df',
                 'event1_agg_df_item_user', 'user_item_agg_collect', 'updated_accounts', 'active_accounts']
        for var in vars_:
            try:
                locals()[var].unpersist(blocking=True)
                logger.debug(f"Unpersisted {var} variable")
            except Exception:
                logger.warning(f"Could not unpersist {var} variable", exc_info=True)
        cas_stream_out = None
        in_hdfs_client = None
        user_item_agg_collect = None
        updated_accounts = None
        gc.collect()
        logger.info(f"========= BATCH FINISHED. {run_str}.  =========")


def set_udfs():
    """
    Setting UDFs - this needs to be repeated each time the broadcast variables
    change, due to serialization
    """

    from application_payload import get_application_output, get_predictions_pure

    def user_preference_func(ls_user_items):
        """
        Return predictions and user factors
        """
        bc_item_factors = item_factors.value
        bc_event4ularization = event4ularization.value
        bc_YtY = YtY.value
        bc_factors = factors.value

        ls_preference_items = []
        for user_items in ls_user_items:
            preference_items, uf = get_predictions_pure(user_features={'user_items': list(user_items)},
                                                        item_factors=bc_item_factors,
                                                        event4ularization=bc_event4ularization,
                                                        YtY=bc_YtY,
                                                        n_factors=bc_factors,
                                                        with_scores=True)
            item_idxs, attr = zip(*preference_items)
            scores, played_negative_positive = zip(*attr)
            m = np.array(item_idxs) + 0.1
            played_item_idxs_positive = np.multiply(m, played_negative_positive)
            ls_preference_items.append(list(played_item_idxs_positive))

        return pd.Series(ls_preference_items)

    def application_output_func(ls_event3_name_txt, ls_preference_items):
        msgs = []
        bc_game_info = game_info.value
        for event3_name_txt, played_item_idxs_positive in zip(ls_event3_name_txt, ls_preference_items):
            cas_user_msg = get_application_output(event3_name_txt=event3_name_txt,
                                                  played_item_idxs_positive=played_item_idxs_positive,
                                                  game_info=bc_game_info,
                                                  items_to_output=15)
            msg = json.dumps(cas_user_msg)
            msgs.append(msg)
        return pd.Series(msgs)

    globals()['user_preference_udf'] = pandas_udf(user_preference_func, returnType=ArrayType(DoubleType()))
    globals()['application_output_udf'] = pandas_udf(application_output_func, returnType=StringType())


########################################################################################################################
def create_context():
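    """Build the streaming context: a direct Kafka stream, checkpointed, with each batch processed by application_streaming_distributed."""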
    set_udfs()
    logger.info("Creating new context - not from checkpoint")
    ssc = StreamingContext(sc, 60)
    opts = {"metadata.broker.list": globals()['kafka_hosts'],
            "auto.offset.reset": "largest",
            "group.id": "application_v1"}
    kvs = KafkaUtils.createDirectStream(ssc, [topic_listen], opts)
    kvs.checkpoint(120)
    lines = kvs.map(lambda row: row[1])
    lines.foreachRDD(application_streaming_distributed)
    ssc.checkpoint(checkpoint)
    return ssc


##################################################################################################
run_type = f"{environment}_v{code_version}"
checkpoint = f"hdfs:///data/application/data/checkpoints/checkpoint_{run_type}"

executor_count = len(spark.sparkContext._jsc.sc().statusTracker().getExecutorInfos()) - 1
cores_per_executor = int(spark.sparkContext.getConf().get('spark.executor.cores', '1'))
total_cores = executor_count * cores_per_executor

topic_listen = "platform.to.application.destination"

mail_log_info = {'MAILHOST': "smtp",
                'FROM': 'application-al...@domain.com',
                'TO': ['exam...@domain.com'],
                'SUBJECT': f'application Error Alert - {run_type}'}

last_error = 0

if environment == "TST":
    topic_produce = 'casino.application.to.platform.destination'
    kafka_hosts = "kafka1:9092,kafka2:9092,kafka3:9092"
    hdfs_nn = "http://1.1.1.1:50070";
    hdfs_user = "hdfs_user"
    save_stream_input = False

elif environment == "PREPRD":
    topic_produce = 'test.application.to.platform.destination'
    kafka_hosts = "kafka1:9092,kafka2:9092,kafka3:9092"
    hdfs_nn = "http://1.1.1.1:50070";
    hdfs_user = "hdfs_user"
    save_stream_input = True

elif environment == "PRD":
    topic_produce = 'casino.application.to.platform.destination'
    kafka_hosts = "kafka1:9092,kafka2:9092,kafka3:9092"
    hdfs_nn = "http://1.1.1.1:50070";
    hdfs_user = "hdfs_user"
    save_stream_input = True
else:
    raise ValueError("Wrong Environment given")

data_folder = '/data/application/data/'
dwh_base_folder = data_folder + 'batch/'
stream_data_input_folder = data_folder + f"stream/input/{run_type}/"
stream_data_output_folder = data_folder + f"stream/output/{run_type}/"
working_directory = data_folder + f"WORKDIR/{run_type}/"

hdfs_client = InsecureClient(hdfs_nn, hdfs_user)
latest_folder = sm.get_latest_success_hdfs(client=hdfs_client, root_folder=dwh_base_folder)
latest_dt = datetime.utcfromtimestamp((int(latest_folder) / 10 ** 6))
latest_dwh_folder = f"{dwh_base_folder}/{latest_folder}/"

sm.clear_folder(path=working_directory + "*")
logger.info(f"clearing prev working directory {working_directory} (HDFS)")
sm.mkdir_if_not_exists(path=working_directory)
sm.copy_within_hdfs(frompath=latest_dwh_folder + "*", topath=working_directory)
logger.info(f"copied files from {latest_dwh_folder} to {working_directory} 
(HDFS)")

logger.info("data init start")
user_mapping_df, item_mapping_df, user_item_agg, game_info_df, model_parameters, currency_exchange = \
    sm.get_spark_dataframes(spark, working_directory, executor_count, run_type)

item_factors, YtY, event4ularization, factors, game_info = \
    sm.get_and_broadcast_vars(sc, model_parameters, game_info_df)

logger.info("data init end")
logger.handlers = [h for h in logger.handlers if isinstance(h, logging.FileHandler)]  # delete console logging

clear_checkpoint = True

if clear_checkpoint:
    hdfs_client = InsecureClient(hdfs_nn, user=hdfs_user)
    hdfs_client.delete(checkpoint.replace("hdfs://", ""), recursive=True)
    logger.info(f"checkpoint ({checkpoint.replace('hdfs://', '')}) cleared")

ssc = StreamingContext.getOrCreate(checkpoint, lambda: create_context())
ssc.start()
# ssc.awaitTermination()