Resolving generated expressions in catalyst

2019-04-03 Thread Nikolas Vanderhoof
Hey everyone!

I'm trying to implement a custom catalyst optimization that I think may be
useful to others that make frequent use of the arrays_overlap and
array_contains functions in joins.



Consider this first query joining on overlapping arrays.
```
import org.apache.spark.sql.functions._

val left = Seq((Seq(1, 2, 3, 4), "hi")).toDF("arr", "word")
val right = Seq((Seq(2, 5), "bye")).toDF("arr", "word")

// This results in a cartesian product in the physical plan
// if the tables are sufficiently large
val naiveJoin = left.join(right, arrays_overlap(left("arr"), right("arr")))
```

We can transform it into an equivalent query like this:

```
// This will result in a non-cartesian product join
val fastJoin = {
  left.withColumn("explode_larr", explode(left("arr"))).as("__lp").join(
    right.withColumn("explode_rarr", explode(col("arr"))).as("__rp"),
    col("explode_larr") === col("explode_rarr")
  ).drop("explode_larr", "explode_rarr").distinct
}
```
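
The same trick applies to array_contains joins. Here is a rough sketch (the
`other` DataFrame and its `key` column are made up for illustration; they are
not part of the queries above):
```
val other = Seq((2, "two"), (7, "seven")).toDF("key", "label")

// Naive form: prone to a cartesian product for large tables
val containsNaive = left.join(other, array_contains(left("arr"), other("key")))

// Rewritten form: explode the array side and equi-join on the elements,
// then deduplicate to undo the fan-out introduced by explode
val containsFast = left
  .withColumn("explode_larr", explode(left("arr")))
  .join(other, col("explode_larr") === other("key"))
  .drop("explode_larr")
  .distinct
```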



I've implemented a first attempt of this optimization on my fork:
https://github.com/nvander1/spark/commit/711184f98774b7ac46fcfdf4e28e2d71041d89e1
but I'm having difficulty figuring out how to resolve the attributes
for the exploded column.

Examining the logical tree of fastJoin:
```
00 Deduplicate [arr#617, arr#626, word#618, word#627]
01 +- Project [arr#617, word#618, arr#626, word#627]
02    +- Join Inner, (explode_larr#643 = explode_rarr#648)
03       :- SubqueryAlias `__lp`
04       :  +- Project [arr#617, word#618, explode_larr#643]
05       :     +- Generate explode(arr#617), false, [explode_larr#643]
06       :        +- Project [_1#614 AS arr#617, _2#615 AS word#618]
07       :           +- LocalRelation [_1#614, _2#615]
08       +- SubqueryAlias `__rp`
09          +- Project [arr#626, word#627, explode_rarr#648]
10             +- Generate explode(arr#626), false, [explode_rarr#648]
11                +- Project [_1#623 AS arr#626, _2#624 AS word#627]
12                   +- LocalRelation [_1#623, _2#624]
```



This is the logical tree of my implementation thus far:
```
'Deduplicate
+- 'Project [arr#2143, word#2144, arr#2152, word#2153]
   +- 'Join Inner, ('explode_larr = 'explode_rarr)
      :- 'SubqueryAlias `__lp`
      :  +- 'Project [arr#2143, word#2144, 'explode_larr]
      :     +- 'Generate explode(arr#2143), false, explode_larr
      :        +- LocalRelation [arr#2143, word#2144]
      +- 'SubqueryAlias `__rp`
         +- 'Project [arr#2152, word#2153, 'explode_rarr]
            +- 'Generate explode(arr#2152), false, explode_rarr
               +- LocalRelation [arr#2152, word#2153]
```
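
One idea I'm considering (a rough sketch only, and it assumes the array
attribute on the child is already resolved by the time my rule runs) is to
build the generator output as a concrete AttributeReference up front, and
reuse that exact reference in the Generate, the Project, and the join
condition instead of an UnresolvedAttribute:
```
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.types.ArrayType

// Derive a resolved attribute for the exploded element from the resolved
// array attribute; element type and nullability come from the ArrayType.
// (Throws a MatchError if `arr` is not actually an array column.)
def explodedAttr(arr: AttributeReference, name: String): AttributeReference = {
  val ArrayType(elementType, containsNull) = arr.dataType
  AttributeReference(name, elementType, containsNull)()
}
```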



Related information (similar cases):
https://issues.apache.org/jira/projects/SPARK/issues/SPARK-27359?filter=addedrecently


Re: [DISCUSS] Spark Columnar Processing

2019-04-03 Thread Bobby Evans
I am still working on the SPIP and should get it up in the next few days.
I have the basic text more or less ready, but I want to get a high-level
API concept ready too, just to have something more concrete.  I have not
really done much with contributing new features to Spark, so I am not sure
where a design document fits in here: neither
http://spark.apache.org/improvement-proposals.html nor
http://spark.apache.org/contributing.html mentions a design document
anywhere.  I am happy to put one up, but I was hoping the API concept would
cover most of that.

Thanks,

Bobby

On Tue, Apr 2, 2019 at 9:16 PM Renjie Liu  wrote:

> Hi, Bobby:
> Do you have design doc? I'm also interested in this topic and want to help
> contribute.
>
> On Tue, Apr 2, 2019 at 10:00 PM Bobby Evans  wrote:
>
>> Thanks to everyone for the feedback.
>>
>> Overall the feedback has been really positive for exposing columnar as a
>> processing option to users.  I'll write up a SPIP on the proposed changes
>> to support columnar processing (not necessarily implement it) and then ping
>> the list again for more feedback and discussion.
>>
>> Thanks again,
>>
>> Bobby
>>
>> On Mon, Apr 1, 2019 at 5:09 PM Reynold Xin  wrote:
>>
>>> I just realized I didn't make it very clear my stance here ... here's
>>> another try:
>>>
>>> I think it's a no brainer to have a good columnar UDF interface. This
>>> would facilitate a lot of high performance applications, e.g. GPU-based
>>> accelerations for machine learning algorithms.
>>>
>>> On rewriting the entire internals of Spark SQL to leverage columnar
>>> processing, I don't see enough evidence to suggest that's a good idea yet.
>>>
>>>
>>>
>>>
>>> On Wed, Mar 27, 2019 at 8:10 AM, Bobby Evans  wrote:
>>>
 Kazuaki Ishizaki,

 Yes, ColumnarBatchScan does provide a framework for doing code
 generation for the processing of columnar data.  I have to admit that I
 don't have a deep understanding of the code generation piece, so if I get
 something wrong please correct me.  From what I have seen, only input formats
 currently inherit from ColumnarBatchScan, and from comments in the trait

   /**
    * Generate [[ColumnVector]] expressions for our parent to consume as rows.
    * This is called once per [[ColumnarBatch]].
    */

 https://github.com/apache/spark/blob/956b52b1670985a67e49b938ac1499ae65c79f6e/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala#L42-L43

 It appears that ColumnarBatchScan is really only intended to pull the data
 out of the batch, not to process that data in a columnar fashion (the
 loading stage that you mentioned).

 > The SIMDzation or GPUization capability depends on a compiler that
 translates native code from the code generated by the whole-stage codegen.
 To be able to support vectorized processing, Hive stayed with pure Java
 and let the JVM detect and do the SIMDzation of the code.  To make that
 happen they created loops that go through each element in a column and
 removed all conditionals from the bodies of the loops.  To the best of my
 knowledge that would still require a separate code path, like I am
 proposing, to make the different processing phases generate code that the
 JVM can compile down to SIMD instructions.  The code Spark currently
 generates is full of null checks for each element, which would prevent the
 vectorization we want.  Also, the intermediate results are often stored in
 UnsafeRow instances.  This is really fast for row-based processing, but I
 believe the complexity of how they work would prevent the JVM from being
 able to vectorize the processing.  If you have a better way to take Java
 code and vectorize it, we should put it into OpenJDK instead of Spark so
 everyone can benefit from it.
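
 To make the contrast concrete, here is roughly the shape of loop I mean (a
 hand-written sketch, not Hive's or Spark's actual generated code): the
 arithmetic loop has no per-element branches, so the JVM's auto-vectorizer
 has a chance, and nulls are tracked in a separate pass.
```
// Illustrative columnar layout: values plus a separate validity array.
final class DoubleColumn(val values: Array[Double], val isNull: Array[Boolean])

def addColumns(a: DoubleColumn, b: DoubleColumn, out: DoubleColumn): Unit = {
  val n = a.values.length
  var i = 0
  while (i < n) {            // branch-free body: a candidate for SIMD
    out.values(i) = a.values(i) + b.values(i)
    i += 1
  }
  i = 0
  while (i < n) {            // null handling kept out of the arithmetic loop
    out.isNull(i) = a.isNull(i) || b.isNull(i)
    i += 1
  }
}
```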

 Trying to compile directly from generated java code to something a GPU
 can process is something we are tackling but we decided to go a different
 route from what you proposed.  From talking with several compiler experts
 here at NVIDIA my understanding is that IBM in partnership with NVIDIA
 attempted in the past to extend the JVM to run at least partially on GPUs,
 but it was really difficult to get right, especially with how java does
 memory management and memory layout.

 To avoid that complexity we decided to split the JITing up into two
 separate pieces.  I didn't mention any of this before because this
 discussion was intended to just be around the memory layout support, and
 not GPU processing.  The first part would be to take the Catalyst AST and
 produce CUDA code directly from it.  If properly done we should be able to
 do the selection and projection phases within a single kernel.  The biggest
 issue comes with UDFs as they cannot easily be vectorized for the CPU or
 GPU.  So to deal with that we have a prototype written 

Re: Closing a SparkSession stops the SparkContext

2019-04-03 Thread Ryan Blue
For #1, do we agree on the behavior? I think that closing a SparkSession
should not close the SparkContext unless it is the only session. Evidently
that's not what happens, and I consider the current behavior a bug.
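
To make the behavior concrete, here is a minimal reproduction of what I mean
(using only the public SparkSession API; the outcome in the comments is the
current behavior I'm calling a bug, not what I think it should be):
```
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("demo").master("local[*]").getOrCreate()
val other = spark.newSession()   // second session sharing spark.sparkContext

other.stop()                     // today this stops the shared SparkContext,
                                 // so `spark` can no longer run jobs either
```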

For more context, we're working on the new catalog APIs and how to
guarantee consistent operations. Self-joining a table, for example, should
use the same version of the table for both scans, and that state should be
specific to a session, not global. These plans assume that SparkSession
represents a session of interactions, along with a reasonable life-cycle.
If that life-cycle includes closing all sessions when you close any
session, then we can't really use sessions for this.

rb

On Wed, Apr 3, 2019 at 9:35 AM Vinoo Ganesh  wrote:

> Yeah, so I think there are 2 separate issues here:
>
>
>
>1. The coupling of the SparkSession + SparkContext in their current
>form seem unnatural
>2. The current memory leak, which I do believe is a case where the
>session is added onto the spark context, but is only needed by the session
>(but would appreciate a sanity check here). Meaning, it may make sense to
>investigate an API change.
>
>
>
> Thoughts?
>
>
>
> On 4/2/19, 15:13, "Sean Owen"  wrote:
>
> > @Sean – To the point that Ryan made, it feels wrong that stopping a
> session force stops the global context. Building in the logic to only stop
> the context when the last session is stopped also feels like a solution,
> but the best way I can think about doing this involves storing the global
> list of every available SparkSession, which may be difficult.
>
>
>
> I tend to agree it would be more natural for the SparkSession to have
>
> its own lifecycle 'stop' method that only stops/releases its own
>
> resources. But is that the source of the problem here? if the state
>
> you're trying to free is needed by the SparkContext, it won't help. If
>
> it happens to be in the SparkContext but is state only needed by one
>
> SparkSession and that there isn't any way to clean up now, that's a
>
> compelling reason to change the API.  Is that the situation? The only
>
> downside is making the user separately stop the SparkContext then.
>
>
>
> *From: *Vinoo Ganesh 
> *Date: *Tuesday, April 2, 2019 at 13:24
> *To: *Arun Mahadevan , Ryan Blue 
> *Cc: *Sean Owen , "dev@spark.apache.org" <
> dev@spark.apache.org>
> *Subject: *Re: Closing a SparkSession stops the SparkContext
>
>
>
> // Merging threads
>
>
>
> Thanks everyone for your thoughts. I’m very much in sync with Ryan here.
>
>
>
> @Sean – To the point that Ryan made, it feels wrong that stopping a
> session force stops the global context. Building in the logic to only stop
> the context when the last session is stopped also feels like a solution,
> but the best way I can think about doing this involves storing the global
> list of every available SparkSession, which may be difficult.
>
>
>
> @Arun – If the intention is not to be able to clear and create new
> sessions, then what specifically is the intended use case of Sessions?
> https://databricks.com/blog/2016/08/15/how-to-use-sparksession-in-apache-spark-2-0.html
> describes SparkSessions as time-bounded interactions, which implies that old
> ones should be clear-able and new ones create-able in lockstep without
> adverse effect?
>
>
>
> *From: *Arun Mahadevan 
> *Date: *Tuesday, April 2, 2019 at 12:31
> *To: *Ryan Blue 
> *Cc: *Vinoo Ganesh , Sean Owen , "
> dev@spark.apache.org" 
> *Subject: *Re: Closing a SparkSession stops the SparkContext
>
>
>
> I am not sure how would it cause a leak though. When a spark session or
> the underlying context is stopped it should clean up everything. The
> getOrCreate is supposed to return the active thread local or the global
> session. May be if you keep creating new sessions after explicitly clearing
> the default and the local sessions and keep leaking the sessions it could
> happen, but I don't think Sessions are intended to be used that way.
>
>
>
> On Tue, 2 Apr 2019 at 08:45, Ryan Blue  wrote:
>
> I think Vinoo is right about the intended behavior. If we support multiple
> sessions in one context, then stopping any one session shouldn't stop the
> shared context. The last session to be stopped should stop the context, but
> not any before that. We don't typically run multiple sessions in the same
> context so we haven't hit this, but it sounds reasonable.
>
>
>
> On 4/2/19, 11:44, "Sean Owen"  wrote:
>
>
>
> Yeah there's one global default session, but it's possible to create
>
> others and set them as the thread's active session, 

CfP VHPC19: HPC Virtualization-Containers: Paper due May 1, 2019 (extended)

2019-04-03 Thread VHPC 19

CALL FOR PAPERS


14th Workshop on Virtualization in High­-Performance Cloud Computing
(VHPC '19) held in conjunction with the International Supercomputing
Conference - High Performance, June 16-20, 2019, Frankfurt, Germany.
(Springer LNCS Proceedings)



Date: June 20, 2019
Workshop URL: http://vhpc.org

Paper Submission Deadline: May 1, 2019 (extended)
Springer LNCS, rolling abstract submission

Abstract/Paper Submission Link: https://edas.info/newPaper.php?c=25685


Call for Papers

Containers and virtualization technologies constitute key enabling
factors for flexible resource management in modern data centers, and
particularly in cloud environments.  Cloud providers need to manage
complex infrastructures in a seamless fashion to support the highly
dynamic and heterogeneous workloads and hosted applications customers
deploy. Similarly, HPC  environments have been increasingly adopting
techniques that enable flexible management of vast computing and
networking resources, close to marginal provisioning cost, which is
unprecedented in the history of scientific and commercial computing.

Various virtualization-containerization technologies contribute to the
overall picture in different ways: machine virtualization, with its
capability to enable consolidation of multiple under­utilized servers
with heterogeneous software and operating systems (OSes), and its
capability to live­-migrate a fully operating virtual machine (VM)
with a very short downtime, enables novel and dynamic ways to manage
physical servers; OS-­level virtualization (i.e., containerization),
with its capability to isolate multiple user­-space environments and
to allow for their co­existence within the same OS kernel, promises to
provide many of the advantages of machine virtualization with high
levels of responsiveness and performance; lastly, unikernels provide
for many virtualization benefits with a minimized OS/library surface.
I/O Virtualization in turn allows physical network interfaces to take
traffic from multiple VMs or containers; network virtualization, with
its capability to create logical network overlays that are independent
of the underlying physical topology is furthermore enabling
virtualization of HPC infrastructures.

Publication

Accepted papers will be published in a Springer LNCS proceedings volume.


Topics of Interest

The VHPC program committee solicits original, high-quality submissions
related to virtualization across the entire software stack with a
special focus on the intersection of HPC, containers-virtualization
and the cloud.

Major Topics:
- HPC on Containers and VMs
- Containerized applications with OS-level virtualization
- Lightweight applications with Unikernels
- HPC-as-a-Service

each major topic encompassing design/architecture, management,
performance management, modeling and configuration/tooling:

Design / Architecture:
- Containers and OS-level virtualization (LXC, Docker, rkt,
  Singularity, Shifter, i.a.)
- Hypervisor support for heterogeneous resources (GPUs, co-processors,
  FPGAs, etc.)
- Hypervisor extensions to mitigate side-channel attacks
  ([micro-]architectural timing attacks, privilege escalation)
- VM & Container trust and security models
- Multi-environment coupling, system software supporting in-situ
  analysis with HPC simulation
- Cloud reliability, fault-tolerance and high-availability
- Energy-efficient and power-aware virtualization
- Containers inside VMs with hypervisor isolation
- Virtualization support for emerging memory technologies
- Lightweight/specialized operating systems in conjunction with
  virtual machines
- Novel unikernels and use cases for virtualized HPC environments
- ARM-based hypervisors, ARM virtualization extensions

Management:
- Container and VM management for HPC and cloud environments
- HPC services integration, services to support HPC
- Service and on-demand scheduling & resource management
- Dedicated workload management with VMs or containers
- Workflow coupling with VMs and containers
- Unikernel, lightweight VM application management
- Environments and tools for operating containerized environments
  (batch, orchestration)
- Novel models for non-HPC workload provisioning on HPC resources

Performance Measurements and Modeling:
- Performance improvements for or driven by unikernels
- Optimizations of virtual machine monitor platforms and hypervisors
- Scalability analysis of VMs and/or containers at large scale
- Performance measurement, modeling and monitoring of
  virtualized/cloud workloads
- Virtualization in supercomputing environments, HPC clusters, HPC in
  the cloud

Configuration / Tooling:
- Tool support for unikernels: configuration/build environments,
  debuggers, profilers
- Job scheduling/control/policy and 

Re: Closing a SparkSession stops the SparkContext

2019-04-03 Thread Vinoo Ganesh
Yeah, so I think there are 2 separate issues here:



  1.  The coupling of the SparkSession + SparkContext in their current form 
seem unnatural
  2.  The current memory leak, which I do believe is a case where the session 
is added onto the spark context, but is only needed by the session (but would 
appreciate a sanity check here). Meaning, it may make sense to investigate an 
API change.



Thoughts?



On 4/2/19, 15:13, "Sean Owen"  wrote:

> @Sean – To the point that Ryan made, it feels wrong that stopping a 
session force stops the global context. Building in the logic to only stop the 
context when the last session is stopped also feels like a solution, but the 
best way I can think about doing this involves storing the global list of every 
available SparkSession, which may be difficult.



I tend to agree it would be more natural for the SparkSession to have

its own lifecycle 'stop' method that only stops/releases its own

resources. But is that the source of the problem here? if the state

you're trying to free is needed by the SparkContext, it won't help. If

it happens to be in the SparkContext but is state only needed by one

SparkSession and that there isn't any way to clean up now, that's a

compelling reason to change the API.  Is that the situation? The only

downside is making the user separately stop the SparkContext then.

From: Vinoo Ganesh 
Date: Tuesday, April 2, 2019 at 13:24
To: Arun Mahadevan , Ryan Blue 
Cc: Sean Owen , "dev@spark.apache.org" 
Subject: Re: Closing a SparkSession stops the SparkContext

// Merging threads

Thanks everyone for your thoughts. I’m very much in sync with Ryan here.

@Sean – To the point that Ryan made, it feels wrong that stopping a session 
force stops the global context. Building in the logic to only stop the context 
when the last session is stopped also feels like a solution, but the best way I 
can think about doing this involves storing the global list of every available 
SparkSession, which may be difficult.

@Arun – If the intention is not to be able to clear and create new sessions,
then what specifically is the intended use case of Sessions?
https://databricks.com/blog/2016/08/15/how-to-use-sparksession-in-apache-spark-2-0.html
describes SparkSessions as time-bounded interactions, which implies that old
ones should be clear-able and new ones create-able in lockstep without adverse
effect?

From: Arun Mahadevan 
Date: Tuesday, April 2, 2019 at 12:31
To: Ryan Blue 
Cc: Vinoo Ganesh , Sean Owen , 
"dev@spark.apache.org" 
Subject: Re: Closing a SparkSession stops the SparkContext

I am not sure how would it cause a leak though. When a spark session or the 
underlying context is stopped it should clean up everything. The getOrCreate is 
supposed to return the active thread local or the global session. May be if you 
keep creating new sessions after explicitly clearing the default and the local 
sessions and keep leaking the sessions it could happen, but I don't think 
Sessions are intended to be used that way.

On Tue, 2 Apr 2019 at 08:45, Ryan Blue  wrote:
I think Vinoo is right about the intended behavior. If we support multiple 
sessions in one context, then stopping any one session shouldn't stop the 
shared context. The last session to be stopped should stop the context, but not 
any before that. We don't typically run multiple sessions in the same context 
so we haven't hit this, but it sounds reasonable.


On 4/2/19, 11:44, "Sean Owen"  wrote:



Yeah there's one global default session, but it's possible to create

others and set them as the thread's active session, to allow for

different configurations in the SparkSession within one app. I think

you're asking why closing one of them would effectively shut all of

them down by stopping the SparkContext. My best guess is simply, well,

that's how it works. You'd only call this, like SparkContext.stop(),

when you know the whole app is done and want to clean up. SparkSession

is a kind of wrapper on SparkContext and it wouldn't be great to make

users stop all the sessions and go find and stop the context.



If there is some per-SparkSession state that needs a cleanup, then

that's a good point, as I don't see a lifecycle method that means

"just close this session".

You're talking about SparkContext state though right, and there is

definitely just one SparkContext though. It can/should only be stopped

when the app is really done.



Is the point that each session is adding some state to the context and

doesn't have any mechanism 

Re: [DISCUSS] Enable blacklisting feature by default in 3.0

2019-04-03 Thread Steve Loughran
On Tue, Apr 2, 2019 at 9:39 PM Ankur Gupta  wrote:

> Hi Steve,
>
> Thanks for your feedback. From your email, I could gather the following
> two important points:
>
>1. Report failures to something (cluster manager) which can opt to
>destroy the node and request a new one
>2. Pluggable failure detection algorithms
>
> Regarding #1, the current blacklisting implementation does report blacklist
> status to YARN here, which can choose to take appropriate action based on
> failures across different applications (though it seems it doesn't
> currently). This doesn't work in static allocation though, and for other
> cluster managers. Those issues are still open:
>
>- https://issues.apache.org/jira/browse/SPARK-24016
>- https://issues.apache.org/jira/browse/SPARK-19755
>- https://issues.apache.org/jira/browse/SPARK-23485
>
> Regarding #2, that is a good point but I think that is optional and may
> not be tied to enabling the blacklisting feature in the current form.
>

I'd expect the algorithms to be done in the controllers, as failures are
reported.

One other thing to consider is how to react when you are down to ~0 nodes.
At that point you may as well give up on the blacklisting, because you've
just implicitly shut down the cluster. I seem to remember something (HDFS?)
trying to deal with that.
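
Something like the following shape of guard (purely illustrative names, not
Spark's actual blacklist tracker API) is what I have in mind:
```
// Refuse to blacklist another node once doing so would leave less than the
// configured fraction of the cluster usable.
def mayBlacklistAnother(blacklisted: Int, totalNodes: Int,
                        maxBlacklistedFraction: Double = 0.5): Boolean =
  totalNodes > 0 &&
    (blacklisted + 1).toDouble / totalNodes <= maxBlacklistedFraction
```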


>
> Coming back to the concerns raised by Reynold, Chris and Steve, it seems
> that there are a few tasks that we need to complete before we decide
> to enable blacklisting by default in its current form:
>
>1. Avoid resource starvation because of blacklisting
>2. Use exponential backoff for blacklisting instead of a configurable
>threshold
>3. Report blacklisting status to all cluster managers (I am not sure
>if this is necessary to move forward though)
>
> Thanks for all the feedback. Please let me know if there are other
> concerns that we would like to resolve before enabling blacklisting.
>
> Thanks,
> Ankur
>
>
>>


[Spark SQL] overload + BinaryOperator for String concatenation.

2019-04-03 Thread Mark Le Noury
Hi,

I've tried searching the archives and haven't found anything relevant - but
apologies if this has been discussed before.

I was wondering how viable it would be to alter the behavior of Spark to
allow:

"String1" + "String2" = "String1String2"

Currently it tries to cast both Strings to double and subsequently returns
null.
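
For example (a quick sketch of the behavior I'm describing, with made-up
column names):
```
import org.apache.spark.sql.functions.{col, concat}

val df = Seq(("String1", "String2")).toDF("a", "b")

df.select(col("a") + col("b")).show()        // null: both sides get cast to double
df.select(concat(col("a"), col("b"))).show() // "String1String2" via the existing concat
```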

Having a quick look at the code, there are already precedents for this in

org.apache.spark.sql.catalyst.expressions.{BinaryArithmetic, Add}

in that it handles Dates and Decimals differently from numeric types.

Extending that to handle Strings seems straightforward, but I am obviously
not an expert on the Spark code base.

thanks,

Mark Le Noury