to_avro/from_avro inserts extra values from Kafka

2020-05-12 Thread Alex Nastetsky
Hi all,

I create a dataframe, convert it to Avro with to_avro and write it to
Kafka.
Then I read it back out with from_avro.
(Not using Schema Registry.)
The problem is that the values skip every other field in the result.

I expect:
+---------+--------+------+-------+
|firstName|lastName| color|   mood|
+---------+--------+------+-------+
|     Suzy|  Samson|indigo|   grim|
|      Jim| Johnson|  blue|grimmer|
+---------+--------+------+-------+

Instead I get:

+---------+--------+-----+-------+
|firstName|lastName|color|   mood|
+---------+--------+-----+-------+
|         |    Suzy|     | Samson|
|         |     Jim|     |Johnson|
+---------+--------+-----+-------+

Here's what I'm doing --

$ kt admin -createtopic persons-avro-spark9 -topicdetail <(jsonify
=NumPartitions 1 =ReplicationFactor 1)

$ cat person.avsc
{
  "type": "record",
  "name": "Person",
  "namespace": "com.ippontech.kafkatutorials",
  "fields": [
    {
      "name": "firstName",
      "type": "string"
    },
    {
      "name": "lastName",
      "type": "string"
    },
    {
      "name": "color",
      "type": "string"
    },
    {
      "name": "mood",
      "type": "string"
    }
  ]
}

$ spark-shell --packages
org.apache.spark:spark-avro_2.11:2.4.5,org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.5

scala> :paste
// Entering paste mode (ctrl-D to finish)

import org.apache.spark.sql.avro._
import java.nio.file.Files;
import java.nio.file.Paths;

val topic = "persons-avro-spark9"


// `from_avro` requires Avro schema in JSON string format.
val jsonFormatSchema = new String(Files.readAllBytes(Paths.get("person.avsc")))


val personDF = sc.parallelize(Seq(
("Jim","Johnson","indigo","grim"),
("Suzy","Samson","blue","grimmer")
)).toDF("firstName","lastName","color","mood")

personDF.select(to_avro(struct(personDF.columns.map(column):_*)).alias("value"))
.write
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic",topic)
.option("avroSchema",jsonFormatSchema)
.save()

val stream = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", topic)
.option("startingOffsets", "earliest")
.load()
.select(from_avro('value, jsonFormatSchema) as 'person)
.select($"person.firstName", $"person.lastName", $"person.color", $"person.mood")
.writeStream
.format("console")
.start()

// Exiting paste mode, now interpreting.

import org.apache.spark.sql.avro._

import java.nio.file.Files
import java.nio.file.Paths
topic: String = persons-avro-spark9
jsonFormatSchema: String =
{
  "type": "record",
  "name": "Person",
  "namespace": "com.ippontech.kafkatutorials",
  "fields": [
    {
      "name": "firstName",
      "type": "string"
    },
    {
      "name": "lastName",
      "type": "string"
    },
    {
      "name": "color",
      "type": "string"
    },
    {
      "name": "mood",
      "type": "string"
    }
  ]
}
personDF: org.apache.spark.sql.DataFrame = [firstName: string, lastName:
string ... 2 more fields]
stream: org.apache.spark.sql.streaming.StreamingQuery =
org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@3990c36c

scala> -------------------------------------------
Batch: 0
-------------------------------------------
+---------+--------+-----+-------+
|firstName|lastName|color|   mood|
+---------+--------+-----+-------+
|         |    Suzy|     | Samson|
|         |     Jim|     |Johnson|
+---------+--------+-----+-------+

See the raw bytes:

$ kt consume -topic persons-avro-spark9
{
  "partition": 0,
  "offset": 0,
  "key": null,
  "value":
"\u\u0008Suzy\u\u000cSamson\u\u0008blue\u\u000egrimmer",
  "timestamp": "2020-05-12T17:18:53.858-04:00"
}
{
  "partition": 0,
  "offset": 1,
  "key": null,
  "value":
"\u\u0006Jim\u\u000eJohnson\u\u000cindigo\u\u0008grim",
  "timestamp": "2020-05-12T17:18:53.859-04:00"
}
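
Each field appears to be preceded by an extra zero byte. My guess (untested) is a
nullability mismatch: the struct Spark serializes has nullable string fields, which
would make the writer schema wrap each field in a union, while person.avsc declares
plain strings. A variant reader schema with union fields that I could try:

import org.apache.spark.sql.avro._
import org.apache.spark.sql.functions.col

// Hypothetical reader schema where each field is a ["string", "null"] union.
val unionSchema =
  """{
    |  "type": "record",
    |  "name": "Person",
    |  "namespace": "com.ippontech.kafkatutorials",
    |  "fields": [
    |    {"name": "firstName", "type": ["string", "null"]},
    |    {"name": "lastName",  "type": ["string", "null"]},
    |    {"name": "color",     "type": ["string", "null"]},
    |    {"name": "mood",      "type": ["string", "null"]}
    |  ]
    |}""".stripMargin

// Batch read of the same topic, just to test the decode.
spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", topic)
  .option("startingOffsets", "earliest")
  .load()
  .select(from_avro(col("value"), unionSchema) as "person")
  .select("person.*")
  .show()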

Thanks,
Alex.


"where" clause able to access fields not in its schema

2019-02-13 Thread Alex Nastetsky
I don't know if this is a bug or a feature, but it's a bit counter-intuitive 
when reading code.

The "b" dataframe does not have field "bar" in its schema, but is still able to 
filter on that field.

scala> val a = sc.parallelize(Seq((1,10),(2,20))).toDF("foo","bar")
a: org.apache.spark.sql.DataFrame = [foo: int, bar: int]

scala> a.show
+---+---+
|foo|bar|
+---+---+
|  1| 10|
|  2| 20|
+---+---+

scala> val b = a.select($"foo")
b: org.apache.spark.sql.DataFrame = [foo: int]

scala> b.schema
res3: org.apache.spark.sql.types.StructType = 
StructType(StructField(foo,IntegerType,false))

scala> b.select($"bar").show
org.apache.spark.sql.AnalysisException: cannot resolve '`bar`' given input 
columns: [foo];;
[...snip...]

scala> b.where($"bar" === 20).show
+---+
|foo|
+---+
|  2|
+---+
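
For comparison, the explicit version (filtering on "a" before the projection) returns
the same row:

scala> a.where($"bar" === 20).select($"foo").show
+---+
|foo|
+---+
|  2|
+---+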



Re: partitionBy with partitioned column in output?

2018-02-26 Thread Alex Nastetsky
Yeah, was just discussing this with a co-worker and came to the same
conclusion -- need to essentially create a copy of the partition column.
Thanks.

Hacky, but it works. Seems counter-intuitive that Spark would remove the
column from the output... should at least give you an option to keep it.
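
Something along these lines is what we ended up with (untested here; "foo_part" and
the output path are just illustrative names):

import org.apache.spark.sql.functions.col

// Keep "foo" in the data, but partition the output by a copy of it.
sc.parallelize(List((1, 10), (2, 20))).toDF("foo", "bar")
  .withColumn("foo_part", col("foo"))
  .write
  .partitionBy("foo_part")
  .json("json-out-with-col")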

On Mon, Feb 26, 2018 at 5:47 PM, naresh Goud <nareshgoud.du...@gmail.com>
wrote:

> is this helps?
>
> sc.parallelize(List((1,10),(2,20))).toDF("foo","bar").map(("
> foo","bar")=>("foo",("foo","bar"))).partitionBy("foo").json("json-out")
>
>
> On Mon, Feb 26, 2018 at 4:28 PM, Alex Nastetsky <alex.nastet...@verve.com>
> wrote:
>
>> Is there a way to make outputs created with "partitionBy" contain the
>> partitioned column? When reading the output with Spark or Hive or similar,
>> it's less of an issue because those tools know how to perform partition
>> discovery. But if I were to load the output into an external data warehouse
>> or database, it would have no idea.
>>
>> Example below -- a dataframe with two columns "foo" and "bar" is
>> partitioned by "foo", but the data only contains "bar", since it expects
>> the reader to know how to derive the value of "foo" from the parent
>> directory. Note that it's the same thing with Parquet and Avro as well, I
>> just chose to use JSON in my example.
>>
>> scala> sc.parallelize(List((1,10),(2,20))).toDF("foo","bar").write.
>> partitionBy("foo").json("json-out")
>>
>>
>> $ ls json-out/
>> foo=1  foo=2  _SUCCESS
>> $ cat json-out/foo=1/part-3-18ca93d0-c3b1-424b-8ad5-291d8a29523b.json
>> {"bar":10}
>> $ cat json-out/foo=2/part-7-18ca93d0-c3b1-424b-8ad5-291d8a29523b.json
>> {"bar":20}
>>
>> Thanks,
>> Alex.
>>
>
>


partitionBy with partitioned column in output?

2018-02-26 Thread Alex Nastetsky
Is there a way to make outputs created with "partitionBy" contain the
partitioned column? When reading the output with Spark or Hive or similar,
it's less of an issue because those tools know how to perform partition
discovery. But if I were to load the output into an external data warehouse
or database, it would have no idea.

Example below -- a dataframe with two columns "foo" and "bar" is
partitioned by "foo", but the data only contains "bar", since it expects
the reader to know how to derive the value of "foo" from the parent
directory. Note that it's the same thing with Parquet and Avro as well, I
just chose to use JSON in my example.

scala>
sc.parallelize(List((1,10),(2,20))).toDF("foo","bar").write.partitionBy("foo").json("json-out")


$ ls json-out/
foo=1  foo=2  _SUCCESS
$ cat json-out/foo=1/part-3-18ca93d0-c3b1-424b-8ad5-291d8a29523b.json
{"bar":10}
$ cat json-out/foo=2/part-7-18ca93d0-c3b1-424b-8ad5-291d8a29523b.json
{"bar":20}

Thanks,
Alex.


Dataset API inconsistencies

2018-01-09 Thread Alex Nastetsky
I am finding the Dataset API very cumbersome to use, which is unfortunate,
as I was looking forward to the type-safety after coming from a Dataframe
codebase.

This link summarizes my troubles:
http://loicdescotte.github.io/posts/spark2-datasets-type-safety/

The problem is having to continuously switch back and forth between typed
and untyped semantics, which really kills productivity. In contrast, the
RDD API is consistently typed and the Dataframe API is consistently
untyped. I don't have to continuously stop and think about which one to use
for each operation.

I gave the Frameless framework (mentioned in the link) a shot, but
eventually started running into oddities and lack of enough documentation
and community support and did not want to sink too much time into it.

At this point I'm considering just sticking with Dataframes, as I don't
really consider Datasets to be usable. Has anyone had a similar experience
or has had better luck?

Alex.


Re: spark sql aggregate function "Nth"

2016-07-26 Thread Alex Nastetsky
Ah, that gives me an idea.

val window = Window.partitionBy()
val getRand = udf((cnt:Int) =>  )

df
.withColumn("cnt", count().over(window))
.withColumn("rnd", getRand($"cnt"))
.where($"rnd" === $"cnt")

Not sure how performant this would be, but writing a UDF is much simpler
than a UDAF.
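
For the end goal (a uniformly random element per group), a fuller sketch (untested;
df and "groupCol" are placeholders):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{rand, row_number}

// Add a random sort key, then keep the first row per group in that random order.
val withRnd = df.withColumn("rnd", rand())
val w = Window.partitionBy($"groupCol").orderBy($"rnd")

val randomPerGroup = withRnd
  .withColumn("rn", row_number().over(w))  // 1..N within each group, randomly ordered
  .where($"rn" === 1)                      // one uniformly random row per group
  .drop("rn")
  .drop("rnd")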

On Tue, Jul 26, 2016 at 11:48 AM, ayan guha <guha.a...@gmail.com> wrote:

> You can use rank with window function. Rank=1 is same as calling first().
>
> Not sure how you would randomly pick records though, if there is no Nth
> record. In your example, what happens if data is of only 2 rows?
> On 27 Jul 2016 00:57, "Alex Nastetsky" <alex.nastet...@vervemobile.com>
> wrote:
>
>> Spark SQL has a "first" function that returns the first item in a group.
>> Is there a similar function, perhaps in a third party lib, that allows you
>> to return an arbitrary (e.g. 3rd) item from the group? Was thinking of
>> writing a UDAF for it, but didn't want to reinvent the wheel. My endgoal is
>> to be able to select a random item from the group, using random number
>> generator.
>>
>> Thanks.
>>
>


spark sql aggregate function "Nth"

2016-07-26 Thread Alex Nastetsky
Spark SQL has a "first" function that returns the first item in a group. Is
there a similar function, perhaps in a third party lib, that allows you to
return an arbitrary (e.g. 3rd) item from the group? Was thinking of writing
a UDAF for it, but didn't want to reinvent the wheel. My endgoal is to be
able to select a random item from the group, using random number generator.

Thanks.


Databricks Cloud vs AWS EMR

2016-01-26 Thread Alex Nastetsky
As a user of AWS EMR (running Spark and MapReduce), I am interested in
potential benefits that I may gain from Databricks Cloud. I was wondering
if anyone has used both and done comparison / contrast between the two
services.

In general, which resource manager(s) does Databricks Cloud use for Spark?
If it's YARN, can you also run MapReduce jobs in Databricks Cloud?

Thanks.


Re: Enabling mapreduce.input.fileinputformat.list-status.num-threads in Spark?

2016-01-12 Thread Alex Nastetsky
Ran into this need myself. Does Spark have an equivalent of
"mapreduce.input.fileinputformat.list-status.num-threads"?

Thanks.

On Thu, Jul 23, 2015 at 8:50 PM, Cheolsoo Park  wrote:

> Hi,
>
> I am wondering if anyone has successfully enabled
> "mapreduce.input.fileinputformat.list-status.num-threads" in Spark jobs. I
> usually set this property to 25 to speed up file listing in MR jobs (Hive
> and Pig). But for some reason, this property does not take effect in Spark
> HadoopRDD resulting in serious delay in file listing.
>
> I verified that the property is indeed set in HadoopRDD by logging the
> value of the property in the getPartitions() function. I also tried to
> attach VisualVM to Spark and Pig clients, which look as follows-
>
> In Pig, I can see 25 threads running in parallel for file listing-
> [image: Inline image 1]
>
> In Spark, I only see 2 threads running in parallel for file listing-
> [image: Inline image 2]
>
> What's strange is that the # of concurrent threads in Spark is throttled
> no matter how high I
> set "mapreduce.input.fileinputformat.list-status.num-threads".
>
> Is anyone using Spark with this property enabled? If so, can you please
> share how you do it?
>
> Thanks!
> Cheolsoo
>


Re: Enabling mapreduce.input.fileinputformat.list-status.num-threads in Spark?

2016-01-12 Thread Alex Nastetsky
Thanks. I was actually able to get
mapreduce.input.fileinputformat.list-status.num-threads working in Spark against
a regular fileset in S3, in Spark 1.5.2 ... looks like the issue is isolated to
Hive.
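
For reference, I set it on the Hadoop configuration, something like this (25 is just
the value I use; it could also go on spark-submit as
--conf spark.hadoop.mapreduce.input.fileinputformat.list-status.num-threads=25):

// Applies to the Hadoop Configuration that Spark's file-based inputs use.
sc.hadoopConfiguration.set(
  "mapreduce.input.fileinputformat.list-status.num-threads", "25")

val df = sqlContext.read.parquet("s3://some-bucket/some/path")  // placeholder path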

On Tue, Jan 12, 2016 at 6:48 PM, Cheolsoo Park <piaozhe...@gmail.com> wrote:

> Alex, see this jira-
> https://issues.apache.org/jira/browse/SPARK-9926
>
> On Tue, Jan 12, 2016 at 10:55 AM, Alex Nastetsky <
> alex.nastet...@vervemobile.com> wrote:
>
>> Ran into this need myself. Does Spark have an equivalent of
>> "mapreduce.input.fileinputformat.list-status.num-threads"?
>>
>> Thanks.
>>
>> On Thu, Jul 23, 2015 at 8:50 PM, Cheolsoo Park <piaozhe...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> I am wondering if anyone has successfully enabled
>>> "mapreduce.input.fileinputformat.list-status.num-threads" in Spark jobs. I
>>> usually set this property to 25 to speed up file listing in MR jobs (Hive
>>> and Pig). But for some reason, this property does not take effect in Spark
>>> HadoopRDD resulting in serious delay in file listing.
>>>
>>> I verified that the property is indeed set in HadoopRDD by logging the
>>> value of the property in the getPartitions() function. I also tried to
>>> attach VisualVM to Spark and Pig clients, which look as follows-
>>>
>>> In Pig, I can see 25 threads running in parallel for file listing-
>>> [image: Inline image 1]
>>>
>>> In Spark, I only see 2 threads running in parallel for file listing-
>>> [image: Inline image 2]
>>>
>>> What's strange is that the # of concurrent threads in Spark is throttled
>>> no matter how high I
>>> set "mapreduce.input.fileinputformat.list-status.num-threads".
>>>
>>> Is anyone using Spark with this property enabled? If so, can you please
>>> share how you do it?
>>>
>>> Thanks!
>>> Cheolsoo
>>>
>>
>>
>


Spark SQL UDAF works fine locally, OutOfMemory on YARN

2015-11-16 Thread Alex Nastetsky
Hi,

I am using Spark 1.5.1.

I have a Spark SQL UDAF that works fine on a tiny dataset (13 narrow rows)
in local mode, but runs out of memory on YARN about half the time
(OutOfMemory: Java Heap Space). The rest of the time, it works on YARN.

Note that in all instances, the input data is the same.

Here is the UDAF: https://gist.github.com/alexnastetsky/581af2672328c4b8b023

I am also using a trivial UDT to keep track of each unique value and its
count.

The basic idea is to have a secondary grouping and to count the number
occurrences of each value in the group. For example, we want to group on
column X; then for each group, we want to aggregate the rows by column Y
and count how many times each unique value of Y appears.

So, given the following data:

X Y
a 1
a 2
a 2
a 2
b 3
b 3
b 4

I would do

val myudaf = new MergeArraysOfElementWithCountUDAF()
val df = ... // load data
df.groupBy($"X")
  .agg(
    myudaf($"Y").as("aggY")
  )

should provide data like the following

X aggY
a [{"element":"1", "count":"1"}, {"element":"2", "count":"3"}]
b [{"element":"3", "count":"2"}, {"element":"4", "count":"1"}]

There's also an option to take as input an array, instead of a scalar, in
which case it just loops through the array and performs the same operation.
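
(For comparison, the raw per-value counts, before they get packed into one array per
group, are what a plain two-level groupBy would give; rough sketch:)

// Same counts as the UDAF computes, just not nested into arrays.
df.groupBy($"X", $"Y").count()
// a, 1 -> 1;  a, 2 -> 3;  b, 3 -> 2;  b, 4 -> 1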

I've added some logging to show the Runtime.getRuntime.freeMemory right
before it throws the OOM error, and it shows plenty of memory (16 GB, when
I was running on a large node) still available. So I'm not sure if it's
some huge memory spike, or it's not actually seeing that available memory.

When the OOM does happen, it consistently happens at this line:
https://gist.github.com/alexnastetsky/581af2672328c4b8b023#file-mergearraysofelementwithcountudaf-scala-L59

java.lang.OutOfMemoryError: Java heap space
at
scala.reflect.ManifestFactory$$anon$1.newArray(Manifest.scala:165)
at
scala.reflect.ManifestFactory$$anon$1.newArray(Manifest.scala:164)
at org.apache.spark.sql.types.ArrayData.toArray(ArrayData.scala:108)
at
org.apache.spark.sql.catalyst.CatalystTypeConverters$MapConverter.toScala(CatalystTypeConverters.scala:235)
at
org.apache.spark.sql.catalyst.CatalystTypeConverters$MapConverter.toScala(CatalystTypeConverters.scala:193)
at
org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToScalaConverter$2.apply(CatalystTypeConverters.scala:414)
at
org.apache.spark.sql.execution.aggregate.InputAggregationBuffer.get(udaf.scala:298)
at org.apache.spark.sql.Row$class.getAs(Row.scala:316)
at
org.apache.spark.sql.execution.aggregate.InputAggregationBuffer.getAs(udaf.scala:269)
at
com.verve.scorecard.spark.sql.MergeArraysOfElementWithCountUDAF.merge(MergeArraysOfElementWithCountUDAF.scala:59)

Again, the data is tiny and it doesn't explain why it only happens some of
the time on YARN, and never when running in local mode.

Here's how I am running the app:

spark-submit \
--deploy-mode cluster \
--master yarn \
--num-executors 1 \
--executor-cores 1 \
--executor-memory 18g \
--driver-java-options "-XX:MaxPermSize=256m" \
--conf "spark.executor.extraJavaOptions=-XX:MaxPermSize=256m" \
--conf spark.storage.memoryFraction=0.2 \
--conf spark.shuffle.memoryFraction=0.6 \
--conf spark.sql.shuffle.partitions=1000 \
[...app specific stuff...]

Note that I am using a single executor and single core to help with
debugging, but I have the same issue with more executors/nodes.

I am running this on EMR on AWS, so this is unlikely to be a hardware issue
(different hardware each time I launch a cluster).

I've also isolated the issue to this UDAF, as removing it from my Spark SQL
makes the issue go away.

Any ideas would be appreciated.

Thanks,
Alex.


Re: Spark 1.5 UDAF ArrayType

2015-11-10 Thread Alex Nastetsky
Hi,

I believe I ran into the same bug in 1.5.0, although my error looks like
this:

Caused by: java.lang.ClassCastException:
[Lcom.verve.spark.sql.ElementWithCount; cannot be cast to
org.apache.spark.sql.types.ArrayData
at
org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow$class.getArray(rows.scala:47)
at
org.apache.spark.sql.catalyst.expressions.GenericMutableRow.getArray(rows.scala:247)
at
org.apache.spark.sql.catalyst.expressions.JoinedRow.getArray(JoinedRow.scala:108)
at
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificMutableProjection.apply(Unknown
Source)
at
org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$32.apply(AggregationIterator.scala:373)

...

I confirmed that it's fixed in 1.5.1, but unfortunately I'm using AWS EMR
4.1.0 (the latest), which has Spark 1.5.0. Are there any workarounds in
1.5.0?

Thanks.


> Michael



> Thank you for your prompt answer. I will repost after I try this again on
> 1.5.1 or branch-1.5. In addition a blog post on SparkSQL data types would
> be very helpful. I am familiar with the Hive data types, but there is very
> little documentation on Spark SQL data types. Regards
>


> Deenar
>
> On 22 September 2015 at 19:28, Michael Armbrust <
> mich...@databricks.com>
> wrote:



> I think that you are hitting a bug (which should be fixed in Spark
> > 1.5.1). I'm hoping we can cut an RC for that this week. Until then you
> > could try building branch-1.5.
> >
> > On Tue, Sep 22, 2015 at 11:13 AM, Deenar Toraskar <
> > deenar.toras...@gmail.com> wrote:
> >
> >> Hi
> >>
> >> I am trying to write an UDAF ArraySum, that does element wise sum of
> >> arrays of Doubles returning an array of Double following the sample in
> >>
> >>
> https://databricks.com/blog/2015/09/16/spark-1-5-dataframe-api-highlights-datetimestring-handling-time-intervals-and-udafs.html
> .
> >> I am getting the following error. Any guidance on handle complex type in
> >> Spark SQL would be appreciated.
> >>
> >> Regards
> >> Deenar
> >>
> >> import org.apache.spark.sql.expressions.MutableAggregationBuffer
> >> import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
> >> import org.apache.spark.sql.Row
> >> import org.apache.spark.sql.types._
> >> import org.apache.spark.sql.functions._
> >>
> >> class ArraySum extends UserDefinedAggregateFunction {
> >> def inputSchema: org.apache.spark.sql.types.StructType =
> >> StructType(StructField("value", ArrayType(DoubleType, false)) :: Nil)
> >>
> >> def bufferSchema: StructType =
> >> StructType(StructField("value", ArrayType(DoubleType, false)) :: Nil)
> >>
> >> def dataType: DataType = ArrayType(DoubleType, false)
> >>
> >> def deterministic: Boolean = true
> >>
> >> def initialize(buffer: MutableAggregationBuffer): Unit = {
> >> buffer(0) = Nil
> >> }
> >>
> >> def update(buffer: MutableAggregationBuffer,input: Row): Unit = {
> >> val currentSum : Seq[Double] = buffer.getSeq(0)
> >> val currentRow : Seq[Double] = input.getSeq(0)
> >> buffer(0) = (currentSum, currentRow) match {
> >> case (Nil, Nil) => Nil
> >> case (Nil, row) => row
> >> case (sum, Nil) => sum
> >> case (sum, row) => (sum, row).zipped.map{ case (a, b) => a + b }
> >> // TODO handle different sizes arrays here
> >> }
> >> }
> >>
> >> def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
> >> val currentSum : Seq[Double] = buffer1.getSeq(0)
> >> val currentRow : Seq[Double] = buffer2.getSeq(0)
> >> buffer1(0) = (currentSum, currentRow) match {
> >> case (Nil, Nil) => Nil
> >> case (Nil, row) => row
> >> case (sum, Nil) => sum
> >> case (sum, row) => (sum, row).zipped.map{ case (a, b) => a + b }
> >> // TODO handle different sizes arrays here
> >> }
> >> }
> >>
> >> def evaluate(buffer: Row): Any = {
> >> buffer.getSeq(0)
> >> }
> >> }
> >>
> >> val arraySum = new ArraySum
> >> sqlContext.udf.register("ArraySum", arraySum)
> >>
> >> *%sql select ArraySum(Array(1.0,2.0,3.0)) from pnls where date =
> >> '2015-05-22' limit 10*
> >>
> >> gives me the following error
> >>
> >>
> >> Error in SQL statement: SparkException: Job aborted due to stage
> failure:
> >> Task 0 in stage 219.0 failed 4 times, most recent failure: Lost task
> 0.3 in
> >> stage 219.0 (TID 11242, 10.172.255.236): java.lang.ClassCastException:
> >> scala.collection.mutable.WrappedArray$ofRef cannot be cast to
> >> org.apache.spark.sql.types.ArrayData at
> >>
> org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow$class.getArray(rows.scala:47)
> >> at
> >>
> org.apache.spark.sql.catalyst.expressions.GenericMutableRow.getArray(rows.scala:247)
> >> at
> >>
> org.apache.spark.sql.catalyst.expressions.JoinedRow.getArray(JoinedRow.scala:108)
> >> at
> >>
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificMutableProjection.apply(Unknown
> >> Source) at
> >>
> org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$32.apply(AggregationIterator.scala:373)
> >> at
> >>
> 

Re: Sort Merge Join

2015-11-02 Thread Alex Nastetsky
Thanks for the response.

Taking the file system based data source as “UnknownPartitioning”, will be
a simple and SAFE way for JOIN, as it’s hard to guarantee the records from
different data sets with the identical join keys will be loaded by the same
node/task , since lots of factors need to be considered, like task pool
size, cluster size, source format, storage, data locality etc.,.

I’ll agree it’s worth to optimize it for performance concerns, and actually
in Hive, it is called bucket join. I am not sure will that happens soon in
Spark SQL.


Yes, this is supported in

   - Hive with bucket join
   - Pig with USING "merge"
   <https://pig.apache.org/docs/r0.15.0/perf.html#merge-joins>
   - MR with CompositeInputFormat

But I guess it's not supported in Spark?

On Mon, Nov 2, 2015 at 12:32 AM, Cheng, Hao <hao.ch...@intel.com> wrote:

> 1) Once SortMergeJoin is enabled, will it ever use ShuffledHashJoin? For
> example, in the code below, the two datasets have different number of
> partitions, but it still does a SortMerge join after a "hashpartitioning".
>
>
>
> [Hao:] A distributed JOIN operation (either HashBased or SortBased Join)
> requires the records with the identical join keys MUST BE shuffled to the
> same “reducer” node / task, hashpartitioning is just a strategy to tell
> spark shuffle service how to achieve that, in theory, we even can use the
> `RangePartitioning` instead (but it’s less efficient, that’s why we don’t
> choose it for JOIN). So conceptually the JOIN operator doesn’t care so much
> about the shuffle strategy so much if it satisfies the demand on data
> distribution.
>
>
>
> 2) If both datasets have already been previously partitioned/sorted the
> same and stored on the file system (e.g. in a previous job), is there a way
> to tell Spark this so that it won't want to do a "hashpartitioning" on
> them? It looks like Spark just considers datasets that have been just read
> from the the file system to have UnknownPartitioning. In the example below,
> I try to join a dataframe to itself, and it still wants to hash repartition.
>
>
>
> [Hao:] Take this as example:
>
>
>
> EXPLAIN SELECT a.value, b.value, c.value FROM src a JOIN src b ON
> a.key=b.key JOIN src c ON b.key=c.key
>
>
>
> == Physical Plan ==
>
> TungstenProject [value#20,value#22,value#24]
>
> SortMergeJoin [key#21], [key#23]
>
>   TungstenSort [key#21 ASC], false, 0
>
>TungstenProject [key#21,value#22,value#20]
>
> SortMergeJoin [key#19], [key#21]
>
>  TungstenSort [key#19 ASC], false, 0
>
>   TungstenExchange hashpartitioning(key#19,200)
>
>ConvertToUnsafe
>
> HiveTableScan [key#19,value#20], (MetastoreRelation default, src,
> Some(a))
>
>  TungstenSort [key#21 ASC], false, 0
>
>   TungstenExchange hashpartitioning(key#21,200)
>
>ConvertToUnsafe
>
> HiveTableScan [key#21,value#22], (MetastoreRelation default, src,
> Some(b))
>
>   TungstenSort [key#23 ASC], false, 0
>
>TungstenExchange hashpartitioning(key#23,200)
>
> ConvertToUnsafe
>
>  HiveTableScan [key#23,value#24], (MetastoreRelation default, src,
> Some(c))
>
>
>
> There is no hashpartitioning anymore for the RESULT of “FROM src a JOIN
> src b ON a.key=b.key”, as we didn’t change the data distribution after
> it, so we can join another table “JOIN src c ON b.key=c.key” directly,
> which only require the table “c” for repartitioning on “key”.
>
>
>
> Taking the file system based data source as “UnknownPartitioning”, will
> be a simple and SAFE way for JOIN, as it’s hard to guarantee the records
> from different data sets with the identical join keys will be loaded by the
> same node/task , since lots of factors need to be considered, like task
> pool size, cluster size, source format, storage, data locality etc.,.
>
> I’ll agree it’s worth to optimize it for performance concerns, and
> actually in Hive, it is called bucket join. I am not sure will that happens
> soon in Spark SQL.
>
>
>
> Hao
>
>
>
> *From:* Alex Nastetsky [mailto:alex.nastet...@vervemobile.com]
> *Sent:* Monday, November 2, 2015 11:29 AM
> *To:* user
> *Subject:* Sort Merge Join
>
>
>
> Hi,
>
>
>
> I'm trying to understand SortMergeJoin (SPARK-2213).
>
>
>
> 1) Once SortMergeJoin is enabled, will it ever use ShuffledHashJoin? For
> example, in the code below, the two datasets have different number of
> partitions, but it still does a SortMerge join after a "hashpartitioning".
>
>
>
> CODE:
>
>val sparkConf = new SparkConf()
>
>   .setAppName("SortMergeJoinTest")
>

Sort Merge Join

2015-11-01 Thread Alex Nastetsky
Hi,

I'm trying to understand SortMergeJoin (SPARK-2213).

1) Once SortMergeJoin is enabled, will it ever use ShuffledHashJoin? For
example, in the code below, the two datasets have different number of
partitions, but it still does a SortMerge join after a "hashpartitioning".


CODE:
   val sparkConf = new SparkConf()
  .setAppName("SortMergeJoinTest")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.eventLog.enabled", "true")
  .set("spark.sql.planner.sortMergeJoin","true")

sparkConf.setMaster("local-cluster[3,1,1024]")

val sc = new SparkContext(sparkConf)
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

val inputpath = "input.gz.parquet"

val df1 = sqlContext.read.parquet(inputpath).repartition(3)
val df2 = sqlContext.read.parquet(inputpath).repartition(5)
val result = df1.join(df2.withColumnRenamed("foo","foo2"), $"foo" ===
$"foo2")
result.explain()

OUTPUT:
== Physical Plan ==
SortMergeJoin [foo#0], [foo2#8]
TungstenSort [foo#0 ASC], false, 0
  TungstenExchange hashpartitioning(foo#0)
  ConvertToUnsafe
Repartition 3, true
Scan
ParquetRelation[file:input.gz.parquet][foo#0,bar#1L,somefield#2,anotherfield#3]
TungstenSort [foo2#8 ASC], false, 0
  TungstenExchange hashpartitioning(foo2#8)
  TungstenProject [foo#4 AS foo2#8,bar#5L,somefield#6,anotherfield#7]
Repartition 5, true
Scan
ParquetRelation[file:input.gz.parquet][foo#4,bar#5L,somefield#6,anotherfield#7]

2) If both datasets have already been previously partitioned/sorted the
same and stored on the file system (e.g. in a previous job), is there a way
to tell Spark this so that it won't want to do a "hashpartitioning" on
them? It looks like Spark just considers datasets that have been just read
from the the file system to have UnknownPartitioning. In the example below,
I try to join a dataframe to itself, and it still wants to hash repartition.

CODE:
...
val df1 = sqlContext.read.parquet(inputpath)
val result = df1.join(df1.withColumnRenamed("foo","foo2"), $"foo" ===
$"foo2")
result.explain()

OUTPUT:
== Physical Plan ==
SortMergeJoin [foo#0], [foo2#4]
TungstenSort [foo#0 ASC], false, 0
  TungstenExchange hashpartitioning(foo#0)
  ConvertToUnsafe
Scan
ParquetRelation[file:input.gz.parquet][foo#0,bar#1L,somefield#2,anotherfield#3]
TungstenSort [foo2#4 ASC], false, 0
  TungstenExchange hashpartitioning(foo2#4)
  ConvertToUnsafe
Project [foo#5 AS foo2#4,bar#6L,somefield#7,anotherfield#8]
Scan
ParquetRelation[file:input.gz.parquet][foo#5,bar#6L,somefield#7,anotherfield#8]


Thanks.


foreachPartition

2015-10-30 Thread Alex Nastetsky
I'm just trying to do some operation inside foreachPartition, but I can't
even get a simple println to work. Nothing gets printed.

scala> val a = sc.parallelize(List(1,2,3))
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize
at <console>:21

scala> a.foreachPartition(p => println("foo"))
2015-10-30 23:38:54,643 INFO  [main] spark.SparkContext
(Logging.scala:logInfo(59)) - Starting job: foreachPartition at <console>:24
2015-10-30 23:38:54,644 INFO  [dag-scheduler-event-loop]
scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Got job 9
(foreachPartition at <console>:24) with 3 output partitions
(allowLocal=false)
2015-10-30 23:38:54,644 INFO  [dag-scheduler-event-loop]
scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Final stage:
ResultStage 9(foreachPartition at <console>:24)
2015-10-30 23:38:54,645 INFO  [dag-scheduler-event-loop]
scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Parents of final
stage: List()
2015-10-30 23:38:54,645 INFO  [dag-scheduler-event-loop]
scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Missing parents: List()
2015-10-30 23:38:54,646 INFO  [dag-scheduler-event-loop]
scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Submitting ResultStage
9 (ParallelCollectionRDD[2] at parallelize at <console>:21), which has no
missing parents
2015-10-30 23:38:54,648 INFO  [dag-scheduler-event-loop]
storage.MemoryStore (Logging.scala:logInfo(59)) - ensureFreeSpace(1224)
called with curMem=14486, maxMem=280496701
2015-10-30 23:38:54,649 INFO  [dag-scheduler-event-loop]
storage.MemoryStore (Logging.scala:logInfo(59)) - Block broadcast_9 stored
as values in memory (estimated size 1224.0 B, free 267.5 MB)
2015-10-30 23:38:54,680 INFO  [dag-scheduler-event-loop]
storage.MemoryStore (Logging.scala:logInfo(59)) - ensureFreeSpace(871)
called with curMem=15710, maxMem=280496701
2015-10-30 23:38:54,681 INFO  [dag-scheduler-event-loop]
storage.MemoryStore (Logging.scala:logInfo(59)) - Block broadcast_9_piece0
stored as bytes in memory (estimated size 871.0 B, free 267.5 MB)
2015-10-30 23:38:54,685 INFO
 [sparkDriver-akka.actor.default-dispatcher-15] storage.BlockManagerInfo
(Logging.scala:logInfo(59)) - Added broadcast_9_piece0 in memory on
10.170.11.94:35814 (size: 871.0 B, free: 267.5 MB)
2015-10-30 23:38:54,688 INFO
 [sparkDriver-akka.actor.default-dispatcher-15] storage.BlockManagerInfo
(Logging.scala:logInfo(59)) - Removed broadcast_4_piece0 on
10.170.11.94:35814 in memory (size: 2.0 KB, free: 267.5 MB)
2015-10-30 23:38:54,691 INFO
 [sparkDriver-akka.actor.default-dispatcher-15] storage.BlockManagerInfo
(Logging.scala:logInfo(59)) - Removed broadcast_4_piece0 on
ip-10-51-144-180.ec2.internal:35111 in memory (size: 2.0 KB, free: 535.0 MB)
2015-10-30 23:38:54,691 INFO
 [sparkDriver-akka.actor.default-dispatcher-15] storage.BlockManagerInfo
(Logging.scala:logInfo(59)) - Removed broadcast_4_piece0 on
ip-10-51-144-180.ec2.internal:49833 in memory (size: 2.0 KB, free: 535.0 MB)
2015-10-30 23:38:54,694 INFO
 [sparkDriver-akka.actor.default-dispatcher-15] storage.BlockManagerInfo
(Logging.scala:logInfo(59)) - Removed broadcast_4_piece0 on
ip-10-51-144-180.ec2.internal:34776 in memory (size: 2.0 KB, free: 535.0 MB)
2015-10-30 23:38:54,702 INFO  [dag-scheduler-event-loop] spark.SparkContext
(Logging.scala:logInfo(59)) - Created broadcast 9 from broadcast at
DAGScheduler.scala:874
2015-10-30 23:38:54,703 INFO  [dag-scheduler-event-loop]
scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Submitting 3 missing
tasks from ResultStage 9 (ParallelCollectionRDD[2] at parallelize at
<console>:21)
2015-10-30 23:38:54,703 INFO  [dag-scheduler-event-loop]
cluster.YarnScheduler (Logging.scala:logInfo(59)) - Adding task set 9.0
with 3 tasks
2015-10-30 23:38:54,708 INFO
 [sparkDriver-akka.actor.default-dispatcher-15] scheduler.TaskSetManager
(Logging.scala:logInfo(59)) - Starting task 0.0 in stage 9.0 (TID 27,
ip-10-51-144-180.ec2.internal, PROCESS_LOCAL, 1313 bytes)
2015-10-30 23:38:54,711 INFO
 [sparkDriver-akka.actor.default-dispatcher-15] scheduler.TaskSetManager
(Logging.scala:logInfo(59)) - Starting task 1.0 in stage 9.0 (TID 28,
ip-10-51-144-180.ec2.internal, PROCESS_LOCAL, 1313 bytes)
2015-10-30 23:38:54,713 INFO  [sparkDriver-akka.actor.default-dispatcher-2]
storage.BlockManagerInfo (Logging.scala:logInfo(59)) - Removed
broadcast_5_piece0 on 10.170.11.94:35814 in memory (size: 802.0 B, free:
267.5 MB)
2015-10-30 23:38:54,714 INFO
 [sparkDriver-akka.actor.default-dispatcher-15] scheduler.TaskSetManager
(Logging.scala:logInfo(59)) - Starting task 2.0 in stage 9.0 (TID 29,
ip-10-51-144-180.ec2.internal, PROCESS_LOCAL, 1313 bytes)
2015-10-30 23:38:54,716 INFO  [sparkDriver-akka.actor.default-dispatcher-2]
storage.BlockManagerInfo (Logging.scala:logInfo(59)) - Removed
broadcast_5_piece0 on ip-10-51-144-180.ec2.internal:34776 in memory (size:
802.0 B, free: 535.0 MB)
2015-10-30 23:38:54,719 INFO  [sparkDriver-akka.actor.default-dispatcher-2]
storage.BlockManagerInfo (Logging.scala:logInfo(59)) - Removed
broadcast_5_piece0 on 

Re: foreachPartition

2015-10-30 Thread Alex Nastetsky
Ahh, makes sense. Knew it was going to be something simple. Thanks.
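
(For anyone else who hits this: the println output goes to the executors' stdout. A
quick way to see something on the driver instead is to ship the values back, e.g.:)

val a = sc.parallelize(List(1, 2, 3))

// The function still runs on the executors, but the resulting strings are
// collected to the driver, so this println happens driver-side.
a.mapPartitions(it => Iterator(s"partition with ${it.size} element(s)"))
  .collect()
  .foreach(println)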

On Fri, Oct 30, 2015 at 7:45 PM, Mark Hamstra <m...@clearstorydata.com>
wrote:

> The closure is sent to and executed an Executor, so you need to be looking
> at the stdout of the Executors, not on the Driver.
>
> On Fri, Oct 30, 2015 at 4:42 PM, Alex Nastetsky <
> alex.nastet...@vervemobile.com> wrote:
>
>> I'm just trying to do some operation inside foreachPartition, but I can't
>> even get a simple println to work. Nothing gets printed.
>>
>> scala> val a = sc.parallelize(List(1,2,3))
>> a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at
>> parallelize at <console>:21
>>
>> scala> a.foreachPartition(p => println("foo"))
>> 2015-10-30 23:38:54,643 INFO  [main] spark.SparkContext
>> (Logging.scala:logInfo(59)) - Starting job: foreachPartition at <console>:24
>> 2015-10-30 23:38:54,644 INFO  [dag-scheduler-event-loop]
>> scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Got job 9
>> (foreachPartition at <console>:24) with 3 output partitions
>> (allowLocal=false)
>> 2015-10-30 23:38:54,644 INFO  [dag-scheduler-event-loop]
>> scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Final stage:
>> ResultStage 9(foreachPartition at <console>:24)
>> 2015-10-30 23:38:54,645 INFO  [dag-scheduler-event-loop]
>> scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Parents of final
>> stage: List()
>> 2015-10-30 23:38:54,645 INFO  [dag-scheduler-event-loop]
>> scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Missing parents: List()
>> 2015-10-30 23:38:54,646 INFO  [dag-scheduler-event-loop]
>> scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Submitting ResultStage
>> 9 (ParallelCollectionRDD[2] at parallelize at <console>:21), which has no
>> missing parents
>> 2015-10-30 23:38:54,648 INFO  [dag-scheduler-event-loop]
>> storage.MemoryStore (Logging.scala:logInfo(59)) - ensureFreeSpace(1224)
>> called with curMem=14486, maxMem=280496701
>> 2015-10-30 23:38:54,649 INFO  [dag-scheduler-event-loop]
>> storage.MemoryStore (Logging.scala:logInfo(59)) - Block broadcast_9 stored
>> as values in memory (estimated size 1224.0 B, free 267.5 MB)
>> 2015-10-30 23:38:54,680 INFO  [dag-scheduler-event-loop]
>> storage.MemoryStore (Logging.scala:logInfo(59)) - ensureFreeSpace(871)
>> called with curMem=15710, maxMem=280496701
>> 2015-10-30 23:38:54,681 INFO  [dag-scheduler-event-loop]
>> storage.MemoryStore (Logging.scala:logInfo(59)) - Block broadcast_9_piece0
>> stored as bytes in memory (estimated size 871.0 B, free 267.5 MB)
>> 2015-10-30 23:38:54,685 INFO
>>  [sparkDriver-akka.actor.default-dispatcher-15] storage.BlockManagerInfo
>> (Logging.scala:logInfo(59)) - Added broadcast_9_piece0 in memory on
>> 10.170.11.94:35814 (size: 871.0 B, free: 267.5 MB)
>> 2015-10-30 23:38:54,688 INFO
>>  [sparkDriver-akka.actor.default-dispatcher-15] storage.BlockManagerInfo
>> (Logging.scala:logInfo(59)) - Removed broadcast_4_piece0 on
>> 10.170.11.94:35814 in memory (size: 2.0 KB, free: 267.5 MB)
>> 2015-10-30 23:38:54,691 INFO
>>  [sparkDriver-akka.actor.default-dispatcher-15] storage.BlockManagerInfo
>> (Logging.scala:logInfo(59)) - Removed broadcast_4_piece0 on
>> ip-10-51-144-180.ec2.internal:35111 in memory (size: 2.0 KB, free: 535.0 MB)
>> 2015-10-30 23:38:54,691 INFO
>>  [sparkDriver-akka.actor.default-dispatcher-15] storage.BlockManagerInfo
>> (Logging.scala:logInfo(59)) - Removed broadcast_4_piece0 on
>> ip-10-51-144-180.ec2.internal:49833 in memory (size: 2.0 KB, free: 535.0 MB)
>> 2015-10-30 23:38:54,694 INFO
>>  [sparkDriver-akka.actor.default-dispatcher-15] storage.BlockManagerInfo
>> (Logging.scala:logInfo(59)) - Removed broadcast_4_piece0 on
>> ip-10-51-144-180.ec2.internal:34776 in memory (size: 2.0 KB, free: 535.0 MB)
>> 2015-10-30 23:38:54,702 INFO  [dag-scheduler-event-loop]
>> spark.SparkContext (Logging.scala:logInfo(59)) - Created broadcast 9 from
>> broadcast at DAGScheduler.scala:874
>> 2015-10-30 23:38:54,703 INFO  [dag-scheduler-event-loop]
>> scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Submitting 3 missing
>> tasks from ResultStage 9 (ParallelCollectionRDD[2] at parallelize at
>> <console>:21)
>> 2015-10-30 23:38:54,703 INFO  [dag-scheduler-event-loop]
>> cluster.YarnScheduler (Logging.scala:logInfo(59)) - Adding task set 9.0
>> with 3 tasks
>> 2015-10-30 23:38:54,708 INFO
>>  [sparkDriver-akka.actor.default-dispatcher-15] scheduler.TaskSetManager
>> (Logging.scala:logInfo(59)) - Starting task 0.0 in stage 9.0 (TID 27,
>> ip-10-51-144-180.ec2.internal, PROCESS_LOCAL, 1313 bytes)
>> 2015-

CompositeInputFormat in Spark

2015-10-30 Thread Alex Nastetsky
Does Spark have an implementation similar to CompositeInputFormat in
MapReduce?

CompositeInputFormat joins multiple datasets prior to the mapper. The datasets
must be partitioned the same way, with the same number of partitions; the
"part" number in each dataset's file names determines which file to join with
its counterparts in the other datasets.

Here is a similar question from earlier this year:

http://mail-archives.us.apache.org/mod_mbox/spark-user/201505.mbox/%3CCADrn=epwl6ghs9hfyo3csuxhshtycsrlbujcmpxrtz4zype...@mail.gmail.com%3E

From what I can tell, there's no way to tell Spark about how a dataset had
been previously partitioned, other than repartitioning it in order to
achieve a map-side join with a similarly partitioned dataset.
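
The closest I can get is to partition both sides with the same partitioner up front so
the join itself doesn't reshuffle either side; a rough sketch (paths and keying are
placeholders), though unlike CompositeInputFormat it still shuffles once to establish
the partitioning:

import org.apache.spark.HashPartitioner

val partitioner = new HashPartitioner(100)

// Key both datasets the same way and give them the same partitioner.
val left = sc.textFile("hdfs:///data/left")
  .map(line => (line.split("\t")(0), line))
  .partitionBy(partitioner)
val right = sc.textFile("hdfs:///data/right")
  .map(line => (line.split("\t")(0), line))
  .partitionBy(partitioner)

// Both sides share the partitioner, so the join adds no further shuffle.
val joined = left.join(right)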


Re: writing avro parquet

2015-10-19 Thread Alex Nastetsky
Figured it out ... needed to use saveAsNewAPIHadoopFile, but was trying to
use it on myDF.rdd instead of converting it to a PairRDD first.
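
Roughly (an untested sketch; toMyClass is a placeholder for the Row-to-Avro
conversion, MyClass is the Avro-generated class from before):

import org.apache.hadoop.mapreduce.Job
import org.apache.parquet.avro.AvroWriteSupport
import org.apache.parquet.hadoop.ParquetOutputFormat

// A Job just to carry the Hadoop configuration for the output format.
val job = Job.getInstance(sc.hadoopConfiguration)
ParquetOutputFormat.setWriteSupportClass(job, classOf[AvroWriteSupport])
AvroWriteSupport.setSchema(job.getConfiguration, MyClass.SCHEMA$)

// Convert each row to an Avro record, pair it with a null (Void) key, and use
// the new-API Hadoop output.
myDF.rdd
  .map(row => (null: Void, toMyClass(row)))
  .saveAsNewAPIHadoopFile(
    outputPath,
    classOf[Void],
    classOf[MyClass],
    classOf[ParquetOutputFormat[MyClass]],
    job.getConfiguration)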

On Mon, Oct 19, 2015 at 2:14 PM, Alex Nastetsky <
alex.nastet...@vervemobile.com> wrote:

> Using Spark 1.5.1, Parquet 1.7.0.
>
> I'm trying to write Avro/Parquet files. I have this code:
>
> sc.hadoopConfiguration.set(ParquetOutputFormat.WRITE_SUPPORT_CLASS,
> classOf[AvroWriteSupport].getName)
> AvroWriteSupport.setSchema(sc.hadoopConfiguration, MyClass.SCHEMA$)
> myDF.write.parquet(outputPath)
>
> The problem is that the write support class gets overwritten in
> org.apache.spark.sql.execution.datasources.parquet.ParquetRelation#prepareJobForWrite:
>
> val writeSupportClass =
> if
> (dataSchema.map(_.dataType).forall(ParquetTypesConverter.isPrimitiveType)) {
> classOf[MutableRowWriteSupport]
> } else {
> classOf[RowWriteSupport]
> }
> ParquetOutputFormat.setWriteSupportClass(job, writeSupportClass)
>
> So it doesn't seem to actually write Avro data. When look at the metadata
> of the Parquet files it writes, it looks like this:
>
> extra: org.apache.spark.sql.parquet.row.metadata =
> {"type":"struct","fields":[{"name":"foo","type":"string","nullable":true,"metadata":{}},{"name":"bar","type":"long","nullable":true,"metadata":{}}]}
>
> I would expect to see something like "extra:  avro.schema" instead.
>


writing avro parquet

2015-10-19 Thread Alex Nastetsky
Using Spark 1.5.1, Parquet 1.7.0.

I'm trying to write Avro/Parquet files. I have this code:

sc.hadoopConfiguration.set(ParquetOutputFormat.WRITE_SUPPORT_CLASS,
classOf[AvroWriteSupport].getName)
AvroWriteSupport.setSchema(sc.hadoopConfiguration, MyClass.SCHEMA$)
myDF.write.parquet(outputPath)

The problem is that the write support class gets overwritten in
org.apache.spark.sql.execution.datasources.parquet.ParquetRelation#prepareJobForWrite:

val writeSupportClass =
if
(dataSchema.map(_.dataType).forall(ParquetTypesConverter.isPrimitiveType)) {
classOf[MutableRowWriteSupport]
} else {
classOf[RowWriteSupport]
}
ParquetOutputFormat.setWriteSupportClass(job, writeSupportClass)

So it doesn't seem to actually write Avro data. When look at the metadata
of the Parquet files it writes, it looks like this:

extra: org.apache.spark.sql.parquet.row.metadata =
{"type":"struct","fields":[{"name":"foo","type":"string","nullable":true,"metadata":{}},{"name":"bar","type":"long","nullable":true,"metadata":{}}]}

I would expect to see something like "extra:  avro.schema" instead.


spark-avro 2.0.1 generates strange schema (spark-avro 1.0.0 is fine)

2015-10-14 Thread Alex Nastetsky
I save my dataframe to avro with spark-avro 1.0.0 and it looks like this
(using avro-tools tojson):

{"field1":"value1","field2":976200}
{"field1":"value2","field2":976200}
{"field1":"value3","field2":614100}

But when I use spark-avro 2.0.1, it looks like this:

{"field1":{"string":"value1"},"field2":{"long":976200}}
{"field1":{"string":"value2"},"field2":{"long":976200}}
{"field1":{"string":"value3"},"field2":{"long":614100}}

At this point I'd be happy to use spark-avro 1.0.0, except that it doesn't
seem to support specifying a compression codec (I want deflate).
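
My guess (untested) is that the extra wrapping comes from nullable columns being
written as Avro unions. If the data really has no nulls, forcing the schema to
non-nullable before writing might avoid it:

import org.apache.spark.sql.types.StructType

// Rebuild the dataframe with every field marked non-nullable, so spark-avro
// shouldn't need union types for them. Only safe if there really are no nulls.
val nonNullableSchema = StructType(df.schema.fields.map(_.copy(nullable = false)))
val dfNonNullable = sqlContext.createDataFrame(df.rdd, nonNullableSchema)

dfNonNullable.write
  .format("com.databricks.spark.avro")
  .save("avro-out")  // placeholder path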


Re: spark-avro 2.0.1 generates strange schema (spark-avro 1.0.0 is fine)

2015-10-14 Thread Alex Nastetsky
Here you go: https://github.com/databricks/spark-avro/issues/92

Thanks.

On Wed, Oct 14, 2015 at 4:41 PM, Josh Rosen <rosenvi...@gmail.com> wrote:

> Can you report this as an issue at
> https://github.com/databricks/spark-avro/issues so that it's easier to
> track? Thanks!
>
> On Wed, Oct 14, 2015 at 1:38 PM, Alex Nastetsky <
> alex.nastet...@vervemobile.com> wrote:
>
>> I save my dataframe to avro with spark-avro 1.0.0 and it looks like this
>> (using avro-tools tojson):
>>
>> {"field1":"value1","field2":976200}
>> {"field1":"value2","field2":976200}
>> {"field1":"value3","field2":614100}
>>
>> But when I use spark-avro 2.0.1, it looks like this:
>>
>> {"field1":{"string":"value1"},"field2":{"long":976200}}
>> {"field1":{"string":"value2"},"field2":{"long":976200}}
>> {"field1":{"string":"value3"},"field2":{"long":614100}}
>>
>> At this point I'd be happy to use spark-avro 1.0.0, except that it
>> doesn't seem to support specifying a compression codec (I want deflate).
>>
>
>


dataframes and numPartitions

2015-10-14 Thread Alex Nastetsky
A lot of RDD methods take a numPartitions parameter that lets you specify
the number of partitions in the result. For example, groupByKey.

The DataFrame counterparts don't have a numPartitions parameter, e.g.
groupBy only takes a bunch of Columns as params.

I understand that the DataFrame API is supposed to be smarter and go
through a LogicalPlan, and perhaps determine the number of optimal
partitions for you, but sometimes you want to specify the number of
partitions yourself. One such use case is when you are preparing to do a
"merge" join with another dataset that is similarly partitioned with the
same number of partitions.
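
The closest I've found is the session-wide shuffle setting, which controls how many
post-shuffle partitions DataFrame groupBy/join produce (df and "key" below are
placeholders):

// Applies to the whole query/session rather than a single operator, which is
// part of what makes it awkward compared to RDD-style numPartitions.
sqlContext.setConf("spark.sql.shuffle.partitions", "200")

val grouped = df.groupBy($"key").count()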


dataframe json schema scan

2015-08-20 Thread Alex Nastetsky
The doc for DataFrameReader#json(RDD[String]) method says

Unless the schema is specified using schema function, this function goes
through the input once to determine the input schema.

https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrameReader

Why is this necessary? Why can't it create the dataframe at the same time
as it's determining the schema?
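
For what it's worth, supplying the schema up front does avoid the extra pass; a sketch
(field names/types are placeholders):

import org.apache.spark.sql.types.{StringType, StructField, StructType}

// With an explicit schema, the reader skips the schema-inference pass over the RDD.
val schema = StructType(Seq(
  StructField("field1", StringType, nullable = true),
  StructField("field2", StringType, nullable = true)))

val df = sqlContext.read.schema(schema).json(jsonRdd)  // jsonRdd: RDD[String]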

Thanks.


Re: restart from last successful stage

2015-07-29 Thread Alex Nastetsky
I meant a restart by the user, as ayan said.

I was thinking of a case where e.g. a Spark conf setting was wrong and the job
failed in Stage 1, in my example .. and we want to rerun the job with the
right conf without rerunning Stage 0. Having this restart capability may
cause some chaos if the conf change would have affected how Stage 0 runs,
possibly creating partition incompatibilities or something else.

Also, another option is to just persist the data from Stage 0 (i.e.
sc.saveAs) and then run a modified version of the job that skips Stage
0, assuming you have a full understanding of the breakdown of stages in
your job.
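
Concretely, the manual version of that looks something like this (the paths and
helper names are placeholders):

// First run: persist the expensive stage-0 output before the part that may fail.
val stage0Output = expensiveStage0(sc)  // placeholder for the stage-0 work
stage0Output.saveAsObjectFile("hdfs:///checkpoints/stage0")

// Re-run after fixing the conf: skip stage 0 by reloading its output.
val resumed = sc.objectFile[MyRecord]("hdfs:///checkpoints/stage0")
val result = restOfTheJob(resumed)  // placeholder for stages 1 and 2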

On Tue, Jul 28, 2015 at 9:28 PM, Tathagata Das t...@databricks.com wrote:

 Okay, maybe I am confused on the word would be useful to *restart* from the
 output of stage 0 ... did the OP mean restart by the user or restart
 automatically by the system?

 On Tue, Jul 28, 2015 at 3:43 PM, ayan guha guha.a...@gmail.com wrote:

 Hi

 I do not think op asks about attempt failure but stage failure and
 finally leading to job failure. In that case, rdd info from last run is
 gone even if from cache, isn't it?

 Ayan
 On 29 Jul 2015 07:01, Tathagata Das t...@databricks.com wrote:

 If you are using the same RDDs in the both the attempts to run the job,
 the previous stage outputs generated in the previous job will indeed be
 reused.
 This applies to core though. For dataframes, depending on what you do,
 the physical plan may get generated again leading to new RDDs which may
 cause recomputing all the stages. Consider running the job by generating
 the RDD from Dataframe and then using that.

 Of course, you can use caching in both core and DataFrames, which will
 solve all these concerns.

 On Tue, Jul 28, 2015 at 1:03 PM, Alex Nastetsky 
 alex.nastet...@vervemobile.com wrote:

 Is it possible to restart the job from the last successful stage
 instead of from the beginning?

 For example, if your job has stages 0, 1 and 2 .. and stage 0 takes a
 long time and is successful, but the job fails on stage 1, it would be
 useful to be able to restart from the output of stage 0 instead of from the
 beginning.

 Note that I am NOT talking about Spark Streaming, just Spark Core (and
 DataFrames), not sure if the case would be different with Streaming.

 Thanks.






restart from last successful stage

2015-07-28 Thread Alex Nastetsky
Is it possible to restart the job from the last successful stage instead of
from the beginning?

For example, if your job has stages 0, 1 and 2 .. and stage 0 takes a long
time and is successful, but the job fails on stage 1, it would be useful to
be able to restart from the output of stage 0 instead of from the beginning.

Note that I am NOT talking about Spark Streaming, just Spark Core (and
DataFrames), not sure if the case would be different with Streaming.

Thanks.


Calling rdd() on a DataFrame causes stage boundary

2015-06-22 Thread Alex Nastetsky
When I call rdd() on a DataFrame, it ends the current stage and starts a
new one that just maps the DataFrame to rdd and nothing else. It doesn't
seem to do a shuffle (which is good and expected), but then why is there a
separate stage?

I also thought that stages only end when there's a shuffle or the job ends
with the action that triggered the job.

Thanks.


different schemas per row with DataFrames

2015-06-18 Thread Alex Nastetsky
I am reading JSON data that has different schemas for every record. That
is, for a given field that would have a null value, it's simply absent from
that record (and therefore, its schema).

I would like to use the DataFrame API to select specific fields from this
data, and for fields that are missing from a record, to default to null or
an empty string.

Is this possible or can DataFrames only handle a single consistent schema
throughout the data?

One thing I noticed is that the schema of the DataFrame is the superset of
all the records in it, so if record A has field X, but record B does not,
it will show up in B as null because it's part of the DataFrame's schema
(because A has it). But if none of the records have field X, then
referencing that field will result in an error about not being able to
resolve that column.

If I know the schema of all possible fields and the order in which they
occur, it may be possible to get the RDD from the DataFrame and build my
own DataFrame with createDataFrame and passing it my fabricated
super-schema. However, this is brittle, as the super-schema is not in my
control and may change in the future.
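
A simpler variant of that idea might be to hand the fabricated super-schema straight
to the JSON reader, so missing fields come back as null (names below are placeholders,
and it has the same brittleness):

import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Superset of all fields any record might have, all nullable.
val superSchema = StructType(Seq(
  StructField("fieldX", StringType, nullable = true),
  StructField("fieldY", StringType, nullable = true),
  StructField("fieldZ", StringType, nullable = true)))

val df = sqlContext.read.schema(superSchema).json(jsonRdd)  // jsonRdd: RDD[String]
df.select($"fieldX", $"fieldY").show()  // absent fields show up as null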

Thanks for any suggestions,
Alex.