Re: Help taking last value in each group (aggregates)

2017-08-28 Thread Everett Anderson
I'm still unclear on whether orderBy/groupBy + aggregates is a viable
approach, or when one could rely on the last or first aggregate functions,
but a working alternative is to use window functions with row_number and a
filter, roughly like this:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{row_number, sum}
import spark.implicits._

// Order each group so the row we want to treat as "last" comes first.
val reverseOrdering = Seq("a", "b").map(c => df(c).desc)

val orderedWindow = Window.partitionBy("group_id").orderBy(reverseOrdering: _*)

// An ordered window defaults to a running frame, so give the sum an
// unbounded frame so "total" covers the entire group.
val wholeGroupWindow = orderedWindow.rowsBetween(Long.MinValue, Long.MaxValue)

df.select($"group_id",
    $"row_id",
    sum("col_to_sum").over(wholeGroupWindow).as("total"),
    row_number().over(orderedWindow).as("row_number"))
  .filter($"row_number" === 1)
  .select($"group_id",
    $"row_id".as("last_row_id"),
    $"total")

Would love to know if there's a better way!

On Mon, Aug 28, 2017 at 9:19 AM, Everett Anderson <ever...@nuna.com> wrote:

> Hi,
>
> I'm struggling a little with some unintuitive behavior with the Scala API.
> (Spark 2.0.2)
>
> I wrote something like
>
> df.orderBy("a", "b")
>   .groupBy("group_id")
>   .agg(sum("col_to_sum").as("total"),
>last("row_id").as("last_row_id"))
>
> and expected a result with a unique group_id column, a column called
> "total" that's the sum of all col_to_sum in each group, and a column called
> "last_row_id" that's the last row_id seen in each group when the groups are
> sorted by columns a and b.
>
> However, the result is actually non-deterministic and changes based on the
> initial sorting and partitioning of df.
>
> I also tried
>
> df.orderBy("group_id", "a", "b")
>   .groupBy("group_id")
>   .agg(sum("col_to_sum").as("total"),
>last("row_id").as("last_row_id"))
>
> thinking the problem might be that the groupBy does another shuffle that
> loses the ordering, but that also doesn't seem to work.
>
> Looking through the code
> <https://github.com/apache/spark/blob/branch-2.0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Last.scala>,
> both the Last and First aggregate functions have this comment:
>
> Even if [[Last]] is used on an already sorted column, if
> we do partial aggregation and final aggregation
> (when mergeExpression
> is used) its result will not be deterministic
> (unless the input table is sorted and has
> a single partition, and we use a single reducer to do the aggregation.).
>
>
> Some questions:
>
>1. What's the best way to take some values from the last row in an
>ordered group while performing some other aggregates over the entire group?
>
>2. Given these comments on last and first, when would these functions
>be useful? It would be rare to bring an entire Spark table to a single
>partition.
>
> Thanks!
>
>


Help taking last value in each group (aggregates)

2017-08-28 Thread Everett Anderson
Hi,

I'm struggling a little with some unintuitive behavior with the Scala API.
(Spark 2.0.2)

I wrote something like

df.orderBy("a", "b")
  .groupBy("group_id")
  .agg(sum("col_to_sum").as("total"),
   last("row_id").as("last_row_id")))

and expected a result with a unique group_id column, a column called
"total" that's the sum of all col_to_sum in each group, and a column called
"last_row_id" that's the last row_id seen in each group when the groups are
sorted by columns a and b.

However, the result is actually non-deterministic and changes based on the
initial sorting and partitioning of df.

I also tried

df.orderBy("group_id", "a", "b")
  .groupBy("group_id")
  .agg(sum("col_to_sum").as("total"),
   last("row_id").as("last_row_id")))

thinking the problem might be that the groupBy does another shuffle that
loses the ordering, but that also doesn't seem to work.

Looking through the code
<https://github.com/apache/spark/blob/branch-2.0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Last.scala>,
both the Last and First aggregate functions have this comment:

Even if [[Last]] is used on an already sorted column, if
we do partial aggregation and final aggregation
(when mergeExpression
is used) its result will not be deterministic
(unless the input table is sorted and has
a single partition, and we use a single reducer to do the aggregation.).


Some questions:

   1. What's the best way to take some values from the last row in an
   ordered group while performing some other aggregates over the entire group?

   2. Given these comments on last and first, when would these functions be
   useful? It would be rare to bring an entire Spark table to a single
   partition.

Thanks!


Re: Spark, S3A, and 503 SlowDown / rate limit issues

2017-07-10 Thread Everett Anderson
Hey,

Thanks for the responses, guys!

On Thu, Jul 6, 2017 at 7:08 AM, Steve Loughran <ste...@hortonworks.com>
wrote:

>
> On 5 Jul 2017, at 14:40, Vadim Semenov <vadim.seme...@datadoghq.com>
> wrote:
>
> Are you sure that you use S3A?
> Because EMR says that they do not support S3A
>
> https://aws.amazon.com/premiumsupport/knowledge-center/emr-file-system-s3/
> > Amazon EMR does not currently support use of the Apache Hadoop S3A file
> system.
>
>
Oof. I figured they didn't offer technical support for S3A, but didn't know
that there was something saying EMR does not support use of S3A. My
impression was that many people were using it and it's the recommended S3
library in Hadoop 2.7+ <https://wiki.apache.org/hadoop/AmazonS3> from
Hadoop's point of view.

We're using S3A rather than S3N because we use encrypted buckets, and I
don't think S3N supports picking up credentials from a machine role. Also,
it was a bit distressing that S3N is unmaintained and has open bugs.

We're using S3A rather than EMRFS because we have a setup where we submit
work to a cluster via spark-submit run from outside the cluster's master node
with --master yarn. When you do this, the Hadoop configuration accessible to
spark-submit overrides that of the EMR cluster itself. If you use a
configuration that uses EMRFS and any of the resources (like the JAR) you
give to spark-submit are on S3, spark-submit will instantiate the EMRFS
FileSystem impl, which is currently only available on the cluster, and
fail. That said, we could work around this by resetting the configuration
in code.
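
For reference, a minimal sketch of that workaround (the SparkSession name and
the idea of mapping the s3:// scheme to S3A are assumptions on my part):

// Reset the Hadoop configuration in code so S3 URIs resolve to the S3A
// implementation, regardless of what spark-submit picked up locally.
val hadoopConf = spark.sparkContext.hadoopConfiguration
hadoopConf.set("fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
hadoopConf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")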


>
> I think that the HEAD requests come from the `createBucketIfNotExists` in
> the AWS S3 library that checks if the bucket exists every time you do a PUT
> request, i.e. creates a HEAD request.
>
> You can disable that by setting `fs.s3.buckets.create.enabled` to `false`
> http://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-
> plan-upload-s3.html
>
Oh, interesting. We are definitely seeing a ton of HEAD requests, which
might be that. It looks like the `fs.s3.buckets.create.enabled` is an EMRFS
option, though, not one common to the Hadoop S3 FileSystem implementations.
Does that sound right?

>
>
>
>
> Yeah, I'd like to see the stack traces before blaming S3A and the ASF
> codebase
>

(Sorry, to be clear -- I'm not trying to blame S3A. I figured someone else
might've hit this and bet we had just misconfigured something or were doing
this the wrong way.)


>
> One thing I do know is that the shipping S3A client doesn't have any
> explicit handling of 503/retry events. I know that:
> https://issues.apache.org/jira/browse/HADOOP-14531
>
> There is some retry logic in bits of the AWS SDK related to file upload:
> that may log and retry, but in all the operations listing files, getting
> their details, etc: no resilience to throttling.
>
> If it is surfacing against s3a, there isn't anything which can immediately
> be done to fix it, other than "spread your data around more buckets". Do
> attach the stack trace you get under https://issues.apache.or
> g/jira/browse/HADOOP-14381 though: I'm about half-way through the
> resilience code (& fault injection needed to test it). The more where I can
> see problems arise, the more confident I can be that those codepaths will
> be resilient.
>

Will do!

We did end up finding that some of our jobs were sharding data way too
finely, ending up with 5-10k+ tiny Parquet shards per table. This happened
when we unioned many Spark DataFrames together without doing a repartition
or coalesce afterwards. After throwing in a repartition (to additionally
balance the output shards) we haven't seen the error again, but our graphs
of S3 HEAD requests are still rather alarmingly high.
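
For reference, a minimal sketch of the repartition-after-union pattern
(variable names and the partition count are just placeholders):

// Union many DataFrames, then repartition before writing so the output
// isn't split into thousands of tiny Parquet files.
val combined = dataFrames.reduce(_ union _)
combined
  .repartition(200)
  .write
  .mode("overwrite")
  .parquet("s3a://some-bucket/some/path")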




>
>
> On Thu, Jun 29, 2017 at 4:56 PM, Everett Anderson <
> ever...@nuna.com.invalid> wrote:
>
>> Hi,
>>
>> We're using Spark 2.0.2 + Hadoop 2.7.3 on AWS EMR with S3A for direct I/O
>> from/to S3 from our Spark jobs. We set 
>> mapreduce.fileoutputcommitter.algorithm.version=2
>> and are using encrypted S3 buckets.
>>
>> This has been working fine for us, but perhaps as we've been running more
>> jobs in parallel, we've started getting errors like
>>
>> Status Code: 503, AWS Service: Amazon S3, AWS Request ID: ..., AWS Error
>> Code: SlowDown, AWS Error Message: Please reduce your request rate., S3
>> Extended Request ID: ...
>>
>> We enabled CloudWatch S3 request metrics for one of our buckets and I was
>> a little alarmed to see spikes of over 800k S3 requests over a minute or
>> so, with the bulk of them HEAD requests.
>>
>> We read and write Parquet files, and most tables have around 50
>> shards/parts, though some have up to 200. I imagine there's additional
>> parallelism when reading a shard in Parquet, though.
>>
>> Has anyone else encountered this? How did you solve it?
>>
>> I'd sure prefer to avoid copying all our data in and out of HDFS for each
>> job, if possible.
>>
>> Thanks!
>>
>>
>
>


Spark, S3A, and 503 SlowDown / rate limit issues

2017-06-29 Thread Everett Anderson
Hi,

We're using Spark 2.0.2 + Hadoop 2.7.3 on AWS EMR with S3A for direct I/O
from/to S3 from our Spark jobs. We
set mapreduce.fileoutputcommitter.algorithm.version=2 and are using
encrypted S3 buckets.

This has been working fine for us, but perhaps as we've been running more
jobs in parallel, we've started getting errors like

Status Code: 503, AWS Service: Amazon S3, AWS Request ID: ..., AWS Error
Code: SlowDown, AWS Error Message: Please reduce your request rate., S3
Extended Request ID: ...

We enabled CloudWatch S3 request metrics for one of our buckets and I was a
little alarmed to see spikes of over 800k S3 requests over a minute or so,
with the bulk of them HEAD requests.

We read and write Parquet files, and most tables have around 50
shards/parts, though some have up to 200. I imagine there's additional
parallelism when reading a shard in Parquet, though.

Has anyone else encountered this? How did you solve it?

I'd sure prefer to avoid copying all our data in and out of HDFS for each
job, if possible.

Thanks!


Re: Temp checkpoint directory for EMR (S3 or HDFS)

2017-05-30 Thread Everett Anderson
Still haven't found a --conf option.

Regarding a temporary HDFS checkpoint directory, it looks like when using
--master yarn, spark-submit supplies a SPARK_YARN_STAGING_DIR environment
variable. Thus, one could do the following when creating a SparkSession:

import org.apache.hadoop.fs.Path

val checkpointPath = new Path(System.getenv("SPARK_YARN_STAGING_DIR"),
  "checkpoints").toString
sparkSession.sparkContext.setCheckpointDir(checkpointPath)

The staging directory is in an HDFS path like

/user/[user]/.sparkStaging/[YARN application ID]

and is deleted at the end of the application
<https://github.com/apache/spark/blob/branch-2.0/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala#L184>
.

So this is one option, though certainly abusing the staging directory.

A more general one might be to find where Dataset.persist(DISK_ONLY) writes.


On Fri, May 26, 2017 at 9:08 AM, Everett Anderson <ever...@nuna.com> wrote:

> Hi,
>
> I need to set a checkpoint directory as I'm starting to use GraphFrames.
> (Also, occasionally my regular DataFrame lineages get too long so it'd be
> nice to use checkpointing to squash the lineage.)
>
> I don't actually need this checkpointed data to live beyond the life of
> the job, however. I'm running jobs on AWS EMR (so on YARN + HDFS) and
> reading and writing non-transient data to S3.
>
> Two questions:
>
> 1. Is there a Spark --conf option to set the checkpoint directory? Somehow
> I couldn't find it, but surely it exists.
>
> 2. What's a good checkpoint directory for this use case? I imagine it'd be
> on HDFS and presumably in a YARN application-specific temporary path that
> gets cleaned up afterwards. Does anyone have a recommendation?
>
> Thanks!
>
> - Everett
>
>


Temp checkpoint directory for EMR (S3 or HDFS)

2017-05-26 Thread Everett Anderson
Hi,

I need to set a checkpoint directory as I'm starting to use GraphFrames.
(Also, occasionally my regular DataFrame lineages get too long so it'd be
nice to use checkpointing to squash the lineage.)

I don't actually need this checkpointed data to live beyond the life of the
job, however. I'm running jobs on AWS EMR (so on YARN + HDFS) and reading
and writing non-transient data to S3.

Two questions:

1. Is there a Spark --conf option to set the checkpoint directory? Somehow
I couldn't find it, but surely it exists.

2. What's a good checkpoint directory for this use case? I imagine it'd be
on HDFS and presumably in a YARN application-specific temporary path that
gets cleaned up afterwards. Does anyone have a recommendation?

Thanks!

- Everett


Re: Driver spins hours in query plan optimization

2017-05-02 Thread Everett Anderson
Seems like

https://issues.apache.org/jira/browse/SPARK-13346

is likely the same issue.

Seems like for some people persist() doesn't work and they have to convert
to RDDs and back.
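
A rough sketch of that RDD round-trip, assuming df and spark are in scope:

// Rebuilding the DataFrame from its RDD and schema truncates the long
// logical plan that the optimizer was spinning on.
val truncated = spark.createDataFrame(df.rdd, df.schema)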


On Fri, Apr 14, 2017 at 1:39 PM, Everett Anderson <ever...@nuna.com> wrote:

> Hi,
>
> We keep hitting a situation on Spark 2.0.2 (haven't tested later versions,
> yet) where the driver spins forever seemingly in query plan optimization
> for moderate queries, such as the union of a few (~5) other DataFrames.
>
> We can see the driver spinning with one core in the nioEventLoopGroup-2-2
> thread in a deep trace like the attached.
>
> Throwing in a MEMORY_OR_DISK persist() so the query plan is collapsed
> works around this, but it's a little surprising how often we encounter the
> problem, forcing us to work to manage persisting/unpersisting tables and
> potentially suffering unnecessary disk I/O.
>
> I've looked through JIRA but don't see open issues about this -- might've
> just not found them successfully.
>
> Anyone else encounter this?
>
>


Re: Calculate mode separately for multiple columns in row

2017-04-27 Thread Everett Anderson
For the curious, I played around with a UDAF for this (shown below). On the
downside, it assembles a Map of all possible values of the column that'll
need to be stored in memory somewhere.

I suspect some kind of sorted groupByKey + cogroup could stream values
through, though it might not support partial aggregation then. Will try that
next.

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

/**
  * [[UserDefinedAggregateFunction]] for computing the mode of a string column.
  *
  * WARNING: This will assemble a Map of all possible values in memory.
  *
  * It'll ignore null values and return null if all values are null.
  */
class ModeAggregateFunction extends UserDefinedAggregateFunction {

  override def inputSchema: StructType =
    StructType(StructField("value", StringType) :: Nil)

  override def bufferSchema: StructType = StructType(
    StructField("values", MapType(StringType, LongType, valueContainsNull = false)) :: Nil)

  override def dataType: DataType = StringType

  override def deterministic: Boolean = true

  // This is the initial value for your buffer schema.
  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = Map[String, Long]()
  }

  // Ignore nulls; otherwise increment the count for the observed value.
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    if (input == null || input.getString(0) == null) {
      return
    }

    val value = input.getString(0)
    val frequencies = buffer.getAs[Map[String, Long]](0)
    val count = frequencies.getOrElse(value, 0L)

    buffer(0) = frequencies + (value -> (count + 1L))
  }

  // Combine two partial frequency maps by summing counts per value.
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    val frequencies1: Map[String, Long] = buffer1.getAs[Map[String, Long]](0)
    val frequencies2: Map[String, Long] = buffer2.getAs[Map[String, Long]](0)

    buffer1(0) = frequencies1 ++ frequencies2.map({
      case (k: String, v: Long) => k -> (v + frequencies1.getOrElse(k, 0L))
    })
  }

  // Return the most frequent value, or null if no non-null values were seen.
  override def evaluate(buffer: Row): Any = {
    val frequencies = buffer.getAs[Map[String, Long]](0)
    if (frequencies.isEmpty) {
      return null
    }
    frequencies.maxBy(_._2)._1
  }
}
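
Example usage, borrowing the column names from the contrived example in the
quoted message below (import spark.implicits._ is assumed):

val mode = new ModeAggregateFunction()

// One output row per creature, with the most frequent color and feature.
val compacted = input
  .groupBy($"creature")
  .agg(mode($"color").as("color"), mode($"feature").as("feature"))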




On Wed, Apr 26, 2017 at 10:21 AM, Everett Anderson <ever...@nuna.com> wrote:

> Hi,
>
> One common situation I run across is that I want to compact my data and
> select the mode (most frequent value) in several columns for each group.
>
> Even calculating mode for one column in SQL is a bit tricky. The ways I've
> seen usually involve a nested sub-select with a group by + count and then a
> window function using rank().
>
> However, what if you want to calculate the mode for several columns,
> producing a new row with the results? And let's say the set of columns is
> only known at runtime.
>
> In Spark SQL, I start going down a road of many self-joins. The more
> efficient way leads me to either RDD[Row] or Dataset[Row] where I could do
> a groupByKey + flatMapGroups, keeping state as I iterate over the Rows in
> each group.
>
> What's the best way?
>
> Here's a contrived example:
>
> val input = spark.sparkContext.parallelize(Seq(
> ("catosaur", "black", "claws"),
> ("catosaur", "orange", "scales"),
> ("catosaur", "black", "scales"),
> ("catosaur", "orange", "scales"),
> ("catosaur", "black", "spikes"),
> ("bearcopter", "gray", "claws"),
> ("bearcopter", "black", "fur"),
> ("bearcopter", "gray", "flight"),
> ("bearcopter", "gray", "flight")))
> .toDF("creature", "color", "feature")
>
> +----------+------+-------+
> |creature  |color |feature|
> +----------+------+-------+
> |catosaur  |black |claws  |
> |catosaur  |orange|scales |
> |catosaur  |black |scales |
> |catosaur  |orange|scales |
> |catosaur  |black |spikes |
> |bearcopter|gray  |claws  |
> |bearcopter|black |fur    |
> |bearcopter|gray  |flight |
> |bearcopter|gray  |flight |
> +----------+------+-------+
>
> val expectedOutput = spark.sparkContext.parallelize(Seq(
> ("catosaur", "black", "scales"),
> ("bearcopter", "gray", "flight")))
> .toDF("creature", "color", "feature")
>
> +----------+-----+-------+
> |creature  |color|feature|
> +----------+-----+-------+
> |catosaur  |black|scales |
> |bearcopter|gray |flight |
> +----------+-----+-------+
>
>
>
>
>
>


Calculate mode separately for multiple columns in row

2017-04-26 Thread Everett Anderson
Hi,

One common situation I run across is that I want to compact my data and
select the mode (most frequent value) in several columns for each group.

Even calculating mode for one column in SQL is a bit tricky. The ways I've
seen usually involve a nested sub-select with a group by + count and then a
window function using rank().

However, what if you want to calculate the mode for several columns,
producing a new row with the results? And let's say the set of columns is
only known at runtime.

In Spark SQL, I start going down a road of many self-joins. The more
efficient way leads me to either RDD[Row] or Dataset[Row] where I could do
a groupByKey + flatMapGroups, keeping state as I iterate over the Rows in
each group.

What's the best way?

Here's a contrived example:

val input = spark.sparkContext.parallelize(Seq(
("catosaur", "black", "claws"),
("catosaur", "orange", "scales"),
("catosaur", "black", "scales"),
("catosaur", "orange", "scales"),
("catosaur", "black", "spikes"),
("bearcopter", "gray", "claws"),
("bearcopter", "black", "fur"),
("bearcopter", "gray", "flight"),
("bearcopter", "gray", "flight")))
.toDF("creature", "color", "feature")

+----------+------+-------+
|creature  |color |feature|
+----------+------+-------+
|catosaur  |black |claws  |
|catosaur  |orange|scales |
|catosaur  |black |scales |
|catosaur  |orange|scales |
|catosaur  |black |spikes |
|bearcopter|gray  |claws  |
|bearcopter|black |fur    |
|bearcopter|gray  |flight |
|bearcopter|gray  |flight |
+----------+------+-------+

val expectedOutput = spark.sparkContext.parallelize(Seq(
("catosaur", "black", "scales"),
("bearcopter", "gray", "flight")))
.toDF("creature", "color", "feature")

+----------+-----+-------+
|creature  |color|feature|
+----------+-----+-------+
|catosaur  |black|scales |
|bearcopter|gray |flight |
+----------+-----+-------+
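
For completeness, a rough sketch of the groupByKey route mentioned above,
using mapGroups (one output row per group) with the column positions
hard-coded for this example:

import spark.implicits._

val modes = input
  .groupByKey(row => row.getString(0)) // key on "creature"
  .mapGroups { (creature, rows) =>
    // Tally color and feature values while streaming through the group's rows.
    val colorCounts = scala.collection.mutable.Map.empty[String, Long].withDefaultValue(0L)
    val featureCounts = scala.collection.mutable.Map.empty[String, Long].withDefaultValue(0L)
    rows.foreach { r =>
      colorCounts(r.getString(1)) += 1
      featureCounts(r.getString(2)) += 1
    }
    (creature, colorCounts.maxBy(_._2)._1, featureCounts.maxBy(_._2)._1)
  }
  .toDF("creature", "color", "feature")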


Driver spins hours in query plan optimization

2017-04-14 Thread Everett Anderson
Hi,

We keep hitting a situation on Spark 2.0.2 (haven't tested later versions,
yet) where the driver spins forever seemingly in query plan optimization
for moderate queries, such as the union of a few (~5) other DataFrames.

We can see the driver spinning with one core in the nioEventLoopGroup-2-2
thread in a deep trace like the attached.

Throwing in a MEMORY_OR_DISK persist() so the query plan is collapsed works
around this, but it's a little surprising how often we encounter the
problem, forcing us to work to manage persisting/unpersisting tables and
potentially suffering unnecessary disk I/O.

I've looked through JIRA but don't see open issues about this -- might've
just not found them successfully.

Anyone else encounter this?
org.apache.spark.sql.catalyst.expressions.AttributeReference.equals(namedExpressions.scala:228)
org.apache.spark.sql.catalyst.expressions.Cast.equals(Cast.scala:119)
scala.collection.IndexedSeqOptimized$class.sameElements(IndexedSeqOptimized.scala:167)
scala.collection.mutable.ArrayBuffer.sameElements(ArrayBuffer.scala:48)
scala.collection.GenSeqLike$class.equals(GenSeqLike.scala:475)
scala.collection.AbstractSeq.equals(Seq.scala:41)
org.apache.spark.sql.catalyst.expressions.Least.equals(conditionalExpressions.scala:290)
scala.collection.IndexedSeqOptimized$class.sameElements(IndexedSeqOptimized.scala:167)
scala.collection.mutable.ArrayBuffer.sameElements(ArrayBuffer.scala:48)
scala.collection.GenSeqLike$class.equals(GenSeqLike.scala:475)
scala.collection.AbstractSeq.equals(Seq.scala:41)
org.apache.spark.sql.catalyst.expressions.Greatest.equals(conditionalExpressions.scala:350)
org.apache.spark.sql.catalyst.expressions.GreaterThanOrEqual.equals(predicates.scala:522)
scala.collection.mutable.FlatHashTable$class.addEntry(FlatHashTable.scala:151)
scala.collection.mutable.HashSet.addEntry(HashSet.scala:40)
scala.collection.mutable.FlatHashTable$class.addElem(FlatHashTable.scala:139)
scala.collection.mutable.HashSet.addElem(HashSet.scala:40)
scala.collection.mutable.HashSet.$plus$eq(HashSet.scala:59)
scala.collection.mutable.HashSet.$plus$eq(HashSet.scala:40)
scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:59)
scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:59)
scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
scala.collection.mutable.AbstractSet.$plus$plus$eq(Set.scala:46)
scala.collection.mutable.HashSet.clone(HashSet.scala:83)
scala.collection.mutable.HashSet.clone(HashSet.scala:40)
org.apache.spark.sql.catalyst.expressions.ExpressionSet.$plus(ExpressionSet.scala:65)
org.apache.spark.sql.catalyst.expressions.ExpressionSet.$plus(ExpressionSet.scala:50)
scala.collection.SetLike$$anonfun$$plus$plus$1.apply(SetLike.scala:141)
scala.collection.SetLike$$anonfun$$plus$plus$1.apply(SetLike.scala:141)
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
scala.collection.immutable.HashSet$HashSet1.foreach(HashSet.scala:316)
scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:972)
scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:972)
scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:972)
scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:972)
scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
scala.collection.TraversableOnce$class.$div$colon(TraversableOnce.scala:151)
scala.collection.AbstractTraversable.$div$colon(Traversable.scala:104)
scala.collection.SetLike$class.$plus$plus(SetLike.scala:141)
org.apache.spark.sql.catalyst.expressions.ExpressionSet.$plus$plus(ExpressionSet.scala:50)
org.apache.spark.sql.catalyst.plans.logical.UnaryNode$$anonfun$getAliasedConstraints$1.apply(LogicalPlan.scala:300)
org.apache.spark.sql.catalyst.plans.logical.UnaryNode$$anonfun$getAliasedConstraints$1.apply(LogicalPlan.scala:297)
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
org.apache.spark.sql.catalyst.plans.logical.UnaryNode.getAliasedConstraints(LogicalPlan.scala:297)
org.apache.spark.sql.catalyst.plans.logical.Project.validConstraints(basicLogicalOperators.scala:55)
org.apache.spark.sql.catalyst.plans.QueryPlan.constraints$lzycompute(QueryPlan.scala:174)
org.apache.spark.sql.catalyst.plans.QueryPlan.constraints(QueryPlan.scala:174)
org.apache.spark.sql.catalyst.plans.logical.Project.validConstraints(basicLogicalOperators.scala:55)
org.apache.spark.sql.catalyst.plans.QueryPlan.constraints$lzycompute(QueryPlan.scala:174)
org.apache.spark.sql.catalyst.plans.QueryPlan.constraints(QueryPlan.scala:174)

Re: Assigning a unique row ID

2017-04-10 Thread Everett Anderson
Indeed, I tried persist with MEMORY_AND_DISK and it works! (I'm wary of
MEMORY_ONLY for this as it could potentially recompute shards if it
couldn't entirely cache in memory.)

Thanks for the help, everybody!!
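
For reference, a minimal sketch of what ended up working (df is assumed to
be in scope):

import org.apache.spark.sql.functions.monotonically_increasing_id
import org.apache.spark.storage.StorageLevel

// Persisting right after adding the ID keeps derived tables from
// recomputing it and ending up with different values.
val withId = df
  .withColumn("row_id", monotonically_increasing_id())
  .persist(StorageLevel.MEMORY_AND_DISK)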

On Sat, Apr 8, 2017 at 11:54 AM, Everett Anderson <ever...@nuna.com> wrote:

>
>
> On Fri, Apr 7, 2017 at 8:04 PM, Subhash Sriram <subhash.sri...@gmail.com>
> wrote:
>
>> Hi,
>>
>> We use monotonically_increasing_id() as well, but just cache the table
>> first like Ankur suggested. With that method, we get the same keys in all
>> derived tables.
>>
>
> Ah, okay, awesome. Let me give that a go.
>
>
>
>>
>> Thanks,
>> Subhash
>>
>> Sent from my iPhone
>>
>> On Apr 7, 2017, at 7:32 PM, Everett Anderson <ever...@nuna.com.INVALID>
>> wrote:
>>
>> Hi,
>>
>> Thanks, but that's using a random UUID. Certainly unlikely to have
>> collisions, but not guaranteed.
>>
>> I'd rather prefer something like monotonically_increasing_id or RDD's
>> zipWithUniqueId but with better behavioral characteristics -- so they don't
>> surprise people when 2+ outputs derived from an original table end up not
>> having the same IDs for the same rows, anymore.
>>
>> It seems like this would be possible under the covers, but would have the
>> performance penalty of needing to do perhaps a count() and then also a
>> checkpoint.
>>
>> I was hoping there's a better way.
>>
>>
>> On Fri, Apr 7, 2017 at 4:24 PM, Tim Smith <secs...@gmail.com> wrote:
>>
>>> http://stackoverflow.com/questions/37231616/add-a-new-column
>>> -to-a-dataframe-new-column-i-want-it-to-be-a-uuid-generator
>>>
>>>
>>> On Fri, Apr 7, 2017 at 3:56 PM, Everett Anderson <
>>> ever...@nuna.com.invalid> wrote:
>>>
>>>> Hi,
>>>>
>>>> What's the best way to assign a truly unique row ID (rather than a
>>>> hash) to a DataFrame/Dataset?
>>>>
>>>> I originally thought that functions.monotonically_increasing_id would
>>>> do this, but it seems to have a rather unfortunate property that if you add
>>>> it as a column to table A and then derive tables X, Y, Z and save those,
>>>> the row ID values in X, Y, and Z may end up different. I assume this is
>>>> because it delays the actual computation to the point where each of those
>>>> tables is computed.
>>>>
>>>>
>>>
>>>
>>> --
>>>
>>> --
>>> Thanks,
>>>
>>> Tim
>>>
>>
>>
>


Re: Assigning a unique row ID

2017-04-08 Thread Everett Anderson
On Fri, Apr 7, 2017 at 8:04 PM, Subhash Sriram <subhash.sri...@gmail.com>
wrote:

> Hi,
>
> We use monotonically_increasing_id() as well, but just cache the table
> first like Ankur suggested. With that method, we get the same keys in all
> derived tables.
>

Ah, okay, awesome. Let me give that a go.



>
> Thanks,
> Subhash
>
> Sent from my iPhone
>
> On Apr 7, 2017, at 7:32 PM, Everett Anderson <ever...@nuna.com.INVALID>
> wrote:
>
> Hi,
>
> Thanks, but that's using a random UUID. Certainly unlikely to have
> collisions, but not guaranteed.
>
> I'd rather prefer something like monotonically_increasing_id or RDD's
> zipWithUniqueId but with better behavioral characteristics -- so they don't
> surprise people when 2+ outputs derived from an original table end up not
> having the same IDs for the same rows, anymore.
>
> It seems like this would be possible under the covers, but would have the
> performance penalty of needing to do perhaps a count() and then also a
> checkpoint.
>
> I was hoping there's a better way.
>
>
> On Fri, Apr 7, 2017 at 4:24 PM, Tim Smith <secs...@gmail.com> wrote:
>
>> http://stackoverflow.com/questions/37231616/add-a-new-column
>> -to-a-dataframe-new-column-i-want-it-to-be-a-uuid-generator
>>
>>
>> On Fri, Apr 7, 2017 at 3:56 PM, Everett Anderson <
>> ever...@nuna.com.invalid> wrote:
>>
>>> Hi,
>>>
>>> What's the best way to assign a truly unique row ID (rather than a hash)
>>> to a DataFrame/Dataset?
>>>
>>> I originally thought that functions.monotonically_increasing_id would
>>> do this, but it seems to have a rather unfortunate property that if you add
>>> it as a column to table A and then derive tables X, Y, Z and save those,
>>> the row ID values in X, Y, and Z may end up different. I assume this is
>>> because it delays the actual computation to the point where each of those
>>> tables is computed.
>>>
>>>
>>
>>
>> --
>>
>> --
>> Thanks,
>>
>> Tim
>>
>
>


Re: Assigning a unique row ID

2017-04-07 Thread Everett Anderson
Hi,

Thanks, but that's using a random UUID. Certainly unlikely to have
collisions, but not guaranteed.

I'd rather prefer something like monotonically_increasing_id or RDD's
zipWithUniqueId but with better behavioral characteristics -- so they don't
surprise people when 2+ outputs derived from an original table end up not
having the same IDs for the same rows, anymore.

It seems like this would be possible under the covers, but would have the
performance penalty of needing to do perhaps a count() and then also a
checkpoint.

I was hoping there's a better way.


On Fri, Apr 7, 2017 at 4:24 PM, Tim Smith <secs...@gmail.com> wrote:

> http://stackoverflow.com/questions/37231616/add-a-new-
> column-to-a-dataframe-new-column-i-want-it-to-be-a-uuid-generator
>
>
> On Fri, Apr 7, 2017 at 3:56 PM, Everett Anderson <ever...@nuna.com.invalid
> > wrote:
>
>> Hi,
>>
>> What's the best way to assign a truly unique row ID (rather than a hash)
>> to a DataFrame/Dataset?
>>
>> I originally thought that functions.monotonically_increasing_id would do
>> this, but it seems to have a rather unfortunate property that if you add it
>> as a column to table A and then derive tables X, Y, Z and save those, the
>> row ID values in X, Y, and Z may end up different. I assume this is because
>> it delays the actual computation to the point where each of those tables is
>> computed.
>>
>>
>
>
> --
>
> --
> Thanks,
>
> Tim
>


Assigning a unique row ID

2017-04-07 Thread Everett Anderson
Hi,

What's the best way to assign a truly unique row ID (rather than a hash) to
a DataFrame/Dataset?

I originally thought that functions.monotonically_increasing_id would do
this, but it seems to have a rather unfortunate property that if you add it
as a column to table A and then derive tables X, Y, Z and save those, the
row ID values in X, Y, and Z may end up different. I assume this is because
it delays the actual computation to the point where each of those tables is
computed.


Re: Spark 2.0.2 Dataset union() slowness vs RDD union?

2017-03-20 Thread Everett Anderson
Closing the loop on this --

It appears we were just hitting some other problem related to S3A/S3,
likely that the temporary directory used by the S3A Hadoop file system
implementation for buffering data during upload either was full or had the
wrong permissions.




On Thu, Mar 16, 2017 at 6:03 PM, Everett Anderson <ever...@nuna.com> wrote:

> Hi!
>
> On Thu, Mar 16, 2017 at 5:20 PM, Burak Yavuz <brk...@gmail.com> wrote:
>
>> Hi Everett,
>>
>> IIRC we added unionAll in Spark 2.0 which is the same implementation as
>> rdd union. The union in DataFrames with Spark 2.0 does deduplication, and
>> that's why you should be seeing the slowdown.
>>
>
> I thought it was the other way -- unionAll was deprecated in 2.0 and union
> now does not de-dupe --
>
> "Deprecated. use union(). Since 2.0.0.
> Returns a new Dataset containing union of rows in this Dataset and another
> Dataset. This is equivalent to UNION ALL in SQL."
>
> from
>
> https://spark.apache.org/docs/2.0.2/api/java/org/apache/
> spark/sql/Dataset.html#union(org.apache.spark.sql.Dataset)
> and
> https://spark.apache.org/docs/2.0.2/api/java/org/apache/
> spark/sql/Dataset.html#unionAll(org.apache.spark.sql.Dataset)
>
>
>
>
>
>
>
>
>
>>
>> Best,
>> Burak
>>
>> On Thu, Mar 16, 2017 at 4:14 PM, Everett Anderson <
>> ever...@nuna.com.invalid> wrote:
>>
>>> Looks like the Dataset version of union may also fail with the following
>>> on larger data sets, which again seems like it might be drawing everything
>>> into the driver for some reason --
>>>
>>> 7/03/16 22:28:21 WARN TaskSetManager: Lost task 1.0 in stage 91.0 (TID
>>> 5760, ip-10-8-52-198.us-west-2.compute.internal):
>>> java.lang.IllegalArgumentException: bound must be positive
>>> at java.util.Random.nextInt(Random.java:388)
>>> at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.c
>>> onfChanged(LocalDirAllocator.java:305)
>>> at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.g
>>> etLocalPathForWrite(LocalDirAllocator.java:344)
>>> at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.c
>>> reateTmpFileForWrite(LocalDirAllocator.java:416)
>>> at org.apache.hadoop.fs.LocalDirAllocator.createTmpFileForWrite
>>> (LocalDirAllocator.java:198)
>>> at org.apache.hadoop.fs.s3a.S3AOutputStream.(S3AOutputStr
>>> eam.java:87)
>>> at org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:410)
>>> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:911)
>>> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:892)
>>> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:789)
>>> at org.apache.parquet.hadoop.ParquetFileWriter.(ParquetFi
>>> leWriter.java:176)
>>> at org.apache.parquet.hadoop.ParquetFileWriter.(ParquetFi
>>> leWriter.java:160)
>>> at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWrite
>>> r(ParquetOutputFormat.java:289)
>>> at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWrite
>>> r(ParquetOutputFormat.java:262)
>>> at org.apache.spark.sql.execution.datasources.parquet.ParquetOu
>>> tputWriter.(ParquetFileFormat.scala:562)
>>> at org.apache.spark.sql.execution.datasources.parquet.ParquetFi
>>> leFormat$$anon$1.newInstance(ParquetFileFormat.scala:139)
>>> at org.apache.spark.sql.execution.datasources.BaseWriterContain
>>> er.newOutputWriter(WriterContainer.scala:131)
>>> at org.apache.spark.sql.execution.datasources.DefaultWriterCont
>>> ainer.writeRows(WriterContainer.scala:247)
>>> at org.apache.spark.sql.execution.datasources.InsertIntoHadoopF
>>> sRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.appl
>>> y(InsertIntoHadoopFsRelationCommand.scala:143)
>>> at org.apache.spark.sql.execution.datasources.InsertIntoHadoopF
>>> sRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.appl
>>> y(InsertIntoHadoopFsRelationCommand.scala:143)
>>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
>>> at org.apache.spark.scheduler.Task.run(Task.scala:86)
>>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPool
>>> Executor.java:1142)
>>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoo
>>> lExecutor.java:617)
>>> at java.lang.Thread.run(Thread.java:745)
>>>
>>> On Thu, Mar 16, 2017 at 2:55 PM, Everett Anderson <ever...@nuna.com>

Re: Spark 2.0.2 Dataset union() slowness vs RDD union?

2017-03-16 Thread Everett Anderson
Hi!

On Thu, Mar 16, 2017 at 5:20 PM, Burak Yavuz <brk...@gmail.com> wrote:

> Hi Everett,
>
> IIRC we added unionAll in Spark 2.0 which is the same implementation as
> rdd union. The union in DataFrames with Spark 2.0 does deduplication, and
> that's why you should be seeing the slowdown.
>

I thought it was the other way -- unionAll was deprecated in 2.0 and union
now does not de-dupe --

"Deprecated. use union(). Since 2.0.0.
Returns a new Dataset containing union of rows in this Dataset and another
Dataset. This is equivalent to UNION ALL in SQL."

from

https://spark.apache.org/docs/2.0.2/api/java/org/apache/spark/sql/Dataset.html#union(org.apache.spark.sql.Dataset)
and
https://spark.apache.org/docs/2.0.2/api/java/org/apache/spark/sql/Dataset.html#unionAll(org.apache.spark.sql.Dataset)









>
> Best,
> Burak
>
> On Thu, Mar 16, 2017 at 4:14 PM, Everett Anderson <
> ever...@nuna.com.invalid> wrote:
>
>> Looks like the Dataset version of union may also fail with the following
>> on larger data sets, which again seems like it might be drawing everything
>> into the driver for some reason --
>>
>> 7/03/16 22:28:21 WARN TaskSetManager: Lost task 1.0 in stage 91.0 (TID
>> 5760, ip-10-8-52-198.us-west-2.compute.internal):
>> java.lang.IllegalArgumentException: bound must be positive
>> at java.util.Random.nextInt(Random.java:388)
>> at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.c
>> onfChanged(LocalDirAllocator.java:305)
>> at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.g
>> etLocalPathForWrite(LocalDirAllocator.java:344)
>> at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.c
>> reateTmpFileForWrite(LocalDirAllocator.java:416)
>> at org.apache.hadoop.fs.LocalDirAllocator.createTmpFileForWrite
>> (LocalDirAllocator.java:198)
>> at org.apache.hadoop.fs.s3a.S3AOutputStream.(S3AOutputStr
>> eam.java:87)
>> at org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:410)
>> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:911)
>> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:892)
>> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:789)
>> at org.apache.parquet.hadoop.ParquetFileWriter.(ParquetFi
>> leWriter.java:176)
>> at org.apache.parquet.hadoop.ParquetFileWriter.(ParquetFi
>> leWriter.java:160)
>> at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWrite
>> r(ParquetOutputFormat.java:289)
>> at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWrite
>> r(ParquetOutputFormat.java:262)
>> at org.apache.spark.sql.execution.datasources.parquet.ParquetOu
>> tputWriter.(ParquetFileFormat.scala:562)
>> at org.apache.spark.sql.execution.datasources.parquet.ParquetFi
>> leFormat$$anon$1.newInstance(ParquetFileFormat.scala:139)
>> at org.apache.spark.sql.execution.datasources.BaseWriterContain
>> er.newOutputWriter(WriterContainer.scala:131)
>> at org.apache.spark.sql.execution.datasources.DefaultWriterCont
>> ainer.writeRows(WriterContainer.scala:247)
>> at org.apache.spark.sql.execution.datasources.InsertIntoHadoopF
>> sRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.
>> apply(InsertIntoHadoopFsRelationCommand.scala:143)
>> at org.apache.spark.sql.execution.datasources.InsertIntoHadoopF
>> sRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.
>> apply(InsertIntoHadoopFsRelationCommand.scala:143)
>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
>> at org.apache.spark.scheduler.Task.run(Task.scala:86)
>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPool
>> Executor.java:1142)
>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoo
>> lExecutor.java:617)
>> at java.lang.Thread.run(Thread.java:745)
>>
>> On Thu, Mar 16, 2017 at 2:55 PM, Everett Anderson <ever...@nuna.com>
>> wrote:
>>
>>> Hi,
>>>
>>> We're using Dataset union() in Spark 2.0.2 to concatenate a bunch of
>>> tables together and save as Parquet to S3, but it seems to take a long
>>> time. We're using the S3A FileSystem implementation under the covers, too,
>>> if that helps.
>>>
>>> Watching the Spark UI, the executors all eventually stop (we're using
>>> dynamic allocation) but under the SQL tab we can see a "save at
>>> NativeMethodAccessorImpl.java:-2" in Running Queries. The driver is
>>> still running of course, but it may take tens of minutes to finish. It
>>> makes me wonder if our data is all being collected through the driver.
>>>
>>> If we instead convert the Datasets to RDDs and call SparkContext.union()
>>> it works quickly.
>>>
>>> Anyone know if this is a known issue?
>>>
>>>
>>
>


Re: Spark 2.0.2 Dataset union() slowness vs RDD union?

2017-03-16 Thread Everett Anderson
Looks like the Dataset version of union may also fail with the following on
larger data sets, which again seems like it might be drawing everything
into the driver for some reason --

7/03/16 22:28:21 WARN TaskSetManager: Lost task 1.0 in stage 91.0 (TID
5760, ip-10-8-52-198.us-west-2.compute.internal):
java.lang.IllegalArgumentException: bound must be positive
at java.util.Random.nextInt(Random.java:388)
at
org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.confChanged(LocalDirAllocator.java:305)
at
org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:344)
at
org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.createTmpFileForWrite(LocalDirAllocator.java:416)
at
org.apache.hadoop.fs.LocalDirAllocator.createTmpFileForWrite(LocalDirAllocator.java:198)
at org.apache.hadoop.fs.s3a.S3AOutputStream.<init>(S3AOutputStream.java:87)
at org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:410)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:911)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:892)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:789)
at
org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:176)
at
org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:160)
at
org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:289)
at
org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:262)
at
org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.<init>(ParquetFileFormat.scala:562)
at
org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anon$1.newInstance(ParquetFileFormat.scala:139)
at
org.apache.spark.sql.execution.datasources.BaseWriterContainer.newOutputWriter(WriterContainer.scala:131)
at
org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:247)
at
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
at
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

On Thu, Mar 16, 2017 at 2:55 PM, Everett Anderson <ever...@nuna.com> wrote:

> Hi,
>
> We're using Dataset union() in Spark 2.0.2 to concatenate a bunch of
> tables together and save as Parquet to S3, but it seems to take a long
> time. We're using the S3A FileSystem implementation under the covers, too,
> if that helps.
>
> Watching the Spark UI, the executors all eventually stop (we're using
> dynamic allocation) but under the SQL tab we can see a "save at
> NativeMethodAccessorImpl.java:-2" in Running Queries. The driver is still
> running of course, but it may take tens of minutes to finish. It makes me
> wonder if our data is all being collected through the driver.
>
> If we instead convert the Datasets to RDDs and call SparkContext.union()
> it works quickly.
>
> Anyone know if this is a known issue?
>
>


Spark 2.0.2 Dataset union() slowness vs RDD union?

2017-03-16 Thread Everett Anderson
Hi,

We're using Dataset union() in Spark 2.0.2 to concatenate a bunch of tables
together and save as Parquet to S3, but it seems to take a long time. We're
using the S3A FileSystem implementation under the covers, too, if that
helps.

Watching the Spark UI, the executors all eventually stop (we're using
dynamic allocation) but under the SQL tab we can see a "save at
NativeMethodAccessorImpl.java:-2" in Running Queries. The driver is still
running of course, but it may take tens of minutes to finish. It makes me
wonder if our data is all being collected through the driver.

If we instead convert the Datasets to RDDs and call SparkContext.union() it
works quickly.

Anyone know if this is a known issue?


Best way to assign unique IDs to row groups

2017-03-01 Thread Everett Anderson
Hi,

I've used functions.monotonically_increasing_id() for assigning a unique ID
to all rows, but I'd like to assign a unique ID to each group of rows with
the same key.

The two ways I can think of to do this are

Option 1: Create separate group ID table and join back

   - Create a new data frame with the distinct values of the keys.
   - Add an ID column to it via monotonically_increasing_id.
   - Join this table back with the original to add the group ID. In this
   best case, this will be small enough to be a broadcast join.

Option 2: Add ID column / groupByKey / flatMapGroups

   - Add an ID column with monotonically_increasing_id
   - groupByKey
   - flatMapGroups and apply the first seen ID from the iterator to the
   other rows

Option 2 is a little annoying if you're dealing with Dataset[Row], as you
have to do a lot of work to get the fields out of the old Row objects and
create new ones.

Is there a better way?
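
For what it's worth, a rough sketch of Option 1 (the key column name is made
up for illustration):

import org.apache.spark.sql.functions.{broadcast, monotonically_increasing_id}

// Small table of distinct keys with a generated group ID. Persisting it may
// be needed so the IDs stay stable across downstream uses, per the earlier
// "Assigning a unique row ID" thread.
val groupIds = df
  .select("key_col")
  .distinct()
  .withColumn("group_id", monotonically_increasing_id())

// Broadcast join the key -> group_id table back onto the original rows.
val withGroupIds = df.join(broadcast(groupIds), Seq("key_col"))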

Also, generally, while assigning a unique ID to all rows seems like a
commonly needed operation, there are comments in RDD.zipWithUniqueId as
well as monotonically_increasing_id that suggest these may not be
especially reliable in various cases. Do people hit those much?

Thanks!

- Everett


Re: Strange behavior with 'not' and filter pushdown

2017-02-14 Thread Everett Anderson
Wrapping this up -- fix is in 2.1.0 and has been backported to the 2.0.x
branch, as well.

On Mon, Feb 13, 2017 at 6:41 PM, Everett Anderson <ever...@nuna.com> wrote:

> Went ahead and opened
>
> https://issues.apache.org/jira/browse/SPARK-19586
>
> though I'd generally expect to just close it as fixed in 2.1.0 and roll on.
>
> On Sat, Feb 11, 2017 at 5:01 PM, Everett Anderson <ever...@nuna.com>
> wrote:
>
>> On the plus side, looks like this may be fixed in 2.1.0:
>>
>> == Physical Plan ==
>> *HashAggregate(keys=[], functions=[count(1)])
>> +- Exchange SinglePartition
>>+- *HashAggregate(keys=[], functions=[partial_count(1)])
>>   +- *Project
>>  +- *Filter NOT isnotnull(username#14)
>> +- *FileScan parquet [username#14] Batched: true, Format:
>> Parquet, Location: InMemoryFileIndex[file:/tmp/test_table],
>> PartitionFilters: [], PushedFilters: [Not(IsNotNull(username))],
>> ReadSchema: struct
>>
>>
>>
>> On Fri, Feb 10, 2017 at 11:26 AM, Everett Anderson <ever...@nuna.com>
>> wrote:
>>
>>> Bumping this thread.
>>>
>>> Translating "where not(username is not null)" into a filter of  
>>> [IsNotNull(username),
>>> Not(IsNotNull(username))] seems like a rather severe bug.
>>>
>>> Spark 1.6.2:
>>>
>>> explain select count(*) from parquet_table where not( username is not
>>> null)
>>>
>>> == Physical Plan ==
>>> TungstenAggregate(key=[], 
>>> functions=[(count(1),mode=Final,isDistinct=false)],
>>> output=[_c0#1822L])
>>> +- TungstenExchange SinglePartition, None
>>>  +- TungstenAggregate(key=[], 
>>> functions=[(count(1),mode=Partial,isDistinct=false)],
>>> output=[count#1825L])
>>>  +- Project
>>>  +- Filter NOT isnotnull(username#1590)
>>>  +- Scan ParquetRelation[username#1590] InputPaths: ,
>>> PushedFilters: [Not(IsNotNull(username))]
>>>
>>> Spark 2.0.2
>>>
>>> explain select count(*) from parquet_table where not( username is not
>>> null)
>>>
>>> == Physical Plan ==
>>> *HashAggregate(keys=[], functions=[count(1)])
>>> +- Exchange SinglePartition
>>>  +- *HashAggregate(keys=[], functions=[partial_count(1)])
>>>  +- *Project
>>>  +- *Filter (isnotnull(username#35) && NOT isnotnull(username#35))
>>>  +- *BatchedScan parquet default.[username#35] Format:
>>> ParquetFormat, InputPaths: , PartitionFilters: [],
>>> PushedFilters: [IsNotNull(username), Not(IsNotNull(username))],
>>> ReadSchema: struct
>>>
>>> Example to generate the above:
>>>
>>> // Create some fake data
>>>
>>> import org.apache.spark.sql.Row
>>> import org.apache.spark.sql.Dataset
>>> import org.apache.spark.sql.types._
>>>
>>> val rowsRDD = sc.parallelize(Seq(
>>> Row(1, "fred"),
>>> Row(2, "amy"),
>>> Row(3, null)))
>>>
>>> val schema = StructType(Seq(
>>> StructField("id", IntegerType, nullable = true),
>>> StructField("username", StringType, nullable = true)))
>>>
>>> val data = sqlContext.createDataFrame(rowsRDD, schema)
>>>
>>> val path = "SOME PATH HERE"
>>>
>>> data.write.mode("overwrite").parquet(path)
>>>
>>> val testData = sqlContext.read.parquet(path)
>>>
>>> testData.registerTempTable("filter_test_table")
>>>
>>>
>>> %sql
>>> explain select count(*) from filter_test_table where not( username is
>>> not null)
>>>
>>>
>>> On Wed, Feb 8, 2017 at 4:56 PM, Alexi Kostibas <
>>> akosti...@nuna.com.invalid> wrote:
>>>
>>>> Hi,
>>>>
>>>> I have an application where I’m filtering data with SparkSQL with
>>>> simple WHERE clauses. I also want the ability to show the unmatched rows
>>>> for any filter, and so am wrapping the previous clause in `NOT()` to get
>>>> the inverse. Example:
>>>>
>>>> Filter:  username is not null
>>>> Inverse filter:  NOT(username is not null)
>>>>
>>>> This worked fine in Spark 1.6. After upgrading to Spark 2.0.2, the
>>>> inverse filter always returns zero results. It looks like this is a problem
>>>> with how the filter is getting pushed down to Parquet. Specifically, the
>>>> 

Re: Strange behavior with 'not' and filter pushdown

2017-02-13 Thread Everett Anderson
Went ahead and opened

https://issues.apache.org/jira/browse/SPARK-19586

though I'd generally expect to just close it as fixed in 2.1.0 and roll on.

On Sat, Feb 11, 2017 at 5:01 PM, Everett Anderson <ever...@nuna.com> wrote:

> On the plus side, looks like this may be fixed in 2.1.0:
>
> == Physical Plan ==
> *HashAggregate(keys=[], functions=[count(1)])
> +- Exchange SinglePartition
>+- *HashAggregate(keys=[], functions=[partial_count(1)])
>   +- *Project
>  +- *Filter NOT isnotnull(username#14)
> +- *FileScan parquet [username#14] Batched: true, Format:
> Parquet, Location: InMemoryFileIndex[file:/tmp/test_table],
> PartitionFilters: [], PushedFilters: [Not(IsNotNull(username))],
> ReadSchema: struct
>
>
>
> On Fri, Feb 10, 2017 at 11:26 AM, Everett Anderson <ever...@nuna.com>
> wrote:
>
>> Bumping this thread.
>>
>> Translating "where not(username is not null)" into a filter of  
>> [IsNotNull(username),
>> Not(IsNotNull(username))] seems like a rather severe bug.
>>
>> Spark 1.6.2:
>>
>> explain select count(*) from parquet_table where not( username is not
>> null)
>>
>> == Physical Plan ==
>> TungstenAggregate(key=[], functions=[(count(1),mode=Final,isDistinct=false)],
>> output=[_c0#1822L])
>> +- TungstenExchange SinglePartition, None
>>  +- TungstenAggregate(key=[], 
>> functions=[(count(1),mode=Partial,isDistinct=false)],
>> output=[count#1825L])
>>  +- Project
>>  +- Filter NOT isnotnull(username#1590)
>>  +- Scan ParquetRelation[username#1590] InputPaths: ,
>> PushedFilters: [Not(IsNotNull(username))]
>>
>> Spark 2.0.2
>>
>> explain select count(*) from parquet_table where not( username is not
>> null)
>>
>> == Physical Plan ==
>> *HashAggregate(keys=[], functions=[count(1)])
>> +- Exchange SinglePartition
>>  +- *HashAggregate(keys=[], functions=[partial_count(1)])
>>  +- *Project
>>  +- *Filter (isnotnull(username#35) && NOT isnotnull(username#35))
>>  +- *BatchedScan parquet default.[username#35] Format:
>> ParquetFormat, InputPaths: , PartitionFilters: [],
>> PushedFilters: [IsNotNull(username), Not(IsNotNull(username))],
>> ReadSchema: struct
>>
>> Example to generate the above:
>>
>> // Create some fake data
>>
>> import org.apache.spark.sql.Row
>> import org.apache.spark.sql.Dataset
>> import org.apache.spark.sql.types._
>>
>> val rowsRDD = sc.parallelize(Seq(
>> Row(1, "fred"),
>> Row(2, "amy"),
>> Row(3, null)))
>>
>> val schema = StructType(Seq(
>> StructField("id", IntegerType, nullable = true),
>> StructField("username", StringType, nullable = true)))
>>
>> val data = sqlContext.createDataFrame(rowsRDD, schema)
>>
>> val path = "SOME PATH HERE"
>>
>> data.write.mode("overwrite").parquet(path)
>>
>> val testData = sqlContext.read.parquet(path)
>>
>> testData.registerTempTable("filter_test_table")
>>
>>
>> %sql
>> explain select count(*) from filter_test_table where not( username is not
>> null)
>>
>>
>> On Wed, Feb 8, 2017 at 4:56 PM, Alexi Kostibas <
>> akosti...@nuna.com.invalid> wrote:
>>
>>> Hi,
>>>
>>> I have an application where I’m filtering data with SparkSQL with simple
>>> WHERE clauses. I also want the ability to show the unmatched rows for any
>>> filter, and so am wrapping the previous clause in `NOT()` to get the
>>> inverse. Example:
>>>
>>> Filter:  username is not null
>>> Inverse filter:  NOT(username is not null)
>>>
>>> This worked fine in Spark 1.6. After upgrading to Spark 2.0.2, the
>>> inverse filter always returns zero results. It looks like this is a problem
>>> with how the filter is getting pushed down to Parquet. Specifically, the
>>> pushdown includes both the “is not null” filter, AND “not(is not null)”,
>>> which would obviously result in zero matches. An example below:
>>>
>>> pyspark:
>>> > x = spark.sql('select my_id from my_table where *username is not null*
>>> ')
>>> > y = spark.sql('select my_id from my_table where not(*username is not
>>> null*)')
>>>
>>> > x.explain()
>>> == Physical Plan ==
>>> *Project [my_id#6L]
>>> +- *Filter isnotnull(username#91)
>>>+- *BatchedScan parquet default.my_table[my_id#6L,username#91]
>>>Format: 

Re: Strange behavior with 'not' and filter pushdown

2017-02-11 Thread Everett Anderson
On the plus side, looks like this may be fixed in 2.1.0:

== Physical Plan ==
*HashAggregate(keys=[], functions=[count(1)])
+- Exchange SinglePartition
   +- *HashAggregate(keys=[], functions=[partial_count(1)])
  +- *Project
 +- *Filter NOT isnotnull(username#14)
+- *FileScan parquet [username#14] Batched: true, Format:
Parquet, Location: InMemoryFileIndex[file:/tmp/test_table],
PartitionFilters: [], PushedFilters: [Not(IsNotNull(username))],
ReadSchema: struct



On Fri, Feb 10, 2017 at 11:26 AM, Everett Anderson <ever...@nuna.com> wrote:

> Bumping this thread.
>
> Translating "where not(username is not null)" into a filter of  
> [IsNotNull(username),
> Not(IsNotNull(username))] seems like a rather severe bug.
>
> Spark 1.6.2:
>
> explain select count(*) from parquet_table where not( username is not null)
>
> == Physical Plan ==
> TungstenAggregate(key=[], functions=[(count(1),mode=Final,isDistinct=false)],
> output=[_c0#1822L])
> +- TungstenExchange SinglePartition, None
>  +- TungstenAggregate(key=[], 
> functions=[(count(1),mode=Partial,isDistinct=false)],
> output=[count#1825L])
>  +- Project
>  +- Filter NOT isnotnull(username#1590)
>  +- Scan ParquetRelation[username#1590] InputPaths: ,
> PushedFilters: [Not(IsNotNull(username))]
>
> Spark 2.0.2
>
> explain select count(*) from parquet_table where not( username is not null)
>
> == Physical Plan ==
> *HashAggregate(keys=[], functions=[count(1)])
> +- Exchange SinglePartition
>  +- *HashAggregate(keys=[], functions=[partial_count(1)])
>  +- *Project
>  +- *Filter (isnotnull(username#35) && NOT isnotnull(username#35))
>  +- *BatchedScan parquet default.[username#35] Format:
> ParquetFormat, InputPaths: , PartitionFilters: [],
> PushedFilters: [IsNotNull(username), Not(IsNotNull(username))],
> ReadSchema: struct
>
> Example to generate the above:
>
> // Create some fake data
>
> import org.apache.spark.sql.Row
> import org.apache.spark.sql.Dataset
> import org.apache.spark.sql.types._
>
> val rowsRDD = sc.parallelize(Seq(
> Row(1, "fred"),
> Row(2, "amy"),
> Row(3, null)))
>
> val schema = StructType(Seq(
> StructField("id", IntegerType, nullable = true),
> StructField("username", StringType, nullable = true)))
>
> val data = sqlContext.createDataFrame(rowsRDD, schema)
>
> val path = "SOME PATH HERE"
>
> data.write.mode("overwrite").parquet(path)
>
> val testData = sqlContext.read.parquet(path)
>
> testData.registerTempTable("filter_test_table")
>
>
> %sql
> explain select count(*) from filter_test_table where not( username is not
> null)
>
>
> On Wed, Feb 8, 2017 at 4:56 PM, Alexi Kostibas <akosti...@nuna.com.invalid
> > wrote:
>
>> Hi,
>>
>> I have an application where I’m filtering data with SparkSQL with simple
>> WHERE clauses. I also want the ability to show the unmatched rows for any
>> filter, and so am wrapping the previous clause in `NOT()` to get the
>> inverse. Example:
>>
>> Filter:  username is not null
>> Inverse filter:  NOT(username is not null)
>>
>> This worked fine in Spark 1.6. After upgrading to Spark 2.0.2, the
>> inverse filter always returns zero results. It looks like this is a problem
>> with how the filter is getting pushed down to Parquet. Specifically, the
>> pushdown includes both the “is not null” filter, AND “not(is not null)”,
>> which would obviously result in zero matches. An example below:
>>
>> pyspark:
>> > x = spark.sql('select my_id from my_table where *username is not null*
>> ')
>> > y = spark.sql('select my_id from my_table where not(*username is not
>> null*)')
>>
>> > x.explain()
>> == Physical Plan ==
>> *Project [my_id#6L]
>> +- *Filter isnotnull(username#91)
>>+- *BatchedScan parquet default.my_table[my_id#6L,username#91]
>>Format: ParquetFormat, InputPaths: s3://my-path/my.parquet,
>>PartitionFilters: [], PushedFilters: [IsNotNull(username)],
>>ReadSchema: struct<my_id:bigint,username:string>
>> [1159]> y.explain()
>> == Physical Plan ==
>> *Project [my_id#6L]
>> +- *Filter (isnotnull(username#91) && NOT isnotnull(username#91))
>>+- *BatchedScan parquet default.my_table[my_id#6L,username#91]
>>Format: ParquetFormat, InputPaths: s3://my-path/my.parquet,
>>PartitionFilters: [],
>>PushedFilters: [IsNotNull(username), Not(IsNotNull(username))],
>>ReadSchema: struct<my_id:bigint,username:string>
>>
>> Presently I’m working around this by using the new functionality of NOT
>> EXISTS in Spark 2, but that seems like overkill.
>>
>> Any help appreciated.
>>
>>
>> *Alexi Kostibas*Engineering
>> *Nuna*
>> 650 Townsend Street, Suite 425
>> San Francisco, CA 94103
>>
>>
>
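
For anyone stuck on 2.0.x in the meantime, here is a minimal Scala sketch of a
workaround worth trying (table and column names are illustrative, reusing the
reproduction above): express the inverse filter with isNull directly instead of
wrapping the original predicate in NOT(), so only a single IsNull filter is
pushed down to Parquet.

import org.apache.spark.sql.functions.col

// Assumes the same temp table as in the reproduction code above.
val matched   = sqlContext.table("filter_test_table").filter(col("username").isNotNull)
val unmatched = sqlContext.table("filter_test_table").filter(col("username").isNull)

// Ideally the pushed-down filter is just [IsNull(username)] rather than the
// contradictory [IsNotNull(username), Not(IsNotNull(username))] pair.
unmatched.explain()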


Re: Strange behavior with 'not' and filter pushdown

2017-02-10 Thread Everett Anderson
Bumping this thread.

Translating "where not(username is not null)" into a filter of
[IsNotNull(username),
Not(IsNotNull(username))] seems like a rather severe bug.

Spark 1.6.2:

explain select count(*) from parquet_table where not( username is not null)

== Physical Plan ==
TungstenAggregate(key=[],
functions=[(count(1),mode=Final,isDistinct=false)], output=[_c0#1822L])
+- TungstenExchange SinglePartition, None
 +- TungstenAggregate(key=[],
functions=[(count(1),mode=Partial,isDistinct=false)], output=[count#1825L])
 +- Project
 +- Filter NOT isnotnull(username#1590)
 +- Scan ParquetRelation[username#1590] InputPaths: ,
PushedFilters: [Not(IsNotNull(username))]

Spark 2.0.2

explain select count(*) from parquet_table where not( username is not null)

== Physical Plan ==
*HashAggregate(keys=[], functions=[count(1)])
+- Exchange SinglePartition
 +- *HashAggregate(keys=[], functions=[partial_count(1)])
 +- *Project
 +- *Filter (isnotnull(username#35) && NOT isnotnull(username#35))
 +- *BatchedScan parquet default.[username#35] Format:
ParquetFormat, InputPaths: , PartitionFilters: [],
PushedFilters: [IsNotNull(username), Not(IsNotNull(username))], ReadSchema:
struct

Example to generate the above:

// Create some fake data

import org.apache.spark.sql.Row
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.types._

val rowsRDD = sc.parallelize(Seq(
Row(1, "fred"),
Row(2, "amy"),
Row(3, null)))

val schema = StructType(Seq(
StructField("id", IntegerType, nullable = true),
StructField("username", StringType, nullable = true)))

val data = sqlContext.createDataFrame(rowsRDD, schema)

val path = "SOME PATH HERE"

data.write.mode("overwrite").parquet(path)

val testData = sqlContext.read.parquet(path)

testData.registerTempTable("filter_test_table")


%sql
explain select count(*) from filter_test_table where not( username is not
null)


On Wed, Feb 8, 2017 at 4:56 PM, Alexi Kostibas 
wrote:

> Hi,
>
> I have an application where I’m filtering data with SparkSQL with simple
> WHERE clauses. I also want the ability to show the unmatched rows for any
> filter, and so am wrapping the previous clause in `NOT()` to get the
> inverse. Example:
>
> Filter:  username is not null
> Inverse filter:  NOT(username is not null)
>
> This worked fine in Spark 1.6. After upgrading to Spark 2.0.2, the inverse
> filter always returns zero results. It looks like this is a problem with
> how the filter is getting pushed down to Parquet. Specifically, the
> pushdown includes both the “is not null” filter, AND “not(is not null)”,
> which would obviously result in zero matches. An example below:
>
> pyspark:
> > x = spark.sql('select my_id from my_table where *username is not null*')
> > y = spark.sql('select my_id from my_table where not(*username is not
> null*)')
>
> > x.explain()
> == Physical Plan ==
> *Project [my_id#6L]
> +- *Filter isnotnull(username#91)
>+- *BatchedScan parquet default.my_table[my_id#6L,username#91]
>Format: ParquetFormat, InputPaths: s3://my-path/my.parquet,
>PartitionFilters: [], PushedFilters: [IsNotNull(username)],
>ReadSchema: struct
> [1159]> y.explain()
> == Physical Plan ==
> *Project [my_id#6L]
> +- *Filter (isnotnull(username#91) && NOT isnotnull(username#91))
>+- *BatchedScan parquet default.my_table[my_id#6L,username#91]
>Format: ParquetFormat, InputPaths: s3://my-path/my.parquet,
>PartitionFilters: [],
>PushedFilters: [IsNotNull(username), Not(IsNotNull(username))],
>ReadSchema: struct
>
> Presently I’m working around this by using the new functionality of NOT
> EXISTS in Spark 2, but that seems like overkill.
>
> Any help appreciated.
>
>
> *Alexi Kostibas*Engineering
> *Nuna*
> 650 Townsend Street, Suite 425
> San Francisco, CA 94103
>
>


Re: Un-exploding / denormalizing Spark SQL help

2017-02-08 Thread Everett Anderson
On Wed, Feb 8, 2017 at 1:14 PM, ayan guha <guha.a...@gmail.com> wrote:

> Will a sql solution will be acceptable?
>

I'm very curious to see how it'd be done in raw SQL if you're up for it!

I think the 2 programmatic solutions so far are viable, though, too.

(By the way, thanks everyone for the great suggestions!)
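
Not from the thread itself, but here is a rough sketch of how the same
dense_rank-then-pivot idea could look in raw SQL, assuming the example
DataFrame is registered under the (hypothetical) name "input_table" and there
are at most three priorities per group. (On Spark 1.6 the window function needs
a HiveContext; 2.x SQL handles it natively.)

data.registerTempTable("input_table")

val denormalized = sqlContext.sql("""
  SELECT id, name,
         MAX(CASE WHEN rnk = 1 THEN extra END)    AS extra1,
         MAX(CASE WHEN rnk = 1 THEN data END)     AS data1,
         MAX(CASE WHEN rnk = 1 THEN priority END) AS priority1,
         MAX(CASE WHEN rnk = 2 THEN extra END)    AS extra2,
         MAX(CASE WHEN rnk = 2 THEN data END)     AS data2,
         MAX(CASE WHEN rnk = 2 THEN priority END) AS priority2,
         MAX(CASE WHEN rnk = 3 THEN extra END)    AS extra3,
         MAX(CASE WHEN rnk = 3 THEN data END)     AS data3,
         MAX(CASE WHEN rnk = 3 THEN priority END) AS priority3
  FROM (
    SELECT *, DENSE_RANK() OVER (PARTITION BY id, name ORDER BY priority) AS rnk
    FROM input_table
  ) ranked
  GROUP BY id, name
""")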





>
> On Thu, 9 Feb 2017 at 4:01 am, Xiaomeng Wan <shawn...@gmail.com> wrote:
>
>> You could also try pivot.
>>
>> On 7 February 2017 at 16:13, Everett Anderson <ever...@nuna.com.invalid>
>> wrote:
>>
>>
>>
>> On Tue, Feb 7, 2017 at 2:21 PM, Michael Armbrust <mich...@databricks.com>
>> wrote:
>>
>> I think the fastest way is likely to use a combination of conditionals
>> (when / otherwise), first (ignoring nulls), while grouping by the id.
>> This should get the answer with only a single shuffle.
>>
>> Here is an example
>> <https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/1023043053387187/3908422850525880/2840265927289860/latest.html>
>> .
>>
>>
>> Very cool! Using the simpler aggregates feels cleaner.
>>
>>
>>
>> On Tue, Feb 7, 2017 at 5:07 PM, Jacek Laskowski <ja...@japila.pl> wrote:
>>
>> Hi Everett,
>>
>> That's pretty much what I'd do. Can't think of a way to beat your
>> solution. Why do you "feel vaguely uneasy about it"?
>>
>>
>> Maybe it felt like I was unnecessarily grouping-by twice, but probably
>> mostly that I hadn't used pivot before.
>>
>> Interestingly, the physical plans are not especially different between
>> these two solutions after the rank column is added. They both have two
>> SortAggregates that seem to be figuring out where to put results based on
>> the rank:
>>
>> My original one:
>>
>> == Physical Plan ==
>> *Project [id#279, name#280, 1#372.extra AS extra1#409, 1#372.data AS
>> data1#435, 1#372.priority AS priority1#462, 2#374.extra AS extra2#490,
>> 2#374.data AS data2#519, 2#374.priority AS priority2#549, 3#376.extra AS
>> extra3#580, 3#376.data AS data3#612, 3#376.priority AS priority3#645]
>> +- SortAggregate(key=[id#279,name#280], functions=[first(if
>> ((cast(rank#292 as double) = 1.0)) temp_struct#312 else null,
>> true),first(if ((cast(rank#292 as double) = 2.0)) temp_struct#312 else
>> null, true),first(if ((cast(rank#292 as double) = 3.0)) temp_struct#312
>> else null, true)])
>>+- SortAggregate(key=[id#279,name#280], functions=[partial_first(if
>> ((cast(rank#292 as double) = 1.0)) temp_struct#312 else null,
>> true),partial_first(if ((cast(rank#292 as double) = 2.0)) temp_struct#312
>> else null, true),partial_first(if ((cast(rank#292 as double) = 3.0))
>> temp_struct#312 else null, true)])
>>   +- *Project [id#279, name#280, rank#292, struct(extra#281,
>> data#282, priority#283) AS temp_struct#312]
>>  +- Window [denserank(priority#283) windowspecdefinition(id#279,
>> name#280, priority#283 ASC, ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT
>> ROW) AS rank#292], [id#279, name#280], [priority#283 ASC]
>> +- *Sort [id#279 ASC, name#280 ASC, priority#283 ASC], false,
>> 0
>>+- Exchange hashpartitioning(id#279, name#280, 200)
>>   +- Scan ExistingRDD[id#279,name#280,
>> extra#281,data#282,priority#283]
>>
>>
>> And modifying Michael's slightly to use a rank:
>>
>> import org.apache.spark.sql.functions._
>>
>> def getColumnWithRank(column: String, rank: Int) = {
>>   first(when(col("rank") === lit(rank), col(column)).otherwise(null),
>> ignoreNulls = true)
>> }
>>
>> val withRankColumn = data.withColumn("rank", 
>> functions.dense_rank().over(Window.partitionBy("id",
>> "name").orderBy("priority")))
>>
>> val modCollapsed = withRankColumn
>>   .groupBy($"id", $"name")
>>   .agg(
>> getColumnWithRank("data", 1) as 'data1,
>> getColumnWithRank("data", 2) as 'data2,
>> getColumnWithRank("data", 3) as 'data3,
>> getColumnWithRank("extra", 1) as 'extra1,
>> getColumnWithRank("extra", 2) as 'extra2,
>> getColumnWithRank("extra", 3) as 'extra3)
>>
>>
>> modCollapsed.explain
>>
>> == Physical Plan ==
>> SortAggregate(key=[id#279,name#280], functions=[first(CASE WHEN
>> (rank#965 = 1) THEN data#282 ELSE null END, true),first(CASE WHEN (rank#965
>> = 2) THEN data#282 

Re: Un-exploding / denormalizing Spark SQL help

2017-02-07 Thread Everett Anderson
ck out the execution plan (with explain) to see how it's
>> gonna work at runtime. I may have seen groupBy + join be better than
>> window (there were more exchanges in play for windows I reckon).
>>
>> Pozdrawiam,
>> Jacek Laskowski
>> 
>> https://medium.com/@jaceklaskowski/
>> Mastering Apache Spark 2.0 https://bit.ly/mastering-apache-spark
>> Follow me at https://twitter.com/jaceklaskowski
>>
>>
>> On Tue, Feb 7, 2017 at 10:54 PM, Everett Anderson <ever...@nuna.com>
>> wrote:
>> >
>> >
>> > On Tue, Feb 7, 2017 at 12:50 PM, Jacek Laskowski <ja...@japila.pl>
>> wrote:
>> >>
>> >> Hi,
>> >>
>> >> Could groupBy and withColumn or UDAF work perhaps? I think window could
>> >> help here too.
>> >
>> >
>> > This seems to work, but I do feel vaguely uneasy about it. :)
>> >
>> > // First add a 'rank' column which is priority order just in case
>> priorities
>> > aren't
>> > // from 1 with no gaps.
>> > val temp1 = data.withColumn("rank", functions.dense_rank()
>> >.over(Window.partitionBy("id", "name").orderBy("priority")))
>> >
>> > +---+----+-----+------+--------+----+
>> > | id|name|extra|  data|priority|rank|
>> > +---+----+-----+------+--------+----+
>> > |  1|Fred|    8|value1|       1|   1|
>> > |  1|Fred|    8|value8|       2|   2|
>> > |  1|Fred|    8|value5|       3|   3|
>> > |  2| Amy|    9|value3|       1|   1|
>> > |  2| Amy|    9|value5|       2|   2|
>> > +---+----+-----+------+--------+----+
>> >
>> > // Now move all the columns we want to denormalize into a struct column
>> to
>> > keep them together.
>> > val temp2 = temp1.withColumn("temp_struct", struct(temp1("extra"),
>> > temp1("data"), temp1("priority")))
>> >   .drop("extra", "data", "priority")
>> >
>> > +---+----+----+------------+
>> > | id|name|rank| temp_struct|
>> > +---+----+----+------------+
>> > |  1|Fred|   1|[8,value1,1]|
>> > |  1|Fred|   2|[8,value8,2]|
>> > |  1|Fred|   3|[8,value5,3]|
>> > |  2| Amy|   1|[9,value3,1]|
>> > |  2| Amy|   2|[9,value5,2]|
>> > +---+----+----+------------+
>> >
>> > // groupBy, again, but now pivot the rank column. We need an aggregate
>> > function after pivot,
>> > // so use first -- there will only ever be one element.
>> > val temp3 = temp2.groupBy("id", "name")
>> >   .pivot("rank", Seq("1", "2", "3"))
>> >   .agg(functions.first("temp_struct"))
>> >
>> > +---+----+------------+------------+------------+
>> > | id|name|           1|           2|           3|
>> > +---+----+------------+------------+------------+
>> > |  1|Fred|[8,value1,1]|[8,value8,2]|[8,value5,3]|
>> > |  2| Amy|[9,value3,1]|[9,value5,2]|        null|
>> > +---+----+------------+------------+------------+
>> >
>> > // Now just moving things out of the structs and clean up.
>> > val output = temp3.withColumn("extra1", temp3("1").getField("extra"))
>> >  .withColumn("data1", temp3("1").getField("data"))
>> >  .withColumn("priority1", temp3("1").getField("priority"))
>> >  .withColumn("extra2", temp3("2").getField("extra"))
>> >  .withColumn("data2", temp3("2").getField("data"))
>> >  .withColumn("priority2", temp3("2").getField("priority"))
>> >  .withColumn("extra3", temp3("3").getField("extra"))
>> >  .withColumn("data3", temp3("3").getField("data"))
>> >  .withColumn("priority3", temp3("3").getField("priority"))
>> >  .drop("1", "2", "3")
>> >
>> > +---+----+------+------+---------+------+------+---------+------+------+---------+
>> > | id|name|extra1| data1|priority1|extra2| data2|priority2|extra3| data3|priority3|
>> > +---+----+------+------+---------+------+------+---------+------+------+---------+
>> > |  1|Fred|     8|value1|        1|     8|value8|        2|     8|value5|        3|

Re: Un-exploding / denormalizing Spark SQL help

2017-02-07 Thread Everett Anderson
On Tue, Feb 7, 2017 at 12:50 PM, Jacek Laskowski <ja...@japila.pl> wrote:

> Hi,
>
> Could groupBy and withColumn or UDAF work perhaps? I think window could
> help here too.
>

This seems to work, but I do feel vaguely uneasy about it. :)

// First add a 'rank' column which is priority order just in case
priorities aren't
// from 1 with no gaps.
val temp1 = data.withColumn("rank", functions.dense_rank()
   .over(Window.partitionBy("id", "name").orderBy("priority")))

+---+----+-----+------+--------+----+
| id|name|extra|  data|priority|rank|
+---+----+-----+------+--------+----+
|  1|Fred|    8|value1|       1|   1|
|  1|Fred|    8|value8|       2|   2|
|  1|Fred|    8|value5|       3|   3|
|  2| Amy|    9|value3|       1|   1|
|  2| Amy|    9|value5|       2|   2|
+---+----+-----+------+--------+----+

// Now move all the columns we want to denormalize into a struct column to
keep them together.
val temp2 = temp1.withColumn("temp_struct", struct(temp1("extra"),
temp1("data"), temp1("priority")))
  .drop("extra", "data", "priority")

+---+----+----+------------+
| id|name|rank| temp_struct|
+---+----+----+------------+
|  1|Fred|   1|[8,value1,1]|
|  1|Fred|   2|[8,value8,2]|
|  1|Fred|   3|[8,value5,3]|
|  2| Amy|   1|[9,value3,1]|
|  2| Amy|   2|[9,value5,2]|
+---+----+----+------------+

// groupBy, again, but now pivot the rank column. We need an aggregate
function after pivot,
// so use first -- there will only ever be one element.
val temp3 = temp2.groupBy("id", "name")
  .pivot("rank", Seq("1", "2", "3"))
  .agg(functions.first("temp_struct"))

+---+----+------------+------------+------------+
| id|name|           1|           2|           3|
+---+----+------------+------------+------------+
|  1|Fred|[8,value1,1]|[8,value8,2]|[8,value5,3]|
|  2| Amy|[9,value3,1]|[9,value5,2]|        null|
+---+----+------------+------------+------------+

// Now just moving things out of the structs and clean up.
val output = temp3.withColumn("extra1", temp3("1").getField("extra"))
 .withColumn("data1", temp3("1").getField("data"))
 .withColumn("priority1", temp3("1").getField("priority"))
 .withColumn("extra2", temp3("2").getField("extra"))
 .withColumn("data2", temp3("2").getField("data"))
 .withColumn("priority2", temp3("2").getField("priority"))
 .withColumn("extra3", temp3("3").getField("extra"))
 .withColumn("data3", temp3("3").getField("data"))
 .withColumn("priority3", temp3("3").getField("priority"))
 .drop("1", "2", "3")

+---+----+------+------+---------+------+------+---------+------+------+---------+
| id|name|extra1| data1|priority1|extra2| data2|priority2|extra3| data3|priority3|
+---+----+------+------+---------+------+------+---------+------+------+---------+
|  1|Fred|     8|value1|        1|     8|value8|        2|     8|value5|        3|
|  2| Amy|     9|value3|        1|     9|value5|        2|  null|  null|     null|
+---+----+------+------+---------+------+------+---------+------+------+---------+








>
> Jacek
>
> On 7 Feb 2017 8:02 p.m., "Everett Anderson" <ever...@nuna.com.invalid>
> wrote:
>
>> Hi,
>>
>> I'm trying to un-explode or denormalize a table like
>>
>> +---+----+-----+------+--------+
>> |id |name|extra|data  |priority|
>> +---+----+-----+------+--------+
>> |1  |Fred|8    |value1|1       |
>> |1  |Fred|8    |value8|2       |
>> |1  |Fred|8    |value5|3       |
>> |2  |Amy |9    |value3|1       |
>> |2  |Amy |9    |value5|2       |
>> +---+----+-----+------+--------+
>>
>> into something that looks like
>>
>> +---+----+------+------+---------+------+------+---------+------+------+---------+
>> |id |name|extra1|data1 |priority1|extra2|data2 |priority2|extra3|data3 |priority3|
>> +---+----+------+------+---------+------+------+---------+------+------+---------+
>> |1  |Fred|8     |value1|1        |8     |value8|2        |8     |value5|3        |
>> |2  |Amy |9     |value3|1        |9     |value5|2        |null  |null  |null     |
>> +---+----+------+------+---------+------+------+---------+------+------+---------+
>>
>> If I were going the other direction, I'd create a new column with an
>> array of structs, each with 'extra', 'data', and 'priority' fields and then
>> explode it.
>>
>

Un-exploding / denormalizing Spark SQL help

2017-02-07 Thread Everett Anderson
Hi,

I'm trying to un-explode or denormalize a table like

+---+----+-----+------+--------+
|id |name|extra|data  |priority|
+---+----+-----+------+--------+
|1  |Fred|8    |value1|1       |
|1  |Fred|8    |value8|2       |
|1  |Fred|8    |value5|3       |
|2  |Amy |9    |value3|1       |
|2  |Amy |9    |value5|2       |
+---+----+-----+------+--------+

into something that looks like

+---+----+------+------+---------+------+------+---------+------+------+---------+
|id |name|extra1|data1 |priority1|extra2|data2 |priority2|extra3|data3 |priority3|
+---+----+------+------+---------+------+------+---------+------+------+---------+
|1  |Fred|8     |value1|1        |8     |value8|2        |8     |value5|3        |
|2  |Amy |9     |value3|1        |9     |value5|2        |null  |null  |null     |
+---+----+------+------+---------+------+------+---------+------+------+---------+

If I were going the other direction, I'd create a new column with an array
of structs, each with 'extra', 'data', and 'priority' fields and then
explode it.

Going from the more normalized view, though, I'm having a harder time.

I want to group or partition by (id, name) and order by priority, but after
that I can't figure out how to get multiple rows rotated into one.

Any ideas?

Here's the code to create the input table above:

import org.apache.spark.sql.Row
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.types._

val rowsRDD = sc.parallelize(Seq(
Row(1, "Fred", 8, "value1", 1),
Row(1, "Fred", 8, "value8", 2),
Row(1, "Fred", 8, "value5", 3),
Row(2, "Amy", 9, "value3", 1),
Row(2, "Amy", 9, "value5", 2)))

val schema = StructType(Seq(
StructField("id", IntegerType, nullable = true),
StructField("name", StringType, nullable = true),
StructField("extra", IntegerType, nullable = true),
StructField("data", StringType, nullable = true),
StructField("priority", IntegerType, nullable = true)))

val data = sqlContext.createDataFrame(rowsRDD, schema)


Re: Running Spark on EMR

2017-01-16 Thread Everett Anderson
On Sun, Jan 15, 2017 at 11:09 AM, Andrew Holway <
andrew.hol...@otternetworks.de> wrote:

> use yarn :)
>
> "spark-submit --master yarn"
>

Doesn't this require first copying out various Hadoop configuration XML
files from the EMR master node to the machine running the spark-submit? Or
is there a well-known minimal set of host/port options to avoid that?

I'm currently copying out several XML files and using them on a client
running spark-submit, but I feel uneasy about this as it seems like the
local values override values on the cluster at runtime -- they're copied up
with the job.




>
>
> On Sun, Jan 15, 2017 at 7:55 PM, Darren Govoni 
> wrote:
>
>> So what was the answer?
>>
>>
>>
>> Sent from my Verizon, Samsung Galaxy smartphone
>>
>>  Original message 
>> From: Andrew Holway 
>> Date: 1/15/17 11:37 AM (GMT-05:00)
>> To: Marco Mistroni 
>> Cc: Neil Jonkers , User 
>> Subject: Re: Running Spark on EMR
>>
>> Darn. I didn't respond to the list. Sorry.
>>
>>
>>
>> On Sun, Jan 15, 2017 at 5:29 PM, Marco Mistroni 
>> wrote:
>>
>>> thanks Neil. I followed original suggestion from Andrw and everything is
>>> working fine now
>>> kr
>>>
>>> On Sun, Jan 15, 2017 at 4:27 PM, Neil Jonkers 
>>> wrote:
>>>
 Hello,

 Can you drop the url:

  spark://master:7077

 The url is used when running Spark in standalone mode.

 Regards


  Original message 
 From: Marco Mistroni
 Date:15/01/2017 16:34 (GMT+02:00)
 To: User
 Subject: Running Spark on EMR

 hi all
  could anyone assist here?
 i am trying to run spark 2.0.0 on an EMR cluster, but i am having issues
 connecting to the master node
 So, below is a snippet of what i am doing


 sc = SparkSession.builder.master(sparkHost).appName("DataProcess"
 ).getOrCreate()

 sparkHost is passed as input parameter. that was thought so that i can
 run the script locally
 on my spark local instance as well as submitting scripts on any cluster
 i want


 Now i have
 1 - setup a cluster on EMR.
 2 - connected to masternode
 3  - launch the command spark-submit myscripts.py spark://master:7077

 But that results in an connection refused exception
 Then i have tried to remove the .master call above and launch the
 script with the following command

 spark-submit --master spark://master:7077   myscript.py  but still i
 am getting
 connection refused exception


 I am using Spark 2.0.0 , could anyone advise on how shall i build the
 spark session and how can i submit a python script to the cluster?

 kr
  marco

>>>
>>>
>>
>>
>> --
>> Otter Networks UG
>> http://otternetworks.de
>> Gotenstraße 17
>> 10829 Berlin
>>
>
>
>
> --
> Otter Networks UG
> http://otternetworks.de
> Gotenstraße 17
> 10829 Berlin
>


Re: Writing DataFrame filter results to separate files

2016-12-06 Thread Everett Anderson
On Mon, Dec 5, 2016 at 5:33 PM, Michael Armbrust 
wrote:

> 1. In my case, I'd need to first explode my data by ~12x to assign each
>> record to multiple 12-month rolling output windows. I'm not sure Spark SQL
>> would be able to optimize this away, combining it with the output writing
>> to do it incrementally.
>>
>
> You are right, but I wouldn't worry about the RAM use.  If implemented
> properly (or if you just use the builtin window function), it should all be
> pipelined.
>


Very cool! Will give it a go. I'm still on Spark 1.6.x so hadn't seen that
function, either!


>
>
>> 2. Wouldn't each partition -- window in my case -- be shuffled to a
>> single machine and then written together as one output shard? For a large
>> amount of data per window, that seems less than ideal.
>>
>
> Oh sorry, I thought you wanted one file per value.  If you drop the
> repartition then it won't shuffle, but will just write in parallel on
> each machine.
>

Thanks!
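
For reference, a rough sketch of that direction (not from the thread; the
window-ID helper and column names are made up for illustration): explode each
record into the 12-month windows it belongs to, then let partitionBy write all
windows in one pass.

import org.apache.spark.sql.functions._

// Hypothetical helper: compute the rolling window IDs a record's date falls
// into. Real logic would return up to 12 IDs; this placeholder returns one.
val windowIdsFor = udf { date: java.sql.Date =>
  Seq(date.toString.substring(0, 7))
}

val windowed = allData.withColumn(
  "window_id", explode(windowIdsFor(col("record_date"))))

windowed
  .repartition(col("window_id"))   // or drop this to write in parallel per machine
  .write
  .partitionBy("window_id")
  .format("parquet")
  .save("OUTPUT PATH HERE")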


Re: Writing DataFrame filter results to separate files

2016-12-05 Thread Everett Anderson
Hi,

Thanks for the reply!

On Mon, Dec 5, 2016 at 1:30 PM, Michael Armbrust <mich...@databricks.com>
wrote:

> If you repartition($"column") and then do .write.partitionBy("column") you
> should end up with a single file for each value of the partition column.
>

I have two concerns there:

1. In my case, I'd need to first explode my data by ~12x to assign each
record to multiple 12-month rolling output windows. I'm not sure Spark SQL
would be able to optimize this away, combining it with the output writing
to do it incrementally.

2. Wouldn't each partition -- window in my case -- be shuffled to a single
machine and then written together as one output shard? For a large amount
of data per window, that seems less than ideal.


>
> On Mon, Dec 5, 2016 at 10:59 AM, Everett Anderson <
> ever...@nuna.com.invalid> wrote:
>
>> Hi,
>>
>> I have a DataFrame of records with dates, and I'd like to write all
>> 12-month (with overlap) windows to separate outputs.
>>
>> Currently, I have a loop equivalent to:
>>
>> for ((windowStart, windowEnd) <- windows) {
>> val windowData = allData.filter(
>> getFilterCriteria(windowStart, windowEnd))
>> windowData.write.format(...).save(...)
>> }
>>
>> This works fine, but has the drawback that since Spark doesn't
>> parallelize the writes, there is a fairly high cost based on the number of
>> windows.
>>
>> Is there a way around this?
>>
>> In MapReduce, I'd probably multiply the data in a Mapper with a window ID
>> and then maybe use something like MultipleOutputs
>> <https://hadoop.apache.org/docs/r2.7.3/api/org/apache/hadoop/mapreduce/lib/output/MultipleOutputs.html>.
>> But I'm a bit worried about trying to do this in Spark because of the data
>> explosion and RAM use. What's the best approach?
>>
>> Thanks!
>>
>> - Everett
>>
>>
>


Writing DataFrame filter results to separate files

2016-12-05 Thread Everett Anderson
Hi,

I have a DataFrame of records with dates, and I'd like to write all
12-month (with overlap) windows to separate outputs.

Currently, I have a loop equivalent to:

for ((windowStart, windowEnd) <- windows) {
val windowData = allData.filter(
getFilterCriteria(windowStart, windowEnd))
windowData.write.format(...).save(...)
}

This works fine, but has the drawback that since Spark doesn't parallelize
the writes, there is a fairly high cost based on the number of windows.

Is there a way around this?

In MapReduce, I'd probably multiply the data in a Mapper with a window ID
and then maybe use something like MultipleOutputs.
But I'm a bit worried about trying to do this in Spark because of the data
explosion and RAM use. What's the best approach?

Thanks!

- Everett


Modifying Metadata in StructType schemas

2016-10-24 Thread Everett Anderson
Hi,

I've been using the immutable Metadata within the StructType of a
DataFrame/Dataset to track application-level column lineage.

However, since it's immutable, the only way to modify it is to do a full
trip of

   1. Convert DataFrame/Dataset to Row RDD
   2. Create new, modified Metadata per column from the old
   3. Create a new StructType with the modified metadata
   4. Convert the Row RDD + StructType schema to a DataFrame/Dataset

It looks like conversion to/from an RDD might involve real work, even
though in this case the data itself isn't modified at all.

Is there a better way to do this?

Thanks!
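
For reference, a minimal sketch of the round trip described above (the
"lineage" key and what gets stored under it are illustrative only):

import org.apache.spark.sql.types.{MetadataBuilder, StructType}

// Build a modified schema: copy each field's old metadata and add to it.
val newSchema = StructType(df.schema.fields.map { field =>
  val newMetadata = new MetadataBuilder()
    .withMetadata(field.metadata)
    .putString("lineage", s"source:${field.name}")
    .build()
  field.copy(metadata = newMetadata)
})

// Steps 1 and 4: drop to a Row RDD and rebuild with the modified schema.
val withLineage = sqlContext.createDataFrame(df.rdd, newSchema)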


Equivalent to --files for driver?

2016-09-21 Thread Everett Anderson
Hi,

I'm running Spark 1.6.2 on YARN and I often use the cluster deploy mode
with spark-submit. While the --files param is useful for getting files onto
the cluster in the working directories of the executors, the driver's
working directory doesn't get them.

Is there some equivalent to --files for the driver program when run in this
mode?


Re: S3A + EMR failure when writing Parquet?

2016-09-04 Thread Everett Anderson
Hey,

Thanks for the reply and sorry for the late response!

I haven't been able to figure out the root cause, but I have been able to
get things working if both the cluster and the remote submitter use S3A
instead of EMRFS for all s3:// interactions, so I'm going with that, for
now.

My impression from reading your various other replies on S3A is that it's
also best to use mapreduce.fileoutputcommitter.algorithm.version=2 (which might
someday be the default
<https://issues.apache.org/jira/browse/MAPREDUCE-6336>) and, presumably if
your data fits well in memory, use fs.s3a.fast.upload=true. Is that right?
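
(For anyone following along, those settings can be passed the same way as the
--conf flags quoted further down, e.g.:

--conf spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2
--conf spark.hadoop.fs.s3a.fast.upload=true
--conf spark.hadoop.fs.s3a.server-side-encryption-algorithm=AES256

though whether fast.upload helps depends on the workload and available memory.)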



On Tue, Aug 30, 2016 at 11:49 AM, Steve Loughran <ste...@hortonworks.com>
wrote:

>
> On 29 Aug 2016, at 18:18, Everett Anderson <ever...@nuna.com.INVALID
> <ever...@nuna.com.invalid>> wrote:
>
> Okay, I don't think it's really just S3A issue, anymore. I can run the job
> using fs.s3.impl/spark.hadoop.fs.s3.impl set to the S3A impl as a --conf
> param from the EMR console successfully, as well.
>
> The problem seems related to the fact that we're trying to spark-submit
> jobs to a YARN cluster from outside the cluster itself.
>
> The docs <https://spark.apache.org/docs/1.6.2/running-on-yarn.html>
> suggest one must copy the Hadoop/YARN config XML outside of the cluster to
> do this, which feels gross, but it's what we did. We had changed fs.s3.impl
> to use S3A in that config, and that seems to result in the failure, though
> I still can't figure out why.
>
> Interestingly, if I don't make that change to the XML, and leave it as the
> EMRFS implementation, it will work, as long as I use s3a:// URIs for the
> jar, otherwise spark-submit won't be able to ship them to the cluster since
> it won't have the EMRFS implementation locally.
>
>
> I see: you are trying to use EMR's "special" S3 in-cluster, but
> spark-submit is trying to submit remotely.
>
> 1.  Trying to change the value of fs.s3.impl to S3a works for upload, but
> not runtime
> 2. use s3a for the upload, leave things alone and all works.
>
> I would just go with S3a, this is just the JARs being discussed here right
> —not the actual data?
>
> When the JARs are needed, they'll be copied on EMR using the amazon S3A
> implementation —whatever they've done there— to the local filesystem, where
> classloaders can pick them up and use. It might be that s3a:// URLs are
> slower on EMR than s3:// URLs, but there's no fundamental reason wny it
> isn't going to work.
>
>
>
>
>
> On Sun, Aug 28, 2016 at 4:19 PM, Everett Anderson <ever...@nuna.com>
> wrote:
>
>> (Sorry, typo -- I was using
>> spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2 not
>> 'hadooop', of course)
>>
>> On Sun, Aug 28, 2016 at 12:51 PM, Everett Anderson <ever...@nuna.com>
>> wrote:
>>
>>> Hi,
>>>
>>> I'm having some trouble figuring out a failure when using S3A when
>>> writing a DataFrame as Parquet on EMR 4.7.2 (which is Hadoop 2.7.2 and
>>> Spark 1.6.2). It works when using EMRFS (s3://), though.
>>>
>>> I'm using these extra conf params, though I've also tried without
>>> everything but the encryption one with the same result:
>>>
>>> --conf spark.hadooop.mapreduce.fileoutputcommitter.algorithm.version=2
>>> --conf spark.hadoop.mapreduce.fileoutputcommitter.cleanup.skipped=true
>>> --conf spark.hadoop.fs.s3a.server-side-encryption-algorithm=AES256
>>>
>>> --conf spark.sql.parquet.output.committer.class=org.apache.spark.sql.parquet.DirectParquetOutputCommitter
>>>
>>> It looks like it does actually write the parquet shards under
>>>
>>> /_temporary/0/_temporary//
>>>
>>> but then must hit that S3 exception when trying to copy/rename. I think
>>> the NullPointerException deep down in Parquet is due to it causing close()
>>> more than once so isn't the root cause, but I'm not sure.
>>>
>>
> given the stack trace has abortTask() in it, I'd suspect that's a
> follow-on failure.
>
>
>
> One possibility here may be related to how EMR will handle your
> credentials (session credentials served up over IAM HTTP) and how Apache
> Hadoop 2.7's s3a auth works (IAM isn't supported until 2.8). That could
> trigger the problem. But I don't know.
>
> I do know that I have dataframes writing back to s3a on Hadoop 2.7.3, *not
> on EMR*.
>
>
>
>>> Anyone seen something like this?
>>>
>>> 16/08/28 19:46:28 ERROR InsertIntoHadoopFsRelation: Aborting job.
>>> org.apache.spark.SparkException: Job aborted due to stage failure: Task 9 
>>> in stag

Does Spark on YARN inherit or replace the Hadoop/YARN configs?

2016-08-30 Thread Everett Anderson
Hi,

I've had a bit of trouble getting Spark on YARN to work. When executing in
this mode and submitting from outside the cluster, one must set
HADOOP_CONF_DIR or YARN_CONF_DIR, from which
spark-submit can find the params it needs to locate and talk to the YARN
ResourceManager.

However, Spark also packages up all the Hadoop+YARN config files, ships
them to the cluster, and then uses them there.

Does it only override settings on the cluster using those shipped files? Or
does it use those entirely instead of the config the cluster already has?

My impression is that it currently replaces rather than overrides, which
means you can't construct a minimal client-side Hadoop/YARN config with
only the properties necessary to find the cluster. Is that right?


Re: S3A + EMR failure when writing Parquet?

2016-08-29 Thread Everett Anderson
Okay, I don't think it's really just S3A issue, anymore. I can run the job
using fs.s3.impl/spark.hadoop.fs.s3.impl set to the S3A impl as a --conf
param from the EMR console successfully, as well.

The problem seems related to the fact that we're trying to spark-submit
jobs to a YARN cluster from outside the cluster itself.

The docs <https://spark.apache.org/docs/1.6.2/running-on-yarn.html> suggest
one must copy the Hadoop/YARN config XML outside of the cluster to do this,
which feels gross, but it's what we did. We had changed fs.s3.impl to use
S3A in that config, and that seems to result in the failure, though I still
can't figure out why.

Interestingly, if I don't make that change to the XML, and leave it as the
EMRFS implementation, it will work, as long as I use s3a:// URIs for the
jar, otherwise spark-submit won't be able to ship them to the cluster since
it won't have the EMRFS implementation locally.


On Sun, Aug 28, 2016 at 4:19 PM, Everett Anderson <ever...@nuna.com> wrote:

> (Sorry, typo -- I was using
> spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2 not
> 'hadooop', of course)
>
> On Sun, Aug 28, 2016 at 12:51 PM, Everett Anderson <ever...@nuna.com>
> wrote:
>
>> Hi,
>>
>> I'm having some trouble figuring out a failure when using S3A when
>> writing a DataFrame as Parquet on EMR 4.7.2 (which is Hadoop 2.7.2 and
>> Spark 1.6.2). It works when using EMRFS (s3://), though.
>>
>> I'm using these extra conf params, though I've also tried without
>> everything but the encryption one with the same result:
>>
>> --conf spark.hadooop.mapreduce.fileoutputcommitter.algorithm.version=2
>> --conf spark.hadoop.mapreduce.fileoutputcommitter.cleanup.skipped=true
>> --conf spark.hadoop.fs.s3a.server-side-encryption-algorithm=AES256
>> --conf spark.sql.parquet.output.committer.class=org.apache.spark.
>> sql.parquet.DirectParquetOutputCommitter
>>
>> It looks like it does actually write the parquet shards under
>>
>> /_temporary/0/_temporary//
>>
>> but then must hit that S3 exception when trying to copy/rename. I think
>> the NullPointerException deep down in Parquet is due to it causing close()
>> more than once so isn't the root cause, but I'm not sure.
>>
>> Anyone seen something like this?
>>
>> 16/08/28 19:46:28 ERROR InsertIntoHadoopFsRelation: Aborting job.
>> org.apache.spark.SparkException: Job aborted due to stage failure: Task 9 in 
>> stage 1.0 failed 4 times, most recent failure: Lost task 9.3 in stage 1.0 
>> (TID 54, ip-10-8-38-103.us-west-2.compute.internal): 
>> org.apache.spark.SparkException: Task failed while writing rows
>>  at 
>> org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:269)
>>  at 
>> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:148)
>>  at 
>> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:148)
>>  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>>  at org.apache.spark.scheduler.Task.run(Task.scala:89)
>>  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
>>  at 
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>  at 
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>  at java.lang.Thread.run(Thread.java:745)
>> Caused by: java.lang.RuntimeException: Failed to commit task
>>  at 
>> org.apache.spark.sql.execution.datasources.DefaultWriterContainer.org$apache$spark$sql$execution$datasources$DefaultWriterContainer$$commitTask$1(WriterContainer.scala:283)
>>  at 
>> org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply$mcV$sp(WriterContainer.scala:265)
>>  at 
>> org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply(WriterContainer.scala:260)
>>  at 
>> org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply(WriterContainer.scala:260)
>>  at 
>> org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1277)
>>  at 
>> org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:266)
>>  ... 8 more
>>  Suppressed: java.lang.NullPointerException
>>  at 
>> org.apache.parquet.hadoop.InternalParquetRecordWriter.flushRow

Re: S3A + EMR failure when writing Parquet?

2016-08-28 Thread Everett Anderson
(Sorry, typo -- I was using spark.hadoop.mapreduce.
fileoutputcommitter.algorithm.version=2 not 'hadooop', of course)

On Sun, Aug 28, 2016 at 12:51 PM, Everett Anderson <ever...@nuna.com> wrote:

> Hi,
>
> I'm having some trouble figuring out a failure when using S3A when writing
> a DataFrame as Parquet on EMR 4.7.2 (which is Hadoop 2.7.2 and Spark
> 1.6.2). It works when using EMRFS (s3://), though.
>
> I'm using these extra conf params, though I've also tried without
> everything but the encryption one with the same result:
>
> --conf spark.hadooop.mapreduce.fileoutputcommitter.algorithm.version=2
> --conf spark.hadoop.mapreduce.fileoutputcommitter.cleanup.skipped=true
> --conf spark.hadoop.fs.s3a.server-side-encryption-algorithm=AES256
> --conf spark.sql.parquet.output.committer.class=org.apache.
> spark.sql.parquet.DirectParquetOutputCommitter
>
> It looks like it does actually write the parquet shards under
>
> /_temporary/0/_temporary//
>
> but then must hit that S3 exception when trying to copy/rename. I think
> the NullPointerException deep down in Parquet is due to it causing close()
> more than once so isn't the root cause, but I'm not sure.
>
> Anyone seen something like this?
>
> 16/08/28 19:46:28 ERROR InsertIntoHadoopFsRelation: Aborting job.
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 9 in 
> stage 1.0 failed 4 times, most recent failure: Lost task 9.3 in stage 1.0 
> (TID 54, ip-10-8-38-103.us-west-2.compute.internal): 
> org.apache.spark.SparkException: Task failed while writing rows
>   at 
> org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:269)
>   at 
> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:148)
>   at 
> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:148)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>   at org.apache.spark.scheduler.Task.run(Task.scala:89)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.RuntimeException: Failed to commit task
>   at 
> org.apache.spark.sql.execution.datasources.DefaultWriterContainer.org$apache$spark$sql$execution$datasources$DefaultWriterContainer$$commitTask$1(WriterContainer.scala:283)
>   at 
> org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply$mcV$sp(WriterContainer.scala:265)
>   at 
> org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply(WriterContainer.scala:260)
>   at 
> org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply(WriterContainer.scala:260)
>   at 
> org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1277)
>   at 
> org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:266)
>   ... 8 more
>   Suppressed: java.lang.NullPointerException
>   at 
> org.apache.parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:147)
>   at 
> org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:113)
>   at 
> org.apache.parquet.hadoop.ParquetRecordWriter.close(ParquetRecordWriter.java:112)
>   at 
> org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.close(ParquetRelation.scala:101)
>   at 
> org.apache.spark.sql.execution.datasources.DefaultWriterContainer.org$apache$spark$sql$execution$datasources$DefaultWriterContainer$$abortTask$1(WriterContainer.scala:290)
>   at 
> org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$2.apply$mcV$sp(WriterContainer.scala:266)
>   at 
> org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1286)
>   ... 9 more
> Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: Access Denied 
> (Service: Amazon S3; Status Code: 403; Error Code: AccessDenied; Request ID: 
> EA0E434768316935), S3 Extended Request ID: 
> fHtu7Q9VSi/8h0RAyfRiyK6uAJnajZBrwqZH3eBfF5kM13H6dDl006031NTwU/whyGu1uNqW1mI=
>   at 
> com.amazonaws.http.Ama

S3A + EMR failure when writing Parquet?

2016-08-28 Thread Everett Anderson
Hi,

I'm having some trouble figuring out a failure when using S3A when writing
a DataFrame as Parquet on EMR 4.7.2 (which is Hadoop 2.7.2 and Spark
1.6.2). It works when using EMRFS (s3://), though.

I'm using these extra conf params, though I've also tried without
everything but the encryption one with the same result:

--conf spark.hadooop.mapreduce.fileoutputcommitter.algorithm.version=2
--conf spark.hadoop.mapreduce.fileoutputcommitter.cleanup.skipped=true
--conf spark.hadoop.fs.s3a.server-side-encryption-algorithm=AES256
--conf
spark.sql.parquet.output.committer.class=org.apache.spark.sql.parquet.DirectParquetOutputCommitter

It looks like it does actually write the parquet shards under

/_temporary/0/_temporary//

but then must hit that S3 exception when trying to copy/rename. I think the
NullPointerException deep down in Parquet is due to it causing close() more
than once so isn't the root cause, but I'm not sure.

Anyone seen something like this?

16/08/28 19:46:28 ERROR InsertIntoHadoopFsRelation: Aborting job.
org.apache.spark.SparkException: Job aborted due to stage failure:
Task 9 in stage 1.0 failed 4 times, most recent failure: Lost task 9.3
in stage 1.0 (TID 54, ip-10-8-38-103.us-west-2.compute.internal):
org.apache.spark.SparkException: Task failed while writing rows
at 
org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:269)
at 
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:148)
at 
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:148)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Failed to commit task
at 
org.apache.spark.sql.execution.datasources.DefaultWriterContainer.org$apache$spark$sql$execution$datasources$DefaultWriterContainer$$commitTask$1(WriterContainer.scala:283)
at 
org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply$mcV$sp(WriterContainer.scala:265)
at 
org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply(WriterContainer.scala:260)
at 
org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply(WriterContainer.scala:260)
at 
org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1277)
at 
org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:266)
... 8 more
Suppressed: java.lang.NullPointerException
at 
org.apache.parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:147)
at 
org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:113)
at 
org.apache.parquet.hadoop.ParquetRecordWriter.close(ParquetRecordWriter.java:112)
at 
org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.close(ParquetRelation.scala:101)
at 
org.apache.spark.sql.execution.datasources.DefaultWriterContainer.org$apache$spark$sql$execution$datasources$DefaultWriterContainer$$abortTask$1(WriterContainer.scala:290)
at 
org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$2.apply$mcV$sp(WriterContainer.scala:266)
at 
org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1286)
... 9 more
Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: Access
Denied (Service: Amazon S3; Status Code: 403; Error Code:
AccessDenied; Request ID: EA0E434768316935), S3 Extended Request ID:
fHtu7Q9VSi/8h0RAyfRiyK6uAJnajZBrwqZH3eBfF5kM13H6dDl006031NTwU/whyGu1uNqW1mI=
at 
com.amazonaws.http.AmazonHttpClient.handleErrorResponse(AmazonHttpClient.java:1389)
at 
com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:902)
at 
com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:607)
at 
com.amazonaws.http.AmazonHttpClient.doExecute(AmazonHttpClient.java:376)
at 
com.amazonaws.http.AmazonHttpClient.executeWithTimer(AmazonHttpClient.java:338)
at 
com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:287)
at 
com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3826)
at 

Re: Plans for improved Spark DataFrame/Dataset unit testing?

2016-08-21 Thread Everett Anderson
On Sun, Aug 21, 2016 at 3:08 AM, Bedrytski Aliaksandr <sp...@bedryt.ski>
wrote:

> Hi,
>
> we share the same spark/hive context between tests (executed in
> parallel), so the main problem is that the temporary tables are
> overwritten each time they are created, this may create race conditions
> as these tempTables may be seen as global mutable shared state.
>
> So each time we create a temporary table, we add an unique, incremented,
> thread safe id (AtomicInteger) to its name so that there are only
> specific, non-shared temporary tables used for a test.
>

Makes sense.

But when you say you're sharing the same spark/hive context between tests,
I'm assuming that's between the same tests within one test class, but
you're not sharing across test classes (which a build tool like Maven or
Gradle might have executed in separate JVMs).

Is that right?
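
For reference, a rough sketch of the shared-context / unique-temp-table
pattern described above (object and method names are made up here):

import java.util.concurrent.atomic.AtomicInteger
import org.apache.spark.sql.DataFrame

object SharedTestTables {
  private val counter = new AtomicInteger(0)

  // Register df under a name that no concurrently running test can collide
  // with, and return that name for use in SQL strings.
  def registerUnique(df: DataFrame, base: String): String = {
    val name = s"${base}_${counter.incrementAndGet()}"
    df.registerTempTable(name)
    name
  }
}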




>
> --
>   Bedrytski Aliaksandr
>   sp...@bedryt.ski
>
>
>
> On Sat, Aug 20, 2016, at 01:25, Everett Anderson wrote:
> Hi!
>
> Just following up on this --
>
> When people talk about a shared session/context for testing like this,
> I assume it's still within one test class. So it's still the case that
> if you have a lot of test classes that test Spark-related things, you
> must configure your build system to not run them in parallel.
> You'll get the benefit of not creating and tearing down a Spark
> session/context between test cases with a test class, though.
>
> Is that right?
>
> Or have people figured out a way to have sbt (or Maven/Gradle/etc)
> share Spark sessions/contexts across integration tests in a safe way?
>
>
> On Mon, Aug 1, 2016 at 3:23 PM, Holden Karau
> <hol...@pigscanfly.ca> wrote:
> Thats a good point - there is an open issue for spark-testing-base to
> support this shared sparksession approach - but I haven't had the
> time ( https://github.com/holdenk/spark-testing-base/issues/123 ).
> I'll try and include this in the next release :)
>
> On Mon, Aug 1, 2016 at 9:22 AM, Koert Kuipers
> <ko...@tresata.com> wrote:
> we share a single sparksession across tests, and they can run
> in parallel. is pretty fast
>
> On Mon, Aug 1, 2016 at 12:02 PM, Everett Anderson
> <ever...@nuna.com.invalid> wrote:
> Hi,
>
> Right now, if any code uses DataFrame/Dataset, I need a test setup
> that brings up a local master as in this article[1].
>
> That's a lot of overhead for unit testing and the tests can't run
> in parallel, so testing is slow -- this is more like what I'd call
> an integration test.
>
> Do people have any tricks to get around this? Maybe using spy mocks
> on fake DataFrame/Datasets?
>
> Anyone know if there are plans to make more traditional unit
> testing possible with Spark SQL, perhaps with a stripped down in-
> memory implementation? (I admit this does seem quite hard since
> there's so much functionality in these classes!)
>
> Thanks!
>
>
> - Everett
>
>
> --
> Cell : 425-233-8271
> Twitter: https://twitter.com/holdenkarau
>
>


Re: Plans for improved Spark DataFrame/Dataset unit testing?

2016-08-19 Thread Everett Anderson
Hi!

Just following up on this --

When people talk about a shared session/context for testing like this, I
assume it's still within one test class. So it's still the case that if you
have a lot of test classes that test Spark-related things, you must
configure your build system to not run them in parallel. You'll get the
benefit of not creating and tearing down a Spark session/context between
test cases with a test class, though.

Is that right?

Or have people figured out a way to have sbt (or Maven/Gradle/etc) share
Spark sessions/contexts across integration tests in a safe way?


On Mon, Aug 1, 2016 at 3:23 PM, Holden Karau <hol...@pigscanfly.ca> wrote:

> Thats a good point - there is an open issue for spark-testing-base to
> support this shared sparksession approach - but I haven't had the time (
> https://github.com/holdenk/spark-testing-base/issues/123 ). I'll try and
> include this in the next release :)
>
> On Mon, Aug 1, 2016 at 9:22 AM, Koert Kuipers <ko...@tresata.com> wrote:
>
>> we share a single sparksession across tests, and they can run in
>> parallel. is pretty fast
>>
>> On Mon, Aug 1, 2016 at 12:02 PM, Everett Anderson <
>> ever...@nuna.com.invalid> wrote:
>>
>>> Hi,
>>>
>>> Right now, if any code uses DataFrame/Dataset, I need a test setup that
>>> brings up a local master as in this article
>>> <http://blog.cloudera.com/blog/2015/09/making-apache-spark-testing-easy-with-spark-testing-base/>
>>> .
>>>
>>> That's a lot of overhead for unit testing and the tests can't run in
>>> parallel, so testing is slow -- this is more like what I'd call an
>>> integration test.
>>>
>>> Do people have any tricks to get around this? Maybe using spy mocks on
>>> fake DataFrame/Datasets?
>>>
>>> Anyone know if there are plans to make more traditional unit testing
>>> possible with Spark SQL, perhaps with a stripped down in-memory
>>> implementation? (I admit this does seem quite hard since there's so much
>>> functionality in these classes!)
>>>
>>> Thanks!
>>>
>>> - Everett
>>>
>>>
>>
>
>
> --
> Cell : 425-233-8271
> Twitter: https://twitter.com/holdenkarau
>


Submitting jobs to YARN from outside EMR -- config & S3 impl

2016-08-15 Thread Everett Anderson
Hi,

We're currently using an EMR cluster (which uses YARN) but submitting Spark
jobs to it using spark-submit from different machines outside the cluster.
We haven't had time to investigate using something like Livy, yet.

We also have a need to use a mix of cluster and client modes in this
configuration.

Three things we've struggled with here are

   1. Configuring spark-submit with the necessary master node host & ports
   2. Setting up the cluster to support file staging
   3. S3 implementation choices

I'm curious -- how do others handle these?

Here's what we're doing in case it helps anybody --

*Configuring spark-submit *

As far as I can tell, you can't tell spark-submit the YARN resource manager
info on the command-line with --conf properties. You must set a
SPARK_CONF_DIR or HADOOP_CONF_DIR environment variable pointing to a local
directory with core-site.xml, yarn-site.xml, and optionally hive-site.xml.

However, these setting files will override what's on the cluster, so you
have to be careful and try to assemble just what you need (since you might
use differently configured clusters).

A starting point is to start a cluster and grab the files out of
/etc/hadoop/conf and then whittle them down.
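
Concretely, the client side ends up looking something like this (paths are
placeholders):

export HADOOP_CONF_DIR=/path/to/whittled-down-emr-conf
spark-submit --master yarn --deploy-mode cluster ...

with core-site.xml and yarn-site.xml sitting in that directory.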

*Setting up the cluster to support file staging*

Out of the box, spark-submit will fail when trying to stage files because
the cluster will try to put them in /user/(local user name on the machine
the job was submitted from). That directory and user won't exist on the
cluster.

I think spark.yarn.stagingDir can change the directory, but you seem to
need to set up your cluster with a bootstrap action that creates the
directory and gives it fully open write permissions.

*S3 implementation choices*

Back in the "Role-based S3 access outside of EMR' thread, we talked about
using S3A when running with the local master on an EC2 instance, which
works in Hadoop 2.7+ with the right libraries.

AWS provides their own Hadoop FileSystem implementation for S3 called
EMRFS, and the default EMR cluster setup uses it for "s3://" scheme URIs.
As far as I know, they haven't released this library for use elsewhere. It
supports "consistency view", which uses a DynamoDB to overcome any S3
list-key inconsistency/lag for I/O ops from the cluster. Presumably, also,
they maintain it and its config, and keep them up to date and performing
well.

If you use cluster mode and "s3://" scheme URIs, things work fine.

However, if you use client mode, it seems like Spark will try to use the
Hadoop "s3://" scheme FileSystem on the submitting host for something, and
it will fail because the default implementation won't know the credentials.
One work-around is to set environment variables or Hadoop conf properties
with your secret keys (!).

Another solution is to use the S3A implementation in Hadoop 2.7.x or later.
However, if you use "s3a://" scheme URIs, they'll also be used on the
cluster -- you'll use the S3A implementation for cluster operations instead
of the EMRFS implementation.

Similarly, if you change core-site.xml locally to use the S3A
implementation for "s3://" scheme URIs, that will cause the cluster to also
use the S3A implementation, when it could have used EMRFS.

Haven't figured out how to work around this, yet, or if it's important.


Re: Java and SparkSession

2016-08-05 Thread Everett Anderson
Hi,

Can you say more about what goes wrong?

I was migrating my code and began using this for initialization:

   SparkConf sparkConf = new SparkConf().setAppName(...);
   SparkSession sparkSession =
       new SparkSession.Builder().config(sparkConf).getOrCreate();
   JavaSparkContext jsc = new JavaSparkContext(sparkSession.sparkContext());

and then used SparkSession instead of the SQLContext, which seemed to work.


On Thu, Aug 4, 2016 at 9:41 PM, Andy Grove  wrote:

> From some brief experiments using Java with Spark 2.0 it looks like Java
> developers should stick to SparkContext and SQLContext rather than using
> the new SparkSession API?
>
> It would be great if someone could confirm if that is the intention or not.
>
> Thanks,
>
> Andy.
>
> --
>
> Andy Grove
> Chief Architect
> www.agildata.com
>
>


Plans for improved Spark DataFrame/Dataset unit testing?

2016-08-01 Thread Everett Anderson
Hi,

Right now, if any code uses DataFrame/Dataset, I need a test setup that
brings up a local master as in this article.

That's a lot of overhead for unit testing and the tests can't run in
parallel, so testing is slow -- this is more like what I'd call an
integration test.

Do people have any tricks to get around this? Maybe using spy mocks on fake
DataFrame/Datasets?

Anyone know if there are plans to make more traditional unit testing
possible with Spark SQL, perhaps with a stripped down in-memory
implementation? (I admit this does seem quite hard since there's so much
functionality in these classes!)

Thanks!

- Everett
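
For context, the per-suite setup being described is roughly the following
(names are illustrative), and since normally only one SparkContext can be
active per JVM, suites that each build their own can't safely run in parallel
within the same JVM:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val conf = new SparkConf().setMaster("local[2]").setAppName("unit-tests")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
// ... run DataFrame/Dataset assertions, then sc.stop() in the suite teardown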


Re: Role-based S3 access outside of EMR

2016-07-28 Thread Everett Anderson
Hey,

Just wrapping this up --

I ended up following the instructions
<https://spark.apache.org/docs/1.6.2/building-spark.html> to build a custom
Spark release with Hadoop 2.7.2, stealing from Steve's SPARK-7481 PR a bit,
in order to get Spark 1.6.2 + Hadoop 2.7.2 + the hadoop-aws library (which
pulls in the proper AWS Java SDK dependency).

Now that there's an official Spark 2.0 + Hadoop 2.7.x release, this is
probably no longer necessary, but I haven't tried it, yet.

With the custom release, s3a paths work fine with EC2 role credentials
without doing anything special. The only thing I had to do was to add this
extra --conf flag to spark-submit in order to write to encrypted S3 buckets
--

--conf spark.hadoop.fs.s3a.server-side-encryption-algorithm=AES256

Full instructions for building on Mac are here:

1) Download the Spark 1.6.2 source from
https://spark.apache.org/downloads.html

2) Install R

brew tap homebrew/science
brew install r

3) Set JAVA_HOME and the MAVEN_OPTS as in the instructions

4) Modify the root pom.xml to add a hadoop-2.7 profile (mostly stolen from
Spark 2.0)


<profile>
  <id>hadoop-2.7</id>
  <properties>
    <hadoop.version>2.7.2</hadoop.version>
    <jets3t.version>0.9.3</jets3t.version>
    <zookeeper.version>3.4.6</zookeeper.version>
    <curator.version>2.6.0</curator.version>
  </properties>
  <dependencyManagement>
    <dependencies>
      <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-aws</artifactId>
        <version>${hadoop.version}</version>
        <scope>${hadoop.deps.scope}</scope>
        <exclusions>
          <exclusion>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
          </exclusion>
          <exclusion>
            <groupId>commons-logging</groupId>
            <artifactId>commons-logging</artifactId>
          </exclusion>
        </exclusions>
      </dependency>
    </dependencies>
  </dependencyManagement>
</profile>

5) Modify core/pom.xml to include the corresponding hadoop-aws and AWS SDK
libs


<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-client</artifactId>
</dependency>
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-aws</artifactId>
  <exclusions>
    <exclusion>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
    </exclusion>
    <exclusion>
      <groupId>commons-logging</groupId>
      <artifactId>commons-logging</artifactId>
    </exclusion>
  </exclusions>
</dependency>

6) Build with

./make-distribution.sh --name custom-hadoop-2.7-2-aws-s3a --tgz -Psparkr
-Phadoop-2.7 -Phive -Phive-thriftserver -Pyarn
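
(With that build, s3a paths then work directly off the EC2 instance role; an
illustrative snippet with placeholder bucket/paths:)

// Placeholder bucket/paths -- role credentials are picked up automatically.
DataFrame df = sqlContext.read().parquet("s3a://my-bucket/input/");
df.write().parquet("s3a://my-bucket/output/");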







On Sat, Jul 23, 2016 at 4:11 AM, Steve Loughran <ste...@hortonworks.com>
wrote:

>
>
> Amazon S3 has stronger consistency guarantees than the ASF s3 clients, it
> uses dynamo to do this.
>
> there is some work underway to do something similar atop S3a, S3guard, see
> https://issues.apache.org/jira/browse/HADOOP-13345  .
>
> Regarding IAM support in Spark, The latest version of S3A, which will ship
> in Hadoop 2.8, adds: IAM, temporary credential, direct env var pickup —and
> the ability to add your own.
>
> Regarding getting the relevant binaries into your app, you need a version
> of the hadoop-aws library consistent with the rest of hadoop, and the
> version of the amazon AWS SDKs that hadoop was built against. APIs in the
> SDK have changed and attempting to upgrade the amazon JAR will fail.
>
> There's a PR attached to SPARK-7481 which does the bundling and adds a
> suite of tests...it's designed to work with Hadoop 2.7+ builds. if you are
> building Spark locally, please try it and provide feedback on the PR
>
> finally, don't try an use s3a  on hadoop-2.6...that was really in preview
> state, and it let bugs surface which were fixed in 2.7.
>
> -Steve
>
> ps: More on S3a in Hadoop 2.8. Things will be way better!
> http://slideshare.net/HadoopSummit/hadoop-cloud-storage-object-store-integration-in-production
>
>
> On 21 Jul 2016, at 17:23, Ewan Leith <ewan.le...@realitymine.com> wrote:
>
> If you use S3A rather than S3N, it supports IAM roles.
>
> I think you can make s3a used for s3:// style URLs so it’s consistent with
> your EMR paths by adding this to your Hadoop config, probably in
> core-site.xml:
>
> fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem
> fs.s3.impl=org.apache.hadoop.fs.s3a.S3AFileSystem
> fs.AbstractFileSystem.s3.impl=org.apache.hadoop.fs.s3a.S3A
> fs.AbstractFileSystem.s3a.impl=org.apache.hadoop.fs.s3a.S3A
>
> And make sure the s3a jars are in your classpath
>
> Thanks,
> Ewan
>
> *From:* Everett Anderson [mailto:ever...@nuna.com.INVALID
> <ever...@nuna.com.INVALID>]
> *Sent:* 21 July 2016 17:01
> *To:* Gourav Sengupta <gourav.sengu...@gmail.com>
> *Cc:* Teng Qiu <teng...@gmail.com>; Andy Davidson <
> a...@santacruzintegration.com>; user <user@spark.apache.org>
> *Subject:* Re: Role-based S3 access outside of EMR
>
> Hey,
>
> FWIW, we are using EMR, actually, in production.
>
> The main case I have for wanting to access S3 with Spark outside of EMR is
> that during development, our developers tend to run EC2 sandbox instances
> that have all the rest of our code and access to some of the input data on
> S3. It'd be nice if S3 access "just worked" on these without storing the
> access keys in an exposed manner.

Re: Programmatic use of UDFs from Java

2016-07-22 Thread Everett Anderson
Thanks for the pointer, Bryan! Sounds like I was on the right track in
terms of what's available for now.

(And Gourav -- I'm certainly interested in migrating to Scala, but our team
is mostly Java, Python, and R based right now!)
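
(Aside, and hedged: this wasn't available at the time of this thread, but newer
Spark releases -- 2.3 and later, as far as I know -- added Java-friendly
functions.udf overloads that skip registration entirely. A sketch, assuming an
existing Dataset<Row> named df:)

import static org.apache.spark.sql.functions.udf;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.expressions.UserDefinedFunction;
import org.apache.spark.sql.types.DataTypes;

// Build an unregistered UDF and apply it as a column expression.
UDF1<String, Long> strLen = s -> (long) s.length();
UserDefinedFunction strLenUdf = udf(strLen, DataTypes.LongType);
Dataset<Row> df2 = df.withColumn("new_col", strLenUdf.apply(df.col("old_col")));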


On Thu, Jul 21, 2016 at 11:00 PM, Bryan Cutler <cutl...@gmail.com> wrote:

> Everett, I had the same question today and came across this old thread.
> Not sure if there has been any more recent work to support this.
> http://apache-spark-developers-list.1001551.n3.nabble.com/Using-UDFs-in-Java-without-registration-td12497.html
>
>
> On Thu, Jul 21, 2016 at 10:10 AM, Everett Anderson <
> ever...@nuna.com.invalid> wrote:
>
>> Hi,
>>
>> In the Java Spark DataFrames API, you can create a UDF, register it, and
>> then access it by string name by using the convenience UDF classes in
>> org.apache.spark.sql.api.java
>> <https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/api/java/package-summary.html>
>> .
>>
>> Example
>>
>> UDF1<String, Long> testUdf1 = new UDF1<>() { ... }
>>
>> sqlContext.udf().register("testfn", testUdf1, DataTypes.LongType);
>>
>> DataFrame df2 = df.withColumn("new_col", *functions.callUDF("testfn"*,
>> df.col("old_col")));
>>
>> However, I'd like to avoid registering these by name, if possible, since
>> I have many of them and would need to deal with name conflicts.
>>
>> There are udf() methods like this that seem to be from the Scala API
>> <https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/functions.html#udf(scala.Function1,%20scala.reflect.api.TypeTags.TypeTag,%20scala.reflect.api.TypeTags.TypeTag)>,
>> where you don't have to register everything by name first.
>>
>> However, using those methods from Java would require interacting with
>> Scala's scala.reflect.api.TypeTags.TypeTag. I'm having a hard time
>> figuring out how to create a TypeTag from Java.
>>
>> Does anyone have an example of using the udf() methods from Java?
>>
>> Thanks!
>>
>> - Everett
>>
>>
>


Re: Creating a DataFrame from scratch

2016-07-22 Thread Everett Anderson
Actually, sorry, my mistake, you're calling

DataFrame df = sqlContext.createDataFrame(data,
org.apache.spark.sql.types.NumericType.class);

and giving it a list of objects which aren't NumericTypes, but the
wildcards in the signature let it happen.

I'm curious what'd happen if you gave it Integer.class, but I suspect it
still won't work because Integer may not have the bean-style getters.
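
(For concreteness, a runnable version of the RowFactory.create() suggestion
quoted below, assuming the Spark 1.6-era Java API and an existing sqlContext:)

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

List<Integer> data = Arrays.asList(1, 2, 3, 4, 5, 6, 7);

// Wrap each value in a Row and describe the single column explicitly.
List<Row> rows = new ArrayList<>();
for (Integer i : data) {
  rows.add(RowFactory.create(i));
}
StructType schema = DataTypes.createStructType(Collections.singletonList(
    DataTypes.createStructField("int_field", DataTypes.IntegerType, true)));

DataFrame intDataFrame = sqlContext.createDataFrame(rows, schema);
intDataFrame.show();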


On Fri, Jul 22, 2016 at 9:37 AM, Everett Anderson <ever...@nuna.com> wrote:

> Hey,
>
> I think what's happening is that you're calling this createDataFrame
> method
> <https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/SQLContext.html#createDataFrame(java.util.List,%20java.lang.Class)>
> :
>
> createDataFrame(java.util.List<?> data, java.lang.Class<?> beanClass)
>
> which expects a JavaBean-style class with get and set methods for the
> members, but Integer doesn't have such a getter.
>
> I bet there's an easier way if you just want a single-column DataFrame of
> a primitive type, but one way that would work is to manually construct the
> Rows using RowFactory.create()
> <https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/RowFactory.html#create(java.lang.Object...)>
> and assemble the DataFrame from that like
>
> List<Row> rows = convert your List<Integer> to this in a loop with
> RowFactory.create()
>
> StructType schema = DataTypes.createStructType(Collections.singletonList(
>  DataTypes.createStructField("int_field", DataTypes.IntegerType,
> true)));
>
> DataFrame intDataFrame = sqlContext.createDataFrame(rows, schema);
>
>
>
> On Fri, Jul 22, 2016 at 7:53 AM, Jean Georges Perrin <j...@jgp.net> wrote:
>
>>
>>
>> I am trying to build a DataFrame from a list, here is the code:
>>
>> private void start() {
>> SparkConf conf = new SparkConf().setAppName("Data Set from Array"
>> ).setMaster("local");
>> SparkContext sc = new SparkContext(conf);
>> SQLContext sqlContext = new SQLContext(sc);
>>
>> Integer[] l = new Integer[] { 1, 2, 3, 4, 5, 6, 7 };
>> List<Integer> data = Arrays.asList(l);
>>
>> System.out.println(data);
>>
>>
>> DataFrame df = sqlContext.createDataFrame(data,
>> org.apache.spark.sql.types.NumericType.class);
>> df.show();
>> }
>>
>> My result is (unpleasantly):
>>
>> [1, 2, 3, 4, 5, 6, 7]
>> ++
>> ||
>> ++
>> ||
>> ||
>> ||
>> ||
>> ||
>> ||
>> ||
>> ++
>>
>> I also tried with:
>> org.apache.spark.sql.types.NumericType.class
>> org.apache.spark.sql.types.IntegerType.class
>> org.apache.spark.sql.types.ArrayType.class
>>
>> I am probably missing something super obvious :(
>>
>> Thanks!
>>
>> jg
>>
>>
>>
>


Programmatic use of UDFs from Java

2016-07-21 Thread Everett Anderson
Hi,

In the Java Spark DataFrames API, you can create a UDF, register it, and
then access it by string name by using the convenience UDF classes in
org.apache.spark.sql.api.java
<https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/api/java/package-summary.html>
.

Example

UDF1<String, Long> testUdf1 = new UDF1<>() { ... }

sqlContext.udf().register("testfn", testUdf1, DataTypes.LongType);

DataFrame df2 = df.withColumn("new_col", *functions.callUDF("testfn"*,
df.col("old_col")));

However, I'd like to avoid registering these by name, if possible, since I
have many of them and would need to deal with name conflicts.

There are udf() methods like this that seem to be from the Scala API
<https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/functions.html#udf(scala.Function1,%20scala.reflect.api.TypeTags.TypeTag,%20scala.reflect.api.TypeTags.TypeTag)>,
where you don't have to register everything by name first.

However, using those methods from Java would require interacting with
Scala's scala.reflect.api.TypeTags.TypeTag. I'm having a hard time figuring
out how to create a TypeTag from Java.

Does anyone have an example of using the udf() methods from Java?

Thanks!

- Everett


Re: Role-based S3 access outside of EMR

2016-07-21 Thread Everett Anderson
Hey,

FWIW, we are using EMR, actually, in production.

The main case I have for wanting to access S3 with Spark outside of EMR is
that during development, our developers tend to run EC2 sandbox instances
that have all the rest of our code and access to some of the input data on
S3. It'd be nice if S3 access "just worked" on these without storing the
access keys in an exposed manner.

Teng -- when you say you use EMRFS, does that mean you copied AWS's EMRFS
JAR from an EMR cluster and are using it outside? My impression is that AWS
hasn't released the EMRFS implementation as part of the aws-java-sdk, so
I'm wary of using it. Do you know if it's supported?


On Thu, Jul 21, 2016 at 2:32 AM, Gourav Sengupta <gourav.sengu...@gmail.com>
wrote:

> Hi Teng,
>
> This is totally a flashing news for me, that people cannot use EMR in
> production because its not open sourced, I think that even Werner is not
> aware of such a problem. Is EMRFS opensourced? I am curious to know what
> does HA stand for?
>
> Regards,
> Gourav
>
> On Thu, Jul 21, 2016 at 8:37 AM, Teng Qiu <teng...@gmail.com> wrote:
>
>> there are several reasons that AWS users do (can) not use EMR, one
>> point for us is that security compliance problem, EMR is totally not
>> open sourced, we can not use it in production system. second is that
>> EMR do not support HA yet.
>>
>> but to the original question from @Everett :
>>
>> -> Credentials and Hadoop Configuration
>>
>> as you said, best practice should be "rely on machine roles", they
>> called IAM roles.
>>
>> we are using EMRFS impl for accessing s3, it supports IAM role-based
>> access control well. you can take a look here:
>> https://github.com/zalando/spark/tree/branch-1.6-zalando
>>
>> or simply use our docker image (Dockerfile on github:
>> https://github.com/zalando/spark-appliance/tree/master/Dockerfile)
>>
>> docker run -d --net=host \
>>-e START_MASTER="true" \
>>-e START_WORKER="true" \
>>-e START_WEBAPP="true" \
>>-e START_NOTEBOOK="true" \
>>registry.opensource.zalan.do/bi/spark:1.6.2-6
>>
>>
>> -> SDK and File System Dependencies
>>
>> as mentioned above, using EMRFS libs solved this problem:
>>
>> http://docs.aws.amazon.com//ElasticMapReduce/latest/ReleaseGuide/emr-fs.html
>>
>>
>> 2016-07-21 8:37 GMT+02:00 Gourav Sengupta <gourav.sengu...@gmail.com>:
>> > But that would mean you would be accessing data over internet increasing
>> > data read latency, data transmission failures. Why are you not using
>> EMR?
>> >
>> > Regards,
>> > Gourav
>> >
>> > On Thu, Jul 21, 2016 at 1:06 AM, Everett Anderson
>> <ever...@nuna.com.invalid>
>> > wrote:
>> >>
>> >> Thanks, Andy.
>> >>
>> >> I am indeed often doing something similar, now -- copying data locally
>> >> rather than dealing with the S3 impl selection and AWS credentials
>> issues.
>> >> It'd be nice if it worked a little easier out of the box, though!
>> >>
>> >>
>> >> On Tue, Jul 19, 2016 at 2:47 PM, Andy Davidson
>> >> <a...@santacruzintegration.com> wrote:
>> >>>
>> >>> Hi Everett
>> >>>
>> >>> I always do my initial data exploration and all our product
>> development
>> >>> in my local dev env. I typically select a small data set and copy it
>> to my
>> >>> local machine
>> >>>
>> >>> My main() has an optional command line argument ‘- - runLocal’
>> Normally I
>> >>> load data from either hdfs:/// or S3n:// . If the arg is set I read
>> from
>> >>> file:///
>> >>>
>> >>> Sometime I use a CLI arg ‘- -dataFileURL’
>> >>>
>> >>> So in your case I would log into my data cluster and use “AWS s3 cp"
>> to
>> >>> copy the data into my cluster and then use “SCP” to copy the data
>> from the
>> >>> data center back to my local env.
>> >>>
>> >>> Andy
>> >>>
>> >>> From: Everett Anderson <ever...@nuna.com.INVALID>
>> >>> Date: Tuesday, July 19, 2016 at 2:30 PM
>> >>> To: "user @spark" <user@spark.apache.org>
>> >>> Subject: Role-based S3 access outside of EMR
>> >>>
>> >>> Hi,
>> >>>

Re: Role-based S3 access outside of EMR

2016-07-20 Thread Everett Anderson
Thanks, Andy.

I am indeed often doing something similar, now -- copying data locally
rather than dealing with the S3 impl selection and AWS credentials issues.
It'd be nice if it worked a little easier out of the box, though!


On Tue, Jul 19, 2016 at 2:47 PM, Andy Davidson <
a...@santacruzintegration.com> wrote:

> Hi Everett
>
> I always do my initial data exploration and all our product development in
> my local dev env. I typically select a small data set and copy it to my
> local machine
>
> My main() has an optional command line argument ‘- - runLocal’ Normally I
> load data from either hdfs:/// or S3n:// . If the arg is set I read from
> file:///
>
> Sometime I use a CLI arg ‘- -dataFileURL’
>
> So in your case I would log into my data cluster and use “AWS s3 cp" to
> copy the data into my cluster and then use “SCP” to copy the data from the
> data center back to my local env.
>
> Andy
>
> From: Everett Anderson <ever...@nuna.com.INVALID>
> Date: Tuesday, July 19, 2016 at 2:30 PM
> To: "user @spark" <user@spark.apache.org>
> Subject: Role-based S3 access outside of EMR
>
> Hi,
>
> When running on EMR, AWS configures Hadoop to use their EMRFS Hadoop
> FileSystem implementation for s3:// URLs and seems to install the
> necessary S3 credentials properties, as well.
>
> Often, it's nice during development to run outside of a cluster even with
> the "local" Spark master, though, which I've found to be more troublesome.
> I'm curious if I'm doing this the right way.
>
> There are two issues -- AWS credentials and finding the right combination
> of compatible AWS SDK and Hadoop S3 FileSystem dependencies.
>
> *Credentials and Hadoop Configuration*
>
> For credentials, some guides recommend setting AWS_SECRET_ACCESS_KEY and
> AWS_ACCESS_KEY_ID environment variables or putting the corresponding
> properties in Hadoop XML config files, but it seems better practice to rely
> on machine roles and not expose these.
>
> What I end up doing is, in code, when not running on EMR, creating a
> DefaultAWSCredentialsProviderChain
> <https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html>
> and then installing the following properties in the Hadoop Configuration
> using it:
>
> fs.s3.awsAccessKeyId
> fs.s3n.awsAccessKeyId
> fs.s3a.awsAccessKeyId
> fs.s3.awsSecretAccessKey
> fs.s3n.awsSecretAccessKey
> fs.s3a.awsSecretAccessKey
>
> I also set the fs.s3.impl and fs.s3n.impl properties to
> org.apache.hadoop.fs.s3a.S3AFileSystem to force them to use the S3A
> implementation since people usually use "s3://" URIs.
>
> *SDK and File System Dependencies*
>
> Some special combination
> <https://issues.apache.org/jira/browse/HADOOP-12420> of the Hadoop
> version, AWS SDK version, and hadoop-aws is necessary.
>
> One working S3A combination with Spark 1.6.1 + Hadoop 2.7.x for me seems
> to be with
>
> --packages
> com.amazonaws:aws-java-sdk:1.7.4,org.apache.hadoop:hadoop-aws:2.7.2
>
> Is this generally what people do? Is there a better way?
>
> I realize this isn't entirely a Spark-specific problem, but as so many
> people seem to be using S3 with Spark, I imagine this community's faced the
> problem a lot.
>
> Thanks!
>
> - Everett
>
>


Role-based S3 access outside of EMR

2016-07-19 Thread Everett Anderson
Hi,

When running on EMR, AWS configures Hadoop to use their EMRFS Hadoop
FileSystem implementation for s3:// URLs and seems to install the necessary
S3 credentials properties, as well.

Often, it's nice during development to run outside of a cluster even with
the "local" Spark master, though, which I've found to be more troublesome.
I'm curious if I'm doing this the right way.

There are two issues -- AWS credentials and finding the right combination
of compatible AWS SDK and Hadoop S3 FileSystem dependencies.

*Credentials and Hadoop Configuration*

For credentials, some guides recommend setting AWS_SECRET_ACCESS_KEY and
AWS_ACCESS_KEY_ID environment variables or putting the corresponding
properties in Hadoop XML config files, but it seems better practice to rely
on machine roles and not expose these.

What I end up doing is, in code, when not running on EMR, creating a
DefaultAWSCredentialsProviderChain
<https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html>
and then installing the following properties in the Hadoop Configuration
using it:

fs.s3.awsAccessKeyId
fs.s3n.awsAccessKeyId
fs.s3a.awsAccessKeyId
fs.s3.awsSecretAccessKey
fs.s3n.awsSecretAccessKey
fs.s3a.awsSecretAccessKey

I also set the fs.s3.impl and fs.s3n.impl properties to
org.apache.hadoop.fs.s3a.S3AFileSystem to force them to use the S3A
implementation since people usually use "s3://" URIs.
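
(A minimal sketch of that, assuming an existing JavaSparkContext named jsc and
the aws-java-sdk on the classpath; property names as listed above:)

import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import org.apache.hadoop.conf.Configuration;

AWSCredentials creds = new DefaultAWSCredentialsProviderChain().getCredentials();
Configuration hadoopConf = jsc.hadoopConfiguration();
// Install the resolved keys for all three schemes.
for (String scheme : new String[] {"s3", "s3n", "s3a"}) {
  hadoopConf.set("fs." + scheme + ".awsAccessKeyId", creds.getAWSAccessKeyId());
  hadoopConf.set("fs." + scheme + ".awsSecretAccessKey", creds.getAWSSecretKey());
}
// Route s3:// and s3n:// URIs to the S3A implementation.
hadoopConf.set("fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");
hadoopConf.set("fs.s3n.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");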

*SDK and File System Dependencies*

Some special combination
<https://issues.apache.org/jira/browse/HADOOP-12420> of the Hadoop version,
AWS SDK version, and hadoop-aws is necessary.

One working S3A combination with Spark 1.6.1 + Hadoop 2.7.x for me seems to
be with

--packages
com.amazonaws:aws-java-sdk:1.7.4,org.apache.hadoop:hadoop-aws:2.7.2

Is this generally what people do? Is there a better way?

I realize this isn't entirely a Spark-specific problem, but as so many
people seem to be using S3 with Spark, I imagine this community's faced the
problem a lot.

Thanks!

- Everett


Re: Best practice for handing tables between pipeline components

2016-06-28 Thread Everett Anderson
Thanks! Alluxio looks quite promising, but also quite new.

What did people do before?

On Mon, Jun 27, 2016 at 12:33 PM, Gene Pang <gene.p...@gmail.com> wrote:

> Yes, Alluxio (http://www.alluxio.org/) can be used to store data
> in-memory between stages in a pipeline.
>
> Here is more information about running Spark with Alluxio:
> http://www.alluxio.org/documentation/v1.1.0/en/Running-Spark-on-Alluxio.html
>
> Hope that helps,
> Gene
>
> On Mon, Jun 27, 2016 at 10:38 AM, Sathish Kumaran Vairavelu <
> vsathishkuma...@gmail.com> wrote:
>
>> Alluxio off heap memory would help to share cached objects
>>
>> On Mon, Jun 27, 2016 at 11:14 AM Everett Anderson
>> <ever...@nuna.com.invalid> wrote:
>>
>>> Hi,
>>>
>>> We have a pipeline of components strung together via Airflow running on
>>> AWS. Some of them are implemented in Spark, but some aren't. Generally they
>>> can all talk to a JDBC/ODBC end point or read/write files from S3.
>>>
>>> Ideally, we wouldn't suffer the I/O cost of writing all the data to HDFS
>>> or S3 and reading it back in, again, in every component, if it could stay
>>> cached in memory in a Spark cluster.
>>>
>>> Our current investigation seems to lead us towards exploring if the
>>> following things are possible:
>>>
>>>- Using a Hive metastore with S3 as its backing data store to try to
>>>keep a mapping from table name to files on S3 (not sure if one can cache 
>>> a
>>>Hive table in Spark across contexts, though)
>>>- Using something like the spark-jobserver to keep a
>>>Spark SQLContext open across Spark components so they could avoid file 
>>> I/O
>>>for cached tables
>>>
>>> What's the best practice for handing tables between Spark programs? What
>>> about between Spark and non-Spark programs?
>>>
>>> Thanks!
>>>
>>> - Everett
>>>
>>>
>


Best practice for handing tables between pipeline components

2016-06-27 Thread Everett Anderson
Hi,

We have a pipeline of components strung together via Airflow running on
AWS. Some of them are implemented in Spark, but some aren't. Generally they
can all talk to a JDBC/ODBC end point or read/write files from S3.

Ideally, we wouldn't suffer the I/O cost of writing all the data to HDFS or
S3 and reading it back in, again, in every component, if it could stay
cached in memory in a Spark cluster.

Our current investigation seems to lead us towards exploring if the
following things are possible:

   - Using a Hive metastore with S3 as its backing data store to try to
   keep a mapping from table name to files on S3 (not sure if one can cache a
   Hive table in Spark across contexts, though)
   - Using something like the spark-jobserver to keep a Spark SQLContext
   open across Spark components so they could avoid file I/O for cached tables

What's the best practice for handing tables between Spark programs? What
about between Spark and non-Spark programs?
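
(To make the first option above concrete -- a sketch only, with a placeholder
bucket, assuming a HiveContext wired to a shared metastore and an existing
DataFrame named df:)

// Write to S3 and register the location in the shared Hive metastore so any
// downstream component can find the table by name.
df.write()
    .format("parquet")
    .option("path", "s3a://my-bucket/warehouse/events")
    .saveAsTable("events");

// A later Spark component can then load it by name:
DataFrame events = hiveContext.table("events");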

Thanks!

- Everett


Re: Best way to go from RDD to DataFrame of StringType columns

2016-06-17 Thread Everett Anderson
On Fri, Jun 17, 2016 at 1:17 PM, Mich Talebzadeh <mich.talebza...@gmail.com>
wrote:

> Ok a bit of a challenge.
>
> Have you tried using databricks stuff?. they can read compressed files and
> they might work here?
>
> val df =
> sqlContext.read.format("com.databricks.spark.csv").option("inferSchema",
> "true").option("header",
> "true").load("hdfs://rhes564:9000/data/stg/accounts/nw/10124772")
>
> case class Accounts( TransactionDate: String, TransactionType: String,
> Description: String, Value: Double, Balance: Double, AccountName: String,
> AccountNumber : String)
> // Map the columns to names
> //
> val a = df.filter(col("Date") > "").map(p =>
> Accounts(p(0).toString,p(1).toString,p(2).toString,p(3).toString.toDouble,p(4).toString.toDouble,p(5).toString,p(6).toString))
> //
> // Create a Spark temporary table
> //
> a.toDF.registerTempTable("tmp")
>

Yes, I looked at their spark-csv package -- it'd be great for CSV (or even
a large swath of delimited file formats). In some cases, I have file
formats that aren't delimited in a way compatible with that, though, so was
rolling my own string lines => DataFrames.

Also, there are arbitrary record formats, and I don't want to restrict to a
compile-time value class, hence the need to manually create the schema.




>
>
> HTH
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 17 June 2016 at 21:02, Everett Anderson <ever...@nuna.com> wrote:
>
>>
>>
>> On Fri, Jun 17, 2016 at 12:44 PM, Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> Are these mainly in csv format?
>>>
>>
>> Alas, no -- lots of different formats. Many are fixed width files, where
>> I have outside information to know which byte ranges correspond to which
>> columns. Some have odd null representations or non-comma delimiters (though
>> many of those cases might fit within the configurability of the spark-csv
>> package).
>>
>>
>>
>>
>>
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>
>>> LinkedIn * 
>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>>
>>>
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>>
>>>
>>> On 17 June 2016 at 20:38, Everett Anderson <ever...@nuna.com.invalid>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> I have a system with files in a variety of non-standard input formats,
>>>> though they're generally flat text files. I'd like to dynamically create
>>>> DataFrames of string columns.
>>>>
>>>> What's the best way to go from a RDD to a DataFrame of
>>>> StringType columns?
>>>>
>>>> My current plan is
>>>>
>>>>- Call map() on the RDD with a function to split the String
>>>>into columns and call RowFactory.create() with the resulting array,
>>>>creating a RDD
>>>>- Construct a StructType schema using column names and StringType
>>>>- Call SQLContext.createDataFrame(RDD, schema) to create the result
>>>>
>>>> Does that make sense?
>>>>
>>>> I looked through the spark-csv package a little and noticed that it's
>>>> using baseRelationToDataFrame(), but BaseRelation looks like it might be a
>>>> restricted developer API. Anyone know if it's recommended for use?
>>>>
>>>> Thanks!
>>>>
>>>> - Everett
>>>>
>>>>
>>>
>>
>


Re: Best way to go from RDD to DataFrame of StringType columns

2016-06-17 Thread Everett Anderson
On Fri, Jun 17, 2016 at 12:44 PM, Mich Talebzadeh <mich.talebza...@gmail.com
> wrote:

> Are these mainly in csv format?
>

Alas, no -- lots of different formats. Many are fixed width files, where I
have outside information to know which byte ranges correspond to which
columns. Some have odd null representations or non-comma delimiters (though
many of those cases might fit within the configurability of the spark-csv
package).





>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 17 June 2016 at 20:38, Everett Anderson <ever...@nuna.com.invalid>
> wrote:
>
>> Hi,
>>
>> I have a system with files in a variety of non-standard input formats,
>> though they're generally flat text files. I'd like to dynamically create
>> DataFrames of string columns.
>>
>> What's the best way to go from a RDD to a DataFrame of StringType
>> columns?
>>
>> My current plan is
>>
>>- Call map() on the RDD with a function to split the String
>>into columns and call RowFactory.create() with the resulting array,
>>creating a RDD
>>- Construct a StructType schema using column names and StringType
>>- Call SQLContext.createDataFrame(RDD, schema) to create the result
>>
>> Does that make sense?
>>
>> I looked through the spark-csv package a little and noticed that it's
>> using baseRelationToDataFrame(), but BaseRelation looks like it might be a
>> restricted developer API. Anyone know if it's recommended for use?
>>
>> Thanks!
>>
>> - Everett
>>
>>
>


Best way to go from RDD to DataFrame of StringType columns

2016-06-17 Thread Everett Anderson
Hi,

I have a system with files in a variety of non-standard input formats,
though they're generally flat text files. I'd like to dynamically create
DataFrames of string columns.

What's the best way to go from a RDD to a DataFrame of StringType
columns?

My current plan is

   - Call map() on the RDD with a function to split the String into
   columns and call RowFactory.create() with the resulting array, creating a
   RDD
   - Construct a StructType schema using column names and StringType
   - Call SQLContext.createDataFrame(RDD, schema) to create the result

Does that make sense?
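
(In code, roughly -- a sketch assuming the 1.6-era Java API, an existing
jsc/sqlContext, a tab delimiter, and illustrative column names:)

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

JavaRDD<String> lines = jsc.textFile("hdfs:///path/to/input");
String[] columnNames = {"col_a", "col_b", "col_c"};

// Split each line into fields and wrap them in a Row (all strings).
JavaRDD<Row> rows = lines.map(
    line -> RowFactory.create((Object[]) line.split("\t", -1)));

// Build an all-StringType schema from the column names.
List<StructField> fields = new ArrayList<>();
for (String name : columnNames) {
  fields.add(DataTypes.createStructField(name, DataTypes.StringType, true));
}
StructType schema = DataTypes.createStructType(fields);

DataFrame df = sqlContext.createDataFrame(rows, schema);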

I looked through the spark-csv package a little and noticed that it's using
baseRelationToDataFrame(), but BaseRelation looks like it might be a
restricted developer API. Anyone know if it's recommended for use?

Thanks!

- Everett


Re: StackOverflowError even with JavaSparkContext union(JavaRDD... rdds)

2016-06-05 Thread Everett Anderson
Indeed!

I wasn't able to get this to work in cluster mode, yet, but increasing
driver and executor stack sizes in client mode (still running on a YARN EMR
cluster) got it to work! I'll fiddle more.

FWIW, I used

spark-submit --deploy-mode client --conf
"spark.executor.extraJavaOptions=-XX:ThreadStackSize=81920" --conf
"spark.driver.extraJavaOptions=-XX:ThreadStackSize=81920" 

Thank you so much!

On Sun, Jun 5, 2016 at 2:34 PM, Eugene Morozov <evgeny.a.moro...@gmail.com>
wrote:

> Everett,
>
> try to increase thread stack size. To do that run your application with
> the following options (my app is a web application, so you might adjust
> something): -XX:ThreadStackSize=81920
> -Dspark.executor.extraJavaOptions="-XX:ThreadStackSize=81920"
>
> The number 81920 is memory in KB. You could try smth less. It's pretty
> memory consuming to have 80M for each thread (very simply there might be
> 100 of them), but this is just a workaround. This is configuration that I
> use to train random forest with input of 400k samples.
>
> Hope this helps.
>
> --
> Be well!
> Jean Morozov
>
> On Sun, Jun 5, 2016 at 11:17 PM, Everett Anderson <
> ever...@nuna.com.invalid> wrote:
>
>> Hi!
>>
>> I have a fairly simple Spark (1.6.1) Java RDD-based program that's
>> scanning through lines of about 1000 large text files of records and
>> computing some metrics about each line (record type, line length, etc).
>> Most are identical so I'm calling distinct().
>>
>> In the loop over the list of files, I'm saving up the resulting RDDs into
>> a List. After the loop, I use the JavaSparkContext union(JavaRDD...
>> rdds) method to collapse the tables into one.
>>
>> Like this --
>>
>> List<JavaRDD> allMetrics = ...
>> for (int i = 0; i < files.size(); i++) {
>>JavaPairRDD<...> lines = jsc.newAPIHadoopFile(...);
>>JavaRDD distinctFileMetrics =
>> lines.flatMap(fn).distinct();
>>
>>allMetrics.add(distinctFileMetrics);
>> }
>>
>> JavaRDD finalOutput =
>> jsc.union(allMetrics.toArray(...)).coalesce(10);
>> finalOutput.saveAsTextFile(...);
>>
>> There are posts suggesting
>> <https://stackoverflow.com/questions/30522564/spark-when-union-a-lot-of-rdd-throws-stack-overflow-error>
>> that using JavaRDD union(JavaRDD other) many times creates a long
>> lineage that results in a StackOverflowError.
>>
>> However, I'm seeing the StackOverflowError even with JavaSparkContext
>> union(JavaRDD... rdds).
>>
>> Should this still be happening?
>>
>> I'm using the work-around from this 2014 thread
>> <http://apache-spark-user-list.1001560.n3.nabble.com/java-lang-StackOverflowError-when-calling-count-tt5649.html#a5752>,
>>  shown
>> below, which requires checkpointing to HDFS every N iterations, but it's
>> ugly and decreases performance.
>>
>> Is there a lighter weight way to compact the lineage? It looks like at
>> some point there might've been a "local checkpoint
>> <https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-rdd-checkpointing.html>"
>> feature?
>>
>> Work-around:
>>
>> List<JavaRDD> allMetrics = ...
>> for (int i = 0; i < files.size(); i++) {
>>JavaPairRDD<...> lines = jsc.newAPIHadoopFile(...);
>>JavaRDD distinctFileMetrics =
>> lines.flatMap(fn).distinct();
>>allMetrics.add(distinctFileMetrics);
>>
>>// Union and checkpoint occasionally to reduce lineage
>>if (i % tablesPerCheckpoint == 0) {
>>JavaRDD dataSoFar =
>>jsc.union(allMetrics.toArray(...));
>>dataSoFar.checkpoint();
>>dataSoFar.count();
>>allMetrics.clear();
>>allMetrics.add(dataSoFar);
>>}
>> }
>>
>> When the StackOverflowError happens, it's a long trace starting with --
>>
>> 16/06/05 18:01:29 INFO YarnAllocator: Driver requested a total number of 
>> 20823 executor(s).
>> 16/06/05 18:01:29 WARN ApplicationMaster: Reporter thread fails 3 time(s) in 
>> a row.
>> java.lang.StackOverflowError
>>  at 
>> scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
>>  at 
>> scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
>>  at 
>> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
>>  at 
>> scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
>>
>> ...
>>
>> Thanks!
>>
>> - Everett
>>
>>
>


StackOverflowError even with JavaSparkContext union(JavaRDD... rdds)

2016-06-05 Thread Everett Anderson
Hi!

I have a fairly simple Spark (1.6.1) Java RDD-based program that's scanning
through lines of about 1000 large text files of records and computing some
metrics about each line (record type, line length, etc). Most are identical
so I'm calling distinct().

In the loop over the list of files, I'm saving up the resulting RDDs into a
List. After the loop, I use the JavaSparkContext union(JavaRDD... rdds)
method to collapse the tables into one.

Like this --

List allMetrics = ...
for (int i = 0; i < files.size(); i++) {
   JavaPairRDD<...> lines = jsc.newAPIHadoopFile(...);
   JavaRDD distinctFileMetrics =
lines.flatMap(fn).distinct();

   allMetrics.add(distinctFileMetrics);
}

JavaRDD finalOutput =
jsc.union(allMetrics.toArray(...)).coalesce(10);
finalOutput.saveAsTextFile(...);

There are posts suggesting
<https://stackoverflow.com/questions/30522564/spark-when-union-a-lot-of-rdd-throws-stack-overflow-error>
that using JavaRDD union(JavaRDD other) many times creates a long
lineage that results in a StackOverflowError.

However, I'm seeing the StackOverflowError even with JavaSparkContext
union(JavaRDD... rdds).

Should this still be happening?

I'm using the work-around from this 2014 thread
<http://apache-spark-user-list.1001560.n3.nabble.com/java-lang-StackOverflowError-when-calling-count-tt5649.html#a5752>, shown
below, which requires checkpointing to HDFS every N iterations, but it's
ugly and decreases performance.

Is there a lighter weight way to compact the lineage? It looks like at some
point there might've been a "local checkpoint
"
feature?

Work-around:

List allMetrics = ...
for (int i = 0; i < files.size(); i++) {
   JavaPairRDD<...> lines = jsc.newAPIHadoopFile(...);
   JavaRDD distinctFileMetrics =
lines.flatMap(fn).distinct();
   allMetrics.add(distinctFileMetrics);

   // Union and checkpoint occasionally to reduce lineage
   if (i % tablesPerCheckpoint == 0) {
   JavaRDD dataSoFar =
   jsc.union(allMetrics.toArray(...));
   dataSoFar.checkpoint();
   dataSoFar.count();
   allMetrics.clear();
   allMetrics.add(dataSoFar);
   }
}
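
(One detail the snippet above glosses over: checkpoint() only works after a
checkpoint directory has been set on the context, e.g.:)

// Must be called once before any checkpoint(); the path is a placeholder.
jsc.setCheckpointDir("hdfs:///tmp/spark-checkpoints");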

When the StackOverflowError happens, it's a long trace starting with --

16/06/05 18:01:29 INFO YarnAllocator: Driver requested a total number
of 20823 executor(s).
16/06/05 18:01:29 WARN ApplicationMaster: Reporter thread fails 3
time(s) in a row.
java.lang.StackOverflowError
at 
scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
at 
scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)
at 
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
at 
scala.collection.MapLike$MappedValues$$anonfun$foreach$3.apply(MapLike.scala:245)

...

Thanks!

- Everett