Re: Increase batch interval in case of delay

2021-07-01 Thread András Kolbert
After a 10-minute delay, processing a 10-minute batch will not take 10 times
longer than a 1-minute batch.

That is mainly because of the I/O write operations to HDFS, and also because
certain users are active in every 1-minute batch; processing such a customer
only once (instead of once per batch across 10 batches) saves time.
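For illustration only, a minimal PySpark sketch of the per-user aggregation
that amortises over a longer batch (column names and the output path are made
up): whether the micro-batch spans 1 minute or 10 minutes of events, the
aggregation still emits one row per active user, so the per-user HDFS write
cost is paid once per batch rather than once per minute.

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

def process_batch(events_df, output_path):
    # One row per active user, regardless of how many minutes the batch covers.
    per_user = (events_df
                .groupBy("user_id")
                .agg(F.max("event_ts").alias("last_seen"),
                     F.count("*").alias("events_in_batch")))
    # One HDFS write per batch.
    per_user.write.mode("append").parquet(output_path)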



On Thu, 1 Jul 2021 at 13:45, Sean Owen  wrote:

> Wouldn't this happen naturally? The large batches would already just take a
> longer time to complete.
>
> On Thu, Jul 1, 2021 at 6:32 AM András Kolbert 
> wrote:
>
>> Hi,
>>
>> I have a Spark Streaming application which is generally able to process the
>> data within the given time frame. However, in certain hours the processing
>> time starts increasing, which causes a delay.
>>
>> In my scenario, the number of input records does not increase the
>> processing time linearly. Hence, ideally I'd like to increase the number of
>> batches/records that are processed once the delay reaches a certain
>> threshold.
>>
>> Is there a possibility/settings to do so?
>>
>> Thanks
>> Andras
>>
>>
>> [image: image.png]
>>
>


Increase batch interval in case of delay

2021-07-01 Thread András Kolbert
Hi,

I have a Spark Streaming application which is generally able to process the
data within the given time frame. However, in certain hours the processing
time starts increasing, which causes a delay.

In my scenario, the number of input records does not increase the processing
time linearly. Hence, ideally I'd like to increase the number of
batches/records that are processed once the delay reaches a certain threshold.

Is there a possibility/settings to do so?
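For reference (not part of the original exchange): in the DStream/Kafka direct
API the knobs that do exist are per-batch rate limits and backpressure rather
than a batch interval that changes at runtime. A minimal configuration sketch,
with made-up values:

from pyspark import SparkConf

conf = (SparkConf()
        # Let Spark adapt the ingestion rate to the observed processing speed.
        .set("spark.streaming.backpressure.enabled", "true")
        # Hard cap on records per second per Kafka partition in each batch.
        .set("spark.streaming.kafka.maxRatePerPartition", "1000"))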

Thanks
Andras


[image: image.png]


Re: Tasks are skewed to one executor

2021-04-11 Thread András Kolbert
Hi,

Sure!

Application:
- Spark version 2.4
- Kafka Stream (DStream, from a kafka 0.8 brokers)
- 7 executors, 2cores, 3700M memory size

Logic:
- The process initialises a dataframe that contains metrics per
account/product pair (e.g. {"account": "A", "product": "X123", "metric": 51})
- After initialisation, the dataframe is persisted on HDFS (the dataframe is
around 1GB total size in memory)
- Streaming:
- each batch processes the incoming data, unions the main dataframe with the
new account/product/metric interaction dataframe, aggregates the totals, and
then persists to HDFS again (each batch we save the full dataframe again) - a
rough sketch of this loop is shown below
- The screenshot I sent earlier was taken after this aggregation, and it shows
how all the data seems to end up on the same executor. That could explain why
the executor periodically dies with an OOM.
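A rough, hypothetical sketch of the per-batch loop described above (schema,
paths and function names are illustrative, not the actual job):

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
STATE_PATH = "hdfs:///data/metrics_state"   # assumed location of the persisted totals

def process_batch(batch_df):
    state = spark.read.parquet(STATE_PATH)             # ~1 GB of account/product totals
    updated = (state.unionByName(batch_df)             # union with this batch's interactions
                    .groupBy("account", "product")
                    .agg(F.sum("metric").alias("metric")))
    # Re-persist the full dataframe every batch, writing to a side path first
    # since overwriting the path we just read from mid-job is unsafe.
    updated.write.mode("overwrite").parquet(STATE_PATH + "_next")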

Mich, I hope this provides extra information :)

Thanks
Andras






On Sat, 10 Apr 2021 at 16:42, Mich Talebzadeh 
wrote:

> Hi,
>
> Can you provide a bit more info please?
>
> How are you running this job and what is the streaming framework (kafka,
> files etc)?
>
> HTH
>
>
> Mich
>
>
>view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Sat, 10 Apr 2021 at 14:28, András Kolbert 
> wrote:
>
>> hi,
>>
>> I have a streaming job and quite often executors die during processing (due
>> to memory errors / "unable to find location for shuffle", etc.). I started
>> digging and found that some of the tasks are concentrated on one executor,
>> just as below:
>> [image: image.png]
>>
>> Can this be the reason?
>> Should I repartition the underlying data before I execute a groupBy on
>> top of it?
>>
>> Any advice is welcome
>>
>> Thanks
>> Andras
>>
>


Tasks are skewed to one executor

2021-04-10 Thread András Kolbert
hi,

I have a streaming job and quite often executors die during processing (due to
memory errors / "unable to find location for shuffle", etc.). I started
digging and found that some of the tasks are concentrated on one executor,
just as below:
[image: image.png]

Can this be the reason?
Should I repartition the underlying data before I execute a groupBy on top of
it?
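A hedged sketch of one common mitigation ("salting") rather than a plain
repartition: .repartition(N, "account") still routes every row of a hot key to
a single partition, so for genuinely skewed keys a two-stage aggregation over
a salted key spreads the heavy work first. Names and the input path are made
up.

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.read.parquet("hdfs:///tmp/interactions")      # placeholder input
SALT_BUCKETS = 16

partial = (df
           .withColumn("salt", (F.rand() * SALT_BUCKETS).cast("int"))
           .groupBy("account", "salt")                   # stage 1: spread the hot key
           .agg(F.sum("metric").alias("metric")))

totals = (partial
          .groupBy("account")                            # stage 2: combine the partial sums
          .agg(F.sum("metric").alias("metric")))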

Any advice is welcome

Thanks
Andras


Re: Use case advice

2021-01-14 Thread András Kolbert
Sorry, I missed out a bit. I've added it below, highlighted in yellow.

On Thu, 14 Jan 2021 at 13:54, András Kolbert 
wrote:

> Thanks, Muru, very helpful suggestion! Delta Lake is amazing, completely
> changed a few of my projects!
>
> One question regarding that.
> When I use the following setup, everything works fine and I can use Delta
> properly in the Spark context that Jupyter initiates automatically.
>
> export PYSPARK_DRIVER_PYTHON=jupyter
> export PYSPARK_DRIVER_PYTHON_OPTS='notebook --no-browser --port=8890'
>
> PYSPARK_PYTHON=pyspark \
> --master yarn \
> --deploy-mode client \
> --driver-memory 4g \
> --executor-memory 16G \
> --executor-cores 1 \
> --num-executors 8 \
> --conf
> spark.yarn.archive=hdfs://node-master:9000/libs/spark/jars/spark-libs.jar \
> --jars
> hdfs://node-master:9000/libs/spark/common/spark-streaming-kafka-0-8-assembly_2.11-2.4.4.jar,hdfs://node-master:9000/libs/spark/common/delta-core_2.11-0.6.1.jar
>
>
> However, I would like to start with a local pyspark initially, and only
> connect to YARN when a specific notebook is configured that way.
>
> 1)
>
> export PYSPARK_DRIVER_PYTHON=jupyter
> export PYSPARK_DRIVER_PYTHON_OPTS='notebook --no-browser --port=8890'
>
> PYSPARK_PYTHON=pyspark
>
> 2)
> conf = spark.sparkContext._conf.setAll([
> ('spark.app.name', 'Delta Demo'),
> ('spark.yarn.jars',
> 'hdfs://node-master:9000/libs/spark/common/delta-core_2.11-0.6.1.jar'),
> ('spark.master', 'yarn-client'),
> ('spark.executor.memory', '16g'),
> ('spark.executor.instances', '8'),
> ('spark.executor.cores', '1'),
> ('spark.driver.memory', '4g'),
> ("spark.jars.packages", "io.delta:delta-core_2.11:0.6.1"),
> ("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension"),
> ("spark.sql.catalog.spark_catalog",
> "org.apache.spark.sql.delta.catalog.DeltaCatalog")
> ])
> spark.sparkContext.stop()
>
> spark = SparkSession \
> .builder \
> .config(conf=conf) \
> .getOrCreate()
> sc = spark.sparkContext
>
>
> spark.sparkContext.addPyFile("hdfs://node-master:9000/libs/spark/common/delta-core_2.11-0.6.1.jar")
> from delta.tables import *
> delta_path = "/data/delta-table"
> data = spark.range(0, 5)
> data.show()
> data.write.format("delta").mode("overwrite").save(delta_path)
>
>
> This way, I keep running into the 'java.lang.ClassNotFoundException: Failed
> to find data source: delta.' error message.
>
> What did I miss in my configuration/env variables?
>
> Thanks
> Andras
>
>
>
> On Sun, 10 Jan 2021, 3:33 am muru,  wrote:
>
>> You could try Delta Lake or Apache Hudi for this use case.
>>
>> On Sat, Jan 9, 2021 at 12:32 PM András Kolbert 
>> wrote:
>>
>>> Sorry if my terminology is misleading.
>>>
>>> What I meant by "driver only" is using a local pandas dataframe (collecting
>>> the data to the master) and keeping that updated, instead of dealing with a
>>> distributed Spark dataframe for holding this data.
>>>
>>> For example, we have a dataframe with all users and their corresponding
>>> latest activity timestamps. After each streaming batch, aggregations are
>>> performed and the result is collected to the driver to update a subset of
>>> users' latest activity timestamps.
>>>
>>>
>>>
>>> On Sat, 9 Jan 2021, 6:18 pm Artemis User, 
>>> wrote:
>>>
>>>> Could you please clarify what you mean by 1)? The driver is only
>>>> responsible for submitting the Spark job, not performing it.
>>>>
>>>> -- ND
>>>>
>>>> On 1/9/21 9:35 AM, András Kolbert wrote:
>>>> > Hi,
>>>> > I would like to get your advice on my use case.
>>>> > I have a few spark streaming applications where I need to keep
>>>> > updating a dataframe after each batch. Each batch probably affects a
>>>> > small fraction of the dataframe (5k out of 200k records).
>>>> >
>>>> > The options I have been considering so far:
>>>> > 1) keep dataframe on the driver, and update that after each batch
>>>> > 2) keep dataframe distributed, and use checkpointing to mitigate
>>>> lineage
>>>> >
>>>> > I solved previous use cases with option 2, but I am not sure it is
>>>> > optimal, as checkpointing is relatively expensive. I also wondered
>>>> > about HBase or some other quick-access in-memory store, however that
>>>> > is currently not in my stack.
>>>> >
>>>> > Curious to hear your thoughts
>>>> >
>>>> > Andras
>>>> >
>>>>
>>>>
>>>>


Re: Use case advice

2021-01-14 Thread András Kolbert
Thanks, Muru, very helpful suggestion! Delta Lake is amazing, completely
changed a few of my projects!

One question regarding that.
When I use the following setup, everything works fine and I can use Delta
properly in the Spark context that Jupyter initiates automatically.

export PYSPARK_DRIVER_PYTHON=jupyter
export PYSPARK_DRIVER_PYTHON_OPTS='notebook --no-browser --port=8890'

PYSPARK_PYTHON=pyspark \
--master yarn \
--deploy-mode client \
--driver-memory 4g \
--executor-memory 16G \
--executor-cores 1 \
--num-executors 8 \
--conf
spark.yarn.archive=hdfs://node-master:9000/libs/spark/jars/spark-libs.jar \
--jars
hdfs://node-master:9000/libs/spark/common/spark-streaming-kafka-0-8-assembly_2.11-2.4.4.jar,hdfs://node-master:9000/libs/spark/common/delta-core_2.11-0.6.1.jar


However, I would like to start with a local pyspark initially, and only
connect to YARN when a specific notebook is configured that way.

1)

export PYSPARK_DRIVER_PYTHON=jupyter
export PYSPARK_DRIVER_PYTHON_OPTS='notebook --no-browser --port=8890'

PYSPARK_PYTHON=pyspark

2)
conf = spark.sparkContext._conf.setAll([
('spark.app.name', 'Delta Demo'),
('spark.yarn.jars',
'hdfs://node-master:9000/libs/spark/common/delta-core_2.11-0.6.1.jar'),
("spark.jars.packages", "io.delta:delta-core_2.11:0.6.1"),
("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension"),
("spark.sql.catalog.spark_catalog",
"org.apache.spark.sql.delta.catalog.DeltaCatalog")
])
spark.sparkContext.stop()

spark = SparkSession \
.builder \
.config(conf=conf) \
.getOrCreate()
sc = spark.sparkContext

spark.sparkContext.addPyFile("hdfs://node-master:9000/libs/spark/common/delta-core_2.11-0.6.1.jar")
from delta.tables import *
delta_path = "/data/delta-table"
data = spark.range(0, 5)
data.show()
data.write.format("delta").mode("overwrite").save(delta_path)


This way, I keep running into the 'java.lang.ClassNotFoundException: Failed to
find data source: delta.' error message.

What did I miss in my configuration/env variables?
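A hedged sketch, not a verified fix for this exact setup: the usual
explanation for "Failed to find data source: delta" after re-creating the
session is that spark.jars.packages and extra jars are only resolved when the
JVM is first launched, not when an already-running context is stopped and
reconfigured. Assuming no JVM has been started yet in the Python process, one
commonly used workaround is to declare the package up front:

import os

# Must be set before anything Spark-related launches the JVM.
os.environ["PYSPARK_SUBMIT_ARGS"] = (
    "--packages io.delta:delta-core_2.11:0.6.1 pyspark-shell"
)

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("Delta Demo")
         .master("yarn")                  # or "local[*]" for the local-only case
         .config("spark.sql.extensions",
                 "io.delta.sql.DeltaSparkSessionExtension")
         .getOrCreate())

spark.range(0, 5).write.format("delta").mode("overwrite").save("/data/delta-table")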

Thanks
Andras



On Sun, 10 Jan 2021, 3:33 am muru,  wrote:

> You could try Delta Lake or Apache Hudi for this use case.
>
> On Sat, Jan 9, 2021 at 12:32 PM András Kolbert 
> wrote:
>
>> Sorry if my terminology is misleading.
>>
>> What I meant by "driver only" is using a local pandas dataframe (collecting
>> the data to the master) and keeping that updated, instead of dealing with a
>> distributed Spark dataframe for holding this data.
>>
>> For example, we have a dataframe with all users and their corresponding
>> latest activity timestamps. After each streaming batch, aggregations are
>> performed and the result is collected to the driver to update a subset of
>> users' latest activity timestamps.
>>
>>
>>
>> On Sat, 9 Jan 2021, 6:18 pm Artemis User,  wrote:
>>
>>> Could you please clarify what you mean by 1)? The driver is only
>>> responsible for submitting the Spark job, not performing it.
>>>
>>> -- ND
>>>
>>> On 1/9/21 9:35 AM, András Kolbert wrote:
>>> > Hi,
>>> > I would like to get your advice on my use case.
>>> > I have a few spark streaming applications where I need to keep
>>> > updating a dataframe after each batch. Each batch probably affects a
>>> > small fraction of the dataframe (5k out of 200k records).
>>> >
>>> > The options I have been considering so far:
>>> > 1) keep dataframe on the driver, and update that after each batch
>>> > 2) keep dataframe distributed, and use checkpointing to mitigate
>>> lineage
>>> >
>>> > I solved previous use cases with option 2, but I am not sure it is
>>> > optimal, as checkpointing is relatively expensive. I also wondered
>>> > about HBase or some other quick-access in-memory store, however that
>>> > is currently not in my stack.
>>> >
>>> > Curious to hear your thoughts
>>> >
>>> > Andras
>>> >
>>>
>>>
>>>


Re: Use case advice

2021-01-09 Thread András Kolbert
Sorry if my terminology is misleading.

What I meant by "driver only" is using a local pandas dataframe (collecting
the data to the master) and keeping that updated, instead of dealing with a
distributed Spark dataframe for holding this data.

For example, we have a dataframe with all users and their corresponding
latest activity timestamps. After each streaming batch, aggregations are
performed and the result is collected to the driver to update a subset of
users' latest activity timestamps.
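A minimal sketch of this "driver-only" idea (column names are illustrative):
keep the full user -> last-activity table as a local pandas DataFrame and,
after each batch, collect only the small aggregated delta to upsert into it.

import pandas as pd
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

# Full state lives on the driver as pandas (fine while it stays small enough).
state = pd.DataFrame(columns=["user_id", "last_activity_ts"]).set_index("user_id")

def update_state(batch_df):
    global state
    delta = (batch_df
             .groupBy("user_id")
             .agg(F.max("event_ts").alias("last_activity_ts"))
             .toPandas()
             .set_index("user_id"))
    # Upsert: delta's values win for touched users, new users are appended.
    state = delta.combine_first(state)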



On Sat, 9 Jan 2021, 6:18 pm Artemis User,  wrote:

> Could you please clarify what you mean by 1)? The driver is only
> responsible for submitting the Spark job, not performing it.
>
> -- ND
>
> On 1/9/21 9:35 AM, András Kolbert wrote:
> > Hi,
> > I would like to get your advice on my use case.
> > I have a few spark streaming applications where I need to keep
> > updating a dataframe after each batch. Each batch probably affects a
> > small fraction of the dataframe (5k out of 200k records).
> >
> > The options I have been considering so far:
> > 1) keep dataframe on the driver, and update that after each batch
> > 2) keep dataframe distributed, and use checkpointing to mitigate lineage
> >
> > I solved previous use cases with option 2, but I am not sure it is
> > optimal, as checkpointing is relatively expensive. I also wondered
> > about HBase or some other quick-access in-memory store, however that
> > is currently not in my stack.
> >
> > Curious to hear your thoughts
> >
> > Andras
> >
>
>
>


Use case advice

2021-01-09 Thread András Kolbert
Hi,
I would like to get your advice on my use case.
I have a few spark streaming applications where I need to keep updating a
dataframe after each batch. Each batch probably affects a small fraction of
the dataframe (5k out of 200k records).

The options I have been considering so far:
1) keep dataframe on the driver, and update that after each batch
2) keep dataframe distributed, and use checkpointing to mitigate lineage

I solved previous use cases with option 2, but I am not sure it is optimal, as
checkpointing is relatively expensive. I also wondered about HBase or some
other quick-access in-memory store, however that is currently not in my stack.
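A hedged sketch of option 2) with made-up names and paths: keep the state as a
distributed DataFrame, update it each batch, and checkpoint it so the
ever-growing union/aggregation lineage is cut rather than re-evaluated from
scratch.

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
spark.sparkContext.setCheckpointDir("hdfs:///tmp/state_checkpoints")  # assumed dir

# State schema: one row per user with their latest activity timestamp.
state = spark.createDataFrame([], "user_id string, last_activity_ts timestamp")

def update_state(batch_agg):
    # batch_agg is assumed to already be aggregated to (user_id, last_activity_ts).
    global state
    state = (state.unionByName(batch_agg)
                  .groupBy("user_id")
                  .agg(F.max("last_activity_ts").alias("last_activity_ts"))
                  .checkpoint())   # eager checkpoint: materialise and drop lineage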

Curious to hear your thoughts

Andras


Re: Spark Streaming Checkpointing

2020-09-04 Thread András Kolbert
Hi Gábor,


Thanks for your reply on this!

Internally, that's what is used at the company I work at - it hasn't been
changed, mainly due to compatibility with the currently deployed Java
applications.

Hence I am attempting to make the most of this version :)

András



On Fri, 4 Sep 2020, 14:09 Gabor Somogyi,  wrote:

> Hi Andras,
>
> A general suggestion is to use Structured Streaming instead of DStreams
> because it provides several things out of the box (stateful streaming,
> etc...).
> Kafka 0.8 is super old and deprecated (no security...). Do you have a
> specific reason to use that?
>
> BR,
> G
>
>
> On Thu, Sep 3, 2020 at 11:41 AM András Kolbert 
> wrote:
>
>> Hi All,
>>
>> I have a Spark Streaming application (2.4.4 with Kafka 0.8, so Spark Direct
>> Streaming) running just fine.
>>
>> I create a context in the following way:
>>
>> ssc = StreamingContext(sc, 60)
>> opts = {"metadata.broker.list": kafka_hosts,
>>         "auto.offset.reset": "largest",
>>         "group.id": run_type}
>> kvs = KafkaUtils.createDirectStream(ssc, [topic_listen], opts)
>> kvs.checkpoint(120)
>>
>> lines = kvs.map(lambda row: row[1])
>> lines.foreachRDD(streaming_app)
>> ssc.checkpoint(checkpoint)
>>
>> The streaming app at a high level does this:
>>
>>- processes incoming batch
>>- unions to the dataframe from the previous batch and aggregates them
>>
>> Currently, I use checkpointing explicitly (df = df.checkpoint()) to
>> truncate the lineage. However, this is quite an expensive exercise, and I
>> was wondering if there is a better way to do this.
>>
>> I tried to disable this explicit checkpointing, since I already have
>> periodic checkpointing (kvs.checkpoint(120)), so I thought the lineage
>> would be truncated at that checkpointed RDD. In reality that is not the
>> case, and the processing time keeps increasing over time.
>>
>> Am I doing something inherently wrong? Is there a better way of doing
>> this?
>>
>> Thanks
>> Andras
>>
>


Spark Streaming Checkpointing

2020-09-03 Thread András Kolbert
Hi All,

I have a Spark Streaming application (2.4.4 with Kafka 0.8, so Spark Direct
Streaming) running just fine.

I create a context in the following way:

ssc = StreamingContext(sc, 60)
opts = {"metadata.broker.list": kafka_hosts,
        "auto.offset.reset": "largest",
        "group.id": run_type}
kvs = KafkaUtils.createDirectStream(ssc, [topic_listen], opts)
kvs.checkpoint(120)

lines = kvs.map(lambda row: row[1])
lines.foreachRDD(streaming_app)
ssc.checkpoint(checkpoint)

The streaming app at a high level does this:

   - processes incoming batch
   - unions to the dataframe from the previous batch and aggregates them

Currently, I use checkpointing explicitly (df = df.checkpoint()) to truncate
the lineage. However, this is quite an expensive exercise, and I was wondering
if there is a better way to do this.

I tried to disable this explicit checkpointing, since I already have periodic
checkpointing (kvs.checkpoint(120)), so I thought the lineage would be
truncated at that checkpointed RDD. In reality that is not the case, and the
processing time keeps increasing over time.
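A hedged sketch of an alternative to df.checkpoint(), with illustrative paths:
kvs.checkpoint(120) governs the DStream's own RDDs, while the DataFrame that
is unioned across batches inside foreachRDD builds a separate lineage, which
is presumably why batch times grow once the explicit checkpoint is removed.
Writing the accumulated state out and reading it back each batch also cuts
that lineage:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
STATE_PATH = "hdfs:///tmp/streaming_state"        # assumed location

def truncate_lineage(state_df):
    tmp_path = STATE_PATH + "_next"
    state_df.write.mode("overwrite").parquet(tmp_path)  # materialise this batch's state
    return spark.read.parquet(tmp_path)                 # fresh plan, no accumulated lineage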

Am I doing something inherently wrong? Is there a better way of doing this?

Thanks
Andras


Spark Streaming Memory

2020-05-17 Thread András Kolbert
Hi,

I have a streaming job (Spark 2.4.4) in which the memory usage keeps
increasing over time.

Periodically (every 20-25 minutes) the executors fall over
(org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output
location for shuffle 6987) due to running out of memory. In the UI, I can see
that the memory keeps increasing batch by batch, even though I do not keep
more data in memory (I do keep unpersisting, checkpointing and caching new
data frames); the Storage tab shows only the expected 4 objects over time.

I hope I have just missed a parameter in the Spark configuration (garbage
collection, reference tracking, etc.) that would solve my issue. I have seen
a few JIRA tickets around memory leaks (SPARK-19644, SPARK-29055,
SPARK-29321) - could it be the same issue?

 ("spark.cleaner.referenceTracking.cleanCheckpoints", "true"),
 ('spark.cleaner.periodicGC.interval', '1min'),
 ('spark.cleaner.referenceTracking','true'),
 ('spark.cleaner.referenceTracking.blocking.shuffle','true'),
 ('spark.sql.streaming.minBatchesToRetain', '2'),
 ('spark.sql.streaming.maxBatchesToRetainInMemory', '5'),
 ('spark.ui.retainedJobs','50' ),
 ('spark.ui.retainedStages','50'),
 ('spark.ui.retainedTasks','500'),
 ('spark.worker.ui.retainedExecutors','50'),
 ('spark.worker.ui.retainedDrivers','50'),
 ('spark.sql.ui.retainedExecutions','50'),
 ('spark.streaming.ui.retainedBatches','1440'),
 ('spark.executor.JavaOptions','-XX:+UseG1GC -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps')
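As an aside, executor JVM flags are normally passed through
spark.executor.extraJavaOptions, which is the key Spark documents for this
purpose; a minimal sketch, assuming the GC flags above are meant for the
executors:

from pyspark import SparkConf

conf = (SparkConf()
        .set("spark.executor.extraJavaOptions",
             "-XX:+UseG1GC -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps"))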

I've tried lowering spark.streaming.ui.retainedBatches to 8; it did not help.

The application works fine apart from the fact that processing some batches
takes longer (when the executors fall over).

[image: image.png]
[image: image.png]


Any ideas?

I've attached my code.


Thanks,
Andras
import sys
import os
from datetime import datetime, timedelta
import numpy as np
import time
from pathlib import Path
import json
from subprocess import PIPE, run
import pandas as pd
import pyarrow
import gc

from pyspark import SparkContext
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.streaming.kafka import KafkaUtils
from pyspark.streaming import StreamingContext
from pyspark.sql.session import SparkSession
from pyspark.sql.types import (ArrayType, IntegerType, DoubleType, StringType,
                               BooleanType)
from pyspark.sql.functions import (col, row_number, udf, collect_list,
                                   array, struct, lit, log, exp)
from pyspark.sql import DataFrame, Window

code_version = "03"
environment = "PREPRD"

def_config = spark.sparkContext._conf.getAll()
conf = spark.sparkContext._conf.setAll(
[('spark.app.name', f'application_Streaming_{environment}_v{code_version}'),
 ('spark.executor.memory', '3500M'),
 ('spark.executor.cores', '2'),
 ('spark.cores.max', '4'),
 ('spark.driver.memory', '8g'),
 ('archives', 
'hdfs://node-master:9000/data/application/deployment/env/application_scoring_v1.tar.gz#application_scoring_v1'),
 ("spark.cleaner.referenceTracking.cleanCheckpoints", "true"),
 ('spark.cleaner.periodicGC.interval', '1min'),
 ('spark.cleaner.referenceTracking','true'),
 ('spark.cleaner.referenceTracking.blocking.shuffle','true'),
 ('spark.sql.streaming.minBatchesToRetain', '2'),
 ('spark.sql.streaming.maxBatchesToRetainInMemory', '5'),
 ('spark.ui.retainedJobs','50' ),
 ('spark.ui.retainedStages','50'),
 ('spark.ui.retainedTasks','500'),
 ('spark.worker.ui.retainedExecutors','50'),
 ('spark.worker.ui.retainedDrivers','50'),
 ('spark.sql.ui.retainedExecutions','50'),
 ('spark.streaming.ui.retainedBatches','1440'),
 ('spark.executor.JavaOptions','-XX:+UseG1GC -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps')
 ])

spark.sparkContext.stop()

os.environ['LOG_DIRS'] = '/application/logs/application_logs'

spark = SparkSession \
.builder \
.config(conf=conf) \
.getOrCreate()
sc = spark.sparkContext
spark.conf.set("spark.sql.execution.arrow.enabled", "false")
spark.conf.set("spark.sql.execution.arrow.fallback.enabled", "true")
spark.conf.set("spark.streaming.stopGracefullyOnShutdown", "true")

sc.addFile('hdfs:///data/application/deployment/shared_utils.tar')
sc.addPyFile('hdfs:///data/application/deployment/application_payload.py')
sc.addPyFile('hdfs:///data/application/deployment/spark_logging.py')
sc.setLogLevel("ERROR")

from hdfs import InsecureClient
from application_payload import (get_channel_game_info, get_application_output,
                                 get_predictions_pure)
from shared_utils.KafkaUtils.kafka_utils import KafkaConnect
import spark_logging as logging
import spark_misc as sm

logger = logging.getLogger("driver", 
logfile=f"{sc._jsc.sc().applicationId()}.log")
init_handler = logging.StreamHandler()
init_handler.setLevel(logging.DEBUG)