Re: Spark writing API

2023-08-16 Thread Andrew Melo
Hello Wenchen,

On Wed, Aug 16, 2023 at 23:33 Wenchen Fan  wrote:

> > is there a way to hint to the downstream users on the number of rows
> expected to write?
>
> It will be very hard to do. Spark pipelines the execution (within shuffle
> boundaries) and we can't predict the number of final output rows.
>

Perhaps I don't understand -- even in the case of multiple shuffles, you
can assume that there is exactly one shuffle boundary before the write
operation, and that shuffle boundary knows the number of input rows for
that shuffle. That number of rows has to be, by construction, the upper
bound on the number of rows that will be passed to the writer.

If the writer can be given that bound as a hint, it can do something smart
when allocating (memory or disk). By comparison, the current API just gives
rows/batches one at a time, and in the case of off-heap allocation (like
with Arrow's off-heap storage), it's crazy inefficient to try and do the
equivalent of realloc() to grow the buffer size.
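
To make the allocation argument concrete, here is a minimal sketch using only the JDK. It assumes a hypothetical writer-side column (`HintedLongColumn`) that receives an expected row count up front; neither the class nor the hint is part of any Spark API. It counts how often an off-heap buffer has to be reallocated with and without a useful upper bound.

```java
import java.nio.ByteBuffer;

// Hypothetical writer-side column of longs backed by off-heap memory.
// The "expectedRows" hint is an assumption for illustration, not a Spark API.
final class HintedLongColumn {
    private ByteBuffer buf;
    private int reallocations = 0;

    HintedLongColumn(long expectedRows) {
        long bytes = Math.multiplyExact(Math.max(1L, expectedRows), (long) Long.BYTES);
        buf = ByteBuffer.allocateDirect(Math.toIntExact(bytes));
    }

    void append(long value) {
        if (buf.remaining() < Long.BYTES) {
            grow();
        }
        buf.putLong(value);
    }

    // The realloc() equivalent: allocate a bigger buffer and copy everything over.
    private void grow() {
        ByteBuffer bigger = ByteBuffer.allocateDirect(Math.multiplyExact(buf.capacity(), 2));
        buf.flip();
        bigger.put(buf);
        buf = bigger;
        reallocations++;
    }

    int reallocations() {
        return reallocations;
    }

    // Appends `rows` values to a column created with the given hint and
    // returns how many reallocations were needed.
    static int fill(long hint, int rows) {
        HintedLongColumn column = new HintedLongColumn(hint);
        for (int i = 0; i < rows; i++) {
            column.append(i);
        }
        return column.reallocations();
    }

    public static void main(String[] args) {
        System.out.println("no useful hint: " + fill(1, 1000) + " reallocations");
        System.out.println("upper bound known: " + fill(1000, 1000) + " reallocations");
    }
}
```

With an upper bound available, the buffer is sized once. Without one, every doubling copies all rows written so far.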

Thanks
Andrew



> On Mon, Aug 7, 2023 at 8:27 PM Steve Loughran 
> wrote:
>
>>
>>
>> On Thu, 1 Jun 2023 at 00:58, Andrew Melo  wrote:
>>
>>> Hi all
>>>
>>> I've been developing for some time a Spark DSv2 plugin "Laurelin" (
>>> https://github.com/spark-root/laurelin
>>> ) to read the ROOT (https://root.cern) file format (which is used in
>>> high energy physics). I've recently presented my work in a conference (
>>> https://indico.jlab.org/event/459/contributions/11603/).
>>>
>>>
>> nice paper given the esoteric nature of HEP file formats.
>>
>>> All of that to say,
>>>
>>> A) is there any reason that the builtin (eg parquet) data sources can't
>>> consume the external APIs? It's hard to write a plugin that has to use a
>>> specific API when you're competing with another source who gets access to
>>> the internals directly.
>>>
>>> B) What is the Spark-approved API to code against for writes? There is
>>> a mess of *ColumnWriter classes in the Java namespace, and since there is
>>> no documentation, it's unclear which is preferred by the core (maybe
>>> ArrowWriterColumnVector?). We can give a zero copy write if the API
>>> describes it
>>>
>>
>> There's a dangerous tendency for things that libraries need to be tagged
>> private [spark], normally worked around by people putting their code into
>> org.apache.spark packages. Really everyone who does that should try to get
>> a longer term fix in, as well as that quick-and-effective workaround.
>> Knowing where problems lie would be a good first step. spark sub-modules
>> are probably a place to get insight into where those low-level internal
>> operations are considered important, although many uses may be for historic
>> "we wrote it that way a long time ago" reasons
>>
>>
>>>
>>> C) Putting aside everything above, is there a way to hint to the
>>> downstream users on the number of rows expected to write? Any smart writer
>>> will use off-heap memory to write to disk/memory, so the current API that
>>> shoves rows in doesn't do the trick. You don't want to keep reallocating
>>> buffers constantly
>>>
>>> D) What is Spark's plan to use arrow-based columnar data representations?
>>> I see that there are a lot of external efforts whose only option is to inject
>>> themselves in the CLASSPATH. The regular DSv2 api is already crippled for
>>> reads and for writes it's even worse. Is there a commitment from the spark
>>> core to bring the API to parity? Or is it instead just a YMMV commitment
>>>
>>
>> No idea, I'm afraid. I do think arrow makes a good format for processing,
>> and it'd be interesting to see how well it actually works as a wire format
>> to replace other things (e.g. Hive's protocol), especially on RDMA networks
>> and the like. I'm not up to date with ongoing work there - if anyone has
>> pointers that'd be interesting.
>>
>>>
>>> Thanks!
>>> Andrew
>>>
>>>
>>>
>>>
>>>
>>> --
>>> It's dark in this basement.
>>>


Re: Spark writing API

2023-08-02 Thread Andrew Melo
Hello Spark Devs

Could anyone help me with this?

Thanks,
Andrew

On Wed, May 31, 2023 at 20:57 Andrew Melo  wrote:

> Hi all
>
> I've been developing for some time a Spark DSv2 plugin "Laurelin" (
> https://github.com/spark-root/laurelin
> ) to read the ROOT (https://root.cern) file format (which is used in high
> energy physics). I've recently presented my work in a conference (
> https://indico.jlab.org/event/459/contributions/11603/).
>
> All of that to say,
>
> A) is there any reason that the builtin (eg parquet) data sources can't
> consume the external APIs? It's hard to write a plugin that has to use a
> specific API when you're competing with another source who gets access to
> the internals directly.
>
> B) What is the Spark-approved API to code against for writes? There is a
> mess of *ColumnWriter classes in the Java namespace, and since there is no
> documentation, it's unclear which is preferred by the core (maybe
> ArrowWriterColumnVector?). We can give a zero copy write if the API
> describes it
>
> C) Putting aside everything above, is there a way to hint to the
> downstream users on the number of rows expected to write? Any smart writer
> will use off-heap memory to write to disk/memory, so the current API that
> shoves rows in doesn't do the trick. You don't want to keep reallocating
> buffers constantly
>
> D) What is Spark's plan to use arrow-based columnar data representations? I
> see that there are a lot of external efforts whose only option is to inject
> themselves in the CLASSPATH. The regular DSv2 api is already crippled for
> reads and for writes it's even worse. Is there a commitment from the spark
> core to bring the API to parity? Or is it instead just a YMMV commitment
>
> Thanks!
> Andrew
>
>
>
>
>


Spark writing API

2023-05-31 Thread Andrew Melo
Hi all

I've been developing for some time a Spark DSv2 plugin "Laurelin" (
https://github.com/spark-root/laurelin
) to read the ROOT (https://root.cern) file format (which is used in high
energy physics). I've recently presented my work in a conference (
https://indico.jlab.org/event/459/contributions/11603/).

All of that to say,

A) is there any reason that the builtin (eg parquet) data sources can't
consume the external APIs? It's hard to write a plugin that has to use a
specific API when you're competing with another source who gets access to
the internals directly.

B) What is the Spark-approved API to code against for writes? There is a
mess of *ColumnWriter classes in the Java namespace, and since there is no
documentation, it's unclear which is preferred by the core (maybe
ArrowWriterColumnVector?). We can give a zero copy write if the API
describes it

C) Putting aside everything above, is there a way to hint to the downstream
users on the number of rows expected to write? Any smart writer will use
off-heap memory to write to disk/memory, so the current API that shoves
rows in doesn't do the trick. You don't want to keep reallocating buffers
constantly

D) What is Spark's plan to use arrow-based columnar data representations? I
see that there are a lot of external efforts whose only option is to inject
themselves in the CLASSPATH. The regular DSv2 api is already crippled for
reads and for writes it's even worse. Is there a commitment from the spark
core to bring the API to parity? Or is it instead just a YMMV commitment

Thanks!
Andrew







Re: [Java] Constructing a list from ArrowBufs

2023-04-27 Thread Andrew Melo
Very sorry, I deleted an important part of the code. It should read:

// Number of bytes each element takes up (an int is 4, etc.)
int itemSize = new AsDtype(dtype).memory_itemsize();
int countBufferSize = (entryStop - entryStart + 1) * INT_SIZE;

ArrowBuf countsBuf = allocator.buffer(countBufferSize);
for (int x = 0; x < (entryStop - entryStart + 1); x++) {
    countsBuf.setInt(x * INT_SIZE, x * desc.getFixedLength());
}
// File format uses BE, so perform a byte swap to get to LE
ArrowBuf contentBuf = swapEndianness(contentTemp);

ArrowType outerType = new ArrowType.List();
// Convert from our internal dtype to the Arrow equivalent
ArrowType innerType = dtypeToArrow();

FieldType outerField = new FieldType(false, outerType, null);
FieldType innerField = new FieldType(false, innerType, null);

int outerLen = (entryStop - entryStart) * contentTemp.multiplicity();
int innerLen = contentTemp.numitems();
ArrowFieldNode outerNode = new ArrowFieldNode(outerLen, 0);
ArrowFieldNode innerNode = new ArrowFieldNode(innerLen, 0);

ListVector arrowVec = ListVector.empty("testcol", allocator);
arrowVec.loadFieldBuffers(outerNode, Arrays.asList(null, countsBuf));

AddOrGetResult children =
arrowVec.addOrGetVector(innerField);

FieldVector innerVec = (FieldVector) children.getVector();
innerVec.loadFieldBuffers(innerNode, Arrays.asList(null, contentBuf));
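
For reference, the buffer layout the code above is aiming for can be sketched with plain JDK buffers. In the Arrow columnar format, a list array holding n lists carries n + 1 32-bit offsets, where offset i is the index of the first child element of list i; for lists of equal width w that is simply i * w, and the last offset equals the child length. The class and method names below are hypothetical and stand in for the Arrow calls. It is an illustration of the layout and of the big-endian to little-endian copy, not a drop-in fix.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Hypothetical helper illustrating the list layout with java.nio only.
final class FixedWidthListLayout {

    // Builds the (listCount + 1) little-endian int32 offsets for lists that
    // all contain exactly `width` child elements.
    static ByteBuffer buildOffsets(int listCount, int width) {
        ByteBuffer offsets = ByteBuffer
                .allocate((listCount + 1) * Integer.BYTES)
                .order(ByteOrder.LITTLE_ENDIAN);
        for (int i = 0; i <= listCount; i++) {
            offsets.putInt(i * Integer.BYTES, i * width);
        }
        return offsets;
    }

    // Copies big-endian int32 values into a new little-endian buffer.
    static ByteBuffer swapInts(ByteBuffer bigEndian) {
        ByteBuffer src = bigEndian.duplicate().order(ByteOrder.BIG_ENDIAN);
        ByteBuffer dst = ByteBuffer.allocate(src.remaining()).order(ByteOrder.LITTLE_ENDIAN);
        while (src.remaining() >= Integer.BYTES) {
            dst.putInt(src.getInt());
        }
        dst.flip();
        return dst;
    }

    public static void main(String[] args) {
        ByteBuffer offsets = buildOffsets(3, 4);
        // Three lists of four elements: offsets 0, 4, 8, 12.
        for (int i = 0; i <= 3; i++) {
            System.out.println("offset[" + i + "] = " + offsets.getInt(i * Integer.BYTES));
        }
    }
}
```

Under that layout the outer (list) length is the number of lists, and the child length is the final offset, i.e. the total number of elements.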

On Thu, Apr 27, 2023 at 2:20 PM Andrew Melo  wrote:
>
> Hi all,
>
> I am working on a Spark datasource plugin that reads a (custom) file
> format and outputs arrow-backed columns. I'm having difficulty
> figuring out how to construct a ListVector if I have an ArrowBuf with
> the contents and know the width of each list. I've tried constructing
> the buffer with the code I pasted below, but it appears something
> becomes unaligned, and I get incorrect values back when reading the
> vector back.
>
> The documentation and elsewhere on the internet has examples where you
> construct the ListVector element-by-element (e.g. with
> UnionListWriter), but I'm having difficulty finding an example where
> you start from ArrowBufs and use that to directly construct the
> ListVector.
>
> Does anyone have an example they could point me to?
>
> Thanks!
> Andrew
>
> // Number of bytes each element takes up Int is 4, etc..
> int itemSize = new AsDtype(dtype).memory_itemsize();
> int countBufferSize = (entryStop - entryStart + 1) * INT_SIZE;
>
> ArrowBuf countsBuf = allocator.buffer(countBufferSize);
> // File format uses BE, so perform a byte swap to get to LE
> ArrowBuf contentBuf = swapEndianness(contentTemp);
>
> ArrowType outerType = new ArrowType.List();
> // Convert from our internal dtype to the Arrow equivalent
> ArrowType innerType = dtypeToArrow();
>
> FieldType outerField = new FieldType(false, outerType, null);
> FieldType innerField = new FieldType(false, innerType, null);
>
> int outerLen = (entryStop - entryStart) * contentTemp.multiplicity();
> int innerLen = contentTemp.numitems();
> ArrowFieldNode outerNode = new ArrowFieldNode(outerLen, 0);
> ArrowFieldNode innerNode = new ArrowFieldNode(innerLen, 0);
>
> ListVector arrowVec = ListVector.empty("testcol", allocator);
> arrowVec.loadFieldBuffers(outerNode, Arrays.asList(null, countsBuf));
>
> AddOrGetResult children =
> arrowVec.addOrGetVector(innerField);
>
> FieldVector innerVec = (FieldVector) children.getVector();
> innerVec.loadFieldBuffers(innerNode, Arrays.asList(null, contentBuf));


[Java] Constructing a list from ArrowBufs

2023-04-27 Thread Andrew Melo
Hi all,

I am working on a Spark datasource plugin that reads a (custom) file
format and outputs arrow-backed columns. I'm having difficulty
figuring out how to construct a ListVector if I have an ArrowBuf with
the contents and know the width of each list. I've tried constructing
the buffer with the code I pasted below, but it appears something
becomes unaligned, and I get incorrect values back when reading the
vector back.

The documentation and elsewhere on the internet has examples where you
construct the ListVector element-by-element (e.g. with
UnionListWriter), but I'm having difficulty finding an example where
you start from ArrowBufs and use that to directly construct the
ListVector.

Does anyone have an example they could point me to?

Thanks!
Andrew

// Number of bytes each element takes up Int is 4, etc..
int itemSize = new AsDtype(dtype).memory_itemsize();
int countBufferSize = (entryStop - entryStart + 1) * INT_SIZE;

ArrowBuf countsBuf = allocator.buffer(countBufferSize);
// File format uses BE, so perform a byte swap to get to LE
ArrowBuf contentBuf = swapEndianness(contentTemp);

ArrowType outerType = new ArrowType.List();
// Convert from our internal dtype to the Arrow equivalent
ArrowType innerType = dtypeToArrow();

FieldType outerField = new FieldType(false, outerType, null);
FieldType innerField = new FieldType(false, innerType, null);

int outerLen = (entryStop - entryStart) * contentTemp.multiplicity();
int innerLen = contentTemp.numitems();
ArrowFieldNode outerNode = new ArrowFieldNode(outerLen, 0);
ArrowFieldNode innerNode = new ArrowFieldNode(innerLen, 0);

ListVector arrowVec = ListVector.empty("testcol", allocator);
arrowVec.loadFieldBuffers(outerNode, Arrays.asList(null, countsBuf));

AddOrGetResult children =
arrowVec.addOrGetVector(innerField);

FieldVector innerVec = (FieldVector) children.getVector();
innerVec.loadFieldBuffers(innerNode, Arrays.asList(null, contentBuf));


Re: Spark on Kube (virtual) coffee/tea/pop times

2023-02-07 Thread Andrew Melo
I'm Central US time (AKA UTC -6:00)

On Tue, Feb 7, 2023 at 5:32 PM Holden Karau  wrote:
>
> Awesome, I guess I should have asked folks for timezones that they’re in.
>
> On Tue, Feb 7, 2023 at 3:30 PM Andrew Melo  wrote:
>>
>> Hello Holden,
>>
>> We are interested in Spark on k8s and would like the opportunity to
>> speak with devs about what we're looking for slash better ways to use
>> spark.
>>
>> Thanks!
>> Andrew
>>
>> On Tue, Feb 7, 2023 at 5:24 PM Holden Karau  wrote:
>> >
>> > Hi Folks,
>> >
>> > It seems like we could maybe use some additional shared context around 
>> > Spark on Kube so I’d like to try and schedule a virtual coffee session.
>> >
>> > Who all would be interested in virtual adventures around Spark on Kube 
>> > development?
>> >
>> > No pressure if the idea of hanging out in a virtual chat with coffee and 
>> > Spark devs does not sound like your thing, just trying to make something 
>> > informal so we can have a better understanding of everyone’s goals here.
>> >
>> > Cheers,
>> >
>> > Holden :)
>> > --
>> > Twitter: https://twitter.com/holdenkarau
>> > Books (Learning Spark, High Performance Spark, etc.): 
>> > https://amzn.to/2MaRAG9
>> > YouTube Live Streams: https://www.youtube.com/user/holdenkarau
>
> --
> Twitter: https://twitter.com/holdenkarau
> Books (Learning Spark, High Performance Spark, etc.): https://amzn.to/2MaRAG9
> YouTube Live Streams: https://www.youtube.com/user/holdenkarau

-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org



Re: Spark on Kube (virtual) coffee/tea/pop times

2023-02-07 Thread Andrew Melo
Hello Holden,

We are interested in Spark on k8s and would like the opportunity to
speak with devs about what we're looking for slash better ways to use
spark.

Thanks!
Andrew

On Tue, Feb 7, 2023 at 5:24 PM Holden Karau  wrote:
>
> Hi Folks,
>
> It seems like we could maybe use some additional shared context around Spark 
> on Kube so I’d like to try and schedule a virtual coffee session.
>
> Who all would be interested in virtual adventures around Spark on Kube 
> development?
>
> No pressure if the idea of hanging out in a virtual chat with coffee and 
> Spark devs does not sound like your thing, just trying to make something 
> informal so we can have a better understanding of everyone’s goals here.
>
> Cheers,
>
> Holden :)
> --
> Twitter: https://twitter.com/holdenkarau
> Books (Learning Spark, High Performance Spark, etc.): https://amzn.to/2MaRAG9
> YouTube Live Streams: https://www.youtube.com/user/holdenkarau




Re: [Spark Sql] Global Setting for Case-Insensitive String Compare

2022-11-21 Thread Andrew Melo
I think this is the right place, just a hard question :) As far as I
know, there's no "case insensitive flag", so YMMV

On Mon, Nov 21, 2022 at 5:40 PM Patrick Tucci  wrote:
>
> Is this the wrong list for this type of question?
>
> On 2022/11/12 16:34:48 Patrick Tucci wrote:
>  > Hello,
>  >
>  > Is there a way to set string comparisons to be case-insensitive
>  > globally? I understand LOWER() can be used, but my codebase contains
>  > 27k lines of SQL
>  > and many string comparisons. I would need to apply LOWER() to each string
>  > literal in the code base. I'd also need to change all the ETL/import code
>  > to apply LOWER() to each string value on import.
>  >
>  > Current behavior:
>  >
>  > SELECT 'ABC' = 'abc';
>  > false
>  > Time taken: 5.466 seconds, Fetched 1 row(s)
>  >
>  > SELECT 'ABC' IN ('AbC', 'abc');
>  > false
>  > Time taken: 5.498 seconds, Fetched 1 row(s)
>  >
>  > SELECT 'ABC' like 'Ab%';
>  > false
>  > Time taken: 5.439 seconds, Fetched 1 row(s)
>  >
>  > Desired behavior would be true for all of the above with the proposed
>  > case-insensitive flag set.
>  >
>  > Thanks,
>  >
>  > Patrick
>  >
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>




Re: Profiling PySpark Pandas UDF

2022-08-25 Thread Andrew Melo
Hi Gourav,

Since Koalas needs the same round-trip to/from JVM and Python, I
expect that the performance should be nearly the same for UDFs in
either API

Cheers
Andrew

On Thu, Aug 25, 2022 at 11:22 AM Gourav Sengupta
 wrote:
>
> Hi,
>
> Maybe I am jumping to conclusions and making stupid guesses, but have you
> tried koalas now that it is natively integrated with pyspark?
>
> Regards
> Gourav
>
> On Thu, 25 Aug 2022, 11:07 Subash Prabanantham,  
> wrote:
>>
>> Hi All,
>>
>> I was wondering if we have any best practices on using pandas UDFs?
>> Profiling a UDF is not an easy task and our case requires some drilling down
>> on the logic of the function.
>>
>>
>> Our use case:
>> We are using func(Dataframe) => Dataframe as the interface for the Pandas UDF.
>> When we run only the function locally it runs faster, but when it is executed
>> in the Spark environment the processing time is more than expected. We have one
>> column where the value is large (BinaryType -> 600KB), and we are wondering
>> whether this could make the Arrow computation slower.
>>
>> Is there any profiling tool or best way to debug the cost incurred using pandas
>> UDFs?
>>
>>
>> Thanks,
>> Subash
>>




PySpark cores

2022-07-28 Thread Andrew Melo
Hello,

Is there a way to tell Spark that PySpark (arrow) functions use
multiple cores? If we have an executor with 8 cores, we would like to
have a single PySpark function use all 8 cores instead of having 8
single core python functions run.

Thanks!
Andrew




Re: [EXTERNAL] RDD.pipe() for binary data

2022-07-16 Thread Andrew Melo
I'm curious about using shared memory to speed up the JVM->Python round
trip. Is there any sane way to do anonymous shared memory in Java/Scala?

On Sat, Jul 16, 2022 at 16:10 Sebastian Piu  wrote:

> Other alternatives are to look at how PythonRDD does it in spark, you
> could also try to go for a more traditional setup where you expose your
> python functions behind a local/remote service and call that from scala -
> say over thrift/grpc/http/local socket etc.
> Another option, but I've never done it so I'm not sure if it would work,
> is to maybe look if arrow could help by sharing a piece of memory with the
> data you need from scala and then read it from python
>
> On Sat, Jul 16, 2022 at 9:56 PM Sean Owen  wrote:
>
>> Use GraphFrames?
>>
>> On Sat, Jul 16, 2022 at 3:54 PM Yuhao Zhang 
>> wrote:
>>
>>> Hi Shay,
>>>
>>> Thanks for your reply! I would very much like to use pyspark. However,
>>> my project depends on GraphX, which is only available in the Scala API as
>>> far as I know. So I'm locked with Scala and trying to find a way out. I
>>> wonder if there's a way to go around it.
>>>
>>> Best regards,
>>> Yuhao Zhang
>>>
>>>
>>> On Sun, Jul 10, 2022 at 5:36 AM Shay Elbaz  wrote:
>>>
 Yuhao,


 You can use pyspark as entrypoint to your application. With py4j you
 can call Java/Scala functions from the python application. There's no need
 to use the pipe() function for that.


 Shay
 --
 *From:* Yuhao Zhang 
 *Sent:* Saturday, July 9, 2022 4:13:42 AM
 *To:* user@spark.apache.org
 *Subject:* [EXTERNAL] RDD.pipe() for binary data


 *ATTENTION:* This email originated from outside of GM.


 Hi All,

 I'm currently working on a project involving transferring between
 Spark 3.x (I use Scala) and a Python runtime. In Spark, data is stored in
 an RDD as floating-point number arrays/vectors and I have custom routines
 written in Python to process them. On the Spark side, I also have some
 operations specific to Spark Scala APIs, so I need to use both runtimes.

 Now to achieve data transfer I've been using the RDD.pipe() API:
 1. Spark converts the arrays to strings and calls RDD.pipe(script.py).
 2. Python receives the strings, casts them to Python's data structures
 and conducts operations.
 3. Python converts the arrays into strings and prints them back to Spark.
 4. Spark gets the strings and casts them back to arrays.

 Needless to say, this feels unnatural and slow to me, and there are
 some potential floating-point number precision issues, as I think the
 floating number arrays should have been transmitted as raw bytes. I found
 no way to use the RDD.pipe() for this purpose, as written in
 https://github.com/apache/spark/blob/3331d4ccb7df9aeb1972ed86472269a9dbd261ff/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala#L139,
 .pipe() seems to be locked with text-based streaming.
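
 As an illustration of the "raw bytes" idea in the paragraph above, the
 following is a minimal JDK-only sketch of a length-prefixed binary frame for
 a floating-point array. The class name and the frame layout (a little-endian
 int32 count followed by little-endian float64 values) are assumptions made
 for this example; they are not something RDD.pipe() provides.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Hypothetical codec: [int32 count][count x float64], all little-endian.
final class DoubleArrayCodec {

    static byte[] encode(double[] values) {
        ByteBuffer buf = ByteBuffer
                .allocate(Integer.BYTES + values.length * Double.BYTES)
                .order(ByteOrder.LITTLE_ENDIAN);
        buf.putInt(values.length);
        for (double v : values) {
            buf.putDouble(v);
        }
        return buf.array();
    }

    static double[] decode(byte[] frame) {
        ByteBuffer buf = ByteBuffer.wrap(frame).order(ByteOrder.LITTLE_ENDIAN);
        double[] out = new double[buf.getInt()];
        for (int i = 0; i < out.length; i++) {
            out[i] = buf.getDouble();
        }
        return out;
    }

    public static void main(String[] args) {
        double[] original = {0.1, 1e-300, Math.PI};
        double[] roundTripped = decode(encode(original));
        System.out.println(java.util.Arrays.equals(original, roundTripped));
    }
}
```

 The bits of each value are copied unchanged, so the round trip is exact, and
 a reader on the other side only needs the same two conventions (count prefix
 and byte order) to parse the frame.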

 Can anyone shed some light on how I can achieve this? I'm trying to
 come up with a way that does not involve modifying the core Spark myself.
 One potential solution I can think of is saving/loading the RDD as binary
 files but I'm hoping to find a streaming-based solution. Any help is much
 appreciated, thanks!


 Best regards,
 Yuhao



Re: How is union() implemented? Need to implement column bind

2022-04-20 Thread Andrew Melo
It would certainly be useful for our domain to have some sort of native
cbind(). Is there a fundamental disapproval of adding that functionality,
or is it just a matter of nobody implementing it?

On Wed, Apr 20, 2022 at 16:28 Sean Owen  wrote:

> Good lead, pandas on Spark concat() is worth trying. It looks like it uses
> a join, but not 100% sure from the source.
> The SQL concat() function is indeed a different thing.
>
> On Wed, Apr 20, 2022 at 3:24 PM Bjørn Jørgensen 
> wrote:
>
>> Sorry for asking, but why doesn't concat work?
>>
>> Pandas on Spark has ps.concat, which takes 2 dataframes and concatenates
>> them into 1 dataframe.
>> It seems like the pyspark version takes 2 columns and concatenates them
>> into one column.
>>
>> On Wed, Apr 20, 2022 at 21:04 Sean Owen wrote:
>>
>>> cbind? yeah though the answer is typically a join. I don't know if
>>> there's a better option in a SQL engine, as SQL doesn't have anything to
>>> offer except join and pivot either (? right?)
>>> Certainly, the dominant data storage paradigm is wide tables, whereas
>>> you're starting with effectively a huge number of tiny slim tables, which
>>> is the impedance mismatch here.
>>>
>>> On Wed, Apr 20, 2022 at 1:51 PM Andrew Davidson 
>>> wrote:
>>>
 Thanks Sean



 I imagine this is a fairly common problem in data science. Any idea how
 others solve it? For example, I wonder if running the join on something like
 BigQuery might work better? I do not know much about the implementation.



 No one tool will solve all problems. Once I get the matrix, I think
 Spark will work well for our needs.



 Kind regards



 Andy



 *From: *Sean Owen 
 *Date: *Monday, April 18, 2022 at 6:58 PM
 *To: *Andrew Davidson 
 *Cc: *"user @spark" 
 *Subject: *Re: How is union() implemented? Need to implement column
 bind



 A join is the natural answer, but this is a 10114-way join, which
 probably chokes readily just to even plan it, let alone all the shuffling
 and shuffling of huge data. You could tune your way out of it maybe, but
 not optimistic. It's just huge.



 You could go off-road and lower-level to take advantage of the
 structure of the data. You effectively want "column bind". There is no such
 operation in Spark. (union is 'row bind'.) You could do this with
 zipPartitions, which is in the RDD API, and to my surprise, not in the
 Python API but exists in Scala. And R (!). If you can read several RDDs of
 data, you can use this method to pair all their corresponding values and
 ultimately get rows of 10114 values out. In fact that is how sparklyr
 implements cbind on Spark, FWIW:
 https://rdrr.io/cran/sparklyr/man/sdf_fast_bind_cols.html
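
 The pairing step described above can be sketched without Spark. The snippet
 below is a JDK-only illustration of "column bind" over row-aligned
 iterators, roughly the work that would happen inside each partition; the
 class and method names are hypothetical, and it assumes every column yields
 the same number of values in the same row order.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical illustration of column bind: walk all columns in lockstep
// and emit one row per step.
final class ColumnBind {

    static List<double[]> bind(List<Iterator<Double>> columns) {
        List<double[]> rows = new ArrayList<>();
        if (columns.isEmpty()) {
            return rows;
        }
        while (columns.get(0).hasNext()) {
            double[] row = new double[columns.size()];
            for (int c = 0; c < columns.size(); c++) {
                Iterator<Double> column = columns.get(c);
                if (!column.hasNext()) {
                    throw new IllegalStateException("column " + c + " is shorter than column 0");
                }
                row[c] = column.next();
            }
            rows.add(row);
        }
        for (int c = 1; c < columns.size(); c++) {
            if (columns.get(c).hasNext()) {
                throw new IllegalStateException("column " + c + " is longer than column 0");
            }
        }
        return rows;
    }

    public static void main(String[] args) {
        List<double[]> rows = bind(List.of(
                List.of(1.0, 2.0).iterator(),
                List.of(10.0, 20.0).iterator()));
        for (double[] row : rows) {
            System.out.println(row[0] + "\t" + row[1]);
        }
    }
}
```

 No shuffle or key comparison is involved, which is why this is cheaper than
 a join when the inputs are already aligned.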



 The issue I see is that you can only zip a few at a time; you don't
 want to zip 10114 of them. Perhaps you have to do that iteratively, and I
 don't know if that is going to face the same issues with huge huge plans.



 I like the pivot idea. If you can read the individual files as data
 rows (maybe list all the file names, parallelize with Spark, write a UDF
 that reads the data for that file to generate the rows). If you can emit
 (file, index, value) and groupBy index, pivot on file (I think?) that
 should be about it? I think it doesn't need additional hashing or whatever.
 Not sure how fast it is but that seems more direct than the join, as well.
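
 A small JDK-only sketch of the pivot idea, assuming each input record is a
 (file, index, value) triple as described above. The names are hypothetical
 and the grouping is done in memory, so it shows only the shape of the
 result, not how Spark would execute it.

```java
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;

// Hypothetical illustration: group cells by row index, then pivot on file name.
final class PivotSketch {

    static final class Cell {
        final String file;
        final int index;
        final double value;

        Cell(String file, int index, double value) {
            this.file = file;
            this.index = index;
            this.value = value;
        }
    }

    // Returns rowIndex -> (fileName -> value); each inner map is one wide row.
    static SortedMap<Integer, SortedMap<String, Double>> pivot(List<Cell> cells) {
        SortedMap<Integer, SortedMap<String, Double>> table = new TreeMap<>();
        for (Cell cell : cells) {
            table.computeIfAbsent(cell.index, k -> new TreeMap<>()).put(cell.file, cell.value);
        }
        return table;
    }

    public static void main(String[] args) {
        System.out.println(pivot(List.of(
                new Cell("a.txt", 0, 1.0),
                new Cell("b.txt", 0, 2.0),
                new Cell("a.txt", 1, 3.0),
                new Cell("b.txt", 1, 4.0))));
    }
}
```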



 On Mon, Apr 18, 2022 at 8:27 PM Andrew Davidson
  wrote:

 Hi, I have a hard problem.



 I have 10114 column vectors, each in a separate file. Each file has 2
 columns: the row id and numeric values. The row ids are identical and in
 sort order. All the column vectors have the same number of rows. There are
 over 5 million rows. I need to combine them into a single table. The row
 ids are very long strings. The column names are about 20 chars long.



 My current implementation uses join. This takes a long time on a
 cluster with 2 workers totaling 192 vcpu and 2.8 tb of memory. It often
 crashes. I mean totally dead, start over. Checkpoints do not seem to help. It
 still crashes and needs to be restarted from scratch. What is really
 surprising is the final file size is only 213G! The way I got the file
 was to copy all the column vectors to a single BIG IRON machine and use
 unix cut and paste. It took about 44 min to run once I got all the data moved
 around. It was very tedious and error prone. I had to move a lot of data
 around. Not a particularly reproducible 

Re: Grabbing the current MemoryManager in a plugin

2022-04-13 Thread Andrew Melo
Hello,

Any wisdom on the question below?

Thanks
Andrew

On Fri, Apr 8, 2022 at 16:04 Andrew Melo  wrote:

> Hello,
>
> I've implemented support for my DSv2 plugin to back its storage with
> ArrowColumnVectors, which necessarily means using off-heap memory. Is
> it possible to somehow grab a reference to the current
> MemoryManager so that the off-heap memory usage is properly accounted
> for and to prevent inadvertently OOM-ing the system?
>
> Thanks
> Andrew
>


Re: cannot access class sun.nio.ch.DirectBuffer

2022-04-13 Thread Andrew Melo
Gotcha. Seeing as there are a lot of large projects that use the unsafe API
either directly or indirectly (via netty, etc.), it's a bit surprising that
it was so thoroughly closed off without an escape hatch, but I'm sure there
was a lively discussion around it...

Cheers
Andrew

On Wed, Apr 13, 2022 at 09:07 Sean Owen  wrote:

> It is intentionally closed by the JVM going forward, as direct access is
> discouraged. But it's still necessary for Spark. In some cases, like direct
> mem access, there is a new API but it's in Java 17 I think, and we can't
> assume Java 17 any time soon.
>
> On Wed, Apr 13, 2022 at 9:05 AM Andrew Melo  wrote:
>
>> Hi Sean,
>>
>> Out of curiosity, will Java 11+ always require special flags to access
>> the unsafe direct memory interfaces, or is this something that will either
>> be addressed by the spec (by making an "approved" interface) or by
>> libraries (with some other workaround)?
>>
>> Thanks
>> Andrew
>>
>> On Tue, Apr 12, 2022 at 08:45 Sean Owen  wrote:
>>
>>> In Java 11+, you will need to tell the JVM to allow access to internal
>>> packages in some cases, for any JVM application. You will need flags like
>>> "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED", which you can see in
>>> the pom.xml file for the project.
>>>
>>> Spark 3.2 does not necessarily work with Java 17 (3.3 should have
>>> support), but it may well work after you address those flags.
>>>
>>> On Tue, Apr 12, 2022 at 7:05 AM Arunachalam Sibisakkaravarthi <
>>> arunacha...@mcruncher.com> wrote:
>>>
>>>> Hi guys,
>>>>
>>>> spark-sql_2.12:3.2.1 is used in our application.
>>>>
>>>> It throws following exceptions when the app runs using JRE17
>>>>
>>>> java.lang.IllegalAccessError: class org.apache.spark.storage.StorageUtils$
>>>> (in unnamed module @0x451f1bd4) cannot access class
>>>> sun.nio.ch.DirectBuffer (in module java.base) because module java.base
>>>> does not export sun.nio.ch to unnamed module @0x451f1bd4
>>>>   at org.apache.spark.storage.StorageUtils$.(StorageUtils.scala:213)
>>>>   at org.apache.spark.storage.StorageUtils$.(StorageUtils.scala)
>>>>   at org.apache.spark.storage.BlockManagerMasterEndpoint.(BlockManagerMasterEndpoint.scala:110)
>>>>   at org.apache.spark.SparkEnv$.$anonfun$create$9(SparkEnv.scala:348)
>>>>   at org.apache.spark.SparkEnv$.registerOrLookupEndpoint$1(SparkEnv.scala:287)
>>>>   at org.apache.spark.SparkEnv$.create(SparkEnv.scala:336)
>>>>   at org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:191)
>>>>   at org.apache.spark.SparkContext.createSparkEnv(SparkContext.scala:277)
>>>>   at org.apache.spark.SparkContext.(SparkContext.scala:460)
>>>>   at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2690)
>>>>   at org.apache.spark.sql.SparkSession$Builder.$anonfun$getOrCreate$2(SparkSession.scala:949)
>>>>   at scala.Option.getOrElse(Option.scala:189)
>>>>   at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:943)
>>>>
>>>> How do we fix this?
>>>>
>>>> *Thanks And Regards*
>>>> Sibi.Arunachalam
>>>> mCruncher
>>>>


Re: cannot access class sun.nio.ch.DirectBuffer

2022-04-13 Thread Andrew Melo
Hi Sean,

Out of curiosity, will Java 11+ always require special flags to access the
unsafe direct memory interfaces, or is this something that will either be
addressed by the spec (by making an "approved" interface) or by libraries
(with some other workaround)?
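
For context, whether a flag such as the
"--add-opens=java.base/sun.nio.ch=ALL-UNNAMED" quoted below is in effect can
be checked from inside the JVM with the standard java.lang.Module API
(Java 9+). The class name here is hypothetical; this is a small diagnostic
sketch, not something Spark ships.

```java
// Hypothetical diagnostic: reports whether java.base opens a package to the
// module this class was loaded into (the unnamed module when run from the
// classpath).
final class AddOpensCheck {

    static boolean isOpenToMe(String packageName) {
        Module javaBase = Object.class.getModule();
        Module mine = AddOpensCheck.class.getModule();
        return javaBase.isOpen(packageName, mine);
    }

    public static void main(String[] args) {
        // The printed value depends on the JVM version and on the flags it was started with.
        System.out.println("sun.nio.ch open to this module: " + isOpenToMe("sun.nio.ch"));
    }
}
```

Running it once with and once without the flag shows the difference the flag
makes for reflective access to that package.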

Thanks
Andrew

On Tue, Apr 12, 2022 at 08:45 Sean Owen  wrote:

> In Java 11+, you will need to tell the JVM to allow access to internal
> packages in some cases, for any JVM application. You will need flags like
> "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED", which you can see in the
> pom.xml file for the project.
>
> Spark 3.2 does not necessarily work with Java 17 (3.3 should have
> support), but it may well work after you address those flags.
>
> On Tue, Apr 12, 2022 at 7:05 AM Arunachalam Sibisakkaravarthi <
> arunacha...@mcruncher.com> wrote:
>
>> Hi guys,
>>
>> spark-sql_2.12:3.2.1 is used in our application.
>>
>> It throws following exceptions when the app runs using JRE17
>>
>> java.lang.IllegalAccessError: class org.apache.spark.storage.StorageUtils$
>> (in unnamed module @0x451f1bd4) cannot access class sun.nio.ch.DirectBuffer
>> (in module java.base) because module java.base does not export sun.nio.ch to
>> unnamed module @0x451f1bd4
>>    at org.apache.spark.storage.StorageUtils$.<init>(StorageUtils.scala:213)
>>    at org.apache.spark.storage.StorageUtils$.<clinit>(StorageUtils.scala)
>>    at org.apache.spark.storage.BlockManagerMasterEndpoint.<init>(BlockManagerMasterEndpoint.scala:110)
>>    at org.apache.spark.SparkEnv$.$anonfun$create$9(SparkEnv.scala:348)
>>    at org.apache.spark.SparkEnv$.registerOrLookupEndpoint$1(SparkEnv.scala:287)
>>    at org.apache.spark.SparkEnv$.create(SparkEnv.scala:336)
>>    at org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:191)
>>    at org.apache.spark.SparkContext.createSparkEnv(SparkContext.scala:277)
>>    at org.apache.spark.SparkContext.<init>(SparkContext.scala:460)
>>    at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2690)
>>    at org.apache.spark.sql.SparkSession$Builder.$anonfun$getOrCreate$2(SparkSession.scala:949)
>>    at scala.Option.getOrElse(Option.scala:189)
>>    at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:943)
>>
>> How do we fix this?
>>
>>
>>
>>
>> Thanks And Regards
>> Sibi.Arunachalam
>> mCruncher
>>
> --
It's dark in this basement.
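
As a rough illustration of the flag Sean quotes above, the sketch below assembles a JVM launch command that opens sun.nio.ch to the unnamed module. Only the "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED" string comes from the thread; the helper names, "app.jar" and "com.example.Main" are placeholders assumed for the example, and a real application may need more packages opened (the Spark pom.xml mentioned above is the place to look).

```python
import shlex

# The one package named in the thread; extend this tuple if other
# IllegalAccessError messages name further packages.
ADD_OPENS_PACKAGES = ("java.base/sun.nio.ch",)


def add_opens_flags(packages=ADD_OPENS_PACKAGES):
    """Return one --add-opens flag per package, opened to the unnamed module."""
    return [f"--add-opens={pkg}=ALL-UNNAMED" for pkg in packages]


def java_command(main_class, classpath, packages=ADD_OPENS_PACKAGES):
    """Build the argument list for launching a JVM application with the flags."""
    return ["java", *add_opens_flags(packages), "-cp", classpath, main_class]


if __name__ == "__main__":
    # Hypothetical jar and main class, used only to show the resulting command.
    print(shlex.join(java_command("com.example.Main", "app.jar")))
```

The flags have to reach the JVM that actually runs Spark, so they go before the main class on the command line rather than in application arguments.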


Grabbing the current MemoryManager in a plugin

2022-04-08 Thread Andrew Melo
Hello,

I've implemented support for my DSv2 plugin to back its storage with
ArrowColumnVectors, which necessarily means using off-heap memory. Is
it possible to somehow grab either a reference to the current
MemoryManager so that the off-heap memory usage is properly accounted
for and to prevent inadvertently OOM-ing the system?

Thanks
Andrew

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



ArrowBuf.nioBuffer() reference counting

2022-04-06 Thread Andrew Melo
Hello,

When using (on Java) ArrowBuf.nioBuffer(), does care need to be taken
so that the underlying ArrowBuf doesn't go out of scope? Or does it
increment the reference count somewhere behind the scenes?

Thanks
Andrew


Re: Apache Spark 3.3 Release

2022-03-16 Thread Andrew Melo
Hello,

I've been trying for a bit to get the following two PRs merged and
into a release, and I'm having some difficulty moving them forward:

https://github.com/apache/spark/pull/34903 - This passes the current
python interpreter to spark-env.sh to allow some currently-unavailable
customization to happen
https://github.com/apache/spark/pull/31774 - This fixes a bug in the
SparkUI reverse proxy-handling code where it does a greedy match for
"proxy" in the URL, and will mistakenly replace the App-ID in the
wrong place.

I'm not exactly sure how to get attention on PRs that have been
sitting around for a while, but these are really important to our
use-cases, and it would be nice to have them merged in.

Cheers
Andrew

On Wed, Mar 16, 2022 at 6:21 PM Holden Karau  wrote:
>
> I'd like to add/backport the logging in 
> https://github.com/apache/spark/pull/35881 PR so that when users submit 
> issues with dynamic allocation we can better debug what's going on.
>
> On Wed, Mar 16, 2022 at 3:45 PM Chao Sun  wrote:
>>
>> There is one item on our side that we want to backport to 3.3:
>> - vectorized DELTA_BYTE_ARRAY/DELTA_LENGTH_BYTE_ARRAY encodings for
>> Parquet V2 support (https://github.com/apache/spark/pull/35262)
>>
>> It's already reviewed and approved.
>>
>> On Wed, Mar 16, 2022 at 9:13 AM Tom Graves  
>> wrote:
>> >
>> > It looks like the version hasn't been updated on master and still shows 
>> > 3.3.0-SNAPSHOT, can you please update that.
>> >
>> > Tom
>> >
>> > On Wednesday, March 16, 2022, 01:41:00 AM CDT, Maxim Gekk 
>> >  wrote:
>> >
>> >
>> > Hi All,
>> >
>> > I have created the branch for Spark 3.3:
>> > https://github.com/apache/spark/commits/branch-3.3
>> >
>> > Please, backport important fixes to it, and if you have some doubts, ping 
>> > me in the PR. Regarding new features, we are still building the allow list 
>> > for branch-3.3.
>> >
>> > Best regards,
>> > Max Gekk
>> >
>> >
>> > On Wed, Mar 16, 2022 at 5:51 AM Dongjoon Hyun  
>> > wrote:
>> >
>> > Yes, I agree with you for your whitelist approach for backporting. :)
>> > Thank you for summarizing.
>> >
>> > Thanks,
>> > Dongjoon.
>> >
>> >
>> > On Tue, Mar 15, 2022 at 4:20 PM Xiao Li  wrote:
>> >
>> > I think I finally got your point. What you want to keep unchanged is the 
>> > branch cut date of Spark 3.3. Today? or this Friday? This is not a big 
>> > deal.
>> >
>> > My major concern is whether we should keep merging the feature work or the 
>> > dependency upgrade after the branch cut. To make our release time more 
>> > predictable, I am suggesting we should finalize the exception PR list 
>> > first, instead of merging them in an ad hoc way. In the past, we spent a 
>> > lot of time on the revert of the PRs that were merged after the branch 
>> > cut. I hope we can minimize unnecessary arguments in this release. Do you 
>> > agree, Dongjoon?
>> >
>> >
>> >
>> > On Tue, Mar 15, 2022 at 3:55 PM Dongjoon Hyun wrote:
>> >
>> > That is not totally fine, Xiao. It sounds like you are asking for a change of
>> > plan without a proper reason.
>> >
>> > Although we cut the branch today according to our plan, you can still collect
>> > the list and make a list of exceptions. I'm not blocking what you want to
>> > do.
>> >
>> > Please let the community start to ramp down as we agreed before.
>> >
>> > Dongjoon
>> >
>> >
>> >
>> > On Tue, Mar 15, 2022 at 3:07 PM Xiao Li  wrote:
>> >
>> > Please do not get me wrong. If we don't cut a branch, we are allowing all 
>> > patches to land Apache Spark 3.3. That is totally fine. After we cut the 
>> > branch, we should avoid merging the feature work. In the next three days, 
>> > let us collect the actively developed PRs that we want to make an 
>> > exception (i.e., merged to 3.3 after the upcoming branch cut). Does that 
>> > make sense?
>> >
>> > On Tue, Mar 15, 2022 at 2:54 PM Dongjoon Hyun wrote:
>> >
>> > Xiao. You are working against what you are saying.
>> > If you don't cut a branch, it means you are allowing all patches to land 
>> > Apache Spark 3.3. No?
>> >
>> > > we need to avoid backporting feature work that has not been well
>> > > discussed.
>> >
>> >
>> >
>> > On Tue, Mar 15, 2022 at 12:12 PM Xiao Li  wrote:
>> >
>> > Cutting the branch is simple, but we need to avoid backporting feature
>> > work that has not been well discussed. Not all the members are actively
>> > following the dev list. I think we should wait 3 more days to collect
>> > the PR list before cutting the branch.
>> >
>> > BTW, there is very little 3.4-only feature work that will be affected.
>> >
>> > Xiao
>> >
>> > On Tue, Mar 15, 2022 at 11:49 AM Dongjoon Hyun wrote:
>> >
>> > Hi, Max, Chao, Xiao, Holden and all.
>> >
>> > I have a different idea.
>> >
>> > Given the situation and small patch list, I don't think we need to 
>> > postpone the branch cut for those patches. It's easier to cut a branch-3.3 
>> > and allow backporting.
>> >
>> > As of today, we already have an obvious Apache Spark 3.4 patch in the 
>> > branch together. This 

Re: Time to start publishing Spark Docker Images?

2021-08-17 Thread Andrew Melo
Hi Mich,

By default, pip caches downloaded binaries to somewhere like
$HOME/.cache/pip. So after doing any "pip install", you'll want to either
delete that directory, or pass the "--no-cache-dir" option to pip to
prevent the downloaded binaries from being added to the image.
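
A minimal sketch of the second option, assuming the install step is driven from a Python script during the image build (in a Dockerfile the same flag would simply be added to the "pip install" line). The helper name pip_install_command is made up for this example; "--no-cache-dir" is the real pip option named above.

```python
import sys


def pip_install_command(packages, no_cache=True):
    """Build a `pip install` argument list.

    With no_cache=True, --no-cache-dir is passed so that pip does not keep
    the downloaded files around (by default under $HOME/.cache/pip).
    """
    cmd = [sys.executable, "-m", "pip", "install"]
    if no_cache:
        cmd.append("--no-cache-dir")
    return cmd + list(packages)


if __name__ == "__main__":
    # Only print the command; running it would need network access.
    print(" ".join(pip_install_command(["numpy", "PyYAML"])))
```

To actually run it, the list can be handed to subprocess.run(..., check=True).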

HTH
Andrew

On Tue, Aug 17, 2021 at 2:29 PM Mich Talebzadeh 
wrote:

> Hi Andrew,
>
> Can you please elaborate on blowing pip cache before committing the layer?
>
> Thanks,
>
> Mich
>
> On Tue, 17 Aug 2021 at 16:57, Andrew Melo  wrote:
>
>> Silly Q, did you blow away the pip cache before committing the layer?
>> That always trips me up.
>>
>> Cheers
>> Andrew
>>
>> On Tue, Aug 17, 2021 at 10:56 Mich Talebzadeh 
>> wrote:
>>
>>> With no additional python packages etc we get 1.4GB compared to 2.19GB
>>> before
>>>
>>> REPOSITORY       TAG                                      IMAGE ID       CREATED                  SIZE
>>> spark/spark-py   3.1.1_sparkpy_3.7-scala_2.12-java8only   faee4dbb95dd   Less than a second ago   1.41GB
>>> spark/spark-py   3.1.1_sparkpy_3.7-scala_2.12-java8       ba3c17bc9337   4 hours ago              2.19GB
>>>
>>> root@233a81199b43:/opt/spark/work-dir# pip list
>>> Package       Version
>>> ------------- -------
>>> asn1crypto    0.24.0
>>> cryptography  2.6.1
>>> entrypoints   0.3
>>> keyring       17.1.1
>>> keyrings.alt  3.1.1
>>> pip           21.2.4
>>> pycrypto      2.6.1
>>> PyGObject     3.30.4
>>> pyxdg         0.25
>>> SecretStorage 2.3.1
>>> setuptools    57.4.0
>>> six           1.12.0
>>> wheel         0.32.3
>>>
>>>
>>> HTH
>>>
>>>
>>>view my Linkedin profile
>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>>
>>> On Tue, 17 Aug 2021 at 16:24, Mich Talebzadeh 
>>> wrote:
>>>
>>>> Yes, I will double check. it includes java 8 in addition to base java
>>>> 11.
>>>>
>>>> in addition it has these Python packages for now (added for my own
>>>> needs for now)
>>>>
>>>> root@ce6773017a14:/opt/spark/work-dir# pip list
>>>> Package       Version
>>>> ------------- -------
>>>> asn1crypto    0.24.0
>>>> cryptography  2.6.1
>>>> cx-Oracle     8.2.1
>>>> entrypoints   0.3
>>>> keyring       17.1.1
>>>> keyrings.alt  3.1.1
>>>> numpy         1.21.2
>>>> pip           21.2.4
>>>> py4j          0.10.9
>>>> pycrypto      2.6.1
>>>> PyGObject     3.30.4
>>>> pyspark       3.1.2
>>>> pyxdg         0.25
>>>> PyYAML        5.4.1
>>>> SecretStorage 2.3.1
>>>> setuptools    57.4.0
>>>> six           1.12.0
>>>> wheel         0.32.3
>>>>
>>>>
>>>> HTH
>>>>
>>>>
>>>>view my Linkedin profile
>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>
>>>>
>>>>
>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>>> any loss, damage or destruction of data or any other property which may
>>>> arise from relying on this email's technical content is explicitly
>>>> disclaimed. The author will in no case be liable for any monetary damages
>>>> arising from such loss, damage or destruction.
>>>>
>>>>
>>>>
>>>>
>>>> On Tue, 17 Aug 2021 at 16:17, Maciej  wrote:
>>>>
>>>>> Quick question ‒ is this actual output? If so, do we know what
>>>>> accounts for the 1.5GB overhead of the PySpark image? Even without
>>>>> --no-install-recommends this seems like a lot (if I recall correctly
>>>>> it was around 400MB for existing images).
>>>>>
>>>>>
>>>>> On 8/17/21 2:24 PM, Mich Talebzadeh wrote:

Re: Time to start publishing Spark Docker Images?

2021-08-17 Thread Andrew Melo
Silly Q, did you blow away the pip cache before committing the layer? That
always trips me up.

Cheers
Andrew

On Tue, Aug 17, 2021 at 10:56 Mich Talebzadeh 
wrote:

> With no additional python packages etc we get 1.4GB compared to 2.19GB
> before
>
> REPOSITORY       TAG                                      IMAGE ID       CREATED                  SIZE
> spark/spark-py   3.1.1_sparkpy_3.7-scala_2.12-java8only   faee4dbb95dd   Less than a second ago   1.41GB
> spark/spark-py   3.1.1_sparkpy_3.7-scala_2.12-java8       ba3c17bc9337   4 hours ago              2.19GB
>
> root@233a81199b43:/opt/spark/work-dir# pip list
> Package       Version
> ------------- -------
> asn1crypto    0.24.0
> cryptography  2.6.1
> entrypoints   0.3
> keyring       17.1.1
> keyrings.alt  3.1.1
> pip           21.2.4
> pycrypto      2.6.1
> PyGObject     3.30.4
> pyxdg         0.25
> SecretStorage 2.3.1
> setuptools    57.4.0
> six           1.12.0
> wheel         0.32.3
>
>
> HTH
>
>
>view my Linkedin profile
> 
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Tue, 17 Aug 2021 at 16:24, Mich Talebzadeh 
> wrote:
>
>> Yes, I will double check. it includes java 8 in addition to base java 11.
>>
>> in addition it has these Python packages for now (added for my own needs
>> for now)
>>
>> root@ce6773017a14:/opt/spark/work-dir# pip list
>> Package       Version
>> ------------- -------
>> asn1crypto    0.24.0
>> cryptography  2.6.1
>> cx-Oracle     8.2.1
>> entrypoints   0.3
>> keyring       17.1.1
>> keyrings.alt  3.1.1
>> numpy         1.21.2
>> pip           21.2.4
>> py4j          0.10.9
>> pycrypto      2.6.1
>> PyGObject     3.30.4
>> pyspark       3.1.2
>> pyxdg         0.25
>> PyYAML        5.4.1
>> SecretStorage 2.3.1
>> setuptools    57.4.0
>> six           1.12.0
>> wheel         0.32.3
>>
>>
>> HTH
>>
>>
>>view my Linkedin profile
>> 
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Tue, 17 Aug 2021 at 16:17, Maciej  wrote:
>>
>>> Quick question ‒ is this actual output? If so, do we know what accounts
>>> for the 1.5GB overhead of the PySpark image? Even without
>>> --no-install-recommends this seems like a lot (if I recall correctly it
>>> was around 400MB for existing images).
>>>
>>>
>>> On 8/17/21 2:24 PM, Mich Talebzadeh wrote:
>>>
>>> Examples:
>>>
>>> *docker images*
>>>
>>> REPOSITORY       TAG                                  IMAGE ID       CREATED          SIZE
>>> spark/spark-py   3.1.1_sparkpy_3.7-scala_2.12-java8   ba3c17bc9337   2 minutes ago    2.19GB
>>> spark            3.1.1-scala_2.12-java11              4595c4e78879   18 minutes ago   635MB
>>>
>>>
>>>view my Linkedin profile
>>> 
>>>
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>>
>>> On Tue, 17 Aug 2021 at 10:31, Mich Talebzadeh 
>>> wrote:
>>>
 3.1.2_sparkpy_3.7-scala_2.12-java11

 3.1.2_sparkR_3.6-scala_2.12-java11
 Yes let us go with that and remember that we can change the tags
 anytime. The accompanying release note should detail what is inside the
 image downloaded.

 +1 for me


view my Linkedin profile
 



 *Disclaimer:* Use it at your own risk. Any and all responsibility for
 any loss, damage or destruction of data or any other property which may
 arise from relying on this email's technical content is explicitly
 disclaimed. The author will in no case be liable for any monetary damages
 arising from such loss, damage or destruction.




 On Tue, 17 Aug 2021 at 09:51, Maciej  wrote:

> On 8/17/21 4:04 AM, Holden Karau wrote:
>
> These are some really good points all around.
>
> I think, in the interest of simplicity, we'll start with just the 3
> current Dockerfiles in the Spark repo but for the next release (3.3) we
> should 

Re: WholeStageCodeGen + DSv2

2021-05-19 Thread Andrew Melo
As it turns out, I also commented on the same Jira (and forgot about
it until just now).

On Wed, May 19, 2021 at 8:32 AM Shubham Chaurasia
 wrote:
>
> Hi,
>
> I remember creating one for a similar scenario in the past - 
> https://issues.apache.org/jira/browse/SPARK-29372.
>
> Thanks,
> Shubham
>
> On Wed, May 19, 2021 at 5:34 PM Takeshi Yamamuro  
> wrote:
>>
>> hi, Andrew,
>>
>> Welcome any improvement proposal for that.
>> Could you file an issue in jira first to show us your idea and an example 
>> query
>> to reproduce the issue you described?
>>
>> Bests,
>> Takeshi
>>
>> On Wed, May 19, 2021 at 11:38 AM Andrew Melo  wrote:
>>>
>>> Hello,
>>>
>>> When reading a very wide (> 1000 cols) input, WholeStageCodeGen blows
>>> past the 64kB source limit and fails. Looking at the generated code, a
>>> big part of the code is simply the DSv2 convention that the codegen'd
>>> variable names are the same as the columns instead of something more
>>> compact like 'c1', 'c2', etc..
>>>
>>> Would there be any interest in accepting a patch that shortens these
>>> variable names to try and stay under the limit?
>>>
>>> Thanks
>>> Andrew
>>>
>>> -
>>> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>>>
>>
>>
>> --
>> ---
>> Takeshi Yamamuro




WholeStageCodeGen + DSv2

2021-05-18 Thread Andrew Melo
Hello,

When reading a very wide (> 1000 cols) input, WholeStageCodeGen blows
past the 64kB source limit and fails. Looking at the generated code, a
big part of the code is simply the DSv2 convention that the codegen'd
variable names are the same as the columns instead of something more
compact like 'c1', 'c2', etc..

Would there be any interest in accepting a patch that shortens these
variable names to try and stay under the limit?
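
To make the idea concrete, here is a small sketch of the kind of renaming being proposed. It is not Spark's codegen; it only shows a mapping from (assumed unique) column names to compact aliases like 'c1', 'c2', and how much identifier text that saves for a wide schema. The function names and the example column names are invented for illustration.

```python
def compact_names(columns, prefix="c"):
    """Map each column name to a short alias (c1, c2, ...) in input order.

    Assumes the column names are unique; duplicates would collapse to a
    single entry in the returned dict.
    """
    return {name: f"{prefix}{i}" for i, name in enumerate(columns, start=1)}


def identifier_chars(names):
    """Total number of characters spent on the given identifiers."""
    return sum(len(name) for name in names)


if __name__ == "__main__":
    # Hypothetical wide schema with long, physics-style column names.
    columns = [f"Electron_deltaEtaSC_variation_{i}" for i in range(1000)]
    aliases = compact_names(columns)
    print(identifier_chars(columns), "->", identifier_chars(aliases.values()))
```

Since each generated variable name is typically referenced several times in the emitted source, the real saving would be a multiple of the difference printed here.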

Thanks
Andrew




Secrets store for DSv2

2021-05-18 Thread Andrew Melo
Hello,

When implementing a DSv2 datasource, where is an appropriate place to
store/transmit secrets from the driver to the executors? Is there
built-in spark functionality for that, or is my best bet to stash it
as a member variable in one of the classes that gets sent to the
executors?

Thanks!
Andrew




Re: Merge two dataframes

2021-05-17 Thread Andrew Melo
On Mon, May 17, 2021 at 2:31 PM Lalwani, Jayesh  wrote:
>
> If the UDFs are computationally expensive, I wouldn't solve this problem with
> UDFs at all. If they are working in an iterative manner, and assuming each
> iteration is independent of other iterations (yes, I know that's a big
> assumption), I would think about exploding your dataframe to have a row per
> iteration, and working on each row separately, and then aggregating in the
> end. This allows you to scale your computation much better.

Ah, in this case I mean "iterative" in the "code/run/examine" sense of
the word, not that the UDF itself is performing an iterative
computation.

>
> I know not all computations can be map-reducible like that. However, most can.
>
> Split and merge data workflows in Spark don't work like their DAG 
> representations, unless you add costly caches. Without caching, each split 
> will result in Spark rereading data from the source, even if the splits are 
> getting merged together. The only way to avoid it is by caching at the split 
> point, which depending on the amount of data can become costly. Also, joins 
> result in shuffles. Avoiding splits and merges is better.
>
> To give you an example, we had an application that applied a series of rules
> to rows. The output required was a dataframe with an additional column that
> indicated which rule the row satisfied. In our initial implementation, we had
> a series of r one per rule. For N rules, we created N dataframes that had the
> rows that satisfied the rules. Then we unioned the N dataframes. Horrible
> performance that didn't scale with N. We reimplemented to add N Boolean
> columns, one per rule, that indicated if the rule was satisfied. We just kept
> adding the Boolean columns to the dataframe. After iterating over the rules,
> we added another column that indicated which rule was satisfied, and then
> dropped the Boolean columns. Much better performance that scaled with N.
> Spark read from the datasource just once, and since there were no joins/unions,
> there was no shuffle.

The hitch in your example, and what we're trying to avoid, is that if
you need to change one of these boolean columns, you end up needing to
recompute everything "afterwards" in the DAG (AFAICT), even if the
"latter" stages don't have a true dependency on the changed column. We
do explorations of very large physics datasets, and one of the
disadvantages of our bespoke analysis software is that any change to
the analysis code involves re-computing everything from scratch. A big
goal of mine is to make it so that what was changed is recomputed, and
no more, which will speed up the rate at which we can find new
physics.

Cheers
Andrew

>
> On 5/17/21, 2:56 PM, "Andrew Melo"  wrote:
>
> CAUTION: This email originated from outside of the organization. Do not 
> click links or open attachments unless you can confirm the sender and know 
> the content is safe.
>
>
>
> In our case, these UDFs are quite expensive and worked on in an
> iterative manner, so being able to cache the two "sides" of the graphs
> independently will speed up the development cycle. Otherwise, if you
> modify foo() here, then you have to recompute bar and baz, even though
> they're unchanged.
>
> df.withColumn('a', foo('x')).withColumn('b', bar('x')).withColumn('c', 
> baz('x'))
>
> Additionally, a longer goal would be to be able to persist/cache these
> columns to disk so a downstream user could later mix and match several
> (10s) of these columns together as their inputs w/o having to
> explicitly compute them themselves.
>
> Cheers
> Andrew
>
> On Mon, May 17, 2021 at 1:10 PM Sean Owen  wrote:
> >
> > Why join here - just add two columns to the DataFrame directly?
> >
> > On Mon, May 17, 2021 at 1:04 PM Andrew Melo  
> wrote:
> >>
> >> Anyone have ideas about the below Q?
> >>
> >> It seems to me that given that "diamond" DAG, that spark could see
> >> that the rows haven't been shuffled/filtered, it could do some type of
> >> "zip join" to push them together, but I've not been able to get a plan
> >> that doesn't do a hash/sort merge join
> >>
> >> Cheers
> >> Andrew
> >>
>
>
>
>




Re: Merge two dataframes

2021-05-17 Thread Andrew Melo
In our case, these UDFs are quite expensive and worked on in an
iterative manner, so being able to cache the two "sides" of the graphs
independently will speed up the development cycle. Otherwise, if you
modify foo() here, then you have to recompute bar and baz, even though
they're unchanged.

df.withColumn('a', foo('x')).withColumn('b', bar('x')).withColumn('c', baz('x'))

Additionally, a longer goal would be to be able to persist/cache these
columns to disk so a downstream user could later mix and match several
(10s) of these columns together as their inputs w/o having to
explicitly compute them themselves.

Cheers
Andrew

On Mon, May 17, 2021 at 1:10 PM Sean Owen  wrote:
>
> Why join here - just add two columns to the DataFrame directly?
>
> On Mon, May 17, 2021 at 1:04 PM Andrew Melo  wrote:
>>
>> Anyone have ideas about the below Q?
>>
>> It seems to me that given that "diamond" DAG, that spark could see
>> that the rows haven't been shuffled/filtered, it could do some type of
>> "zip join" to push them together, but I've not been able to get a plan
>> that doesn't do a hash/sort merge join
>>
>> Cheers
>> Andrew
>>




Re: Merge two dataframes

2021-05-17 Thread Andrew Melo
Anyone have ideas about the below Q?

It seems to me that, given that "diamond" DAG, Spark could see that
the rows haven't been shuffled/filtered and do some type of "zip join"
to push them together, but I've not been able to get a plan that
doesn't do a hash/sort merge join.

Cheers
Andrew

On Wed, May 12, 2021 at 11:32 AM Andrew Melo  wrote:
>
> Hi,
>
> In the case where the left and right hand side share a common parent like:
>
> df = spark.read.someDataframe().withColumn('rownum', row_number())
> df1 = df.withColumn('c1', expensive_udf1('foo')).select('c1', 'rownum')
> df2 = df.withColumn('c2', expensive_udf2('bar')).select('c2', 'rownum')
> df_joined = df1.join(df2, 'rownum', 'inner')
>
> (or maybe replacing row_number() with monotonically_increasing_id())
>
> Is there some hint/optimization that can be done to let Spark know
> that the left and right hand-sides of the join share the same
> ordering, and a sort/hash merge doesn't need to be done?
>
> Thanks
> Andrew
>
> On Wed, May 12, 2021 at 11:07 AM Sean Owen  wrote:
> >
> > Yeah I don't think that's going to work - you aren't guaranteed to get 1, 
> > 2, 3, etc. I think row_number() might be what you need to generate a join 
> > ID.
> >
> > RDD has a .zip method, but (unless I'm forgetting!) DataFrame does not. You 
> > could .zip two RDDs you get from DataFrames and manually convert the Rows 
> > back to a single Row and back to DataFrame.
> >
> >
> > On Wed, May 12, 2021 at 10:47 AM kushagra deep  
> > wrote:
> >>
> >> Thanks Raghvendra
> >>
> >> Will the ids for corresponding columns always be the same? Since
> >> monotonically_increasing_id() returns a number based on the partition ID and
> >> the row number within the partition, will it be the same for corresponding
> >> columns? Also, is it guaranteed that the two dataframes will be divided into
> >> logical Spark partitions with the same cardinality for each partition?
> >>
> >> Reg,
> >> Kushagra Deep
> >>
> >> On Wed, May 12, 2021, 21:00 Raghavendra Ganesh  
> >> wrote:
> >>>
> >>> You can add an extra id column and perform an inner join.
> >>>
> >>> val df1_with_id = df1.withColumn("id", monotonically_increasing_id())
> >>> val df2_with_id = df2.withColumn("id", monotonically_increasing_id())
> >>> df1_with_id.join(df2_with_id, Seq("id"), "inner").drop("id").show()
> >>>
> >>> +---------+---------+
> >>> |amount_6m|amount_9m|
> >>> +---------+---------+
> >>> |      100|      500|
> >>> |      200|      600|
> >>> |      300|      700|
> >>> |      400|      800|
> >>> |      500|      900|
> >>> +---------+---------+
> >>>
> >>>
> >>> --
> >>> Raghavendra
> >>>
> >>>
> >>> On Wed, May 12, 2021 at 6:20 PM kushagra deep  
> >>> wrote:
> >>>>
> >>>> Hi All,
> >>>>
> >>>> I have two dataframes
> >>>>
> >>>> df1
> >>>>
> >>>> amount_6m
> >>>>  100
> >>>>  200
> >>>>  300
> >>>>  400
> >>>>  500
> >>>>
> >>>> And a second data df2 below
> >>>>
> >>>>  amount_9m
> >>>>   500
> >>>>   600
> >>>>   700
> >>>>   800
> >>>>   900
> >>>>
> >>>> The number of rows is same in both dataframes.
> >>>>
> >>>> Can I merge the two dataframes to achieve below df
> >>>>
> >>>> df3
> >>>>
> >>>> amount_6m | amount_9m
> >>>> 100   500
> >>>>  200  600
> >>>>  300  700
> >>>>  400  800
> >>>>  500  900
> >>>>
> >>>> Thanks in advance
> >>>>
> >>>> Reg,
> >>>> Kushagra Deep
> >>>>




Re: Merge two dataframes

2021-05-12 Thread Andrew Melo
Hi,

In the case where the left and right hand side share a common parent like:

df = spark.read.someDataframe().withColumn('rownum', row_number())
df1 = df.withColumn('c1', expensive_udf1('foo')).select('c1', 'rownum')
df2 = df.withColumn('c2', expensive_udf2('bar')).select('c2', 'rownum')
df_joined = df1.join(df2, 'rownum', 'inner')

(or maybe replacing row_number() with monotonically_increasing_id())

Is there some hint/optimization that can be done to let Spark know
that the left and right hand-sides of the join share the same
ordering, and a sort/hash merge doesn't need to be done?
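
As background for the monotonically_increasing_id() alternative mentioned above, the sketch below mimics, in plain Python and without Spark, how those IDs are laid out according to the Spark documentation: the partition ID goes in the upper 31 bits and the record number within the partition in the lower 33 bits. The helper names are made up for this example. It shows that the IDs of two inputs only line up row-for-row when both are split into partitions of the same sizes.

```python
RECORD_BITS = 33  # lower 33 bits: record number within the partition


def monotonic_id(partition_id, row_in_partition):
    """ID layout documented for monotonically_increasing_id()."""
    return (partition_id << RECORD_BITS) + row_in_partition


def ids_for(partition_sizes):
    """IDs assigned to every row, given the number of rows per partition."""
    return [
        monotonic_id(pid, row)
        for pid, size in enumerate(partition_sizes)
        for row in range(size)
    ]


if __name__ == "__main__":
    # Five rows split 3+2 versus 2+3: the third row gets a different ID.
    print(ids_for([3, 2]))
    print(ids_for([2, 3]))
```

With a single shared parent, as in the snippet above, both branches see the same partition layout, so the generated IDs agree; the join itself is still planned as an ordinary join on that column.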

Thanks
Andrew

On Wed, May 12, 2021 at 11:07 AM Sean Owen  wrote:
>
> Yeah I don't think that's going to work - you aren't guaranteed to get 1, 2, 
> 3, etc. I think row_number() might be what you need to generate a join ID.
>
> RDD has a .zip method, but (unless I'm forgetting!) DataFrame does not. You 
> could .zip two RDDs you get from DataFrames and manually convert the Rows 
> back to a single Row and back to DataFrame.
>
>
> On Wed, May 12, 2021 at 10:47 AM kushagra deep  
> wrote:
>>
>> Thanks Raghvendra
>>
>> Will the ids for corresponding columns always be the same? Since
>> monotonically_increasing_id() returns a number based on the partition ID and
>> the row number within the partition, will it be the same for corresponding
>> columns? Also, is it guaranteed that the two dataframes will be divided into
>> logical Spark partitions with the same cardinality for each partition?
>>
>> Reg,
>> Kushagra Deep
>>
>> On Wed, May 12, 2021, 21:00 Raghavendra Ganesh  
>> wrote:
>>>
>>> You can add an extra id column and perform an inner join.
>>>
>>> val df1_with_id = df1.withColumn("id", monotonically_increasing_id())
>>> val df2_with_id = df2.withColumn("id", monotonically_increasing_id())
>>> df1_with_id.join(df2_with_id, Seq("id"), "inner").drop("id").show()
>>>
>>> +---------+---------+
>>> |amount_6m|amount_9m|
>>> +---------+---------+
>>> |      100|      500|
>>> |      200|      600|
>>> |      300|      700|
>>> |      400|      800|
>>> |      500|      900|
>>> +---------+---------+
>>>
>>>
>>> --
>>> Raghavendra
>>>
>>>
>>> On Wed, May 12, 2021 at 6:20 PM kushagra deep  
>>> wrote:

 Hi All,

 I have two dataframes

 df1

 amount_6m
  100
  200
  300
  400
  500

 And a second data df2 below

  amount_9m
   500
   600
   700
   800
   900

 The number of rows is same in both dataframes.

 Can I merge the two dataframes to achieve below df

 df3

 amount_6m | amount_9m
 100   500
  200  600
  300  700
  400  800
  500  900

 Thanks in advance

 Reg,
 Kushagra Deep





Re: [DISCUSS] Support pandas API layer on PySpark

2021-03-16 Thread Andrew Melo
Hi,

Integrating Koalas with pyspark might help enable a richer integration
between the two. Something that would be useful with a tighter
integration is support for custom column array types. Currently, Spark
takes dataframes, converts them to arrow buffers then transmits them
over the socket to Python. On the other side, pyspark takes the arrow
buffer and converts it to a Pandas dataframe. Unfortunately, the
default Pandas representation of a list-type column turns what were
contiguous value/offset arrays in Arrow into deserialized Python
objects for each row. Obviously, this kills performance.
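
For readers unfamiliar with the layout in question, here is a small pure-Python sketch (no Arrow or pandas involved) of the difference: a jagged column stored as one flat values array plus an offsets array, versus one Python list object per row. The function names are invented for this illustration.

```python
def to_offsets(rows):
    """Flatten a jagged list of rows into (values, offsets).

    Row i is values[offsets[i]:offsets[i + 1]], so offsets has one more
    entry than there are rows. This mirrors the value/offset arrays
    described above.
    """
    values, offsets = [], [0]
    for row in rows:
        values.extend(row)
        offsets.append(len(values))
    return values, offsets


def row_at(values, offsets, i):
    """Recover row i from the flat representation."""
    return values[offsets[i]:offsets[i + 1]]


if __name__ == "__main__":
    rows = [[1.0, 2.0], [], [3.0, 4.0, 5.0]]  # per-row Python objects
    values, offsets = to_offsets(rows)
    print(values, offsets)
    print(row_at(values, offsets, 2))
```

In the flat form, a UDF can operate on the whole values array at once; in the per-row form, every row is a separate object that has to be created and walked individually, which is the cost being described.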

A PR to extend the pyspark API to elide the pandas conversion
(https://github.com/apache/spark/pull/26783) was submitted and
rejected, which is unfortunate, but perhaps this proposed integration
would provide the hooks via Pandas' ExtensionArray interface to allow
Spark to performantly interchange jagged/ragged lists to/from python
UDFs.

Cheers
Andrew

On Tue, Mar 16, 2021 at 8:15 PM Hyukjin Kwon  wrote:
>
> Thank you guys for all your feedback. I will start working on SPIP with 
> Koalas team.
> I would expect the SPIP can be sent late this week or early next week.
>
>
> I inlined and answered the questions unanswered as below:
>
> Is the community developing the pandas API layer for Spark interested in 
> being part of Spark or do they prefer having their own release cycle?
>
> Yeah, Koalas team used to have its own release cycle to develop and move 
> quickly.
> Now it became pretty mature with reaching 1.7.0, and the team thinks that 
> it’s now
> fine to have less frequent releases, and they are happy to work together with 
> Spark with
> contributing to it. The active contributors in the Koalas community will 
> continue to
> make the contributions in Spark.
>
> How about test code? Does it fit into the PySpark test framework?
>
> Yes, this will be one of the places where it needs some efforts. Koalas 
> currently uses pytest
> with various dependency version combinations (e.g., Python version, conda vs 
> pip) whereas
> PySpark uses the plain unittests with less dependency version combinations.
>
> For pytest in Koalas <> unittests in PySpark:
>
>   I am currently thinking we will have to convert the Koalas tests to use 
> unittests to match
>   with PySpark for now.
>   It is a feasible option for PySpark to migrate to pytest too but it will 
> need extra effort to
>   make it working with our own PySpark testing framework seamlessly.
>   Koalas team (presumably and likely I) will take a look in any event.
>
> For the combinations of dependency versions:
>
>   Due to the lack of the resources in GitHub Actions, I currently plan to 
> just add the
>   Koalas tests into the matrix PySpark is currently using.
>
> one question I have; what’s an initial goal of the proposal?
> Is that to port all the pandas interfaces that Koalas has already implemented?
> Or, the basic set of them?
>
> The goal of the proposal is to port all of Koalas project into PySpark.
> For example,
>
> import koalas
>
> will be equivalent to
>
> # Names, etc. might change in the final proposal or during the review
> from pyspark.sql import pandas
>
> Koalas supports the pandas APIs with a separate layer that covers the
> differences between the DataFrame structures in pandas and PySpark, e.g.,
> other types as column names (labels), an index (something like a row number
> in DBMSs), and so on. So I think it would make more sense to port the whole
> layer instead of a subset of the APIs.
>
>
>
>
>
> On Wed, Mar 17, 2021 at 12:32 AM, Wenchen Fan wrote:
>>
>> +1, it's great to have Pandas support in Spark out of the box.
>>
>> On Tue, Mar 16, 2021 at 10:12 PM Takeshi Yamamuro  
>> wrote:
>>>
>>> +1; the pandas interfaces are pretty popular and supporting them in pyspark 
>>> looks promising, I think.
>>> one question I have; what's an initial goal of the proposal?
>>> Is that to port all the pandas interfaces that Koalas has already 
>>> implemented?
>>> Or, the basic set of them?
>>>
>>> On Tue, Mar 16, 2021 at 1:44 AM Ismaël Mejía  wrote:

 +1

 Bringing a Pandas API for pyspark to upstream Spark will only bring
 benefits for everyone (more eyes to use/see/fix/improve the API) as
 well as better alignment with core Spark improvements, the extra
 weight looks manageable.

 On Mon, Mar 15, 2021 at 4:45 PM Nicholas Chammas
  wrote:
 >
 > On Mon, Mar 15, 2021 at 2:12 AM Reynold Xin  wrote:
 >>
 >> I don't think we should deprecate existing APIs.
 >
 >
 > +1
 >
 > I strongly prefer Spark's immutable DataFrame API to the Pandas API. I 
 > could be wrong, but I wager most people who have worked with both Spark 
 > and Pandas feel the same way.
 >
 > For the large community of current PySpark users, or users switching to 
 > PySpark from another Spark language API, it doesn't make sense to 
 > deprecate the current API, even by convention.

Re: Java dataframe library for arrow suggestions

2021-03-16 Thread Andrew Melo
I can't speak to how complete it is, but I looked earlier for
something similar and ran across
https://github.com/deeplearning4j/nd4j .. it's probably not an exact
fit, but it does appear to be able to consume arrow buffers and expose
them to java.

Cheers
Andrew

On Tue, Mar 16, 2021 at 6:36 PM Wes McKinney  wrote:
>
> This has been asked several times in the past but I'm not aware of
> anything "dataframe-like" in Java that's built against Arrow (or
> otherwise) that fills the kind of need that pandas does. There was a
> Scala project some years ago Saddle [1] (not Arrow-based) built
> initially by one of the early pandas developers but I don't think it's
> still being actively developed. To build a higher-level Java API on
> top of the Arrow Java libraries would be incredibly useful to the
> community I'm sure.
>
> [1]: https://github.com/saddle/saddle
>
> On Tue, Mar 16, 2021 at 5:06 PM Paul Whalen  wrote:
> >
> > Hi,
> >
> > I've been using Arrow for some time now, mostly in the context of Arrow 
> > Flight between Java and Python.  While it's quite easy to convert Arrow 
> > data in Python to a pandas dataframe and manipulate it, I'm struggling to 
> > find an obvious analogue on the Java side.  VectorSchemaRoot is useful for 
> > loading/unloading/moving data, but clumsy for doing higher level 
> > operations, especially joins/aggregations/etc across "tables".
> >
> > In other words, if I wanted to load non Arrow formatted data from somewhere 
> > into Java, manipulate it with a dataframe like API, and then send the 
> > result somewhere via Flight, what library would be the best/simplest way to 
> > accomplish that?  I see lots of progress in other languages, but I'm 
> > wondering what would be recommended for Java.
> >
> > I'm currently looking at Spark SQL just in-application, but that seems a 
> > touch heavyweight, and I'm not sure it would do exactly what I've described 
> > (nor am I terribly familiar with Spark in the first place).
> >
> > If the premise of this question is flawed, please feel free to correct me.
> >
> > Thanks!
> > Paul


Re: [DISCUSS] SPIP: FunctionCatalog

2021-02-16 Thread Andrew Melo
Hello Ryan,

This proposal looks very interesting. Would future goals for this
functionality include both support for aggregation functions, as well
as support for processing ColumnBatch-es (instead of Row/InternalRow)?

Thanks
Andrew

On Mon, Feb 15, 2021 at 12:44 PM Ryan Blue  wrote:
>
> Thanks for the positive feedback, everyone. It sounds like there is a clear 
> path forward for calling functions. Even without a prototype, the `invoke` 
> plans show that Wenchen's suggested optimization can be done, and 
> incorporating it as an optional extension to this proposal solves many of the 
> unknowns.
>
> With that area now understood, is there any discussion about other parts of 
> the proposal, besides the function call interface?
>
> On Fri, Feb 12, 2021 at 10:40 PM Chao Sun  wrote:
>>
>> This is an important feature which can unblock several other projects 
>> including bucket join support for DataSource v2, complete support for 
>> enforcing DataSource v2 distribution requirements on the write path, etc. I 
>> like Ryan's proposals which look simple and elegant, with nice support on 
>> function overloading and variadic arguments. On the other hand, I think 
>> Wenchen made a very good point about performance. Overall, I'm excited to 
>> see active discussions on this topic and believe the community will come to 
>> a proposal with the best of both sides.
>>
>> Chao
>>
>> On Fri, Feb 12, 2021 at 7:58 PM Hyukjin Kwon  wrote:
>>>
>>> +1 for Liang-chi's.
>>>
>>> Thanks Ryan and Wenchen for leading this.
>>>
>>>
>>> On Sat, Feb 13, 2021 at 12:18 PM, Liang-Chi Hsieh wrote:

 Basically I think the proposal makes sense to me and I'd like to support 
 the
 SPIP as it looks like we have strong need for the important feature.

 Thanks Ryan for working on this and I do also look forward to Wenchen's
 implementation. Thanks for the discussion too.

 Actually I think the SupportsInvoke proposed by Ryan looks a good
 alternative to me. Besides Wenchen's alternative implementation, is there a
 chance we also have the SupportsInvoke for comparison?


 John Zhuge wrote
 > Excited to see our Spark community rallying behind this important 
 > feature!
 >
 > The proposal lays a solid foundation of minimal feature set with careful
 > considerations for future optimizations and extensions. Can't wait to see
 > it leading to more advanced functionalities like views with shared custom
 > functions, function pushdown, lambda, etc. It has already borne fruit 
 > from
 > the constructive collaborations in this thread. Looking forward to
 > Wenchen's prototype and further discussions including the SupportsInvoke
 > extension proposed by Ryan.
 >
 >
 > On Fri, Feb 12, 2021 at 4:35 PM Owen O'Malley wrote:
 >
 >> I think this proposal is a very good thing giving Spark a standard way 
 >> of
 >> getting to and calling UDFs.
 >>
 >> I like having the ScalarFunction as the API to call the UDFs. It is
 >> simple, yet covers all of the polymorphic type cases well. I think it
 >> would
 >> also simplify using the functions in other contexts like pushing down
 >> filters into the ORC & Parquet readers although there are a lot of
 >> details
 >> that would need to be considered there.
 >>
 >> .. Owen
 >>
 >>
 >> On Fri, Feb 12, 2021 at 11:07 PM Erik Krogen wrote:
 >>
 >>> I agree that there is a strong need for a FunctionCatalog within Spark
 >>> to
 >>> provide support for shareable UDFs, as well as make movement towards
 >>> more
 >>> advanced functionality like views which themselves depend on UDFs, so I
 >>> support this SPIP wholeheartedly.
 >>>
 >>> I find both of the proposed UDF APIs to be sufficiently user-friendly
 >>> and
 >>> extensible. I generally think Wenchen's proposal is easier for a user 
 >>> to
 >>> work with in the common case, but has greater potential for confusing
 >>> and
 >>> hard-to-debug behavior due to use of reflective method signature
 >>> searches.
 >>> The merits on both sides can hopefully be more properly examined with
 >>> code,
 >>> so I look forward to seeing an implementation of Wenchen's ideas to
 >>> provide
 >>> a more concrete comparison. I am optimistic that we will not let the
 >>> debate
 >>> over this point unreasonably stall the SPIP from making progress.
 >>>
 >>> Thank you to both Wenchen and Ryan for your detailed consideration and
 >>> evaluation of these ideas!
 >>> --
 >>> *From:* Dongjoon Hyun
 >>> *Sent:* Wednesday, February 10, 2021 6:06 PM
 >>> *To:* Ryan Blue
 >>> *Cc:* Holden Karau

Re: Spark DataFrame Creation

2020-07-22 Thread Andrew Melo
Hi Mark,

On Wed, Jul 22, 2020 at 4:49 PM Mark Bidewell  wrote:
>
> Sorry if this is the wrong place for this.  I am trying to debug an issue 
> with this library:
> https://github.com/springml/spark-sftp
>
> When I attempt to create a dataframe:
>
> spark.read.
> format("com.springml.spark.sftp").
> option("host", "...").
> option("username", "...").
> option("password", "...").
> option("fileType", "csv").
> option("inferSchema", "true").
> option("tempLocation","/srv/spark/tmp").
> option("hdfsTempLocation","/srv/spark/tmp").
> load("...")
>
> What I am seeing is that the download is occurring on the Spark driver, not 
> the Spark worker. This leads to a failure when Spark tries to create the 
> DataFrame on the worker.
>
> I'm confused by the behavior. My understanding was that load() was lazily 
> executed on the Spark worker. Why would some elements be executing on the 
> driver?

Looking at the code, it appears that your sftp plugin downloads the
file to a local location and opens from there.

https://github.com/springml/spark-sftp/blob/090917547001574afa93cddaf2a022151a3f4260/src/main/scala/com/springml/spark/sftp/DefaultSource.scala#L38

You may have more luck with an sftp hadoop filesystem plugin that can
read sftp:// URLs directly.

Cheers
Andrew
>
> Thanks for your help
> --
> Mark Bidewell
> http://www.linkedin.com/in/markbidewell

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



PySpark aggregation w/pandas_udf

2020-07-15 Thread Andrew Melo
Hi all,

For our use case, we would like to perform an aggregation using a
pandas_udf with dataframes that have O(100m) rows and a few 10s of
columns. Conceptually, this looks a bit like pyspark.RDD.aggregate,
where the user provides:

* A "seqOp" which accepts pandas series(*) and outputs an intermediate output
* A "combOp" which combines the intermediate outputs into a final output

There's no direct DataFrame equivalent to RDD.aggregate(), but you can
somewhat emulate the functionality with

df.groupBy().applyInPandas(seqOp).agg(combOp)

However, it seems like using groupBy() w/o any columns isn't the
intended use. The docs for groupBy().applyInPandas() have the following
note:

> Note: This function requires a full shuffle. All the data of a group will be
> loaded into memory, so the user should be aware of the potential OOM risk
> if data is skewed and certain groups are too large to fit in memory.

The Spark SQL guide has the following note as well:

> The configuration for maxRecordsPerBatch is not applied on groups and it is
> up to the user to ensure that the grouped data will fit into the available
> memory.

Since we want to perform this aggregation over the entire DataFrame,
we end up with one group that is entirely loaded into memory, which
immediately OOMs (requiring a shuffle probably doesn't help either).

To work around this, we make smaller groups by repartitioning, adding
a new column with the partition ID, then do the groupBy against that
column to make smaller groups, but that's not great -- it's a lot of
extra data movement.
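
To make the intended semantics concrete, here is a minimal pure-Python sketch
of the seqOp/combOp pattern described above. It does not use Spark at all:
the chunks stand in for partitions, and the names chunked, seq_op, comb_op
and aggregate, as well as the (sum, count) intermediate, are assumptions
chosen only for illustration.

```python
from functools import reduce


def chunked(rows, chunk_size):
    """Yield consecutive slices of `rows`, standing in for partitions."""
    for start in range(0, len(rows), chunk_size):
        yield rows[start:start + chunk_size]


def seq_op(chunk):
    """Reduce one chunk to a small intermediate result: (sum, count)."""
    return (sum(chunk), len(chunk))


def comb_op(left, right):
    """Merge two intermediate results into one."""
    return (left[0] + right[0], left[1] + right[1])


def aggregate(rows, chunk_size):
    """Apply seq_op to each chunk, then fold the partial results with comb_op."""
    partials = (seq_op(chunk) for chunk in chunked(rows, chunk_size))
    return reduce(comb_op, partials, (0, 0))


total, count = aggregate(list(range(10)), chunk_size=3)
mean = total / count
```

The point of the pattern is that only one chunk and the small intermediate
results need to be in memory at a time, which is what the single-group
groupBy() approach cannot offer.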

Am I missing something obvious? Or is this simply a part of the API
that's not fleshed out yet?

Thanks
Andrew


* Unfortunately, Pandas' data model is less rich than spark/arrow/our
code, so the JVM composes an arrow stream and transmits it to the
python worker who then converts it to pandas before passing it to the
UDF. We then have to undo the conversion to get the original data
back. It'd be nice to have more control over that intermediate
conversion.




Re: REST Structured Steaming Sink

2020-07-01 Thread Andrew Melo
On Wed, Jul 1, 2020 at 8:13 PM Burak Yavuz  wrote:
>
> I'm not sure having a built-in sink that allows you to DDOS servers is the 
> best idea either. foreachWriter is typically used for such use cases, not 
> foreachBatch. It's also pretty hard to guarantee exactly-once, rate limiting, 
> etc.

If you control the machines and can run arbitrary code, you can DDOS
whatever you want. What's the difference between this proposal and
writing a UDF that opens 1,000 connections to a target machine?

> Best,
> Burak
>
> On Wed, Jul 1, 2020 at 5:54 PM Holden Karau  wrote:
>>
>> I think adding something like this (if it doesn't already exist) could help 
>> make structured streaming easier to use, foreachBatch is not the best API.
>>
>> On Wed, Jul 1, 2020 at 2:21 PM Jungtaek Lim  
>> wrote:
>>>
>>> I guess the method, query parameter, header, and the payload would be all 
>>> different for almost every use case - that makes it hard to generalize and 
>>> requires implementation to be pretty much complicated to be flexible enough.
>>>
>>> I'm not aware of any custom sink implementing REST so your best bet would 
>>> be simply implementing your own with foreachBatch, but someone might 
>>> jump in and provide a pointer if there is something in the Spark ecosystem.
>>>
>>> Thanks,
>>> Jungtaek Lim (HeartSaVioR)
>>>
>>> On Thu, Jul 2, 2020 at 3:21 AM Sam Elamin  wrote:

 Hi All,


 We ingest a lot of RESTful APIs into our lake and I'm wondering if it is at 
 all possible to create a REST sink in structured streaming?

 For now I'm only focusing on restful services that have an incremental ID 
 so my sink can just poll for new data then ingest.

 I can't seem to find a connector that does this and my gut instinct tells 
 me it's probably because it isn't possible due to something completely 
 obvious that I am missing

 I know some RESTful APIs obfuscate the IDs to a hash of strings and that 
 could be a problem, but since I'm planning on focusing on just numerical 
 IDs that just get incremented I think I won't be facing that issue
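
The polling idea above can be sketched in plain Python, with no Spark
involved. Everything here is an assumption made for illustration: the
in-memory list `service` stands in for the remote REST API, and fetch_since
and poll_once are hypothetical helper names, not part of any library.

```python
def fetch_since(records, last_id, limit):
    """Stand-in for a REST call: up to `limit` records with id > last_id."""
    newer = [record for record in records if record["id"] > last_id]
    return sorted(newer, key=lambda record: record["id"])[:limit]


def poll_once(records, last_id, limit=2):
    """Fetch one batch and return it with the highest ID seen so far."""
    batch = fetch_since(records, last_id, limit)
    new_last_id = batch[-1]["id"] if batch else last_id
    return batch, new_last_id


# Fake service holding five records with incrementing numeric IDs.
service = [{"id": i, "value": i * i} for i in range(1, 6)]

last_id = 0
ingested = []
while True:
    batch, last_id = poll_once(service, last_id)
    if not batch:
        break  # nothing new; a real job would sleep and poll again
    ingested.extend(batch)
```

Persisting last_id between runs is what would let a restarted job resume
without re-reading old records; the sketch keeps it in a local variable only.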


 Can anyone let me know if this sounds like a daft idea? Will I need 
 something like Kafka or kinesis as a buffer and redundancy or am I 
 overthinking this?


 I would love to bounce ideas with people who run structured streaming 
 jobs in production


 Kind regards
 San


>>
>>
>> --
>> Twitter: https://twitter.com/holdenkarau
>> Books (Learning Spark, High Performance Spark, etc.): https://amzn.to/2MaRAG9
>> YouTube Live Streams: https://www.youtube.com/user/holdenkarau




Re: Use Hadoop-3.2 as a default Hadoop profile in 3.0.0?

2020-06-24 Thread Andrew Melo
Hello,

On Wed, Jun 24, 2020 at 2:13 PM Holden Karau  wrote:
>
> So I thought our theory for the pypi packages was it was for local 
> developers, they really shouldn't care about the Hadoop version. If you're 
> running on a production cluster you ideally pip install from the same release 
> artifacts as your production cluster to match.

That's certainly one use of PyPI packages, but not the only one. In
our case, we provide clusters for our users, with SPARK_CONF
pre-configured with (e.g.) the master connection URL. But the analyses
they're doing are their own and unique, so they work in their own
personal Python virtual environments. There are no "release artifacts"
to publish, per se, since each user is working independently and can
install whatever they'd like into their virtual environments.

Cheers
Andrew

>
> On Wed, Jun 24, 2020 at 12:11 PM Wenchen Fan  wrote:
>>
>> Shall we start a new thread to discuss the bundled Hadoop version in 
>> PySpark? I don't have a strong opinion on changing the default, as users can 
>> still download the Hadoop 2.7 version.
>>
>> On Thu, Jun 25, 2020 at 2:23 AM Dongjoon Hyun  
>> wrote:
>>>
>>> To Xiao.
>>> Why should Apache project releases be blocked by PyPI / CRAN? It's 
>>> completely optional, isn't it?
>>>
>>> > let me repeat my opinion:  the top priority is to provide two options 
>>> for PyPi distribution
>>>
>>> IIRC, Apache Spark 3.0.0 fails to upload to CRAN and this is not the first 
>>> incident. Apache Spark already has a history of missing SparkR uploading. 
>>> We don't say Spark 3.0.0 fails due to CRAN uploading or other non-Apache 
>>> distribution channels. In short, non-Apache distribution channels cannot be 
>>> a `blocker` for Apache project releases. We only do our best for the 
>>> community.
>>>
>>> SPARK-32017 (Make Pyspark Hadoop 3.2+ Variant available in PyPI) is really 
>>> irrelevant to this PR. If someone wants to do that and the PR is ready, why 
>>> don't we do it in `Apache Spark 3.0.1 timeline`? Why do we wait for 
>>> December? Is there a reason why we need to wait?
>>>
>>> To Sean
>>> Thanks!
>>>
>>> To Nicholas.
>>> Do you think `pip install pyspark` is version-agnostic? In the Python 
>>> world, `pip install somepackage` fails frequently. In production, you 
>>> should use `pip install somepackage==specificversion`. I don't think the 
>>> production pipeline has non-versioned Python package installation.
>>>
>>> The bottom line is that the PR doesn't change PyPi uploading, the AS-IS PR 
>>> keeps Hadoop 2.7 on PySpark due to Xiao's comments. I don't think there is 
>>> a blocker for that PR.
>>>
>>> Bests,
>>> Dongjoon.
>>>
>>> On Wed, Jun 24, 2020 at 10:54 AM Nicholas Chammas 
>>>  wrote:

 To rephrase my earlier email, PyPI users would care about the bundled 
 Hadoop version if they have a workflow that, in effect, looks something 
 like this:

 ```
 pip install pyspark
 pyspark --packages org.apache.hadoop:hadoop-aws:2.7.7
 spark.read.parquet('s3a://...')
 ```

 I agree that Hadoop 3 would be a better default (again, the s3a support is 
 just much better). But to Xiao's point, if you are expecting Spark to work 
 with some package like hadoop-aws that assumes an older version of Hadoop 
 bundled with Spark, then changing the default may break your workflow.

 In the case of hadoop-aws the fix is simple--just bump hadoop-aws:2.7.7 to 
 hadoop-aws:3.2.1. But perhaps there are other PyPI-based workflows that 
 would be more difficult to repair.
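
One way to keep such a workflow from breaking is to derive the hadoop-aws
coordinate from a single Hadoop version value instead of hard-coding it. The
following is only a sketch: the helper names are hypothetical, and the caller
is assumed to know which Hadoop version their PySpark install bundles.

```python
def hadoop_aws_package(hadoop_version):
    """Build the Maven coordinate of the matching hadoop-aws release."""
    return "org.apache.hadoop:hadoop-aws:" + hadoop_version


def pyspark_command(hadoop_version):
    """Assemble the pyspark invocation as an argument list."""
    return ["pyspark", "--packages", hadoop_aws_package(hadoop_version)]


# Moving from a Hadoop 2.7 build to a Hadoop 3.2 build changes one value.
old_command = " ".join(pyspark_command("2.7.7"))
new_command = " ".join(pyspark_command("3.2.1"))
print(new_command)
```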

 On Wed, Jun 24, 2020 at 1:44 PM Sean Owen  wrote:
>
> I'm also genuinely curious when PyPI users would care about the
> bundled Hadoop jars - do we even need two versions? that itself is
> extra complexity for end users.
> I do think Hadoop 3 is the better choice for the user who doesn't
> care, and better long term.
> OK but let's at least move ahead with changing defaults.
>
> On Wed, Jun 24, 2020 at 12:38 PM Xiao Li  wrote:
> >
> > Hi, Dongjoon,
> >
> > Please do not misinterpret my point. I already clearly said "I do not 
> > know how to track the popularity of Hadoop 2 vs Hadoop 3."
> >
> > Also, let me repeat my opinion:  the top priority is to provide two 
> > options for PyPi distribution and let the end users choose the ones 
> > they need. Hadoop 3.2 or Hadoop 2.7. In general, when we want to make 
> > any breaking change, let us follow our protocol documented in 
> > https://spark.apache.org/versioning-policy.html.
> >
> > If you just want to change the Jenkins setup, I am OK about it. If you 
> > want to change the default distribution, we need more discussions in 
> > the community for getting an agreement.
> >
> >  Thanks,
> >
> > Xiao
> >
>
>
>
> --
> Twitter: https://twitter.com/holdenkarau
> Books (Learning Spark, High Performance Spark, etc.): 

Re: Can I collect Dataset[Row] to driver without converting it to Array [Row]?

2020-04-22 Thread Andrew Melo
Hi Maqy

On Wed, Apr 22, 2020 at 3:24 AM maqy <454618...@qq.com> wrote:
>
> I will traverse this Dataset to convert it to Arrow and send it to Tensorflow 
> through Socket.

(I presume you're using the python tensorflow API, if you're not, please ignore)

There is a JIRA/PR ([1] [2]) which proposes to add native support for
Arrow serialization to Python.

Under the hood, Spark is already serializing into Arrow format to
transmit to python, it's just additionally doing an unconditional
conversion to pandas once it reaches the python runner -- which is
good if you're using pandas, not so great if pandas isn't what you
operate on. The JIRA above would let you receive the arrow buffers
(that already exist) directly.

Cheers,
Andrew
[1] https://issues.apache.org/jira/browse/SPARK-30153
[2] https://github.com/apache/spark/pull/26783

>
> I tried to use toLocalIterator() to traverse the dataset instead of calling
> collect() on the driver, but toLocalIterator() creates a lot of jobs and
> takes a lot of time.
>
>
>
> Best regards,
>
> maqy
>
>
>
> From: Michael Artz
> Sent: April 22, 2020 16:09
> To: maqy
> Cc: user@spark.apache.org
> Subject: Re: Can I collect Dataset[Row] to driver without converting it to Array 
> [Row]?
>
>
>
> What would you do with it once you get it into driver in a Dataset[Row]?
>
> Sent from my iPhone
>
>
>
> On Apr 22, 2020, at 3:06 AM, maqy <454618...@qq.com> wrote:
>
> 
>
> When the data is stored in the Dataset [Row] format, the memory usage is very 
> small.
>
> When I use collect () to collect data to the driver, each line of the dataset 
> will be converted to Row and stored in an array, which will bring great 
> memory overhead.
>
> So, can I collect Dataset[Row] to driver and keep its data format?
>
>
>
> Best regards,
>
> maqy
>
>
>
>




Re: DSv2 & DataSourceRegister

2020-04-16 Thread Andrew Melo
Hi again,

Does anyone have thoughts on either the idea or the implementation?

Thanks,
Andrew

On Thu, Apr 9, 2020 at 11:32 PM Andrew Melo  wrote:
>
> Hi all,
>
> I've opened a WIP PR here https://github.com/apache/spark/pull/28159
> I'm a novice at Scala, so I'm sure the code isn't idiomatic, but it
> behaves functionally how I'd expect. I've added unit tests to the PR,
> but if you would like to verify the intended functionality, I've
> uploaded a fat jar with my datasource to
> http://mirror.accre.vanderbilt.edu/spark/laurelin-both.jar and an
> example input file to
> https://github.com/spark-root/laurelin/raw/master/testdata/stdvector.root.
> The following in spark-shell successfully chooses the proper plugin
> implementation based on the spark version:
>
> spark.read.format("root").option("tree","tvec").load("stdvector.root")
>
> Additionally, I did a very rough POC for spark2.4, which you can find
> at https://github.com/PerilousApricot/spark/tree/feature/registerv2-24
> . The same jar/inputfile works there as well.
>
> Thanks again,
> Andrew
>
> On Wed, Apr 8, 2020 at 10:27 AM Andrew Melo  wrote:
> >
> > On Wed, Apr 8, 2020 at 8:35 AM Wenchen Fan  wrote:
> > >
> > > It would be good to support your use case, but I'm not sure how to 
> > > accomplish that. Can you open a PR so that we can discuss it in detail? 
> > > How can `public Class<? extends DataSourceV2> getImplementation();` be 
> > > possible in 3.0 as there is no `DataSourceV2`?
> >
> > You're right, that was a typo. Since the whole point is to separate
> > the (stable) registration interface from the (evolving) DSv2 API, it
> > defeats the purpose to then directly reference the DSv2 API within the
> > registration interface.
> >
> > I'll put together a PR.
> >
> > Thanks again,
> > Andrew
> >
> > >
> > > On Wed, Apr 8, 2020 at 1:12 PM Andrew Melo  wrote:
> > >>
> > >> Hello
> > >>
> > >> On Tue, Apr 7, 2020 at 23:16 Wenchen Fan  wrote:
> > >>>
> > >>> Are you going to provide a single artifact for Spark 2.4 and 3.0? I'm 
> > >>> not sure this is possible as the DS V2 API is very different in 3.0, 
> > >>> e.g. there is no `DataSourceV2` anymore, and you should implement 
> > >>> `TableProvider` (if you don't have database/table).
> > >>
> > >>
> > >> Correct, I've got a single jar for both Spark 2.4 and 3.0, with a 
> > >> toplevel Root_v24 (implements DataSourceV2) and Root_v30 (implements 
> > >> TableProvider). I can load this jar in both pyspark 2.4 and 3.0 and it 
> > >> works well -- as long as I remove the registration from META-INF and 
> > >> pass in the full class name to the DataFrameReader.
> > >>
> > >> Thanks
> > >> Andrew
> > >>
> > >>>
> > >>> On Wed, Apr 8, 2020 at 6:58 AM Andrew Melo  
> > >>> wrote:
> > >>>>
> > >>>> Hi Ryan,
> > >>>>
> > >>>> On Tue, Apr 7, 2020 at 5:21 PM Ryan Blue  wrote:
> > >>>> >
> > >>>> > Hi Andrew,
> > >>>> >
> > >>>> > With DataSourceV2, I recommend plugging in a catalog instead of 
> > >>>> > using DataSource. As you've noticed, the way that you plug in data 
> > >>>> > sources isn't very flexible. That's one of the reasons why we 
> > >>>> > changed the plugin system and made it possible to use named catalogs 
> > >>>> > that load implementations based on configuration properties.
> > >>>> >
> > >>>> > I think it's fine to consider how to patch the registration trait, 
> > >>>> > but I really don't recommend continuing to identify table 
> > >>>> > implementations directly by name.
> > >>>>
> > >>>> Can you be a bit more concrete with what you mean by plugging a
> > >>>> catalog instead of a DataSource? We have been using
> > >>>> sc.read.format("root").load([list of paths]) which works well. Since
> > >>>> we don't have a database or tables, I don't fully understand what's
> > >>>> different between the two interfaces that would make us prefer one or
> > >>>> another.
> > >>>>
> > >>>> That being said, WRT the registration trait, if I'm not misreading
> > >>>> 

Re: DSv2 & DataSourceRegister

2020-04-09 Thread Andrew Melo
Hi all,

I've opened a WIP PR here https://github.com/apache/spark/pull/28159
I'm a novice at Scala, so I'm sure the code isn't idiomatic, but it
behaves functionally how I'd expect. I've added unit tests to the PR,
but if you would like to verify the intended functionality, I've
uploaded a fat jar with my datasource to
http://mirror.accre.vanderbilt.edu/spark/laurelin-both.jar and an
example input file to
https://github.com/spark-root/laurelin/raw/master/testdata/stdvector.root.
The following in spark-shell successfully chooses the proper plugin
implementation based on the spark version:

spark.read.format("root").option("tree","tvec").load("stdvector.root")

Additionally, I did a very rough POC for spark2.4, which you can find
at https://github.com/PerilousApricot/spark/tree/feature/registerv2-24
. The same jar/inputfile works there as well.
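
Without such a patch, the version selection has to happen on the caller's
side, by passing a fully qualified class name to format(). A rough sketch of
that selection in plain Python follows. The class names Root_v24 and Root_v30
are the ones used for the two implementations in this thread; the package
prefix "example.root" and the function name pick_implementation are
placeholders invented for illustration, not the plugin's real names.

```python
def pick_implementation(spark_version, package="example.root"):
    """Return the class name to pass to format() for a given Spark version."""
    major, minor = (int(part) for part in spark_version.split(".")[:2])
    if (major, minor) == (2, 4):
        # Spark 2.4: implementation built against DataSourceV2
        return package + ".Root_v24"
    if major == 3:
        # Spark 3.x: implementation built against TableProvider
        return package + ".Root_v30"
    raise ValueError("unsupported Spark version: " + spark_version)


class_for_24 = pick_implementation("2.4.5")
class_for_30 = pick_implementation("3.0.0")
```

In a real session the version string would come from the running Spark
instance and the result would be handed to the reader's format() call; the
PR aims to make that step unnecessary so that format("root") works on both.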

Thanks again,
Andrew

On Wed, Apr 8, 2020 at 10:27 AM Andrew Melo  wrote:
>
> On Wed, Apr 8, 2020 at 8:35 AM Wenchen Fan  wrote:
> >
> > It would be good to support your use case, but I'm not sure how to 
> > accomplish that. Can you open a PR so that we can discuss it in detail? How 
> > can `public Class<? extends DataSourceV2> getImplementation();` be 
> > possible in 3.0 as there is no `DataSourceV2`?
>
> You're right, that was a typo. Since the whole point is to separate
> the (stable) registration interface from the (evolving) DSv2 API, it
> defeats the purpose to then directly reference the DSv2 API within the
> registration interface.
>
> I'll put together a PR.
>
> Thanks again,
> Andrew
>
> >
> > On Wed, Apr 8, 2020 at 1:12 PM Andrew Melo  wrote:
> >>
> >> Hello
> >>
> >> On Tue, Apr 7, 2020 at 23:16 Wenchen Fan  wrote:
> >>>
> >>> Are you going to provide a single artifact for Spark 2.4 and 3.0? I'm not 
> >>> sure this is possible as the DS V2 API is very different in 3.0, e.g. 
> >>> there is no `DataSourceV2` anymore, and you should implement 
> >>> `TableProvider` (if you don't have database/table).
> >>
> >>
> >> Correct, I've got a single jar for both Spark 2.4 and 3.0, with a toplevel 
> >> Root_v24 (implements DataSourceV2) and Root_v30 (implements 
> >> TableProvider). I can load this jar in both pyspark 2.4 and 3.0 and it 
> >> works well -- as long as I remove the registration from META-INF and pass 
> >> in the full class name to the DataFrameReader.
> >>
> >> Thanks
> >> Andrew
> >>
> >>>
> >>> On Wed, Apr 8, 2020 at 6:58 AM Andrew Melo  wrote:
> >>>>
> >>>> Hi Ryan,
> >>>>
> >>>> On Tue, Apr 7, 2020 at 5:21 PM Ryan Blue  wrote:
> >>>> >
> >>>> > Hi Andrew,
> >>>> >
> >>>> > With DataSourceV2, I recommend plugging in a catalog instead of using 
> >>>> > DataSource. As you've noticed, the way that you plug in data sources 
> >>>> > isn't very flexible. That's one of the reasons why we changed the 
> >>>> > plugin system and made it possible to use named catalogs that load 
> >>>> > implementations based on configuration properties.
> >>>> >
> >>>> > I think it's fine to consider how to patch the registration trait, but 
> >>>> > I really don't recommend continuing to identify table implementations 
> >>>> > directly by name.
> >>>>
> >>>> Can you be a bit more concrete with what you mean by plugging a
> >>>> catalog instead of a DataSource? We have been using
> >>>> sc.read.format("root").load([list of paths]) which works well. Since
> >>>> we don't have a database or tables, I don't fully understand what's
> >>>> different between the two interfaces that would make us prefer one or
> >>>> another.
> >>>>
> >>>> That being said, WRT the registration trait, if I'm not misreading
> >>>> createTable() and friends, the "source" parameter is resolved the same
> >>>> way as DataFrameReader.format(), so a solution that helps out
> >>>> registration should help both interfaces.
> >>>>
> >>>> Thanks again,
> >>>> Andrew
> >>>>
> >>>> >
> >>>> > On Tue, Apr 7, 2020 at 12:26 PM Andrew Melo  
> >>>> > wrote:
> >>>> >>
> >>>> >> Hi all,
> >>>> >>
> >>>> >> I posted an improveme

Re: DSv2 & DataSourceRegister

2020-04-08 Thread Andrew Melo
On Wed, Apr 8, 2020 at 8:35 AM Wenchen Fan  wrote:
>
> It would be good to support your use case, but I'm not sure how to accomplish 
> that. Can you open a PR so that we can discuss it in detail? How can `public 
> Class<? extends DataSourceV2> getImplementation();` be possible in 3.0 as 
> there is no `DataSourceV2`?

You're right, that was a typo. Since the whole point is to separate
the (stable) registration interface from the (evolving) DSv2 API, it
defeats the purpose to then directly reference the DSv2 API within the
registration interface.

I'll put together a PR.

Thanks again,
Andrew

>
> On Wed, Apr 8, 2020 at 1:12 PM Andrew Melo  wrote:
>>
>> Hello
>>
>> On Tue, Apr 7, 2020 at 23:16 Wenchen Fan  wrote:
>>>
>>> Are you going to provide a single artifact for Spark 2.4 and 3.0? I'm not 
>>> sure this is possible as the DS V2 API is very different in 3.0, e.g. there 
>>> is no `DataSourceV2` anymore, and you should implement `TableProvider` (if 
>>> you don't have database/table).
>>
>>
>> Correct, I've got a single jar for both Spark 2.4 and 3.0, with a toplevel 
>> Root_v24 (implements DataSourceV2) and Root_v30 (implements TableProvider). 
>> I can load this jar in both pyspark 2.4 and 3.0 and it works well -- as 
>> long as I remove the registration from META-INF and pass in the full class 
>> name to the DataFrameReader.
>>
>> Thanks
>> Andrew
>>
>>>
>>> On Wed, Apr 8, 2020 at 6:58 AM Andrew Melo  wrote:
>>>>
>>>> Hi Ryan,
>>>>
>>>> On Tue, Apr 7, 2020 at 5:21 PM Ryan Blue  wrote:
>>>> >
>>>> > Hi Andrew,
>>>> >
>>>> > With DataSourceV2, I recommend plugging in a catalog instead of using 
>>>> > DataSource. As you've noticed, the way that you plug in data sources 
>>>> > isn't very flexible. That's one of the reasons why we changed the plugin 
>>>> > system and made it possible to use named catalogs that load 
>>>> > implementations based on configuration properties.
>>>> >
>>>> > I think it's fine to consider how to patch the registration trait, but I 
>>>> > really don't recommend continuing to identify table implementations 
>>>> > directly by name.
>>>>
>>>> Can you be a bit more concrete with what you mean by plugging a
>>>> catalog instead of a DataSource? We have been using
>>>> sc.read.format("root").load([list of paths]) which works well. Since
>>>> we don't have a database or tables, I don't fully understand what's
>>>> different between the two interfaces that would make us prefer one or
>>>> another.
>>>>
>>>> That being said, WRT the registration trait, if I'm not misreading
>>>> createTable() and friends, the "source" parameter is resolved the same
>>>> way as DataFrameReader.format(), so a solution that helps out
>>>> registration should help both interfaces.
>>>>
>>>> Thanks again,
>>>> Andrew
>>>>
>>>> >
>>>> > On Tue, Apr 7, 2020 at 12:26 PM Andrew Melo  
>>>> > wrote:
>>>> >>
>>>> >> Hi all,
>>>> >>
>>>> >> I posted an improvement ticket in JIRA and Hyukjin Kwon requested I
>>>> >> send an email to the dev list for discussion.
>>>> >>
>>>> >> As the DSv2 API evolves, some breaking changes are occasionally made
>>>> >> to the API. It's possible to split a plugin into a "common" part and
>>>> >> multiple version-specific parts and this works OK to have a single
>>>> >> artifact for users, as long as they write out the fully qualified
>>>> >> classname as the DataFrame format(). The one part that can't be
>>>> >> currently worked around is the DataSourceRegister trait. Since classes
>>>> >> which implement DataSourceRegister must also implement DataSourceV2
>>>> >> (and its mixins), changes to those interfaces cause the ServiceLoader
>>>> >> to fail when it attempts to load the "wrong" DataSourceV2 class.
>>>> >> (there's also an additional prohibition against multiple
>>>> >> implementations having the same ShortName in
>>>> >> org.apache.spark.sql.execution.datasources.DataSource.lookupDataSource).

Re: DSv2 & DataSourceRegister

2020-04-07 Thread Andrew Melo
Hello

On Tue, Apr 7, 2020 at 23:16 Wenchen Fan  wrote:

> Are you going to provide a single artifact for Spark 2.4 and 3.0? I'm not
> sure this is possible as the DS V2 API is very different in 3.0, e.g. there
> is no `DataSourceV2` anymore, and you should implement `TableProvider` (if
> you don't have database/table).
>

Correct, I've got a single jar for both Spark 2.4 and 3.0, with a toplevel
Root_v24 (implements DataSourceV2) and Root_v30 (implements TableProvider).
I can load this jar in both pyspark 2.4 and 3.0 and it works well -- as
long as I remove the registration from META-INF and pass in the full class
name to the DataFrameReader.

Thanks
Andrew


> On Wed, Apr 8, 2020 at 6:58 AM Andrew Melo  wrote:
>
>> Hi Ryan,
>>
>> On Tue, Apr 7, 2020 at 5:21 PM Ryan Blue  wrote:
>> >
>> > Hi Andrew,
>> >
>> > With DataSourceV2, I recommend plugging in a catalog instead of using
>> DataSource. As you've noticed, the way that you plug in data sources isn't
>> very flexible. That's one of the reasons why we changed the plugin system
>> and made it possible to use named catalogs that load implementations based
>> on configuration properties.
>> >
>> > I think it's fine to consider how to patch the registration trait, but
>> I really don't recommend continuing to identify table implementations
>> directly by name.
>>
>> Can you be a bit more concrete with what you mean by plugging a
>> catalog instead of a DataSource? We have been using
>> sc.read.format("root").load([list of paths]) which works well. Since
>> we don't have a database or tables, I don't fully understand what's
>> different between the two interfaces that would make us prefer one or
>> another.
>>
>> That being said, WRT the registration trait, if I'm not misreading
>> createTable() and friends, the "source" parameter is resolved the same
>> way as DataFrameReader.format(), so a solution that helps out
>> registration should help both interfaces.
>>
>> Thanks again,
>> Andrew
>>
>> >
>> > On Tue, Apr 7, 2020 at 12:26 PM Andrew Melo 
>> wrote:
>> >>
>> >> Hi all,
>> >>
>> >> I posted an improvement ticket in JIRA and Hyukjin Kwon requested I
>> >> send an email to the dev list for discussion.
>> >>
>> >> As the DSv2 API evolves, some breaking changes are occasionally made
>> >> to the API. It's possible to split a plugin into a "common" part and
>> >> multiple version-specific parts and this works OK to have a single
>> >> artifact for users, as long as they write out the fully qualified
>> >> classname as the DataFrame format(). The one part that can't be
>> >> currently worked around is the DataSourceRegister trait. Since classes
>> >> which implement DataSourceRegister must also implement DataSourceV2
>> >> (and its mixins), changes to those interfaces cause the ServiceLoader
>> >> to fail when it attempts to load the "wrong" DataSourceV2 class.
>> >> (there's also an additional prohibition against multiple
>> >> implementations having the same ShortName in
>> >>
>> org.apache.spark.sql.execution.datasources.DataSource.lookupDataSource).
>> >> This means users will need to update their notebooks/code/tutorials if
>> >> they run @ a different site whose cluster is a different version.
>> >>
>> >> To solve this, I proposed in SPARK-31363 a new trait that would
>> >> function the same as the existing DataSourceRegister trait, but adds
>> >> an additional method:
>> >>
>> >> public Class getImplementation();
>> >>
>> >> ...which will allow DSv2 plugins to dynamically choose the appropriate
>> >> DataSourceV2 class based on the runtime environment. This would let us
>> >> release a single artifact for different Spark versions and users could
>> >> use the same artifactID & format regardless of where they were
>> >> executing their code. If there were no services registered with this
>> >> new trait, the functionality would remain the same as before.
>> >>
>> >> I think this functionality will be useful as DSv2 continues to evolve,
>> >> please let me know your thoughts.
>> >>
>> >> Thanks
>> >> Andrew
>> >>
>> >> -
>> >> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>> >>
>> >
>> >
>> > --
>> > Ryan Blue
>> > Software Engineer
>> > Netflix
>>
>>
>>


Re: DSv2 & DataSourceRegister

2020-04-07 Thread Andrew Melo
Hi Ryan,

On Tue, Apr 7, 2020 at 5:21 PM Ryan Blue  wrote:
>
> Hi Andrew,
>
> With DataSourceV2, I recommend plugging in a catalog instead of using 
> DataSource. As you've noticed, the way that you plug in data sources isn't 
> very flexible. That's one of the reasons why we changed the plugin system and 
> made it possible to use named catalogs that load implementations based on 
> configuration properties.
>
> I think it's fine to consider how to patch the registration trait, but I 
> really don't recommend continuing to identify table implementations directly 
> by name.

Can you be a bit more concrete with what you mean by plugging a
catalog instead of a DataSource? We have been using
sc.read.format("root").load([list of paths]) which works well. Since
we don't have a database or tables, I don't fully understand what's
different between the two interfaces that would make us prefer one or
another.

That being said, WRT the registration trait, if I'm not misreading
createTable() and friends, the "source" parameter is resolved the same
way as DataFrameReader.format(), so a solution that helps out
registration should help both interfaces.

Thanks again,
Andrew

>
> On Tue, Apr 7, 2020 at 12:26 PM Andrew Melo  wrote:
>>
>> Hi all,
>>
>> I posted an improvement ticket in JIRA and Hyukjin Kwon requested I
>> send an email to the dev list for discussion.
>>
>> As the DSv2 API evolves, some breaking changes are occasionally made
>> to the API. It's possible to split a plugin into a "common" part and
>> multiple version-specific parts and this works OK to have a single
>> artifact for users, as long as they write out the fully qualified
>> classname as the DataFrame format(). The one part that can't be
>> currently worked around is the DataSourceRegister trait. Since classes
>> which implement DataSourceRegister must also implement DataSourceV2
>> (and its mixins), changes to those interfaces cause the ServiceLoader
>> to fail when it attempts to load the "wrong" DataSourceV2 class.
>> (there's also an additional prohibition against multiple
>> implementations having the same ShortName in
>> org.apache.spark.sql.execution.datasources.DataSource.lookupDataSource).
>> This means users will need to update their notebooks/code/tutorials if
>> they run @ a different site whose cluster is a different version.
>>
>> To solve this, I proposed in SPARK-31363 a new trait that would
>> function the same as the existing DataSourceRegister trait, but adds
>> an additional method:
>>
>> public Class getImplementation();
>>
>> ...which will allow DSv2 plugins to dynamically choose the appropriate
>> DataSourceV2 class based on the runtime environment. This would let us
>> release a single artifact for different Spark versions and users could
>> use the same artifactID & format regardless of where they were
>> executing their code. If there were no services registered with this
>> new trait, the functionality would remain the same as before.
>>
>> I think this functionality will be useful as DSv2 continues to evolve,
>> please let me know your thoughts.
>>
>> Thanks
>> Andrew
>>
>>
>
>
> --
> Ryan Blue
> Software Engineer
> Netflix




DSv2 & DataSourceRegister

2020-04-07 Thread Andrew Melo
Hi all,

I posted an improvement ticket in JIRA and Hyukjin Kwon requested I
send an email to the dev list for discussion.

As the DSv2 API evolves, some breaking changes are occasionally made
to the API. It's possible to split a plugin into a "common" part and
multiple version-specific parts and this works OK to have a single
artifact for users, as long as they write out the fully qualified
classname as the DataFrame format(). The one part that can't be
currently worked around is the DataSourceRegister trait. Since classes
which implement DataSourceRegister must also implement DataSourceV2
(and its mixins), changes to those interfaces cause the ServiceLoader
to fail when it attempts to load the "wrong" DataSourceV2 class.
(there's also an additional prohibition against multiple
implementations having the same ShortName in
org.apache.spark.sql.execution.datasources.DataSource.lookupDataSource).
This means users will need to update their notebooks/code/tutorials if
they run @ a different site whose cluster is a different version.

To solve this, I proposed in SPARK-31363 a new trait that would
function the same as the existing DataSourceRegister trait, but adds
an additional method:

public Class getImplementation();

...which will allow DSv2 plugins to dynamically choose the appropriate
DataSourceV2 class based on the runtime environment. This would let us
release a single artifact for different Spark versions and users could
use the same artifactID & format regardless of where they were
executing their code. If there were no services registered with this
new trait, the functionality would remain the same as before.
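
To make the proposal concrete, here is a minimal sketch of what such a
trait could look like. It is an illustration under assumptions, not the
SPARK-31363 patch: VersionedDataSourceRegister, RootRegister, RootV24 and
RootV30 are hypothetical names, the two Root* classes are empty
placeholders, and the Spark version is injected as a string so the sketch
runs without Spark on the classpath.

```java
import java.util.Objects;

/** Hypothetical stand-in for the proposed trait; not part of Spark. */
interface VersionedDataSourceRegister {
    /** Same role as DataSourceRegister.shortName(). */
    String shortName();

    /** The proposed extra method: pick the implementation at runtime. */
    Class<?> getImplementation();
}

/** Placeholder for a class that would implement DataSourceV2 on Spark 2.4. */
final class RootV24 {}

/** Placeholder for a class that would implement TableProvider on Spark 3.0. */
final class RootV30 {}

final class RootRegister implements VersionedDataSourceRegister {
    private final String sparkVersion;

    // In a real plugin the version would come from the running Spark
    // (assumption); here it is injected so the sketch runs standalone.
    RootRegister(String sparkVersion) {
        this.sparkVersion = Objects.requireNonNull(sparkVersion);
    }

    @Override
    public String shortName() {
        return "root";
    }

    @Override
    public Class<?> getImplementation() {
        // Only the major version decides which DSv2 flavour is loadable.
        int major = Integer.parseInt(sparkVersion.split("\\.")[0]);
        return major >= 3 ? RootV30.class : RootV24.class;
    }

    public static void main(String[] args) {
        for (String v : new String[] {"2.4.5", "3.0.0"}) {
            VersionedDataSourceRegister r = new RootRegister(v);
            System.out.println(v + " -> " + r.shortName() + " / "
                    + r.getImplementation().getSimpleName());
        }
    }
}
```

The point of the design is that the registered class itself only touches
the small, stable interface, so the ServiceLoader can instantiate it on
any Spark version; the version-specific class is only resolved afterwards.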

I think this functionality will be useful as DSv2 continues to evolve.
Please let me know your thoughts.

Thanks
Andrew




Re: Scala version compatibility

2020-04-06 Thread Andrew Melo
Hello,

On Mon, Apr 6, 2020 at 3:31 PM Koert Kuipers  wrote:

> actually i might be wrong about this. did you declare scala to be a
> provided dependency? so scala is not in your fat/uber jar? if so then maybe
> it will work.
>

I declare spark to be a provided dependency, so Scala's not included in my
artifact except for this single callsite.

Thanks
Andrew


> On Mon, Apr 6, 2020 at 4:16 PM Andrew Melo  wrote:
>
>>
>>
>> On Mon, Apr 6, 2020 at 3:08 PM Koert Kuipers  wrote:
>>
>>> yes it will
>>>
>>>
>> Ooof, I was hoping that wasn't the case. I guess I need to figure out how
>> to get Maven to compile/publish jars with different
>> dependencies/artifactIDs like how sbt does? (or re-implement the
>> functionality in java)
>>
>> Thanks for your help,
>> Andrew
>>
>>
>>> On Mon, Apr 6, 2020 at 3:50 PM Andrew Melo 
>>> wrote:
>>>
>>>> Hello all,
>>>>
>>>> I'm aware that Scala is not binary compatible between revisions. I have
>>>> some Java code whose only Scala dependency is the transitive dependency
>>>> through Spark. This code calls a Spark API which returns a Seq, which
>>>> I then convert into a List with
>>>> JavaConverters.seqAsJavaListConverter. Will this usage cause binary
>>>> incompatibility if the jar is compiled in one Scala version and executed in
>>>> another?
>>>>
>>>> I tried grokking
>>>> https://docs.scala-lang.org/overviews/core/binary-compatibility-of-scala-releases.html,
>>>> and wasn't quite able to make heads or tails of this particular case.
>>>>
>>>> Thanks!
>>>> Andrew
>>>>
>>>>
>>>>


Re: Scala version compatibility

2020-04-06 Thread Andrew Melo
On Mon, Apr 6, 2020 at 3:08 PM Koert Kuipers  wrote:

> yes it will
>
>
Ooof, I was hoping that wasn't the case. I guess I need to figure out how
to get Maven to compile/publish jars with different
dependencies/artifactIDs like how sbt does? (or re-implement the
functionality in java)

Thanks for your help,
Andrew


> On Mon, Apr 6, 2020 at 3:50 PM Andrew Melo  wrote:
>
>> Hello all,
>>
>> I'm aware that Scala is not binary compatible between revisions. I have
>> some Java code whose only Scala dependency is the transitive dependency
>> through Spark. This code calls a Spark API which returns a Seq, which
>> I then convert into a List with
>> JavaConverters.seqAsJavaListConverter. Will this usage cause binary
>> incompatibility if the jar is compiled in one Scala version and executed in
>> another?
>>
>> I tried grokking
>> https://docs.scala-lang.org/overviews/core/binary-compatibility-of-scala-releases.html,
>> and wasn't quite able to make heads or tails of this particular case.
>>
>> Thanks!
>> Andrew
>>
>>
>>


Scala version compatibility

2020-04-06 Thread Andrew Melo
Hello all,

I'm aware that Scala is not binary compatible between revisions. I have
some Java code whose only Scala dependency is the transitive dependency
through Spark. This code calls a Spark API which returns a Seq, which
I then convert into a List with
JavaConverters.seqAsJavaListConverter. Will this usage cause binary
incompatibility if the jar is compiled in one Scala version and executed in
another?

I tried grokking
https://docs.scala-lang.org/overviews/core/binary-compatibility-of-scala-releases.html,
and wasn't quite able to make heads or tails of this particular case.

Thanks!
Andrew


Optimizing LIMIT in DSv2

2020-03-30 Thread Andrew Melo
Hello,

Executing "SELECT Muon_Pt FROM rootDF LIMIT 10", where "rootDF" is a temp
view backed by a DSv2 reader yields the attached plan [1]. It appears that
the initial stage is run over every partition in rootDF, even though each
partition has 200k rows (modulo the last partition which holds the
remainder of rows in a file).

Is there some sort of hinting that can be done from the datasource side to
better inform the optimizer or, alternately, am I missing an interface in
the PushDown filters that would let me elide transferring/decompressing
unnecessary partitions?
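
To make the cost concrete, here is a minimal sketch of the arithmetic
involved. It assumes a plain projection with no filter, so every row in a
partition counts towards the limit. partitionsNeeded is a hypothetical
helper, not a Spark or DSv2 API, and the row count of the last partition
is made up; it only illustrates the pruning I'd like to be able to do.

```java
final class LimitPruning {
    /**
     * Smallest number of leading partitions whose row counts sum to at
     * least `limit`. Hypothetical helper for illustration only.
     */
    static int partitionsNeeded(long[] rowsPerPartition, long limit) {
        if (limit <= 0) {
            return 0;
        }
        long seen = 0;
        for (int i = 0; i < rowsPerPartition.length; i++) {
            seen += rowsPerPartition[i];
            if (seen >= limit) {
                return i + 1;
            }
        }
        // Fewer rows than the limit: every partition has to be read.
        return rowsPerPartition.length;
    }

    public static void main(String[] args) {
        // Three full 200k-row partitions plus a made-up remainder.
        long[] rows = {200_000, 200_000, 200_000, 57_321};
        System.out.println(partitionsNeeded(rows, 10));
        System.out.println(partitionsNeeded(rows, 450_000));
    }
}
```

With 200k rows per partition, LIMIT 10 only needs the first partition, so
everything read from the remaining partitions is wasted work.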

Thanks!
Andrew

[1]

== Parsed Logical Plan ==
'GlobalLimit 10
+- 'LocalLimit 10
   +- 'Project ['Muon_pt]
      +- 'UnresolvedRelation `rootDF`

== Analyzed Logical Plan ==
Muon_pt: array
GlobalLimit 10
+- LocalLimit 10
   +- Project [Muon_pt#119]
      +- SubqueryAlias `rootdf`
         +- RelationV2 root[run#0L, luminosityBlock#1L, event#2L,
CaloMET_phi#3, CaloMET_pt#4, CaloMET_sumEt#5, nElectron#6,
Electron_deltaEtaSC#7, Electron_dr03EcalRecHitSumEt#8,
Electron_dr03HcalDepth1TowerSumEt#9, Electron_dr03TkSumPt#10,
Electron_dxy#11, Electron_dxyErr#12, Electron_dz#13, Electron_dzErr#14,
Electron_eCorr#15, Electron_eInvMinusPInv#16, Electron_energyErr#17,
Electron_eta#18, Electron_hoe#19, Electron_ip3d#20, Electron_mass#21,
Electron_miniPFRelIso_all#22, Electron_miniPFRelIso_chg#23, ... 787 more
fields] (Options: [tree=Events,paths=["hdfs://cmshdfs/store/data"]])

== Optimized Logical Plan ==
GlobalLimit 10
+- LocalLimit 10
   +- Project [Muon_pt#119]
      +- RelationV2 root[run#0L, luminosityBlock#1L, event#2L,
CaloMET_phi#3, CaloMET_pt#4, CaloMET_sumEt#5, nElectron#6,
Electron_deltaEtaSC#7, Electron_dr03EcalRecHitSumEt#8,
Electron_dr03HcalDepth1TowerSumEt#9, Electron_dr03TkSumPt#10,
Electron_dxy#11, Electron_dxyErr#12, Electron_dz#13, Electron_dzErr#14,
Electron_eCorr#15, Electron_eInvMinusPInv#16, Electron_energyErr#17,
Electron_eta#18, Electron_hoe#19, Electron_ip3d#20, Electron_mass#21,
Electron_miniPFRelIso_all#22, Electron_miniPFRelIso_chg#23, ... 787 more
fields] (Options: [tree=Events,paths=["hdfs://cmshdfs/store/data"]])

== Physical Plan ==
CollectLimit 10
+- *(1) Project [Muon_pt#119]
   +- *(1) ScanV2 root[Muon_pt#119] (Options:
[tree=Events,paths=["hdfs://cmshdfs/store/data"]])


Supporting Kryo registration in DSv2

2020-03-26 Thread Andrew Melo
Hello all,

Is there a way to register classes within a datasourcev2 implementation in
the Kryo serializer?

I've attempted the following in both the constructor and static block of my
toplevel class:

import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;

// Attempted in both the constructor and a static block of the toplevel class.
SparkContext context = SparkContext.getOrCreate();
SparkConf conf = context.getConf();
Class<?>[] classesRegistered = new Class<?>[] {
    edu.vanderbilt.accre.laurelin.spark_ttree.Reader.class,
    edu.vanderbilt.accre.laurelin.spark_ttree.Partition.class,
    edu.vanderbilt.accre.laurelin.spark_ttree.SlimTBranch.class
};
conf.registerKryoClasses(classesRegistered);

But (if I'm reading correctly), this is too late: the config has already
been parsed while initializing the SparkContext, so adding classes to the
SparkConf has no effect. From what I can tell, the Kryo instance behind it
is private, so I can't add the registration manually either.
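
One avenue that might be worth checking, sketched under assumptions: Spark
documents a spark.kryo.classesToRegister setting (a comma-separated list of
class names), which would have to be supplied before the SparkContext
exists, e.g. via --conf at submit time rather than from inside the plugin.
The helper below only builds that value; KryoConfSketch and
classesToRegisterValue are hypothetical names, and the class names are
passed as strings so the sketch runs without the plugin on the classpath.

```java
import java.util.Arrays;
import java.util.List;

final class KryoConfSketch {
    /** Joins class names into the comma-separated form the setting expects. */
    static String classesToRegisterValue(List<String> classNames) {
        return String.join(",", classNames);
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList(
            "edu.vanderbilt.accre.laurelin.spark_ttree.Reader",
            "edu.vanderbilt.accre.laurelin.spark_ttree.Partition",
            "edu.vanderbilt.accre.laurelin.spark_ttree.SlimTBranch");
        // Would be passed at submit time, before any SparkContext is created.
        System.out.println("--conf spark.kryo.classesToRegister="
                + classesToRegisterValue(names));
    }
}
```

This still pushes the configuration onto the user, which is exactly what
I'd like to avoid, so it is a workaround at best.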

Any thoughts?
Thanks
Andrew


Re: [DISCUSS] Remove multiple workers on the same host support from Standalone backend

2020-03-14 Thread Andrew Melo
Hi Sean

On Fri, Mar 13, 2020 at 6:46 PM Sean Owen  wrote:

> Do you really need a new cluster per user? and if so, why specify N
> workers > M machines? I am not seeing a need for that. I don't even
> think 2 workers on the same host makes sense, as they are both
> managing the same resources; it only exists for test purposes AFAICT.
>

Sorry, I'm from a completely different field, so I've inherited a
completely different vocabulary. So thanks for bearing with me :)

I think from reading your response, maybe the confusion is that HTCondor is
a completely different resource acquisition model than what industry is
familiar with. Unlike AWS that gives you a whole VM or k8s that gives you a
whole container, condor (and most other batch schedulers) split up a single
bare machine that your job shares with whatever else is on that machine.
You don't get your own machine or even the illusion you have your own
machine (via containerization).

With these schedulers, it's not that you ask for N workers when there are
only M machines; you request N 8-core slots when there are M cores
available, and the scheduler packs them wherever there are free resources.

> What you are trying to do sounds like one cluster, not many. JVMs
> can't be shared across users; JVM = executor. But that's a good thing,
> or else there would be all kinds of collisions.
>
> What pools are you referring to?


If you're talking about the 2nd half, let's say I'm running two pyspark
notebooks connected to the system above, and batch scheduler gives each of
them 2 cores of slaves. Each notebook will have their own set (which I
called a pool earlier) of slaves, so when you're working in one notebook,
the other notebook's slaves are idle. My comment was about the resources
being idle and the desire to increase utilization.

Thanks
Andrew

> Sean
>
> On Fri, Mar 13, 2020 at 6:33 PM Andrew Melo  wrote:
> >
> > Hi Xingbo, Sean,
> >
> > On Fri, Mar 13, 2020 at 12:31 PM Xingbo Jiang 
> wrote:
> >>
> >> Andrew, could you provide more context of your use case please? Is it
> like you deploy homogeneous containers on hosts with available resources,
> and each container launches one worker? Or you deploy workers directly on
> hosts thus you could have multiple workers from the same application on the
> same host?
> >
> >
> > Sure, I describe a bit more detail about the actual workload below [*],
> but the short version is that our computing resources/infrastructure are
> all built around batch submission into (usually) the HTCondor scheduler,
> and we've got a PoC using pyspark to replace the interactive portion of
> data analysis. To run pyspark on our main resources, we use some scripts
> around the standalone mode to spin up N slaves per-user**, which may or may
> not end up on the same host. I understood Xingbo's original mail to mean
> that wouldn't be allowed in the future, but from Sean's response, it seems
> like I'm incorrect.
> >
> > That being said, our use-case is very bursty, and it would be very good
> if there was a way we could have one pool of JVMs that could be shared
> between N different concurrent users instead of having N different pools of
> JVMs that each serve one person. We're already resource constrained, and
> we're expecting our data rates to increase 10x in 2026, so the less idle
> CPU, the better for us.
> >
> > Andrew
> >
> > * I work for one of the LHC experiments at CERN (https://cms.cern/) and
> there's two main "phases" of our data pipeline: production and analysis.
> The analysis half is currently implemented by having users writing some
> software, splitting the input dataset(s) into N parts and then submitting
> those jobs to the batch system (combining the outputs in a manual
> postprocessing step). In terms of scale, there are currently ~100 users
> running ~900 tasks over ~50k cpus. The use case relevant to this context is
> the terminal analysis phase which involves calculating some additional
> columns, applying calibrations, filtering out the 'interesting' events and
> extracting histograms describing those events. Conceptually, it's an
> iterative process of "extract plots, look at plots, change parameters", but
> running in a batch system means the latency is bad, so it can take a long
> time to converge to the right set of params.
> >
> > ** though we have much smaller, dedicated k8s/mesos/yarn clusters we use
> for prototyping
> >
> >>
> >> Thanks,
> >>
> >> Xingbo
> >>
> >> On Fri, Mar 13, 2020 at 10:23 AM Sean Owen  wrote:
> >>>
> >>> You have multiple workers in one Spark (standalone) app? this wouldn't
> >>> prevent N apps from each having a worker on a m

Re: [DISCUSS] Remove multiple workers on the same host support from Standalone backend

2020-03-13 Thread Andrew Melo
Hi Xingbo, Sean,

On Fri, Mar 13, 2020 at 12:31 PM Xingbo Jiang  wrote:

> Andrew, could you provide more context of your use case please? Is it like
> you deploy homogeneous containers on hosts with available resources, and
> each container launches one worker? Or you deploy workers directly on hosts
> thus you could have multiple workers from the same application on the same
> host?
>

Sure, I describe a bit more detail about the actual workload below [*], but
the short version is that our computing resources/infrastructure are all
built around batch submission into (usually) the HTCondor scheduler, and
we've got a PoC using pyspark to replace the interactive portion of data
analysis. To run pyspark on our main resources, we use some scripts around
the standalone mode to spin up N slaves per-user**, which may or may not
end up on the same host. I understood Xingbo's original mail to mean that
wouldn't be allowed in the future, but from Sean's response, it seems like
I'm incorrect.

That being said, our use-case is very bursty, and it would be very good if
there was a way we could have one pool of JVMs that could be shared between
N different concurrent users instead of having N different pools of JVMs
that each serve one person. We're already resource constrained, and we're
expecting our data rates to increase 10x in 2026, so the less idle CPU, the
better for us.

Andrew

* I work for one of the LHC experiments at CERN (https://cms.cern/)
and there are two main "phases" of our data pipeline: production and
analysis. The analysis half is currently implemented by having users
write some software, split the input dataset(s) into N parts and then
submit those jobs to the batch system (combining the outputs in a
manual postprocessing step). In terms of scale, there are currently ~100
users running ~900 tasks over ~50k cpus. The use case relevant to this
context is the terminal analysis phase which involves calculating some
additional columns, applying calibrations, filtering out the 'interesting'
events and extracting histograms describing those events. Conceptually,
it's an iterative process of "extract plots, look at plots, change
parameters", but running in a batch system means the latency is bad, so it
can take a long time to converge to the right set of params.

** though we have much smaller, dedicated k8s/mesos/yarn clusters we use
for prototyping


> Thanks,
>
> Xingbo
>
> On Fri, Mar 13, 2020 at 10:23 AM Sean Owen  wrote:
>
>> You have multiple workers in one Spark (standalone) app? this wouldn't
>> prevent N apps from each having a worker on a machine.
>>
>> On Fri, Mar 13, 2020 at 11:51 AM Andrew Melo 
>> wrote:
>> >
>> > Hello,
>> >
>> > On Fri, Feb 28, 2020 at 13:21 Xingbo Jiang 
>> wrote:
>> >>
>> >> Hi all,
>> >>
>> >> Based on my experience, there is no scenario that necessarily requires
>> deploying multiple Workers on the same node with Standalone backend. A
>> worker should book all the resources reserved to Spark on the host it is
>> launched, then it can allocate those resources to one or more executors
>> launched by this worker. Since each executor runs in a separated JVM, we
>> can limit the memory of each executor to avoid long GC pause.
>> >>
>> >> The remaining concern is the local-cluster mode is implemented by
>> launching multiple workers on the local host, we might need to re-implement
>> LocalSparkCluster to launch only one Worker and multiple executors. It
>> should be fine because local-cluster mode is only used in running Spark
>> unit test cases, thus end users should not be affected by this change.
>> >>
>> >> Removing multiple workers on the same host support could simplify the
>> deploy model of Standalone backend, and also reduce the burden to support
>> legacy deploy pattern in the future feature developments. (There is an
>> example in https://issues.apache.org/jira/browse/SPARK-27371 , where we
>> designed a complex approach to coordinate resource requirements from
>> different workers launched on the same host).
>> >>
>> >> The proposal is to update the document to deprecate the support of
>> system environment `SPARK_WORKER_INSTANCES` in Spark 3.0, and remove the
>> support in the next major version (Spark 3.1).
>> >>
>> >> Please kindly let me know if you have use cases relying on this
>> feature.
>> >
>> >
>> > When deploying spark on batch systems (by wrapping the standalone
>> deployment in scripts that can be consumed by the batch scheduler), we
>> typically end up with >1 worker per host. If I understand correctly, this
>> proposal would make our use case unsupported.
>> >
>> > Thanks,
>> > Andrew
>> >
>> >
>> >
>> >>
>> >> Thanks!
>> >>
>> >> Xingbo
>> >
>> > --
>> > It's dark in this basement.
>>
>>
>>


Re: [DISCUSS] Remove multiple workers on the same host support from Standalone backend

2020-03-13 Thread Andrew Melo
Hello,

On Fri, Feb 28, 2020 at 13:21 Xingbo Jiang  wrote:

> Hi all,
>
> Based on my experience, there is no scenario that necessarily requires
> deploying multiple Workers on the same node with Standalone backend. A
> worker should book all the resources reserved to Spark on the host it is
> launched, then it can allocate those resources to one or more executors
> launched by this worker. Since each executor runs in a separated JVM, we
> can limit the memory of each executor to avoid long GC pause.
>
> The remaining concern is the local-cluster mode is implemented by
> launching multiple workers on the local host, we might need to re-implement
> LocalSparkCluster to launch only one Worker and multiple executors. It
> should be fine because local-cluster mode is only used in running Spark
> unit test cases, thus end users should not be affected by this change.
>
> Removing multiple workers on the same host support could simplify the
> deploy model of Standalone backend, and also reduce the burden to support
> legacy deploy pattern in the future feature developments. (There is an
> example in https://issues.apache.org/jira/browse/SPARK-27371 , where we
> designed a complex approach to coordinate resource requirements from
> different workers launched on the same host).
>
> The proposal is to update the document to deprecate the support of system
> environment `SPARK_WORKER_INSTANCES` in Spark 3.0, and remove the support
> in the next major version (Spark 3.1).
>
> Please kindly let me know if you have use cases relying on this feature.
>

When deploying spark on batch systems (by wrapping the standalone
deployment in scripts that can be consumed by the batch scheduler), we
typically end up with >1 worker per host. If I understand correctly, this
proposal would make our use case unsupported.

Thanks,
Andrew




> Thanks!
>
> Xingbo
>
-- 
It's dark in this basement.


Re: (java) Producing an in-memory Arrow buffer from a file

2020-01-24 Thread Andrew Melo
Hi Micah,

On Fri, Jan 24, 2020 at 6:17 AM Micah Kornfield 
wrote:

> Hi Andrew,
> It might help to provide a little more detail on where you are starting
> from and what you want to do once you have the data in arrow format.
>

Of course! Like I mentioned, particle physics data is processed in ROOT,
which is a whole-stack solution -- from file I/O all the way up to plotting
routines. There are a few different groups working on adopting non-physics
tools like Spark or the scientific python ecosystem to process these data
(so, still reading ROOT files, but doing the higher level interaction with
different applications). I want to analyze these data with Spark, so I've
implemented a (java-based) Spark DataSource which reads ROOT files. Some of
my colleagues are experimenting with Kafka and were wondering if the same
code could be re-used for them (they would like to put ROOT data into kafka
topics, as I understand it).

Currently, I parse the ROOT metadata to find where the value/offset buffers
are within the file, then decompress the buffers and store them in an
object hierarchy which I then use to implement the Spark API. I'd like to
replace the intermediate object hierarchy with Arrow because

1) I could re-use the existing Spark code[1] to do the drudgework of
extracting values from the buffers. That code is ~25% of my codebase.
2) Adapting this code for different java-based applications becomes quite a
bit easier. For example, Kafka supports Arrow-based sources, so adding
kafka support would be relatively straightforward.


>
>  If you have the data already available in some sort of off-heap
> data structure you can potentially avoid copies by wrapping it with the existing
> ArrowBuf machinery [1].  If you have an iterator over the data you can also
> directly build a ListVector [2].
>

I have the data stored in a hierarchy that is roughly table->columns->row
ranges->ByteBuffer, so I presume ArrowBuf is the right direction. Since
each column's row range is stored and compressed separately, I could
decompress them directly into an ArrowBuf (?) and then skip having to
iterate over the values.
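
For reference, here is a minimal sketch of the value/offset layout I have
in mind, using only java.nio so it runs without Arrow on the classpath. It
assumes Arrow-style offsets (rows + 1 little-endian 32-bit entries, where
row i spans offsets[i] to offsets[i+1]) and 32-bit float values; whether
ROOT's on-disk offsets match that convention exactly is an assumption, and
JaggedColumn is a hypothetical name.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

final class JaggedColumn {
    private final ByteBuffer offsets; // (rows + 1) int32 entries
    private final ByteBuffer values;  // float32 entries, all rows back to back

    JaggedColumn(ByteBuffer offsets, ByteBuffer values) {
        this.offsets = offsets;
        this.values = values;
    }

    int rowCount() {
        return offsets.capacity() / Integer.BYTES - 1;
    }

    /** Copies row i out of the shared values buffer. */
    float[] row(int i) {
        int start = offsets.getInt(i * Integer.BYTES);
        int end = offsets.getInt((i + 1) * Integer.BYTES);
        float[] out = new float[end - start];
        for (int j = start; j < end; j++) {
            out[j - start] = values.getFloat(j * Float.BYTES);
        }
        return out;
    }

    /** Builds a column from per-row arrays into direct (off-heap) buffers. */
    static JaggedColumn of(float[][] rows) {
        int total = 0;
        for (float[] r : rows) {
            total += r.length;
        }
        ByteBuffer off = ByteBuffer.allocateDirect((rows.length + 1) * Integer.BYTES)
                .order(ByteOrder.LITTLE_ENDIAN);
        ByteBuffer val = ByteBuffer.allocateDirect(total * Float.BYTES)
                .order(ByteOrder.LITTLE_ENDIAN);
        int pos = 0;
        off.putInt(0, 0);
        for (int i = 0; i < rows.length; i++) {
            for (float f : rows[i]) {
                val.putFloat(pos * Float.BYTES, f);
                pos++;
            }
            // Offset i + 1 marks the end of row i.
            off.putInt((i + 1) * Integer.BYTES, pos);
        }
        return new JaggedColumn(off, val);
    }

    public static void main(String[] args) {
        // Three events with 2, 0 and 1 entries respectively.
        JaggedColumn muonPt = of(new float[][] {{31.5f, 12.25f}, {}, {45.0f}});
        System.out.println(muonPt.rowCount());
        System.out.println(muonPt.row(0).length);
        System.out.println(muonPt.row(2)[0]);
    }
}
```

If the decompressed ROOT buffers already have this shape, they could be
handed over as-is, with no per-value iteration on my side.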


>
> Depending on your end goal, you might want to stream the values through a
> VectorSchemaRoot instead.
>

It appears (?) that this option also involves iterating over all the values.


>
> There was some documentation written that will be published with the next
> release that gives an overview of the Java libraries [3] that might be
> helpful.
>
>
I'll take a look at that, thanks!

Looking at your examples and thinking about it conceptually, is there much
of a difference between constructing a large ByteBuffer (or ArrowBuf) with
the various messages inside it, and handing that to Arrow to parse or
building the java-object-representation myself?

Thanks for your patience,
Andrew

[1]
https://github.com/apache/spark/blob/master/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ArrowColumnVector.java


> Cheers,
> Micah
>
> [1]
> https://javadoc.io/static/org.apache.arrow/arrow-memory/0.15.1/io/netty/buffer/ArrowBuf.html
> [2]
> https://github.com/apache/arrow/blob/master/java/vector/src/main/java/org/apache/arrow/vector/complex/ListVector.java
> [3] https://github.com/apache/arrow/tree/master/docs/source/java
>
> On Thu, Jan 23, 2020 at 5:02 AM Andrew Melo  wrote:
>
>> Hello all,
>>
>> I work in particle physics, which has standardized on the ROOT (
>> http://root.cern) file format to store/process our data. The format
>> itself is quite complicated, but the relevant part here is that after
>> parsing/decompression, we end up with value and offset buffers holding our
>> data.
>>
>> What I'd like to do is represent these data in-memory in the Arrow
>> format. I've written a very rough POC where I manually put an Arrow stream
>> into a ByteBuffer, then replaced the values and offset buffers with the
>> bytes from my files, and I'm wondering what the "proper" way to do this
>> is. From my reading of the code, it appears (?) that what I want to do is
>> produce an org.apache.arrow.vector.types.pojo.Schema object and N
>> ArrowRecordBatch objects, then use MessageSerializer to stick them into a
>> ByteBuffer one after another.
>>
>> Is this correct? Or, is there another API I'm missing?
>>
>> Thanks!
>> Andrew
>>
>


(java) Producing an in-memory Arrow buffer from a file

2020-01-23 Thread Andrew Melo
Hello all,

I work in particle physics, which has standardized on the ROOT (
http://root.cern) file format to store/process our data. The format itself
is quite complicated, but the relevant part here is that after
parsing/decompression, we end up with value and offset buffers holding our
data.

What I'd like to do is represent these data in-memory in the Arrow format.
I've written a very rough POC where I manually put an Arrow stream into a
ByteBuffer, then replaced the values and offset buffers with the bytes from
my files, and I'm wondering what the "proper" way to do this is. From my
reading of the code, it appears (?) that what I want to do is produce an
org.apache.arrow.vector.types.pojo.Schema object and N ArrowRecordBatch
objects, then use MessageSerializer to stick them into a ByteBuffer one
after another.

Is this correct? Or, is there another API I'm missing?

Thanks!
Andrew


Re: Reading 7z file in spark

2020-01-14 Thread Andrew Melo
It only makes sense if the underlying file is also splittable, and even
then, it doesn't really do anything for you if you don't explicitly tell
Spark about the split boundaries.

On Tue, Jan 14, 2020 at 7:36 PM Someshwar Kale  wrote:

> I would suggest using another compression technique that is splittable,
> e.g. bzip2, lzo, lz4.
>
> On Wed, Jan 15, 2020, 1:32 AM Enrico Minack 
> wrote:
>
>> Hi,
>>
>> Spark does not support 7z natively, but you can read any file in Spark:
>>
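>> import org.apache.spark.input.PortableDataStream
>> import spark.implicits._  // needed for .toDF on an RDD
>>
>> // Placeholder: returns only the path; 7z-specific decoding would go here
>> def read(stream: PortableDataStream): Iterator[String] =
>>   Seq(stream.getPath()).iterator
>>
>> spark.sparkContext
>>   .binaryFiles("*.7z")
>>   .flatMap(file => read(file._2))
>>   .toDF("path")
>>   .show(false)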
>>
>> This scales with the number of files. A single large 7z file would not
>> scale well (a single partition).
>>
>> Any file that matches *.7z will be loaded via the read(stream:
>> PortableDataStream) method, which returns an iterator over the rows.
>> This method is executed on the executor and can implement the 7z specific
>> code, which is independent of Spark and should not be too hard (here it
>> does not open the input stream but returns the path only).
>>
>> If you are planning to read the same files more than once, then it would
>> be worth first uncompressing and converting them into files Spark supports.
>> Then Spark can scale much better.
>>
>> Regards,
>> Enrico
>>
>>
>> Am 13.01.20 um 13:31 schrieb HARSH TAKKAR:
>>
>> Hi,
>>
>>
>> Is it possible to read a 7z-compressed file in Spark?
>>
>>
>> Kind Regards
>> Harsh Takkar
>>
>>
>>


Re: how to get partition column info in Data Source V2 writer

2019-12-17 Thread Andrew Melo
Hi Aakash

On Tue, Dec 17, 2019 at 12:42 PM aakash aakash 
wrote:

> Hi Spark dev folks,
>
> First of all, kudos on the new Data Source V2: the API looks simple and
> makes it easy to develop a new data source and use it.
>
> In my current work, I am trying to implement a new Data Source V2 writer
> with Spark 2.3, and I was wondering how I can get the info about the
> partition-by columns. I see that it is passed to Data Source V1 from
> DataFrameWriter, but not to V2.
>

Not directly related to your Q, but just so you're aware, the DSv2 API
evolved from 2.3->2.4 and then again for 2.4->3.0.

Cheers
Andrew


>
>
> Thanks,
> Aakash
>


Re: DSv2 reader lifecycle

2019-11-06 Thread Andrew Melo
Hi Ryan,

Thanks for the pointers

On Thu, Nov 7, 2019 at 8:13 AM Ryan Blue  wrote:

> Hi Andrew,
>
> This is expected behavior for DSv2 in 2.4. A separate reader is configured
> for each operation because the configuration will change. A count, for
> example, doesn't need to project any columns, but a count distinct will.
> Similarly, if your read has different filters we need to apply those to a
> separate reader for each scan.
>

Ah, I presumed that the interaction was slightly different: that there was a
single reader configured and (e.g.) pruneSchema was called repeatedly to
change the desired output schema. I guess for 2.4 it's best for me to
cache/memoize the metadata for paths/files to keep it from being
repeatedly calculated.
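
A minimal sketch of that memoization idea, written in Python for brevity. The
names load_metadata and Reader are hypothetical stand-ins rather than Spark or
Laurelin classes; on the JVM side the analogue would presumably be a
process-wide map keyed by path. It assumes the files do not change while the
cache is alive:

```python
from functools import lru_cache

parse_calls = []  # records each expensive parse, for illustration only


@lru_cache(maxsize=128)
def load_metadata(path):
    """Stand-in for an expensive metadata parse (hypothetical)."""
    parse_calls.append(path)
    return ("Events", len(path))  # placeholder result


class Reader:
    """Each new reader consults the shared cache instead of re-parsing."""

    def __init__(self, path):
        self.metadata = load_metadata(path)


# Two readers for the same file, as when Spark builds one reader per operation.
first = Reader("a.root")
second = Reader("a.root")
```

Because the cache lives outside the reader, it survives however many readers
get constructed; the trade-off is that stale entries are never noticed unless
the key also includes something like a modification time.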


>
> The newer API that we are releasing in Spark 3.0 addresses the concern
> that each reader is independent by using Catalog and Table interfaces. In
> the new version, Spark will load a table by name from a persistent catalog
> (loaded once) and use the table to create a reader for each operation. That
> way, you can load common metadata in the table, cache the table, and pass
> its info to readers as they are created.
>

That's good to know, I'll search around JIRA for docs describing that
functionality.

Thanks again,
Andrew


>
> rb
>
> On Tue, Nov 5, 2019 at 4:58 PM Andrew Melo  wrote:
>
>> Hello,
>>
>> During testing of our DSv2 implementation (on 2.4.3 FWIW), it appears
>> that our DataSourceReader is being instantiated multiple times for the same
>> dataframe. For example, the following snippet
>>
>> Dataset<Row> df = spark
>>     .read()
>>     .format("edu.vanderbilt.accre.laurelin.Root")
>>     .option("tree", "Events")
>>     .load("testdata/pristine/2018nanoaod1june2019.root");
>>
>> Constructs edu.vanderbilt.accre.laurelin.Root twice and then calls
>> createReader once (as an aside, this seems like a lot for 1000 columns?
>> "CodeGenerator: Code generated in 8162.847517 ms")
>>
>> but then running operations on that dataframe (e.g. df.count()) calls
>> createReader for each call, instead of holding the existing
>> DataSourceReader.
>>
>> Is that the expected behavior? Because of the file format, it's quite
>> expensive to deserialize all the various metadata, so I was holding the
>> deserialized version in the DataSourceReader, but if Spark is repeatedly
>> constructing new ones, then that doesn't help. If this is the expected
>> behavior, how should I handle this as a consumer of the API?
>>
>> Thanks!
>> Andrew
>>
>
>
> --
> Ryan Blue
> Software Engineer
> Netflix
>


DSv2 reader lifecycle

2019-11-05 Thread Andrew Melo
Hello,

During testing of our DSv2 implementation (on 2.4.3 FWIW), it appears that
our DataSourceReader is being instantiated multiple times for the same
dataframe. For example, the following snippet

Dataset<Row> df = spark
    .read()
    .format("edu.vanderbilt.accre.laurelin.Root")
    .option("tree", "Events")
    .load("testdata/pristine/2018nanoaod1june2019.root");

Constructs edu.vanderbilt.accre.laurelin.Root twice and then calls
createReader once (as an aside, this seems like a lot for 1000 columns?
"CodeGenerator: Code generated in 8162.847517 ms")

but then running operations on that dataframe (e.g. df.count()) calls
createReader for each call, instead of holding the existing
DataSourceReader.

Is that the expected behavior? Because of the file format, it's quite
expensive to deserialize all the various metadata, so I was holding the
deserialized version in the DataSourceReader, but if Spark is repeatedly
constructing new ones, then that doesn't help. If this is the expected
behavior, how should I handle this as a consumer of the API?

Thanks!
Andrew


Re: Exposing functions to pyspark

2019-10-08 Thread Andrew Melo
Hello again,

Is it possible to grab a handle to the underlying DataSourceReader
backing a DataFrame? I see that there's no nice way to add extra
methods to Dataset, so being able to grab the DataSource backing
the dataframe would be a good escape hatch.

Cheers
Andrew

On Mon, Sep 30, 2019 at 3:48 PM Andrew Melo  wrote:
>
> Hello,
>
> I'm working on a DSv2 implementation with a userbase that is 100% pyspark
> based.
>
> There's some interesting additional DS-level functionality I'd like to
> expose from the Java side to pyspark -- e.g. I/O metrics, which source
> site provided the data, etc...
>
> Does someone have an example of how to expose that to pyspark? We
> provide a python library for scientists to use, so I can also provide
> the python half, I just don't know where to begin. Part of the mental
> issue I'm having is that when a user does the following in pyspark:
>
> df = spark.read.format('edu.vanderbilt.accre.laurelin.Root') \
> .option("tree", "tree") \
> .load('small-flat-tree.root')
>
> They don't have a reference to any of my DS objects -- "df" is a
> DataFrame object, which I don't own.
>
> Does anyone have a tip?
> Thanks
> Andrew

-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org



Re: Driver vs master

2019-10-07 Thread Andrew Melo
On Mon, Oct 7, 2019 at 20:49 ayan guha  wrote:

> HI
>
> I think you are mixing terminologies here. Loosely speaking, Master
> manages worker machines. Each worker machine can run one or more processes.
> A process can be a driver or executor. You submit applications to the
> master. Each application will have driver and executors. Master will decide
> where to put each of them. In cluster mode, master will distribute the
> drivers across the cluster. In client mode, master will try to run the
> driver processes within master's own process. You can launch multiple
> master processes as well and use them for a set of applications - this
> happens when you use YARN. I am not sure how Mesos or K8 works in that
> score though.
>

Right, that's why I initially had the caveat "This depends on what
master/deploy mode you're using: if it's "local" master and "client mode"
then yes, tasks execute in the same JVM as the driver".

The answer depends on the exact setup Amit has and how the application is
configured.


> HTH...
>
> Ayan
>
>
>
> On Tue, Oct 8, 2019 at 12:11 PM Andrew Melo  wrote:
>
>>
>>
>> On Mon, Oct 7, 2019 at 19:20 Amit Sharma  wrote:
>>
>>> Thanks Andrew, but I am asking specifically about driver memory, not
>>> executor memory. We have just one master, and if each job's
>>> driver.memory=4g and the master node's total memory is 16gb, then we
>>> cannot execute more than 4 jobs at a time.
>>
>>
>> I understand that. I think there's a misunderstanding with the
>> terminology, though. Are you running multiple separate Spark instances on a
>> single machine, or one instance with multiple jobs inside?
>>
>>
>>>
>>> On Monday, October 7, 2019, Andrew Melo  wrote:
>>>
>>>> Hi Amit
>>>>
>>>> On Mon, Oct 7, 2019 at 18:33 Amit Sharma  wrote:
>>>>
>>>>> Can you please help me understand this. I believe the driver program
>>>>> runs on the master node.
>>>>>
>>>>> If we are running 4 Spark jobs and the driver memory config is 4g, then
>>>>> a total of 16 GB would be used on the master node.
>>>>
>>>>
>>>> This depends on what master/deploy mode you're using: if it's "local"
>>>> master and "client mode" then yes, tasks execute in the same JVM as the
>>>> driver. In this case, though, the driver JVM uses however much space is
>>>> allocated for the driver, regardless of how many threads you have.
>>>>
>>>>
>>>>> So if we run more jobs, then we need more memory on the master. Please
>>>>> correct me if I am wrong.
>>>>>
>>>>
>>>> This depends on your application, but in general more threads will
>>>> require more memory.
>>>>
>>>>
>>>>
>>>>>
>>>>> Thanks
>>>>> Amit
>>>>>
>>>> --
>>>> It's dark in this basement.
>>>>
>>> --
>> It's dark in this basement.
>>
>
>
> --
> Best Regards,
> Ayan Guha
>
-- 
It's dark in this basement.


Re: Driver vs master

2019-10-07 Thread Andrew Melo
Hi

On Mon, Oct 7, 2019 at 19:20 Amit Sharma  wrote:

> Thanks Andrew, but I am asking specifically about driver memory, not
> executor memory. We have just one master, and if each job's
> driver.memory=4g and the master node's total memory is 16gb, then we
> cannot execute more than 4 jobs at a time.


I understand that. I think there's a misunderstanding with the terminology,
though. Are you running multiple separate Spark instances on a single
machine, or one instance with multiple jobs inside?


>
> On Monday, October 7, 2019, Andrew Melo  wrote:
>
>> Hi Amit
>>
>> On Mon, Oct 7, 2019 at 18:33 Amit Sharma  wrote:
>>
>>> Can you please help me understand this. I believe the driver program runs
>>> on the master node.
>>>
>>> If we are running 4 Spark jobs and the driver memory config is 4g, then a
>>> total of 16 GB would be used on the master node.
>>
>>
>> This depends on what master/deploy mode you're using: if it's "local"
>> master and "client mode" then yes, tasks execute in the same JVM as the
>> driver. In this case, though, the driver JVM uses however much space is
>> allocated for the driver, regardless of how many threads you have.
>>
>>
>>> So if we run more jobs, then we need more memory on the master. Please
>>> correct me if I am wrong.
>>>
>>
>> This depends on your application, but in general more threads will
>> require more memory.
>>
>>
>>
>>>
>>> Thanks
>>> Amit
>>>
>> --
>> It's dark in this basement.
>>
> --
It's dark in this basement.


Re: Driver vs master

2019-10-07 Thread Andrew Melo
Hi Amit

On Mon, Oct 7, 2019 at 18:33 Amit Sharma  wrote:

> Can you please help me understand this. I believe the driver program runs
> on the master node.
>
> If we are running 4 Spark jobs and the driver memory config is 4g, then a
> total of 16 GB would be used on the master node.


This depends on what master/deploy mode you're using: if it's "local"
master and "client mode" then yes, tasks execute in the same JVM as the
driver. In this case, though, the driver JVM uses however much space is
allocated for the driver, regardless of how many threads you have.


> So if we run more jobs, then we need more memory on the master. Please
> correct me if I am wrong.
>

This depends on your application, but in general more threads will require
more memory.



>
> Thanks
> Amit
>
-- 
It's dark in this basement.


Exposing functions to pyspark

2019-09-30 Thread Andrew Melo
Hello,

I'm working on a DSv2 implementation with a userbase that is 100% pyspark based.

There's some interesting additional DS-level functionality I'd like to
expose from the Java side to pyspark -- e.g. I/O metrics, which source
site provided the data, etc...

Does someone have an example of how to expose that to pyspark? We
provide a python library for scientists to use, so I can also provide
the python half, I just don't know where to begin. Part of the mental
issue I'm having is that when a user does the following in pyspark:

df = spark.read.format('edu.vanderbilt.accre.laurelin.Root') \
.option("tree", "tree") \
.load('small-flat-tree.root')

They don't have a reference to any of my DS objects -- "df" is a
DataFrame object, which I don't own.

Does anyone have a tip?
Thanks
Andrew




Re: Thoughts on Spark 3 release, or a preview release

2019-09-13 Thread Andrew Melo
Hi Spark Aficionados-

On Fri, Sep 13, 2019 at 15:08 Ryan Blue  wrote:

> +1 for a preview release.
>
> DSv2 is quite close to being ready. I can only think of a couple issues
> that we need to merge, like getting a fix for stats estimation done. I'll
> have a better idea once I've caught up from being away for ApacheCon and
> I'll add this to the agenda for our next DSv2 sync on Wednesday.
>

What does 3.0 mean for the DSv2 API? Does the API freeze at that point, or
would it still be allowed to change? I'm writing a DSv2 plug-in
(github.com/spark-root/laurelin) and there are a couple of little API things
I think could be useful; I've just not had time to write here or open a JIRA
about them.

Thanks
Andrew


> On Fri, Sep 13, 2019 at 12:26 PM Dongjoon Hyun 
> wrote:
>
>> Ur, Sean.
>>
>> I prefer a full release like 2.0.0-preview.
>>
>> https://archive.apache.org/dist/spark/spark-2.0.0-preview/
>>
>> And, thank you, Xingbo!
>> Could you take a look at website generation? It seems to be broken on
>> `master`.
>>
>> Bests,
>> Dongjoon.
>>
>>
>> On Fri, Sep 13, 2019 at 11:30 AM Xingbo Jiang 
>> wrote:
>>
>>> Hi all,
>>>
>>> I would like to volunteer to be the release manager of Spark 3 preview,
>>> thanks!
>>>
>>> Sean Owen  于2019年9月13日周五 上午11:21写道:
>>>
 Well, great to hear the unanimous support for a Spark 3 preview
 release. Now, I don't know how to make releases myself :) I would
 first open it up to our revered release managers: would anyone be
 interested in trying to make one? sounds like it's not too soon to get
 what's in master out for evaluation, as there aren't any major
 deficiencies left, although a number of items to consider for the
 final release.

 I think we just need one release, targeting Hadoop 3.x / Hive 2.x in
 order to make it possible to test with JDK 11. (We're only on Scala
 2.12 at this point.)

 On Thu, Sep 12, 2019 at 7:32 PM Reynold Xin 
 wrote:
 >
 > +1! Long due for a preview release.
 >
 >
 > On Thu, Sep 12, 2019 at 5:26 PM, Holden Karau 
 wrote:
 >>
 >> I like the idea from the PoV of giving folks something to start
 testing against and exploring so they can raise issues with us earlier in
 the process and we have more time to make calls around this.
 >>
 >> On Thu, Sep 12, 2019 at 4:15 PM John Zhuge 
 wrote:
 >>>
 >>> +1  Like the idea as a user and a DSv2 contributor.
 >>>
 >>> On Thu, Sep 12, 2019 at 4:10 PM Jungtaek Lim 
 wrote:
 
  +1 (as a contributor) from me to have preview release on Spark 3
 as it would help to test the feature. When to cut preview release is
 questionable, as major works are ideally to be done before that - if we are
 intended to introduce new features before official release, that should
 work regardless of this, but if we are intended to have opportunity to test
 earlier, ideally it should.
 
  As a one of contributors in structured streaming area, I'd like to
 add some items for Spark 3.0, both "must be done" and "better to have". For
 "better to have", I pick some items for new features which committers
 reviewed couple of rounds and dropped off without soft-reject (No valid
 reason to stop). For Spark 2.4 users, only added feature for structured
 streaming is Kafka delegation token. (given we assume revising Kafka
 consumer pool as improvement) I hope we provide some gifts for structured
 streaming users in Spark 3.0 envelope.
 
  > must be done
  * SPARK-26154 Stream-stream joins - left outer join gives
 inconsistent output
  It's a correctness issue with multiple users reported, being
 reported at Nov. 2018. There's a way to reproduce it consistently, and we
 have a patch submitted at Jan. 2019 to fix it.
 
  > better to have
  * SPARK-23539 Add support for Kafka headers in Structured Streaming
  * SPARK-26848 Introduce new option to Kafka source - specify
 timestamp to start and end offset
  * SPARK-20568 Delete files after processing in structured streaming
 
  There're some more new features/improvements items in SS, but
 given we're talking about ramping-down, above list might be realistic one.
 
 
 
  On Thu, Sep 12, 2019 at 9:53 AM Jean Georges Perrin 
 wrote:
 >
 > As a user/non committer, +1
 >
 > I love the idea of an early 3.0.0 so we can test current dev
 against it, I know the final 3.x will probably need another round of
 testing when it gets out, but less for sure... I know I could checkout and
 compile, but having a “packaged” preversion is great if it does not take
 too much time to the team...
 >
 > jg
 >
 >
 > On Sep 11, 2019, at 20:40, Hyukjin Kwon 
 wrote:
 >
 > +1 from me too but 

DSV2 API Question

2019-06-25 Thread Andrew Melo
Hello,

I've (nearly) implemented a DSV2-reader interface to read particle physics
data stored in the ROOT (https://root.cern.ch/) file format. You can think
of these ROOT files as roughly parquet-like: column-wise and nested (i.e. a
column can be of type "float[]", meaning each row in the column is a
variable-length array of floats). The overwhelming majority of our columns
are these variable-length arrays, since they represent physical quantities
that vary widely with each particle collision*.

Exposing these columns via the "SupportsScanColumnarBatch" interface has
raised a question I have about the DSV2 API. I know the interface is
currently Evolving, but I don't know if this is the appropriate place to
ask about it (I presume JIRA is a good place as well, but I had trouble
finding exactly where the best place to join is).

There is no provision in the org.apache.spark.sql.vectorized.ColumnVector
interface to return multiple rows of arrays (i.e. no "getArrays" analogue
to "getArray"). A big use case we have is to pipe these data through UDFs,
so it would be nice to be able to get the data from the file into a UDF
batch without having to convert to an intermediate row-wise representation.
Looking into ColumnarArray, however, it seems like instead of storing a
single offset and length, it could be extended to arrays of "offsets" and
"lengths". The public interface could remain the same by adding a 2nd
constructor which accepts arrays and keeping the existing constructor as a
degenerate case of a 1-length array.


* e.g. the "electron_momentum" column will have a different number of entries
in each row, one for each electron that is produced in a collision.
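
To illustrate the proposed shape, here is a small Python sketch. The class
ColumnarArrays and its methods are hypothetical and are not Spark's
ColumnarArray; the sketch only shows that a batch of arrays can share one
values buffer through parallel "offsets" and "lengths" arrays, with the
existing single offset/length case falling out as a batch of size one:

```python
class ColumnarArrays:
    """Hypothetical batch view over shared values; not Spark's ColumnarArray."""

    def __init__(self, values, offsets, lengths):
        if len(offsets) != len(lengths):
            raise ValueError("offsets and lengths must have the same size")
        self.values = values
        self.offsets = list(offsets)
        self.lengths = list(lengths)

    @classmethod
    def single(cls, values, offset, length):
        """The existing one-array case, expressed as a batch of size one."""
        return cls(values, [offset], [length])

    def num_arrays(self):
        return len(self.offsets)

    def get_array(self, i):
        """Return the i-th array as a slice of the shared values."""
        start = self.offsets[i]
        return self.values[start:start + self.lengths[i]]


# Three collisions with 2, 0 and 2 electrons respectively (made-up numbers).
momenta = [10.5, 3.2, 7.7, 1.1]
batch = ColumnarArrays(momenta, offsets=[0, 2, 2], lengths=[2, 0, 2])
one = ColumnarArrays.single(momenta, 2, 2)
```

Here batch.get_array(1) is the empty array for the collision with no
electrons, and one.get_array(0) is what today's single-array constructor
would describe.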


Re: Detect executor core count

2019-06-18 Thread Andrew Melo
On Tue, Jun 18, 2019 at 5:40 PM Steve Loughran 
wrote:

> be aware that older Java 8 versions count the # of cores in the host, not
> those allocated for the container they run in
> https://bugs.openjdk.java.net/browse/JDK-8140793
>
>
Ergh, that's good to know. I suppose, though, that in any case, there
should be a SparkSession available if I'm in the executor context, so I can
fall back to something sensible just in case.
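
A sketch of that fallback chain, in Python for brevity. The function
task_slots is a made-up name, and the configuration is modeled as a plain dict
of strings rather than a real SparkConf; only the two property names come from
the snippet quoted below:

```python
import os


def task_slots(conf, detect=os.cpu_count, default=1):
    """Configured cores per task if set, else detected cores, else a default."""
    cores = conf.get("spark.executor.cores")
    if cores is not None:
        task_cpus = int(conf.get("spark.task.cpus", "1"))
        # Never report fewer than one slot, even for odd configurations.
        return max(1, int(cores) // task_cpus)
    # Detection can return None, and inside a container it may report the
    # host's core count rather than the container's allocation.
    detected = detect()
    return detected if detected else default


configured = task_slots({"spark.executor.cores": "8", "spark.task.cpus": "2"})
undetectable = task_slots({}, detect=lambda: None)
```

Preferring the explicit configuration over runtime detection sidesteps the
container-versus-host discrepancy mentioned above whenever the setting is
present.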

Thanks for the help, everyone!


> On Tue, Jun 18, 2019 at 8:13 PM Ilya Matiach 
> wrote:
>
>> Hi Andrew,
>>
>> I tried to do something similar to that in the LightGBM
>> classifier/regressor/ranker in the mmlspark package. I try to use the Spark
>> conf, and if it is not configured I get the processors from the JVM directly:
>>
>>
>> https://github.com/Azure/mmlspark/blob/master/src/lightgbm/src/main/scala/LightGBMUtils.scala#L172
>>
>>
>>
>> If you know of a better way, please let me know!
>>
>>
>>
>> val spark = dataset.sparkSession
>> try {
>>   val confCores = spark.sparkContext.getConf
>>     .get("spark.executor.cores").toInt
>>   val confTaskCpus = spark.sparkContext.getConf
>>     .get("spark.task.cpus", "1").toInt
>>   confCores / confTaskCpus
>> } catch {
>>   case _: NoSuchElementException =>
>>     // If spark.executor.cores is not defined, get the cores per JVM
>>     import spark.implicits._
>>     val numMachineCores = spark.range(0, 1)
>>       .map(_ => java.lang.Runtime.getRuntime.availableProcessors).collect.head
>>     numMachineCores
>> }
>>
>>
>>
>> Thank you, Ilya
>>
>>
>>
>> *From:* Andrew Melo 
>> *Sent:* Tuesday, June 18, 2019 11:32 AM
>> *To:* dev 
>> *Subject:* Detect executor core count
>>
>>
>>
>> Hello,
>>
>>
>>
>> Is there a way to detect the number of cores allocated for an executor
>> within a java-based InputPartitionReader?
>>
>>
>>
>> Thanks!
>>
>> Andrew
>>
>


Detect executor core count

2019-06-18 Thread Andrew Melo
Hello,

Is there a way to detect the number of cores allocated for an executor
within a java-based InputPartitionReader?

Thanks!
Andrew


Re: DataSourceV2Reader Q

2019-05-21 Thread Andrew Melo
Hi Ryan,

On Tue, May 21, 2019 at 2:48 PM Ryan Blue  wrote:
>
> Are you sure that your schema conversion is correct? If you're running with a 
> recent Spark version, then that line is probably `name.hashCode()`. That file 
> was last updated 6 months ago so I think it is likely that `name` is the null 
> in your version.

Thanks for taking a look -- in my traceback, "line 264" of
attributeReference.hashCode() is:

h = h * 37 + metadata.hashCode()

If I look within the StructType at the top of the schema, each
StructField indeed has null for the metadata, which I was improperly
passing in instead of Metadata.empty().
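
For anyone hitting the same thing, the failure mode can be mimicked in a few
lines of Python. Field and hash_code below are hypothetical stand-ins, not
Spark's StructField or AttributeReference; the sketch only mirrors the pattern
of a hash that dereferences the metadata unconditionally, so a missing value
blows up on first use rather than at construction:

```python
class Field:
    """Hypothetical stand-in for a schema field; not Spark's StructField."""

    def __init__(self, name, dtype, metadata):
        self.name = name
        self.dtype = dtype
        self.metadata = metadata

    def hash_code(self):
        h = 17
        h = h * 37 + hash(self.name)
        h = h * 37 + hash(self.dtype)
        # Analogue of `h = h * 37 + metadata.hashCode()`: this line
        # dereferences metadata, so None fails here.
        h = h * 37 + hash(frozenset(self.metadata.items()))
        return h


EMPTY_METADATA = {}  # plays the role of an explicit "empty" metadata value

good = Field("pt", "float", EMPTY_METADATA)
bad = Field("pt", "float", None)

try:
    bad.hash_code()
    failed = False
except AttributeError:
    failed = True
```

The object with missing metadata constructs fine and only fails once something
hashes it, which matches the symptom of the schema printing correctly while
any later operation throws.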

Thanks again,
Andrew

>
> On Tue, May 21, 2019 at 11:39 AM Andrew Melo  wrote:
>>
>> Hello,
>>
>> I'm developing a DataSourceV2 reader for the ROOT (https://root.cern/)
>> file format to replace a previous DSV1 source that was in use before.
>>
>> I have a bare skeleton of the reader, which can properly load the
>> files and pass their schema into Spark 2.4.3, but any operation on the
>> resulting DataFrame (except for printSchema()) causes an NPE deep in
>> the guts of Spark [1]. I'm baffled, though, since both logging
>> statements and coverage say that neither planBatchInputPartitions nor
>> any of the methods in my partition class are called -- the only things
>> called are readSchema and the constructors.
>>
>> I followed the pattern from "JavaBatchDataSourceV2.java" -- is it
>> possible that test-case isn't up to date? Are there any other example
>> Java DSV2 readers out in the wild I could compare against?
>>
>> Thanks!
>> Andrew
>>
>> [1]
>>
>> java.lang.NullPointerException
>> at 
>> org.apache.spark.sql.catalyst.expressions.AttributeReference.hashCode(namedExpressions.scala:264)
>> at 
>> scala.collection.mutable.FlatHashTable$class.findElemImpl(FlatHashTable.scala:129)
>> at 
>> scala.collection.mutable.FlatHashTable$class.containsElem(FlatHashTable.scala:124)
>> at scala.collection.mutable.HashSet.containsElem(HashSet.scala:40)
>> at scala.collection.mutable.HashSet.contains(HashSet.scala:57)
>> at scala.collection.GenSetLike$class.apply(GenSetLike.scala:44)
>> at scala.collection.mutable.AbstractSet.apply(Set.scala:46)
>> at scala.collection.SeqLike$$anonfun$distinct$1.apply(SeqLike.scala:506)
>> at scala.collection.immutable.List.foreach(List.scala:392)
>> at scala.collection.SeqLike$class.distinct(SeqLike.scala:505)
>> at scala.collection.AbstractSeq.distinct(Seq.scala:41)
>> at 
>> org.apache.spark.sql.catalyst.expressions.package$AttributeSeq$$anonfun$unique$1.apply(package.scala:147)
>> at 
>> org.apache.spark.sql.catalyst.expressions.package$AttributeSeq$$anonfun$unique$1.apply(package.scala:147)
>> at 
>> scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
>> at 
>> scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
>> at 
>> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
>> at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:221)
>> at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:428)
>> at 
>> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
>> at scala.collection.MapLike$MappedValues.foreach(MapLike.scala:245)
>> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>> at 
>> org.apache.spark.sql.catalyst.expressions.package$AttributeSeq.unique(package.scala:147)
>> at 
>> org.apache.spark.sql.catalyst.expressions.package$AttributeSeq.direct$lzycompute(package.scala:152)
>> at 
>> org.apache.spark.sql.catalyst.expressions.package$AttributeSeq.direct(package.scala:151)
>> at 
>> org.apache.spark.sql.catalyst.expressions.package$AttributeSeq.resolve(package.scala:229)
>> at 
>> org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveChildren(LogicalPlan.scala:101)
>> at 
>> org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$40.apply(Analyzer.scala:890)
>> at 
>> org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$40.apply(Analyzer.scala:892)
>> at 
>> org.apache.spark.sql.catalyst.analysis.package$.withPosition(package.scala:53)
>> at 
>> org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$.org$apache$spark$sql$catalyst$analysis$Analyzer$ResolveReferences$$resolve(Analyzer.scala:889)
>> at 
>> org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRefe

DataSourceV2Reader Q

2019-05-21 Thread Andrew Melo
Hello,

I'm developing a DataSourceV2 reader for the ROOT (https://root.cern/)
file format to replace a previous DSV1 source that was in use before.

I have a bare skeleton of the reader, which can properly load the
files and pass their schema into Spark 2.4.3, but any operation on the
resulting DataFrame (except for printSchema()) causes an NPE deep in
the guts of Spark [1]. I'm baffled, though, since both logging
statements and coverage say that neither planBatchInputPartitions nor
any of the methods in my partition class are called -- the only things
called are readSchema and the constructors.

I followed the pattern from "JavaBatchDataSourceV2.java" -- is it
possible that test-case isn't up to date? Are there any other example
Java DSV2 readers out in the wild I could compare against?

Thanks!
Andrew

[1]

java.lang.NullPointerException
at org.apache.spark.sql.catalyst.expressions.AttributeReference.hashCode(namedExpressions.scala:264)
at scala.collection.mutable.FlatHashTable$class.findElemImpl(FlatHashTable.scala:129)
at scala.collection.mutable.FlatHashTable$class.containsElem(FlatHashTable.scala:124)
at scala.collection.mutable.HashSet.containsElem(HashSet.scala:40)
at scala.collection.mutable.HashSet.contains(HashSet.scala:57)
at scala.collection.GenSetLike$class.apply(GenSetLike.scala:44)
at scala.collection.mutable.AbstractSet.apply(Set.scala:46)
at scala.collection.SeqLike$$anonfun$distinct$1.apply(SeqLike.scala:506)
at scala.collection.immutable.List.foreach(List.scala:392)
at scala.collection.SeqLike$class.distinct(SeqLike.scala:505)
at scala.collection.AbstractSeq.distinct(Seq.scala:41)
at org.apache.spark.sql.catalyst.expressions.package$AttributeSeq$$anonfun$unique$1.apply(package.scala:147)
at org.apache.spark.sql.catalyst.expressions.package$AttributeSeq$$anonfun$unique$1.apply(package.scala:147)
at scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
at scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:221)
at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:428)
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
at scala.collection.MapLike$MappedValues.foreach(MapLike.scala:245)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.spark.sql.catalyst.expressions.package$AttributeSeq.unique(package.scala:147)
at org.apache.spark.sql.catalyst.expressions.package$AttributeSeq.direct$lzycompute(package.scala:152)
at org.apache.spark.sql.catalyst.expressions.package$AttributeSeq.direct(package.scala:151)
at org.apache.spark.sql.catalyst.expressions.package$AttributeSeq.resolve(package.scala:229)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveChildren(LogicalPlan.scala:101)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$40.apply(Analyzer.scala:890)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$40.apply(Analyzer.scala:892)
at org.apache.spark.sql.catalyst.analysis.package$.withPosition(package.scala:53)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$.org$apache$spark$sql$catalyst$analysis$Analyzer$ResolveReferences$$resolve(Analyzer.scala:889)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$org$apache$spark$sql$catalyst$analysis$Analyzer$ResolveReferences$$resolve$2.apply(Analyzer.scala:898)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$org$apache$spark$sql$catalyst$analysis$Analyzer$ResolveReferences$$resolve$2.apply(Analyzer.scala:898)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:326)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:324)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$.org$apache$spark$sql$catalyst$analysis$Analyzer$ResolveReferences$$resolve(Analyzer.scala:898)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$9$$anonfun$applyOrElse$35.apply(Analyzer.scala:958)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$9$$anonfun$applyOrElse$35.apply(Analyzer.scala:958)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$1.apply(QueryPlan.scala:105)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$1.apply(QueryPlan.scala:105)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:104)
at 

Re: Anaconda installation with Pyspark/Pyarrow (2.3.0+) on cloudera managed server

2019-05-06 Thread Andrew Melo
Hi,

On Mon, May 6, 2019 at 11:59 AM Gourav Sengupta
 wrote:
>
> Hence, what I mentioned initially does sound correct ?

I don't agree at all - we've had a significant boost from moving from
regular UDFs to pandas UDFs. YMMV, of course.

>
> On Mon, May 6, 2019 at 5:43 PM Andrew Melo  wrote:
>>
>> Hi,
>>
>> On Mon, May 6, 2019 at 11:41 AM Patrick McCarthy
>>  wrote:
>> >
>> > Thanks Gourav.
>> >
>> > Incidentally, since the regular UDF is row-wise, we could optimize that a 
>> > bit by taking the convert() closure and simply making that the UDF.
>> >
>> > Since there's that MGRS object that we have to create too, we could 
>> > probably optimize it further by applying the UDF via rdd.mapPartitions, 
>> > which would allow the UDF to instantiate objects once per-partition 
>> > instead of per-row and then iterate element-wise through the rows of the 
>> > partition.
>> >
>> > All that said, having done the above on prior projects I find the pandas 
>> > abstractions to be very elegant and friendly to the end-user so I haven't 
>> > looked back :)
>> >
>> > (The common memory model via Arrow is a nice boost too!)
>>
>> And some tentative SPIPs that want to use columnar representations
>> internally in Spark should also add some good performance in the
>> future.
>>
>> Cheers
>> Andrew
>>
>> >
>> > On Mon, May 6, 2019 at 11:13 AM Gourav Sengupta 
>> >  wrote:
>> >>
>> >> The proof is in the pudding
>> >>
>> >> :)
>> >>
>> >>
>> >>
>> >> On Mon, May 6, 2019 at 2:46 PM Gourav Sengupta 
>> >>  wrote:
>> >>>
>> >>> Hi Patrick,
>> >>>
>> >>> super duper, thanks a ton for sharing the code. Can you please confirm 
>> >>> that this runs faster than the regular UDF's?
>> >>>
>> >>> Interestingly I am also running same transformations using another geo 
>> >>> spatial library in Python, where I am passing two fields and getting 
>> >>> back an array.
>> >>>
>> >>>
>> >>> Regards,
>> >>> Gourav Sengupta
>> >>>
>> >>> On Mon, May 6, 2019 at 2:00 PM Patrick McCarthy 
>> >>>  wrote:
>> >>>>
>> >>>> Human time is considerably more expensive than computer time, so in 
>> >>>> that regard, yes :)
>> >>>>
>> >>>> This took me one minute to write and ran fast enough for my needs. If 
>> >>>> you're willing to provide a comparable scala implementation I'd be 
>> >>>> happy to compare them.
>> >>>>
>> >>>> # F and T are assumed to be the usual PySpark aliases
>> >>>> from pyspark.sql import functions as F, types as T
>> >>>>
>> >>>> @F.pandas_udf(T.StringType(), F.PandasUDFType.SCALAR)
>> >>>> def generate_mgrs_series(lat_lon_str, level):
>> >>>>     import mgrs
>> >>>>     m = mgrs.MGRS()  # built once per batch, not once per row
>> >>>>
>> >>>>     # level arrives as a Series; its first value is used for the whole batch
>> >>>>     precision_level = 0
>> >>>>     levelval = level[0]
>> >>>>
>> >>>>     if levelval == 1000:
>> >>>>         precision_level = 2
>> >>>>     if levelval == 100:
>> >>>>         precision_level = 3
>> >>>>
>> >>>>     def convert(ll_str):
>> >>>>         lat, lon = ll_str.split('_')
>> >>>>         return m.toMGRS(lat, lon,
>> >>>>                         MGRSPrecision=precision_level)
>> >>>>
>> >>>>     return lat_lon_str.apply(lambda x: convert(x))
>> >>>>
>> >>>> On Mon, May 6, 2019 at 8:23 AM Gourav Sengupta 
>> >>>>  wrote:
>> >>>>>
>> >>>>> And you found the PANDAS UDF more performant ? Can you share your code 
>> >>>>> and prove it?
>> >>>>>
>> >>>>> On Sun, May 5, 2019 at 9:24 PM Patrick McCarthy 
>> >>>>>  wrote:
>> >>>>>>
>> >>>>>> I

Re: Anaconda installation with Pyspark/Pyarrow (2.3.0+) on cloudera managed server

2019-05-06 Thread Andrew Melo
Hi,

On Mon, May 6, 2019 at 11:41 AM Patrick McCarthy
 wrote:
>
> Thanks Gourav.
>
> Incidentally, since the regular UDF is row-wise, we could optimize that a bit 
> by taking the convert() closure and simply making that the UDF.
>
> Since there's that MGRS object that we have to create too, we could probably 
> optimize it further by applying the UDF via rdd.mapPartitions, which would 
> allow the UDF to instantiate objects once per-partition instead of per-row 
> and then iterate element-wise through the rows of the partition.
>
> All that said, having done the above on prior projects I find the pandas 
> abstractions to be very elegant and friendly to the end-user so I haven't 
> looked back :)
>
> (The common memory model via Arrow is a nice boost too!)

And some tentative SPIPs that want to use columnar representations
internally in Spark should also add some good performance in the
future.
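
On the rdd.mapPartitions suggestion quoted above, a rough sketch of the
"instantiate once per partition" pattern may help. It is plain Python so it
runs without a cluster; FakeConverter is a made-up stand-in for an
expensive-to-build object such as mgrs.MGRS(), and the output format is
invented for illustration only.

```python
from typing import Iterable, Iterator


class FakeConverter:
    """Hypothetical stand-in for a costly object such as mgrs.MGRS()."""

    instances = 0  # counts how many times the object was constructed

    def __init__(self) -> None:
        FakeConverter.instances += 1

    def convert(self, lat: float, lon: float) -> str:
        # Invented output format, only here to make the example observable.
        return f"{lat:.2f},{lon:.2f}"


def convert_partition(rows: Iterable[str]) -> Iterator[str]:
    """Takes an iterator of 'lat_lon' strings and yields converted strings."""
    converter = FakeConverter()  # built once for the whole partition
    for ll_str in rows:
        lat, lon = ll_str.split("_")
        yield converter.convert(float(lat), float(lon))


# Two "partitions" simulated as plain lists.
partitions = [["1.0_2.0", "3.5_4.5"], ["5.25_6.75"]]
results = [list(convert_partition(p)) for p in partitions]
```

A function of this shape (iterator in, iterator out) is what
rdd.mapPartitions(convert_partition) would be handed in Spark, so the
converter is constructed per partition rather than per row.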

Cheers
Andrew

>
> On Mon, May 6, 2019 at 11:13 AM Gourav Sengupta  
> wrote:
>>
>> The proof is in the pudding
>>
>> :)
>>
>>
>>
>> On Mon, May 6, 2019 at 2:46 PM Gourav Sengupta  
>> wrote:
>>>
>>> Hi Patrick,
>>>
>>> super duper, thanks a ton for sharing the code. Can you please confirm that 
>>> this runs faster than the regular UDF's?
>>>
>>> Interestingly I am also running same transformations using another geo 
>>> spatial library in Python, where I am passing two fields and getting back 
>>> an array.
>>>
>>>
>>> Regards,
>>> Gourav Sengupta
>>>
>>> On Mon, May 6, 2019 at 2:00 PM Patrick McCarthy  
>>> wrote:

 Human time is considerably more expensive than computer time, so in that 
 regard, yes :)

 This took me one minute to write and ran fast enough for my needs. If 
 you're willing to provide a comparable scala implementation I'd be happy 
 to compare them.

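 # F and T are assumed to be the usual PySpark aliases
 from pyspark.sql import functions as F, types as T

 @F.pandas_udf(T.StringType(), F.PandasUDFType.SCALAR)
 def generate_mgrs_series(lat_lon_str, level):
     import mgrs
     m = mgrs.MGRS()  # built once per batch, not once per row

     # level arrives as a Series; its first value is used for the whole batch
     precision_level = 0
     levelval = level[0]

     if levelval == 1000:
         precision_level = 2
     if levelval == 100:
         precision_level = 3

     def convert(ll_str):
         lat, lon = ll_str.split('_')
         return m.toMGRS(lat, lon,
                         MGRSPrecision=precision_level)

     return lat_lon_str.apply(lambda x: convert(x))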

 On Mon, May 6, 2019 at 8:23 AM Gourav Sengupta  
 wrote:
>
> And you found the PANDAS UDF more performant ? Can you share your code 
> and prove it?
>
> On Sun, May 5, 2019 at 9:24 PM Patrick McCarthy  
> wrote:
>>
>> I disagree that it's hype. Perhaps not 1:1 with pure scala 
>> performance-wise, but for python-based data scientists or others with a 
>> lot of python expertise it allows one to do things that would otherwise 
>> be infeasible at scale.
>>
>> For instance, I recently had to convert latitude / longitude pairs to 
>> MGRS strings 
>> (https://en.wikipedia.org/wiki/Military_Grid_Reference_System). Writing 
>> a pandas UDF (and putting the mgrs python package into a conda 
>> environment) was _significantly_ easier than any alternative I found.
>>
>> @Rishi - depending on how your network is constructed, some lag could come 
>> from just uploading the conda environment. If you load it from hdfs with 
>> --archives does it improve?
>>
>> On Sun, May 5, 2019 at 2:15 PM Gourav Sengupta 
>>  wrote:
>>>
>>> hi,
>>>
>>> Pandas UDF is a bit of hype. One of their blogs shows the use case of 
>>> adding 1 to a field using Pandas UDF which is pretty much pointless. So 
>>> you go beyond the blog and realise that your actual use case is more 
>>> than adding one :) and the reality hits you
>>>
>>> Pandas UDF in certain scenarios is actually slow, try using apply for a 
>>> custom or pandas function. In fact in certain scenarios I have found 
>>> general UDFs work much faster and use much less memory. Therefore test 
>>> out your use case (with at least 30 million records) before trying to 
>>> use the Pandas UDF option.
>>>
>>> And when you start using GroupMap then you realise after reading 
>>> https://spark.apache.org/docs/latest/sql-pyspark-pandas-with-arrow.html#pandas-udfs-aka-vectorized-udfs
>>>  that "Oh!! now I can run into random OOM errors and the maxrecords 
>>> options does not help at all"
>>>
>>> Excerpt from the above link:
>>> Note that all data for a group will be loaded into memory before the 
>>> function is applied. This can lead to out of memory exceptions, 
>>> especially if the group sizes are skewed. The configuration for 
>>> maxRecordsPerBatch is not applied on groups and it is up to the user to 
>>> ensure that the grouped data will 

Re: can't download 2.4.1 sourcecode

2019-04-22 Thread Andrew Melo
On Mon, Apr 22, 2019 at 10:54 PM yutaochina  wrote:
>
> 
> 
>
>
> When I try to download the source code, I find it does not work.
>

In the interim -- https://github.com/apache/spark/archive/v2.4.1.tar.gz

>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>




Re: Connecting to Spark cluster remotely

2019-04-22 Thread Andrew Melo
Hi Rishikesh,

On Mon, Apr 22, 2019 at 4:26 PM Rishikesh Gawade
 wrote:
>
> To put it simply, what are the configurations that need to be done on the 
> client machine so that it can run driver on itself and executors on 
> spark-yarn cluster nodes?

TBH, if it were me, I would simply SSH to the cluster and start the
spark-shell there. I don't think there's any special spark
configuration you need, but depending on what address space your
cluster is using/where you're connecting from, it might be really hard
to get all the networking components lined up.

>
> On Mon, Apr 22, 2019, 8:22 PM Rishikesh Gawade  
> wrote:
>>
>> Hi.
>> I have been experiencing trouble while trying to connect to a Spark cluster 
>> remotely. This Spark cluster is configured to run using YARN.
>> Can anyone guide me or provide any step-by-step instructions for connecting 
>> remotely via spark-shell?
>> Here's the setup that I am using:
>> The Spark cluster is running with each node as a docker container hosted on 
>> a VM. It is using YARN for scheduling resources for computations.
>> I have a dedicated docker container acting as a spark client, on which i 
>> have the spark-shell installed(spark binary in standalone setup) and also 
>> the Hadoop and Yarn config directories set so that spark-shell can 
>> coordinate with the RM for resources.
>> With all of this set, i tried using the following command:
>>
>> spark-shell --master yarn --deploy-mode client
>>
>> This results in the spark-shell giving me a scala-based console, however, 
>> when I check the Resource Manager UI on the cluster, there seems to be no 
>> application/spark session running.
>> I have been expecting the driver to be running on the client machine and the 
>> executors running in the cluster. But that doesn't seem to happen.
>>
>> How can I achieve this?
>> Is whatever I am trying feasible, and if so, a good practice?
>>
>> Thanks & Regards,
>> Rishikesh




DataSourceV2 exceptions

2019-04-08 Thread Andrew Melo
Hello,

I'm developing a (Java) DataSourceV2 to read a columnar file format
popular in a number of physical sciences (https://root.cern.ch/). (I
also understand that the API isn't fixed and is subject to change.)

My question is -- what is the expected way to transmit exceptions from
the DataSource up to Spark? The DSV2 interface (unless I'm misreading
it) doesn't declare any checked exceptions that can be thrown in the
DS, so should I instead catch any exceptions and rethrow them as
unchecked exceptions? If so, is there a recommended hierarchy to throw from?

thanks!
Andrew

-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org



Re: [ANNOUNCE] Announcing Apache Spark 2.4.1

2019-04-05 Thread Andrew Melo
On Fri, Apr 5, 2019 at 9:41 AM Jungtaek Lim  wrote:
>
> Thanks Andrew for reporting this. I just submitted the fix. 
> https://github.com/apache/spark/pull/24304

Thanks!

>
> On Fri, Apr 5, 2019 at 3:21 PM Andrew Melo  wrote:
>>
>> Hello,
>>
>> I'm not sure if this is the proper place to report it, but the 2.4.1
>> version of the config docs apparently didn't render right into HTML
>> (scroll down to "Compression and Serialization")
>>
>> https://spark.apache.org/docs/2.4.1/configuration.html#available-properties
>>
>> By comparison, the 2.4.0 version of the docs renders correctly.
>>
>> Cheers
>> Andrew
>>
>> On Fri, Apr 5, 2019 at 7:59 AM DB Tsai  wrote:
>> >
>> > +user list
>> >
>> > We are happy to announce the availability of Spark 2.4.1!
>> >
>> > Apache Spark 2.4.1 is a maintenance release, based on the branch-2.4
>> > maintenance branch of Spark. We strongly recommend all 2.4.0 users to
>> > upgrade to this stable release.
>> >
>> > In Apache Spark 2.4.1, Scala 2.12 support is GA, and it's no longer
>> > experimental.
>> > We will drop Scala 2.11 support in Spark 3.0, so please provide us 
>> > feedback.
>> >
>> > To download Spark 2.4.1, head over to the download page:
>> > http://spark.apache.org/downloads.html
>> >
>> > To view the release notes:
>> > https://spark.apache.org/releases/spark-release-2-4-1.html
>> >
>> > One more thing: to add a little color to this release, it's the
>> > largest RC ever (RC9)!
>> > We tried to incorporate many critical fixes at the last minute, and
>> > hope you all enjoy it.
>> >
>> > We would like to acknowledge all community members for contributing to
>> > this release. This release would not have been possible without you.
>> >
>> > -
>> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>> >
>>
>> -
>> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>>
>
>
> --
> Name : Jungtaek Lim
> Blog : http://medium.com/@heartsavior
> Twitter : http://twitter.com/heartsavior
> LinkedIn : http://www.linkedin.com/in/heartsavior




Re: [ANNOUNCE] Announcing Apache Spark 2.4.1

2019-04-05 Thread Andrew Melo
Hello,

I'm not sure if this is the proper place to report it, but the 2.4.1
version of the config docs apparently didn't render right into HTML
(scroll down to "Compression and Serialization")

https://spark.apache.org/docs/2.4.1/configuration.html#available-properties

By comparison, the 2.4.0 version of the docs renders correctly.

Cheers
Andrew

On Fri, Apr 5, 2019 at 7:59 AM DB Tsai  wrote:
>
> +user list
>
> We are happy to announce the availability of Spark 2.4.1!
>
> Apache Spark 2.4.1 is a maintenance release, based on the branch-2.4
> maintenance branch of Spark. We strongly recommend all 2.4.0 users to
> upgrade to this stable release.
>
> In Apache Spark 2.4.1, Scala 2.12 support is GA, and it's no longer
> experimental.
> We will drop Scala 2.11 support in Spark 3.0, so please provide us feedback.
>
> To download Spark 2.4.1, head over to the download page:
> http://spark.apache.org/downloads.html
>
> To view the release notes:
> https://spark.apache.org/releases/spark-release-2-4-1.html
>
> One more thing: to add a little color to this release, it's the
> largest RC ever (RC9)!
> We tried to incorporate many critical fixes at the last minute, and
> hope you all enjoy it.
>
> We would like to acknowledge all community members for contributing to
> this release. This release would not have been possible without you.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>




Re: Where does the Driver run?

2019-03-25 Thread Andrew Melo
Hi Pat,

Indeed, I don't think that it's possible to use cluster mode w/o
spark-submit. All the docs I see appear to always describe needing to use
spark-submit for cluster mode -- it's not even compatible with spark-shell.
But it makes sense to me -- if you want Spark to run your application's
driver, you need to package it up and send it to the cluster manager. You
can't start spark one place and then later migrate it to the cluster. It's
also why you can't use spark-shell in cluster mode either, I think.

Cheers
Andrew

On Mon, Mar 25, 2019 at 11:22 AM Pat Ferrel  wrote:

> In the GUI while the job is running the app-id link brings up logs to both
> executors, The “name” link goes to 4040 of the machine that launched the
> job but is not resolvable right now so the page is not shown. I’ll try the
> netstat but the use of port 4040 was a good clue.
>
> By what you say below this indicates the Driver is running on the
> launching machine, the client to the Spark Cluster. This should be the case
> in deployMode = client.
>
> Can someone explain what is going on? The evidence seems to say that
> deployMode = cluster *does not work* as described unless you use
> spark-submit (and I’m only guessing at that).
>
> Further; if we don’t use spark-submit we can’t use deployMode = cluster ???
>
>
> From: Akhil Das  
> Reply: Akhil Das  
> Date: March 24, 2019 at 7:45:07 PM
> To: Pat Ferrel  
> Cc: user  
> Subject:  Re: Where does the Driver run?
>
> There's also a driver ui (usually available on port 4040), after running
> your code, I assume you are running it on your machine, visit
> localhost:4040 and you will get the driver UI.
>
> If you think the driver is running on your master/executor nodes, login to
> those machines and do a
>
>netstat -napt | grep -i listen
>
> You will see the driver listening on 404x there, this won't be the case
> mostly as you are not doing Spark-submit or using the deployMode=cluster.
>
> On Mon, 25 Mar 2019, 01:03 Pat Ferrel,  wrote:
>
>> Thanks, I have seen this many times in my research. Paraphrasing docs:
>> “in deployMode ‘cluster' the Driver runs on a Worker in the cluster”
>>
>> When I look at logs I see 2 executors on the 2 slaves (executor 0 and 1
>> with addresses that match slaves). When I look at memory usage while the
>> job runs I see virtually identical usage on the 2 Workers. This would
>> support your claim and contradict Spark docs for deployMode = cluster.
>>
>> The evidence seems to contradict the docs. I am now beginning to wonder
>> if the Driver only runs in the cluster if we use spark-submit
>>
>>
>>
>> From: Akhil Das  
>> Reply: Akhil Das  
>> Date: March 23, 2019 at 9:26:50 PM
>> To: Pat Ferrel  
>> Cc: user  
>> Subject:  Re: Where does the Driver run?
>>
>> If you are starting your "my-app" on your local machine, that's where the
>> driver is running.
>>
>> [image: image.png]
>>
>> Hope this helps.
>> 
>>
>> On Sun, Mar 24, 2019 at 4:13 AM Pat Ferrel  wrote:
>>
>>> I have researched this for a significant amount of time and find answers
>>> that seem to be for a slightly different question than mine.
>>>
>>> The Spark 2.3.3 cluster is running fine. I see the GUI on
>>> “http://master-address:8080”, there are 2 idle workers, as configured.
>>>
>>> I have a Scala application that creates a context and starts execution
>>> of a Job. I *do not use spark-submit*, I start the Job programmatically and
>>> this is where many explanations fork from my question.
>>>
>>> In "my-app" I create a new SparkConf, with the following code (slightly
>>> abbreviated):
>>>
>>>   conf.setAppName("my-job")
>>>   conf.setMaster("spark://master-address:7077")
>>>   conf.set("deployMode", "cluster")
>>>   // other settings like driver and executor memory requests
>>>   // the driver and executor memory requests are for all mem on the slaves,
>>>   // more than mem available on the launching machine with "my-app"
>>>   val jars = listJars("/path/to/lib")
>>>   conf.setJars(jars)
>>>   …
>>>
>>> When I launch the job I see 2 executors running on the 2 workers/slaves.
>>> Everything seems to run fine and sometimes completes successfully. Frequent
>>> failures are the reason for this question.
>>>
>>> Where is the Driver running? I don’t see it in the GUI, I see 2
>>> Executors taking all cluster resources. With a Yarn cluster I would expect
>>> the “Driver" to run on/in the Yarn Master but I am using the Spark
>>> Standalone Master, where is the Driver part of the Job running?
>>>
>>> If it is running in the Master, we are in trouble because I start the
>>> Master on one of my 2 Workers sharing resources with one of the Executors.
>>> Executor mem + driver mem is > available mem on a Worker. I can change this
>>> but need to understand where the Driver part of the Spark Job runs. Is it
>>> in the Spark Master, or inside an Executor, or ???
>>>
>>> The “Driver” creates and 

Re: Where does the Driver run?

2019-03-24 Thread Andrew Melo
Hi Pat,

On Sun, Mar 24, 2019 at 1:03 PM Pat Ferrel  wrote:

> Thanks, I have seen this many times in my research. Paraphrasing docs: “in
> deployMode ‘cluster' the Driver runs on a Worker in the cluster”
>
> When I look at logs I see 2 executors on the 2 slaves (executor 0 and 1
> with addresses that match slaves). When I look at memory usage while the
> job runs I see virtually identical usage on the 2 Workers. This would
> support your claim and contradict Spark docs for deployMode = cluster.
>
> The evidence seems to contradict the docs. I am now beginning to wonder if
> the Driver only runs in the cluster if we use spark-submit
>

Where/how are you starting "./sbin/start-master.sh"?

Cheers
Andrew


>
>
>
> From: Akhil Das  
> Reply: Akhil Das  
> Date: March 23, 2019 at 9:26:50 PM
> To: Pat Ferrel  
> Cc: user  
> Subject:  Re: Where does the Driver run?
>
> If you are starting your "my-app" on your local machine, that's where the
> driver is running.
>
> [image: image.png]
>
> Hope this helps.
> 
>
> On Sun, Mar 24, 2019 at 4:13 AM Pat Ferrel  wrote:
>
>> I have researched this for a significant amount of time and find answers
>> that seem to be for a slightly different question than mine.
>>
>> The Spark 2.3.3 cluster is running fine. I see the GUI on
>> “http://master-address:8080”, there are 2 idle workers, as configured.
>>
>> I have a Scala application that creates a context and starts execution of
>> a Job. I *do not use spark-submit*, I start the Job programmatically and
>> this is where many explanations fork from my question.
>>
>> In "my-app" I create a new SparkConf, with the following code (slightly
>> abbreviated):
>>
>>   conf.setAppName("my-job")
>>   conf.setMaster("spark://master-address:7077")
>>   conf.set("deployMode", "cluster")
>>   // other settings like driver and executor memory requests
>>   // the driver and executor memory requests are for all mem on the slaves,
>>   // more than mem available on the launching machine with "my-app"
>>   val jars = listJars("/path/to/lib")
>>   conf.setJars(jars)
>>   …
>>
>> When I launch the job I see 2 executors running on the 2 workers/slaves.
>> Everything seems to run fine and sometimes completes successfully. Frequent
>> failures are the reason for this question.
>>
>> Where is the Driver running? I don’t see it in the GUI, I see 2 Executors
>> taking all cluster resources. With a Yarn cluster I would expect the
>> “Driver" to run on/in the Yarn Master but I am using the Spark Standalone
>> Master, where is the Driver part of the Job running?
>>
>> If it is running in the Master, we are in trouble because I start the
>> Master on one of my 2 Workers sharing resources with one of the Executors.
>> Executor mem + driver mem is > available mem on a Worker. I can change this
>> but need to understand where the Driver part of the Spark Job runs. Is it
>> in the Spark Master, or inside an Executor, or ???
>>
>> The “Driver” creates and broadcasts some large data structures so the
>> need for an answer is more critical than with more typical tiny Drivers.
>>
>> Thanks for your help!
>>
>
>
> --
> Cheers!
>
>


Re: SPIP: Accelerator-aware Scheduling

2019-03-01 Thread Andrew Melo
Hi,

On Fri, Mar 1, 2019 at 9:48 AM Xingbo Jiang  wrote:
>
> Hi Sean,
>
> To support GPU scheduling with YARN cluster, we have to update the hadoop 
> version to 3.1.2+. However, if we decide to not upgrade hadoop to beyond that 
> version for Spark 3.0, then we just have to disable/fallback the GPU 
> scheduling with YARN, users shall still be able to have that feature with 
> Standalone or Kubernetes cluster.
>
> We didn't include the Mesos support in current SPIP because we didn't receive 
> use cases that require GPU scheduling on Mesos cluster, however, we can still 
> add Mesos support in the future if we observe valid use cases.

First time caller, long time listener. We have GPUs in our Mesos-based
Spark cluster, and it would be nice to use them with Spark-based
GPU-enabled frameworks (our use case is deep learning applications).

Cheers
Andrew

>
> Thanks!
>
> Xingbo
>
> On Fri, Mar 1, 2019 at 10:39 PM Sean Owen  wrote:
>>
>> Two late breaking questions:
>>
>> This basically requires Hadoop 3.1 for YARN support?
>> Mesos support is listed as a non goal but it already has support for 
>> requesting GPUs in Spark. That would be 'harmonized' with this 
>> implementation even if it's not extended?
>>
>> On Fri, Mar 1, 2019, 7:48 AM Xingbo Jiang  wrote:
>>>
>>> I think we are aligned on the commitment, I'll start a vote thread for this 
>>> shortly.
>>>
>>> On Wed, Feb 27, 2019 at 6:47 AM Xiangrui Meng  wrote:

 In case there are issues visiting Google doc, I attached PDF files to the 
 JIRA.

 On Tue, Feb 26, 2019 at 7:41 AM Xingbo Jiang  wrote:
>
> Hi all,
>
> I want to send a revised SPIP on implementing Accelerator(GPU)-aware 
> Scheduling. It improves Spark by making it aware of GPUs exposed by 
> cluster managers, and hence Spark can match GPU resources with user task 
> requests properly. If you have scenarios that need to run 
> workloads(DL/ML/Signal Processing etc.) on Spark cluster with GPU nodes, 
> please help review and check how it fits into your use cases. Your 
> feedback would be greatly appreciated!
>
> # Links to SPIP and Product doc:
>
> * Jira issue for the SPIP: 
> https://issues.apache.org/jira/browse/SPARK-24615
> * Google Doc: 
> https://docs.google.com/document/d/1C4J_BPOcSCJc58HL7JfHtIzHrjU0rLRdQM3y7ejil64/edit?usp=sharing
> * Product Doc: 
> https://docs.google.com/document/d/12JjloksHCdslMXhdVZ3xY5l1Nde3HRhIrqvzGnK_bNE/edit?usp=sharing
>
> Thank you!
>
> Xingbo




Re: Feature request: split dataset based on condition

2019-02-04 Thread Andrew Melo
Hi Ryan,

On Mon, Feb 4, 2019 at 12:17 PM Ryan Blue  wrote:
>
> To partition by a condition, you would need to create a column with the 
> result of that condition. Then you would partition by that column. The sort 
> option would also work here.

We actually do something similar to filter based on physics properties
by running a python UDF to create a column then filtering on that
column. Doing something similar to sort/partition would also require a
shuffle though, right?

>
> I don't think that there is much of a use case for this. You have a set of 
> conditions on which to partition your data, and partitioning is already 
> supported. The idea to use conditions to create separate data frames would 
> actually make that harder because you'd need to create and name tables for 
> each one.

At the end, however, we do need separate dataframes for each of these
subsamples, unless there's something basic I'm missing in how the
partitioning works. After the input datasets are split into signal and
background regions, we still need to perform further (different)
computations on each of the subsamples. e.g. for subsamples with
exactly 2 electrons, we'll need to calculate the sum of their 4-d
momenta, while samples with <2 electrons will need to subtract two
different physical quantities -- several more steps before we get to
the point where we'll histogram the different subsamples for the
outputs.
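
To make the "multi filter" idea concrete, here is a minimal sketch in plain
Python (not a Spark API). It walks the input once and routes each row into a
bucket keyed by the tuple of condition results, so n boolean conditions yield
up to 2^n disjoint subsamples. The function name split_by_conditions, the
field names, and the thresholds are all made up for illustration.

```python
from collections import defaultdict
from typing import Callable, Dict, Iterable, List, Sequence, Tuple

Row = Dict[str, float]
Condition = Callable[[Row], bool]


def split_by_conditions(
    rows: Iterable[Row], conditions: Sequence[Condition]
) -> Dict[Tuple[bool, ...], List[Row]]:
    """Split rows into disjoint buckets in a single pass over the input."""
    buckets: Dict[Tuple[bool, ...], List[Row]] = defaultdict(list)
    for row in rows:
        # The key records which conditions held, e.g. (True, False) for A!B.
        key = tuple(cond(row) for cond in conditions)
        buckets[key].append(row)
    return dict(buckets)


# Hypothetical event records and selection cuts.
rows = [
    {"n_electrons": 2, "pt": 45.0},
    {"n_electrons": 1, "pt": 12.0},
    {"n_electrons": 2, "pt": 20.0},
    {"n_electrons": 0, "pt": 35.0},
]
conditions = [
    lambda r: r["n_electrons"] == 2,  # condition A
    lambda r: r["pt"] > 30,           # condition B
]
subsamples = split_by_conditions(rows, conditions)
```

Each bucket can then be handed to its own downstream computation, which is
the part that repeated calls to filter make expensive today.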

Cheers
Andrew


>
> On Mon, Feb 4, 2019 at 9:16 AM Andrew Melo  wrote:
>>
>> Hello Ryan,
>>
>> On Mon, Feb 4, 2019 at 10:52 AM Ryan Blue  wrote:
>> >
>> > Andrew, can you give us more information about why partitioning the output 
>> > data doesn't work for your use case?
>> >
>> > It sounds like all you need to do is to create a table partitioned by A 
>> > and B, then you would automatically get the divisions you want. If what 
>> > you're looking for is a way to scale the number of combinations then you 
>> > can use formats that support more partitions, or you could sort by the 
>> > fields and rely on Parquet row group pruning to filter out data you don't 
>> > want.
>> >
>>
>> TBH, I don't understand what that would look like in pyspark and what
>> the consequences would be. Looking at the docs, there doesn't appear to
>> be a syntax for partitioning on a condition (most of our conditions
>> are of the form 'X > 30'). The use of Spark is still somewhat new in
>> our field, so it's possible we're not using it correctly.
>>
>> Cheers
>> Andrew
>>
>> > rb
>> >
>> > On Mon, Feb 4, 2019 at 8:33 AM Andrew Melo  wrote:
>> >>
>> >> Hello
>> >>
>> >> On Sat, Feb 2, 2019 at 12:19 AM Moein Hosseini  wrote:
>> >> >
>> >> > I've seen many application need to split dataset to multiple datasets 
>> >> > based on some conditions. As there is no method to do it in one place, 
>> >> > developers use filter method multiple times. I think it can be useful 
>> >> > to have method to split dataset based on condition in one iteration, 
>> >> > something like partition method of scala (of-course scala partition 
>> >> > just split list into two list, but something more general can be more 
>> >> > useful).
>> >> > If you think it can be helpful, I can create Jira issue and work on it 
>> >> > to send PR.
>> >>
>> >> This would be a really useful feature for our use case (processing
>> >> collision data from the LHC). We typically want to take some sort of
>> >> input and split into multiple disjoint outputs based on some
>> >> conditions. E.g. if we have two conditions A and B, we'll end up with
>> >> 4 outputs (AB, !AB, A!B, !A!B). As we add more conditions, the
>> >> combinatorics explode like n^2, when we could produce them all up
>> >> front with this "multi filter" (or however it would be called).
>> >>
>> >> Cheers
>> >> Andrew
>> >>
>> >> >
>> >> > Best Regards
>> >> > Moein
>> >> >
>> >> > --
>> >> >
>> >> > Moein Hosseini
>> >> > Data Engineer
>> >> > mobile: +98 912 468 1859
>> >> > site: www.moein.xyz
>> >> > email: moein...@gmail.com
>> >> >
>> >>
>> >> -
>> >> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>> >>
>> >
>> >
>> > --
>> > Ryan Blue
>> > Software Engineer
>> > Netflix
>
>
>
> --
> Ryan Blue
> Software Engineer
> Netflix

-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org



Re: Feature request: split dataset based on condition

2019-02-04 Thread Andrew Melo
Hello Ryan,

On Mon, Feb 4, 2019 at 10:52 AM Ryan Blue  wrote:
>
> Andrew, can you give us more information about why partitioning the output 
> data doesn't work for your use case?
>
> It sounds like all you need to do is to create a table partitioned by A and 
> B, then you would automatically get the divisions you want. If what you're 
> looking for is a way to scale the number of combinations then you can use 
> formats that support more partitions, or you could sort by the fields and 
> rely on Parquet row group pruning to filter out data you don't want.
>

TBH, I don't understand what that would look like in pyspark and what
the consequences would be. Looking at the docs, there doesn't appear to
be a syntax for partitioning on a condition (most of our conditions
are of the form 'X > 30'). The use of Spark is still somewhat new in
our field, so it's possible we're not using it correctly.
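
For reference, my best guess at what this might look like in PySpark is
to turn each condition into a boolean column and partition the output on
those columns. This is only a sketch under assumptions: the names A and
B, the column Y, the cut "Y > 10" and the helper name are hypothetical;
only the 'X > 30' form comes from this thread.

```python
# Hypothetical cuts: only the 'X > 30' form is from the thread; "Y > 10" and
# the names A/B are made up for illustration.
CONDITIONS = {"A": "X > 30", "B": "Y > 10"}


def write_partitioned(df, path, conditions=CONDITIONS):
    """Add one boolean column per condition, then partition the output on them.

    `df` is expected to be a pyspark.sql.DataFrame.
    """
    # Build SQL expressions such as "(X > 30) AS A"
    flags = [f"({sql}) AS {name}" for name, sql in conditions.items()]
    flagged = df.selectExpr("*", *flags)
    # Rows land in nested directories keyed on the flag values
    flagged.write.partitionBy(*conditions).mode("overwrite").parquet(path)
    return flagged
```

Reading the result back and filtering on A and B should then only touch
the matching directories, though I haven't measured that.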

Cheers
Andrew

> rb
>
> On Mon, Feb 4, 2019 at 8:33 AM Andrew Melo  wrote:
>>
>> Hello
>>
>> On Sat, Feb 2, 2019 at 12:19 AM Moein Hosseini  wrote:
>> >
>> > I've seen many applications that need to split a dataset into multiple 
>> > datasets based on some conditions. As there is no method to do it in one 
>> > place, developers call the filter method multiple times. I think it would 
>> > be useful to have a method that splits a dataset based on conditions in 
>> > one iteration, something like Scala's partition method (of course, 
>> > Scala's partition just splits a list into two lists, but something more 
>> > general would be more useful).
>> > If you think it would be helpful, I can create a Jira issue and work on 
>> > it to send a PR.
>>
>> This would be a really useful feature for our use case (processing
>> collision data from the LHC). We typically want to take some sort of
>> input and split into multiple disjoint outputs based on some
>> conditions. E.g. if we have two conditions A and B, we'll end up with
>> 4 outputs (AB, !AB, A!B, !A!B). As we add more conditions, the
>> combinatorics explode like 2^n, when we could produce them all up
>> front with this "multi filter" (or however it would be called).
>>
>> Cheers
>> Andrew
>>
>> >
>> > Best Regards
>> > Moein
>> >
>> > --
>> >
>> > Moein Hosseini
>> > Data Engineer
>> > mobile: +98 912 468 1859
>> > site: www.moein.xyz
>> > email: moein...@gmail.com
>> >
>>
>> -
>> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>>
>
>
> --
> Ryan Blue
> Software Engineer
> Netflix

-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org



Re: Feature request: split dataset based on condition

2019-02-04 Thread Andrew Melo
Hello

On Sat, Feb 2, 2019 at 12:19 AM Moein Hosseini  wrote:
>
> I've seen many applications that need to split a dataset into multiple 
> datasets based on some conditions. As there is no method to do it in one 
> place, developers call the filter method multiple times. I think it would be 
> useful to have a method that splits a dataset based on conditions in one 
> iteration, something like Scala's partition method (of course, Scala's 
> partition just splits a list into two lists, but something more general 
> would be more useful).
> If you think it would be helpful, I can create a Jira issue and work on it 
> to send a PR.

This would be a really useful feature for our use case (processing
collision data from the LHC). We typically want to take some sort of
input and split into multiple disjoint outputs based on some
conditions. E.g. if we have two conditions A and B, we'll end up with
4 outputs (AB, !AB, A!B, !A!B). As we add more conditions, the
combinatorics explode like 2^n, when we could produce them all up
front with this "multi filter" (or however it would be called).
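
To make the idea concrete, here is a rough sketch in plain Python (not
Spark) of what such a "multi filter" could do: visit each row once and
file it under the tuple of conditions it passes, so two conditions give
at most four groups. The function name, the sample events and the cuts
are all hypothetical.

```python
from collections import defaultdict


def multi_filter(rows, conditions):
    """Split rows into disjoint groups keyed by which conditions each row passes.

    `conditions` maps a name to a predicate; each group's key is a tuple of
    booleans in the same order as `conditions`.
    """
    groups = defaultdict(list)
    for row in rows:  # a single pass over the input
        key = tuple(bool(pred(row)) for pred in conditions.values())
        groups[key].append(row)
    return dict(groups)


# Hypothetical events and cuts, for illustration only
events = [{"X": 35, "Y": 2}, {"X": 10, "Y": 20}, {"X": 40, "Y": 15}, {"X": 5, "Y": 1}]
cuts = {"A": lambda e: e["X"] > 30, "B": lambda e: e["Y"] > 10}
outputs = multi_filter(events, cuts)
```

Here outputs[(True, False)] would hold the events passing A but not B,
and so on for the other combinations.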

Cheers
Andrew

>
> Best Regards
> Moein
>
> --
>
> Moein Hosseini
> Data Engineer
> mobile: +98 912 468 1859
> site: www.moein.xyz
> email: moein...@gmail.com
>

-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org



Please stop asking to unsubscribe

2019-01-31 Thread Andrew Melo
The correct way to unsubscribe is to mail

user-unsubscr...@spark.apache.org

Just mailing the list with "unsubscribe" doesn't actually do anything...

Thanks
Andrew

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: What are the alternatives to nested DataFrames?

2018-12-28 Thread Andrew Melo
Could you join() the DFs on a common key?
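
Roughly what I have in mind, written in PySpark (the Scala API has an
equivalent join method). This assumes the Elasticsearch data has already
been loaded as a single DataFrame and that both sides share a column,
here hypothetically called "name"; the function name is made up too.

```python
def match_cities(cities_df, es_df, key="name"):
    """Join the list of cities against the Elasticsearch results on a shared column.

    Both arguments are expected to be pyspark.sql.DataFrame objects; `key` is a
    hypothetical column name present in both.
    """
    # An inner join keeps only the cities that have a match on the other side
    return cities_df.join(es_df, on=key, how="inner")
```

That keeps everything as two top-level DataFrames, so nothing has to be
created inside a foreach.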

On Fri, Dec 28, 2018 at 18:35  wrote:

> Shahab, I am not sure what you are trying to say. Could you please give
> me an example? The result of the Query is a Dataframe that is created after
> iterating, so I am not sure how could I map that to a column without
> iterating and getting the values.
>
>
>
> I have a Dataframe that contains a list of cities for which I would like
> to iterate over and search in Elasticsearch.  This list is stored in
> Dataframe because it contains hundreds of thousands of elements with
> multiple properties that would not fit in a single machine.
>
>
>
> The issue is that the elastic-spark connector returns a Dataframe as well
> which leads to a dataframe creation within a Dataframe
>
>
>
> The only solution I found is to store the list of cities in a regular
> scala Seq and iterate over that, but as far as I know this would make Seq
> centralized instead of distributed (run at the executor only?)
>
>
>
> Full example :
>
> val cities = Seq("New York", "Michigan")
> cities.foreach(r => {
>   val qb = QueryBuilders.matchQuery("name", r).operator(Operator.AND)
>   print(qb.toString)
>   val dfs = sqlContext.esDF("cities/docs", qb.toString) // Returns a dataframe for each city
>   dfs.show() // Works as expected. It prints the individual dataframe with the result of the query
> })
>
> val cities = Seq("New York", "Michigan").toDF()
> cities.foreach(r => {
>   val city = r.getString(0)
>   val qb = QueryBuilders.matchQuery("name", city).operator(Operator.AND)
>   print(qb.toString)
>   val dfs = sqlContext.esDF("cities/docs", qb.toString) // null pointer
>   dfs.show()
> })
>
>
>
>
>
> *From:* Shahab Yunus 
> *Sent:* Friday, December 28, 2018 12:34 PM
> *To:* em...@yeikel.com
> *Cc:* user 
> *Subject:* Re: What are the alternatives to nested DataFrames?
>
>
>
> Can you have a dataframe with a column which stores json (type string)? Or
> you can also have a column of array type in which you store all cities
> matching your query.
>
>
>
>
>
>
>
> On Fri, Dec 28, 2018 at 2:48 AM  wrote:
>
> Hi community ,
>
>
>
> As shown in other answers online, Spark does not support the nesting of
> DataFrames, but what are the options?
>
>
>
> I have the following scenario :
>
>
>
> dataFrame1 = List of Cities
>
>
>
> dataFrame2 = Created after searching in ElasticSearch for each city in
> dataFrame1
>
>
>
> I've tried :
>
>
>
> val cities = sc.parallelize(Seq("New York")).toDF()
> cities.foreach(r => {
>   val companyName = r.getString(0)
>   println(companyName)
>   // returns a DataFrame consisting of all the cities matching the entry in cities
>   val dfs = sqlContext.esDF("cities/docs", "?q=" + companyName)
> })
>
>
>
> Which triggers the expected null pointer exception
>
>
>
> java.lang.NullPointerException
>
> at org.elasticsearch.spark.sql.EsSparkSQL$.esDF(EsSparkSQL.scala:53)
>
> at org.elasticsearch.spark.sql.EsSparkSQL$.esDF(EsSparkSQL.scala:51)
>
> at
> org.elasticsearch.spark.sql.package$SQLContextFunctions.esDF(package.scala:37)
>
> at Main$$anonfun$main$1.apply(Main.scala:43)
>
> at Main$$anonfun$main$1.apply(Main.scala:39)
>
> at scala.collection.Iterator$class.foreach(Iterator.scala:742)
>
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
>
> at
> org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:921)
>
> at
> org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:921)
>
> at
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
>
> at
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
>
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>
> at org.apache.spark.scheduler.Task.run(Task.scala:109)
>
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
>
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>
> at java.lang.Thread.run(Thread.java:748)
>
> 2018-12-28 02:01:00 ERROR TaskSetManager:70 - Task 7 in stage 0.0 failed 1
> times; aborting job
>
> Exception in thread "main" org.apache.spark.SparkException: Job aborted
> due to stage failure: Task 7 in stage 0.0 failed 1 times, most recent
> failure: Lost task 7.0 in stage 0.0 (TID 7, localhost, executor driver):
> java.lang.NullPointerException
>
>
>
> What options do I have?
>
>
>
> Thank you.
>
> --
It's dark in this basement.


Questions about caching

2018-12-11 Thread Andrew Melo
Greetings, Spark Aficionados-

I'm working on a project to (ab-)use PySpark to do particle physics
analysis, which involves iterating with a lot of transformations (to
apply weights and select candidate events) and reductions (to produce
histograms of relevant physics objects). We have a basic version
working, but I'm looking to exploit some of Spark's caching behavior
to speed up the interactive computation portion of the analysis,
probably by writing a thin convenience wrapper. I have a couple
questions I've been unable to find definitive answers to, which would
help me design this wrapper in an efficient way:

1) When cache()-ing a dataframe where only a subset of the columns are
used, is the entire dataframe placed into the cache, or only the used
columns? E.g., does "df2" end up caching only "a", or all three
columns?

df1 = sc.read.load('test.parquet') # Has columns a, b, c
df2 = df1.cache()
df2.select('a').collect()

2) Are caches reference-based, or is there some sort of de-duplication
based on the logical/physical plans? So, for instance, does Spark take
advantage of the fact that these two dataframes should have the same
content:

df1 = sc.read.load('test.parquet').cache()
df2 = sc.read.load('test.parquet').cache()

...or are df1 and df2 totally independent WRT caching behavior?

2a) If the cache is reference-based, is it sufficient to hold a
weakref to the python object to keep the cache in-scope?

3) Finally, the spark.externalBlockStore.blockManager is intriguing in
our environment where we have multiple users concurrently analyzing
mostly the same input datasets. We have enough RAM in our clusters to
cache a high percentage of the very common datasets, but only if users
could somehow share their caches (which, conveniently, are the larger
datasets). We also have very large edge SSD cache servers we use to
cache trans-oceanic I/O we could throw at this as well.

It looks, however, like that API was removed in 2.0.0 and there wasn't
a replacement. There are products like Alluxio, but they aren't
transparent, requiring the user to manually cache their dataframes by
doing save/loads to external files using "alluxio://" URIs. Is there
no way around this behavior now?

Sorry for the long email, and thanks!
Andrew

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: SparkContext singleton get w/o create?

2018-08-27 Thread Andrew Melo
Hi,

I'm a long-time listener, first-time committer to spark, so this is
good to get my feet wet. I'm particularly interested in SPARK-23836,
which is an itch I may want to dive into and scratch myself in the
next month or so since it's pretty painful for our use-case.

Thanks!
Andrew

On Mon, Aug 27, 2018 at 2:20 PM, Holden Karau  wrote:
> Sure, I don't think you should wait on that being merged in. If you want to
> take the JIRA go ahead (although if you're already familiar with the Spark
> code base it might make sense to leave it as a starter issue for someone who
> is just getting started).
>
> On Mon, Aug 27, 2018 at 12:18 PM Andrew Melo  wrote:
>>
>> Hi Holden,
>>
>> I'm agnostic to the approach (though it seems cleaner to have an
>> explicit API for it). If you would like, I can take that JIRA and
>> implement it (should be a 3-line function).
>>
>> Cheers
>> Andrew
>>
>> On Mon, Aug 27, 2018 at 2:14 PM, Holden Karau 
>> wrote:
>> > Seems reasonable. We should probably add `getActiveSession` to the
>> > PySpark
>> > API (filed a starter JIRA
>> > https://issues.apache.org/jira/browse/SPARK-25255
>> > )
>> >
>> > On Mon, Aug 27, 2018 at 12:09 PM Andrew Melo 
>> > wrote:
>> >>
>> >> Hello Sean, others -
>> >>
>> >> Just to confirm, is it OK for client applications to access
>> >> SparkContext._active_spark_context, if it wraps the accesses in `with
>> >> SparkContext._lock:`?
>> >>
>> >> If that's acceptable to Spark, I'll implement the modifications in the
>> >> Jupyter extensions.
>> >>
>> >> thanks!
>> >> Andrew
>> >>
>> >> On Tue, Aug 7, 2018 at 5:52 PM, Andrew Melo 
>> >> wrote:
>> >> > Hi Sean,
>> >> >
>> >> > On Tue, Aug 7, 2018 at 5:44 PM, Sean Owen  wrote:
>> >> >> Ah, python.  How about SparkContext._active_spark_context then?
>> >> >
>> >> > Ah yes, that looks like the right member, but I'm a bit wary about
>> >> > depending on functionality of objects with leading underscores. I
>> >> > assumed that was "private" and subject to change. Is that something I
>> >> > should be unconcerned about?
>> >> >
>> >> > The other thought is that the accesses with SparkContext are
>> >> > protected
>> >> > by "SparkContext._lock" -- should I also use that lock?
>> >> >
>> >> > Thanks for your help!
>> >> > Andrew
>> >> >
>> >> >>
>> >> >> On Tue, Aug 7, 2018 at 5:34 PM Andrew Melo 
>> >> >> wrote:
>> >> >>>
>> >> >>> Hi Sean,
>> >> >>>
>> >> >>> On Tue, Aug 7, 2018 at 5:16 PM, Sean Owen  wrote:
>> >> >>> > Is SparkSession.getActiveSession what you're looking for?
>> >> >>>
>> >> >>> Perhaps -- though there's not a corresponding python function, and
>> >> >>> I'm
>> >> >>> not exactly sure how to call the scala getActiveSession without
>> >> >>> first
>> >> >>> instantiating the python version and causing a JVM to start.
>> >> >>>
>> >> >>> Is there an easy way to call getActiveSession that doesn't start a
>> >> >>> JVM?
>> >> >>>
>> >> >>> Cheers
>> >> >>> Andrew
>> >> >>>
>> >> >>> >
>> >> >>> > On Tue, Aug 7, 2018 at 5:11 PM Andrew Melo
>> >> >>> > 
>> >> >>> > wrote:
>> >> >>> >>
>> >> >>> >> Hello,
>> >> >>> >>
>> >> >>> >> One pain point with various Jupyter extensions [1][2] that
>> >> >>> >> provide
>> >> >>> >> visual feedback about running spark processes is the lack of a
>> >> >>> >> public
>> >> >>> >> API to introspect the web URL. The notebook server needs to know
>> >> >>> >> the
>> >> >>> >> URL to find information about the current SparkContext.
>> >> >>> >>

Re: SparkContext singleton get w/o create?

2018-08-27 Thread Andrew Melo
Hi Holden,

I'm agnostic to the approach (though it seems cleaner to have an
explicit API for it). If you would like, I can take that JIRA and
implement it (should be a 3-line function).

Cheers
Andrew

On Mon, Aug 27, 2018 at 2:14 PM, Holden Karau  wrote:
> Seems reasonable. We should probably add `getActiveSession` to the PySpark
> API (filed a starter JIRA https://issues.apache.org/jira/browse/SPARK-25255
> )
>
> On Mon, Aug 27, 2018 at 12:09 PM Andrew Melo  wrote:
>>
>> Hello Sean, others -
>>
>> Just to confirm, is it OK for client applications to access
>> SparkContext._active_spark_context, if it wraps the accesses in `with
>> SparkContext._lock:`?
>>
>> If that's acceptable to Spark, I'll implement the modifications in the
>> Jupyter extensions.
>>
>> thanks!
>> Andrew
>>
>> On Tue, Aug 7, 2018 at 5:52 PM, Andrew Melo  wrote:
>> > Hi Sean,
>> >
>> > On Tue, Aug 7, 2018 at 5:44 PM, Sean Owen  wrote:
>> >> Ah, python.  How about SparkContext._active_spark_context then?
>> >
>> > Ah yes, that looks like the right member, but I'm a bit wary about
>> > depending on functionality of objects with leading underscores. I
>> > assumed that was "private" and subject to change. Is that something I
>> > should be unconcerned about?
>> >
>> > The other thought is that the accesses with SparkContext are protected
>> > by "SparkContext._lock" -- should I also use that lock?
>> >
>> > Thanks for your help!
>> > Andrew
>> >
>> >>
>> >> On Tue, Aug 7, 2018 at 5:34 PM Andrew Melo 
>> >> wrote:
>> >>>
>> >>> Hi Sean,
>> >>>
>> >>> On Tue, Aug 7, 2018 at 5:16 PM, Sean Owen  wrote:
>> >>> > Is SparkSession.getActiveSession what you're looking for?
>> >>>
>> >>> Perhaps -- though there's not a corresponding python function, and I'm
>> >>> not exactly sure how to call the scala getActiveSession without first
>> >>> instantiating the python version and causing a JVM to start.
>> >>>
>> >>> Is there an easy way to call getActiveSession that doesn't start a
>> >>> JVM?
>> >>>
>> >>> Cheers
>> >>> Andrew
>> >>>
>> >>> >
>> >>> > On Tue, Aug 7, 2018 at 5:11 PM Andrew Melo 
>> >>> > wrote:
>> >>> >>
>> >>> >> Hello,
>> >>> >>
>> >>> >> One pain point with various Jupyter extensions [1][2] that provide
>> >>> >> visual feedback about running spark processes is the lack of a
>> >>> >> public
>> >>> >> API to introspect the web URL. The notebook server needs to know
>> >>> >> the
>> >>> >> URL to find information about the current SparkContext.
>> >>> >>
>> >>> >> Simply looking for "localhost:4040" works most of the time, but
>> >>> >> fails
>> >>> >> if multiple spark notebooks are being run on the same host -- spark
>> >>> >> increments the port for each new context, leading to confusion when
>> >>> >> the notebooks are trying to probe the web interface for
>> >>> >> information.
>> >>> >>
>> >>> >> I'd like to implement an analog to SparkContext.getOrCreate(),
>> >>> >> perhaps
>> >>> >> called "getIfExists()" that returns the current singleton if it
>> >>> >> exists, or None otherwise. The Jupyter code would then be able to
>> >>> >> use
>> >>> >> this entrypoint to query Spark for an active Spark context, which
>> >>> >> it
>> >>> >> could use to probe the web URL.
>> >>> >>
>> >>> >> It's a minor change, but this would be my first contribution to
>> >>> >> Spark,
>> >>> >> and I want to make sure my plan was kosher before I implemented it.
>> >>> >>
>> >>> >> Thanks!
>> >>> >> Andrew
>> >>> >>
>> >>> >>
>> >>> >>
>> >>> >>
>> >>> >>
>> >>> >> [1] https://krishnan-r.github.io/sparkmonitor/
>> >>> >>
>> >>> >> [2] https://github.com/mozilla/jupyter-spark
>> >>> >>
>> >>> >>
>> >>> >> -
>> >>> >> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>> >>> >>
>> >>> >
>>
>> -
>> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>>
>
>
> --
> Twitter: https://twitter.com/holdenkarau
> Books (Learning Spark, High Performance Spark, etc.):
> https://amzn.to/2MaRAG9
> YouTube Live Streams: https://www.youtube.com/user/holdenkarau

-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org



Re: SparkContext singleton get w/o create?

2018-08-27 Thread Andrew Melo
Hello Sean, others -

Just to confirm, is it OK for client applications to access
SparkContext._active_spark_context, if it wraps the accesses in `with
SparkContext._lock:`?
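
Concretely, the access pattern I'm asking about would look something
like the sketch below. The function name is hypothetical, and it leans
on the two private members named above, so it could break between
releases.

```python
def get_active_context():
    """Return the active SparkContext, or None if there is none.

    Relies on private members of SparkContext, so it may break between releases.
    """
    try:
        from pyspark import SparkContext
    except ImportError:  # pyspark is not installed in this environment
        return None
    # Take the same lock PySpark uses around the singleton
    with SparkContext._lock:
        return SparkContext._active_spark_context


ctx = get_active_context()
# uiWebUrl is the public property holding the web UI address
web_url = ctx.uiWebUrl if ctx is not None else None
```

Importing pyspark by itself shouldn't start a JVM, so the extension
could call this without side effects.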

If that's acceptable to Spark, I'll implement the modifications in the
Jupyter extensions.

thanks!
Andrew

On Tue, Aug 7, 2018 at 5:52 PM, Andrew Melo  wrote:
> Hi Sean,
>
> On Tue, Aug 7, 2018 at 5:44 PM, Sean Owen  wrote:
>> Ah, python.  How about SparkContext._active_spark_context then?
>
> Ah yes, that looks like the right member, but I'm a bit wary about
> depending on functionality of objects with leading underscores. I
> assumed that was "private" and subject to change. Is that something I
> should be unconcerned about?
>
> The other thought is that the accesses with SparkContext are protected
> by "SparkContext._lock" -- should I also use that lock?
>
> Thanks for your help!
> Andrew
>
>>
>> On Tue, Aug 7, 2018 at 5:34 PM Andrew Melo  wrote:
>>>
>>> Hi Sean,
>>>
>>> On Tue, Aug 7, 2018 at 5:16 PM, Sean Owen  wrote:
>>> > Is SparkSession.getActiveSession what you're looking for?
>>>
>>> Perhaps -- though there's not a corresponding python function, and I'm
>>> not exactly sure how to call the scala getActiveSession without first
>>> instantiating the python version and causing a JVM to start.
>>>
>>> Is there an easy way to call getActiveSession that doesn't start a JVM?
>>>
>>> Cheers
>>> Andrew
>>>
>>> >
>>> > On Tue, Aug 7, 2018 at 5:11 PM Andrew Melo 
>>> > wrote:
>>> >>
>>> >> Hello,
>>> >>
>>> >> One pain point with various Jupyter extensions [1][2] that provide
>>> >> visual feedback about running spark processes is the lack of a public
>>> >> API to introspect the web URL. The notebook server needs to know the
>>> >> URL to find information about the current SparkContext.
>>> >>
>>> >> Simply looking for "localhost:4040" works most of the time, but fails
>>> >> if multiple spark notebooks are being run on the same host -- spark
>>> >> increments the port for each new context, leading to confusion when
>>> >> the notebooks are trying to probe the web interface for information.
>>> >>
>>> >> I'd like to implement an analog to SparkContext.getOrCreate(), perhaps
>>> >> called "getIfExists()" that returns the current singleton if it
>>> >> exists, or None otherwise. The Jupyter code would then be able to use
>>> >> this entrypoint to query Spark for an active Spark context, which it
>>> >> could use to probe the web URL.
>>> >>
>>> >> It's a minor change, but this would be my first contribution to Spark,
>>> >> and I want to make sure my plan was kosher before I implemented it.
>>> >>
>>> >> Thanks!
>>> >> Andrew
>>> >>
>>> >>
>>> >>
>>> >>
>>> >>
>>> >> [1] https://krishnan-r.github.io/sparkmonitor/
>>> >>
>>> >> [2] https://github.com/mozilla/jupyter-spark
>>> >>
>>> >> -
>>> >> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>>> >>
>>> >

-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org



Re: SparkContext singleton get w/o create?

2018-08-07 Thread Andrew Melo
Hi Sean,

On Tue, Aug 7, 2018 at 5:44 PM, Sean Owen  wrote:
> Ah, python.  How about SparkContext._active_spark_context then?

Ah yes, that looks like the right member, but I'm a bit wary about
depending on functionality of objects with leading underscores. I
assumed that was "private" and subject to change. Is that something I
should be unconcerned about?

The other thought is that the accesses with SparkContext are protected
by "SparkContext._lock" -- should I also use that lock?

Thanks for your help!
Andrew

>
> On Tue, Aug 7, 2018 at 5:34 PM Andrew Melo  wrote:
>>
>> Hi Sean,
>>
>> On Tue, Aug 7, 2018 at 5:16 PM, Sean Owen  wrote:
>> > Is SparkSession.getActiveSession what you're looking for?
>>
>> Perhaps -- though there's not a corresponding python function, and I'm
>> not exactly sure how to call the scala getActiveSession without first
>> instantiating the python version and causing a JVM to start.
>>
>> Is there an easy way to call getActiveSession that doesn't start a JVM?
>>
>> Cheers
>> Andrew
>>
>> >
>> > On Tue, Aug 7, 2018 at 5:11 PM Andrew Melo 
>> > wrote:
>> >>
>> >> Hello,
>> >>
>> >> One pain point with various Jupyter extensions [1][2] that provide
>> >> visual feedback about running spark processes is the lack of a public
>> >> API to introspect the web URL. The notebook server needs to know the
>> >> URL to find information about the current SparkContext.
>> >>
>> >> Simply looking for "localhost:4040" works most of the time, but fails
>> >> if multiple spark notebooks are being run on the same host -- spark
>> >> increments the port for each new context, leading to confusion when
>> >> the notebooks are trying to probe the web interface for information.
>> >>
>> >> I'd like to implement an analog to SparkContext.getOrCreate(), perhaps
>> >> called "getIfExists()" that returns the current singleton if it
>> >> exists, or None otherwise. The Jupyter code would then be able to use
>> >> this entrypoint to query Spark for an active Spark context, which it
>> >> could use to probe the web URL.
>> >>
>> >> It's a minor change, but this would be my first contribution to Spark,
>> >> and I want to make sure my plan was kosher before I implemented it.
>> >>
>> >> Thanks!
>> >> Andrew
>> >>
>> >>
>> >>
>> >>
>> >>
>> >> [1] https://krishnan-r.github.io/sparkmonitor/
>> >>
>> >> [2] https://github.com/mozilla/jupyter-spark
>> >>
>> >> -
>> >> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>> >>
>> >

-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org



Re: SparkContext singleton get w/o create?

2018-08-07 Thread Andrew Melo
Hi Sean,

On Tue, Aug 7, 2018 at 5:16 PM, Sean Owen  wrote:
> Is SparkSession.getActiveSession what you're looking for?

Perhaps -- though there's not a corresponding python function, and I'm
not exactly sure how to call the scala getActiveSession without first
instantiating the python version and causing a JVM to start.

Is there an easy way to call getActiveSession that doesn't start a JVM?

Cheers
Andrew

>
> On Tue, Aug 7, 2018 at 5:11 PM Andrew Melo  wrote:
>>
>> Hello,
>>
>> One pain point with various Jupyter extensions [1][2] that provide
>> visual feedback about running spark processes is the lack of a public
>> API to introspect the web URL. The notebook server needs to know the
>> URL to find information about the current SparkContext.
>>
>> Simply looking for "localhost:4040" works most of the time, but fails
>> if multiple spark notebooks are being run on the same host -- spark
>> increments the port for each new context, leading to confusion when
>> the notebooks are trying to probe the web interface for information.
>>
>> I'd like to implement an analog to SparkContext.getOrCreate(), perhaps
>> called "getIfExists()" that returns the current singleton if it
>> exists, or None otherwise. The Jupyter code would then be able to use
>> this entrypoint to query Spark for an active Spark context, which it
>> could use to probe the web URL.
>>
>> It's a minor change, but this would be my first contribution to Spark,
>> and I want to make sure my plan was kosher before I implemented it.
>>
>> Thanks!
>> Andrew
>>
>>
>>
>>
>>
>> [1] https://krishnan-r.github.io/sparkmonitor/
>>
>> [2] https://github.com/mozilla/jupyter-spark
>>
>> -
>> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>>
>

-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org



SparkContext singleton get w/o create?

2018-08-07 Thread Andrew Melo
Hello,

One pain point with various Jupyter extensions [1][2] that provide
visual feedback about running spark processes is the lack of a public
API to introspect the web URL. The notebook server needs to know the
URL to find information about the current SparkContext.

Simply looking for "localhost:4040" works most of the time, but fails
if multiple spark notebooks are being run on the same host -- spark
increments the port for each new context, leading to confusion when
the notebooks are trying to probe the web interface for information.
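
To illustrate the failure mode, here is a small sketch in plain Python
that mimics the "try the next port on conflict" behaviour. It is not
Spark's actual implementation; the function name is hypothetical and the
default of 16 retries is my assumption about Spark's default retry
count.

```python
import socket


def bind_with_retries(start_port, max_retries=16, host="127.0.0.1"):
    """Bind to start_port, or to the next free port above it.

    Returns (socket, port). Mimics 'increment on conflict'; it is not
    Spark's real code.
    """
    for offset in range(max_retries + 1):
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.bind((host, start_port + offset))
            sock.listen(1)
            return sock, start_port + offset
        except (OSError, OverflowError):  # port in use or out of range
            sock.close()
    raise OSError(f"no free port in {start_port}..{start_port + max_retries}")
```

Two notebooks both starting from 4040 would end up on 4040 and 4041, so
a probe of "localhost:4040" from the second one reads the first one's
UI.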

I'd like to implement an analog to SparkContext.getOrCreate(), perhaps
called "getIfExists()" that returns the current singleton if it
exists, or None otherwise. The Jupyter code would then be able to use
this entrypoint to query Spark for an active Spark context, which it
could use to probe the web URL.

It's a minor change, but this would be my first contribution to Spark,
and I want to make sure my plan was kosher before I implemented it.

Thanks!
Andrew





[1] https://krishnan-r.github.io/sparkmonitor/

[2] https://github.com/mozilla/jupyter-spark

-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org



[slurm-users] Stagein/Stageout

2018-01-05 Thread Andrew Melo
Hi all,

Does SLURM support similar functionality to the PBS options

#PBS -W stagein
#PBS -W stageout

Looking through the docs and even the qsub wrapper commands, I don't
see an analogous way to implement the same with SLURM via sbatch. I
see the documentation about burst buffers, but that doesn't seem to be
the same.

Any suggestions?

Andrew

-- 
--
Andrew Melo



Re: [foreman-users] Strange behavior connecting to proxy

2017-02-13 Thread Andrew Melo
Hello,

Sorry for the delay responding. That was just it. I was confused
because telnet on the same machine worked.

Cheers,
Andrew

On Thu, Jan 26, 2017 at 2:22 AM, Dominic Cleal <domi...@cleal.org> wrote:
> On 24/01/17 16:57, Andrew Melo wrote:
>> Hi all,
>>
>> I have a proof-of-concept foreman instance running in a VM with a proxy
>> on the machine connected to our IPMI network. For some reason, trying to
>> add the proxy to my foreman instance yields:
>>
>> Unable to communicate with the proxy: ERF12-2530
>> [ProxyAPI::ProxyException]: Unable to detect features ([Errno::EACCES]:
>> Permission denied - connect(2) for "10.0.5.33" port 8123) for proxy
>> http://10.0.5.33:8123/features
>> Please check the proxy is configured and running on the host.
>
> Is SELinux or another security system enabled and enforcing on your
> Foreman server? To receive a permission denied during an outbound
> connection suggests a security system.
>
> If SELinux, the standard policy does not include 8123 as a smart proxy
> port. You can add ports with `semanage port -a -t foreman_proxy_port_t
> -p tcp 8124`, but 8123 is usually defined as a web cache port so you'd
> probably need to load new SELinux policy to allow foreman_proxy_t to
> connect to http_cache_port_t (audit2allow may help).
>
> --
> Dominic Cleal
> domi...@cleal.org
>
> --
> You received this message because you are subscribed to the Google Groups 
> "Foreman users" group.
> To unsubscribe from this group and stop receiving emails from it, send an 
> email to foreman-users+unsubscr...@googlegroups.com.
> To post to this group, send email to foreman-users@googlegroups.com.
> Visit this group at https://groups.google.com/group/foreman-users.
> For more options, visit https://groups.google.com/d/optout.



-- 
--
Andrew Melo

-- 
You received this message because you are subscribed to the Google Groups 
"Foreman users" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to foreman-users+unsubscr...@googlegroups.com.
To post to this group, send email to foreman-users@googlegroups.com.
Visit this group at https://groups.google.com/group/foreman-users.
For more options, visit https://groups.google.com/d/optout.


[foreman-users] Strange behavior connecting to proxy

2017-01-24 Thread Andrew Melo
Hi all,

I have a proof-of-concept foreman instance running in a VM with a proxy on 
the machine connected to our IPMI network. For some reason, trying to add 
the proxy to my foreman instance yields:

Unable to communicate with the proxy: ERF12-2530 
[ProxyAPI::ProxyException]: Unable to detect features ([Errno::EACCES]: 
Permission denied - connect(2) for "10.0.5.33" port 8123) for proxy 
http://10.0.5.33:8123/features
Please check the proxy is configured and running on the host.

But I don't understand what's happening. I can curl that URL manually from 
the foreman machine:

[root@foreman ~]# curl http://10.0.5.33:8123/features
["bmc"]

Additionally, if I watch the logs on the proxy, I can see my manual 
attempts, but I never see anything when I try to connect through the web 
interface. It makes me think something on the foreman side is happening 
that causes it to never try, but I can't think of what that would be.

Does anyone have an idea of what I'm doing wrong?

Thanks!
Andrew



Re: [ANNOUNCE] New Parquet PMC member - Wes McKinney

2016-08-23 Thread Andrew Melo
Congrats, Wes!

On Tue, Aug 23, 2016 at 11:12 AM, Julien Le Dem <jul...@dremio.com> wrote:
> On behalf of the Apache Parquet PMC I'm pleased to announce that
> Wes McKinney has accepted to join the PMC.
>
> Welcome Wes!
>
> --
> Julien



-- 
--
Andrew Melo


Re: [CMake] Does Makefile generated by CMake support make -jN?

2016-07-12 Thread Andrew Melo
On Tue, Jul 12, 2016 at 10:48 PM, Chaos Zhang <zcsd2...@gmail.com> wrote:
> Thanks for your reply Raymond, by 'If you got 96% both times, then I would
> say there's a problem.', did you mean if make performed like this, it means
> this project can't use make -jN?

If "make -j1" and "make -j8" ran in the same amount of time, there
would be a problem. Since "make -j1" uses 96% CPU and "make -j8" uses
648% CPU, it implies that the "-j" option lets compilation scale
across cores, which is what you want.
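
For what it's worth, the figure that /usr/bin/time -v reports as "Percent of CPU this job got" is (user time + system time) divided by elapsed wall-clock time, so a value well above 100% means several cores were busy at once. A small sketch of that arithmetic follows; the helper names are mine, and the timings in the note after it are invented so that they reproduce the 96% and 648% figures quoted in this thread.

```python
def percent_cpu(user_s, system_s, elapsed_s):
    """CPU seconds consumed per wall-clock second, expressed as a percentage."""
    return 100.0 * (user_s + system_s) / elapsed_s


def speedup(elapsed_serial_s, elapsed_parallel_s):
    """How many times faster the parallel run finished, by wall-clock time."""
    return elapsed_serial_s / elapsed_parallel_s
```

With made-up timings of 72 s user + 6 s system over 81.25 s elapsed for plain make, percent_cpu gives 96.0; with 75 s + 6 s over 12.5 s for make -j8 it gives 648.0, and speedup(81.25, 12.5) is 6.5. The wall-clock ratio, not the CPU percentage, is the number that tells you whether -jN is paying off.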

> BTW, does CMake support some options to
> turn on hardware acceleration?

What do you mean by "hardware acceleration"?



>
> Raymond Wan-2 wrote
>> Hi Chao,
>>
>>
>> On Wed, Jul 13, 2016 at 10:54 AM, Chaos Zhang zcsd2012@ wrote:
>>> I was trying to compile my project using CMake, after CMake generated
>>> Makefile.
>>> I used `/usr/bin/time -v make` to make the Makefile,  got the result:
>>> 'Percent of CPU this job got: 96%'.
>>> Then i used `/usr/bin/time -v make -j8` to make the Makefile, the result
>>> of
>>> CPU used is  'Percent of CPU this job got: 648%'.
>>> So i wonder if Makefile generated by CMake support make -jN, BTW my CPU
>>> is
>>> I7-4790, and after i use 'cat /proc/cpuinfo', there are 8 processors.
>>
>>
>> I'm not sure what you are asking...
>>
>> You ran make with -j 8 and you got a percentage greater than 96% ...
>> this implies that the Makefile does support -jN, doesn't it?  If you
>> got 96% both times, then I would say there's a problem.
>>
>> Is what you're wondering why it is 648% and not 800%?  I think that
>> depends on the dependencies in your Makefile.  It's possible that some
>> of them depend on each other in such a way that 8 parallel threads is
>> not possible.
>>
>> Also, I can't remember the value returned by /usr/bin/time -v, but I
>> guess 648% is the average and not maximum.  So, perhaps there are
>> parts within the Makefile that only one thread could be used and then
>> it gets averaged out?
>>
>> Ray
>> --
>>
>> Powered by www.kitware.com
>>
>> Please keep messages on-topic and check the CMake FAQ at:
>> http://www.cmake.org/Wiki/CMake_FAQ
>>
>> Kitware offers various services to support the CMake community. For more
>> information on each offering, please visit:
>>
>> CMake Support: http://cmake.org/cmake/help/support.html
>> CMake Consulting: http://cmake.org/cmake/help/consulting.html
>> CMake Training Courses: http://cmake.org/cmake/help/training.html
>>
>> Visit other Kitware open-source projects at
>> http://www.kitware.com/opensource/opensource.html
>>
>> Follow this link to subscribe/unsubscribe:
>> http://public.kitware.com/mailman/listinfo/cmake
>
>
>
>
>
> --
> View this message in context: 
> http://cmake.3232098.n2.nabble.com/Does-Makefile-generated-by-CMake-support-make-jN-tp7593949p7593951.html
> Sent from the CMake mailing list archive at Nabble.com.



-- 
--
Andrew Melo


Re: Custom metrics

2016-07-11 Thread Andrew Melo


On Friday, June 10, 2016 at 5:42:14 PM UTC-5, Matt Stave wrote:
>
> You can add build parameters while the build's in flight, with 
> [jenkinsURL]/cli/command/set-build-parameter
> You should be able to refer to those in subsequent builds
>
> See also https://wiki.jenkins-ci.org/display/JENKINS/Plot+Plugin  (though 
> I've never used it)
>

This works great! Thanks.
 

>
> --- Matt
>
> On Sunday, May 22, 2016 at 9:22:27 PM UTC-7, Andrew Melo wrote:
>>
>> Hello, 
>>
>> I had two short questions about what was possible with pipeline jobs: 
>>
>> 1) Suppose I want to track some arbitrary values between builds and 
>> fail if they have decreased. Is there a way to stash integers/values 
>> between builds? For example, let's say I want to enforce that the lint 
>> of the codebase gets better with each commit or github pull request. 
>> Is there a step/command that could be used within groovy to extract 
>> the previous value to make the comparison? Or, am I stuck basically 
>> manually copying results to "magic" places on the master that later 
>> builds can suck down. 
>>
>> 2) Let's suppose that I want to publish a trend of these values as a 
>> graph on the main job page. I know there's the HTML publisher that can 
>> publish arbitrary pages, but that's buried another click down. Is 
>> there a way to have custom charts/HTML show up on the job/build pages 
>> themselves? 
>>
>> Thanks! 
>> Andrew 
>>
>> -- 
>> -- 
>> Andrew Melo 
>>
>

-- 
You received this message because you are subscribed to the Google Groups 
"Jenkins Users" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to jenkinsci-users+unsubscr...@googlegroups.com.
To view this discussion on the web visit 
https://groups.google.com/d/msgid/jenkinsci-users/a16c28fc-3037-4e10-b9de-8643e486d35c%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.


New NPE with ghprb?

2016-07-11 Thread Andrew Melo
Hello all,

I have a repository that is configured to have GH send webhooks for
PRs, which trigger builds of a multibranch configuration. The most
recent updates I pulled cause the following to fire after the job
runs:

Jul 11, 2016 8:31:59 PM SEVERE org.jenkinsci.plugins.ghprb.upstream.GhprbUpstreamStatusListener returnEnvironmentVars
Unable to connect to GitHub repo
java.lang.NullPointerException
    at org.kohsuke.github.GitHub.getRepository(GitHub.java:337)
    at org.jenkinsci.plugins.ghprb.upstream.GhprbUpstreamStatusListener.returnEnvironmentVars(GhprbUpstreamStatusListener.java:52)
    at org.jenkinsci.plugins.ghprb.upstream.GhprbUpstreamStatusListener.onCompleted(GhprbUpstreamStatusListener.java:115)
    at org.jenkinsci.plugins.ghprb.upstream.GhprbUpstreamStatusListener.onCompleted(GhprbUpstreamStatusListener.java:33)
    at hudson.model.listeners.RunListener.fireCompleted(RunListener.java:211)
    at hudson.model.Run.execute(Run.java:1765)
    at hudson.model.FreeStyleBuild.run(FreeStyleBuild.java:43)
    at hudson.model.ResourceController.execute(ResourceController.java:98)
    at hudson.model.Executor.run(Executor.java:410)

This causes the status on GH to never get updated, which prevents
any merging of code.

What can be done to get more information about this? Everything else
works up until that point, so I assume there isn't a connectivity
problem between my host and GH, and I'm unsure where to start
searching.

Thanks!
Andrew

-- 
--
Andrew Melo



Re: [CMake] CPack: setting multiple RPM pre/post (un)install scripts

2016-05-29 Thread Andrew Melo
Hi

On Sunday, May 29, 2016, Thomas Teo  wrote:

> Hi All,
> In building an RPM package, I'd like to set multiple (un)install scripts -
> an application specific one which starts services that are installed by the
> RPM, and also to call ldconfig so that the shared library list is updated.
> The CMake/CPack documentation states this is supported (eg
> https://cmake.org/cmake/help/v3.0/module/CPackRPM.html):
>
>May be used to embed a pre (un)installation script in the spec file.
>The refered (sic) script file*(s)* will be read...
>(my emphasis)
>
> However the example doesn't indicate how one would specify multiple script
> files. I've tried several different methods:
>
>SET(CPACK_RPM_POST_INSTALL_SCRIPT_FILE
>"${CMAKE_SOURCE_DIR}/a.sh;${CMAKE_SOURCE_DIR}/b.sh" )
>SET(CPACK_RPM_POST_INSTALL_SCRIPT_FILE
>"${CMAKE_SOURCE_DIR}/a.sh,${CMAKE_SOURCE_DIR}/b.sh" )
>SET(CPACK_RPM_POST_INSTALL_SCRIPT_FILE "${CMAKE_SOURCE_DIR}/a.sh"
>"${CMAKE_SOURCE_DIR}/b.sh" )
>SET(CPACK_RPM_POST_INSTALL_SCRIPT_FILE "${CMAKE_SOURCE_DIR}/a.sh
>${CMAKE_SOURCE_DIR}/b.sh" )
>
>
> Which often result in the error:
>
>CMake Error at /usr/share/cmake-3.4/Modules/CPackRPM.cmake:1377 (if):
>   if given arguments:
>
> "EXISTS" "/home/thomas/src//a.sh"
>"/home/thomas/src//b.sh"
>
> Looking at the CMake code in the referenced file (CPackRPM.cmake) it
> appears that it could never handle multiple script files as there is no
> iteration.
> Is it possible the documentation is in error and only one script for each
> type of action (pre/post, un/install) can be specified?


FWIW, I make one script with all the desired actions and it works great.



>
> All assistance appreciated.
>
> Cheers,
> Thomas
>
> --
> Thomas Teo
> Australian Centre for Field Robotics
> The University of Sydney
>
> t@acfr.usyd.edu.au
> www.acfr.usyd.edu.au
>


-- 
It's dark in this basement.

Custom metrics

2016-05-22 Thread Andrew Melo
Hello,

I had two short questions about what was possible with pipeline jobs:

1) Suppose I want to track some arbitrary values between builds and
fail if they have decreased. Is there a way to stash integers/values
between builds? For example, let's say I want to enforce that the lint
of the codebase gets better with each commit or github pull request.
Is there a step/command that could be used within groovy to extract
the previous value to make the comparison? Or, am I stuck basically
manually copying results to "magic" places on the master that later
builds can suck down.

2) Let's suppose that I want to publish a trend of these values as a
graph on the main job page. I know there's the HTML publisher that can
publish arbitrary pages, but that's buried another click down. Is
there a way to have custom charts/HTML show up on the job/build pages
themselves?
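
Independent of any particular Jenkins step, the check described in question 1 amounts to "load the last accepted value, compare, and record the new one if it passes". The sketch below shows only that logic; the function name, the JSON file layout, and the metric name in the note after it are all hypothetical, and where the file would live between builds is exactly the open question above.

```python
import json
from pathlib import Path


def check_metric(store, name, current):
    """Return True if `current` is not lower than the stored value.

    On success the new value is written back so the next build compares
    against it; on failure the stored value is left untouched.
    """
    path = Path(store)
    history = json.loads(path.read_text()) if path.exists() else {}
    previous = history.get(name)
    # The first build has nothing to compare against, so it always passes.
    ok = previous is None or current >= previous
    if ok:
        history[name] = current
        path.write_text(json.dumps(history))
    return ok
```

A build script could call `check_metric("metrics.json", "lint_score", score)` and exit non-zero when it returns False, which would mark the build as failed.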

Thanks!
Andrew

-- 
--
Andrew Melo



Re: Pipeline: "Branch not mergable"

2016-04-26 Thread Andrew Melo
Hi-

It appears to be an issue with the GH webhook firing before the 
mergeability is computed.

When that happens, GH returns "null" in the mergeability field, whereas 
Jenkins expects true/false. Jenkins then treats the branch as unmergeable 
and refuses to build it.
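
A sketch of the handling I have in mind, treating the field as three-valued rather than boolean: GitHub's pull request API reports `mergeable` as true, false, or null while the merge check is still being computed, so null should mean "ask again shortly" and not "unmergeable". The function name and the `fetch_pr` callable (anything that returns the pull request JSON as a dict) are assumptions for illustration, not code from either plugin.

```python
import time


def wait_for_mergeable(fetch_pr, attempts=5, delay_s=1.0, sleep=time.sleep):
    """Poll until 'mergeable' is True or False; return None if still unknown."""
    for attempt in range(attempts):
        state = fetch_pr().get("mergeable")
        if state is not None:
            # GitHub has finished computing the result.
            return state
        if attempt < attempts - 1:
            # Still being computed; wait before asking again.
            sleep(delay_s)
    return None
```

The caller can then skip the build only on an explicit False and decide separately what to do when the answer is still None after the last attempt.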

I posted a comment to GH for the branch source and the GH Api plugins -- 
hopefully that will close the loop.

Thanks,
Andrew

On Monday, April 25, 2016 at 12:40:36 PM UTC-5, Andrew Melo wrote:
>
> Hi all, 
>
> Fairly often (~50% of the time), when I push a feature branch up to 
> GH, my multibranch configuration job will get the notification, poll 
> GH, then bomb with: 
>
> Checking pull request #PR-47 
> Not mergeable, skipping 
>
> Even though the branch is mergeable. If I force a build manually, it bombs 
> with: 
>
> ERROR: Could not determine exact tip revision of PR-47; falling back 
> to nondeterministic checkout 
>
> java.io.IOException: Cannot retrieve Git metadata for the build 
>     at org.jenkinsci.plugins.github.util.BuildDataHelper.getCommitSHA1(BuildDataHelper.java:34)
>     at com.cloudbees.jenkins.GitHubCommitNotifier.updateCommitStatus(GitHubCommitNotifier.java:132)
>
> This happens until I repeatedly force the branch indexing. Eventually 
> something clicks over, and jenkins is aware of the proper commit. 
>
> Now, I've seen JENKINS-34120, but this appears to be different. The 
> commit is mergeable against master (I'm the only one committing to 
> master currently, no rebases of the feature branch, etc.), and 
> there are no other status tests that would be keeping it from building. 
> It seems like the actual commit notification is getting scrambled. 
>
> Has anyone else seen this? 
> Cheers, 
> Andrew 
>
> -- 
> -- 
> Andrew Melo 
>



Re: Trigger arbitrary build/post-build steps from pipeline

2016-04-25 Thread Andrew Melo
ping? Is the shiny new pipeline functionality simply incompatible with all 
of the existing plugins?
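
On the interface question in the quoted message below: a class that implements a superinterface does not thereby implement its subinterfaces; the relationship only runs the other way. The sketch models that with Python stand-ins. The three class names mirror the Jenkins types but are empty placeholders, not the real Jenkins classes, and `find_simple_steps` is a hypothetical helper standing in for whatever lookup step() performs.

```python
class BuildStep:
    """Stand-in for the broader interface."""


class SimpleBuildStep(BuildStep):
    """Stand-in for the narrower interface that extends BuildStep."""


class Recorder(BuildStep):
    """Stand-in for a class that implements only the broader interface."""


def find_simple_steps(candidates):
    """Keep only the candidates that are SimpleBuildStep instances."""
    return [c for c in candidates if isinstance(c, SimpleBuildStep)]
```

A Recorder instance passes an isinstance check against BuildStep but not against SimpleBuildStep, so a lookup that filters on the narrower type would not find it.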

On Thursday, April 21, 2016 at 2:34:00 PM UTC-5, Andrew Melo wrote:
>
> Hi,
>
> I've dug a bit more; perhaps that will help find a solution. It appears 
> the "step" groovy function wants a class that implements SimpleBuildStep, 
> but the plugins I'm looking at implement/extend hudson.tasks.Recorder. 
> I'm confused about the issue. If I look at the javadoc:
>
> http://javadoc.jenkins-ci.org/hudson/tasks/Recorder.html
>
> I see that Recorder implements BuildStep, but not SimpleBuildStep. But 
> (and my java is rusty, so forgive the terminology), since BuildStep is a 
> superinterface of SimpleBuildStep, shouldn't it "count" when step() 
> searches for a class implementing the interface?
>
> Thanks
> Andrew
>
> On Sunday, April 17, 2016 at 8:24:43 PM UTC-5, Andrew Melo wrote:
>>
>> Hi again,
>>
>> I poked at it some more and it appears I'll have to explicitly change the 
>> plugins I want to use in a pipeline. The following:
>>
>> step([$class: 'TapPublisher', testResults: 'tap.log']) 
>>
>> yields the following error:
>>
>> no known implementation of interface jenkins.tasks.SimpleBuildStep is 
>> named TapPublisher
>>
>> Or, perhaps a new groovy step that consumes classes of type 'Recorder' is 
>> the better idea?
>>
>> Thanks again!
>> Andrew
>>
>> On Sunday, April 17, 2016 at 1:40:28 PM UTC-5, Andrew Melo wrote:
>>>
>>> Hello all,
>>>
>>> I would like to trigger build/post-build steps that don't appear in the 
>>> snippet viewer (e.g. publish TAP results) from a pipeline. Is there a way 
>>> to trigger arbitrary build/post-build steps that don't appear in the 
>>> snippet browser, or will the plugin need to be updated to explicitly 
>>> support being triggered from pipelines?
>>>
>>> Thanks!
>>> Andrew
>>>
>>



Pipeline: "Branch not mergable"

2016-04-25 Thread Andrew Melo
Hi all,

Fairly often (~50% of the time), when I push a feature branch up to
GH, my multibranch configuration job will get the notification, poll
GH, then bomb with:

Checking pull request #PR-47
Not mergeable, skipping

Even though the branch is mergeable. If I force a build manually, it bombs with:

ERROR: Could not determine exact tip revision of PR-47; falling back
to nondeterministic checkout

java.io.IOException: Cannot retrieve Git metadata for the build
    at org.jenkinsci.plugins.github.util.BuildDataHelper.getCommitSHA1(BuildDataHelper.java:34)
    at com.cloudbees.jenkins.GitHubCommitNotifier.updateCommitStatus(GitHubCommitNotifier.java:132)

This happens until I repeatedly force the branch indexing. Eventually
something clicks over, and jenkins is aware of the proper commit.

Now, I've seen JENKINS-34120, but this appears to be different. The
commit is mergeable against master (I'm the only one committing to
master currently, no rebases of the feature branch, etc.), and
there are no other status tests that would be keeping it from building.
It seems like the actual commit notification is getting scrambled.

Has anyone else seen this?
Cheers,
Andrew

-- 
--
Andrew Melo


