Re: [Spark Sql] Global Setting for Case-Insensitive String Compare

2022-11-21 Thread Andrew Melo
I think this is the right place, just a hard question :) As far as I
know, there's no "case insensitive flag", so YMMV
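
For illustration, this is what the LOWER()-on-both-sides workaround looks like
against the examples quoted below (a sketch run from PySpark; it assumes an
active SparkSession named `spark`, and it is per-expression, not a global setting):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Each of these returns true, unlike the raw comparisons quoted below.
spark.sql("SELECT lower('ABC') = lower('abc')").show()
spark.sql("SELECT lower('ABC') IN (lower('AbC'), lower('abc'))").show()
spark.sql("SELECT lower('ABC') LIKE lower('Ab%')").show()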

On Mon, Nov 21, 2022 at 5:40 PM Patrick Tucci  wrote:
>
> Is this the wrong list for this type of question?
>
> On 2022/11/12 16:34:48 Patrick Tucci wrote:
>  > Hello,
>  >
>  > Is there a way to set string comparisons to be case-insensitive
> globally? I
>  > understand LOWER() can be used, but my codebase contains 27k lines of SQL
>  > and many string comparisons. I would need to apply LOWER() to each string
>  > literal in the code base. I'd also need to change all the ETL/import code
>  > to apply LOWER() to each string value on import.
>  >
>  > Current behavior:
>  >
>  > SELECT 'ABC' = 'abc';
>  > false
>  > Time taken: 5.466 seconds, Fetched 1 row(s)
>  >
>  > SELECT 'ABC' IN ('AbC', 'abc');
>  > false
>  > Time taken: 5.498 seconds, Fetched 1 row(s)
>  >
>  > SELECT 'ABC' like 'Ab%'
>  > false
>  > Time taken: 5.439 seconds, Fetched 1 row(s)
>  >
>  > Desired behavior would be true for all of the above with the proposed
>  > case-insensitive flag set.
>  >
>  > Thanks,
>  >
>  > Patrick
>  >
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Profiling PySpark Pandas UDF

2022-08-25 Thread Andrew Melo
Hi Gourav,

Since Koalas needs the same round-trip to/from JVM and Python, I
expect that the performance should be nearly the same for UDFs in
either API

Cheers
Andrew
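
For what it's worth, a crude way to see where time goes inside a pandas UDF is
to time each Arrow batch as it arrives; the output lands in the executor logs.
This is only a sketch (the column, the doubling logic, and the print-based
logging are assumptions, not from the thread):

import time
import pandas as pd
from pyspark.sql import functions as F

@F.pandas_udf("double")  # Spark 3.x type-hint style pandas UDF
def timed_udf(values: pd.Series) -> pd.Series:
    start = time.perf_counter()
    result = values * 2.0  # stand-in for the real per-batch logic
    elapsed = time.perf_counter() - start
    print(f"processed batch of {len(values)} rows in {elapsed:.3f}s")
    return result

# df.withColumn("doubled", timed_udf("some_numeric_column"))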

On Thu, Aug 25, 2022 at 11:22 AM Gourav Sengupta
 wrote:
>
> Hi,
>
> Maybe I am jumping to conclusions and making stupid guesses, but have you 

> tried koalas now that it is natively integrated with pyspark??
>
> Regards
> Gourav
>
> On Thu, 25 Aug 2022, 11:07 Subash Prabanantham,  
> wrote:
>>
>> Hi All,
>>
>> I was wondering if we have any best practices on using pandas UDF ? 
>> Profiling UDF is not an easy task and our case requires some drilling down 
>> on the logic of the function.
>>
>>
>> Our use case:
>> We are using func(Dataframe) => Dataframe as the interface to the Pandas UDF. 
>> When we run only the function locally it is fast, but when executed in the 
>> Spark environment the processing time is more than expected. We have one 
>> column where the value is large (BinaryType -> 600KB), and we are wondering whether 
>> this could make the Arrow computation slower?
>>
>> Is there any profiling or best way to debug the cost incurred using pandas 
>> UDF ?
>>
>>
>> Thanks,
>> Subash
>>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



PySpark cores

2022-07-28 Thread Andrew Melo
Hello,

Is there a way to tell Spark that PySpark (arrow) functions use
multiple cores? If we have an executor with 8 cores, we would like to
have a single PySpark function use all 8 cores instead of having 8
single core python functions run.

Thanks!
Andrew
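
One related knob, shown here only as a sketch (whether it behaves well with
arrow/pandas UDFs in a given setup is an assumption to verify): spark.task.cpus
reserves multiple cores per task, so an 8-core executor runs a single task --
and therefore a single Python worker -- at a time.

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .config("spark.executor.cores", "8")  # cores available to each executor
    .config("spark.task.cpus", "8")       # each task claims all 8 cores
    .getOrCreate()
)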

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: [EXTERNAL] RDD.pipe() for binary data

2022-07-16 Thread Andrew Melo
I'm curious about using shared memory to speed up the JVM->Python round
trip. Is there any sane way to do anonymous shared memory in Java/Scala?

On Sat, Jul 16, 2022 at 16:10 Sebastian Piu  wrote:

> Other alternatives are to look at how PythonRDD does it in spark, you
> could also try to go for a more traditional setup where you expose your
> python functions behind a local/remote service and call that from scala -
> say over thrift/grpc/http/local socket etc.
> Another option, but I've never done it so I'm not sure if it would work,
> is to maybe look if arrow could help by sharing a piece of memory with the
> data you need from scala and then read it from python
>
> On Sat, Jul 16, 2022 at 9:56 PM Sean Owen  wrote:
>
>> Use GraphFrames?
>>
>> On Sat, Jul 16, 2022 at 3:54 PM Yuhao Zhang 
>> wrote:
>>
>>> Hi Shay,
>>>
>>> Thanks for your reply! I would very much like to use pyspark. However,
>>> my project depends on GraphX, which is only available in the Scala API as
>>> far as I know. So I'm locked with Scala and trying to find a way out. I
>>> wonder if there's a way to go around it.
>>>
>>> Best regards,
>>> Yuhao Zhang
>>>
>>>
>>> On Sun, Jul 10, 2022 at 5:36 AM Shay Elbaz  wrote:
>>>
 Yuhao,


 You can use pyspark as entrypoint to your application. With py4j you
 can call Java/Scala functions from the python application. There's no need
 to use the pipe() function for that.


 Shay
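
A minimal illustration of this py4j route (the Scala object and method here are
hypothetical; only spark.sparkContext._jvm and df._jdf are real hooks):

from pyspark.sql import DataFrame, SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.range(10)

jvm = spark.sparkContext._jvm
# Call into a user-provided Scala object that is on the driver classpath.
result_jdf = jvm.com.example.MyScalaLib.process(df._jdf)
# Wrap the returned Java DataFrame back into a Python DataFrame
# (on older releases the second argument is spark._wrapped, the SQLContext).
result_df = DataFrame(result_jdf, spark)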
 --
 *From:* Yuhao Zhang 
 *Sent:* Saturday, July 9, 2022 4:13:42 AM
 *To:* user@spark.apache.org
 *Subject:* [EXTERNAL] RDD.pipe() for binary data


 *ATTENTION:* This email originated from outside of GM.


 Hi All,

 I'm currently working on a project involving transferring between
 Spark 3.x (I use Scala) and a Python runtime. In Spark, data is stored in
 an RDD as floating-point number arrays/vectors and I have custom routines
 written in Python to process them. On the Spark side, I also have some
 operations specific to Spark Scala APIs, so I need to use both runtimes.

 Now to achieve data transfer I've been using the RDD.pipe() API, by 1.
 converting the arrays to strings in Spark and calling RDD.pipe(script.py)
 2. Then Python receives the strings and casts them as Python's data
 structures and conducts operations. 3. Python converts the arrays into
 strings and prints them back to Spark. 4. Spark gets the strings and cast
 them back as arrays.

 Needless to say, this feels unnatural and slow to me, and there are
 some potential floating-point number precision issues, as I think the
 floating number arrays should have been transmitted as raw bytes. I found
 no way to use the RDD.pipe() for this purpose, as written in
 https://github.com/apache/spark/blob/3331d4ccb7df9aeb1972ed86472269a9dbd261ff/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala#L139,
 .pipe() seems to be locked with text-based streaming.

 Can anyone shed some light on how I can achieve this? I'm trying to
 come up with a way that does not involve modifying the core Spark myself.
 One potential solution I can think of is saving/loading the RDD as binary
 files but I'm hoping to find a streaming-based solution. Any help is much
 appreciated, thanks!


 Best regards,
 Yuhao

>>> --
It's dark in this basement.


Re: How is union() implemented? Need to implement column bind

2022-04-20 Thread Andrew Melo
It would certainly be useful for our domain to have some sort of native
cbind(). Is there a fundamental disapproval of adding that functionality,
or is it just a matter of nobody implementing it?

On Wed, Apr 20, 2022 at 16:28 Sean Owen  wrote:

> Good lead, pandas on Spark concat() is worth trying. It looks like it uses
> a join, but not 100% sure from the source.
> The SQL concat() function is indeed a different thing.
>
> On Wed, Apr 20, 2022 at 3:24 PM Bjørn Jørgensen 
> wrote:
>
>> Sorry for asking. But why doesn't concat work?
>>
>> Pandas on spark have ps.concat
>> 
>>  which
>> takes 2 dataframes and concat them to 1 dataframe.
>> It seems
>> 
>> like the pyspark version takes 2 columns and concats them into one column.
>>
>> ons. 20. apr. 2022 kl. 21:04 skrev Sean Owen :
>>
>>> cbind? yeah though the answer is typically a join. I don't know if
>>> there's a better option in a SQL engine, as SQL doesn't have anything to
>>> offer except join and pivot either (? right?)
>>> Certainly, the dominant data storage paradigm is wide tables, whereas
>>> you're starting with effectively a huge number of tiny slim tables, which
>>> is the impedance mismatch here.
>>>
>>> On Wed, Apr 20, 2022 at 1:51 PM Andrew Davidson 
>>> wrote:
>>>
 Thanks Sean



 I imagine this is a fairly common problem in data science. Any idea how
 others solve it? For example, I wonder if running the join in something like BigQuery
 might work better? I do not know much about the implementation.



 No one tool will  solve all problems. Once I get the matrix I think it
 spark will work well for our need



 Kind regards



 Andy



 *From: *Sean Owen 
 *Date: *Monday, April 18, 2022 at 6:58 PM
 *To: *Andrew Davidson 
 *Cc: *"user @spark" 
 *Subject: *Re: How is union() implemented? Need to implement column
 bind



 A join is the natural answer, but this is a 10114-way join, which
 probably chokes readily just to even plan it, let alone all the shuffling
 and shuffling of huge data. You could tune your way out of it maybe, but
 not optimistic. It's just huge.



 You could go off-road and lower-level to take advantage of the
 structure of the data. You effectively want "column bind". There is no such
 operation in Spark. (union is 'row bind'.) You could do this with
 zipPartitions, which is in the RDD API, and to my surprise, not in the
 Python API but exists in Scala. And R (!). If you can read several RDDs of
 data, you can use this method to pair all their corresponding values and
 ultimately get rows of 10114 values out. In fact that is how sparklyr
 implements cbind on Spark, FWIW:
 https://rdrr.io/cran/sparklyr/man/sdf_fast_bind_cols.html



 The issue I see is that you can only zip a few at a time; you don't
 want to zip 10114 of them. Perhaps you have to do that iteratively, and I
 don't know if that is going to face the same issues with huge huge plans.



 I like the pivot idea. If you can read the individual files as data
 rows (maybe list all the file names, parallelize with Spark, write a UDF
 that reads the data for that file to generate the rows). If you can emit
 (file, index, value) and groupBy index, pivot on file (I think?) that
 should be about it? I think it doesn't need additional hashing or whatever.
 Not sure how fast it is but that seems more direct than the join, as well.
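
A sketch of that pivot approach in PySpark (the paths, the tab-separated layout,
and the schema are assumptions for illustration):

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

# One (row_id, value) file per column vector; tag each row with its source file.
long_df = (
    spark.read.csv("vectors/*.tsv", sep="\t", schema="row_id STRING, value DOUBLE")
    .withColumn("src", F.input_file_name())
)

# One output row per row_id, one output column per input file.
# With ~10k files you may need to raise spark.sql.pivotMaxValues (default 10000).
wide_df = long_df.groupBy("row_id").pivot("src").agg(F.first("value"))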



 On Mon, Apr 18, 2022 at 8:27 PM Andrew Davidson
  wrote:

 Hi have a hard problem



 I have  10114 column vectors each in a separate file. The file has 2
 columns, the row id, and numeric values. The row ids are identical and in
 sort order. All the column vectors have the same number of rows. There are
 over 5 million rows.  I need to combine them into a single table. The row
 ids are very long strings. The column names are about 20 chars long.



 My current implementation uses join. This takes a long time on a
 cluster with 2 workers totaling 192 vcpu and 2.8 tb of memory. It often
 crashes. I mean totally dead, start over. Checkpoints do not seem to help; it
 still crashes and needs to be restarted from scratch. What is really
 surprising is that the final file size is only 213G! The way I got the file
 was to copy all the column vectors to a single BIG IRON machine and use
 unix cut and paste. Took about 44 min to run once I got all the data moved
 around. It was very tedious and error prone. I had to move a lot of data
 around. Not a particularly reproducible 

Re: Grabbing the current MemoryManager in a plugin

2022-04-13 Thread Andrew Melo
Hello,

Any wisdom on the question below?

Thanks
Andrew

On Fri, Apr 8, 2022 at 16:04 Andrew Melo  wrote:

> Hello,
>
> I've implemented support for my DSv2 plugin to back its storage with
> ArrowColumnVectors, which necessarily means using off-heap memory. Is
> it possible to somehow grab either a reference to the current
> MemoryManager so that the off-heap memory usage is properly accounted
> for and to prevent inadvertently OOM-ing the system?
>
> Thanks
> Andrew
>
-- 
It's dark in this basement.


Re: cannot access class sun.nio.ch.DirectBuffer

2022-04-13 Thread Andrew Melo
Gotcha. Seeing as there are a lot of large projects that used the unsafe API
either directly or indirectly (via netty, etc.), it's a bit surprising that
it was so thoroughly closed off without an escape hatch, but I'm sure there
was a lively discussion around it...

Cheers
Andrew

On Wed, Apr 13, 2022 at 09:07 Sean Owen  wrote:

> It is intentionally closed by the JVM going forward, as direct access is
> discouraged. But it's still necessary for Spark. In some cases, like direct
> mem access, there is a new API but it's in Java 17 I think, and we can't
> assume Java 17 any time soon.
>
> On Wed, Apr 13, 2022 at 9:05 AM Andrew Melo  wrote:
>
>> Hi Sean,
>>
>> Out of curiosity, will Java 11+ always require special flags to access
>> the unsafe direct memory interfaces, or is this something that will either
>> be addressed by the spec (by making an "approved" interface) or by
>> libraries (with some other workaround)?
>>
>> Thanks
>> Andrew
>>
>> On Tue, Apr 12, 2022 at 08:45 Sean Owen  wrote:
>>
>>> In Java 11+, you will need to tell the JVM to allow access to internal
>>> packages in some cases, for any JVM application. You will need flags like
>>> "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED", which you can see in
>>> the pom.xml file for the project.
>>>
>>> Spark 3.2 does not necessarily work with Java 17 (3.3 should have
>>> support), but it may well work after you address those flags.
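
For concreteness, one way to pass those flags (a sketch, not from the thread;
note that driver-side JVM options must be in place before the driver JVM starts,
so in practice they belong in spark-defaults.conf or on the spark-submit command
line rather than in an already-running session):

from pyspark.sql import SparkSession

ADD_OPENS = "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED"

spark = (
    SparkSession.builder
    .config("spark.driver.extraJavaOptions", ADD_OPENS)
    .config("spark.executor.extraJavaOptions", ADD_OPENS)
    .getOrCreate()
)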
>>>
>>> On Tue, Apr 12, 2022 at 7:05 AM Arunachalam Sibisakkaravarthi <
>>> arunacha...@mcruncher.com> wrote:
>>>
>>>> Hi guys,
>>>>
>>>> spark-sql_2.12:3.2.1 is used in our application.
>>>>
>>>> It throws following exceptions when the app runs using JRE17
>>>>
>>>> java.lang.IllegalAccessError: class org.apache.spark.storage.StorageUtils$
>>>> (in unnamed module @0x451f1bd4) cannot access class sun.nio.ch.DirectBuffer
>>>> (in module java.base) because module java.base does not export sun.nio.ch
>>>> to unnamed module @0x451f1bd4
>>>>   at org.apache.spark.storage.StorageUtils$.<init>(StorageUtils.scala:213)
>>>>   at org.apache.spark.storage.StorageUtils$.<clinit>(StorageUtils.scala)
>>>>   at org.apache.spark.storage.BlockManagerMasterEndpoint.<init>(BlockManagerMasterEndpoint.scala:110)
>>>>   at org.apache.spark.SparkEnv$.$anonfun$create$9(SparkEnv.scala:348)
>>>>   at org.apache.spark.SparkEnv$.registerOrLookupEndpoint$1(SparkEnv.scala:287)
>>>>   at org.apache.spark.SparkEnv$.create(SparkEnv.scala:336)
>>>>   at org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:191)
>>>>   at org.apache.spark.SparkContext.createSparkEnv(SparkContext.scala:277)
>>>>   at org.apache.spark.SparkContext.<init>(SparkContext.scala:460)
>>>>   at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2690)
>>>>   at org.apache.spark.sql.SparkSession$Builder.$anonfun$getOrCreate$2(SparkSession.scala:949)
>>>>   at scala.Option.getOrElse(Option.scala:189)
>>>>   at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:943)
>>>>
>>>> How do we fix this?
>>>>
>>>>
>>>>
>>>>
>>>> *Thanks And RegardsSibi.ArunachalammCruncher*
>>>>
>>> --
>> It's dark in this basement.
>>
> --
It's dark in this basement.


Re: cannot access class sun.nio.ch.DirectBuffer

2022-04-13 Thread Andrew Melo
Hi Sean,

Out of curiosity, will Java 11+ always require special flags to access the
unsafe direct memory interfaces, or is this something that will either be
addressed by the spec (by making an "approved" interface) or by libraries
(with some other workaround)?

Thanks
Andrew

On Tue, Apr 12, 2022 at 08:45 Sean Owen  wrote:

> In Java 11+, you will need to tell the JVM to allow access to internal
> packages in some cases, for any JVM application. You will need flags like
> "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED", which you can see in the
> pom.xml file for the project.
>
> Spark 3.2 does not necessarily work with Java 17 (3.3 should have
> support), but it may well work after you address those flags.
>
> On Tue, Apr 12, 2022 at 7:05 AM Arunachalam Sibisakkaravarthi <
> arunacha...@mcruncher.com> wrote:
>
>> Hi guys,
>>
>> spark-sql_2.12:3.2.1 is used in our application.
>>
>> It throws following exceptions when the app runs using JRE17
>>
>> java.lang.IllegalAccessError: class org.apache.spark.storage.StorageUtils$
>> (in unnamed module @0x451f1bd4) cannot access class sun.nio.ch.DirectBuffer
>> (in module java.base) because module java.base does not export sun.nio.ch
>> to unnamed module @0x451f1bd4
>>   at org.apache.spark.storage.StorageUtils$.<init>(StorageUtils.scala:213)
>>   at org.apache.spark.storage.StorageUtils$.<clinit>(StorageUtils.scala)
>>   at org.apache.spark.storage.BlockManagerMasterEndpoint.<init>(BlockManagerMasterEndpoint.scala:110)
>>   at org.apache.spark.SparkEnv$.$anonfun$create$9(SparkEnv.scala:348)
>>   at org.apache.spark.SparkEnv$.registerOrLookupEndpoint$1(SparkEnv.scala:287)
>>   at org.apache.spark.SparkEnv$.create(SparkEnv.scala:336)
>>   at org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:191)
>>   at org.apache.spark.SparkContext.createSparkEnv(SparkContext.scala:277)
>>   at org.apache.spark.SparkContext.<init>(SparkContext.scala:460)
>>   at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2690)
>>   at org.apache.spark.sql.SparkSession$Builder.$anonfun$getOrCreate$2(SparkSession.scala:949)
>>   at scala.Option.getOrElse(Option.scala:189)
>>   at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:943)
>>
>> How do we fix this?
>>
>>
>>
>>
>> *Thanks And RegardsSibi.ArunachalammCruncher*
>>
> --
It's dark in this basement.


Grabbing the current MemoryManager in a plugin

2022-04-08 Thread Andrew Melo
Hello,

I've implemented support for my DSv2 plugin to back its storage with
ArrowColumnVectors, which necessarily means using off-heap memory. Is
it possible to somehow grab either a reference to the current
MemoryManager so that the off-heap memory usage is properly accounted
for and to prevent inadvertently OOM-ing the system?

Thanks
Andrew
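
Not an answer to the MemoryManager question itself, but for context: the
off-heap pool that Spark's memory manager accounts against is sized with the
settings below (a sketch; the 4g figure is an arbitrary example):

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .config("spark.memory.offHeap.enabled", "true")
    .config("spark.memory.offHeap.size", "4g")  # pool tracked by the memory manager
    .getOrCreate()
)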

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Merge two dataframes

2021-05-17 Thread Andrew Melo
On Mon, May 17, 2021 at 2:31 PM Lalwani, Jayesh  wrote:
>
> If the UDFs are computationally expensive, I wouldn't solve this problem with 
>  UDFs at all. If they are working in an iterative manner, and assuming each 
> iteration is independent of other iterations (yes, I know that's a big 
> assumption), I would think about exploding your dataframe to have a row per 
> iteration, and working on each row separately, and then aggregating in the 
> end. This allows you to scale your computation much better.

Ah, in this case, I mean "iterative" in the sense of the
"code/run/examine" sense of the word, not that the UDF itself is
performing an iterative computation.

>
> I know not all computations can be map-reducible like that. However, most can.
>
> Split and merge data workflows in Spark don't work like their DAG 
> representations, unless you add costly caches. Without caching, each split 
> will result in Spark rereading data from the source, even if the splits are 
> getting merged together. The only way to avoid it is by caching at the split 
> point, which depending on the amount of data can become costly. Also, joins 
> result in shuffles. Avoiding splits and merges is better.
>
> To give you an example, we had an application that applied a series of rules 
> to rows. The output required was a dataframe with an additional column that 
> indicated which rule the row satisfied. In our initial implementation, we had 
> a series of dataframes, one per rule. For N rules, we created N dataframes that had the 
> rows that satisfied the rules. Then we unioned the N dataframes. Horrible 
> performance that didn't scale with N. We reimplemented to add N Boolean 
> columns, one per rule, that indicated if the rule was satisfied. We just kept 
> adding the Boolean columns to the dataframe. After iterating over the rules, 
> we added another column that indicated which rule was satisfied, and then 
> dropped the Boolean columns. Much better performance that scaled with N. 
> Spark read from datasource just once, and since there were no joins/unions, 
> there was no shuffle

The hitch in your example, and what we're trying to avoid, is that if
you need to change one of these boolean columns, you end up needing to
recompute everything "afterwards" in the DAG (AFAICT), even if the
"latter" stages don't have a true dependency on the changed column. We
do explorations of very large physics datasets, and one of the
disadvantages of our bespoke analysis software is that any change to
the analysis code involves re-computing everything from scratch. A big
goal of mine is to make it so that what was changed is recomputed, and
no more, which will speed up the rate at which we can find new
physics.

Cheers
Andrew

>
> On 5/17/21, 2:56 PM, "Andrew Melo"  wrote:
>
> CAUTION: This email originated from outside of the organization. Do not 
> click links or open attachments unless you can confirm the sender and know 
> the content is safe.
>
>
>
> In our case, these UDFs are quite expensive and worked on in an
> iterative manner, so being able to cache the two "sides" of the graphs
> independently will speed up the development cycle. Otherwise, if you
> modify foo() here, then you have to recompute bar and baz, even though
> they're unchanged.
>
> df.withColumn('a', foo('x')).withColumn('b', bar('x')).withColumn('c', 
> baz('x'))
>
> Additionally, a longer goal would be to be able to persist/cache these
> columns to disk so a downstream user could later mix and match several
> (10s) of these columns together as their inputs w/o having to
> explicitly compute them themselves.
>
>     Cheers
> Andrew
>
> On Mon, May 17, 2021 at 1:10 PM Sean Owen  wrote:
> >
> > Why join here - just add two columns to the DataFrame directly?
> >
> > On Mon, May 17, 2021 at 1:04 PM Andrew Melo  
> wrote:
> >>
> >> Anyone have ideas about the below Q?
> >>
> >> It seems to me that given that "diamond" DAG, that spark could see
> >> that the rows haven't been shuffled/filtered, it could do some type of
> >> "zip join" to push them together, but I've not been able to get a plan
> >> that doesn't do a hash/sort merge join
> >>
> >> Cheers
> >> Andrew
> >>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Merge two dataframes

2021-05-17 Thread Andrew Melo
In our case, these UDFs are quite expensive and worked on in an
iterative manner, so being able to cache the two "sides" of the graphs
independently will speed up the development cycle. Otherwise, if you
modify foo() here, then you have to recompute bar and baz, even though
they're unchanged.

df.withColumn('a', foo('x')).withColumn('b', bar('x')).withColumn('c', baz('x'))
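
For comparison, a sketch of caching so that editing foo() does not force bar()
and baz() to be recomputed (foo/bar/baz are the hypothetical expensive UDFs
from the line above):

df_bc = df.withColumn('b', bar('x')).withColumn('c', baz('x')).cache()
df_bc.count()                             # materialize the cached columns once
df_abc = df_bc.withColumn('a', foo('x'))  # only 'a' is recomputed when foo() changes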

Additionally, a longer goal would be to be able to persist/cache these
columns to disk so a downstream user could later mix and match several
(10s) of these columns together as their inputs w/o having to
explicitly compute them themselves.

Cheers
Andrew

On Mon, May 17, 2021 at 1:10 PM Sean Owen  wrote:
>
> Why join here - just add two columns to the DataFrame directly?
>
> On Mon, May 17, 2021 at 1:04 PM Andrew Melo  wrote:
>>
>> Anyone have ideas about the below Q?
>>
>> It seems to me that given that "diamond" DAG, that spark could see
>> that the rows haven't been shuffled/filtered, it could do some type of
>> "zip join" to push them together, but I've not been able to get a plan
>> that doesn't do a hash/sort merge join
>>
>> Cheers
>> Andrew
>>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Merge two dataframes

2021-05-17 Thread Andrew Melo
Anyone have ideas about the below Q?

It seems to me that given that "diamond" DAG, that spark could see
that the rows haven't been shuffled/filtered, it could do some type of
"zip join" to push them together, but I've not been able to get a plan
that doesn't do a hash/sort merge join

Cheers
Andrew

On Wed, May 12, 2021 at 11:32 AM Andrew Melo  wrote:
>
> Hi,
>
> In the case where the left and right hand side share a common parent like:
>
> df = spark.read.someDataframe().withColumn('rownum', row_number())
> df1 = df.withColumn('c1', expensive_udf1('foo')).select('c1', 'rownum')
> df2 = df.withColumn('c2', expensive_udf2('bar')).select('c2', 'rownum')
> df_joined = df1.join(df2, 'rownum', 'inner')
>
> (or maybe replacing row_number() with monotonically_increasing_id())
>
> Is there some hint/optimization that can be done to let Spark know
> that the left and right hand-sides of the join share the same
> ordering, and a sort/hash merge doesn't need to be done?
>
> Thanks
> Andrew
>
> On Wed, May 12, 2021 at 11:07 AM Sean Owen  wrote:
> >
> > Yeah I don't think that's going to work - you aren't guaranteed to get 1, 
> > 2, 3, etc. I think row_number() might be what you need to generate a join 
> > ID.
> >
> > RDD has a .zip method, but (unless I'm forgetting!) DataFrame does not. You 
> > could .zip two RDDs you get from DataFrames and manually convert the Rows 
> > back to a single Row and back to DataFrame.
> >
> >
> > On Wed, May 12, 2021 at 10:47 AM kushagra deep  
> > wrote:
> >>
> >> Thanks Raghvendra
> >>
> >> Will the ids for corresponding columns  be same always ? Since 
> >> monotonic_increasing_id() returns a number based on partitionId and the 
> >> row number of the partition  ,will it be same for corresponding columns? 
> >> Also is it guaranteed that the two dataframes will be divided into logical 
> >> spark partitions with the same cardinality for each partition ?
> >>
> >> Reg,
> >> Kushagra Deep
> >>
> >> On Wed, May 12, 2021, 21:00 Raghavendra Ganesh  
> >> wrote:
> >>>
> >>> You can add an extra id column and perform an inner join.
> >>>
> >>> val df1_with_id = df1.withColumn("id", monotonically_increasing_id())
> >>>
> >>> val df2_with_id = df2.withColumn("id", monotonically_increasing_id())
> >>>
> >>> df1_with_id.join(df2_with_id, Seq("id"), "inner").drop("id").show()
> >>>
> >>> +---------+---------+
> >>> |amount_6m|amount_9m|
> >>> +---------+---------+
> >>> |      100|      500|
> >>> |      200|      600|
> >>> |      300|      700|
> >>> |      400|      800|
> >>> |      500|      900|
> >>> +---------+---------+
> >>>
> >>>
> >>> --
> >>> Raghavendra
> >>>
> >>>
> >>> On Wed, May 12, 2021 at 6:20 PM kushagra deep  
> >>> wrote:
> >>>>
> >>>> Hi All,
> >>>>
> >>>> I have two dataframes
> >>>>
> >>>> df1
> >>>>
> >>>> amount_6m
> >>>>  100
> >>>>  200
> >>>>  300
> >>>>  400
> >>>>  500
> >>>>
> >>>> And a second data df2 below
> >>>>
> >>>>  amount_9m
> >>>>   500
> >>>>   600
> >>>>   700
> >>>>   800
> >>>>   900
> >>>>
> >>>> The number of rows is same in both dataframes.
> >>>>
> >>>> Can I merge the two dataframes to achieve below df
> >>>>
> >>>> df3
> >>>>
> >>>> amount_6m | amount_9m
> >>>> 100   500
> >>>>  200  600
> >>>>  300  700
> >>>>  400  800
> >>>>  500  900
> >>>>
> >>>> Thanks in advance
> >>>>
> >>>> Reg,
> >>>> Kushagra Deep
> >>>>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Merge two dataframes

2021-05-12 Thread Andrew Melo
Hi,

In the case where the left and right hand side share a common parent like:

df = spark.read.someDataframe().withColumn('rownum', row_number())
df1 = df.withColumn('c1', expensive_udf1('foo')).select('c1', 'rownum')
df2 = df.withColumn('c2', expensive_udf2('bar')).select('c2', 'rownum')
df_joined = df1.join(df2, 'rownum', 'inner')

(or maybe replacing row_number() with monotonically_increasing_id())

Is there some hint/optimization that can be done to let Spark know
that the left and right hand-sides of the join share the same
ordering, and a sort/hash merge doesn't need to be done?

Thanks
Andrew

On Wed, May 12, 2021 at 11:07 AM Sean Owen  wrote:
>
> Yeah I don't think that's going to work - you aren't guaranteed to get 1, 2, 
> 3, etc. I think row_number() might be what you need to generate a join ID.
>
> RDD has a .zip method, but (unless I'm forgetting!) DataFrame does not. You 
> could .zip two RDDs you get from DataFrames and manually convert the Rows 
> back to a single Row and back to DataFrame.
>
>
> On Wed, May 12, 2021 at 10:47 AM kushagra deep  
> wrote:
>>
>> Thanks Raghvendra
>>
>> Will the ids for corresponding columns  be same always ? Since 
>> monotonic_increasing_id() returns a number based on partitionId and the row 
>> number of the partition  ,will it be same for corresponding columns? Also is 
>> it guaranteed that the two dataframes will be divided into logical spark 
>> partitions with the same cardinality for each partition ?
>>
>> Reg,
>> Kushagra Deep
>>
>> On Wed, May 12, 2021, 21:00 Raghavendra Ganesh  
>> wrote:
>>>
>>> You can add an extra id column and perform an inner join.
>>>
>>> val df1_with_id = df1.withColumn("id", monotonically_increasing_id())
>>>
>>> val df2_with_id = df2.withColumn("id", monotonically_increasing_id())
>>>
>>> df1_with_id.join(df2_with_id, Seq("id"), "inner").drop("id").show()
>>>
>>> +---------+---------+
>>> |amount_6m|amount_9m|
>>> +---------+---------+
>>> |      100|      500|
>>> |      200|      600|
>>> |      300|      700|
>>> |      400|      800|
>>> |      500|      900|
>>> +---------+---------+
>>>
>>>
>>> --
>>> Raghavendra
>>>
>>>
>>> On Wed, May 12, 2021 at 6:20 PM kushagra deep  
>>> wrote:

 Hi All,

 I have two dataframes

 df1

 amount_6m
  100
  200
  300
  400
  500

 And a second data df2 below

  amount_9m
   500
   600
   700
   800
   900

 The number of rows is same in both dataframes.

 Can I merge the two dataframes to achieve below df

 df3

 amount_6m | amount_9m
 100   500
  200  600
  300  700
  400  800
  500  900

 Thanks in advance

 Reg,
 Kushagra Deep


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Spark DataFrame Creation

2020-07-22 Thread Andrew Melo
Hi Mark,

On Wed, Jul 22, 2020 at 4:49 PM Mark Bidewell  wrote:
>
> Sorry if this is the wrong place for this.  I am trying to debug an issue 
> with this library:
> https://github.com/springml/spark-sftp
>
> When I attempt to create a dataframe:
>
> spark.read.
> format("com.springml.spark.sftp").
> option("host", "...").
> option("username", "...").
> option("password", "...").
> option("fileType", "csv").
> option("inferSchema", "true").
> option("tempLocation","/srv/spark/tmp").
> option("hdfsTempLocation","/srv/spark/tmp");
>  .load("...")
>
> What I am seeing is that the download is occurring on the spark driver not 
> the spark worker,  This leads to a failure when spark tries to create the 
> DataFrame on the worker.
>
> I'm confused by the behavior.  my understanding was that load() was lazily 
> executed on the Spark worker.  Why would some elements be executing on the 
> driver?

Looking at the code, it appears that your sftp plugin downloads the
file to a local location and opens from there.

https://github.com/springml/spark-sftp/blob/090917547001574afa93cddaf2a022151a3f4260/src/main/scala/com/springml/spark/sftp/DefaultSource.scala#L38

You may have more luck with an sftp hadoop filesystem plugin that can
read sftp:// URLs directly.
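
For example, something along these lines (a sketch only -- the Hadoop SFTP
filesystem class and its option keys are assumptions to check against your
Hadoop version, and credentials in URLs are shown purely for brevity):

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .config("spark.hadoop.fs.sftp.impl", "org.apache.hadoop.fs.sftp.SFTPFileSystem")
    .getOrCreate()
)

df = (
    spark.read
    .option("inferSchema", "true")
    .csv("sftp://username:password@host/path/data.csv")
)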

Cheers
Andrew
>
> Thanks for your help
> --
> Mark Bidewell
> http://www.linkedin.com/in/markbidewell

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



PySpark aggregation w/pandas_udf

2020-07-15 Thread Andrew Melo
Hi all,

For our use case, we would like to perform an aggregation using a
pandas_udf with dataframes that have O(100m) rows and a few 10s of
columns. Conceptually, this looks a bit like pyspark.RDD.aggregate,
where the user provides:

* A "seqOp" which accepts pandas series(*) and outputs an intermediate output
* A "combOp" which combines the intermediate outputs into a final output

There's no direct DataFrame equivalent to RDD.aggregate(), but you can
somewhat emulate the functionality with

df.groupBy().applyInPandas(seqOp).agg(combOp)

However, it seems like using groupBy() w/o any columns isn't the
intended use. The docs for groupBy().applyInPandas() has the following
note:

> Note This function requires a full shuffle. All the data of a group will be 
> loaded into > memory, so the user should be aware of the potential OOM risk 
> if data is skewed > and certain groups are too large to fit in memory.

The Spark SQL guide has the following note as well:

> The configuration for maxRecordsPerBatch is not applied on groups and it is 
> up to > the user to ensure that the grouped data will fit into the available 
> memory.

Since we want to perform this aggregation over the entire DataFrame,
we end up with one group who is entirely loaded into memory which
immediately OOMs (requiring a shuffle probably doesn't help either).

To work around this, we make smaller groups by repartitioning, adding
a new column with the partition ID, then do the groupBy against that
column to make smaller groups, but that's not great -- it's a lot of
extra data movement.
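
For reference, a sketch of that workaround (seq_op and comb_op stand in for the
user-supplied functions and are assumptions, as is the partial-result schema):

from pyspark.sql import functions as F

# seq_op: pandas DataFrame -> one-row pandas DataFrame of partial results
partials = (
    df.withColumn("pid", F.spark_partition_id())   # the extra grouping column
      .groupBy("pid")
      .applyInPandas(seq_op, schema="partial_sum double")
)

# comb_op: fold the per-partition partials into the final answer, e.g. on the driver
result = comb_op(partials.toPandas())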

Am I missing something obvious? Or is this simply a part of the API
that's not fleshed out yet?

Thanks
Andrew


* Unfortunately, Pandas' data model is less rich than spark/arrow/our
code, so the JVM composes an arrow stream and transmits it to the
python worker who then converts it to pandas before passing it to the
UDF. We then have to undo the conversion to get the original data
back. It'd be nice to have more control over that intermediate
conversion.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: REST Structured Steaming Sink

2020-07-01 Thread Andrew Melo
On Wed, Jul 1, 2020 at 8:13 PM Burak Yavuz  wrote:
>
> I'm not sure having a built-in sink that allows you to DDOS servers is the 
> best idea either. foreachWriter is typically used for such use cases, not 
> foreachBatch. It's also pretty hard to guarantee exactly-once, rate limiting, 
> etc.

If you control the machines and can run arbitrary code, you can DDOS
whatever you want. What's the difference between this proposal and
writing a UDF that opens 1,000 connections to a target machine?

> Best,
> Burak
>
> On Wed, Jul 1, 2020 at 5:54 PM Holden Karau  wrote:
>>
>> I think adding something like this (if it doesn't already exist) could help 
>> make structured streaming easier to use, foreachBatch is not the best API.
>>
>> On Wed, Jul 1, 2020 at 2:21 PM Jungtaek Lim  
>> wrote:
>>>
>>> I guess the method, query parameter, header, and the payload would be all 
>>> different for almost every use case - that makes it hard to generalize and 
>>> requires implementation to be pretty much complicated to be flexible enough.
>>>
>>> I'm not aware of any custom sink implementing REST so your best bet would 
>>> be simply implementing your own with foreachBatch, but so someone might 
>>> jump in and provide a pointer if there is something in the Spark ecosystem.
>>>
>>> Thanks,
>>> Jungtaek Lim (HeartSaVioR)
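
A rough sketch of that foreachBatch route (the endpoint, payload shape, and the
`requests` dependency are assumptions; batches are posted from the driver here
purely for simplicity):

import requests

def post_batch(batch_df, batch_id):
    for row in batch_df.toLocalIterator():
        requests.post("https://example.com/ingest", json=row.asDict())

query = (
    stream_df.writeStream
    .foreachBatch(post_batch)
    .option("checkpointLocation", "/tmp/rest-sink-checkpoint")
    .start()
)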
>>>
>>> On Thu, Jul 2, 2020 at 3:21 AM Sam Elamin  wrote:

 Hi All,


 We ingest a lot of RESTful APIs into our lake and I'm wondering if it is at 
 all possible to create a REST sink in structured streaming?

 For now I'm only focusing on restful services that have an incremental ID 
 so my sink can just poll for new data then ingest.

 I can't seem to find a connector that does this and my gut instinct tells 
 me it's probably because it isn't possible due to something completely 
 obvious that I am missing

 I know some RESTful API obfuscate the IDs to a hash of strings and that 
 could be a problem but since I'm planning on focusing on just numerical 
 IDs that just get incremented I think I won't be facing that issue


 Can anyone let me know if this sounds like a daft idea? Will I need 
 something like Kafka or kinesis as a buffer and redundancy or am I 
 overthinking this?


 I would love to bounce ideas with people who runs structured streaming 
 jobs in production


 Kind regards
 San


>>
>>
>> --
>> Twitter: https://twitter.com/holdenkarau
>> Books (Learning Spark, High Performance Spark, etc.): https://amzn.to/2MaRAG9
>> YouTube Live Streams: https://www.youtube.com/user/holdenkarau

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Can I collect Dataset[Row] to driver without converting it to Array [Row]?

2020-04-22 Thread Andrew Melo
Hi Maqy

On Wed, Apr 22, 2020 at 3:24 AM maqy <454618...@qq.com> wrote:
>
> I will traverse this Dataset to convert it to Arrow and send it to Tensorflow 
> through Socket.

(I presume you're using the python tensorflow API, if you're not, please ignore)

There is a JIRA/PR ([1] [2]) which proposes to add native support for
Arrow serialization to python,

Under the hood, Spark is already serializing into Arrow format to
transmit to python, it's just additionally doing an unconditional
conversion to pandas once it reaches the python runner -- which is
good if you're using pandas, not so great if pandas isn't what you
operate on. The JIRA above would let you receive the arrow buffers
(that already exist) directly.

Cheers,
Andrew
[1] https://issues.apache.org/jira/browse/SPARK-30153
[2] https://github.com/apache/spark/pull/26783
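
For the pandas path that does exist today, a minimal sketch (the config key is
the Spark 3.x name; on 2.x it was spark.sql.execution.arrow.enabled):

spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
pdf = df.toPandas()  # JVM -> Arrow -> pandas, with the pandas conversion at the end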

>
> I tried to use toLocalIterator() to traverse the dataset instead of collect  
> to the driver, but toLocalIterator() will create a lot of jobs and will bring 
> a lot of time consumption.
>
>
>
> Best regards,
>
> maqy
>
>
>
> From: Michael Artz
> Sent: 2020-04-22 16:09
> To: maqy
> Cc: user@spark.apache.org
> Subject: Re: Can I collect Dataset[Row] to driver without converting it to Array 
> [Row]?
>
>
>
> What would you do with it once you get it into driver in a Dataset[Row]?
>
> Sent from my iPhone
>
>
>
> On Apr 22, 2020, at 3:06 AM, maqy <454618...@qq.com> wrote:
>
> 
>
> When the data is stored in the Dataset [Row] format, the memory usage is very 
> small.
>
> When I use collect () to collect data to the driver, each line of the dataset 
> will be converted to Row and stored in an array, which will bring great 
> memory overhead.
>
> So, can I collect Dataset[Row] to driver and keep its data format?
>
>
>
> Best regards,
>
> maqy
>
>
>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Scala version compatibility

2020-04-06 Thread Andrew Melo
Hello,

On Mon, Apr 6, 2020 at 3:31 PM Koert Kuipers  wrote:

> actually i might be wrong about this. did you declare scala to be a
> provided dependency? so scala is not in your fat/uber jar? if so then maybe
> it will work.
>

I declare spark to be a provided dependency, so Scala's not included in my
artifact except for this single callsite.

Thanks
Andrew


> On Mon, Apr 6, 2020 at 4:16 PM Andrew Melo  wrote:
>
>>
>>
>> On Mon, Apr 6, 2020 at 3:08 PM Koert Kuipers  wrote:
>>
>>> yes it will
>>>
>>>
>> Ooof, I was hoping that wasn't the case. I guess I need to figure out how
>> to get Maven to compile/publish jars with different
>> dependencies/artifactIDs like how sbt does? (or re-implement the
>> functionality in java)
>>
>> Thanks for your help,
>> Andrew
>>
>>
>>> On Mon, Apr 6, 2020 at 3:50 PM Andrew Melo 
>>> wrote:
>>>
>>>> Hello all,
>>>>
>>>> I'm aware that Scala is not binary compatible between revisions. I have
>>>> some Java code whose only Scala dependency is the transitive dependency
>>>> through Spark. This code calls a Spark API which returns a Seq, which
>>>> I then convert into a List with
>>>> JavaConverters.seqAsJavaListConverter. Will this usage cause binary
>>>> incompatibility if the jar is compiled in one Scala version and executed in
>>>> another?
>>>>
>>>> I tried grokking
>>>> https://docs.scala-lang.org/overviews/core/binary-compatibility-of-scala-releases.html,
>>>> and wasn't quite able to make heads or tails of this particular case.
>>>>
>>>> Thanks!
>>>> Andrew
>>>>
>>>>
>>>>


Re: Scala version compatibility

2020-04-06 Thread Andrew Melo
On Mon, Apr 6, 2020 at 3:08 PM Koert Kuipers  wrote:

> yes it will
>
>
Ooof, I was hoping that wasn't the case. I guess I need to figure out how
to get Maven to compile/publish jars with different
dependencies/artifactIDs like how sbt does? (or re-implement the
functionality in java)

Thanks for your help,
Andrew


> On Mon, Apr 6, 2020 at 3:50 PM Andrew Melo  wrote:
>
>> Hello all,
>>
>> I'm aware that Scala is not binary compatible between revisions. I have
>> some Java code whose only Scala dependency is the transitive dependency
>> through Spark. This code calls a Spark API which returns a Seq, which
>> I then convert into a List with
>> JavaConverters.seqAsJavaListConverter. Will this usage cause binary
>> incompatibility if the jar is compiled in one Scala version and executed in
>> another?
>>
>> I tried grokking
>> https://docs.scala-lang.org/overviews/core/binary-compatibility-of-scala-releases.html,
>> and wasn't quite able to make heads or tails of this particular case.
>>
>> Thanks!
>> Andrew
>>
>>
>>


Scala version compatibility

2020-04-06 Thread Andrew Melo
Hello all,

I'm aware that Scala is not binary compatible between revisions. I have
some Java code whose only Scala dependency is the transitive dependency
through Spark. This code calls a Spark API which returns a Seq, which
I then convert into a List with
JavaConverters.seqAsJavaListConverter. Will this usage cause binary
incompatibility if the jar is compiled in one Scala version and executed in
another?

I tried grokking
https://docs.scala-lang.org/overviews/core/binary-compatibility-of-scala-releases.html,
and wasn't quite able to make heads or tails of this particular case.

Thanks!
Andrew


Optimizing LIMIT in DSv2

2020-03-30 Thread Andrew Melo
Hello,

Executing "SELECT Muon_Pt FROM rootDF LIMIT 10", where "rootDF" is a temp
view backed by a DSv2 reader yields the attached plan [1]. It appears that
the initial stage is run over every partition in rootDF, even though each
partition has 200k rows (modulo the last partition which holds the
remainder of rows in a file).

Is there some sort of hinting that can done from the datasource side to
better inform the optimizer or, alternately, am I missing an interface in
the PushDown filters that would let me elide transferring/decompressing
unnecessary partitions?

Thanks!
Andrew

[1]

== Parsed Logical Plan ==
'GlobalLimit 10
+- 'LocalLimit 10
   +- 'Project ['Muon_pt]
  +- 'UnresolvedRelation `rootDF`

== Analyzed Logical Plan ==
Muon_pt: array
GlobalLimit 10
+- LocalLimit 10
   +- Project [Muon_pt#119]
  +- SubqueryAlias `rootdf`
 +- RelationV2 root[run#0L, luminosityBlock#1L, event#2L,
CaloMET_phi#3, CaloMET_pt#4, CaloMET_sumEt#5, nElectron#6,
Electron_deltaEtaSC#7, Electron_dr03EcalRecHitSumEt#8,
Electron_dr03HcalDepth1TowerSumEt#9, Electron_dr03TkSumPt#10,
Electron_dxy#11, Electron_dxyErr#12, Electron_dz#13, Electron_dzErr#14,
Electron_eCorr#15, Electron_eInvMinusPInv#16, Electron_energyErr#17,
Electron_eta#18, Electron_hoe#19, Electron_ip3d#20, Electron_mass#21,
Electron_miniPFRelIso_all#22, Electron_miniPFRelIso_chg#23, ... 787 more
fields] (Options: [tree=Events,paths=["hdfs://cmshdfs/store/data"]])

== Optimized Logical Plan ==
GlobalLimit 10
+- LocalLimit 10
   +- Project [Muon_pt#119]
  +- RelationV2 root[run#0L, luminosityBlock#1L, event#2L,
CaloMET_phi#3, CaloMET_pt#4, CaloMET_sumEt#5, nElectron#6,
Electron_deltaEtaSC#7, Electron_dr03EcalRecHitSumEt#8,
Electron_dr03HcalDepth1TowerSumEt#9, Electron_dr03TkSumPt#10,
Electron_dxy#11, Electron_dxyErr#12, Electron_dz#13, Electron_dzErr#14,
Electron_eCorr#15, Electron_eInvMinusPInv#16, Electron_energyErr#17,
Electron_eta#18, Electron_hoe#19, Electron_ip3d#20, Electron_mass#21,
Electron_miniPFRelIso_all#22, Electron_miniPFRelIso_chg#23, ... 787 more
fields] (Options: [tree=Events,paths=["hdfs://cmshdfs/store/data"]])

== Physical Plan ==
CollectLimit 10
+- *(1) Project [Muon_pt#119]
   +- *(1) ScanV2 root[Muon_pt#119] (Options:
[tree=Events,paths=["hdfs://cmshdfs/store/data"]])


Supporting Kryo registration in DSv2

2020-03-26 Thread Andrew Melo
Hello all,

Is there a way to register classes within a datasourcev2 implementation in
the Kryo serializer?

I've attempted the following in both the constructor and static block of my
toplevel class:

SparkContext context = SparkContext.getOrCreate();
SparkConf conf = context.getConf();
Class[] classesRegistered = new Class[] {
edu.vanderbilt.accre.laurelin.spark_ttree.Reader.class,
edu.vanderbilt.accre.laurelin.spark_ttree.Partition.class,
edu.vanderbilt.accre.laurelin.spark_ttree.SlimTBranch.class
};
conf.registerKryoClasses(classesRegistered);

But (if I'm reading correctly), this is too late, since the config has
already been parsed while initializing the SparkContext, so adding classes to
the SparkConf has no effect. From what I can tell, the kryo instance behind it
is private, so I can't add the registration manually either.

Any thoughts?
Thanks
Andrew
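
For reference, a sketch of registering the same classes up front through
configuration, before the SparkContext is created (whether that fits the
plugin's deployment model is an assumption):

from pyspark.sql import SparkSession

classes = ",".join([
    "edu.vanderbilt.accre.laurelin.spark_ttree.Reader",
    "edu.vanderbilt.accre.laurelin.spark_ttree.Partition",
    "edu.vanderbilt.accre.laurelin.spark_ttree.SlimTBranch",
])

spark = (
    SparkSession.builder
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.kryo.classesToRegister", classes)
    .getOrCreate()
)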


Re: Reading 7z file in spark

2020-01-14 Thread Andrew Melo
It only makes sense if the underlying file is also splittable, and even
then, it doesn't really do anything for you if you don't explicitly tell
spark about the split boundaries

On Tue, Jan 14, 2020 at 7:36 PM Someshwar Kale  wrote:

> I would suggest to use other compression technique which is splittable for
> eg. Bzip2, lzo, lz4.
>
> On Wed, Jan 15, 2020, 1:32 AM Enrico Minack 
> wrote:
>
>> Hi,
>>
>> Spark does not support 7z natively, but you can read any file in Spark:
>>
>> def read(stream: PortableDataStream): Iterator[String] = { 
>> Seq(stream.getPath()).iterator }
>>
>> spark.sparkContext
>>   .binaryFiles("*.7z")
>>   .flatMap(file => read(file._2))
>>   .toDF("path")
>>   .show(false)
>>
>> This scales with the number of files. A single large 7z file would not
>> scale well (a single partition).
>>
>> Any file that matches *.7z will be loaded via the read(stream:
>> PortableDataStream) method, which returns an iterator over the rows.
>> This method is executed on the executor and can implement the 7z specific
>> code, which is independent of Spark and should not be too hard (here it
>> does not open the input stream but returns the path only).
>>
>> If you are planning to read the same files more than once, then it would
>> be worth to first uncompress and convert them into files Spark supports.
>> Then Spark can scale much better.
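
A Python variant of the same per-file pattern (py7zr is an assumed third-party
dependency -- Spark itself has no 7z support -- so treat its API usage here as a
sketch to verify against its documentation):

import io
import py7zr
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

def list_members(path_and_bytes):
    path, data = path_and_bytes
    with py7zr.SevenZipFile(io.BytesIO(data)) as archive:
        return [(path, name) for name in archive.getnames()]

pairs = spark.sparkContext.binaryFiles("*.7z").flatMap(list_members)
spark.createDataFrame(pairs, ["archive", "member"]).show(truncate=False)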
>>
>> Regards,
>> Enrico
>>
>>
>> Am 13.01.20 um 13:31 schrieb HARSH TAKKAR:
>>
>> Hi,
>>
>>
>> Is it possible to read 7z compressed file in spark?
>>
>>
>> Kind Regards
>> Harsh Takkar
>>
>>
>>


Re: Driver vs master

2019-10-07 Thread Andrew Melo
On Mon, Oct 7, 2019 at 20:49 ayan guha  wrote:

> HI
>
> I think you are mixing terminologies here. Loosely speaking, Master
> manages worker machines. Each worker machine can run one or more processes.
> A process can be a driver or executor. You submit applications to the
> master. Each application will have driver and executors. Master will decide
> where to put each of them. In cluster mode, master will distribute the
> drivers across the cluster. In client mode, master will try to run the
> driver processes within master's own process. You can launch multiple
> master processes as well and use them for a set of applications - this
> happens when you use YARN. I am not sure how Mesos or K8 works in that
> score though.
>

Right, that's why I initially had the caveat  "This depends on what
master/deploy mode you're using: if it's "local" master and "client mode"
then yes tasks execute in the same JVM as the driver".

The answer depends on the exact setup Amit has and how the application is
configured


> HTH...
>
> Ayan
>
>
>
> On Tue, Oct 8, 2019 at 12:11 PM Andrew Melo  wrote:
>
>>
>>
>> On Mon, Oct 7, 2019 at 19:20 Amit Sharma  wrote:
>>
>>> Thanks Andrew but I am asking specific to driver memory not about
>>> executors memory. We have just one master and if each jobs driver.memory=4g
>>> and master nodes total memory is 16gb then we can not execute more than 4
>>> jobs at a time.
>>
>>
>> I understand that. I think there's a misunderstanding with the
>> terminology, though. Are you running multiple separate spark instances on a
>> single machine or one instance with multiple jobs inside.
>>
>>
>>>
>>> On Monday, October 7, 2019, Andrew Melo  wrote:
>>>
>>>> Hi Amit
>>>>
>>>> On Mon, Oct 7, 2019 at 18:33 Amit Sharma  wrote:
>>>>
>>>>> Can you please help me understand this. I believe driver programs runs
>>>>> on master node
>>>>
>>>> If we are running 4 spark job and driver memory config is 4g then total
>>>>> 16 6b would be used of master node.
>>>>
>>>>
>>>> This depends on what master/deploy mode you're using: if it's "local"
>>>> master and "client mode" then yes tasks execute in the same JVM as the
>>>> driver. In this case though, the driver JVM uses whatever much space is
>>>> allocated for the driver regardless of how many threads you have.
>>>>
>>>>
>>>> So if we will run more jobs then we need more memory on master. Please
>>>>> correct me if I am wrong.
>>>>>
>>>>
>>>> This depends on your application, but in general more threads will
>>>> require more memory.
>>>>
>>>>
>>>>
>>>>>
>>>>> Thanks
>>>>> Amit
>>>>>
>>>> --
>>>> It's dark in this basement.
>>>>
>>> --
>> It's dark in this basement.
>>
>
>
> --
> Best Regards,
> Ayan Guha
>
-- 
It's dark in this basement.


Re: Driver vs master

2019-10-07 Thread Andrew Melo
Hi

On Mon, Oct 7, 2019 at 19:20 Amit Sharma  wrote:

> Thanks Andrew, but I am asking specifically about driver memory, not about
> executor memory. We have just one master, and if each job's driver.memory=4g
> and the master node's total memory is 16gb, then we cannot execute more than 4
> jobs at a time.


I understand that. I think there's a misunderstanding with the terminology,
though. Are you running multiple separate spark instances on a single
machine or one instance with multiple jobs inside?


>
> On Monday, October 7, 2019, Andrew Melo  wrote:
>
>> Hi Amit
>>
>> On Mon, Oct 7, 2019 at 18:33 Amit Sharma  wrote:
>>
>>> Can you please help me understand this. I believe driver programs runs
>>> on master node
>>
>> If we are running 4 spark job and driver memory config is 4g then total
>>> 16 6b would be used of master node.
>>
>>
>> This depends on what master/deploy mode you're using: if it's "local"
>> master and "client mode" then yes tasks execute in the same JVM as the
>> driver. In this case though, the driver JVM uses whatever much space is
>> allocated for the driver regardless of how many threads you have.
>>
>>
>> So if we will run more jobs then we need more memory on master. Please
>>> correct me if I am wrong.
>>>
>>
>> This depends on your application, but in general more threads will
>> require more memory.
>>
>>
>>
>>>
>>> Thanks
>>> Amit
>>>
>> --
>> It's dark in this basement.
>>
> --
It's dark in this basement.


Re: Driver vs master

2019-10-07 Thread Andrew Melo
Hi Amit

On Mon, Oct 7, 2019 at 18:33 Amit Sharma  wrote:

> Can you please help me understand this. I believe driver programs run on
> the master node.

> If we are running 4 Spark jobs and driver memory config is 4g, then a total of
> 16 gb would be used on the master node.


This depends on what master/deploy mode you're using: if it's "local"
master and "client mode" then yes tasks execute in the same JVM as the
driver. In this case though, the driver JVM uses whatever much space is
allocated for the driver regardless of how many threads you have.


So if we will run more jobs then we need more memory on master. Please
> correct me if I am wrong.
>

This depends on your application, but in general more threads will require
more memory.



>
> Thanks
> Amit
>
-- 
It's dark in this basement.


Re: Anaconda installation with Pyspark/Pyarrow (2.3.0+) on cloudera managed server

2019-05-06 Thread Andrew Melo
Hi,

On Mon, May 6, 2019 at 11:59 AM Gourav Sengupta
 wrote:
>
> Hence, what I mentioned initially does sound correct ?

I don't agree at all - we've had a significant boost from moving to
regular UDFs to pandas UDFs. YMMV, of course.

>
> On Mon, May 6, 2019 at 5:43 PM Andrew Melo  wrote:
>>
>> Hi,
>>
>> On Mon, May 6, 2019 at 11:41 AM Patrick McCarthy
>>  wrote:
>> >
>> > Thanks Gourav.
>> >
>> > Incidentally, since the regular UDF is row-wise, we could optimize that a 
>> > bit by taking the convert() closure and simply making that the UDF.
>> >
>> > Since there's that MGRS object that we have to create too, we could 
>> > probably optimize it further by applying the UDF via rdd.mapPartitions, 
>> > which would allow the UDF to instantiate objects once per-partition 
>> > instead of per-row and then iterate element-wise through the rows of the 
>> > partition.
>> >
>> > All that said, having done the above on prior projects I find the pandas 
>> > abstractions to be very elegant and friendly to the end-user so I haven't 
>> > looked back :)
>> >
>> > (The common memory model via Arrow is a nice boost too!)
>>
>> And some tentative SPIPs that want to use columnar representations
>> internally in Spark should also add some good performance in the
>> future.
>>
>> Cheers
>> Andrew
>>
>> >
>> > On Mon, May 6, 2019 at 11:13 AM Gourav Sengupta 
>> >  wrote:
>> >>
>> >> The proof is in the pudding
>> >>
>> >> :)
>> >>
>> >>
>> >>
>> >> On Mon, May 6, 2019 at 2:46 PM Gourav Sengupta 
>> >>  wrote:
>> >>>
>> >>> Hi Patrick,
>> >>>
>> >>> super duper, thanks a ton for sharing the code. Can you please confirm 
>> >>> that this runs faster than the regular UDF's?
>> >>>
>> >>> Interestingly I am also running same transformations using another geo 
>> >>> spatial library in Python, where I am passing two fields and getting 
>> >>> back an array.
>> >>>
>> >>>
>> >>> Regards,
>> >>> Gourav Sengupta
>> >>>
>> >>> On Mon, May 6, 2019 at 2:00 PM Patrick McCarthy 
>> >>>  wrote:
>> >>>>
>> >>>> Human time is considerably more expensive than computer time, so in 
>> >>>> that regard, yes :)
>> >>>>
>> >>>> This took me one minute to write and ran fast enough for my needs. If 
>> >>>> you're willing to provide a comparable scala implementation I'd be 
>> >>>> happy to compare them.
>> >>>>
>> >>>> @F.pandas_udf(T.StringType(), F.PandasUDFType.SCALAR)
>> >>>> def generate_mgrs_series(lat_lon_str, level):
>> >>>>
>> >>>>     import mgrs
>> >>>>     m = mgrs.MGRS()
>> >>>>
>> >>>>     precision_level = 0
>> >>>>     levelval = level[0]
>> >>>>
>> >>>>     if levelval == 1000:
>> >>>>         precision_level = 2
>> >>>>     if levelval == 100:
>> >>>>         precision_level = 3
>> >>>>
>> >>>>     def convert(ll_str):
>> >>>>         lat, lon = ll_str.split('_')
>> >>>>         return m.toMGRS(lat, lon,
>> >>>>                         MGRSPrecision = precision_level)
>> >>>>
>> >>>>     return lat_lon_str.apply(lambda x: convert(x))
>> >>>>
>> >>>> On Mon, May 6, 2019 at 8:23 AM Gourav Sengupta 
>> >>>>  wrote:
>> >>>>>
>> >>>>> And you found the PANDAS UDF more performant ? Can you share your code 
>> >>>>> and prove it?
>> >>>>>
>> >>>>> On Sun, May 5, 2019 at 9:24 PM Patrick McCarthy 
>> >>>>>  wrote:
>> >>>>>>
>> >>>>>> I

Re: Anaconda installation with Pyspark/Pyarrow (2.3.0+) on cloudera managed server

2019-05-06 Thread Andrew Melo
Hi,

On Mon, May 6, 2019 at 11:41 AM Patrick McCarthy
 wrote:
>
> Thanks Gourav.
>
> Incidentally, since the regular UDF is row-wise, we could optimize that a bit 
> by taking the convert() closure and simply making that the UDF.
>
> Since there's that MGRS object that we have to create too, we could probably 
> optimize it further by applying the UDF via rdd.mapPartitions, which would 
> allow the UDF to instantiate objects once per-partition instead of per-row 
> and then iterate element-wise through the rows of the partition.
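
A sketch of that mapPartitions pattern (mirroring the MGRS example below; the
column name and the fixed precision are assumptions):

def convert_partition(rows):
    import mgrs
    m = mgrs.MGRS()  # instantiated once per partition, not once per row
    for row in rows:
        lat, lon = row["lat_lon_str"].split("_")
        yield (row["lat_lon_str"], m.toMGRS(lat, lon, MGRSPrecision=2))

result = df.rdd.mapPartitions(convert_partition).toDF(["lat_lon_str", "mgrs"])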
>
> All that said, having done the above on prior projects I find the pandas 
> abstractions to be very elegant and friendly to the end-user so I haven't 
> looked back :)
>
> (The common memory model via Arrow is a nice boost too!)

And some tentative SPIPs that want to use columnar representations
internally in Spark should also add some good performance in the
future.

Cheers
Andrew

>
> On Mon, May 6, 2019 at 11:13 AM Gourav Sengupta  
> wrote:
>>
>> The proof is in the pudding
>>
>> :)
>>
>>
>>
>> On Mon, May 6, 2019 at 2:46 PM Gourav Sengupta  
>> wrote:
>>>
>>> Hi Patrick,
>>>
>>> super duper, thanks a ton for sharing the code. Can you please confirm that 
>>> this runs faster than the regular UDF's?
>>>
>>> Interestingly I am also running same transformations using another geo 
>>> spatial library in Python, where I am passing two fields and getting back 
>>> an array.
>>>
>>>
>>> Regards,
>>> Gourav Sengupta
>>>
>>> On Mon, May 6, 2019 at 2:00 PM Patrick McCarthy  
>>> wrote:

 Human time is considerably more expensive than computer time, so in that 
 regard, yes :)

 This took me one minute to write and ran fast enough for my needs. If 
 you're willing to provide a comparable scala implementation I'd be happy 
 to compare them.

 @F.pandas_udf(T.StringType(), F.PandasUDFType.SCALAR)
 def generate_mgrs_series(lat_lon_str, level):

     import mgrs
     m = mgrs.MGRS()

     precision_level = 0
     levelval = level[0]

     if levelval == 1000:
         precision_level = 2
     if levelval == 100:
         precision_level = 3

     def convert(ll_str):
         lat, lon = ll_str.split('_')
         return m.toMGRS(lat, lon,
                         MGRSPrecision = precision_level)

     return lat_lon_str.apply(lambda x: convert(x))

 On Mon, May 6, 2019 at 8:23 AM Gourav Sengupta  
 wrote:
>
> And you found the PANDAS UDF more performant ? Can you share your code 
> and prove it?
>
> On Sun, May 5, 2019 at 9:24 PM Patrick McCarthy  
> wrote:
>>
>> I disagree that it's hype. Perhaps not 1:1 with pure scala 
>> performance-wise, but for python-based data scientists or others with a 
>> lot of python expertise it allows one to do things that would otherwise 
>> be infeasible at scale.
>>
>> For instance, I recently had to convert latitude / longitude pairs to 
>> MGRS strings 
>> (https://en.wikipedia.org/wiki/Military_Grid_Reference_System). Writing 
>> a pandas UDF (and putting the mgrs python package into a conda 
>> environment) was _significantly_ easier than any alternative I found.
>>
>> @Rishi - depending on how your network is constructed, some lag could come
>> from just uploading the conda environment. If you load it from hdfs with
>> --archives, does it improve?
>>
>> On Sun, May 5, 2019 at 2:15 PM Gourav Sengupta 
>>  wrote:
>>>
>>> hi,
>>>
>>> Pandas UDF is a bit of hype. One of their blogs shows the use case of
>>> adding 1 to a field using Pandas UDF, which is pretty much pointless. So
>>> you go beyond the blog and realise that your actual use case is more
>>> than adding one :) and the reality hits you
>>>
>>> Pandas UDF in certain scenarios is actually slow; try using apply for a
>>> custom or pandas function. In fact, in certain scenarios I have found
>>> general UDFs work much faster and use much less memory. Therefore, test
>>> out your use case (with at least 30 million records) before trying to
>>> use the Pandas UDF option.
>>>
>>> And when you start using GroupMap then you realise after reading 
>>> https://spark.apache.org/docs/latest/sql-pyspark-pandas-with-arrow.html#pandas-udfs-aka-vectorized-udfs
>>>  that "Oh!! now I can run into random OOM errors and the maxrecords 
>>> options does not help at all"
>>>
>>> Excerpt from the above link:
>>> Note that all data for a group will be loaded into memory before the 
>>> function is applied. This can lead to out of memory exceptions, 
>>> especially if the group sizes are skewed. The configuration for 
>>> maxRecordsPerBatch is not applied on groups and it is up to the user to 
>>> ensure that the grouped data will 

Re: can't download 2.4.1 sourcecode

2019-04-22 Thread Andrew Melo
On Mon, Apr 22, 2019 at 10:54 PM yutaochina  wrote:
>
> 
> 
>
>
> when I want to download the source code, I find it does not work
>

In the interim -- https://github.com/apache/spark/archive/v2.4.1.tar.gz

>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Connecting to Spark cluster remotely

2019-04-22 Thread Andrew Melo
Hi Rishikesh,

On Mon, Apr 22, 2019 at 4:26 PM Rishikesh Gawade
 wrote:
>
> To put it simply, what are the configurations that need to be done on the
> client machine so that it can run the driver on itself and executors on
> spark-yarn cluster nodes?

TBH, if it were me, I would simply SSH to the cluster and start the
spark-shell there. I don't think there's any special spark
configuration you need, but depending on what address space your
cluster is using/where you're connecting from, it might be really hard
to get all the networking components lined up.

>
> On Mon, Apr 22, 2019, 8:22 PM Rishikesh Gawade  
> wrote:
>>
>> Hi.
>> I have been experiencing trouble while trying to connect to a Spark cluster 
>> remotely. This Spark cluster is configured to run using YARN.
>> Can anyone guide me or provide any step-by-step instructions for connecting 
>> remotely via spark-shell?
>> Here's the setup that I am using:
>> The Spark cluster is running with each node as a docker container hosted on 
>> a VM. It is using YARN for scheduling resources for computations.
>> I have a dedicated docker container acting as a spark client, on which i 
>> have the spark-shell installed(spark binary in standalone setup) and also 
>> the Hadoop and Yarn config directories set so that spark-shell can 
>> coordinate with the RM for resources.
>> With all of this set, i tried using the following command:
>>
>> spark-shell --master yarn --deploy-mode client
>>
>> This results in the spark-shell giving me a scala-based console; however,
>> when I check the Resource Manager UI on the cluster, there seems to be no 
>> application/spark session running.
>> I have been expecting the driver to be running on the client machine and the 
>> executors running in the cluster. But that doesn't seem to happen.
>>
>> How can I achieve this?
>> Is whatever I am trying feasible, and if so, a good practice?
>>
>> Thanks & Regards,
>> Rishikesh

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Where does the Driver run?

2019-03-25 Thread Andrew Melo
Hi Pat,

Indeed, I don't think that it's possible to use cluster mode w/o
spark-submit. All the docs I see appear to always describe needing to use
spark-submit for cluster mode -- it's not even compatible with spark-shell.
But it makes sense to me -- if you want Spark to run your application's
driver, you need to package it up and send it to the cluster manager. You
can't start Spark in one place and then migrate it to the cluster later. It's
also why you can't use spark-shell in cluster mode either, I think.
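
For reference, a cluster-mode submission looks roughly like the command below;
the class name and jar are placeholders for whatever your build produces:

  spark-submit --master spark://master-address:7077 \
    --deploy-mode cluster \
    --class com.example.MyJob \
    my-app.jar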

Cheers
Andrew

On Mon, Mar 25, 2019 at 11:22 AM Pat Ferrel  wrote:

> In the GUI, while the job is running, the app-id link brings up logs to both
> executors. The “name” link goes to 4040 of the machine that launched the
> job, but it is not resolvable right now so the page is not shown. I’ll try the
> netstat but the use of port 4040 was a good clue.
>
> By what you say below this indicates the Driver is running on the
> launching machine, the client to the Spark Cluster. This should be the case
> in deployMode = client.
>
> Can someone explain what is going on? The evidence seems to say that
> deployMode = cluster *does not work* as described unless you use
> spark-submit (and I’m only guessing at that).
>
> Further, if we don’t use spark-submit we can’t use deployMode = cluster???
>
>
> From: Akhil Das  
> Reply: Akhil Das  
> Date: March 24, 2019 at 7:45:07 PM
> To: Pat Ferrel  
> Cc: user  
> Subject:  Re: Where does the Driver run?
>
> There's also a driver UI (usually available on port 4040). After running
> your code (I assume you are running it on your machine), visit
> localhost:4040 and you will get the driver UI.
>
> If you think the driver is running on your master/executor nodes, login to
> those machines and do a
>
>netstat -napt | grep -I listen
>
> You will see the driver listening on 404x there; this mostly won't be the
> case, though, as you are not doing spark-submit or using deployMode=cluster.
>
> On Mon, 25 Mar 2019, 01:03 Pat Ferrel,  wrote:
>
>> Thanks, I have seen this many times in my research. Paraphrasing docs:
>> “in deployMode ‘cluster' the Driver runs on a Worker in the cluster”
>>
>> When I look at logs I see 2 executors on the 2 slaves (executor 0 and 1
>> with addresses that match slaves). When I look at memory usage while the
>> job runs I see virtually identical usage on the 2 Workers. This would
>> support your claim and contradict Spark docs for deployMode = cluster.
>>
>> The evidence seems to contradict the docs. I am now beginning to wonder
>> if the Driver only runs in the cluster if we use spark-submit
>>
>>
>>
>> From: Akhil Das  
>> Reply: Akhil Das  
>> Date: March 23, 2019 at 9:26:50 PM
>> To: Pat Ferrel  
>> Cc: user  
>> Subject:  Re: Where does the Driver run?
>>
>> If you are starting your "my-app" on your local machine, that's where the
>> driver is running.
>>
>> [image: image.png]
>>
>> Hope this helps.
>> 
>>
>> On Sun, Mar 24, 2019 at 4:13 AM Pat Ferrel  wrote:
>>
>>> I have researched this for a significant amount of time and find answers
>>> that seem to be for a slightly different question than mine.
>>>
>>> The Spark 2.3.3 cluster is running fine. I see the GUI on
>>> “http://master-address:8080”, there are 2 idle workers, as configured.
>>>
>>> I have a Scala application that creates a context and starts execution
>>> of a Job. I *do not use spark-submit*; I start the Job programmatically, and
>>> this is where many explanations fork from my question.
>>>
>>> In "my-app" I create a new SparkConf, with the following code (slightly
>>> abbreviated):
>>>
>>>   conf.setAppName(“my-job")
>>>   conf.setMaster(“spark://master-address:7077”)
>>>   conf.set(“deployMode”, “cluster”)
>>>   // other settings like driver and executor memory requests
>>>   // the driver and executor memory requests are for all mem on the
>>> slaves, more than
>>>   // mem available on the launching machine with “my-app"
>>>   val jars = listJars(“/path/to/lib")
>>>   conf.setJars(jars)
>>>   …
>>>
>>> When I launch the job I see 2 executors running on the 2 workers/slaves.
>>> Everything seems to run fine and sometimes completes successfully. Frequent
>>> failures are the reason for this question.
>>>
>>> Where is the Driver running? I don’t see it in the GUI, I see 2
>>> Executors taking all cluster resources. With a Yarn cluster I would expect
>>> the “Driver" to run on/in the Yarn Master but I am using the Spark
>>> Standalone Master, where is the Drive part of the Job running?
>>>
>>> If it is running in the Master, we are in trouble because I start the
>>> Master on one of my 2 Workers sharing resources with one of the Executors.
>>> Executor mem + driver mem is > available mem on a Worker. I can change this
>>> but need to understand where the Driver part of the Spark Job runs. Is it
>>> in the Spark Master, or inside an Executor, or ???
>>>
>>> The “Driver” creates and 

Re: Where does the Driver run?

2019-03-24 Thread Andrew Melo
Hi Pat,

On Sun, Mar 24, 2019 at 1:03 PM Pat Ferrel  wrote:

> Thanks, I have seen this many times in my research. Paraphrasing docs: “in
> deployMode ‘cluster' the Driver runs on a Worker in the cluster”
>
> When I look at logs I see 2 executors on the 2 slaves (executor 0 and 1
> with addresses that match slaves). When I look at memory usage while the
> job runs I see virtually identical usage on the 2 Workers. This would
> support your claim and contradict Spark docs for deployMode = cluster.
>
> The evidence seems to contradict the docs. I am now beginning to wonder if
> the Driver only runs in the cluster if we use spark-submit
>

Where/how are you starting "./sbin/start-master.sh"?

Cheers
Andrew


>
>
>
> From: Akhil Das  
> Reply: Akhil Das  
> Date: March 23, 2019 at 9:26:50 PM
> To: Pat Ferrel  
> Cc: user  
> Subject:  Re: Where does the Driver run?
>
> If you are starting your "my-app" on your local machine, that's where the
> driver is running.
>
> [image: image.png]
>
> Hope this helps.
> 
>
> On Sun, Mar 24, 2019 at 4:13 AM Pat Ferrel  wrote:
>
>> I have researched this for a significant amount of time and find answers
>> that seem to be for a slightly different question than mine.
>>
>> The Spark 2.3.3 cluster is running fine. I see the GUI on
>> “http://master-address:8080”, there are 2 idle workers, as configured.
>>
>> I have a Scala application that creates a context and starts execution of
>> a Job. I *do not use spark-submit*; I start the Job programmatically, and
>> this is where many explanations fork from my question.
>>
>> In "my-app" I create a new SparkConf, with the following code (slightly
>> abbreviated):
>>
>>   conf.setAppName(“my-job")
>>   conf.setMaster(“spark://master-address:7077”)
>>   conf.set(“deployMode”, “cluster”)
>>   // other settings like driver and executor memory requests
>>   // the driver and executor memory requests are for all mem on the
>> slaves, more than
>>   // mem available on the launching machine with “my-app"
>>   val jars = listJars(“/path/to/lib")
>>   conf.setJars(jars)
>>   …
>>
>> When I launch the job I see 2 executors running on the 2 workers/slaves.
>> Everything seems to run fine and sometimes completes successfully. Frequent
>> failures are the reason for this question.
>>
>> Where is the Driver running? I don’t see it in the GUI, I see 2 Executors
>> taking all cluster resources. With a Yarn cluster I would expect the
>> “Driver" to run on/in the Yarn Master but I am using the Spark Standalone
>> Master, where is the Drive part of the Job running?
>>
>> If it is running in the Master, we are in trouble because I start the
>> Master on one of my 2 Workers sharing resources with one of the Executors.
>> Executor mem + driver mem is > available mem on a Worker. I can change this
>> but need to understand where the Driver part of the Spark Job runs. Is it
>> in the Spark Master, or inside an Executor, or ???
>>
>> The “Driver” creates and broadcasts some large data structures so the
>> need for an answer is more critical than with more typical tiny Drivers.
>>
>> Thanks for you help!
>>
>
>
> --
> Cheers!
>
>


Please stop asking to unsubscribe

2019-01-31 Thread Andrew Melo
The correct way to unsubscribe is to mail

user-unsubscr...@spark.apache.org

Just mailing the list with "unsubscribe" doesn't actually do anything...

Thanks
Andrew

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: What are the alternatives to nested DataFrames?

2018-12-28 Thread Andrew Melo
Could you join() the DFs on a common key?
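
Roughly what I have in mind, sketched in PySpark since that's what I use day to
day (the Scala DataFrame API has an equivalent join method); the "name" column is
just a placeholder for whatever key the two frames share:

# cities has a "name" column; es_docs is the frame loaded from Elasticsearch
# with a matching "name" column
matched = cities.join(es_docs, on="name", how="inner")
matched.show()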

On Fri, Dec 28, 2018 at 18:35  wrote:

> Shahab, I am not sure what you are trying to say. Could you please give
> me an example? The result of the query is a Dataframe that is created after
> iterating, so I am not sure how I could map that to a column without
> iterating and getting the values.
>
>
>
> I have a Dataframe that contains a list of cities that I would like
> to iterate over and search in Elasticsearch.  This list is stored in a
> Dataframe because it contains hundreds of thousands of elements with
> multiple properties that would not fit on a single machine.
>
>
>
> The issue is that the elastic-spark connector returns a Dataframe as well,
> which leads to a Dataframe being created within a Dataframe.
>
>
>
> The only solution I found is to store the list of cities in a regular
> Scala Seq and iterate over that, but as far as I know this would make the Seq
> centralized instead of distributed (run at the executor only?)
>
>
>
> Full example :
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> *val cities= Seq("New York","Michigan")cities.foreach(r => {  val qb =
> QueryBuilders.matchQuery("name", r).operator(Operator.AND)
> print(qb.toString)  val dfs = sqlContext.esDF("cities/docs", qb.toString)
> // Returns a dataframe for each city  dfs.show() // Works as expected. It
> prints the individual dataframe with the result of the query})*
>
>
>
>
>
> *val cities = Seq("New York","Michigan").toDF()*
>
>
>
> *cities.foreach(r => {*
>
>
>
> *  val city  = r.getString(0)*
>
>
>
> *  val qb = QueryBuilders.matchQuery("name",
> city).operator(Operator.AND)*
>
> *  print(qb.toString)*
>
>
>
> *  val dfs = sqlContext.esDF("cities/docs", qb.toString) // null
> pointer*
>
>
>
> *  dfs.show()*
>
>
>
> *})*
>
>
>
>
>
> *From:* Shahab Yunus 
> *Sent:* Friday, December 28, 2018 12:34 PM
> *To:* em...@yeikel.com
> *Cc:* user 
> *Subject:* Re: What are the alternatives to nested DataFrames?
>
>
>
> Can you have a dataframe with a column which stores json (type string)? Or
> you can also have a column of array type in which you store all cities
> matching your query.
>
>
>
>
>
>
>
> On Fri, Dec 28, 2018 at 2:48 AM  wrote:
>
> Hi community ,
>
>
>
> As shown in other answers online, Spark does not support the nesting of
> DataFrames, but what are the options?
>
>
>
> I have the following scenario :
>
>
>
> dataFrame1 = List of Cities
>
>
>
> dataFrame2 = Created after searching in ElasticSearch for each city in
> dataFrame1
>
>
>
> I've tried :
>
>
>
>   val cities = sc.parallelize(Seq("New York")).toDF()
>   cities.foreach(r => {
>     val companyName = r.getString(0)
>     println(companyName)
>     val dfs = sqlContext.esDF("cities/docs", "?q=" + companyName)
>     // returns a DataFrame consisting of all the cities matching the entry in cities
>   })
>
>
>
> Which triggers the expected null pointer exception
>
>
>
> java.lang.NullPointerException
>
> at org.elasticsearch.spark.sql.EsSparkSQL$.esDF(EsSparkSQL.scala:53)
>
> at org.elasticsearch.spark.sql.EsSparkSQL$.esDF(EsSparkSQL.scala:51)
>
> at
> org.elasticsearch.spark.sql.package$SQLContextFunctions.esDF(package.scala:37)
>
> at Main$$anonfun$main$1.apply(Main.scala:43)
>
> at Main$$anonfun$main$1.apply(Main.scala:39)
>
> at scala.collection.Iterator$class.foreach(Iterator.scala:742)
>
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
>
> at
> org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:921)
>
> at
> org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:921)
>
> at
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
>
> at
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
>
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>
> at org.apache.spark.scheduler.Task.run(Task.scala:109)
>
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
>
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>
> at java.lang.Thread.run(Thread.java:748)
>
> 2018-12-28 02:01:00 ERROR TaskSetManager:70 - Task 7 in stage 0.0 failed 1
> times; aborting job
>
> Exception in thread "main" org.apache.spark.SparkException: Job aborted
> due to stage failure: Task 7 in stage 0.0 failed 1 times, most recent
> failure: Lost task 7.0 in stage 0.0 (TID 7, localhost, executor driver):
> java.lang.NullPointerException
>
>
>
> What options do I have?
>
>
>
> Thank you.
>
> --
It's dark in this basement.


Questions about caching

2018-12-11 Thread Andrew Melo
Greetings, Spark Aficionados-

I'm working on a project to (ab-)use PySpark to do particle physics
analysis, which involves iterating with a lot of transformations (to
apply weights and select candidate events) and reductions (to produce
histograms of relevant physics objects). We have a basic version
working, but I'm looking to exploit some of Spark's caching behavior
to speed up the interactive computation portion of the analysis,
probably by writing a thin convenience wrapper. I have a couple
questions I've been unable to find definitive answers to, which would
help me design this wrapper in an efficient way:

1) When cache()-ing a dataframe where only a subset of the columns are
used, is the entire dataframe placed into the cache, or only the used
columns? E.g., does "df2" end up caching only "a", or all three
columns?

df1 = spark.read.load('test.parquet') # Has columns a, b, c
df2 = df1.cache()
df2.select('a').collect()

2) Are caches reference-based, or is there some sort of de-duplication
based on the logical/physical plans? So, for instance, does Spark take
advantage of the fact that these two dataframes should have the same
content:

df1 = spark.read.load('test.parquet').cache()
df2 = spark.read.load('test.parquet').cache()

...or are df1 and df2 totally independent WRT caching behavior?

2a) If the cache is reference-based, is it sufficient to hold a
weakref to the python object to keep the cache in-scope?

3) Finally, the spark.externalBlockStore.blockManager is intriguing in
our environment where we have multiple users concurrently analyzing
mostly the same input datasets. We have enough RAM in our clusters to
cache a high percentage of the very common datasets, but only if users
could somehow share their caches (which, conveniently, are the larger
datasets). We also have very large edge SSD cache servers, which we use to
cache trans-oceanic I/O, that we could throw at this as well.

It looks, however, like that API was removed in 2.0.0 and there wasn't
a replacement. There are products like Alluxio, but they aren't
transparent, requiring the user to manually cache their dataframes by
doing save/loads to external files using "alluxio://" URIs. Is there
no way around this behavior now?
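
For concreteness, the manual pattern I mean is roughly the following; the Alluxio
host, port and paths are placeholders, and it assumes the Alluxio client jar is
already on the Spark classpath:

# One user materializes the common dataset into the shared cache layer
df = spark.read.parquet('hdfs:///data/common_dataset.parquet')
df.write.mode('overwrite').parquet('alluxio://alluxio-master:19998/cache/common_dataset')

# Other users read the cached copy instead of hitting the original storage
shared = spark.read.parquet('alluxio://alluxio-master:19998/cache/common_dataset')
shared.count()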

Sorry for the long email, and thanks!
Andrew

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org