PySpark: Make persist() return a context manager

2016-08-04 Thread Nicholas Chammas
Context managers are a natural way to capture closely related setup and
teardown code in Python.

For example, they are commonly used when doing file I/O:

with open('/path/to/file') as f:
    contents = f.read()
    ...

Once the program exits the with block, f is automatically closed.

Does it make sense to apply this pattern to persisting and unpersisting
DataFrames and RDDs? I feel like there are many cases when you want to
persist a DataFrame for a specific set of operations and then unpersist it
immediately afterwards.

For example, take model training. Today, you might do something like this:

labeled_data.persist()
model = pipeline.fit(labeled_data)
labeled_data.unpersist()

If persist() returned a context manager, you could rewrite this as follows:

with labeled_data.persist():
    model = pipeline.fit(labeled_data)

Upon exiting the with block, labeled_data would automatically be
unpersisted.

This can be done in a backwards-compatible way, since persist() would still
return the parent DataFrame or RDD as it does today; the change would simply
add two methods to that object: __enter__() and __exit__().
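
For illustration, here is a minimal sketch of the intended behaviour using a
hypothetical wrapper function (this is not the proposed API change itself,
which would just add those two methods to the object persist() returns):

from contextlib import contextmanager

# Hypothetical helper, for illustration only: persist on entry, unpersist on
# exit. The actual proposal adds __enter__()/__exit__() to the persisted object.
@contextmanager
def persisted(df):
    df.persist()
    try:
        yield df
    finally:
        df.unpersist()

# Usage, with labeled_data and pipeline as in the example above:
# with persisted(labeled_data) as df:
#     model = pipeline.fit(df)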

Does this make sense? Is it attractive?

Nick


Re: We don't use ASF Jenkins builds, right?

2016-08-04 Thread Reynold Xin
We don't.

On Friday, August 5, 2016, Sean Owen  wrote:

> There was a recent message about deprecating many Maven, ant and JDK
> combos for ASF Jenkins machines, and I was just triple-checking we're
> only making use of the Amplab ones.
>


We don't use ASF Jenkins builds, right?

2016-08-04 Thread Sean Owen
There was a recent message about deprecating many Maven, ant and JDK
combos for ASF Jenkins machines, and I was just triple-checking we're
only making use of the Amplab ones.




Inquiry about Spark's behaviour for configurations in the Hadoop configuration instance via read/write.options()

2016-08-04 Thread Hyukjin Kwon
Hi all,


If my understanding is correct, Spark now supports setting some options on the
Hadoop configuration instance via the read/write.option(..) API.

However, I recently saw some comments and opinions about this. If I understood
them correctly, the two positions were as below:

   - Respect all the configurations in the Hadoop configuration instance,
     including the ones that Spark configurations/options also cover; the
     Spark configurations/options override them when equivalents are
     duplicated.

   - Do not respect the configurations in the instance that Spark
     configurations/options cover, meaning the default values in the Spark
     configuration are used regardless of the values set in the instance for
     the equivalent ones.

For example, Spark now supports compression for ORC as an option, but we are
currently not respecting orc.compress from the Hadoop configuration; it is
being ignored (i.e., following the latter case).
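
For concreteness, a minimal sketch of the two call styles in question
(assuming an existing DataFrame df; the output paths are placeholders, and
which of the two actually takes effect is exactly what I am asking about):

# Spark's own ORC writer option:
df.write.option("compression", "zlib").orc("/tmp/orc_via_spark_option")

# The equivalent property from the Hadoop/ORC side, passed through the same
# API; does this reach the Hadoop configuration instance, or is it ignored?
df.write.option("orc.compress", "ZLIB").orc("/tmp/orc_via_hadoop_property")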

Maybe I understood those comments and opinions wrongly, and this might be a
dumb question stemming from my misunderstanding, but I would really appreciate
it if anyone could help me figure out which one is correct.

Thanks!


Re: Source API requires unbounded distributed storage?

2016-08-04 Thread Michael Armbrust
Yeah, this API is in the private execution package because we are planning
to continue to iterate on it.  Today, we will only ever go back one batch,
though that might change in the future if we do async checkpointing of
internal state.

You are totally right that we should relay this info back to the source.
Opening a JIRA sounds like a good first step.

On Thu, Aug 4, 2016 at 4:38 PM, Fred Reiss  wrote:

> Hi,
>
> I've been looking over the Source API in 
> org.apache.spark.sql.execution.streaming,
> and I'm at a loss for how the current API can be implemented in a practical
> way. The API defines a single getBatch() method for fetching records from
> the source, with the following Scaladoc comments defining the semantics:
>
>
> /**
>  * Returns the data that is between the offsets (`start`, `end`]. When `start` is `None` then
>  * the batch should begin with the first available record. This method must always return the
>  * same data for a particular `start` and `end` pair.
>  */
> def getBatch(start: Option[Offset], end: Offset): DataFrame
>
> If I read the semantics described here correctly, a Source is required to
> retain all past history for the stream that it backs. Further, a Source
> is also required to retain this data across restarts of the process where
> the Source is instantiated, even when the Source is restarted on a
> different machine.
>
> The current implementation of FileStreamSource follows my reading of the
> requirements above. FileStreamSource never deletes a file.
>
> I feel like this requirement for unbounded state retention must be a
> mistake or misunderstanding of some kind. The scheduler is internally
> maintaining a high water mark (StreamExecution.committedOffsets in
> StreamExecution.scala) of data that has been successfully processed. There
> must have been an intent to communicate that high water mark back to the
> Source so that the Source can clean up its state. Indeed, the Databricks
> blog post from last week
> (https://databricks.com/blog/2016/07/28/structured-streaming-in-apache-spark.html)
> says that "Only a
> few minutes’ worth of data needs to be retained; Structured Streaming will
> maintain its own internal state after that."
>
> But the code checked into git and shipped with Spark 2.0 does not have an
> API call for the scheduler to tell a Source where the boundary of "only a
> few minutes' worth of data" lies.
>
> Is there a JIRA that I'm not aware of to change the Source API? If not,
> should we maybe open one?
>
> Fred
>


Source API requires unbounded distributed storage?

2016-08-04 Thread Fred Reiss
Hi,

I've been looking over the Source API in
org.apache.spark.sql.execution.streaming, and I'm at a loss for how the
current API can be implemented in a practical way. The API defines a single
getBatch() method for fetching records from the source, with the following
Scaladoc comments defining the semantics:


/**
 * Returns the data that is between the offsets (`start`, `end`]. When `start` is `None` then
 * the batch should begin with the first available record. This method must always return the
 * same data for a particular `start` and `end` pair.
 */
def getBatch(start: Option[Offset], end: Offset): DataFrame

If I read the semantics described here correctly, a Source is required to
retain all past history for the stream that it backs. Further, a Source is
also required to retain this data across restarts of the process where the
Source is instantiated, even when the Source is restarted on a different
machine.

The current implementation of FileStreamSource follows my reading of the
requirements above. FileStreamSource never deletes a file.

I feel like this requirement for unbounded state retention must be a
mistake or misunderstanding of some kind. The scheduler is internally
maintaining a high water mark (StreamExecution.committedOffsets in
StreamExecution.scala) of data that has been successfully processed. There
must have been an intent to communicate that high water mark back to the
Source so that the Source can clean up its state. Indeed, the Databricks
blog post from last week (
https://databricks.com/blog/2016/07/28/structured-streaming-in-apache-spark.html)
says that "Only a few minutes’ worth of data needs to be retained;
Structured Streaming will maintain its own internal state after that."

But the code checked into git and shipped with Spark 2.0 does not have an
API call for the scheduler to tell a Source where the boundary of "only a
few minutes' worth of data" lies.

Is there a JIRA that I'm not aware of to change the Source API? If not,
should we maybe open one?

Fred


Re: Spark SQL and Kryo registration

2016-08-04 Thread Amit Sela
It should. Codegen uses the SparkConf in SparkEnv when instantiating a new
Serializer.
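
For reference, a minimal PySpark sketch of the setup being asked about (these
are standard Spark config keys; whether DataFrame queries then run cleanly
under codegen, or fail on unregistered internal classes, is the open question):

from pyspark.sql import SparkSession

# Enable Kryo and require every serialized class to be registered.
spark = (
    SparkSession.builder
    .appName("kryo-registration-check")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.kryo.registrationRequired", "true")
    # User classes can be registered via spark.kryo.classesToRegister, e.g.:
    # .config("spark.kryo.classesToRegister", "com.example.MyClass")
    .getOrCreate()
)

# A simple DataFrame job to exercise codegen and shuffle serialization.
df = spark.range(10).toDF("id")
df.groupBy((df.id % 2).alias("bucket")).count().show()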

On Thu, Aug 4, 2016 at 6:14 PM Jacek Laskowski  wrote:

> Hi Olivier,
>
> I don't know either, but am curious what you've tried already.
>
> Jacek
>
> On 3 Aug 2016 10:50 a.m., "Olivier Girardot" <
> o.girar...@lateral-thoughts.com> wrote:
>
>> Hi everyone,
>> I'm currently trying to use Spark 2.0.0 and to make DataFrames work
>> with kryo.registrationRequired=true
>> Is it even possible at all, considering the codegen?
>>
>> Regards,
>>
>> *Olivier Girardot* | Associé
>> o.girar...@lateral-thoughts.com
>> +33 6 24 09 17 94
>>
>


Re: Spark SQL and Kryo registration

2016-08-04 Thread Jacek Laskowski
Hi Olivier,

I don't know either, but am curious what you've tried already.

Jacek

On 3 Aug 2016 10:50 a.m., "Olivier Girardot" <
o.girar...@lateral-thoughts.com> wrote:

> Hi everyone,
> I'm currently trying to use Spark 2.0.0 and to make DataFrames work with
> kryo.registrationRequired=true
> Is it even possible at all, considering the codegen?
>
> Regards,
>
> *Olivier Girardot* | Associé
> o.girar...@lateral-thoughts.com
> +33 6 24 09 17 94
>