Checkpoints not cleaned using Spark streaming + watermarking + kafka

2017-09-21 Thread MathieuP
Hi Spark users! :)

I come to you with a question about checkpoints.
I have a streaming application that consumes from and produces to Kafka.
The computation requires a window and watermarking.
Since this is a streaming application with a Kafka output, a checkpoint is
required.

The application runs with spark-submit on a single master and writes to the
local hard drive.
It runs fine until the checkpoint files in the "state" directory completely
fill the disk.
This is not a space issue: the disk runs out of inodes, because tens of
thousands of them are consumed by the checkpoint files.

I searched the docs and Stack Overflow.

I found these settings:
- spark.cleaner.referenceTracking.cleanCheckpoints
- spark.cleaner.periodicGC.interval
I set them from the app and from the command line, without any success.
Am I misusing them? Is there another setting?
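
For reference, this is roughly how I pass them (a minimal sketch; the app name
and the interval value are only examples):

```
// From the application:
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("StreamingWithWatermark")
  .config("spark.cleaner.referenceTracking.cleanCheckpoints", "true")
  .config("spark.cleaner.periodicGC.interval", "10min")
  .getOrCreate()

// Or on the command line:
// spark-submit \
//   --conf spark.cleaner.referenceTracking.cleanCheckpoints=true \
//   --conf spark.cleaner.periodicGC.interval=10min \
//   ...
```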

I can also see log messages like these:
...
17/09/21 23:27:46 INFO spark.ContextCleaner: Cleaned accumulator 25
17/09/21 23:27:46 INFO spark.ContextCleaner: Cleaned accumulator 11
17/09/21 23:27:46 INFO spark.ContextCleaner: Cleaned shuffle 0
17/09/21 23:27:46 INFO spark.ContextCleaner: Cleaned accumulator 7
...

A sample that reproduces the issue:
The window, watermarking and output trigger durations are set to 10 seconds.
The Kafka topic is quite small (2 messages are added per second).

https://gist.github.com/anonymous/2e83db84d5190ed1ad7a7d2d5cd632f0
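
In short, the query looks roughly like this (a simplified sketch, not the exact
gist; the broker address, topic names and checkpoint path are placeholders):

```
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, struct, to_json, window}
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder().appName("CheckpointRepro").getOrCreate()

val input = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "input-topic")
  .load()

// 10-second window and watermark, as described above
val counts = input
  .selectExpr("CAST(value AS STRING) AS value", "timestamp")
  .withWatermark("timestamp", "10 seconds")
  .groupBy(window(col("timestamp"), "10 seconds"))
  .count()

// Kafka sink; the checkpoint directory is where the "state" files pile up
val query = counts
  .select(to_json(struct(col("window"), col("count"))).as("value"))
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "output-topic")
  .option("checkpointLocation", "/tmp/checkpoints/repro")
  .outputMode("append")
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .start()

query.awaitTermination()
```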

Regards,







Re: How to pass sparkSession from driver to executor

2017-09-21 Thread ayan guha
The point here is that the Spark session is not available on executors, so you
have to use appropriate storage clients instead.
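
For the HDFS case in the original question, a rough sketch of what that could
look like, using the Hadoop FileSystem API inside mapPartitions (names such as
fileNamesDf are placeholders, and the final parsing step assumes Spark 2.2+):

```
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import scala.io.Source

// fileNamesDf: a DataFrame whose first column holds HDFS paths
val jsonStrings = fileNamesDf.rdd.map(_.getString(0)).mapPartitions { paths =>
  // One FileSystem client per partition, created on the executor
  val fs = FileSystem.get(new Configuration())
  paths.map { p =>
    val in = fs.open(new Path(p))
    try Source.fromInputStream(in).mkString finally in.close()
  }
}

// Back on the driver, parse the collected JSON strings with the SparkSession
// (spark is the existing SparkSession from the driver program)
import spark.implicits._
val parsed = spark.read.json(jsonStrings.toDS())
```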



-- 
Best Regards,
Ayan Guha


Re: How to pass sparkSession from driver to executor

2017-09-21 Thread lucas.g...@gmail.com
I'm not sure exactly what you're doing, but I have in the past used Spark to
consume a manifest file and then execute a .mapPartitions on the result, like
this:


import json
import time

import boto3


def map_key_to_event(s3_events_data_lake):

    # s3_client defaults to test_stub, a fake client injected for tests
    # (defined elsewhere); in production it is None and a real client is built.
    def _map_key_to_event(event_key_list, s3_client=test_stub):
        print("Events in list")
        start = time.time()

        return_list = []

        if s3_client is None:
            # One boto3 client per partition, created on the executor
            s3_client = boto3.Session().client('s3')

        for event_key in event_key_list:
            contents = None
            try:
                response = s3_client.get_object(Bucket=s3_events_data_lake,
                                                Key=event_key)
                contents = response['Body'].read().decode('utf-8')
                entity = json.loads(contents)
                event_type = json.loads(entity["Message"])["type"]
                entity["Message"] = json.loads(entity["Message"])
                # json.dumps here because Spark doesn't have a good json datatype.
                return_list.append((event_type, json.dumps(entity)))
            except Exception:
                print("Key: {k} did not yield a valid object: {o}".format(
                    k=event_key, o=contents))

        end = time.time()
        print('time elapsed:')
        print(end - start)

        return return_list

    return _map_key_to_event


pkeys = spark.sparkContext.parallelize(full_list_for_time_slice, 32)
print("partitions: ")
print(pkeys.getNumPartitions())
map_func = map_key_to_event(s3_events_data_lake)  # bind the bucket name
events = pkeys.mapPartitions(map_func)





In this case I'm loading heterogeneous JSON files with wildly different
schemas, then saving them into one Parquet file per event type (i.e. turning
one big heterogeneous dump into numerous smaller homogeneous dumps).

I'm sure this isn't the only or even best way to do it.

The underlying issue is that you're trying to violate the programming
model.  The model in this case consists of telling the driver what you want
and then having the executors go do it.

The Spark context is a driver-level abstraction; it doesn't really make sense
in the executor context. The executor is acting on behalf of the driver and
shouldn't need a back reference to it. You'd end up with some interesting
execution graphs.

This is a common pattern in Spark as far as I can tell, i.e. calling a map
and then doing something with the items on the executor, either computing or
enriching. My case above is a bit weird, and I'm not certain it's the right
mechanism, in that I'm literally taking a manifest file and turning it into
'n' actual records.

Also, if you're going to be constructing a connection string / JDBC call /
S3 client... you really don't want to use a straight .map(func). You'll
end up instantiating a connection for every record.

Hope this is somewhat helpful.

Gary



Re: How to pass sparkSession from driver to executor

2017-09-21 Thread Weichen Xu
Spark does not allow executor code to use `sparkSession`.
But I think you can move all the JSON files to one directory and then run:

```
spark.read.json("/path/to/jsonFileDir")
```
But if you want to get the filename at the same time, you can use
```
spark.sparkContext.wholeTextFiles("/path/to/jsonFileDir")...
```
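
For example, a rough sketch of the second option that keeps the filename and
parses the content with an explicit schema (the schema below is only a
placeholder):

```
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{StringType, StructType}
import spark.implicits._

// wholeTextFiles returns (path, content) pairs, so the filename is preserved
val filesDf = spark.sparkContext
  .wholeTextFiles("/path/to/jsonFileDir")
  .toDF("filename", "content")

// Placeholder schema; replace it with the real structure of the JSON files
val schema = new StructType()
  .add("id", StringType)
  .add("message", StringType)

val parsed = filesDf.withColumn("json", from_json(col("content"), schema))
```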

On Thu, Sep 21, 2017 at 9:18 PM, Riccardo Ferrari wrote:

> Depends on your use-case; however, broadcasting could be a better option.


How to pass sparkSession from driver to executor

2017-09-21 Thread Chackravarthy Esakkimuthu
Hi,

I want to know how to pass sparkSession from driver to executor.

I have a spark program (batch job) which does the following:

#

val spark = SparkSession.builder()
  .appName("SampleJob")
  .config("spark.master", "local")
  .getOrCreate()

val df = ...  // a DataFrame that holds a list of HDFS file names

df.foreach { fileName =>

  spark.read.json(fileName)

  // .. some logic here
}

#


spark.read.json(fileName) fails because it runs on an executor; when I put it
outside the foreach, i.e. in the driver, it works.

I understand this is because I am trying to use spark (the SparkSession) on an
executor, while it is only visible in the driver. But I want to read the HDFS
files inside the foreach. How do I do it?

Can someone help me with how to do this?

Thanks,
Chackra


Re: for loops in pyspark

2017-09-21 Thread Alexander Czech
That is not really possible; the whole project is rather large and I would
not like to release it before I have published the results.

But if there are no known issues with using Spark in a for loop, I will look
into other possibilities for memory leaks.

Thanks


On 20 Sep 2017 15:22, "Weichen Xu" wrote:

Spark manages memory allocation and release automatically. Can you post the
complete program? That would help with checking where things go wrong.

On Wed, Sep 20, 2017 at 8:12 PM, Alexander Czech <
alexander.cz...@googlemail.com> wrote:

> Hello all,
>
I'm running a pyspark script that makes use of a for loop to create smaller
chunks of my main dataset.
>
> some example code:
>
> for chunk in chunks:
>     my_rdd = sc.parallelize(chunk).flatMap(somefunc)
>     # do some stuff with my_rdd
>
>     my_df = make_df(my_rdd)
>     # do some stuff with my_df
>     my_df.write.parquet('./some/path')
>
> After a couple of loops I always start to lose executors because of
> out-of-memory errors. Is there a way to free up memory after a loop? Do I
> have to do it in Python or with Spark?
>
> Thanks
>