Re: DataSourceV2 write input requirements

2018-03-28 Thread Russell Spitzer
Ah yeah, sorry, I got a bit mixed up.

On Wed, Mar 28, 2018 at 7:54 PM Ted Yu  wrote:

> bq. this shuffle could outweigh the benefits of the organized data if the
> cardinality is lower.
>
> I wonder if you meant higher in place of the last word above.
>
> Cheers
>
> On Wed, Mar 28, 2018 at 7:50 PM, Russell Spitzer <
> russell.spit...@gmail.com> wrote:
>
>> For added color, one thing that I may want to consider as a data source
>> implementer is the cost / benefit of applying a particular clustering. For
>> example, a dataset with low cardinality in the clustering key could benefit
>> greatly from clustering on that key before writing to Cassandra since
>> Cassandra can benefit from this sort of batching. But the cost of
>> performing this shuffle could outweigh the benefits of the organized data
>> if the cardinality is lower.
>>
>> I imagine other sources might have similar benefit calculations. Doing a
>> particular sort or clustering can provide increased throughput but only in
>> certain situations based on some facts about the data.
>>
>>
>> For a concrete example here.
>>
>> Cassandra can insert records with the same partition-key faster if they
>> arrive in the same payload. But this is only beneficial if the incoming
>> dataset has multiple entries for the same partition key. If the incoming
>> source does not have any duplicates then there is no benefit to requiring a
>> sort or partitioning.
>>
>> On Wed, Mar 28, 2018 at 7:14 PM Patrick Woody 
>> wrote:
>>
>>> Spark would always apply the required clustering and sort order because
 they are required by the data source. It is reasonable for a source to
 reject data that isn’t properly prepared. For example, data must be written
 to HTable files with keys in order or else the files are invalid. Sorting
 should not be implemented in the sources themselves because Spark handles
 concerns like spilling to disk. Spark must prepare data correctly, which is
 why the interfaces start with “Requires”.
>>>
>>>
>>> This was in reference to Russell's suggestion that the data source could
>>> have a required sort, but only a recommended partitioning. I don't have an
>>> immediate recommendation use case that comes to mind, though. I'm
>>> definitely in sync that the data source itself shouldn't do work outside of
>>> the writes themselves.
>>>
>>>
>>> Considering the second use case you mentioned first, I don’t think it is
 a good idea for a table to put requirements on the number of tasks used for
 a write. The parallelism should be set appropriately for the data volume,
 which is for Spark or the user to determine. A minimum or maximum number of
 tasks could cause bad behavior.
>>>
>>> For your first use case, an explicit global ordering, the problem is
 that there can’t be an explicit global ordering for a table when it is
 populated by a series of independent writes. Each write could have a global
 order, but once those files are written, you have to deal with multiple
 sorted data sets. I think it makes sense to focus on order within data
 files, not order between data files.
>>>
>>>
>>> This is where I'm interested in learning about the separation of
>>> responsibilities for the data source and how "smart" it is supposed to be.
>>>
>>> For the first part, I would assume that given the estimated data size
>>> from Spark and options passed in from the user, the data source could make
>>> a more intelligent requirement on the write format than Spark
>>> independently. Somewhat analogous to how the current FileSource does bin
>>> packing of small files on the read side, restricting parallelism for the
>>> sake of overhead.
>>>
>>> For the second, I wouldn't assume that a data source requiring a certain
>>> write format would give any guarantees around reading the same data? In the
>>> cases where it is a complete overwrite it would, but for independent writes
>>> it could still be useful for statistics or compression.
>>>
>>> Thanks
>>> Pat
>>>
>>>
>>>
>>> On Wed, Mar 28, 2018 at 8:28 PM, Ryan Blue  wrote:
>>>
 How would Spark determine whether or not to apply a recommendation - a
 cost threshold?

 Spark would always apply the required clustering and sort order because
 they are required by the data source. It is reasonable for a source to
 reject data that isn’t properly prepared. For example, data must be written
 to HTable files with keys in order or else the files are invalid. Sorting
 should not be implemented in the sources themselves because Spark handles
 concerns like spilling to disk. Spark must prepare data correctly, which is
 why the interfaces start with “Requires”.

 I’m not sure what the second half of your question means. What does
 Spark need to pass into the data source?

 Should a datasource be able to provide a Distribution proper rather
 than 

Re: DataSourceV2 write input requirements

2018-03-28 Thread Ted Yu
bq. this shuffle could outweigh the benefits of the organized data if the
cardinality is lower.

I wonder if you meant higher in place of the last word above.

Cheers

On Wed, Mar 28, 2018 at 7:50 PM, Russell Spitzer 
wrote:

> For added color, one thing that I may want to consider as a data source
> implementer is the cost / benefit of applying a particular clustering. For
> example, a dataset with low cardinality in the clustering key could benefit
> greatly from clustering on that key before writing to Cassandra since
> Cassandra can benefit from this sort of batching. But the cost of
> performing this shuffle could outweigh the benefits of the organized data
> if the cardinality is lower.
>
> I imagine other sources might have similar benefit calculations. Doing a
> particular sort or clustering can provide increased throughput but only in
> certain situations based on some facts about the data.
>
>
> For a concrete example here.
>
> Cassandra can insert records with the same partition-key faster if they
> arrive in the same payload. But this is only beneficial if the incoming
> dataset has multiple entries for the same partition key. If the incoming
> source does not have any duplicates then there is no benefit to requiring a
> sort or partitioning.
>
> On Wed, Mar 28, 2018 at 7:14 PM Patrick Woody 
> wrote:
>
>> Spark would always apply the required clustering and sort order because
>>> they are required by the data source. It is reasonable for a source to
>>> reject data that isn’t properly prepared. For example, data must be written
>>> to HTable files with keys in order or else the files are invalid. Sorting
>>> should not be implemented in the sources themselves because Spark handles
>>> concerns like spilling to disk. Spark must prepare data correctly, which is
>>> why the interfaces start with “Requires”.
>>
>>
>> This was in reference to Russell's suggestion that the data source could
>> have a required sort, but only a recommended partitioning. I don't have an
>> immediate recommendation use case that comes to mind, though. I'm
>> definitely in sync that the data source itself shouldn't do work outside of
>> the writes themselves.
>>
>>
>> Considering the second use case you mentioned first, I don’t think it is
>>> a good idea for a table to put requirements on the number of tasks used for
>>> a write. The parallelism should be set appropriately for the data volume,
>>> which is for Spark or the user to determine. A minimum or maximum number of
>>> tasks could cause bad behavior.
>>
>> For your first use case, an explicit global ordering, the problem is that
>>> there can’t be an explicit global ordering for a table when it is populated
>>> by a series of independent writes. Each write could have a global order,
>>> but once those files are written, you have to deal with multiple sorted
>>> data sets. I think it makes sense to focus on order within data files, not
>>> order between data files.
>>
>>
>> This is where I'm interested in learning about the separation of
>> responsibilities for the data source and how "smart" it is supposed to be.
>>
>> For the first part, I would assume that given the estimated data size
>> from Spark and options passed in from the user, the data source could make
>> a more intelligent requirement on the write format than Spark
>> independently. Somewhat analogous to how the current FileSource does bin
>> packing of small files on the read side, restricting parallelism for the
>> sake of overhead.
>>
>> For the second, I wouldn't assume that a data source requiring a certain
>> write format would give any guarantees around reading the same data? In the
>> cases where it is a complete overwrite it would, but for independent writes
>> it could still be useful for statistics or compression.
>>
>> Thanks
>> Pat
>>
>>
>>
>> On Wed, Mar 28, 2018 at 8:28 PM, Ryan Blue  wrote:
>>
>>> How would Spark determine whether or not to apply a recommendation - a
>>> cost threshold?
>>>
>>> Spark would always apply the required clustering and sort order because
>>> they are required by the data source. It is reasonable for a source to
>>> reject data that isn’t properly prepared. For example, data must be written
>>> to HTable files with keys in order or else the files are invalid. Sorting
>>> should not be implemented in the sources themselves because Spark handles
>>> concerns like spilling to disk. Spark must prepare data correctly, which is
>>> why the interfaces start with “Requires”.
>>>
>>> I’m not sure what the second half of your question means. What does
>>> Spark need to pass into the data source?
>>>
>>> Should a datasource be able to provide a Distribution proper rather than
>>> just the clustering expressions? Two use cases would be for explicit global
>>> sorting of the dataset and attempting to ensure a minimum write task
>>> size/number of write tasks.
>>>
>>> Considering the second use 

Re: DataSourceV2 write input requirements

2018-03-28 Thread Russell Spitzer
For added color, one thing that I may want to consider as a data source
implementer is the cost / benefit of applying a particular clustering. For
example, a dataset with low cardinality in the clustering key could benefit
greatly from clustering on that key before writing to Cassandra since
Cassandra can benefit from this sort of batching. But the cost of
performing this shuffle could outweigh the benefits of the organized data
if the cardinality is lower.

I imagine other sources might have similar benefit calculations. Doing a
particular sort or clustering can provide increased throughput but only in
certain situations based on some facts about the data.


For a concrete example here.

Cassandra can insert records with the same partition-key faster if they
arrive in the same payload. But this is only beneficial if the incoming
dataset has multiple entries for the same partition key. If the incoming
source does not have any duplicates then there is no benefit to requiring a
sort or partitioning.
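
As a rough illustration of that preparation (a sketch only: the input path,
keyspace/table, and column name are placeholders, and this is the manual
DataFrame equivalent of what such a requirement would have Spark do, not a
connector API):

import org.apache.spark.sql.SparkSession

object ClusteredCassandraWrite {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("clustered-write").getOrCreate()
    val events = spark.read.parquet("/data/events")       // placeholder input
    val clustered = events
      .repartition(events("partition_key"))               // cluster rows by the write key
      .sortWithinPartitions("partition_key")               // keep same-key rows adjacent
    clustered.write
      .format("org.apache.spark.sql.cassandra")            // Spark Cassandra connector
      .options(Map("keyspace" -> "ks", "table" -> "events"))
      .mode("append")
      .save()
    spark.stop()
  }
}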

On Wed, Mar 28, 2018 at 7:14 PM Patrick Woody 
wrote:

> Spark would always apply the required clustering and sort order because
>> they are required by the data source. It is reasonable for a source to
>> reject data that isn’t properly prepared. For example, data must be written
>> to HTable files with keys in order or else the files are invalid. Sorting
>> should not be implemented in the sources themselves because Spark handles
>> concerns like spilling to disk. Spark must prepare data correctly, which is
>> why the interfaces start with “Requires”.
>
>
> This was in reference to Russell's suggestion that the data source could
> have a required sort, but only a recommended partitioning. I don't have an
> immediate recommendation use case that comes to mind, though. I'm
> definitely in sync that the data source itself shouldn't do work outside of
> the writes themselves.
>
>
> Considering the second use case you mentioned first, I don’t think it is a
>> good idea for a table to put requirements on the number of tasks used for a
>> write. The parallelism should be set appropriately for the data volume,
>> which is for Spark or the user to determine. A minimum or maximum number of
>> tasks could cause bad behavior.
>
> For your first use case, an explicit global ordering, the problem is that
>> there can’t be an explicit global ordering for a table when it is populated
>> by a series of independent writes. Each write could have a global order,
>> but once those files are written, you have to deal with multiple sorted
>> data sets. I think it makes sense to focus on order within data files, not
>> order between data files.
>
>
> This is where I'm interested in learning about the separation of
> responsibilities for the data source and how "smart" it is supposed to be.
>
> For the first part, I would assume that given the estimated data size from
> Spark and options passed in from the user, the data source could make a
> more intelligent requirement on the write format than Spark independently.
> Somewhat analogous to how the current FileSource does bin packing of small
> files on the read side, restricting parallelism for the sake of overhead.
>
> For the second, I wouldn't assume that a data source requiring a certain
> write format would give any guarantees around reading the same data? In the
> cases where it is a complete overwrite it would, but for independent writes
> it could still be useful for statistics or compression.
>
> Thanks
> Pat
>
>
>
> On Wed, Mar 28, 2018 at 8:28 PM, Ryan Blue  wrote:
>
>> How would Spark determine whether or not to apply a recommendation - a
>> cost threshold?
>>
>> Spark would always apply the required clustering and sort order because
>> they are required by the data source. It is reasonable for a source to
>> reject data that isn’t properly prepared. For example, data must be written
>> to HTable files with keys in order or else the files are invalid. Sorting
>> should not be implemented in the sources themselves because Spark handles
>> concerns like spilling to disk. Spark must prepare data correctly, which is
>> why the interfaces start with “Requires”.
>>
>> I’m not sure what the second half of your question means. What does Spark
>> need to pass into the data source?
>>
>> Should a datasource be able to provide a Distribution proper rather than
>> just the clustering expressions? Two use cases would be for explicit global
>> sorting of the dataset and attempting to ensure a minimum write task
>> size/number of write tasks.
>>
>> Considering the second use case you mentioned first, I don’t think it is
>> a good idea for a table to put requirements on the number of tasks used for
>> a write. The parallelism should be set appropriately for the data volume,
>> which is for Spark or the user to determine. A minimum or maximum number of
>> tasks could cause bad behavior.
>>
>> That said, I think there is a related use case 

Re: DataSourceV2 write input requirements

2018-03-28 Thread Patrick Woody
>
> Spark would always apply the required clustering and sort order because
> they are required by the data source. It is reasonable for a source to
> reject data that isn’t properly prepared. For example, data must be written
> to HTable files with keys in order or else the files are invalid. Sorting
> should not be implemented in the sources themselves because Spark handles
> concerns like spilling to disk. Spark must prepare data correctly, which is
> why the interfaces start with “Requires”.


This was in reference to Russell's suggestion that the data source could
have a required sort, but only a recommended partitioning. I don't have an
immediate recommendation use case that comes to mind, though. I'm
definitely in sync that the data source itself shouldn't do work outside of
the writes themselves.

Considering the second use case you mentioned first, I don’t think it is a
> good idea for a table to put requirements on the number of tasks used for a
> write. The parallelism should be set appropriately for the data volume,
> which is for Spark or the user to determine. A minimum or maximum number of
> tasks could cause bad behavior.


For your first use case, an explicit global ordering, the problem is that
> there can’t be an explicit global ordering for a table when it is populated
> by a series of independent writes. Each write could have a global order,
> but once those files are written, you have to deal with multiple sorted
> data sets. I think it makes sense to focus on order within data files, not
> order between data files.


This is where I'm interested in learning about the separation of
responsibilities for the data source and how "smart" it is supposed to be.

For the first part, I would assume that given the estimated data size from
Spark and options passed in from the user, the data source could make a
more intelligent requirement on the write format than Spark independently.
Somewhat analogous to how the current FileSource does bin packing of small
files on the read side, restricting parallelism for the sake of overhead.
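
For illustration only (every name here is hypothetical, not an existing or
proposed interface), the kind of hook I'm imagining looks roughly like:

// Hypothetical sketch: Spark passes the source an estimate of the input, and
// the source decides whether requiring a clustering is worth the shuffle.
case class WriteInfo(estimatedSizeInBytes: Long, estimatedNumRows: Long)

trait RecommendsClustering {
  def clusteringFor(info: WriteInfo): Seq[String]   // empty means "don't bother"
}

class CassandraLikeSource extends RecommendsClustering {
  override def clusteringFor(info: WriteInfo): Seq[String] =
    if (info.estimatedNumRows > 10L * 1000 * 1000) Seq("partition_key")
    else Seq.empty   // small input: the shuffle would cost more than it saves
}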

For the second, I wouldn't assume that a data source requiring a certain
write format would give any guarantees around reading the same data? In the
cases where it is a complete overwrite it would, but for independent writes
it could still be useful for statistics or compression.

Thanks
Pat



On Wed, Mar 28, 2018 at 8:28 PM, Ryan Blue  wrote:

> How would Spark determine whether or not to apply a recommendation - a
> cost threshold?
>
> Spark would always apply the required clustering and sort order because
> they are required by the data source. It is reasonable for a source to
> reject data that isn’t properly prepared. For example, data must be written
> to HTable files with keys in order or else the files are invalid. Sorting
> should not be implemented in the sources themselves because Spark handles
> concerns like spilling to disk. Spark must prepare data correctly, which is
> why the interfaces start with “Requires”.
>
> I’m not sure what the second half of your question means. What does Spark
> need to pass into the data source?
>
> Should a datasource be able to provide a Distribution proper rather than
> just the clustering expressions? Two use cases would be for explicit global
> sorting of the dataset and attempting to ensure a minimum write task
> size/number of write tasks.
>
> Considering the second use case you mentioned first, I don’t think it is a
> good idea for a table to put requirements on the number of tasks used for a
> write. The parallelism should be set appropriately for the data volume,
> which is for Spark or the user to determine. A minimum or maximum number of
> tasks could cause bad behavior.
>
> That said, I think there is a related use case for sharding. But that’s
> really just a clustering by an expression with the shard calculation, e.g., 
> hash(id_col, 64). The shards should be handled as a cluster, but it doesn’t matter how
> many tasks are used for it.
>
> For your first use case, an explicit global ordering, the problem is that
> there can’t be an explicit global ordering for a table when it is populated
> by a series of independent writes. Each write could have a global order,
> but once those files are written, you have to deal with multiple sorted
> data sets. I think it makes sense to focus on order within data files, not
> order between data files.
> ​
>
> On Wed, Mar 28, 2018 at 7:26 AM, Patrick Woody 
> wrote:
>
>> How would Spark determine whether or not to apply a recommendation - a
>> cost threshold? And yes, it would be good to flesh out what information we
>> get from Spark in the datasource when providing these
>> recommendations/requirements - I could see statistics and the existing
>> outputPartitioning/Ordering of the child plan being used for providing the
>> requirement.
>>
>> Should a datasource be able to provide a Distribution proper rather than
>> just 

Re: DataSourceV2 write input requirements

2018-03-28 Thread Ryan Blue
How would Spark determine whether or not to apply a recommendation - a cost
threshold?

Spark would always apply the required clustering and sort order because
they are required by the data source. It is reasonable for a source to
reject data that isn’t properly prepared. For example, data must be written
to HTable files with keys in order or else the files are invalid. Sorting
should not be implemented in the sources themselves because Spark handles
concerns like spilling to disk. Spark must prepare data correctly, which is
why the interfaces start with “Requires”.
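
To make the shape concrete (illustrative names only, not the actual
DataSourceV2 interfaces), a source would declare its needs and Spark would
prepare the input before handing it over:

// Illustrative only; not the real API.
trait RequiresClustering {
  def requiredClustering: Seq[String]   // columns the input must be clustered by
}

trait RequiresSortOrder {
  def requiredOrdering: Seq[String]     // per-partition sort columns
}

// The HTable-style example above would require its key column to be sorted:
class SortedKeyValueSink extends RequiresClustering with RequiresSortOrder {
  override def requiredClustering: Seq[String] = Seq("key")
  override def requiredOrdering: Seq[String] = Seq("key")
}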

I’m not sure what the second half of your question means. What does Spark
need to pass into the data source?

Should a datasource be able to provide a Distribution proper rather than
just the clustering expressions? Two use cases would be for explicit global
sorting of the dataset and attempting to ensure a minimum write task
size/number of write tasks.

Considering the second use case you mentioned first, I don’t think it is a
good idea for a table to put requirements on the number of tasks used for a
write. The parallelism should be set appropriately for the data volume,
which is for Spark or the user to determine. A minimum or maximum number of
tasks could cause bad behavior.

That said, I think there is a related use case for sharding. But that’s
really just a clustering by an expression with the shard calculation,
e.g., hash(id_col, 64). The shards should be handled as a cluster, but it doesn’t matter how
many tasks are used for it.
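
In DataFrame terms (a sketch with placeholder paths and column name;
pmod(hash(id_col), 64) is just one way to spell the shard expression), the
preparation Spark would do amounts to:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{hash, lit, pmod}

object ShardedWrite {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("sharded-write").getOrCreate()
    val df = spark.read.parquet("/data/input")                  // placeholder input
    // Cluster by a 64-way shard of id_col; the task count is left to Spark.
    val byShard = df.repartition(pmod(hash(df("id_col")), lit(64)))
    byShard.write.parquet("/data/output")                       // placeholder output
    spark.stop()
  }
}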

For your first use case, an explicit global ordering, the problem is that
there can’t be an explicit global ordering for a table when it is populated
by a series of independent writes. Each write could have a global order,
but once those files are written, you have to deal with multiple sorted
data sets. I think it makes sense to focus on order within data files, not
order between data files.
​

On Wed, Mar 28, 2018 at 7:26 AM, Patrick Woody 
wrote:

> How would Spark determine whether or not to apply a recommendation - a
> cost threshold? And yes, it would be good to flesh out what information we
> get from Spark in the datasource when providing these
> recommendations/requirements - I could see statistics and the existing
> outputPartitioning/Ordering of the child plan being used for providing the
> requirement.
>
> Should a datasource be able to provide a Distribution proper rather than
> just the clustering expressions? Two use cases would be for explicit global
> sorting of the dataset and attempting to ensure a minimum write task
> size/number of write tasks.
>
>
>
> On Tue, Mar 27, 2018 at 7:59 PM, Russell Spitzer <
> russell.spit...@gmail.com> wrote:
>
>> Thanks for the clarification, definitely would want to require Sort but
>> only recommend partitioning ...  I think that would be useful to request
>> based on details about the incoming dataset.
>>
>> On Tue, Mar 27, 2018 at 4:55 PM Ryan Blue  wrote:
>>
>>> A required clustering would not, but a required sort would. Clustering
>>> is asking for the input dataframe's partitioning, and sorting would be how
>>> each partition is sorted.
>>>
>>> On Tue, Mar 27, 2018 at 4:53 PM, Russell Spitzer <
>>> russell.spit...@gmail.com> wrote:
>>>
 I forgot since it's been a while, but does Clustering support allow
 requesting that partitions contain elements in order as well? That would be
 a useful trick for me. IE
 Request/Require(SortedOn(Col1))
 Partition 1 -> ((A,1), (A, 2), (B,1) , (B,2) , (C,1) , (C,2))

 On Tue, Mar 27, 2018 at 4:38 PM Ryan Blue 
 wrote:

> Thanks, it makes sense that the existing interface is for aggregation
> and not joins. Why are there requirements for the number of partitions that
> are returned then?
>
> Does it make sense to design the write-side `Requirement` classes and
> the read-side reporting separately?
>
> On Tue, Mar 27, 2018 at 3:56 PM, Wenchen Fan 
> wrote:
>
>> Hi Ryan, yea you are right that SupportsReportPartitioning doesn't
>> expose hash function, so Join can't benefit from this interface, as Join
>> doesn't require a general ClusteredDistribution, but a more specific one
>> called HashClusteredDistribution.
>>
>> So currently only Aggregate can benefit from
>> SupportsReportPartitioning and save shuffle. We can add a new interface to
>> expose the hash function to make it work for Join.
>>
>> On Tue, Mar 27, 2018 at 9:33 AM, Ryan Blue  wrote:
>>
>>> I just took a look at SupportsReportPartitioning and I'm not sure
>>> that it will work for real use cases. It doesn't specify, as far as I can
>>> tell, a hash function for combining clusters into tasks or a way to provide
>>> Spark a hash function for the other side of a join. It seems 

Re: Toward an "API" for spark images used by the Kubernetes back-end

2018-03-28 Thread Kimoon Kim
Thanks for starting this discussion.

When I was troubleshooting Spark on K8s, I often faced a need to turn on
debug messages on the driver and executor pods of my jobs, which would be
possible if I somehow put the right log4j.properties file inside the pods.
I know I can build custom Docker images, but that seems like too much. (So
being lazy, I usually just gave up)

If there is an alternative mechanism, like using a ConfigMap, I would
prefer that for this log4j need. Maybe we should document the possible
alternatives to building Docker images for certain use cases and guide
people toward the right mechanisms?

Thanks,
Kimoon

On Wed, Mar 21, 2018 at 10:54 PM, Felix Cheung 
wrote:

> I like being able to customize the docker image itself - but I realize
> this thread is more about “API” for the stock image.
>
> Environment is nice. Probably we need a way to set custom spark config (as
> a file??)
>
>
> --
> *From:* Holden Karau 
> *Sent:* Wednesday, March 21, 2018 10:44:20 PM
> *To:* Erik Erlandson
> *Cc:* dev
> *Subject:* Re: Toward an "API" for spark images used by the Kubernetes
> back-end
>
> I’m glad this discussion is happening on dev@ :)
>
> Personally I like customizing with shell env variables when rolling my
> own image, but documenting the expectations/usage of the variables is
> definitely needed before we can really call it an API.
>
> On the related question, I suspect two of the more “common” likely
> customizations are adding additional jars for bootstrapping (fetched from a
> DFS) & also similarly complicated Python dependencies (although given that the
> Python support isn’t merged yet it’s hard to say what exactly this would
> look like).
>
> I could also see some vendors wanting to add some bootstrap/setup scripts
> to fetch keys or other things.
>
> What other ways do folks foresee customizing their Spark docker
> containers?
>
> On Wed, Mar 21, 2018 at 5:04 PM Erik Erlandson 
> wrote:
>
>> During the review of the recent PR to remove use of the init_container
>> from kube pods as created by the Kubernetes back-end, the topic of
>> documenting the "API" for these container images also came up. What
>> information does the back-end provide to these containers? In what form?
>> What assumptions does the back-end make about the structure of these
>> containers?  This information is important in a scenario where a user wants
>> to create custom images, particularly if these are not based on the
>> reference dockerfiles.
>>
>> A related topic is deciding what such an API should look like.  For
>> example, early incarnations were based more purely on environment
>> variables, which could have advantages in terms of an API that is easy to
>> describe in a document.  If we document the current API, should we annotate
>> it as Experimental?  If not, does that effectively freeze the API?
>>
>> We are interested in community input about possible customization use
>> cases and opinions on possible API designs!
>> Cheers,
>> Erik
>>
> --
> Twitter: https://twitter.com/holdenkarau
>


Re: [Spark R] Proposal: Exposing RBackend in RRunner

2018-03-28 Thread Reynold Xin
If you need the functionality, I would recommend just copying the code
over to your project and using it that way.

On Wed, Mar 28, 2018 at 9:02 AM Felix Cheung 
wrote:

> I think the difference is py4j is a public library whereas the R backend
> is specific to SparkR.
>
> Can you elaborate what you need JVMObjectTracker for? We have provided R
> convenient APIs to call into JVM: sparkR.callJMethod for example
>
> _
> From: Jeremy Liu 
> Sent: Tuesday, March 27, 2018 12:20 PM
> Subject: Re: [Spark R] Proposal: Exposing RBackend in RRunner
> To: 
>
>
>
> Spark Dev,
>
> On second thought, the below topic seems more appropriate for spark-dev
> rather than spark-users:
>
> Spark Users,
>>
>> In SparkR, RBackend is created in RRunner.main(). This in particular
>> makes it difficult to control or use the RBackend. For my use case, I am
>> looking to access the JVMObjectTracker that RBackend maintains for SparkR
>> dataframes.
>>
>> Analogously, pyspark starts a py4j.GatewayServer in PythonRunner.main().
>> It's then possible to start a ClientServer that then has access to the
>> object bindings between Python/Java.
>>
>> Is there something similar for SparkR? Or a reasonable way to expose
>> RBackend?
>>
>> Thanks!
>>
> --
> -
> Jeremy Liu
> jeremy.jl@gmail.com
>
>
>


Re: [Spark R] Proposal: Exposing RBackend in RRunner

2018-03-28 Thread Felix Cheung
I think the difference is py4j is a public library whereas the R backend is 
specific to SparkR.

Can you elaborate what you need JVMObjectTracker for? We have provided R 
convenient APIs to call into JVM: sparkR.callJMethod for example

_
From: Jeremy Liu 
Sent: Tuesday, March 27, 2018 12:20 PM
Subject: Re: [Spark R] Proposal: Exposing RBackend in RRunner
To: 


Spark Dev,

On second thought, the below topic seems more appropriate for spark-dev rather 
than spark-users:

Spark Users,

In SparkR, RBackend is created in RRunner.main(). This in particular makes it 
difficult to control or use the RBackend. For my use case, I am looking to 
access the JVMObjectTracker that RBackend maintains for SparkR dataframes.

Analogously, pyspark starts a py4j.GatewayServer in PythonRunner.main(). It's 
then possible to start a ClientServer that then has access to the object 
bindings between Python/Java.

Is there something similar for SparkR? Or a reasonable way to expose RBackend?

Thanks!
--
-
Jeremy Liu
jeremy.jl@gmail.com




Re: Build issues with apache-spark-on-k8s.

2018-03-28 Thread Anirudh Ramanathan
As Lucas said, those directories are generated and copied when you run a
full maven build with the -Pkubernetes flag specified (or use instructions
in
https://spark.apache.org/docs/latest/building-spark.html#building-a-runnable-distribution
).

Also, using the Kubernetes integration in the main Apache Spark project is
recommended. The fork https://github.com/apache-spark-on-k8s/spark/ will be
retired once we finish upstreaming all those features in Spark 2.4.


On Wed, Mar 28, 2018, 6:42 AM Lucas Kacher  wrote:

> Are you building on the fork or on the official release now? I built
> v2.3.0 from source w/out issue. One thing I noticed is that I needed to run
> the build-image command from the bin which was placed in dist/ as opposed
> to the one in the repo (as that's how it copies the necessary targets).
>
> (Failed to reply-all to the list).
>
> On Wed, Mar 28, 2018 at 4:30 AM, Atul Sowani  wrote:
>
>> Hi,
>>
>> I built apache-spark-on-k8s from source on Ubuntu 16.04 and it got built
>> without errors. Next, I wanted to create docker images, so as explained at
>> https://apache-spark-on-k8s.github.io/userdocs/running-on-kubernetes.html
>>  I used sbin/build-push-docker-images.sh to create those. While using
>> this script I came across 2 issues:
>>
>> 1. It references "dockerfiles" directory which should be in "spark",
>> however this directory is missing. I created "dockerfiles" directory and
>> copied Dockerfiles from resource-managers/kubernetes/docker-minimal-bundle
>>
>> 2. spark-base dockerfile expects to have some JAR files present in a
>> directory called "jars" - this directory is missing. I tried rebuilding the
>> code but this directory is not getting generated if it is supposed to be.
>>
>> My doubt is: is this a genuine/known issue, or am I missing some build
>> steps?
>>
>> Thanks,
>> Atul.
>>
>>
>
>
> --
>
> *Lucas Kacher*
> Senior Engineer
> -
> vsco.co 
> New York, NY
> 818.512.5239
>


Re: [Kubernetes] structured-streaming driver restarts / roadmap

2018-03-28 Thread Anirudh Ramanathan
We discussed this early on in our fork and I think we should have this in a
JIRA and discuss it further. It's something we want to address in the
future.

One proposed method is using a StatefulSet of size 1 for the driver. This
ensures recovery but at the same time takes away from the completion
semantics of a single pod.


See history in https://github.com/apache-spark-on-k8s/spark/issues/288

On Wed, Mar 28, 2018, 6:56 AM Lucas Kacher  wrote:

> A carry-over from the apache-spark-on-k8s project, it would be useful to
> have a configurable restart policy for submitted jobs with the Kubernetes
> resource manager. See the following issues:
>
> https://github.com/apache-spark-on-k8s/spark/issues/133
> https://github.com/apache-spark-on-k8s/spark/issues/288
> https://github.com/apache-spark-on-k8s/spark/issues/546
>
> Use case: I have a structured streaming job, deployed via k8s, that reads from
> Kafka, aggregates, and writes back out to Kafka, checkpointing to a remote
> location. If the driver pod dies for any number of reasons, it will not
> restart.
>
> For us, as all data is stored via checkpoint and we are satisfied with
> at-least-once semantics, it would be useful if the driver were to come back
> on its own and pick back up.
>
> Firstly, may we add this to JIRA? Secondly, is there any insight as to
> what the thought is around allowing that to be configurable in the future?
> If it's not something that will happen natively, we will end up needing to
> write something that polls or listens to k8s and has the ability to
> re-submit any failed jobs.
>
> Thanks!
>
> --
>
> *Lucas Kacher*
> Senior Engineer
> -
> vsco.co 
> New York, NY
> 818.512.5239
>


Re: DataSourceV2 write input requirements

2018-03-28 Thread Patrick Woody
How would Spark determine whether or not to apply a recommendation - a cost
threshold? And yes, it would be good to flesh out what information we get
from Spark in the datasource when providing these
recommendations/requirements - I could see statistics and the existing
outputPartitioning/Ordering of the child plan being used for providing the
requirement.

Should a datasource be able to provide a Distribution proper rather than
just the clustering expressions? Two use cases would be for explicit global
sorting of the dataset and attempting to ensure a minimum write task
size/number of write tasks.
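
Concretely (purely illustrative names, not a proposal for the actual classes),
I'm picturing something along these lines:

// Illustrative only: the source hands back a richer description than bare
// clustering columns, covering the global-sort and task-sizing cases above.
sealed trait WriteDistribution
case class ClusteredBy(columns: Seq[String]) extends WriteDistribution
case class GloballyOrderedBy(sortColumns: Seq[String]) extends WriteDistribution
case class ClusteredWithMinBytesPerTask(columns: Seq[String],
    minBytesPerTask: Long) extends WriteDistribution

trait RequiresDistribution {
  def requiredDistribution: WriteDistribution
}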



On Tue, Mar 27, 2018 at 7:59 PM, Russell Spitzer 
wrote:

> Thanks for the clarification, definitely would want to require Sort but
> only recommend partitioning ...  I think that would be useful to request
> based on details about the incoming dataset.
>
> On Tue, Mar 27, 2018 at 4:55 PM Ryan Blue  wrote:
>
>> A required clustering would not, but a required sort would. Clustering is
>> asking for the input dataframe's partitioning, and sorting would be how
>> each partition is sorted.
>>
>> On Tue, Mar 27, 2018 at 4:53 PM, Russell Spitzer <
>> russell.spit...@gmail.com> wrote:
>>
>>> I forgot since it's been a while, but does Clustering support allow
>>> requesting that partitions contain elements in order as well? That would be
>>> a useful trick for me. IE
>>> Request/Require(SortedOn(Col1))
>>> Partition 1 -> ((A,1), (A, 2), (B,1) , (B,2) , (C,1) , (C,2))
>>>
>>> On Tue, Mar 27, 2018 at 4:38 PM Ryan Blue 
>>> wrote:
>>>
 Thanks, it makes sense that the existing interface is for aggregation
 and not joins. Why are there requirements for the number of partitions that
 are returned then?

 Does it make sense to design the write-side `Requirement` classes and
 the read-side reporting separately?

 On Tue, Mar 27, 2018 at 3:56 PM, Wenchen Fan 
 wrote:

> Hi Ryan, yea you are right that SupportsReportPartitioning doesn't
> expose hash function, so Join can't benefit from this interface, as Join
> doesn't require a general ClusteredDistribution, but a more specific one
> called HashClusteredDistribution.
>
> So currently only Aggregate can benefit from
> SupportsReportPartitioning and save shuffle. We can add a new interface to
> expose the hash function to make it work for Join.
>
> On Tue, Mar 27, 2018 at 9:33 AM, Ryan Blue  wrote:
>
>> I just took a look at SupportsReportPartitioning and I'm not sure
>> that it will work for real use cases. It doesn't specify, as far as I can
>> tell, a hash function for combining clusters into tasks or a way to provide
>> Spark a hash function for the other side of a join. It seems unlikely to me
>> that many data sources would have partitioning that happens to match the
>> other side of a join. And, it looks like task order matters? Maybe I'm
>> missing something?
>>
>> I think that we should design the write side independently based on
>> what data stores actually need, and take a look at the read side based on
>> what data stores can actually provide. Wenchen, was there a design doc 
>> for
>> partitioning on the read path?
>>
>> I completely agree with your point about a global sort. We recommend
>> to all of our data engineers to add a sort to most tables because it
>> introduces the range partitioner and does a skew calculation, in addition
>> to making data filtering much better when it is read. It's really common
>> for tables to be skewed by partition values.
>>
>> rb
>>
>> On Mon, Mar 26, 2018 at 7:59 PM, Patrick Woody <
>> patrick.woo...@gmail.com> wrote:
>>
>>> Hey Ryan, Ted, Wenchen
>>>
>>> Thanks for the quick replies.
>>>
>>> @Ryan - the sorting portion makes sense, but I think we'd have to
>>> ensure something similar to requiredChildDistribution in SparkPlan where we
>>> have the number of partitions as well if we'd want to further report to
>>> SupportsReportPartitioning, yeah?
>>>
>>> Specifying an explicit global sort can also be useful for filtering
>>> purposes on Parquet row group stats if we have a time based/high
>>> cardinality ID field. If my datasource or catalog knows about previous
>>> queries on a table, it could be really useful to recommend more appropriate
>>> formatting for consumers on the next materialization. The same would be
>>> true of clustering on commonly joined fields.
>>>
>>> Thanks again
>>> Pat
>>>
>>>
>>>
>>> On Mon, Mar 26, 2018 at 10:05 PM, Ted Yu 
>>> wrote:
>>>
 Hmm. Ryan seems to be right.

 Looking at sql/core/src/main/java/org/apache/spark/sql/sources/v2/

[Kubernetes] structured-streaming driver restarts / roadmap

2018-03-28 Thread Lucas Kacher
A carry-over from the apache-spark-on-k8s project, it would be useful to
have a configurable restart policy for submitted jobs with the Kubernetes
resource manager. See the following issues:

https://github.com/apache-spark-on-k8s/spark/issues/133
https://github.com/apache-spark-on-k8s/spark/issues/288
https://github.com/apache-spark-on-k8s/spark/issues/546

Use case: I have a structured streaming job, deployed via k8s, that reads from
Kafka, aggregates, and writes back out to Kafka, checkpointing to a remote
location. If the driver pod dies for any number of reasons, it will not restart.
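
For reference, a minimal sketch of that shape of job (topic names, servers,
the checkpoint path, and the aggregation itself are placeholders, not the real
job):

import org.apache.spark.sql.SparkSession

object KafkaAggToKafka {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("kafka-agg").getOrCreate()
    import spark.implicits._

    // Read from Kafka, aggregate, and write back out; the remote checkpoint is
    // what lets a restarted driver resume with at-least-once semantics.
    val counts = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "kafka:9092")
      .option("subscribe", "events")
      .load()
      .selectExpr("CAST(key AS STRING) AS key")
      .groupBy($"key")
      .count()

    counts
      .selectExpr("key", "CAST(count AS STRING) AS value")
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "kafka:9092")
      .option("topic", "event-counts")
      .option("checkpointLocation", "s3a://bucket/checkpoints/event-counts")
      .outputMode("update")
      .start()
      .awaitTermination()
  }
}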

For us, as all data is stored via checkpoint and we are satisfied with
at-least-once semantics, it would be useful if the driver were to come back
on its own and pick back up.

Firstly, may we add this to JIRA? Secondly, is there any insight as to what
the thought is around allowing that to be configurable in the future? If
it's not something that will happen natively, we will end up needing to
write something that polls or listens to k8s and has the ability to
re-submit any failed jobs.

Thanks!

-- 

*Lucas Kacher*
Senior Engineer
-
vsco.co 
New York, NY
818.512.5239


Re: Build issues with apache-spark-on-k8s.

2018-03-28 Thread Lucas Kacher
Are you building on the fork or on the official release now? I built v2.3.0
from source w/out issue. One thing I noticed is that I needed to run the
build-image command from the bin which was placed in dist/ as opposed to
the one in the repo (as that's how it copies the necessary targets).

(Failed to reply-all to the list).

On Wed, Mar 28, 2018 at 4:30 AM, Atul Sowani  wrote:

> Hi,
>
> I built apache-spark-on-k8s from source on Ubuntu 16.04 and it got built
> without errors. Next, I wanted to create docker images, so as explained at
> https://apache-spark-on-k8s.github.io/userdocs/running-on-kubernetes.html I
> used sbin/build-push-docker-images.sh to create those. While using this
> script I came across 2 issues:
>
> 1. It references "dockerfiles" directory which should be in "spark",
> however this directory is missing. I created "dockerfiles" directory and
> copied Dockerfiles from resource-managers/kubernetes/docker-minimal-bundle
>
> 2. spark-base dockerfile expects to have some JAR files present in a
> directory called "jars" - this directory is missing. I tried rebuilding the
> code but this directory is not getting generated if it is supposed to be.
>
> My doubt is: is this a genuine/known issue, or am I missing some build
> steps?
>
> Thanks,
> Atul.
>
>


-- 

*Lucas Kacher*
Senior Engineer
-
vsco.co 
New York, NY
818.512.5239


Build issues with apache-spark-on-k8s.

2018-03-28 Thread Atul Sowani
 Hi,

I built apache-spark-on-k8s from source on Ubuntu 16.04 and it got built
without errors. Next, I wanted to create docker images, so as explained at
https://apache-spark-on-k8s.github.io/userdocs/running-on-kubernetes.html I
used sbin/build-push-docker-images.sh to create those. While using this
script I came across 2 issues:

1. It references "dockerfiles" directory which should be in "spark",
however this directory is missing. I created "dockerfiles" directory and
copied Dockerfiles from resource-managers/kubernetes/docker-minimal-bundle

2. spark-base dockerfile expects to have some JAR files present in a
directory called "jars" - this directory is missing. I tried rebuilding the
code but this directory is not getting generated if it is supposed to be.

My doubt is: is this a genuine/known issue, or am I missing some build steps?

Thanks,
Atul.