Re: Sanely updating parquet partitions.

2016-04-29 Thread Philip Weaver
;)).parquet("/path/to/table/a=*") I also checked that the symlinks were followed the way I wanted, by removing one of the symlinks after creating the DataFrame, and I was able to query the DataFrame without error. - Philip On Fri, Apr 29, 2016 at 9:56 AM, Philip Weaver <philip.we
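
A minimal sketch of the glob-based read described above, assuming each /path/to/table/a=<value> directory is a symlink that can be repointed at freshly written data; the path and filter value are illustrative, and sqlContext is assumed to be in scope.

    // Read every partition directory through a glob so each a=<value> symlink
    // can be swapped out independently of running readers.
    val df = sqlContext.read.parquet("/path/to/table/a=*")

    // Intended behaviour: a filter on the partition column only touches the
    // matching directories. Worth confirming with explain() on your Spark version.
    val subset = df.filter(df("a") === "2016-04-29")
    subset.explain()
    subset.count()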

Sanely updating parquet partitions.

2016-04-29 Thread Philip Weaver
Hello, I have a parquet dataset, partitioned by a column 'a'. I want to take advantage of Spark SQL's ability to filter to the partition when you filter on 'a'. I also want to periodically update individual partitions without disrupting any jobs that are querying the data. The obvious solution

Re: Support for time column type?

2016-04-04 Thread Philip Weaver
. if all the same operators are defined for it), but I'll give it a try. - Philip On Fri, Apr 1, 2016 at 1:33 PM, Michael Armbrust <mich...@databricks.com> wrote: > There is also CalendarIntervalType. Is that what you are looking for? > > On Fri, Apr 1, 2016 at 1:11 PM, Philip Wea

Support for time column type?

2016-04-01 Thread Philip Weaver
Hi, I don't see any mention of a time type in the documentation (there is DateType and TimestampType, but not TimeType), and have been unable to find any documentation about whether this will be supported in the future. Does anyone know if this is currently supported or will be supported in the
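
For context, a short sketch of the two types that do exist, plus a hypothetical workaround (not from the thread) of carrying a time-of-day as seconds since midnight in an IntegerType column; field names are illustrative.

    import org.apache.spark.sql.types._

    // DateType and TimestampType exist; there is no TimeType, so a time-of-day
    // can be encoded explicitly, e.g. as seconds since midnight (13:30:00 -> 48600).
    val schema = StructType(Seq(
      StructField("event_date", DateType, nullable = false),
      StructField("event_time_secs", IntegerType, nullable = false)
    ))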

Re: Location preferences in pyspark?

2015-10-20 Thread Philip Weaver
Sat, Oct 17, 2015 at 8:42 AM, Philip Weaver <philip.wea...@gmail.com> > wrote: > >> I believe what I want is the exact functionality provided by >> SparkContext.makeRDD in Scala. For each element in the RDD, I want to specify >> a list of preferred hosts for processing th

Location preferences in pyspark?

2015-10-16 Thread Philip Weaver
I believe what I want is the exact functionality provided by SparkContext.makeRDD in Scala. For each element in the RDD, I want to specify a list of preferred hosts for processing that element. It looks like this method only exists in Scala, and as far as I can tell there is no similar functionality
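
For reference, a minimal Scala sketch of the makeRDD overload being referred to (host names and records are placeholders); the preferred locations are hints to the scheduler, not guarantees.

    import org.apache.spark.{SparkConf, SparkContext}

    val sc = new SparkContext(new SparkConf().setAppName("locality-sketch"))

    // Each element is paired with the hosts preferred for processing it.
    val data: Seq[(String, Seq[String])] = Seq(
      ("record-1", Seq("host-a")),
      ("record-2", Seq("host-b", "host-c"))
    )
    val rdd = sc.makeRDD(data)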

Re: Limiting number of cores per job in multi-threaded driver.

2015-10-04 Thread Philip Weaver
My understanding was that you are sharing the machine across many jobs. > That was the context in which I was making that comment. > > -adrian > > Sent from my iPhone > > On 03 Oct 2015, at 07:03, Philip Weaver <philip.wea...@gmail.com> wrote: > > You can't really

Re: Limiting number of cores per job in multi-threaded driver.

2015-10-04 Thread Philip Weaver
Sent from my iPhone > > On 3 Oct, 2015, at 12:02 am, Philip Weaver <philip.wea...@gmail.com> > wrote: > > You can't really say 8 cores is not much horsepower when you have no idea > what my use case is. That's silly. > > On Fri, Sep 18, 2015 at 10:33 PM, Adrian Tanase <

Re: Limiting number of cores per job in multi-threaded driver.

2015-10-04 Thread Philip Weaver
don't scale beyond about 8 cores. 2.) The next submitted job will have to wait for resources to become available. - Philip On Sun, Oct 4, 2015 at 2:33 PM, Philip Weaver <philip.wea...@gmail.com> wrote: > I believe I've described my use case clearly, and I'm being questioned > that it'

Re: Limiting number of cores per job in multi-threaded driver.

2015-10-02 Thread Philip Weaver
tasks that the scheduler can then run across the many jobs you > might have - as opposed to fewer, longer tasks... > > Lastly, 8 cores is not that much horsepower :) > You may consider running with beefier machines or a larger cluster, to get > at least tens of cores. > > Hope this hel

Metadata in Parquet

2015-09-30 Thread Philip Weaver
Hi, I am using org.apache.spark.sql.types.Metadata to store extra information along with each of my fields. I'd also like to store Metadata for the entire DataFrame, not attached to any specific field. Is this supported? - Philip
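
A small sketch of the per-field usage described above (field name, metadata key and value are illustrative); the DataFrame-level question is left open here, as in the post.

    import org.apache.spark.sql.types._

    // Attach extra information to a single field via its Metadata.
    val unitsMeta = new MetadataBuilder().putString("units", "seconds").build()
    val schema = StructType(Seq(
      StructField("duration", DoubleType, nullable = true, metadata = unitsMeta)
    ))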

Re: Remove duplicate keys by always choosing first in file.

2015-09-24 Thread Philip Weaver
Oops, I didn't catch the suggestion to just use RDD.zipWithIndex, which I forgot existed (and I've discovered I actually used it in another project!). I will use that instead of the mapPartitionsWithIndex/zipWithIndex solution that I posted originally. On Tue, Sep 22, 2015 at 9:07 AM, Philip Weaver
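
A sketch of the zipWithIndex approach mentioned above, assuming an RDD of (key, value) pairs read from a single file (so the global index follows file order) and taking "first in file" to mean the row with the lowest index; names are illustrative.

    // rdd: RDD[(K, V)]
    val firstPerKey = rdd
      .zipWithIndex()                                        // ((k, v), globalIndex)
      .map { case ((k, v), idx) => (k, (idx, v)) }
      .reduceByKey { (a, b) => if (a._1 <= b._1) a else b }  // keep the earliest row
      .mapValues(_._2)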

Re: Remove duplicate keys by always choosing first in file.

2015-09-22 Thread Philip Weaver
s exactly the info you need, as the index is the original line > number > > in the file, not the index in the partition. > > > > Sent from my iPhone > > > > On 22 Sep 2015, at 17:50, Philip Weaver <philip.wea...@gmail.com> wrote: > > > > Thanks. If

Re: Remove duplicate keys by always choosing first in file.

2015-09-22 Thread Philip Weaver
: > sc.textFile("README.md", 4) > > You can then just do .groupBy(…).mapValues(_.sortBy(…).head) - I’m > skimming through some tuples, hopefully this is clear enough. > > -adrian > > From: Philip Weaver > Date: Tuesday, September 22, 2015 at 3:26 AM > To: user &g

Re: Remove duplicate keys by always choosing first in file.

2015-09-21 Thread Philip Weaver
tering values serially. > You would take the first and ignore the rest. Note that "first" > depends on your RDD having an ordering to begin with, or else you rely > on however it happens to be ordered after whatever operations give you > a key-value RDD. > > On Tue,

Re: Remove duplicate keys by always choosing first in file.

2015-09-21 Thread Philip Weaver
with any row should yield > Some(row). After that, combining is a no-op for other rows. > > On Tue, Sep 22, 2015 at 4:27 AM, Philip Weaver <philip.wea...@gmail.com> > wrote: > > Hmm, I don't think that's what I want. There's no "zero value" in my use > > case. &

Remove duplicate keys by always choosing first in file.

2015-09-21 Thread Philip Weaver
I am processing a single file and want to remove duplicate rows by some key by always choosing the first row in the file for that key. The best solution I could come up with is to zip each row with the partition index and local index, like this: rdd.mapPartitionsWithIndex { case (partitionIndex,

Re: Limiting number of cores per job in multi-threaded driver.

2015-09-18 Thread Philip Weaver
the purpose. Does that make sense? Thanks in advance for any advice you can provide! - Philip On Sat, Sep 12, 2015 at 10:40 PM, Philip Weaver <philip.wea...@gmail.com> wrote: > I'm playing around with dynamic allocation in spark-1.5.0, with the FAIR > scheduler, so I can define a

Re: Limiting number of cores per job in multi-threaded driver.

2015-09-18 Thread Philip Weaver
(whoops, redundant sentence in that first paragraph) On Fri, Sep 18, 2015 at 8:36 AM, Philip Weaver <philip.wea...@gmail.com> wrote: > Here's a specific example of what I want to do. My Spark application is > running with total-executor-cores=8. A request comes in, it spawns a thread

Trouble using dynamic allocation and shuffle service.

2015-09-14 Thread Philip Weaver
Hello, I am trying to use dynamic allocation which requires the shuffle service. I am running Spark on mesos. Whenever I set spark.shuffle.service.enabled=true, my Spark driver fails with an error like this: Caused by: java.net.ConnectException: Connection refused: devspark1/172.26.21.70:7337

Re: Trouble using dynamic allocation and shuffle service.

2015-09-14 Thread Philip Weaver
and it should work. > > Let me know if that's not clear. > > Tim > > On Mon, Sep 14, 2015 at 11:36 AM, Philip Weaver <philip.wea...@gmail.com> > wrote: > >> Hello, I am trying to use dynamic allocation which requires the shuffle >> service. I am running Spark o
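
The reply is truncated here, but a typical driver-side configuration for this combination (a sketch, assuming an external shuffle service is already running on each Mesos agent and listening on the default port 7337) looks like:

    import org.apache.spark.{SparkConf, SparkContext}

    // Both flags are needed; the shuffle service itself runs outside the executors.
    val conf = new SparkConf()
      .set("spark.shuffle.service.enabled", "true")
      .set("spark.dynamicAllocation.enabled", "true")
    val sc = new SparkContext(conf)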

Limiting number of cores per job in multi-threaded driver.

2015-09-12 Thread Philip Weaver
I'm playing around with dynamic allocation in spark-1.5.0, with the FAIR scheduler, so I can define a long-running application capable of executing multiple simultaneous spark jobs. The kind of jobs that I'm running do not benefit from more than 4 cores, but I want my application to be able to
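
A sketch of the per-thread pool assignment the FAIR scheduler supports (pool name and job are illustrative); note that pools weight and share resources between concurrent jobs rather than hard-capping the cores any one job may use, which is the gap the rest of the thread discusses.

    // Run inside each request-handling thread of the long-running driver.
    val worker = new Thread {
      override def run(): Unit = {
        sc.setLocalProperty("spark.scheduler.pool", "small-jobs") // pool from fairscheduler.xml
        try {
          sc.textFile("/some/input").count()   // jobs submitted here share that pool
        } finally {
          sc.setLocalProperty("spark.scheduler.pool", null)      // clear for this thread
        }
      }
    }
    worker.start()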

Re: Spark Sql behaves strangely with tables with a lot of partitions

2015-08-23 Thread Philip Weaver
[ .. about 1000 partition paths go here ] Why does spark have to scan all partitions when the query only concerns 1 partition? Doesn't it defeat the purpose of partitioning? Thanks! On Thu, Aug 20, 2015 at 4:12 PM, Philip Weaver philip.wea...@gmail.com wrote: I hadn't heard

Re: Spark Sql behaves strangely with tables with a lot of partitions

2015-08-19 Thread Philip Weaver
I've had the same problem. It turns out that Spark (specifically parquet) is very slow at partition discovery. It got better in 1.5 (not yet released), but was still unacceptably slow. Sadly, we ended up reading parquet files manually in Python (via C++) and had to abandon Spark SQL because of
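
One workaround sketch (not what was ultimately done here, since the poster moved off Spark SQL): point the reader at only the partition directories actually needed and re-attach the partition value by hand, skipping discovery over the tens of thousands of directories. Path and column names are illustrative.

    import org.apache.spark.sql.functions.lit

    // Load a single known partition directory directly and restore its column.
    val onePartition = sqlContext.read
      .parquet("/data/table/a=42")
      .withColumn("a", lit(42))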

Re: Driver staggering task launch times

2015-08-14 Thread Philip Weaver
Ah, nevermind, I don't know anything about scheduling tasks in YARN. On Thu, Aug 13, 2015 at 11:03 PM, Ara Vartanian arav...@cs.wisc.edu wrote: I’m running on Yarn. On Aug 13, 2015, at 10:58 PM, Philip Weaver philip.wea...@gmail.com wrote: Are you running on mesos, yarn or standalone

Re: Write to cassandra...each individual statement

2015-08-13 Thread Philip Weaver
All you'd need to do is *transform* the rdd before writing it, e.g. using the .map function. On Thu, Aug 13, 2015 at 11:30 AM, Priya Ch learnings.chitt...@gmail.com wrote: Hi All, I have a question in writing rdd to cassandra. Instead of writing the entire rdd to cassandra, I want to write
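
A sketch of the "transform, then save" suggestion, assuming the DataStax spark-cassandra-connector is on the classpath; the case class, keyspace, table and column names are illustrative stand-ins.

    import com.datastax.spark.connector._

    case class Event(id: String, payload: String)

    val events = sc.parallelize(Seq(Event("1", "hello"), Event("2", "world")))

    // Apply the per-message logic in a map, then write the whole result once.
    events
      .map(e => (e.id, e.payload.toUpperCase))
      .saveToCassandra("my_keyspace", "events", SomeColumns("id", "payload"))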

Re: Write to cassandra...each individual statement

2015-08-13 Thread Philip Weaver
. Hence using rdd.foreach I am handling different logic for individual messages. Now bulk rdd.saveToCassandra will not work. Hope you got what I am trying to say.. On Fri, Aug 14, 2015 at 12:07 AM, Philip Weaver philip.wea...@gmail.com wrote: All you'd need to do is *transform* the rdd before

Re: Driver staggering task launch times

2015-08-13 Thread Philip Weaver
Are you running on mesos, yarn or standalone? If you're on mesos, are you using coarse grain or fine grained mode? On Thu, Aug 13, 2015 at 10:13 PM, Ara Vartanian arav...@cs.wisc.edu wrote: I’m observing an unusual situation where my step duration increases as I add further executors to my

Re: grouping by a partitioned key

2015-08-12 Thread Philip Weaver
. DataFrame.foreachPartition is the way. I haven't tried it, but the following looks like a not-so-sophisticated way of making spark sql partition-aware. http://spark.apache.org/docs/latest/sql-programming-guide.html#partition-discovery On Wed, Aug 12, 2015 at 5:00 AM, Philip Weaver philip.wea

Re: Very high latency to initialize a DataFrame from partitioned parquet database.

2015-08-11 Thread Philip Weaver
:23 AM, Philip Weaver philip.wea...@gmail.com wrote: Thanks, I also confirmed that the partition discovery is slow by writing a non-Spark application that uses the parquet library directly to load those partitions. It's so slow that my colleague's Python application can read the entire

grouping by a partitioned key

2015-08-11 Thread Philip Weaver
If I have an RDD that happens to already be partitioned by a key, how efficient can I expect a groupBy operation to be? I would expect that Spark shouldn't have to move data around between nodes, and simply will have a small amount of work just checking the partitions to discover that it doesn't
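
A sketch of the situation being asked about: if the RDD carries an explicit partitioner, a groupByKey with the same partitioner should not move data between nodes, and toDebugString (or the UI) will show whether a shuffle stage was in fact added. Numbers are illustrative.

    import org.apache.spark.HashPartitioner

    val pairs = sc.parallelize(1 to 1000).map(i => (i % 16, i))
    val partitioned = pairs.partitionBy(new HashPartitioner(16)).cache()

    val grouped = partitioned.groupByKey()   // reuses the existing partitioner
    println(grouped.toDebugString)           // inspect for a shuffle dependency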

Re: grouping by a partitioned key

2015-08-11 Thread Philip Weaver
the case? What is it you're trying to achieve? There might be another way to do it, once you're 100% sure what's happening. You can print debugString or explain (for DataFrame) to see what's happening under the hood. On 12 Aug 2015, at 01:19, Philip Weaver philip.wea...@gmail.com wrote: If I

Re: Spark failed while trying to read parquet files

2015-08-07 Thread Philip Weaver
Yes, NullPointerExceptions are pretty common in Spark (or, rather, I seem to encounter them a lot!) but can occur for a few different reasons. Could you add some more detail, like what the schema is for the data, or the code you're using to read it? On Fri, Aug 7, 2015 at 3:20 PM, Jerrick Hoang

Re: How to distribute non-serializable object in transform task or broadcast ?

2015-08-07 Thread Philip Weaver
If the object cannot be serialized, then I don't think broadcast will make it magically serializable. You can't transfer data structures between nodes without serializing them somehow. On Fri, Aug 7, 2015 at 7:31 AM, Sujit Pal sujitatgt...@gmail.com wrote: Hi Hao, I think sc.broadcast will
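
The usual alternative when the object itself cannot be made serializable is to construct it on the executors rather than ship it from the driver, e.g. once per partition inside mapPartitions; HeavyParser below is a hypothetical stand-in for such a resource.

    // Hypothetical non-serializable resource, defined at top level so the closure
    // captures nothing that would itself need to be serialized.
    class HeavyParser { def parse(s: String): Int = s.length }

    val lines = sc.parallelize(Seq("alpha", "beta", "gamma"))
    val parsed = lines.mapPartitions { iter =>
      val parser = new HeavyParser()     // built once per partition, on the executor
      iter.map(parser.parse)
    }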

Re: Very high latency to initialize a DataFrame from partitioned parquet database.

2015-08-07 Thread Philip Weaver
faster? Cheng On 8/7/15 2:02 AM, Philip Weaver wrote: With DEBUG, the log output was over 10MB, so I opted for just INFO output. The (sanitized) log is attached. The driver is essentially this code: info(A) val t = System.currentTimeMillis val df = sqlContext.read.parquet(dir

Re: Unable to persist RDD to HDFS

2015-08-06 Thread Philip Weaver
This isn't really a Spark question. You're trying to parse a string to an integer, but it contains an invalid character. The exception message explains this. On Wed, Aug 5, 2015 at 11:34 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: Code: import java.text.SimpleDateFormat import

Re: Very high latency to initialize a DataFrame from partitioned parquet database.

2015-08-06 Thread Philip Weaver
Weaver wrote: I built spark from the v1.5.0-snapshot-20150803 tag in the repo and tried again. The initialization time is about 1 minute now, which is still pretty terrible. On Wed, Aug 5, 2015 at 9:08 PM, Philip Weaver philip.wea...@gmail.com wrote: Absolutely, thanks! On Wed, Aug 5, 2015

Re: Very high latency to initialize a DataFrame from partitioned parquet database.

2015-08-06 Thread Philip Weaver
I built spark from the v1.5.0-snapshot-20150803 tag in the repo and tried again. The initialization time is about 1 minute now, which is still pretty terrible. On Wed, Aug 5, 2015 at 9:08 PM, Philip Weaver philip.wea...@gmail.com wrote: Absolutely, thanks! On Wed, Aug 5, 2015 at 9:07 PM

Re: spark hangs at broadcasting during a filter

2015-08-05 Thread Philip Weaver
How big is droprows? Try explicitly broadcasting it like this: val broadcastDropRows = sc.broadcast(dropRows) val valsrows = ... .filter(x => !broadcastDropRows.value.contains(x._1)) - Philip On Wed, Aug 5, 2015 at 11:54 AM, AlexG swift...@gmail.com wrote: I'm trying to load a 1 Tb file
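
A slightly fuller version of the suggestion above, with the surrounding pieces filled in (the contents of dropRows, the input path, and the keying are illustrative); broadcasting the Set gives each executor a single copy and keeps the membership test cheap.

    val dropRows: Set[String] = Set("bad-key-1", "bad-key-2")
    val broadcastDropRows = sc.broadcast(dropRows)

    val valsrows = sc.textFile("/path/to/input")
      .map(line => (line.split('\t')(0), line))                      // key by the first field
      .filter { case (k, _) => !broadcastDropRows.value.contains(k) }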

Re: How to read gzip data in Spark - Simple question

2015-08-05 Thread Philip Weaver
) at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:30) at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:164) Any suggestions On Wed, Aug 5, 2015 at 8:18 PM, Philip Weaver philip.wea...@gmail.com wrote: The parallelize method does

Re: Very high latency to initialize a DataFrame from partitioned parquet database.

2015-08-05 Thread Philip Weaver
on. Cheng On 8/6/15 8:26 AM, Philip Weaver wrote: I have a parquet directory that was produced by partitioning by two keys, e.g. like this: df.write.partitionBy("a", "b").parquet("asdf") There are 35 values of a, and about 1100-1200 values of b for each value of a, for a total of over 40,000

Re: How to read gzip data in Spark - Simple question

2015-08-05 Thread Philip Weaver
The parallelize method does not read the contents of a file. It simply takes a collection and distributes it to the cluster. In this case, the String is a collection of 67 characters. Use sc.textFile instead of sc.parallelize, and it should work as you want. On Wed, Aug 5, 2015 at 8:12 PM, ÐΞ€ρ@Ҝ
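
A one-line sketch of the suggestion (path illustrative); Spark picks the gzip codec from the file extension and decompresses transparently, though a single .gz file is not splittable and will arrive as one partition.

    val lines = sc.textFile("/data/input.json.gz")   // decompressed transparently
    println(lines.count())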

Very high latency to initialize a DataFrame from partitioned parquet database.

2015-08-05 Thread Philip Weaver
I have a parquet directory that was produced by partitioning by two keys, e.g. like this: df.write.partitionBy("a", "b").parquet("asdf") There are 35 values of a, and about 1100-1200 values of b for each value of a, for a total of over 40,000 partitions. Before running any transformations or actions

Re: Turn Off Compression for Textfiles

2015-08-04 Thread Philip Weaver
The .gz extension indicates that the file is compressed with gzip. Choose a different extension (e.g. .txt) when you save them. On Tue, Aug 4, 2015 at 7:00 PM, Brandon White bwwintheho...@gmail.com wrote: How do you turn off gz compression for saving as textfiles? Right now, I am reading .gz
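
For reference, a sketch of the default behaviour and the explicit opt-in, assuming no cluster-wide output-compression settings are in force (paths are illustrative): saveAsTextFile with no codec argument writes plain part files, and compression is only applied when a codec is passed.

    import org.apache.hadoop.io.compress.GzipCodec

    val rdd = sc.parallelize(Seq("a", "b", "c"))
    rdd.saveAsTextFile("/out/plain")                        // uncompressed part-* files
    rdd.saveAsTextFile("/out/gzipped", classOf[GzipCodec])  // gzip only when asked for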

Safe to write to parquet at the same time?

2015-08-03 Thread Philip Weaver
I think this question applies regardless if I have two completely separate Spark jobs or tasks on different machines, or two cores that are part of the same task on the same machine. If two jobs/tasks/cores/stages both save to the same parquet directory in parallel like this:

Unable to compete with performance of single-threaded Scala application

2015-08-03 Thread Philip Weaver
Hello, I am running Spark 1.4.0 on Mesos 0.22.1, and usually I run my jobs in coarse-grained mode. I have written some single-threaded standalone Scala applications for a problem that I am working on, and I am unable to get a Spark solution that comes close to the performance of this