some joins stopped working with spark 2.0.0 SNAPSHOT

2016-02-26 Thread Koert Kuipers
dataframe df1:
schema:
StructType(StructField(x,IntegerType,true))
explain:
== Physical Plan ==
MapPartitions , obj#135: object, [if (input[0, object].isNullAt) null else input[0, object].get AS x#128]
+- MapPartitions , createexternalrow(if (isnull(x#9)) null else x#9), [input[0, object] AS obj#135]
   +- WholeStageCodegen
      :  +- Project [_1#8 AS x#9]
      :     +- Scan ExistingRDD[_1#8]
show:
+---+
|  x|
+---+
|  2|
|  3|
+---+


dataframe df2:
schema:
StructType(StructField(x,IntegerType,true), StructField(y,StringType,true))
explain:
== Physical Plan ==
MapPartitions , createexternalrow(x#2, if (isnull(y#3)) null else y#3.toString), [if (input[0, object].isNullAt) null else input[0, object].get AS x#130,if (input[0, object].isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, object].get, true) AS y#131]
+- WholeStageCodegen
   :  +- Project [_1#0 AS x#2,_2#1 AS y#3]
   :     +- Scan ExistingRDD[_1#0,_2#1]
show:
+---+---+
|  x|  y|
+---+---+
|  1|  1|
|  2|  2|
|  3|  3|
+---+---+


i run:
df1.join(df2, Seq("x")).show

i get:
java.lang.UnsupportedOperationException: No size estimation available for objects.
at org.apache.spark.sql.types.ObjectType.defaultSize(ObjectType.scala:41)
at org.apache.spark.sql.catalyst.plans.logical.UnaryNode$$anonfun$6.apply(LogicalPlan.scala:323)
at org.apache.spark.sql.catalyst.plans.logical.UnaryNode$$anonfun$6.apply(LogicalPlan.scala:323)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:245)
at scala.collection.immutable.List.map(List.scala:285)
at org.apache.spark.sql.catalyst.plans.logical.UnaryNode.statistics(LogicalPlan.scala:323)
at org.apache.spark.sql.execution.SparkStrategies$CanBroadcast$.unapply(SparkStrategies.scala:87)

not sure what changed, this ran about a week ago without issues (in our
internal unit tests). it is fully reproducible, however when i tried to
minimize the issue i could not reproduce it by just creating data frames in
the repl with the same contents, so it probably has something to do with the
way these are created (from Row objects and StructTypes).
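
for reference, the frames are built roughly like this, from Row objects and
a StructType (a sketch only; note that this plain construction did not
reproduce the error in the repl, so the extra MapPartitions steps visible in
the plans above seem to be involved):

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val schema1 = StructType(Seq(StructField("x", IntegerType, true)))
val schema2 = StructType(Seq(
  StructField("x", IntegerType, true),
  StructField("y", StringType, true)))

val df1 = sqlContext.createDataFrame(sc.parallelize(Seq(Row(2), Row(3))), schema1)
val df2 = sqlContext.createDataFrame(
  sc.parallelize(Seq(Row(1, "1"), Row(2, "2"), Row(3, "3"))), schema2)

df1.join(df2, Seq("x")).show()   // the failing call has this shape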

best, koert


Re: Spark 1.6.1

2016-02-26 Thread Josh Rosen
I updated the release packaging scripts to use SFTP via the *lftp* client:
https://github.com/apache/spark/pull/11350

I'm starting the process of cutting a 1.6.1-RC1 tag and release artifacts
right now, so please be extra careful about merging into branch-1.6 until
after the release. Once the RC packaging completes, Michael or I will email
the list to start a vote thread.

- Josh

On Wed, Feb 24, 2016 at 5:44 PM Yin Yang  wrote:

> Have you tried using scp ?
>
> scp file i...@people.apache.org
>
> Thanks
>
> On Wed, Feb 24, 2016 at 5:04 PM, Michael Armbrust 
> wrote:
>
>> Unfortunately I don't think that's sufficient as they don't seem to
>> support sftp in the same way they did before.  We'll still need to update
>> our release scripts.
>>
>> On Wed, Feb 24, 2016 at 2:09 AM, Yin Yang  wrote:
>>
>>> Looks like access to people.apache.org has been restored.
>>>
>>> FYI
>>>
>>> On Mon, Feb 22, 2016 at 10:07 PM, Luciano Resende 
>>>  wrote:
>>>


 On Mon, Feb 22, 2016 at 9:08 PM, Michael Armbrust <
 mich...@databricks.com> wrote:

> An update: people.apache.org has been shut down so the release
> scripts are broken. Will try again after we fix them.
>
>
 If you skip uploading to people.a.o, it should still be available in
 nexus for review.

 The other option is to add the RC into
 https://dist.apache.org/repos/dist/dev/



 --
 Luciano Resende
 http://people.apache.org/~lresende
 http://twitter.com/lresende1975
 http://lresende.blogspot.com/


>>
>


Re: Upgrading to Kafka 0.9.x

2016-02-26 Thread Joel Koshy
The 0.9 release still has the old consumer as Jay mentioned but this
specific release is a little unusual in that it also provides a completely
new consumer client.
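
For readers who have not seen it, the new client is the
org.apache.kafka.clients.consumer.KafkaConsumer added in 0.9 (see the blog
post Jay links to below). A minimal sketch of driving it from Scala (broker
address, topic, and group id are placeholders):

import java.util.Properties
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

val props = new Properties()
props.put("bootstrap.servers", "broker:9092")
props.put("group.id", "example-group")
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

val consumer = new KafkaConsumer[String, String](props)
// low-level control: explicit partition assignment and offsets, no ZooKeeper access
val tp = new TopicPartition("events", 0)
consumer.assign(List(tp).asJava)
consumer.seek(tp, 0L)
val records = consumer.poll(1000)   // 0.9 API: poll(timeout in ms)
records.asScala.foreach(r => println(s"${r.offset}: ${r.value}"))
consumer.close()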

Based on what I understand, users of Kafka need to upgrade their brokers to
> Kafka 0.9.x first, before they upgrade their clients to Kafka 0.9.x.



However, that presents a problem to other projects that integrate with
> Kafka (Spark, Flume, Storm, etc.).


This is true and we faced a similar issue at LinkedIn - there are scenarios
where it is useful/necessary to allow the client to be upgraded before the
broker. This improvement can help with that, although if users want to
leverage newer server-side features they would obviously need to upgrade
the brokers.

Thanks,

Joel

On Fri, Feb 26, 2016 at 4:22 PM, Mark Grover  wrote:

> Thanks Jay. Yeah, if we were able to use the old consumer API from 0.9
> clients to work with 0.8 brokers that would have been super helpful here. I
> am just trying to avoid a scenario where Spark cares about new features
> from every new major release of Kafka (which is a good thing) but ends up
> having to keep multiple profiles/artifacts for it - one for 0.8.x, one for
> 0.9.x and another one, once 0.10.x gets released.
>
> So, anything that the Kafka community can do to alleviate the situation
> down the road would be great. Thanks again!
>
> On Fri, Feb 26, 2016 at 11:36 AM, Jay Kreps  wrote:
>
> > Hey, yeah, we'd really like to make this work well for you guys.
> >
> > I think there are actually maybe two questions here:
> > 1. How should this work in steady state?
> > 2. Given that there was a major reworking of the kafka consumer java
> > library for 0.9 how does that impact things right now? (
> >
> >
> http://www.confluent.io/blog/tutorial-getting-started-with-the-new-apache-kafka-0.9-consumer-client
> > )
> >
> > Quick recap of how we do compatibility, just so everyone is on the same
> > page:
> > 1. The protocol is versioned and the cluster supports multiple versions.
> > 2. As we evolve Kafka we always continue to support older versions of the
> > protocol and hence older clients continue to work with newer Kafka
> versions.
> > 3. In general we don't try to have the clients support older versions of
> > Kafka since, after all, the whole point of the new client is to add
> > features which often require those features to be in the broker.
> >
> > So I think in steady state the answer is to choose a conservative version
> > to build against and it's on us to keep that working as Kafka evolves. As
> > always there will be some tradeoff between using the newest features and
> > being compatible with old stuff.
> >
> > But that steady state question ignores the fact that we did a complete
> > rewrite of the consumer in 0.9. The old consumer is still there,
> supported,
> > and still works as before but the new consumer is the path forward and
> what
> > we are adding features to. At some point you will want to migrate to this
> > new api, which will be a non-trivial change to your code.
> >
> > This api has a couple of advantages for you guys (1) it supports
> security,
> > (2) It allows low-level control over partition assignment and offsets
> > without the crazy fiddliness of the old "simple consumer" api, (3) it no
> > longer directly accesses ZK, (4) no scala dependency and no dependency on
> > Kafka core. I think all four of these should be desirable for Spark et
> al.
> >
> > One thing we could discuss is the possibility of doing forwards and
> > backwards compatibility in the clients. I'm not sure this would actually
> > make things better, that would probably depend on the details of how it
> > worked.
> >
> > -Jay
> >
> >
> > On Fri, Feb 26, 2016 at 9:46 AM, Mark Grover  wrote:
> >
> > > Hi Kafka devs,
> > > I come to you with a dilemma and a request.
> > >
> > > Based on what I understand, users of Kafka need to upgrade their
> brokers
> > to
> > > Kafka 0.9.x first, before they upgrade their clients to Kafka 0.9.x.
> > >
> > > However, that presents a problem to other projects that integrate with
> > > Kafka (Spark, Flume, Storm, etc.). From here on, I will speak for
> Spark +
> > > Kafka, since that's the one I am most familiar with.
> > >
> > > In the light of compatibility (or the lack thereof) between 0.8.x and
> > > 0.9.x, Spark is faced with a problem of what version(s) of Kafka to be
> > > compatible with, and has 2 options (discussed in this PR
> > > ):
> > > 1. We either upgrade to Kafka 0.9, dropping support for 0.8. Storm and
> > > Flume are already on this path.
> > > 2. We introduce complexity in our code to support both 0.8 and 0.9 for
> > the
> > > entire duration of our next major release (Apache Spark 2.x).
> > >
> > > I'd love to hear your thoughts on which option, you recommend.
> > >
> > > Long 

Re: Upgrading to Kafka 0.9.x

2016-02-26 Thread Mark Grover
Thanks Jay. Yeah, if we were able to use the old consumer API from 0.9
clients to work with 0.8 brokers that would have been super helpful here. I
am just trying to avoid a scenario where Spark cares about new features
from every new major release of Kafka (which is a good thing) but ends up
having to keep multiple profiles/artifacts for it - one for 0.8.x, one for
0.9.x and another one, once 0.10.x gets released.

So, anything that the Kafka community can do to alleviate the situation
down the road would be great. Thanks again!

On Fri, Feb 26, 2016 at 11:36 AM, Jay Kreps  wrote:

> Hey, yeah, we'd really like to make this work well for you guys.
>
> I think there are actually maybe two questions here:
> 1. How should this work in steady state?
> 2. Given that there was a major reworking of the kafka consumer java
> library for 0.9 how does that impact things right now? (
>
> http://www.confluent.io/blog/tutorial-getting-started-with-the-new-apache-kafka-0.9-consumer-client
> )
>
> Quick recap of how we do compatibility, just so everyone is on the same
> page:
> 1. The protocol is versioned and the cluster supports multiple versions.
> 2. As we evolve Kafka we always continue to support older versions of the
> protocol and hence older clients continue to work with newer Kafka versions.
> 3. In general we don't try to have the clients support older versions of
> Kafka since, after all, the whole point of the new client is to add
> features which often require those features to be in the broker.
>
> So I think in steady state the answer is to choose a conservative version
> to build against and it's on us to keep that working as Kafka evolves. As
> always there will be some tradeoff between using the newest features and
> being compatible with old stuff.
>
> But that steady state question ignores the fact that we did a complete
> rewrite of the consumer in 0.9. The old consumer is still there, supported,
> and still works as before but the new consumer is the path forward and what
> we are adding features to. At some point you will want to migrate to this
> new api, which will be a non-trivial change to your code.
>
> This api has a couple of advantages for you guys (1) it supports security,
> (2) It allows low-level control over partition assignment and offsets
> without the crazy fiddliness of the old "simple consumer" api, (3) it no
> longer directly accesses ZK, (4) no scala dependency and no dependency on
> Kafka core. I think all four of these should be desirable for Spark et al.
>
> One thing we could discuss is the possibility of doing forwards and
> backwards compatibility in the clients. I'm not sure this would actually
> make things better, that would probably depend on the details of how it
> worked.
>
> -Jay
>
>
> On Fri, Feb 26, 2016 at 9:46 AM, Mark Grover  wrote:
>
> > Hi Kafka devs,
> > I come to you with a dilemma and a request.
> >
> > Based on what I understand, users of Kafka need to upgrade their brokers
> to
> > Kafka 0.9.x first, before they upgrade their clients to Kafka 0.9.x.
> >
> > However, that presents a problem to other projects that integrate with
> > Kafka (Spark, Flume, Storm, etc.). From here on, I will speak for Spark +
> > Kafka, since that's the one I am most familiar with.
> >
> > In the light of compatibility (or the lack thereof) between 0.8.x and
> > 0.9.x, Spark is faced with a problem of what version(s) of Kafka to be
> > compatible with, and has 2 options (discussed in this PR
> > ):
> > 1. We either upgrade to Kafka 0.9, dropping support for 0.8. Storm and
> > Flume are already on this path.
> > 2. We introduce complexity in our code to support both 0.8 and 0.9 for
> the
> > entire duration of our next major release (Apache Spark 2.x).
> >
> > I'd love to hear your thoughts on which option, you recommend.
> >
> > Long term, I'd really appreciate if Kafka could do something that doesn't
> > make Spark having to support two, or even more versions of Kafka. And, if
> > there is something that I, personally, and Spark project can do in your
> > next release candidate phase to make things easier, please do let us
> know.
> >
> > Thanks!
> > Mark
> >
>


Re: [discuss] DataFrame vs Dataset in Spark 2.0

2016-02-26 Thread Jakob Odersky
I would recommend (non-binding) option 1.

Apart from the API breakage I can see only advantages, and that sole
disadvantage is minimal for a few reasons:

1. the DataFrame API has been "Experimental" since its implementation,
so no stability was ever implied
2. considering that the change is for a major release some
incompatibilities are to be expected
3. using type aliases may break code now, but it will remove the
possibility of library incompatibilities in the future (see Reynold's
second point: "[...] and we won't see type mismatches (e.g. a function
expects DataFrame, but user is passing in Dataset[Row])"); a sketch of
what the alias amounts to follows below
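
For concreteness, option 1 boils down to a one-line alias in the
org.apache.spark.sql package object, along these lines (a sketch, not the
actual patch):

package org.apache.spark

package object sql {
  // With this alias a DataFrame *is* a Dataset[Row]: the two names can never
  // diverge, so a library method declared to take a DataFrame accepts a
  // Dataset[Row] and vice versa.
  type DataFrame = Dataset[Row]
}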

On Fri, Feb 26, 2016 at 11:51 AM, Reynold Xin  wrote:
> That's actually not Row vs non-Row.
>
> It's just primitive vs non-primitive. Primitives get automatically
> flattened, to avoid having to type ._1 all the time.
>
> On Fri, Feb 26, 2016 at 2:06 AM, Sun, Rui  wrote:
>>
>> Thanks for the explaination.
>>
>>
>>
>> What confusing me is the different internal semantic of Dataset on non-Row
>> type (primitive types for example) and Row type:
>>
>>
>>
>> Dataset[Int] is internally actually Dataset[Row(value:Int)]
>>
>>
>>
>> scala> val ds = sqlContext.createDataset(Seq(1,2,3))
>>
>> ds: org.apache.spark.sql.Dataset[Int] = [value: int]
>>
>>
>>
>> scala> ds.schema.json
>>
>> res17: String =
>> {"type":"struct","fields":[{"name":"value","type":"integer","nullable":false,"metadata":{}}]}
>>
>>
>>
>> But obviously Dataset[Row] is not internally Dataset[Row(value: Row)].
>>
>>
>>
>> From: Reynold Xin [mailto:r...@databricks.com]
>> Sent: Friday, February 26, 2016 3:55 PM
>> To: Sun, Rui 
>> Cc: Koert Kuipers ; dev@spark.apache.org
>>
>>
>> Subject: Re: [discuss] DataFrame vs Dataset in Spark 2.0
>>
>>
>>
>> The join and joinWith are just two different join semantics, and is not
>> about Dataset vs DataFrame.
>>
>>
>>
>> join is the relational join, where fields are flattened; joinWith is more
>> like a tuple join, where the output has two fields that are nested.
>>
>>
>>
>> So you can do
>>
>>
>>
>> Dataset[A] joinWith Dataset[B] = Dataset[(A, B)]
>>
>>
>> DataFrame[A] joinWith DataFrame[B] = Dataset[(Row, Row)]
>>
>>
>>
>> Dataset[A] join Dataset[B] = Dataset[Row]
>>
>>
>>
>> DataFrame[A] join DataFrame[B] = Dataset[Row]
>>
>>
>>
>>
>>
>>
>>
>> On Thu, Feb 25, 2016 at 11:37 PM, Sun, Rui  wrote:
>>
>> Vote for option 2.
>>
>> Source compatibility and binary compatibility are very important from
>> user’s perspective.
>>
>> It ‘s unfair for Java developers that they don’t have DataFrame
>> abstraction. As you said, sometimes it is more natural to think about
>> DataFrame.
>>
>>
>>
>> I am wondering if conceptually there is slight subtle difference between
>> DataFrame and Dataset[Row]? For example,
>>
>> Dataset[T] joinWith Dataset[U]  produces Dataset[(T, U)]
>>
>> So,
>>
>> Dataset[Row] joinWith Dataset[Row]  produces Dataset[(Row, Row)]
>>
>>
>>
>> While
>>
>> DataFrame join DataFrame is still DataFrame of Row?
>>
>>
>>
>> From: Reynold Xin [mailto:r...@databricks.com]
>> Sent: Friday, February 26, 2016 8:52 AM
>> To: Koert Kuipers 
>> Cc: dev@spark.apache.org
>> Subject: Re: [discuss] DataFrame vs Dataset in Spark 2.0
>>
>>
>>
>> Yes - and that's why source compatibility is broken.
>>
>>
>>
>> Note that it is not just a "convenience" thing. Conceptually DataFrame is
>> a Dataset[Row], and for some developers it is more natural to think about
>> "DataFrame" rather than "Dataset[Row]".
>>
>>
>>
>> If we were in C++, DataFrame would've been a type alias for Dataset[Row]
>> too, and some methods would return DataFrame (e.g. sql method).
>>
>>
>>
>>
>>
>>
>>
>> On Thu, Feb 25, 2016 at 4:50 PM, Koert Kuipers  wrote:
>>
>> since a type alias is purely a convenience thing for the scala compiler,
>> does option 1 mean that the concept of DataFrame ceases to exist from a java
>> perspective, and they will have to refer to Dataset?
>>
>>
>>
>> On Thu, Feb 25, 2016 at 6:23 PM, Reynold Xin  wrote:
>>
>> When we first introduced Dataset in 1.6 as an experimental API, we wanted
>> to merge Dataset/DataFrame but couldn't because we didn't want to break the
>> pre-existing DataFrame API (e.g. map function should return Dataset, rather
>> than RDD). In Spark 2.0, one of the main API changes is to merge DataFrame
>> and Dataset.
>>
>>
>>
>> Conceptually, DataFrame is just a Dataset[Row]. In practice, there are two
>> ways to implement this:
>>
>>
>>
>> Option 1. Make DataFrame a type alias for Dataset[Row]
>>
>>
>>
>> Option 2. DataFrame as a concrete class that extends Dataset[Row]
>>
>>
>>
>>
>>
>> I'm wondering what you think about this. The pros and cons I can think of
>> are:
>>
>>
>>
>>
>>
>> Option 1. Make DataFrame a type alias for Dataset[Row]
>>
>>
>>
>> + Cleaner conceptually, especially in Scala. It will be very clear what
>> libraries or applications 

Re: More Robust DataSource Parameters

2016-02-26 Thread Reynold Xin
Thanks for the email. This sounds great in theory, but might run into two
major problems:

1. Need to support 4+ programming languages (SQL, Python, Java, Scala)

2. API stability (both backward and forward)



On Fri, Feb 26, 2016 at 8:44 AM, Hamel Kothari 
wrote:

> Hi devs,
>
> Has there been any discussion around changing the DataSource parameters
> arguments be something more sophisticated than Map[String, String]? As you
> write more complex DataSources there are likely to be a variety of
> parameters of varying formats which are needed and having to coerce them to
> be strings becomes suboptimal pretty fast.
>
> Quite often I see this combated by people specifying parameters which take
> in Json strings and then parse them into the parameter objects that they
> actually need. Unfortunately having people write Json strings can be a
> really error prone process so to ensure compile time safety people write
> convenience functions written which take in actual POJOs as parameters,
> serialize them to json so they can be passed into the data source API and
> then deserialize them in the constructors of their data sources. There's
> also no real story around discoverability of options with the current
> Map[String, String] setup other than looking at the source code of the
> datasource and hoping that they specified constants somewhere.
>
> Rather than doing all of the above, we could adapt the DataSource API to
> have RelationProviders be templated on a parameter class which could be
> provided to the createRelation call. On the user's side, they could just
> create the appropriate configuration object and provide that object to the
> DataFrameReader.parameters call and it would be possible to guarantee that
> enough parameters were provided to construct a DataFrame in that case.
>
> The key challenge I see with this approach is that I'm not sure how to
> make the above changes in a backwards compatible way that doesn't involve
> duplicating a bunch of methods.
>
> Do people have thoughts regarding this approach? I'm happy to file a JIRA
> and have the discussion there if it makes sense.
>
> Best,
> Hamel
>


Re: [discuss] DataFrame vs Dataset in Spark 2.0

2016-02-26 Thread Reynold Xin
That's actually not Row vs non-Row.

It's just primitive vs non-primitive. Primitives get automatically
flattened, to avoid having to type ._1 all the time.
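
A small illustration of that flattening (1.6-style sketch, assuming the
usual sqlContext.implicits._ are in scope; the data is arbitrary):

import sqlContext.implicits._

// a primitive element type is exposed as a single column named "value"
val ints = sqlContext.createDataset(Seq(1, 2, 3))              // [value: int]

// a tuple element type is flattened into columns _1, _2, ...
val pairs = sqlContext.createDataset(Seq((1, "a"), (2, "b")))  // [_1: int, _2: string]
println(pairs.schema.simpleString)                             // struct<_1:int,_2:string>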

On Fri, Feb 26, 2016 at 2:06 AM, Sun, Rui  wrote:

> Thanks for the explaination.
>
>
>
> What confusing me is the different internal semantic of Dataset on non-Row
> type (primitive types for example) and Row type:
>
>
>
> Dataset[Int] is internally actually Dataset[Row(value:Int)]
>
>
>
> scala> val ds = sqlContext.createDataset(Seq(1,2,3))
>
> ds: org.apache.spark.sql.Dataset[Int] = [value: int]
>
>
>
> scala> ds.schema.json
>
> res17: String =
> {"type":"struct","fields":[{"name":"value","type":"integer","nullable":false,"metadata":{}}]}
>
>
>
> But obviously Dataset[Row] is not internally Dataset[Row(value: Row)].
>
>
>
> *From:* Reynold Xin [mailto:r...@databricks.com]
> *Sent:* Friday, February 26, 2016 3:55 PM
> *To:* Sun, Rui 
> *Cc:* Koert Kuipers ; dev@spark.apache.org
>
> *Subject:* Re: [discuss] DataFrame vs Dataset in Spark 2.0
>
>
>
> The join and joinWith are just two different join semantics, and is not
> about Dataset vs DataFrame.
>
>
>
> join is the relational join, where fields are flattened; joinWith is more
> like a tuple join, where the output has two fields that are nested.
>
>
>
> So you can do
>
>
>
> Dataset[A] joinWith Dataset[B] = Dataset[(A, B)]
>
>
> DataFrame[A] joinWith DataFrame[B] = Dataset[(Row, Row)]
>
>
>
> Dataset[A] join Dataset[B] = Dataset[Row]
>
>
>
> DataFrame[A] join DataFrame[B] = Dataset[Row]
>
>
>
>
>
>
>
> On Thu, Feb 25, 2016 at 11:37 PM, Sun, Rui  wrote:
>
> Vote for option 2.
>
> Source compatibility and binary compatibility are very important from
> user’s perspective.
>
> It ‘s unfair for Java developers that they don’t have DataFrame
> abstraction. As you said, sometimes it is more natural to think about
> DataFrame.
>
>
>
> I am wondering if conceptually there is slight subtle difference between
> DataFrame and Dataset[Row]? For example,
>
> Dataset[T] joinWith Dataset[U]  produces Dataset[(T, U)]
>
> So,
>
> Dataset[Row] joinWith Dataset[Row]  produces Dataset[(Row, Row)]
>
>
>
> While
>
> DataFrame join DataFrame is still DataFrame of Row?
>
>
>
> *From:* Reynold Xin [mailto:r...@databricks.com]
> *Sent:* Friday, February 26, 2016 8:52 AM
> *To:* Koert Kuipers 
> *Cc:* dev@spark.apache.org
> *Subject:* Re: [discuss] DataFrame vs Dataset in Spark 2.0
>
>
>
> Yes - and that's why source compatibility is broken.
>
>
>
> Note that it is not just a "convenience" thing. Conceptually DataFrame is
> a Dataset[Row], and for some developers it is more natural to think about
> "DataFrame" rather than "Dataset[Row]".
>
>
>
> If we were in C++, DataFrame would've been a type alias for Dataset[Row]
> too, and some methods would return DataFrame (e.g. sql method).
>
>
>
>
>
>
>
> On Thu, Feb 25, 2016 at 4:50 PM, Koert Kuipers  wrote:
>
> since a type alias is purely a convenience thing for the scala compiler,
> does option 1 mean that the concept of DataFrame ceases to exist from a
> java perspective, and they will have to refer to Dataset?
>
>
>
> On Thu, Feb 25, 2016 at 6:23 PM, Reynold Xin  wrote:
>
> When we first introduced Dataset in 1.6 as an experimental API, we wanted
> to merge Dataset/DataFrame but couldn't because we didn't want to break the
> pre-existing DataFrame API (e.g. map function should return Dataset, rather
> than RDD). In Spark 2.0, one of the main API changes is to merge DataFrame
> and Dataset.
>
>
>
> Conceptually, DataFrame is just a Dataset[Row]. In practice, there are two
> ways to implement this:
>
>
>
> Option 1. Make DataFrame a type alias for Dataset[Row]
>
>
>
> Option 2. DataFrame as a concrete class that extends Dataset[Row]
>
>
>
>
>
> I'm wondering what you think about this. The pros and cons I can think of
> are:
>
>
>
>
>
> Option 1. Make DataFrame a type alias for Dataset[Row]
>
>
>
> + Cleaner conceptually, especially in Scala. It will be very clear what
> libraries or applications need to do, and we won't see type mismatches
> (e.g. a function expects DataFrame, but user is passing in Dataset[Row]
>
> + A lot less code
>
> - Breaks source compatibility for the DataFrame API in Java, and binary
> compatibility for Scala/Java
>
>
>
>
>
> Option 2. DataFrame as a concrete class that extends Dataset[Row]
>
>
>
> The pros/cons are basically the inverse of Option 1.
>
>
>
> + In most cases, can maintain source compatibility for the DataFrame API
> in Java, and binary compatibility for Scala/Java
>
> - A lot more code (1000+ loc)
>
> - Less cleaner, and can be confusing when users pass in a Dataset[Row]
> into a function that expects a DataFrame
>
>
>
>
>
> The concerns are mostly with Scala/Java. For Python, it is very easy to
> maintain source compatibility for both (there is no concept of binary
> compatibility), and for R, we are only 

External dependencies in public APIs (was previously: Upgrading to Kafka 0.9.x)

2016-02-26 Thread Reynold Xin
Dropping Kafka list since this is about a slightly different topic.

Every time we have exposed the API of a 3rd party application as a public
Spark API, it has caused some problems down the road. This goes for Hadoop,
Tachyon, Kafka, and Guava. Most of these are used for input/output.

The good thing is that in Spark 2.0 we are removing most of those
exposures, and in the new DataFrame/Dataset API we are providing a unified
input/output API for end users, so the internals of the 3rd party
dependencies are no longer exposed directly to users. Unfortunately, some
Spark APIs still depend on Hadoop.

It is important to keep this in mind as we develop Spark. For the long-term
stability of Spark APIs, we should avoid exposing other projects' APIs to
the greatest degree possible.


On Fri, Feb 26, 2016 at 9:46 AM, Mark Grover  wrote:

> Hi Kafka devs,
> I come to you with a dilemma and a request.
>
> Based on what I understand, users of Kafka need to upgrade their brokers
> to Kafka 0.9.x first, before they upgrade their clients to Kafka 0.9.x.
>
> However, that presents a problem to other projects that integrate with
> Kafka (Spark, Flume, Storm, etc.). From here on, I will speak for Spark +
> Kafka, since that's the one I am most familiar with.
>
> In the light of compatibility (or the lack thereof) between 0.8.x and
> 0.9.x, Spark is faced with a problem of what version(s) of Kafka to be
> compatible with, and has 2 options (discussed in this PR
> ):
> 1. We either upgrade to Kafka 0.9, dropping support for 0.8. Storm and
> Flume are already on this path.
> 2. We introduce complexity in our code to support both 0.8 and 0.9 for the
> entire duration of our next major release (Apache Spark 2.x).
>
> I'd love to hear your thoughts on which option, you recommend.
>
> Long term, I'd really appreciate if Kafka could do something that doesn't
> make Spark having to support two, or even more versions of Kafka. And, if
> there is something that I, personally, and Spark project can do in your
> next release candidate phase to make things easier, please do let us know.
>
> Thanks!
> Mark
>


Re: make-distribution.sh fails because tachyon-project was renamed to Alluxio

2016-02-26 Thread Jiří Šimša
Hi Jong, the download links should be fixed now.

Best,

On Fri, Feb 26, 2016 at 9:19 AM, Jiří Šimša  wrote:

> Hi Jong,
>
> Thank you for pointing that out. I am one of the maintainers of the
> Alluxio project, formerly known as Tachyon, and will make sure that the old
> download links still work. I will update this thread when it is fixed.
>
> On a related note, any Spark version will work with Alluxio 1.0 (or any
> future version) as there is no longer a compile time dependency.
>
> Best,
>
> Jiří Šimša
> Alluxio, Inc. 
>
> On Fri, Feb 26, 2016 at 7:47 AM, Jong Wook Kim  wrote:
>
>> Hi,
>>
>> Spark's packaging script downloads tachyon from tachyon-project.org
>> 
>> which is now redirected to alluxio.org.
>>
>> I guess the url should be changed to
>> http://alluxio.org/downloads/files/0.8.2/ is it right?
>>
>> Jong Wook
>>
>
>


-- 
Jiří Šimša
Alluxio, Inc. 


Upgrading to Kafka 0.9.x

2016-02-26 Thread Mark Grover
Hi Kafka devs,
I come to you with a dilemma and a request.

Based on what I understand, users of Kafka need to upgrade their brokers to
Kafka 0.9.x first, before they upgrade their clients to Kafka 0.9.x.

However, that presents a problem to other projects that integrate with
Kafka (Spark, Flume, Storm, etc.). From here on, I will speak for Spark +
Kafka, since that's the one I am most familiar with.

In the light of compatibility (or the lack thereof) between 0.8.x and
0.9.x, Spark is faced with a problem of what version(s) of Kafka to be
compatible with, and has 2 options (discussed in this PR
):
1. We either upgrade to Kafka 0.9, dropping support for 0.8. Storm and
Flume are already on this path.
2. We introduce complexity in our code to support both 0.8 and 0.9 for the
entire duration of our next major release (Apache Spark 2.x).

I'd love to hear your thoughts on which option you recommend.

Long term, I'd really appreciate it if Kafka could do something that doesn't
require Spark to support two, or even more, versions of Kafka. And if there
is something that I personally, or the Spark project, can do in your next
release candidate phase to make things easier, please do let us know.

Thanks!
Mark


Re: Hbase in spark

2016-02-26 Thread Ted Malaska
Yes, and I have used HBASE-15271 and successfully loaded over 20 billion
records into HBase even with node failures.

On Fri, Feb 26, 2016 at 11:55 AM, Ted Yu  wrote:

> In hbase, there is hbase-spark module which supports bulk load.
> This module is to be backported in the upcoming 1.3.0 release.
>
> There is some pending work, such as HBASE-15271 .
>
> FYI
>
> On Fri, Feb 26, 2016 at 8:50 AM, Renu Yadav  wrote:
>
>> Has anybody implemented bulk load into hbase using spark?
>>
>> I need help to optimize its performance.
>>
>> Please help.
>>
>>
>> Thanks & Regards,
>> Renu Yadav
>>
>
>


Re: make-distribution.sh fails because tachyon-project was renamed to Alluxio

2016-02-26 Thread Jiří Šimša
Hi Jong,

Thank you for pointing that out. I am one of the maintainers of the Alluxio
project, formerly known as Tachyon, and will make sure that the old
download links still work. I will update this thread when it is fixed.

On a related note, any Spark version will work with Alluxio 1.0 (or any
future version) as there is no longer a compile time dependency.

Best,

Jiří Šimša
Alluxio, Inc. 

On Fri, Feb 26, 2016 at 7:47 AM, Jong Wook Kim  wrote:

> Hi,
>
> Spark's packaging script downloads tachyon from tachyon-project.org
> 
> which is now redirected to alluxio.org.
>
> I guess the url should be changed to
> http://alluxio.org/downloads/files/0.8.2/ is it right?
>
> Jong Wook
>


Re: make-distribution.sh fails because tachyon-project was renamed to Alluxio

2016-02-26 Thread Sean Owen
Yes, though more broadly, should this just be removed for 2.x? I had
the sense Tachyon was going away, or at least being put into a corner
of the project. There's probably at least no need for special builds
for it.

On Fri, Feb 26, 2016 at 3:47 PM, Jong Wook Kim  wrote:
> Hi,
>
> Spark's packaging script downloads tachyon from tachyon-project.org which is
> now redirected to alluxio.org.
>
> I guess the url should be changed to
> http://alluxio.org/downloads/files/0.8.2/ is it right?
>
> Jong Wook

-
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org



Re: Hbase in spark

2016-02-26 Thread Ted Yu
In HBase, there is an hbase-spark module which supports bulk load.
This module is to be backported in the upcoming 1.3.0 release.

There is some pending work, such as HBASE-15271.

FYI

On Fri, Feb 26, 2016 at 8:50 AM, Renu Yadav  wrote:

> Has anybody implemented bulk load into hbase using spark?
>
> I need help to optimize its performance.
>
> Please help.
>
>
> Thanks & Regards,
> Renu Yadav
>


More Robust DataSource Parameters

2016-02-26 Thread Hamel Kothari
Hi devs,

Has there been any discussion around changing the DataSource parameters
argument to be something more sophisticated than Map[String, String]? As you
write more complex DataSources there are likely to be a variety of
parameters of varying formats, and having to coerce them all to strings
becomes suboptimal pretty fast.

Quite often I see this combated by people specifying parameters that take
in JSON strings and then parsing them into the parameter objects that they
actually need. Unfortunately, having people write JSON strings can be a
really error-prone process, so to ensure compile-time safety people write
convenience functions which take in actual POJOs as parameters, serialize
them to JSON so they can be passed into the data source API, and then
deserialize them in the constructors of their data sources. There's also no
real story around discoverability of options with the current
Map[String, String] setup, other than looking at the source code of the
datasource and hoping that constants were specified somewhere.

Rather than doing all of the above, we could adapt the DataSource API to
have RelationProviders be templated on a parameter class which could be
provided to the createRelation call. On the user's side, they could just
create the appropriate configuration object and provide that object to the
DataFrameReader.parameters call and it would be possible to guarantee that
enough parameters were provided to construct a DataFrame in that case.
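
To make the idea concrete, the typed variant might look roughly like this
(hypothetical interface and names, sketching the proposal rather than an
existing Spark API):

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.sources.BaseRelation

// a typed parameter object instead of Map[String, String]
case class MySourceParams(path: String, batchSize: Int, compression: Option[String])

// a provider parameterized on its configuration type
trait TypedRelationProvider[P] {
  def createRelation(sqlContext: SQLContext, parameters: P): BaseRelation
}

class MySource extends TypedRelationProvider[MySourceParams] {
  override def createRelation(sqlContext: SQLContext, p: MySourceParams): BaseRelation = {
    // build the relation from strongly typed, compile-time-checked parameters
    ???
  }
}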

The key challenge I see with this approach is that I'm not sure how to make
the above changes in a backwards compatible way that doesn't involve
duplicating a bunch of methods.

Do people have thoughts regarding this approach? I'm happy to file a JIRA
and have the discussion there if it makes sense.

Best,
Hamel


make-distribution.sh fails because tachyon-project was renamed to Alluxio

2016-02-26 Thread Jong Wook Kim
Hi,

Spark's packaging script downloads Tachyon from tachyon-project.org,
which is now redirected to alluxio.org.

I guess the URL should be changed to
http://alluxio.org/downloads/files/0.8.2/; is that right?

Jong Wook


Re: Aggregation + Adding static column + Union + Projection = Problem

2016-02-26 Thread Herman van Hövell tot Westerflier
Hi Jiří,

Thanks for your mail.

Could you create a JIRA ticket for this?
https://issues.apache.org/jira/browse/SPARK/?selectedTab=com.atlassian.jira.jira-projects-plugin:summary-panel

Kind regards,

Herman van Hövell


2016-02-26 15:11 GMT+01:00 Jiří Syrový :

> Hi,
>
> I've recently noticed a bug in Spark (branch 1.6) that appears if you do
> the following
>
> Let's have some DataFrame called df.
>
> 1) Aggregation of multiple columns on the Dataframe df and store result as
> result_agg_1
> 2) Do another aggregation of multiple columns, but on one less grouping
> columns and store the result as result_agg_2
> 3) Align the result of second aggregation by adding missing grouping
> column with value empty lit("")
> 4) Union result_agg_1 and result_agg_2
> 5) Do the projection from "sum(count_column)" to "count_column" for all
> aggregated columns.
>
> The result is structurally inconsistent DataFrame that has all the data
> coming from result_agg_1 shifted.
>
> An example of stripped down code and example result can be seen here:
>
> https://gist.github.com/xjrk58/e0c7171287ee9bdc8df8
> https://gist.github.com/xjrk58/7a297a42ebb94f300d96
>
> Best,
> Jiri Syrovy
>
>


Fwd: Aggregation + Adding static column + Union + Projection = Problem

2016-02-26 Thread Jiří Syrový
Hi,

I've recently noticed a bug in Spark (branch 1.6) that appears if you do
the following:

Let's have some DataFrame called df.

1) Aggregate multiple columns on the DataFrame df and store the result as
result_agg_1
2) Do another aggregation of multiple columns, but on one fewer grouping
column, and store the result as result_agg_2
3) Align the result of the second aggregation by adding the missing grouping
column with an empty value, lit("")
4) Union result_agg_1 and result_agg_2
5) Do the projection from "sum(count_column)" to "count_column" for all
aggregated columns.

The result is a structurally inconsistent DataFrame in which all the data
coming from result_agg_1 is shifted. (A rough code sketch of these steps
follows after the links below.)

An example of stripped down code and example result can be seen here:

https://gist.github.com/xjrk58/e0c7171287ee9bdc8df8
https://gist.github.com/xjrk58/7a297a42ebb94f300d96
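
In code, the five steps amount to roughly the following sketch (column names
g1, g2, cnt are made up; the gists above contain the real reproduction):

import org.apache.spark.sql.functions._

// df is assumed to have grouping columns g1, g2 and a numeric column cnt
val agg1 = df.groupBy("g1", "g2").agg(sum("cnt").as("sum_cnt"))
val agg2 = df.groupBy("g1").agg(sum("cnt").as("sum_cnt"))
  .withColumn("g2", lit(""))          // step 3: add the missing grouping column
  .select("g1", "g2", "sum_cnt")      // align column order with agg1
val unioned = agg1.unionAll(agg2)     // step 4 (unionAll in branch-1.6)
val result  = unioned.withColumnRenamed("sum_cnt", "cnt")   // step 5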

Best,
Jiri Syrovy


Is spark.driver.maxResultSize used correctly ?

2016-02-26 Thread Jeff Zhang
My job gets this exception very easily even when I set a large value for
spark.driver.maxResultSize. After checking the Spark code, I found that
spark.driver.maxResultSize is also used on the executor side to decide
whether a DirectTaskResult or an IndirectTaskResult is sent. This doesn't
make sense to me. Using spark.driver.maxResultSize / taskNum might be more
appropriate. For example, if spark.driver.maxResultSize is 1g and we have 10
tasks each with 200m of output, then the output of each task is less than
spark.driver.maxResultSize, so a DirectTaskResult will be sent to the
driver, but the total result size is 2g, which will cause an exception on
the driver side.
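
To spell out the arithmetic (sizes only, not the actual Spark code):

val maxResultSize = 1024L * 1024 * 1024   // spark.driver.maxResultSize = 1g
val perTaskResult = 200L * 1024 * 1024    // each task returns ~200m of output
val numTasks      = 10

// executor side today: each task is compared against the whole driver budget,
// so every task result is shipped back as a DirectTaskResult
val sentDirect = perTaskResult < maxResultSize        // true for every task

// driver side: the combined results blow past the budget and the job aborts
val totalAtDriver = numTasks.toLong * perTaskResult   // ~2g > 1g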


16/02/26 10:10:49 INFO DAGScheduler: Job 4 failed: treeAggregate at
LogisticRegression.scala:283, took 33.796379 s

Exception in thread "main" org.apache.spark.SparkException: Job aborted due
to stage failure: Total size of serialized results of 1 tasks (1085.0 MB)
is bigger than spark.driver.maxResultSize (1024.0 MB)


-- 
Best Regards

Jeff Zhang


Re: DirectFileOutputCommiter

2016-02-26 Thread Teng Qiu
Hi, thanks :) The performance gain is huge. We have an INSERT INTO query
where ca. 30GB in JSON format is written to S3 at the end; without
DirectOutputCommitter and our hack in Hive and InsertIntoHiveTable.scala it
took more than 40 min, with our changes only 15 min.

DirectOutputCommitter works for SparkContext and SqlContext, but for
HiveContext it only solves the problem with the "staging folder" in the
target table. The remaining problem for HiveContext is here:
https://github.com/apache/spark/blob/v1.6.0/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala#L132

Besides the staging folder created by the committer, Hive will use a temp
location as well... so we made a hack for this:
https://github.com/apache/spark/compare/branch-1.6...zalando:branch-1.6-zalando#diff-d579db9a8f27e0bbef37720ab14ec3f6R134

The main idea is that we added an internal var runID and use the HiveConf
setting spark.hive.insert.skip.temp to stop Hive from using the temp
location, but with this hack we need to change Hive's implementation... we
put our Hive.java file under
sql/hive/src/main/java/org/apache/hadoop/hive/ql/metadata/Hive.java

You can find the full change using this link:
https://github.com/apache/spark/compare/branch-1.6...zalando:branch-1.6-zalando#diff-d579db9a8f27e0bbef37720ab14ec3f6R134


I would like to forward this discussion to spark-dev; I hope the Spark team
can think about it, and hope there will be a better solution for this, like
some more official hack :D
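
For reference, the spark-defaults.conf entry mentioned in my earlier message
(quoted below) is essentially a committer override. Assuming the
DirectOutputCommitter class from our fork and Spark's spark.hadoop.*
passthrough for the old mapred API, it is along these lines (the exact key
depends on the output format used):

spark.hadoop.mapred.output.committer.class  org.apache.spark.mapred.DirectOutputCommitter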


2016-02-26 7:24 GMT+01:00 Takeshi Yamamuro :

> Hi,
>
> Great work!
> What is the concrete performance gain of the committer on s3?
> I'd like to know.
>
> I think there is no direct committer for files because this kind of
> committer has the risk
> of losing data (see SPARK-10063).
> Until this is resolved, ISTM files cannot support direct commits.
>
> thanks,
>
>
>
> On Fri, Feb 26, 2016 at 8:39 AM, Teng Qiu  wrote:
>
>> yes, should be this one
>> https://gist.github.com/aarondav/c513916e72101bbe14ec
>>
>> then need to set it in spark-defaults.conf :
>> https://github.com/zalando/spark/commit/3473f3f1ef27830813c1e0b3686e96a55f49269c#diff-f7a46208be9e80252614369be6617d65R13
>>
>> Am Freitag, 26. Februar 2016 schrieb Yin Yang :
>> > The header of DirectOutputCommitter.scala says Databricks.
>> > Did you get it from Databricks ?
>> > On Thu, Feb 25, 2016 at 3:01 PM, Teng Qiu  wrote:
>> >>
>> >> interesting in this topic as well, why the DirectFileOutputCommitter
>> not included?
>> >> we added it in our fork,
>> under core/src/main/scala/org/apache/spark/mapred/DirectOutputCommitter.scala
>> >> moreover, this DirectFileOutputCommitter is not working for the insert
>> operations in HiveContext, since the Committer is called by hive (means
>> uses dependencies in hive package)
>> >> we made some hack to fix this, you can take a look:
>> >>
>> https://github.com/apache/spark/compare/branch-1.6...zalando:branch-1.6-zalando
>> >>
>> >> may bring some ideas to other spark contributors to find a better way
>> to use s3.
>> >>
>> >> 2016-02-22 23:18 GMT+01:00 igor.berman :
>> >>>
>> >>> Hi,
>> >>> Wanted to understand if anybody uses DirectFileOutputCommitter or the
>> >>> like, especially when working with s3?
>> >>> I know that there is one impl in spark distro for parquet format, but
>> not
>> >>> for files -  why?
>> >>>
>> >>> Imho, it can bring huge performance boost.
>> >>> Using default FileOutputCommiter with s3 has big overhead at commit
>> stage
>> >>> when all parts are copied one-by-one to destination dir from
>> _temporary,
>> >>> which is bottleneck when number of partitions is high.
>> >>>
>> >>> Also, wanted to know if there are some problems when using
>> >>> DirectFileOutputCommitter?
>> >>> If writing one partition directly fails in the middle, will spark
>> >>> notice this and fail the job (say, after all retries)?
>> >>>
>> >>> thanks in advance
>> >>>
>> >>>
>> >>>
>> >>>
>> >>> --
>> >>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/DirectFileOutputCommiter-tp26296.html
>> >>> Sent from the Apache Spark User List mailing list archive at
>> Nabble.com.
>> >>>
>> >>> -
>> >>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> >>> For additional commands, e-mail: user-h...@spark.apache.org
>> >>>
>> >>
>> >
>> >
>>
>
>
>
> --
> ---
> Takeshi Yamamuro
>


RE: [discuss] DataFrame vs Dataset in Spark 2.0

2016-02-26 Thread Sun, Rui
Thanks for the explanation.

What confuses me is the different internal semantics of Dataset on non-Row
types (primitive types, for example) and the Row type:

Dataset[Int] is internally actually Dataset[Row(value:Int)]

scala> val ds = sqlContext.createDataset(Seq(1,2,3))
ds: org.apache.spark.sql.Dataset[Int] = [value: int]

scala> ds.schema.json
res17: String = 
{"type":"struct","fields":[{"name":"value","type":"integer","nullable":false,"metadata":{}}]}

But obviously Dataset[Row] is not internally Dataset[Row(value: Row)].

From: Reynold Xin [mailto:r...@databricks.com]
Sent: Friday, February 26, 2016 3:55 PM
To: Sun, Rui 
Cc: Koert Kuipers ; dev@spark.apache.org
Subject: Re: [discuss] DataFrame vs Dataset in Spark 2.0

The join and joinWith are just two different join semantics, and are not about
Dataset vs DataFrame.

join is the relational join, where fields are flattened; joinWith is more like 
a tuple join, where the output has two fields that are nested.

So you can do

Dataset[A] joinWith Dataset[B] = Dataset[(A, B)]

DataFrame[A] joinWith DataFrame[B] = Dataset[(Row, Row)]

Dataset[A] join Dataset[B] = Dataset[Row]

DataFrame[A] join DataFrame[B] = Dataset[Row]
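
A short sketch of the two flavors, written against the merged 2.0-style API
discussed in this thread (case classes and data are made up; assumes the
usual implicits for toDS() are in scope):

case class A(x: Int, a: String)
case class B(x: Int, b: String)

val da = Seq(A(1, "a1"), A(2, "a2")).toDS()
val db = Seq(B(1, "b1"), B(3, "b3")).toDS()

// joinWith: tuple join, each side stays nested
val nested = da.joinWith(db, da("x") === db("x"))   // Dataset[(A, B)]

// join: relational join, fields are flattened
val flat = da.join(db, "x")                         // Dataset[Row], i.e. a DataFrame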



On Thu, Feb 25, 2016 at 11:37 PM, Sun, Rui 
> wrote:
Vote for option 2.
Source compatibility and binary compatibility are very important from the
user's perspective.
It's unfair for Java developers that they don't have the DataFrame
abstraction. As you said, sometimes it is more natural to think about
DataFrame.

I am wondering if conceptually there is a slight, subtle difference between
DataFrame and Dataset[Row]? For example,
Dataset[T] joinWith Dataset[U]  produces Dataset[(T, U)]
So,
Dataset[Row] joinWith Dataset[Row]  produces Dataset[(Row, Row)]

While
DataFrame join DataFrame is still DataFrame of Row?

From: Reynold Xin [mailto:r...@databricks.com]
Sent: Friday, February 26, 2016 8:52 AM
To: Koert Kuipers >
Cc: dev@spark.apache.org
Subject: Re: [discuss] DataFrame vs Dataset in Spark 2.0

Yes - and that's why source compatibility is broken.

Note that it is not just a "convenience" thing. Conceptually DataFrame is a 
Dataset[Row], and for some developers it is more natural to think about 
"DataFrame" rather than "Dataset[Row]".

If we were in C++, DataFrame would've been a type alias for Dataset[Row] too, 
and some methods would return DataFrame (e.g. sql method).



On Thu, Feb 25, 2016 at 4:50 PM, Koert Kuipers 
> wrote:
since a type alias is purely a convenience thing for the scala compiler, does 
option 1 mean that the concept of DataFrame ceases to exist from a java 
perspective, and they will have to refer to Dataset?

On Thu, Feb 25, 2016 at 6:23 PM, Reynold Xin 
> wrote:
When we first introduced Dataset in 1.6 as an experimental API, we wanted to 
merge Dataset/DataFrame but couldn't because we didn't want to break the 
pre-existing DataFrame API (e.g. map function should return Dataset, rather 
than RDD). In Spark 2.0, one of the main API changes is to merge DataFrame and 
Dataset.

Conceptually, DataFrame is just a Dataset[Row]. In practice, there are two ways 
to implement this:

Option 1. Make DataFrame a type alias for Dataset[Row]

Option 2. DataFrame as a concrete class that extends Dataset[Row]


I'm wondering what you think about this. The pros and cons I can think of are:


Option 1. Make DataFrame a type alias for Dataset[Row]

+ Cleaner conceptually, especially in Scala. It will be very clear what 
libraries or applications need to do, and we won't see type mismatches (e.g. a 
function expects DataFrame, but the user is passing in Dataset[Row])
+ A lot less code
- Breaks source compatibility for the DataFrame API in Java, and binary 
compatibility for Scala/Java


Option 2. DataFrame as a concrete class that extends Dataset[Row]

The pros/cons are basically the inverse of Option 1.

+ In most cases, can maintain source compatibility for the DataFrame API in 
Java, and binary compatibility for Scala/Java
- A lot more code (1000+ loc)
- Less clean, and can be confusing when users pass a Dataset[Row] into a
function that expects a DataFrame


The concerns are mostly with Scala/Java. For Python, it is very easy to 
maintain source compatibility for both (there is no concept of binary 
compatibility), and for R, we are only supporting the DataFrame operations 
anyway, because that's a more familiar interface for R users outside of Spark.







Re: how about a custom coalesce() policy?

2016-02-26 Thread Reynold Xin
Using the right email for Nezih


On Fri, Feb 26, 2016 at 12:01 AM, Reynold Xin  wrote:

> I think this can be useful.
>
> The only thing is that we are slowly migrating to the Dataset/DataFrame
> API, and leave RDD mostly as is as a lower level API. Maybe we should do
> both? In either case it would be great to discuss the API on a pull
> request. Cheers.
>
> On Wed, Feb 24, 2016 at 2:08 PM, Nezih Yigitbasi <
> nyigitb...@netflix.com.invalid> wrote:
>
>> Hi Spark devs,
>>
>> I have sent an email about my problem some time ago where I want to merge
>> a large number of small files with Spark. Currently I am using Hive with
>> the CombineHiveInputFormat and I can control the size of the output
>> files with the max split size parameter (which is used for coalescing the
>> input splits by the CombineHiveInputFormat). My first attempt was to use
>> coalesce(), but since coalesce only considers the target number of
>> partitions the output file sizes were varying wildly.
>>
>> What I think can be useful is to have an optional PartitionCoalescer
>> parameter (a new interface) in the coalesce() method (or maybe we can
>> add a new method ?) that the callers can implement for custom coalescing
>> strategies — for my use case I have already implemented a
>> SizeBasedPartitionCoalescer that coalesces partitions by looking at
>> their sizes and by using a max split size parameter, similar to the
>> CombineHiveInputFormat (I also had to expose HadoopRDD to get access to
>> the individual split sizes etc.).
>>
>> What do you guys think about such a change, can it be useful to other
>> users as well? Or do you think that there is an easier way to accomplish
>> the same merge logic? If you think it may be useful, I already have an
>> implementation and I will be happy to work with the community to contribute
>> it.
>>
>> Thanks,
>> Nezih
>> ​
>>
>
>


Re: how about a custom coalesce() policy?

2016-02-26 Thread Reynold Xin
I think this can be useful.

The only thing is that we are slowly migrating to the Dataset/DataFrame
API, and leave RDD mostly as is as a lower level API. Maybe we should do
both? In either case it would be great to discuss the API on a pull
request. Cheers.

On Wed, Feb 24, 2016 at 2:08 PM, Nezih Yigitbasi <
nyigitb...@netflix.com.invalid> wrote:

> Hi Spark devs,
>
> I have sent an email about my problem some time ago where I want to merge
> a large number of small files with Spark. Currently I am using Hive with
> the CombineHiveInputFormat and I can control the size of the output files
> with the max split size parameter (which is used for coalescing the input
> splits by the CombineHiveInputFormat). My first attempt was to use
> coalesce(), but since coalesce only considers the target number of
> partitions the output file sizes were varying wildly.
>
> What I think can be useful is to have an optional PartitionCoalescer
> parameter (a new interface) in the coalesce() method (or maybe we can add
> a new method ?) that the callers can implement for custom coalescing
> strategies — for my use case I have already implemented a
> SizeBasedPartitionCoalescer that coalesces partitions by looking at their
> sizes and by using a max split size parameter, similar to the
> CombineHiveInputFormat (I also had to expose HadoopRDD to get access to
> the individual split sizes etc.).
>
> What do you guys think about such a change, can it be useful to other
> users as well? Or do you think that there is an easier way to accomplish
> the same merge logic? If you think it may be useful, I already have an
> implementation and I will be happy to work with the community to contribute
> it.
>
> Thanks,
> Nezih
> ​
>
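
For concreteness, the hook Nezih describes might look roughly like this
(names and signatures are hypothetical, sketching the proposal rather than
an existing API):

import org.apache.spark.Partition
import org.apache.spark.rdd.RDD

// a strategy object that decides how the parent partitions are grouped
trait PartitionCoalescer extends Serializable {
  /** Group the parent RDD's partitions into at most maxPartitions groups. */
  def coalesce(maxPartitions: Int, parent: RDD[_]): Array[Array[Partition]]
}

// and coalesce() would gain an optional strategy parameter, roughly:
//   def coalesce(numPartitions: Int, shuffle: Boolean = false,
//                coalescer: Option[PartitionCoalescer] = None): RDD[T]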