Re: Help taking last value in each group (aggregates)

2017-08-28 Thread Everett Anderson
row_number().over(windowSpec).as("row_number")) .filter("row_number == 1") .select($"group_id", $"row_id".as("last_row_id"), $"total") Would love to know if there's a better way! On Mon, Aug 28, 2017 at 9:19 AM, Everett An
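A minimal sketch of that window-function approach, assuming a DataFrame df with the column names used in this thread (group_id, row_id, col_to_sum) plus ordering columns a and b:

    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions._

    // Compute the group total over an unordered window, then rank rows
    // descending so row_number == 1 picks the "last" row in each group.
    val groupWindow = Window.partitionBy("group_id")
    val windowSpec = groupWindow.orderBy(desc("a"), desc("b"))

    val result = df
      .withColumn("total", sum("col_to_sum").over(groupWindow))
      .withColumn("row_number", row_number().over(windowSpec))
      .filter(col("row_number") === 1)
      .select(col("group_id"), col("row_id").as("last_row_id"), col("total"))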

Help taking last value in each group (aggregates)

2017-08-28 Thread Everett Anderson
Hi, I'm struggling a little with some unintuitive behavior with the Scala API. (Spark 2.0.2) I wrote something like df.orderBy("a", "b") .groupBy("group_id") .agg(sum("col_to_sum").as("total"), last("row_id").as("last_row_id")) and expected a result with a unique group_id column, a

Re: Spark, S3A, and 503 SlowDown / rate limit issues

2017-07-10 Thread Everett Anderson
le. This happened when we unioned many Spark DataFrames together without doing a repartition or coalesce afterwards. After throwing in a repartition (to additionally balance the output shards) we haven't seen the error, again, but our graphs of S3 HEAD requests are still rather alarmingly hig
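A sketch of that mitigation, assuming a sequence of same-schema DataFrames dfs and a hypothetical output path:

    // Union everything, then repartition once so the output shards are
    // balanced and the write doesn't hammer one S3 key-space with tiny files.
    val combined = dfs.reduce(_ union _)
    combined
      .repartition(256)                     // shard count is workload-specific
      .write
      .parquet("s3a://bucket/output/path")  // hypothetical path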

Spark, S3A, and 503 SlowDown / rate limit issues

2017-06-29 Thread Everett Anderson
Hi, We're using Spark 2.0.2 + Hadoop 2.7.3 on AWS EMR with S3A for direct I/O from/to S3 from our Spark jobs. We set mapreduce.fileoutputcommitter.algorithm.version=2 and are using encrypted S3 buckets. This has been working fine for us, but perhaps as we've been running more jobs in parallel,
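For reference, those settings look roughly like this via SparkConf; the spark.hadoop. prefix forwards them into the Hadoop Configuration S3A reads, and the encryption value is an assumption:

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
      // Assuming SSE-S3 buckets; adjust for your encryption scheme.
      .set("spark.hadoop.fs.s3a.server-side-encryption-algorithm", "AES256")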

Re: Temp checkpoint directory for EMR (S3 or HDFS)

2017-05-30 Thread Everett Anderson
github.com/apache/spark/blob/branch-2.0/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala#L184>. So this is one option, though certainly abusing the staging directory. A more general one might be to find where Dataset.persist(DISK_ONLY) writes. On Fri, May 26, 2017 at 9:08 AM, Everett

Temp checkpoint directory for EMR (S3 or HDFS)

2017-05-26 Thread Everett Anderson
Hi, I need to set a checkpoint directory as I'm starting to use GraphFrames. (Also, occasionally my regular DataFrame lineages get too long so it'd be nice to use checkpointing to squash the lineage.) I don't actually need this checkpointed data to live beyond the life of the job, however. I'm
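A sketch of the basic setup under discussion, with an assumed HDFS path that lives and dies with the EMR cluster:

    // GraphFrames requires a checkpoint directory; cluster-local HDFS keeps
    // the checkpointed data from outliving the job's cluster.
    spark.sparkContext.setCheckpointDir("hdfs:///tmp/spark-checkpoints")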

Re: Driver spins hours in query plan optimization

2017-05-02 Thread Everett Anderson
Seems like https://issues.apache.org/jira/browse/SPARK-13346 is likely the same issue. Seems like for some people persist() doesn't work and they have to convert to RDDs and back. On Fri, Apr 14, 2017 at 1:39 PM, Everett Anderson <ever...@nuna.com> wrote: > Hi, > >
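A sketch of the RDD round-trip workaround mentioned above; it rebuilds the DataFrame from a fresh scan so the optimizer never sees the huge accumulated plan:

    // Round-trip through the RDD so the optimizer starts from a shallow
    // plan instead of re-optimizing the full lineage every time.
    val flattened = spark.createDataFrame(df.rdd, df.schema)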

Re: Calculate mode separately for multiple columns in row

2017-04-27 Thread Everett Anderson
frequencies.maxBy(_._2)._1 } } On Wed, Apr 26, 2017 at 10:21 AM, Everett Anderson <ever...@nuna.com> wrote: > Hi, > > One common situation I run across is that I want to compact my data and > select the mode (most frequent value) in several columns for each group. > > Even c

Calculate mode separately for multiple columns in row

2017-04-26 Thread Everett Anderson
Hi, One common situation I run across is that I want to compact my data and select the mode (most frequent value) in several columns for each group. Even calculating mode for one column in SQL is a bit tricky. The ways I've seen usually involve a nested sub-select with a group by + count and
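One common DataFrame-API shape for the group-by + count idiom described above, sketched for a single assumed column value_col grouped by group_id (multiple mode columns mean repeating this and joining the results):

    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions._

    // Count each (group, value) pair, then keep the most frequent value
    // per group.
    val counts = df.groupBy("group_id", "value_col").count()
    val byCount = Window.partitionBy("group_id").orderBy(desc("count"))
    val modes = counts
      .withColumn("rn", row_number().over(byCount))
      .filter(col("rn") === 1)
      .select(col("group_id"), col("value_col").as("value_col_mode"))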

Driver spins hours in query plan optimization

2017-04-14 Thread Everett Anderson
Hi, We keep hitting a situation on Spark 2.0.2 (haven't tested later versions, yet) where the driver spins forever seemingly in query plan optimization for moderate queries, such as the union of a few (~5) other DataFrames. We can see the driver spinning with one core in the

Re: Assigning a unique row ID

2017-04-10 Thread Everett Anderson
Indeed, I tried persist with MEMORY_AND_DISK and it works! (I'm wary of MEMORY_ONLY for this as it could potentially recompute shards if it couldn't entirely cache in memory.) Thanks for the help, everybody!! On Sat, Apr 8, 2017 at 11:54 AM, Everett Anderson <ever...@nuna.com>
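For reference, a sketch of the fix this thread converged on:

    import org.apache.spark.sql.functions.monotonically_increasing_id
    import org.apache.spark.storage.StorageLevel

    // Persist right after assigning IDs so downstream branches can't
    // recompute the column and see different values.
    val withId = df.withColumn("row_id", monotonically_increasing_id())
    withId.persist(StorageLevel.MEMORY_AND_DISK)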

Re: Assigning a unique row ID

2017-04-08 Thread Everett Anderson
ay, awesome. Let me give that a go. > > Thanks, > Subhash > > Sent from my iPhone > > On Apr 7, 2017, at 7:32 PM, Everett Anderson <ever...@nuna.com.INVALID> > wrote: > > Hi, > > Thanks, but that's using a random UUID. Certainly unlikely to have >

Re: Assigning a unique row ID

2017-04-07 Thread Everett Anderson
PM, Tim Smith <secs...@gmail.com> wrote: > http://stackoverflow.com/questions/37231616/add-a-new- > column-to-a-dataframe-new-column-i-want-it-to-be-a-uuid-generator > > > On Fri, Apr 7, 2017 at 3:56 PM, Everett Anderson <ever...@nuna.com.invalid > > wrote: >

Assigning a unique row ID

2017-04-07 Thread Everett Anderson
Hi, What's the best way to assign a truly unique row ID (rather than a hash) to a DataFrame/Dataset? I originally thought that functions.monotonically_increasing_id would do this, but it seems to have a rather unfortunate property that if you add it as a column to table A and then derive tables
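Besides the persist() fix discussed later in this thread, a heavier but deterministic alternative is zipWithIndex on the underlying RDD (a sketch; it costs an extra Spark job, and assumes a SparkSession spark):

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types.{LongType, StructField, StructType}

    // Stable, consecutive IDs; rebuild the DataFrame with the index appended.
    val rowsWithId = df.rdd.zipWithIndex().map { case (row, idx) =>
      Row.fromSeq(row.toSeq :+ idx)
    }
    val schema = StructType(df.schema.fields :+
      StructField("row_id", LongType, nullable = false))
    val dfWithId = spark.createDataFrame(rowsWithId, schema)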

Re: Spark 2.0.2 Dataset union() slowness vs RDD union?

2017-03-20 Thread Everett Anderson
PM, Everett Anderson <ever...@nuna.com> wrote: > Hi! > > On Thu, Mar 16, 2017 at 5:20 PM, Burak Yavuz <brk...@gmail.com> wrote: > >> Hi Everett, >> >> IIRC we added unionAll in Spark 2.0 which is the same implementation as >> rdd union. The union

Re: Spark 2.0.2 Dataset union() slowness vs RDD union?

2017-03-16 Thread Everett Anderson
From https://spark.apache.org/docs/2.0.2/api/java/org/apache/spark/sql/Dataset.html#union(org.apache.spark.sql.Dataset) and https://spark.apache.org/docs/2.0.2/api/java/org/apache/spark/sql/Dataset.html#unionAll(org.apache.spark.sql.Dataset) > > Best, > Burak > > On Thu, M

Re: Spark 2.0.2 Dataset union() slowness vs RDD union?

2017-03-16 Thread Everett Anderson
On Thu, Mar 16, 2017 at 2:55 PM, Everett Anderson <ever...@nuna.com> wrote: > Hi, > > We're using Dataset union() in Spark 2.0.2 to concatenate a bunch of > tables together and save as Parquet to S3, but it seems to take a long > time. We're using the S3A FileSystem implementation

Spark 2.0.2 Dataset union() slowness vs RDD union?

2017-03-16 Thread Everett Anderson
Hi, We're using Dataset union() in Spark 2.0.2 to concatenate a bunch of tables together and save as Parquet to S3, but it seems to take a long time. We're using the S3A FileSystem implementation under the covers, too, if that helps. Watching the Spark UI, the executors all eventually stop
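One commonly suggested workaround (an assumption here, not necessarily this thread's conclusion) is to union at the RDD level, where concatenation is cheap, and rebuild the DataFrame once; this assumes all inputs in dfs share a schema:

    // RDD union just concatenates partition lists; a single createDataFrame
    // at the end avoids repeatedly analyzing a deep union plan.
    val unioned = spark.createDataFrame(
      spark.sparkContext.union(dfs.map(_.rdd)),
      dfs.head.schema)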

Best way to assign a unique IDs to row groups

2017-03-01 Thread Everett Anderson
Hi, I've used functions.monotonically_increasing_id() for assigning a unique ID to all rows, but I'd like to assign a unique ID to each group of rows with the same key. The two ways I can think of to do this are Option 1: Create separate group ID table and join back - Create a new data
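A sketch of Option 1, assuming the grouping key column is named key:

    import org.apache.spark.sql.functions.monotonically_increasing_id

    // Build a distinct-key table with a generated ID, cache it so the IDs
    // stay stable, then join back so all rows in a group share one ID.
    val groupIds = df.select("key").distinct()
      .withColumn("group_id", monotonically_increasing_id())
      .cache()
    val withGroupIds = df.join(groupIds, Seq("key"))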

Re: Strange behavior with 'not' and filter pushdown

2017-02-14 Thread Everett Anderson
Wrapping this up -- fix is in 2.1.0 and has been backported to the 2.0.x branch, as well. On Mon, Feb 13, 2017 at 6:41 PM, Everett Anderson <ever...@nuna.com> wrote: > Went ahead and opened > > https://issues.apache.org/jira/browse/SPARK-19586 > > though I'd generally

Re: Strange behavior with 'not' and filter pushdown

2017-02-13 Thread Everett Anderson
Went ahead and opened https://issues.apache.org/jira/browse/SPARK-19586 though I'd generally expect to just close it as fixed in 2.1.0 and roll on. On Sat, Feb 11, 2017 at 5:01 PM, Everett Anderson <ever...@nuna.com> wrote: > On the plus side, looks like this may be fixed

Re: Strange behavior with 'not' and filter pushdown

2017-02-11 Thread Everett Anderson
) +- *FileScan parquet [username#14] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/tmp/test_table], PartitionFilters: [], PushedFilters: [Not(IsNotNull(username))], ReadSchema: struct On Fri, Feb 10, 2017 at 11:26 AM, Everett Anderson <ever...@nuna.com> wrote: > Bumping th

Re: Strange behavior with 'not' and filter pushdown

2017-02-10 Thread Everett Anderson
Bumping this thread. Translating "where not(username is not null)" into a filter of [IsNotNull(username), Not(IsNotNull(username))] seems like a rather severe bug. Spark 1.6.2: explain select count(*) from parquet_table where not( username is not null) == Physical Plan ==
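Until the fix lands, the predicate can be rewritten to sidestep the bad pushdown entirely, since not(x is not null) is just x is null:

    import org.apache.spark.sql.functions.col

    // Logically equivalent filter that produces a plain IsNull pushdown.
    val nullUsernames = df.filter(col("username").isNull)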

Re: Un-exploding / denormalizing Spark SQL help

2017-02-08 Thread Everett Anderson
(Thanks everyone for the great suggestions!) > > On Thu, 9 Feb 2017 at 4:01 am, Xiaomeng Wan <shawn...@gmail.com> wrote: > >> You could also try pivot. >> >> On 7 February 2017 at 16:13, Everett Anderson <ever...@nuna.com.invalid> >> wrote: >>

Re: Un-exploding / denormalizing Spark SQL help

2017-02-07 Thread Everett Anderson
may have seen groupBy + join be better than >> window (there were more exchanges in play for windows I reckon). >> >> Jacek Laskowski

Re: Un-exploding / denormalizing Spark SQL help

2017-02-07 Thread Everett Anderson
").getField("priority")) .drop("1", "2", "3")

+---+----+------+------+---------+------+------+---------+------+------+---------+
| id|name|extra1| data1|priority1|extra2| data2|priority2|extra3| data3|priority3|
+---+----+------+------+---------+------+------+---------+------+------+---------+
|  1|Fred|     8|value1|        1|

Un-exploding / denormalizing Spark SQL help

2017-02-07 Thread Everett Anderson
Hi, I'm trying to un-explode or denormalize a table like

+---+----+-----+------+--------+
|id |name|extra|data  |priority|
+---+----+-----+------+--------+
|1  |Fred|8    |value1|1       |
|1  |Fred|8    |value8|2       |
|1  |Fred|8    |value5|3       |
|2  |Amy |9    |value3|1       |
|2  |Amy
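The pivot-based answer from this thread looks roughly like the sketch below, following the getField/drop fragment quoted above: group by the stable columns, pivot on priority, pack the per-priority fields into a struct, then flatten. Column names follow the example table; extending past priority 1 repeats the withColumn lines.

    import org.apache.spark.sql.functions._

    val pivoted = df
      .groupBy("id", "name")
      .pivot("priority", Seq(1, 2, 3))
      .agg(first(struct("extra", "data", "priority")))
      // The pivot yields struct columns named "1", "2", "3"; flatten them.
      .withColumn("extra1", col("1").getField("extra"))
      .withColumn("data1", col("1").getField("data"))
      .withColumn("priority1", col("1").getField("priority"))
      // ... same for columns "2" and "3" ...
      .drop("1", "2", "3")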

Re: Running Spark on EMR

2017-01-16 Thread Everett Anderson
On Sun, Jan 15, 2017 at 11:09 AM, Andrew Holway < andrew.hol...@otternetworks.de> wrote: > use yarn :) > > "spark-submit --master yarn" > Doesn't this require first copying out various Hadoop configuration XML files from the EMR master node to the machine running the spark-submit? Or is there a

Re: Writing DataFrame filter results to separate files

2016-12-06 Thread Everett Anderson
On Mon, Dec 5, 2016 at 5:33 PM, Michael Armbrust wrote: > 1. In my case, I'd need to first explode my data by ~12x to assign each >> record to multiple 12-month rolling output windows. I'm not sure Spark SQL >> would be able to optimize this away, combining it with the

Re: Writing DataFrame filter results to separate files

2016-12-05 Thread Everett Anderson
Would each partition -- window in my case -- be shuffled to a single machine and then written together as one output shard? For a large amount of data per window, that seems less than ideal. > > On Mon, Dec 5, 2016 at 10:59 AM, Everett Anderson < > ever...@nuna.com.invalid> wrote: > >> Hi,

Writing DataFrame filter results to separate files

2016-12-05 Thread Everett Anderson
Hi, I have a DataFrame of records with dates, and I'd like to write all 12-month (with overlap) windows to separate outputs. Currently, I have a loop equivalent to: for ((windowStart, windowEnd) <- windows) { val windowData = allData.filter( getFilterCriteria(windowStart,
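For the non-overlapping case, the write-once alternative raised in the replies is partitionBy. A sketch follows; the windowKey derivation and path are stand-ins, and the author's overlapping 12-month windows would first need an explode to tag each record with every window it belongs to:

    import org.apache.spark.sql.functions.{col, year}

    // Single pass: tag each record with its window, then let the writer
    // split the output by that column.
    allData
      .withColumn("windowKey", year(col("date")))  // stand-in window key
      .write
      .partitionBy("windowKey")
      .parquet("s3a://bucket/windows")             // hypothetical path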

Modifying Metadata in StructType schemas

2016-10-24 Thread Everett Anderson
Hi, I've been using the immutable Metadata within the StructType of a DataFrame/Dataset to track application-level column lineage. However, since it's immutable, the only way to modify it is to do a full trip of 1. Convert DataFrame/Dataset to Row RDD 2. Create new, modified Metadata per
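For per-column changes there is a lighter-weight route worth noting: Column.as(alias, metadata) rebuilds one column's metadata without leaving the DataFrame API. A sketch, with a hypothetical column name and metadata entry:

    import org.apache.spark.sql.functions.col
    import org.apache.spark.sql.types.MetadataBuilder

    // Copy the column's existing metadata, add an entry, and re-alias the
    // column under the same name with the new metadata attached.
    val newMeta = new MetadataBuilder()
      .withMetadata(df.schema("some_col").metadata)
      .putString("lineage", "derived_from_source_x")  // hypothetical entry
      .build()
    val updated = df.withColumn("some_col", col("some_col").as("some_col", newMeta))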

Equivalent to --files for driver?

2016-09-21 Thread Everett Anderson
Hi, I'm running Spark 1.6.2 on YARN and I often use the cluster deploy mode with spark-submit. While the --files param is useful for getting files onto the cluster in the working directories of the executors, the driver's working directory doesn't get them. Is there some equivalent to --files

Re: S3A + EMR failure when writing Parquet?

2016-09-04 Thread Everett Anderson
d=true. Is that right? On Tue, Aug 30, 2016 at 11:49 AM, Steve Loughran <ste...@hortonworks.com> wrote: > > On 29 Aug 2016, at 18:18, Everett Anderson <ever...@nuna.com.INVALID > <ever...@nuna.com.invalid>> wrote: > > Okay, I don't think it's really just S3A is

Does Spark on YARN inherit or replace the Hadoop/YARN configs?

2016-08-30 Thread Everett Anderson
Hi, I've had a bit of trouble getting Spark on YARN to work. When executing in this mode and submitting from outside the cluster, one must set HADOOP_CONF_DIR or YARN_CONF_DIR, from which spark-submit can find the params it needs to

Re: S3A + EMR failure when writing Parquet?

2016-08-29 Thread Everett Anderson
t have the EMRFS implementation locally. On Sun, Aug 28, 2016 at 4:19 PM, Everett Anderson <ever...@nuna.com> wrote: > (Sorry, typo -- I was using spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2 not 'hadooop', of course) > > On Sun, Aug 28, 2016 at 12:51 PM, Everett An

Re: S3A + EMR failure when writing Parquet?

2016-08-28 Thread Everett Anderson
(Sorry, typo -- I was using spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2 not 'hadooop', of course) On Sun, Aug 28, 2016 at 12:51 PM, Everett Anderson <ever...@nuna.com> wrote: > Hi, > > I'm having some trouble figuring out a failure when using S3A when writin

S3A + EMR failure when writing Parquet?

2016-08-28 Thread Everett Anderson
Hi, I'm having some trouble figuring out a failure when using S3A when writing a DataFrame as Parquet on EMR 4.7.2 (which is Hadoop 2.7.2 and Spark 1.6.2). It works when using EMRFS (s3://), though. I'm using these extra conf params, though I've also tried without everything but the encryption

Re: Plans for improved Spark DataFrame/Dataset unit testing?

2016-08-21 Thread Everett Anderson
On Sat, Aug 20, 2016, at 01:25, Everett Anderson wrote: > Hi! > > Just following up on this -- > > When people talk about a shared session/context for testing like this, > I assume it's sti

Re: Plans for improved Spark DataFrame/Dataset unit testing?

2016-08-19 Thread Everett Anderson
123 ). I'll try and > include this in the next release :) > > On Mon, Aug 1, 2016 at 9:22 AM, Koert Kuipers <ko...@tresata.com> wrote: > >> we share a single sparksession across tests, and they can run in >> parallel. is pretty fast >> >> On Mon, Aug 1, 2016 at

Submitting jobs to YARN from outside EMR -- config & S3 impl

2016-08-15 Thread Everett Anderson
Hi, We're currently using an EMR cluster (which uses YARN) but submitting Spark jobs to it using spark-submit from different machines outside the cluster. We haven't had time to investigate using something like Livy, yet. We also have a need to use a mix of

Re: Java and SparkSession

2016-08-05 Thread Everett Anderson
Hi, Can you say more about what goes wrong? I was migrating my code and began using this for initialization: SparkConf sparkConf = new SparkConf().setAppName(...) SparkSession sparkSession = new SparkSession.Builder().config(sparkConf).getOrCreate(); JavaSparkContext jsc = new

Plans for improved Spark DataFrame/Dataset unit testing?

2016-08-01 Thread Everett Anderson
Hi, Right now, if any code uses DataFrame/Dataset, I need a test setup that brings up a local master as in this article. That's a lot of overhead for unit testing and the tests can't run in
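The pattern the replies converge on is one session shared across a whole suite, e.g. via a ScalaTest trait. A sketch, assuming ScalaTest; libraries such as spark-testing-base package this up more completely:

    import org.apache.spark.sql.SparkSession
    import org.scalatest.{BeforeAndAfterAll, Suite}

    trait SharedSparkSession extends BeforeAndAfterAll { self: Suite =>
      // One session for the whole suite; a local master keeps tests hermetic.
      @transient lazy val spark: SparkSession = SparkSession.builder()
        .master("local[2]")
        .appName("unit-tests")
        .getOrCreate()

      override def afterAll(): Unit = {
        spark.stop()
        super.afterAll()
      }
    }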

Re: Role-based S3 access outside of EMR

2016-07-28 Thread Everett Anderson
fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem > fs.s3.impl=org.apache.hadoop.fs.s3a.S3AFileSystem > fs.AbstractFileSystem.s3.impl=org.apache.hadoop.fs.s3a.S3A > fs.AbstractFileSystem.s3a.impl=org.apache.hadoop.fs.s3a.S3A > > And make sure the s3a jars are in your classpath > > Thanks, > Ewan > > *From:* Everett Ande
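Consolidated, the quoted settings look like this when applied to a live context:

    val hadoopConf = spark.sparkContext.hadoopConfiguration
    hadoopConf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    hadoopConf.set("fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    hadoopConf.set("fs.AbstractFileSystem.s3.impl", "org.apache.hadoop.fs.s3a.S3A")
    hadoopConf.set("fs.AbstractFileSystem.s3a.impl", "org.apache.hadoop.fs.s3a.S3A")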

Re: Programmatic use of UDFs from Java

2016-07-22 Thread Everett Anderson
u, Jul 21, 2016 at 10:10 AM, Everett Anderson < > ever...@nuna.com.invalid> wrote: > >> Hi, >> >> In the Java Spark DataFrames API, you can create a UDF, register it, and >> then access it by string name by using the convenience UDF classes in >> org.apac

Re: Creating a DataFrame from scratch

2016-07-22 Thread Everett Anderson
it Integer.class, but I suspect it still won't work because Integer may not have the bean-style getters. On Fri, Jul 22, 2016 at 9:37 AM, Everett Anderson <ever...@nuna.com> wrote: > Hey, > > I think what's happening is that you're calling this createDataFrame > method > <https
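One reliable alternative the reply points toward is an explicit Row RDD plus schema, which skips bean reflection entirely. A minimal sketch, assuming a SparkSession spark:

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

    val schema = StructType(Seq(StructField("value", IntegerType, nullable = false)))
    val rows = spark.sparkContext.parallelize(Seq(Row(1), Row(2), Row(3)))
    val df = spark.createDataFrame(rows, schema)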

Programmatic use of UDFs from Java

2016-07-21 Thread Everett Anderson
Hi, In the Java Spark DataFrames API, you can create a UDF, register it, and then access it by string name by using the convenience UDF classes in org.apache.spark.sql.api.java. Example
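The register-then-call-by-name pattern in question, sketched in Scala (the Java API mirrors it with spark.udf().register and functions.callUDF); the UDF name and column are assumptions:

    import org.apache.spark.sql.functions.{callUDF, col}

    // Register under a string name, then invoke programmatically.
    spark.udf.register("plusOne", (x: Int) => x + 1)
    val result = df.select(callUDF("plusOne", col("value")))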

Re: Role-based S3 access outside of EMR

2016-07-21 Thread Everett Anderson
> SDK and File System Dependencies >> >> as mentioned above, using EMRFS libs solved this problem: >> >> http://docs.aws.amazon.com//ElasticMapReduce/latest/ReleaseGuide/emr-fs.html >> >> >> 2016-07-21 8:37 GMT+02:00 Gourav Sengupta <gourav.sengu...@gm

Re: Role-based S3 access outside of EMR

2016-07-20 Thread Everett Anderson
the data from the > data center back to my local env. > > Andy > > From: Everett Anderson <ever...@nuna.com.INVALID> > Date: Tuesday, July 19, 2016 at 2:30 PM > To: "user @spark" <user@spark.apache.org> > Subject: Role-based S3 access outside o

Role-based S3 access outside of EMR

2016-07-19 Thread Everett Anderson
Hi, When running on EMR, AWS configures Hadoop to use their EMRFS Hadoop FileSystem implementation for s3:// URLs and seems to install the necessary S3 credentials properties, as well. Often, it's nice during development to run outside of a cluster even with the "local" Spark master, though,

Re: Best practice for handing tables between pipeline components

2016-06-28 Thread Everett Anderson
> Alluxio off heap memory would help to share cached objects >> >> On Mon, Jun 27, 2016 at 11:14 AM Everett Anderson >> <ever...@nuna.com.invalid> wrote: >> >>> Hi, >>> >>> We have a pipeline of components strung together via Airflow runnin

Best practice for handing tables between pipeline components

2016-06-27 Thread Everett Anderson
Hi, We have a pipeline of components strung together via Airflow running on AWS. Some of them are implemented in Spark, but some aren't. Generally they can all talk to a JDBC/ODBC end point or read/write files from S3. Ideally, we wouldn't suffer the I/O cost of writing all the data to HDFS or

Re: Best way to go from RDD to DataFrame of StringType columns

2016-06-17 Thread Everett Anderson
> HTH

Re: Best way to go from RDD to DataFrame of StringType columns

2016-06-17 Thread Everett Anderson
On 17 June 2016 at 20:38, Everett Anderson <ever...@nuna.com.invalid> > wrote: > >> Hi, >> >> I have

Best way to go from RDD to DataFrame of StringType columns

2016-06-17 Thread Everett Anderson
Hi, I have a system with files in a variety of non-standard input formats, though they're generally flat text files. I'd like to dynamically create DataFrames of string columns. What's the best way to go from a RDD to a DataFrame of StringType columns? My current plan is - Call map() on the
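A sketch of that plan, assuming an RDD[String] named lines, a tab delimiter, known column names, and a SparkSession spark (SQLContext on 1.6):

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types.{StringType, StructField, StructType}

    val columnNames = Seq("col1", "col2", "col3")  // per input format
    val schema = StructType(columnNames.map(StructField(_, StringType, nullable = true)))
    // split with limit -1 preserves trailing empty fields.
    val rows = lines.map(line => Row.fromSeq(line.split("\t", -1).toSeq))
    val df = spark.createDataFrame(rows, schema)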

Re: StackOverflowError even with JavaSparkContext union(JavaRDD... rdds)

2016-06-05 Thread Everett Anderson
> memory consuming to have 80M for each thread (very simply there might be > 100 of them), but this is just a workaround. This is configuration that I > use to train random forest with input of 400k samples. > > Hope this helps. > > -- > Be well! > Jean Morozov > > O

StackOverflowError even with JavaSparkContext union(JavaRDD... rdds)

2016-06-05 Thread Everett Anderson
Hi! I have a fairly simple Spark (1.6.1) Java RDD-based program that's scanning through lines of about 1000 large text files of records and computing some metrics about each line (record type, line length, etc). Most are identical so I'm calling distinct(). In the loop over the list of files,
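The reply above works around this by enlarging the thread stack. Another common mitigation (an assumption here, not this thread's resolution) is to checkpoint periodically inside the loop so the union lineage, whose recursive serialization overflows the stack, stays shallow; assumes rdds: Seq[RDD[String]] and a prior sc.setCheckpointDir(...):

    var merged = rdds.head
    for ((rdd, i) <- rdds.tail.zipWithIndex) {
      merged = merged.union(rdd)
      if ((i + 1) % 50 == 0) {
        merged.checkpoint()  // cut the lineage every 50 unions
        merged.count()       // action forces the checkpoint to materialize
      }
    }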