Possible bug in DatasourceV2

2018-10-10 Thread assaf.mendelson
Hi,

I created a datasource writer WITHOUT a reader. When I do, I get an
exception: org.apache.spark.sql.AnalysisException: Data source is not
readable: DefaultSource

The reason is that when save is called, the source match on WriteSupport contains the
following code:

val source = cls.newInstance().asInstanceOf[DataSourceV2]
source match {
  case ws: WriteSupport =>
    val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
      source,
      df.sparkSession.sessionState.conf)
    val options = sessionOptions ++ extraOptions
-->     val relation = DataSourceV2Relation.create(source, options)

    if (mode == SaveMode.Append) {
      runCommand(df.sparkSession, "save") {
        AppendData.byName(relation, df.logicalPlan)
      }

    } else {
      val writer = ws.createWriter(
        UUID.randomUUID.toString, df.logicalPlan.output.toStructType, mode,
        new DataSourceOptions(options.asJava))

      if (writer.isPresent) {
        runCommand(df.sparkSession, "save") {
          WriteToDataSourceV2(writer.get, df.logicalPlan)
        }
      }
    }

but DataSourceV2Relation.create actively creates a reader
(source.createReader) to extract the schema: 

def create(
    source: DataSourceV2,
    options: Map[String, String],
    tableIdent: Option[TableIdentifier] = None,
    userSpecifiedSchema: Option[StructType] = None): DataSourceV2Relation = {
  val reader = source.createReader(options, userSpecifiedSchema)
  val ident = tableIdent.orElse(tableFromOptions(options))
  DataSourceV2Relation(
    source, reader.readSchema().toAttributes, options, ident, userSpecifiedSchema)
}


This confuses me a little.

First, the schema is defined by the dataframe being written, not by the data
source, i.e. it should be taken from df.schema rather than from
source.createReader.
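For illustration, a hypothetical sketch of what I mean (this is not the actual Spark
code; it just mirrors the create() snippet above, but takes the schema from the
dataframe being written):

val relation = DataSourceV2Relation(
  source,
  df.schema.toAttributes,  // schema of the dataframe being saved, no reader involved
  options,
  None,                    // tableIdent
  None)                    // userSpecifiedSchema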

Second, I see that the relation is actually only used if the mode is
SaveMode.Append (which means that, if it is needed at all, it could be defined inside
the "if"). I am not sure I fully understand the AppendData path, but why would
reading from the source be involved in a write?

Am I missing something here?

Thanks,
   Assaf






Docker image to build Spark/Spark doc

2018-10-10 Thread assaf.mendelson
Hi all,
I was wondering if there is a docker image to build Spark and/or the Spark
documentation.

The idea would be that I would start the docker image, supplying the
directory with my code and a target directory and it would simply build
everything (maybe with some options).

Any chance there is already something like that which is working and tested?

Thanks, 
Assaf







DataSourceV2 documentation & tutorial

2018-10-08 Thread assaf.mendelson
Hi all,

I have been working on a legacy datasource integration with data source V2
for the last couple of weeks, including upgrading it to the Spark 2.4.0 RC.

During this process I wrote a tutorial with explanation on how to create a
new datasource (it can be found in
https://github.com/assafmendelson/DataSourceV2). 
It is still a work in progress (still a lot of TODOs in it), however, I
figured others might find it useful.

I was wondering if there is some place in the spark documentation where we
can put something like this, so it would be continually updated with the
ongoing changes to the API.

Of course, if I have mistakes in it (which I probably do), I would be happy
to learn of them…

Thanks, 
Assaf







Re: Data source V2 in spark 2.4.0

2018-10-04 Thread assaf.mendelson
Thanks for the info.

I have been converting an internal data source to V2 and am now preparing it
for 2.4.0.

I have a couple of suggestions from my experience so far.

First, I believe we are missing documentation on this. I am currently writing
an internal tutorial based on what I am learning, and I would be happy to share
it once it gets a little better (not sure where it should go, though).



The change from using Row to using InternalRow is a little confusing.
For a generic Row we can do Row.fromSeq(values) where values are regular Java
types (matching the schema). This even includes more complex types like
Array[String], and everything just works.

For InternalRow, this doesn't work for non-trivial types. I figured out how
to convert strings and timestamps (hopefully I even did it correctly), but I
couldn't figure out Array[String].

Beyond the fact that I would love to learn how to do the conversion
correctly for various types (such as arrays), I would suggest adding
some method to create an internal row from base types. In the 2.3.0
version, the row we got from get() would be encoded via a provided encoder.
I managed to get it to work by doing:

val encoder = RowEncoder.apply(schema).resolveAndBind() in the constructor
and then encoder.toRow(Row.fromSeq(values))

this simply feels a little weird to me.
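For reference, the full pattern I ended up with looks roughly like this (a minimal
sketch; RowEncoder and ExpressionEncoder are Spark 2.x internal APIs, so signatures
may differ between versions, and the schema here is just an example):

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types._

val schema = StructType(Seq(
  StructField("name", StringType),
  StructField("ts", TimestampType)))

// built once, e.g. in the reader's constructor
val encoder = RowEncoder(schema).resolveAndBind()

// per record: build a generic Row from plain JVM values and convert it to an InternalRow
def toInternal(values: Seq[Any]): InternalRow =
  encoder.toRow(Row.fromSeq(values))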


Another issue I encountered is handling bad data. In our legacy source
we have cases where a specific row is bad. What we would do in non-spark
code is simply skip it.

The problem is that in spark, if next() returns true we must have some
row ready for the get() function. This means we always need to read ahead to
figure out whether we actually have something or not.

Might we instead be allowed to return null from get(), in which case the row
would just be skipped?
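To make the read-ahead concrete, this is roughly what I mean (a hedged sketch
assuming the Spark 2.4 InputPartitionReader interface; readRawRecord and parse are
hypothetical helpers belonging to the source):

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.v2.reader.InputPartitionReader

class SkippingReader(
    readRawRecord: () => Option[String],        // hypothetical: None at end of input
    parse: String => Option[InternalRow])       // hypothetical: None for a bad row
  extends InputPartitionReader[InternalRow] {

  private var current: InternalRow = _

  // next() has to look ahead past bad rows so that get() always has a valid row
  override def next(): Boolean = {
    var raw = readRawRecord()
    while (raw.isDefined) {
      parse(raw.get) match {
        case Some(row) => current = row; return true
        case None => raw = readRawRecord() // bad row: skip it and keep reading
      }
    }
    false
  }

  override def get(): InternalRow = current
  override def close(): Unit = ()
}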


Lastly, I would be happy to have a means of returning metrics from the read (how
many records we read, how many bad records we saw), perhaps by allowing the use of
accumulators in the data source?

Sorry for the long-winded message; I will probably have more as I continue
to explore this.

Thanks, 
   Assaf.








Data source V2 in spark 2.4.0

2018-10-01 Thread assaf.mendelson
Hi all,
I understood from previous threads that the Data source V2 API will see some
changes in spark 2.4.0, however, I can't seem to find what these changes
are.

Is there some documentation which summarizes the changes?

The only mention I seem to find is this pull request:
https://github.com/apache/spark/pull/22009. Is this all of it?

Thanks,
Assaf. 






Re: Why is SQLImplicits an abstract class rather than a trait?

2018-08-06 Thread assaf.mendelson
The import will work for the trait but not for anyone implementing the trait. 
As for not setting a master, that was just an example; the full example
contains some configuration.


Thanks, 
Assaf








Why is SQLImplicits an abstract class rather than a trait?

2018-08-05 Thread assaf.mendelson
Hi all,

I have been playing a bit with SQLImplicits and noticed that it is an
abstract class. I was wondering why that is? It has no constructor parameters.

Because it is an abstract class, a test trait cannot extend it and still
remain a trait.

Consider the following:

trait MySparkTestTrait extends SQLImplicits {
  lazy val spark: SparkSession = SparkSession.builder().getOrCreate()
  protected override def _sqlContext: SQLContext = spark.sqlContext
}


This would mean that I could do something like this:


class MyTestClass extends FunSuite with MySparkTestTrait {
  test("SomeTest") {
    // use spark implicits without needing to do import spark.implicits._
  }
}

Is there a reason for this being an abstract class?
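For completeness, the workaround I currently use looks roughly like this (a sketch
assuming ScalaTest; the local master is just a placeholder, my real code configures
the session differently):

import org.apache.spark.sql.SparkSession
import org.scalatest.FunSuite

trait MySparkTestTrait {
  lazy val spark: SparkSession =
    SparkSession.builder().master("local[2]").getOrCreate()
}

class MyTestClass extends FunSuite with MySparkTestTrait {
  import spark.implicits._ // the explicit import is still needed with this approach

  test("SomeTest") {
    assert(Seq(1, 2, 3).toDS().count() == 3)
  }
}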






Data source V2

2018-07-30 Thread assaf.mendelson
Hi all,
I am currently in the middle of developing a new data source (for an
internal tool) using data source V2.
I noticed that SPARK-24882 is planned for 2.4 and includes interface changes.

I was wondering if those are planned in addition to the current interfaces
or are aimed at replacing them (specifically the most basic read path, as this is
what I am using).

As a side note, I was wondering if there is any means of exposing metrics from
the data source, e.g. I would like to expose the number of rows read to the
application (currently I am adding a per-partition index column and using a
custom idempotent accumulator which collects the maximum index
for each partition).
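For reference, the idempotent accumulator I mentioned is roughly this (a sketch;
the names are mine and the semantics are simplified):

import scala.collection.mutable
import org.apache.spark.util.AccumulatorV2

// Tracks the maximum row index seen per partition, so that task retries which
// re-add the same (partitionId, index) pairs do not inflate the count.
class MaxPerPartitionAccumulator extends AccumulatorV2[(Int, Long), Map[Int, Long]] {
  private val maxPerPartition = mutable.Map.empty[Int, Long]

  override def isZero: Boolean = maxPerPartition.isEmpty

  override def copy(): MaxPerPartitionAccumulator = {
    val acc = new MaxPerPartitionAccumulator
    acc.maxPerPartition ++= maxPerPartition
    acc
  }

  override def reset(): Unit = maxPerPartition.clear()

  override def add(v: (Int, Long)): Unit = {
    val (pid, idx) = v
    maxPerPartition(pid) = math.max(maxPerPartition.getOrElse(pid, 0L), idx)
  }

  override def merge(other: AccumulatorV2[(Int, Long), Map[Int, Long]]): Unit =
    other.value.foreach { case (pid, idx) => add((pid, idx)) }

  // total rows read = sum of (max index + 1) over partitions, assuming 0-based indices
  override def value: Map[Int, Long] = maxPerPartition.toMap
}

// usage: val acc = new MaxPerPartitionAccumulator
//        spark.sparkContext.register(acc, "rowsRead")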

Thanks,
Assaf.






Re: Spark data source resiliency

2018-07-03 Thread assaf.mendelson
You are correct, this solved it.
Thanks






Re: Spark data source resiliency

2018-07-02 Thread assaf.mendelson
That is what I expected; however, I did a very simple test (using println
just to see when the exception is triggered in the iterator) using local
master, and I saw that it failed once and caused the entire operation to fail.

Is this something which may be unique to local master (or some default
configuration which should be tested)? I can't see a specific configuration
for handling this in the documentation.

Thanks,
Assaf.







Spark data source resiliency

2018-07-02 Thread assaf.mendelson
Hi All,

I implemented a data source V2 which integrates with an internal system,
and I need to make it resilient to errors in the internal data source.

The issue is that currently, if there is an exception in the data reader,
the exception seems to fail the entire task. I would prefer instead to just
restart the relevant partition.

Is there a way to do it or would I need to solve it inside the iterator
itself?

Thanks,
Assaf.






Possible bug: inconsistent timestamp behavior

2017-08-15 Thread assaf.mendelson
Hi all,
I encountered weird behavior for timestamps. It seems that when using lit to add
one to a column, the timestamp goes from a milliseconds representation to a seconds
representation:


scala> spark.range(1).withColumn("a", lit(new 
java.sql.Timestamp(148550335L)).cast("long")).show()
+---+--+
| id| a|
+---+--+
|  0|1485503350|
+---+--+


scala> spark.range(1).withColumn("a", 
lit(148550335L).cast(org.apache.spark.sql.types.TimestampType).cast(org.apache.spark.sql.types.LongType)).show()
+---+-+
| id|a|
+---+-+
|  0|148550335|
+---+-+


Is this a bug or am I missing something here?
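If it helps frame the question, my understanding (which may be wrong) is that
java.sql.Timestamp takes milliseconds while Spark's long <-> timestamp casts work in
seconds, so a comparison like the following sketch (using an example epoch value) is
what I would expect to line up only after dividing by 1000:

import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.types.{LongType, TimestampType}

val millis = 1485503350000L // example value, milliseconds since the epoch
spark.range(1)
  .withColumn("fromTimestamp", lit(new java.sql.Timestamp(millis)).cast(LongType))
  .withColumn("fromLong", lit(millis / 1000).cast(TimestampType).cast(LongType))
  .show()
// both columns would then show 1485503350 (seconds)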

Thanks,
Assaf






RE: [SS] Why does ConsoleSink's addBatch convert input DataFrame to show it?

2017-07-07 Thread assaf.mendelson
I actually asked the same thing a couple of weeks ago.
Apparently, when you create a structured streaming plan, it is different from the
batch plan and is fixed so that aggregation works properly. If you perform most
operations on the dataframe, it will recalculate the plan as a batch plan and will
therefore not work properly. Therefore, you must either collect, or convert to an RDD
and then create a new dataframe from that RDD.
It would be very useful IMO if we could "freeze" the plan for the input portion and
work with it as if it were a new dataframe (similar to converting to an RDD and
creating a new dataframe from it, but without the overhead of the round trip);
however, this is not currently possible.
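A minimal sketch of that workaround inside a sink's addBatch (assuming the incoming
batch dataframe is called data; I am not certain every operation on data.rdd is safe
here, but this is the shape of what I described):

import org.apache.spark.sql.DataFrame

def asBatchDF(data: DataFrame): DataFrame = {
  val spark = data.sparkSession
  // rebuild a plain batch dataframe from the streaming batch's rows and schema
  spark.createDataFrame(data.rdd, data.schema)
}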

Thanks,
  Assaf.

From: Jacek Laskowski [via Apache Spark Developers List] 
[mailto:ml+s1001551n21930...@n3.nabble.com]
Sent: Friday, July 07, 2017 11:44 AM
To: Mendelson, Assaf
Subject: [SS] Why does ConsoleSink's addBatch convert input DataFrame to show 
it?

Hi,

Just noticed that the input DataFrame is collect'ed and then
parallelize'd simply to show it to the console [1]. Why so many fairly
expensive operations for show?

I'd appreciate some help understanding this code. Thanks.

[1] 
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala#L51-L53

Pozdrawiam,
Jacek Laskowski

https://medium.com/@jaceklaskowski/
Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski






RE: [VOTE] Apache Spark 2.2.0 (RC5)

2017-06-26 Thread assaf.mendelson
Not a show stopper; however, I was looking at the structured streaming 
programming guide, and under arbitrary stateful operations 
(https://people.apache.org/~pwendell/spark-releases/spark-2.2.0-rc5-docs/structured-streaming-programming-guide.html#arbitrary-stateful-operations)
 the suggestion is to take a look at the examples (Scala/Java). These link to a
non-existent file (called StructuredSessionization or JavaStructuredSessionization;
I couldn't find either of these files in the repository).
If the example file exists, I think it would be nice to add it, otherwise I 
would suggest simply removing the examples link from the programming guide 
(there are examples inside the group state API 
https://people.apache.org/~pwendell/spark-releases/spark-2.2.0-rc5-docs/api/scala/index.html#org.apache.spark.sql.streaming.GroupState).

Thanks,
  Assaf.

From: Michael Armbrust [via Apache Spark Developers List] 
[mailto:ml+s1001551n21815...@n3.nabble.com]
Sent: Wednesday, June 21, 2017 2:50 AM
To: Mendelson, Assaf
Subject: [VOTE] Apache Spark 2.2.0 (RC5)

Please vote on releasing the following candidate as Apache Spark version 2.2.0. 
The vote is open until Friday, June 23rd, 2017 at 18:00 PST and passes if a 
majority of at least 3 +1 PMC votes are cast.

[ ] +1 Release this package as Apache Spark 2.2.0
[ ] -1 Do not release this package because ...


To learn more about Apache Spark, please see https://spark.apache.org/

The tag to be voted on is 
v2.2.0-rc5 
(62e442e73a2fa663892d2edaff5f7d72d7f402ed)

List of JIRA tickets resolved can be found with this 
filter.

The release files, including signatures, digests, etc. can be found at:
https://home.apache.org/~pwendell/spark-releases/spark-2.2.0-rc5-bin/

Release artifacts are signed with the following key:
https://people.apache.org/keys/committer/pwendell.asc

The staging repository for this release can be found at:
https://repository.apache.org/content/repositories/orgapachespark-1243/

The documentation corresponding to this release can be found at:
https://people.apache.org/~pwendell/spark-releases/spark-2.2.0-rc5-docs/


FAQ

How can I help test this release?

If you are a Spark user, you can help us test this release by taking an 
existing Spark workload and running on this release candidate, then reporting 
any regressions.

What should happen to JIRA tickets still targeting 2.2.0?

Committers should look at those and triage. Extremely important bug fixes, 
documentation, and API tweaks that impact compatibility should be worked on 
immediately. Everything else please retarget to 2.3.0 or 2.2.1.

But my bug isn't fixed!??!

In order to make timely releases, we will typically not hold the release unless 
the bug in question is a regression from 2.1.1.







cannot call explain or show on dataframe in structured streaming addBatch dataframe

2017-06-19 Thread assaf.mendelson
Hi all,
I am playing around with structured streaming and looked at the code for 
ConsoleSink.

I see the code has:


data.sparkSession.createDataFrame(
    data.sparkSession.sparkContext.parallelize(data.collect()), data.schema)
    .show(numRowsToShow, isTruncated)
}

I was wondering why it does not use data directly? Why the collect and
parallelize?


Thanks,
  Assaf.






RE: [PYTHON] PySpark typing hints

2017-05-23 Thread assaf.mendelson
Actually there is, at least for PyCharm. I opened a JIRA on it 
(https://issues.apache.org/jira/browse/SPARK-17333). It describes two ways of 
doing it (I also made a GitHub stub at: 
https://github.com/assafmendelson/ExamplePysparkAnnotation). Unfortunately, I 
never found the time to follow through.
That said, if we make a decision on the way to handle it, then I believe it 
would be a good idea to start even with the bare minimum and keep adding to 
it (and thereby make it possible for many people to contribute). The code I added on 
GitHub was basically the things I needed.

To summarize, there are two main ways of doing it (at least in pycharm):

1.   Give the hints as part of the docstring for the function

2.   Create files with the signatures only and mark it for pycharm to use

The advantage of the first is that it is part of the code, which makes it 
easier to keep updated. The main issue is that supporting auto-generated 
code (as is the case for most functions) can be a little awkward, and it actually 
relates to a separate issue, since it means PyCharm marks most of the 
functions as errors (i.e. pyspark.sql.functions.XXX is marked as not there…)

The advantage of the second is that it is completely separate, so messing around 
with it cannot harm the main code. The disadvantages are that we would need to 
maintain it manually, and that to use it in PyCharm one needs to add the stub files 
to the path (in PyCharm this means marking them as sources; I am not sure how other 
IDEs support this).

Lastly, I only tested these two solutions in PyCharm. I am not sure of their 
support in other IDEs.


Thanks,
  Assaf.

From: rxin [via Apache Spark Developers List] 
[mailto:ml+s1001551n21611...@n3.nabble.com]
Sent: Tuesday, May 23, 2017 1:10 PM
To: Mendelson, Assaf
Subject: Re: [PYTHON] PySpark typing hints

Seems useful to do. Is there a way to do this so it doesn't break Python 2.x?


On Sun, May 14, 2017 at 11:44 PM, Maciej Szymkiewicz <[hidden 
email]> wrote:

Hi everyone,

For the last few months I've been working on static type annotations for 
PySpark. For those of you, who are not familiar with the idea, typing hints 
have been introduced by PEP 484 (https://www.python.org/dev/peps/pep-0484/) and 
further extended with PEP 526 (https://www.python.org/dev/peps/pep-0526/) with 
the main goal of providing information required for static analysis. Right now 
there a few tools which support typing hints, including Mypy 
(https://github.com/python/mypy) and PyCharm 
(https://www.jetbrains.com/help/pycharm/2017.1/type-hinting-in-pycharm.html).  
Type hints can be added using function annotations 
(https://www.python.org/dev/peps/pep-3107/, Python 3 only), docstrings, or 
source independent stub files 
(https://www.python.org/dev/peps/pep-0484/#stub-files). Typing is optional, 
gradual and has no runtime impact.

At this moment I've annotated majority of the API, including majority of 
pyspark.sql and pyspark.ml. At this moment project is still 
rough around the edges, and may result in both false positive and false 
negatives, but I think it become mature enough to be useful in practice.
The current version is compatible only with Python 3, but it is possible, with 
some limitations, to backport it to Python 2 (though it is not on my todo list).

There is a number of possible benefits for PySpark users and developers:

  *   Static analysis can detect a number of common mistakes to prevent runtime 
failures. Generic self is still fairly limited, so it is more useful with 
DataFrames, SS and ML than RDD, DStreams or RDD.
  *   Annotations can be used for documenting complex signatures 
(https://git.io/v95JN) including dependencies on arguments and value 
(https://git.io/v95JA).
  *   Detecting possible bugs in Spark (SPARK-20631) .
  *   Showing API inconsistencies.

Roadmap

  *   Update the project to reflect Spark 2.2.
  *   Refine existing annotations.

If there will be enough interest I am happy to contribute this back to Spark or 
submit to Typeshed (https://github.com/python/typeshed -  this would require a 
formal ASF approval, and since Typeshed doesn't provide versioning, is probably 
not the best option in our case).

Further inforamtion:

  *   https://github.com/zero323/pyspark-stubs - GitHub repository

  *   
https://speakerdeck.com/marcobonzanini/static-type-analysis-for-robust-data-products-at-pydata-london-2017
 - interesting presentation by Marco Bonzanini

--

Best,

Maciej




RE: Will .count() always trigger an evaluation of each row?

2017-02-19 Thread assaf.mendelson
I am not saying you should cache everything, just that it is a valid use case.


From: Jörn Franke [via Apache Spark Developers List] 
[mailto:ml-node+s1001551n21026...@n3.nabble.com]
Sent: Sunday, February 19, 2017 12:13 PM
To: Mendelson, Assaf
Subject: Re: Will .count() always trigger an evaluation of each row?

I think your example relates to scheduling, e.g. it makes sense to use oozie or 
similar to fetch the data at specific point in times.

I am also not a big fan of caching everything. In a Multi-user cluster with a 
lot of Applications you waste a lot of resources making everybody less 
efficient.

On 19 Feb 2017, at 10:13, assaf.mendelson <[hidden 
email]> wrote:
Actually, when I did a simple test on parquet 
(spark.read.parquet(“somefile”).cache().count()) the UI showed me that the 
entire file is cached. Is this just a fluke?

In any case I believe the question is still valid, how to make sure a dataframe 
is cached.
Consider for example a case where we read from a remote host (which is costly) 
and we want to make sure the original read is done at a specific time (when the 
network is less crowded).
I for one used .count() till now but if this is not guaranteed to cache, then 
how would I do that? Of course I could always save the dataframe to disk but 
that would cost a lot more in performance than I would like…

As for doing a map partitions for the dataset, wouldn’t that cause the row to 
be converted to the case class for each row? That could also be heavy.
Maybe cache should have a lazy parameter which would be false by default but we 
could call .cache(true) to make it materialize (similar to what we have with 
checkpoint).
Assaf.

From: Matei Zaharia [via Apache Spark Developers List] [mailto:ml-node+[hidden 
email]]
Sent: Sunday, February 19, 2017 9:30 AM
To: Mendelson, Assaf
Subject: Re: Will .count() always trigger an evaluation of each row?

Count is different on DataFrames and Datasets from RDDs. On RDDs, it always 
evaluates everything, but on DataFrame/Dataset, it turns into the equivalent of 
"select count(*) from ..." in SQL, which can be done without scanning the data 
for some data formats (e.g. Parquet). On the other hand though, caching a 
DataFrame / Dataset does require everything to be cached.

Matei

On Feb 18, 2017, at 2:16 AM, Sean Owen <[hidden 
email]> wrote:

I think the right answer is "don't do that" but if you really had to you could 
trigger a Dataset operation that does nothing per partition. I presume that 
would be more reliable because the whole partition has to be computed to make 
it available in practice. Or, go so far as to loop over every element.

On Sat, Feb 18, 2017 at 3:15 AM Nicholas Chammas <[hidden 
email]> wrote:

Especially during development, people often use .count() or .persist().count() 
to force evaluation of all rows — exposing any problems, e.g. due to bad data — 
and to load data into cache to speed up subsequent operations.

But as the optimizer gets smarter, I’m guessing it will eventually learn that 
it doesn’t have to do all that work to give the correct count. (This blog 
post<https://databricks.com/blog/2017/02/16/processing-trillion-rows-per-second-single-machine-can-nested-loop-joins-fast.html>
 suggests that something like this is already happening.) This will change 
Spark’s practical behavior while technically preserving semantics.

What will people need to do then to force evaluation or caching?

Nick
​








RE: Will .count() always trigger an evaluation of each row?

2017-02-19 Thread assaf.mendelson
Actually, when I did a simple test on parquet 
(spark.read.parquet(“somefile”).cache().count()) the UI showed me that the 
entire file is cached. Is this just a fluke?

In any case I believe the question is still valid, how to make sure a dataframe 
is cached.
Consider for example a case where we read from a remote host (which is costly) 
and we want to make sure the original read is done at a specific time (when the 
network is less crowded).
I for one used .count() till now but if this is not guaranteed to cache, then 
how would I do that? Of course I could always save the dataframe to disk but 
that would cost a lot more in performance than I would like…

As for doing a mapPartitions on the dataset, wouldn't that cause each row to 
be converted to the case class? That could also be heavy.
Maybe cache should have an eager parameter, false by default, so we 
could call .cache(true) to make it materialize (similar to what we have with 
checkpoint).
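In the meantime, a minimal sketch of the "touch every partition" idea Sean mentions
in the quoted reply below, which is probably what I would fall back to:

import org.apache.spark.sql.Dataset

def materialize[T](ds: Dataset[T]): Dataset[T] = {
  val cached = ds.cache()
  // force every partition (and therefore the cache) to be computed without collecting
  cached.foreachPartition((it: Iterator[T]) => it.foreach(_ => ()))
  cached
}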
Assaf.

From: Matei Zaharia [via Apache Spark Developers List] 
[mailto:ml-node+s1001551n21024...@n3.nabble.com]
Sent: Sunday, February 19, 2017 9:30 AM
To: Mendelson, Assaf
Subject: Re: Will .count() always trigger an evaluation of each row?

Count is different on DataFrames and Datasets from RDDs. On RDDs, it always 
evaluates everything, but on DataFrame/Dataset, it turns into the equivalent of 
"select count(*) from ..." in SQL, which can be done without scanning the data 
for some data formats (e.g. Parquet). On the other hand though, caching a 
DataFrame / Dataset does require everything to be cached.

Matei

On Feb 18, 2017, at 2:16 AM, Sean Owen <[hidden 
email]> wrote:

I think the right answer is "don't do that" but if you really had to you could 
trigger a Dataset operation that does nothing per partition. I presume that 
would be more reliable because the whole partition has to be computed to make 
it available in practice. Or, go so far as to loop over every element.

On Sat, Feb 18, 2017 at 3:15 AM Nicholas Chammas <[hidden 
email]> wrote:

Especially during development, people often use .count() or .persist().count() 
to force evaluation of all rows — exposing any problems, e.g. due to bad data — 
and to load data into cache to speed up subsequent operations.

But as the optimizer gets smarter, I’m guessing it will eventually learn that 
it doesn’t have to do all that work to give the correct count. (This blog 
post
 suggests that something like this is already happening.) This will change 
Spark’s practical behavior while technically preserving semantics.

What will people need to do then to force evaluation or caching?

Nick
​








spark support on windows

2017-01-16 Thread assaf.mendelson
Hi,
In the documentation it says spark is supported on windows.
The problem, however, is that the documentation for windows is lacking. There are 
sources (such as 
https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-tips-and-tricks-running-spark-windows.html
 and many more) which explain how to make spark run on windows; however, they 
all involve downloading a third-party winutils.exe file.
Since this file is downloaded from a repository belonging to a private person, 
this can be an issue (e.g. getting approval to install it on a company computer 
can be a problem).
There are tons of jira tickets on the subject (most are marked as duplicate or 
not a problem); however, I believe that if we say spark is supported on windows, 
there should be a clear explanation of how to run it, and one shouldn't have to 
use an executable from a private person.

If using winutils.exe is indeed the correct solution, I believe it should be 
bundled with the spark binary distribution along with clear instructions on how 
to add it.
Assaf.





RE: repeated unioning of dataframes take worse than O(N^2) time

2016-12-29 Thread assaf.mendelson
Hi,

I understand that doing a union creates a nested structure; however, why isn't 
it O(N)? If I look at the code, it seems this should be a tree merge of two 
plans, which should cost O(N), not O(N^2).
Even running the plan should then be O(N log N) instead of O(N^2) or 
worse.
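As a side note, a possible mitigation (a sketch of my own, which I have not
benchmarked here) is to union in a balanced tree instead of a left fold, so the
total plan-analysis work should grow closer to O(N log N) than O(N^2):

import org.apache.spark.sql.DataFrame

def unionAll(dfs: Seq[DataFrame]): DataFrame = {
  require(dfs.nonEmpty, "need at least one dataframe")
  var level: Seq[DataFrame] = dfs
  while (level.size > 1) {
    // pair up neighbours so the plan depth grows logarithmically
    level = level.grouped(2).map {
      case Seq(a, b) => a.union(b)
      case Seq(a) => a
    }.toSeq
  }
  level.head
}

// e.g. val allDF = unionAll(dataframes) instead of dataframes.reduceLeft(_.union(_))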
Assaf.

From: Maciej Szymkiewicz [via Apache Spark Developers List] 
[mailto:ml-node+s1001551n20395...@n3.nabble.com]
Sent: Thursday, December 29, 2016 7:39 PM
To: Mendelson, Assaf
Subject: Re: repeated unioning of dataframes take worse than O(N^2) time


Iterative union like this creates a deeply nested recursive structure in a 
similar manner to described here http://stackoverflow.com/q/34461804

You can try something like this http://stackoverflow.com/a/37612978 but there 
is of course on overhead of conversion between Dataset and RDD.


On 12/29/2016 06:21 PM, assaf.mendelson wrote:
Hi,

I have been playing around with doing union between a large number of 
dataframes and saw that the performance of the actual union (not the action) is 
worse than O(N^2). Since a union basically defines a lineage (i.e. current + 
union with of other as a child) this should be almost instantaneous, however in 
practice this can be very costly.

I was wondering why this is and if there is a way to fix this.

A sample test:
def testUnion(n: Int): Long = {
  val dataframes = for {
x <- 0 until n
  } yield spark.range(1000)

  val t0 = System.currentTimeMillis()
  val allDF = dataframes.reduceLeft(_.union(_))
  val t1 = System.currentTimeMillis()
  val totalTime = t1 - t0
  println(s"$totalTime miliseconds")
  totalTime
}

scala> testUnion(100)
193 miliseconds
res5: Long = 193

scala> testUnion(200)
759 miliseconds
res1: Long = 759

scala> testUnion(500)
4438 miliseconds
res2: Long = 4438

scala> testUnion(1000)
18441 miliseconds
res6: Long = 18441

scala> testUnion(2000)
88498 miliseconds
res7: Long = 88498

scala> testUnion(5000)
822305 miliseconds
res8: Long = 822305







--

Maciej Szymkiewicz







repeated unioning of dataframes take worse than O(N^2) time

2016-12-29 Thread assaf.mendelson
Hi,

I have been playing around with unioning a large number of 
dataframes and saw that the performance of the actual union (not the action) is 
worse than O(N^2). Since a union basically defines a lineage (i.e. the current 
plan with the other plan as a child), this should be almost instantaneous; however, 
in practice it can be very costly.

I was wondering why this is and if there is a way to fix this.

A sample test:
def testUnion(n: Int): Long = {
  val dataframes = for {
x <- 0 until n
  } yield spark.range(1000)

  val t0 = System.currentTimeMillis()
  val allDF = dataframes.reduceLeft(_.union(_))
  val t1 = System.currentTimeMillis()
  val totalTime = t1 - t0
  println(s"$totalTime miliseconds")
  totalTime
}

scala> testUnion(100)
193 miliseconds
res5: Long = 193

scala> testUnion(200)
759 miliseconds
res1: Long = 759

scala> testUnion(500)
4438 miliseconds
res2: Long = 4438

scala> testUnion(1000)
18441 miliseconds
res6: Long = 18441

scala> testUnion(2000)
88498 miliseconds
res7: Long = 88498

scala> testUnion(5000)
822305 miliseconds
res8: Long = 822305







RE: Shuffle intermediate results not being cached

2016-12-27 Thread assaf.mendelson
I understand the actual dataframe is different, but the underlying partitions 
are not (hence the importance of Mark's response). The code you suggested would 
not work, as allDF and x would have different schemas (x is the original and 
allDF becomes the grouped one).
I can do something like this:
  var totalTime: Long = 0
  var allDF: DataFrame = null
  for {
    x <- dataframes
  } {
    val timeLen = time {
      val grouped = x.groupBy("cat1", "cat2").agg(sum($"valToAdd").alias("v"))
      allDF = if (allDF == null) grouped else {
        allDF.union(grouped).groupBy("cat1", "cat2").agg(sum($"v").alias("v"))
      }
      val grouped2 = allDF.groupBy("cat1").agg(sum($"v"), count($"cat2"))
      grouped2.show()
    }
    totalTime += timeLen
    println(s"Took $timeLen miliseconds")
  }
  println(s"Overall time was $totalTime miliseconds")
}

and this indeed improves performance (I actually tried a couple more variations), but:

1.   This still gives poor performance (for 167 slices I get a throughput 
which is 10 times lower than batch, after doing some tuning including caching 
and coalescing).

2.   This works because the aggregation here is a sum, so we never need to "forget" 
anything. For more general aggregations we would have to join them together (we 
can't do it for count distinct, for example), and we would need to "forget" frames 
when they move out of the window (we can subtract a sum but not a max).

The best solution I found so far (performance-wise) was to write a custom UDAF 
which does the windowing internally. This was still 8 times lower throughput than 
batch, required a lot of coding, and is not a general solution.

I am looking for an approach to improve the performance even more (preferably 
to be either on par with batch or within a relatively low factor which remains 
constant as the number of slices rises), including the option to "forget" 
frames.

Assaf.




From: Liang-Chi Hsieh [via Apache Spark Developers List] 
[mailto:ml-node+s1001551n20371...@n3.nabble.com]
Sent: Wednesday, December 28, 2016 3:59 AM
To: Mendelson, Assaf
Subject: RE: Shuffle intermidiate results not being cached


Hi,

Every iteration the data you run aggregation on it is different. As I showed in 
previous reply:

1st iteration: aggregation(x1 union x2)
2nd iteration: aggregation(x3 union (x1 union x2))
3rd iteration: aggregation(x4 union(x3 union (x1 union x2)))

In 1st you run aggregation on the data of x1 and x2. In 2nd the data is x1, x2 
and x3. Even you work on the same RDD, you won't see reuse of the shuffle data 
because the shuffle data is different.

In your second example, I think the way to reduce the computation is like:

var totalTime: Long = 0
var allDF: org.apache.spark.sql.DataFrame = null
for {
  x <- dataframes
} {
  val timeLen = time {
allDF = if (allDF == null) x else allDF.union(x) // Union previous 
aggregation summary with new dataframe in this window
val grouped = allDF.groupBy("cat1", "cat2").agg(sum($"valToAdd").alias("v"))
val grouped2 = grouped.groupBy("cat1").agg(sum($"v"), count($"cat2"))
grouped2.show()
allDF = grouped  // Replace the union of data with aggregated summary
  }
  totalTime += timeLen
  println(s"Took $timeLen miliseconds")
}
println(s"Total time was $totalTime miliseconds")

You don't need to recompute the aggregation of previous dataframes in each 
iteration. You just need to get the summary and union it with new dataframe to 
compute the newer aggregation summary in next iteration. It is more similar to 
streaming case, I don't think you can/should recompute all the data since the 
beginning of a stream.

assaf.mendelson wrote
The reason I thought some operations would be reused is the fact that spark 
automatically caches shuffle data which means the partial aggregation for 
pervious dataframes would be saved. Unfortunatly, as Mark Hamstra explained 
this is not the case because this is considered a new RDD and therefore the 
previous data is lost.

I am still wondering if there is any way to do high performance streaming of 
SQL. Basically this is not far from what DStream would do assuming we convert a 
sliding window (e.g. 24 hours every 15 minutes) as we would be doing a 
foreachRDD which would do the joining behind the scenes.
The problem is that any attempt to do a streaming like this results in 
performance which is hundreds of times slower than batch.
Is there a correct way to do such an aggregation on streaming data (using 
dataframes rather than RDD operations).
Assaf.



From: Liang-Chi Hsieh [via Apache Spark Developers List] [mailto:[hidden 
email]]
Sent: Monday, December 26, 2016 5:42 PM
To: Mendelson, Assaf
Subject: Re: Shuffle intermidiate results not being cached


Hi,

Let 

RE: Shuffle intermediate results not being cached

2016-12-26 Thread assaf.mendelson
The reason I thought some operations would be reused is the fact that spark 
automatically caches shuffle data, which means the partial aggregation for 
previous dataframes would be saved. Unfortunately, as Mark Hamstra explained, 
this is not the case because this is considered a new RDD and therefore the 
previous data is lost.

I am still wondering if there is any way to do high-performance streaming of 
SQL. Basically this is not far from what DStream would do if we converted a 
sliding window (e.g. 24 hours every 15 minutes), as we would be doing a 
foreachRDD which would do the joining behind the scenes.
The problem is that any attempt to do streaming like this results in 
performance which is hundreds of times slower than batch.
Is there a correct way to do such an aggregation on streaming data (using 
dataframes rather than RDD operations)?
Assaf.



From: Liang-Chi Hsieh [via Apache Spark Developers List] 
[mailto:ml-node+s1001551n20361...@n3.nabble.com]
Sent: Monday, December 26, 2016 5:42 PM
To: Mendelson, Assaf
Subject: Re: Shuffle intermidiate results not being cached


Hi,

Let me quote your example codes:

var totalTime: Long = 0
var allDF: org.apache.spark.sql.DataFrame = null
for {
  x <- dataframes
} {
  val timeLen = time {
allDF = if (allDF == null) x else allDF.union(x)
val grouped = allDF.groupBy("cat1", "cat2").agg(sum($"valToAdd").alias("v"))
val grouped2 = grouped.groupBy("cat1").agg(sum($"v"), count($"cat2"))
grouped2.show()
  }
  totalTime += timeLen
  println(s"Took $timeLen miliseconds")
}
println(s"Total time was $totalTime miliseconds")


Basically what you do is to union some dataframes for each iteration, and do 
aggregation on this union data. I don't see any reused operations.

1st iteration: aggregation(x1 union x2)
2nd iteration: aggregation(x3 union (x1 union x2))
3rd iteration: aggregation(x4 union(x3 union (x1 union x2)))
...

Your first example just does two aggregation operations. But your second 
example like above does this aggregation operations for each iteration. So the 
time of second example grows as the iteration increases.

Liang-Chi Hsieh | @viirya
Spark Technology Center
http://www.spark.tc/







Shuffle intermediate results not being cached

2016-12-26 Thread assaf.mendelson
Hi,

Sorry to bother everyone over the holidays, but I have found what may be a 
bug.

I am doing a "manual" streaming (see 
http://stackoverflow.com/questions/41266956/apache-spark-streaming-performance 
for the specific code) where I essentially read an additional dataframe each 
time from file, union it with previous dataframes to create a "window" and then 
do double aggregation on the result.
Having looked at the documentation 
(https://spark.apache.org/docs/latest/programming-guide.html#which-storage-level-to-choose
 right above the headline) I expected spark to automatically cache the partial 
aggregation for each dataframe read and then continue with the aggregations 
from there. Instead it seems it reads each dataframe from file all over again.
Is this a bug? Am I doing something wrong?

Thanks.
Assaf.





RE: Aggregating over sorted data

2016-12-22 Thread assaf.mendelson
It seems that this aggregation is for dataset operations only. I would have 
hoped to be able to do a dataframe aggregation, something along the lines of: 
sort_df(df).agg(my_agg_func).

In any case, note that this kind of sorting is less efficient than the sorting 
done in window functions, for example. Specifically, what happens here is 
that the data is first shuffled and then the entire partition is sorted. It is 
possible to do it another way (although I have no idea how to do it in spark 
without writing a UDAF, which is probably very inefficient). The other way would 
be to collect everything by key within each partition, sort within each key (which 
would be a lot faster since there are fewer elements) and then merge the 
results.

I was hoping to find something like an efficient sortByKey to work with…
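For concreteness, a sketch of the repartition + sortWithinPartitions + mapPartitions
approach from Koert's reply below, with assumed column names ("key", "ts", "v") and a
trivial "last value per key" aggregation standing in for the real one:

import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.functions.col

def lastValuePerKey(df: DataFrame): Dataset[(String, Long)] = {
  val spark = df.sparkSession
  import spark.implicits._
  df.repartition(col("key"))
    .sortWithinPartitions(col("key"), col("ts"))
    .mapPartitions { rows =>
      // rows for the same key are now adjacent and sorted by ts within the partition
      val out = scala.collection.mutable.ArrayBuffer.empty[(String, Long)]
      var currentKey: String = null
      var lastValue: Long = 0L
      rows.foreach { r =>
        val k = r.getAs[String]("key")
        if (currentKey != null && k != currentKey) out += ((currentKey, lastValue))
        currentKey = k
        lastValue = r.getAs[Long]("v")
      }
      if (currentKey != null) out += ((currentKey, lastValue))
      out.iterator
    }
}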

From: Koert Kuipers [via Apache Spark Developers List] 
[mailto:ml-node+s1001551n20332...@n3.nabble.com]
Sent: Thursday, December 22, 2016 7:14 AM
To: Mendelson, Assaf
Subject: Re: Aggregating over sorted data

it can also be done with repartition + sortWithinPartitions + mapPartitions.
perhaps not as convenient but it does not rely on undocumented behavior.
i used this approach in spark-sorted. see here:
https://github.com/tresata/spark-sorted/blob/master/src/main/scala/com/tresata/spark/sorted/sql/GroupSortedDataset.scala

On Wed, Dec 21, 2016 at 9:44 PM, Liang-Chi Hsieh <[hidden 
email]> wrote:

I agreed that to make sure this work, you might need to know the Spark
internal implementation for APIs such as `groupBy`.

But without any more changes to current Spark implementation, I think this
is the one possible way to achieve the required function to aggregate on
sorted data per key.





-
Liang-Chi Hsieh | @viirya
Spark Technology Center
http://www.spark.tc/









RE: [SPARK-17845] [SQL][PYTHON] More self-evident window function frame boundary API

2016-11-30 Thread assaf.mendelson
I may be mistaken, but if I remember correctly spark behaves differently when the 
window is bounded in the past and when it is not. Specifically, I seem to recall a 
fix which made sure that when there is no lower bound, the aggregation is done 
incrementally, one row at a time, instead of recomputing the whole range for each 
window. So I believe it should be configured exactly the same as in Scala/Java so 
that the optimization would take place.
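For comparison, a minimal sketch of what the Scala side provides for this (the named
constants exist since Spark 2.1, if I remember correctly), which is the behavior I
would expect the Python API to mirror:

import org.apache.spark.sql.expressions.Window

// unbounded start, current row end: the case where I recall the incremental
// aggregation applying
val w = Window.partitionBy("foo").orderBy("bar")
  .rowsBetween(Window.unboundedPreceding, Window.currentRow)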
Assaf.

From: rxin [via Apache Spark Developers List] 
[mailto:ml-node+s1001551n20069...@n3.nabble.com]
Sent: Wednesday, November 30, 2016 8:35 PM
To: Mendelson, Assaf
Subject: Re: [SPARK-17845] [SQL][PYTHON] More self-evident window function 
frame boundary API

Yes I'd define unboundedPreceding to -sys.maxsize, but also any value less than 
min(-sys.maxsize, _JAVA_MIN_LONG) are considered unboundedPreceding too. We 
need to be careful with long overflow when transferring data over to Java.


On Wed, Nov 30, 2016 at 10:04 AM, Maciej Szymkiewicz <[hidden 
email]> wrote:

It is platform specific so theoretically can be larger, but 2**63 - 1 is a 
standard on 64 bit platform and 2**31 - 1 on 32bit platform. I can submit a 
patch but I am not sure how to proceed. Personally I would set

unboundedPreceding = -sys.maxsize

unboundedFollowing = sys.maxsize

to keep backwards compatibility.
On 11/30/2016 06:52 PM, Reynold Xin wrote:
Ah ok for some reason when I did the pull request sys.maxsize was much larger 
than 2^63. Do you want to submit a patch to fix this?


On Wed, Nov 30, 2016 at 9:48 AM, Maciej Szymkiewicz <[hidden 
email]> wrote:

The problem is that -(1 << 63) is -(sys.maxsize + 1) so the code which used to 
work before is off by one.
On 11/30/2016 06:43 PM, Reynold Xin wrote:
Can you give a repro? Anything less than -(1 << 63) is considered negative 
infinity (i.e. unbounded preceding).

On Wed, Nov 30, 2016 at 8:27 AM, Maciej Szymkiewicz <[hidden 
email]> wrote:
Hi,

I've been looking at the SPARK-17845 and I am curious if there is any
reason to make it a breaking change. In Spark 2.0 and below we could use:

Window().partitionBy("foo").orderBy("bar").rowsBetween(-sys.maxsize,
sys.maxsize))

In 2.1.0 this code will silently produce incorrect results (ROWS BETWEEN
-1 PRECEDING AND UNBOUNDED FOLLOWING) Couldn't we use
Window.unboundedPreceding equal -sys.maxsize to ensure backward
compatibility?

--

Maciej Szymkiewicz





--

Maciej Szymkiewicz



--

Maciej Szymkiewicz








RE: Handling questions in the mailing lists

2016-11-24 Thread assaf.mendelson
I am not sure what counts as enough traffic. Some of the already existing SE sites do 
not have that much traffic.
Specifically, the user mailing list has ~50 emails per day; it wouldn't be much 
of a stretch to extract 1-2 questions per day from that. On regular 
Stack Overflow, the apache-spark tag had more than 50 new questions in the last 24 
hours alone 
(http://stackoverflow.com/questions/tagged/apache-spark?sort=newest&pageSize=50).

I believe this should be enough traffic (and the traffic would rise once 
quality answers begin to appear).


From: Sean Owen [via Apache Spark Developers List] 
[mailto:ml-node+s1001551n2000...@n3.nabble.com]
Sent: Thursday, November 24, 2016 12:32 PM
To: Mendelson, Assaf
Subject: Re: Handling questions in the mailing lists

I don't think there's nearly enough traffic to sustain a stand-alone SE. I 
helped mod the Data Science SE and it's still not technically critical mass 
after 2 years. It would just fracture the discussion to yet another place.
On Thu, Nov 24, 2016 at 6:52 AM assaf.mendelson <[hidden 
email]> wrote:
Sorry to reawaken this, but I just noticed it is possible to propose new topic 
specific sites (http://area51.stackexchange.com/faq)  for stack overflow. So 
for example we might have a 
spark.stackexchange.com<http://spark.stackexchange.com> spark specific site.
The advantage of such a site are many. First of all it is spark specific. 
Secondly the reputation of people would be on spark and not on general 
questions and lastly (and most importantly in my opinion) it would have spark 
based moderators (which are all spark moderator as opposed to general 
technology).

The process of creating such a site is not complicated. Basically someone 
creates a proposal (I have no problem doing so). Then creating 5 example 
questions (something we want on the site) and get 5 people need to ‘follow’ it 
within 3 days. This creates a “definition” phase. The goal is to get at least 
40 questions that embody the goal of the site and have at least 10 net votes 
and enough people follow it. When enough traction has been made (enough 
questions and enough followers) then the site moves to commitment phase. In 
this phase users “commit” to being on the site (basically this is aimed to see 
the community of experts is big enough). Once all this happens the site moves 
into beta. This means the site becomes active and it will become a full site if 
it sees enough traction.

I would suggest trying to set this up.

Thanks,
Assaf








RE: Handling questions in the mailing lists

2016-11-23 Thread assaf.mendelson
Sorry to reawaken this, but I just noticed it is possible to propose new 
topic-specific sites (http://area51.stackexchange.com/faq) for Stack Exchange. So, 
for example, we might have a spark.stackexchange.com spark-specific site.
The advantages of such a site are many. First of all, it is spark-specific. 
Secondly, people's reputation would be earned on spark rather than on general 
questions, and lastly (and most importantly in my opinion) it would have 
spark-focused moderators (all of them moderating spark, as opposed to general 
technology).

The process of creating such a site is not complicated. Basically, someone 
creates a proposal (I have no problem doing so), then adds 5 example 
questions (the kind we want on the site), and 5 people need to 'follow' it 
within 3 days. This starts a "definition" phase. The goal is to get at least 
40 questions that embody the goal of the site, with at least 10 net votes each, 
and enough people following it. When enough traction has been gained (enough 
questions and enough followers), the site moves to the commitment phase. In 
this phase users "commit" to being on the site (basically this is aimed at checking 
that the community of experts is big enough). Once all this happens, the site moves 
into beta. This means the site becomes active, and it will become a full site if 
it sees enough traction.

I would suggest trying to set this up.

Thanks,
Assaf


From: Denny Lee [via Apache Spark Developers List] 
[mailto:ml-node+s1001551n19916...@n3.nabble.com]
Sent: Wednesday, November 16, 2016 4:33 PM
To: Mendelson, Assaf
Subject: Re: Handling questions in the mailing lists

Awesome stuff! Thanks Sean! :-)
On Wed, Nov 16, 2016 at 05:57 Sean Owen <[hidden 
email]> wrote:
I updated the wiki to point to the /community.html page. (We're going to 
migrate the wiki real soon now anyway)

I updated the /community.html page per this thread too. PR: 
https://github.com/apache/spark-website/pull/16


On Tue, Nov 15, 2016 at 2:49 PM assaf.mendelson <[hidden 
email]> wrote:

Should probably also update the helping-others section in the how to contribute 
section (https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark#ContributingtoSpark-ContributingbyHelpingOtherUsers)

Assaf.



From: Denny Lee [via Apache Spark Developers List] [mailto:[hidden 
email][hidden 
email]<http://user/SendEmail.jtp?type=node&node=19891&i=0>]
Sent: Sunday, November 13, 2016 8:52 AM



To: Mendelson, Assaf
Subject: Re: Handling questions in the mailing lists







Hey Reynold,






Looks like we got all of the proposed changes into the Proposed Community Mailing 
Lists / StackOverflow 
Changes<https://docs.google.com/document/d/1N0pKatcM15cqBPqFWCqIy6jdgNzIoacZlYDCjufBh2s/edit#heading=h.xshc1bv4sn3p>.
 Anything else we can do to update the Spark Community page / welcome email?





Meanwhile, let's all start answering questions on SO, eh?! :)


Denny



On Thu, Nov 10, 2016 at 1:54 PM Holden Karau <[hidden 
email]<http://user/SendEmail.jtp?type=node&node=19835&i=0>> wrote:




That's a good question, looking at 
http://stackoverflow.com/tags/apache-spark/topusers shows a few contributors 
who have already been active on SO including some committers and PMC members 
with very high overall SO reputations for any administrative needs (as well as 
a number of other contributors besides just PMC/committers).



On Wed, Nov 9, 2016 at 2:18 AM, assaf.mendelson <[hidden 
email]<http://user/SendEmail.jtp?type=node&node=19835&i=1>> wrote:



I was just wondering, before we move on to SO.

Do we have enough contributors with enough reputation to manage things in SO?

We would need contributors with enough reputation to have the relevant privileges.

For example: creating tags (requires 1500 reputation), editing questions and 
answers (2000), creating tag synonyms (2500), approving tag wiki edits (5000), 
access to moderator tools (10000, this is required to delete questions etc.), 
protecting questions (15000).

All of these are important if we plan to have SO as a main resource.

I know I originally suggested SO, however, if we do not have contributors with 
the required privileges and the willingness to help manage everything then I am 
not sure this is a good fit.

Assaf.





From: Denny Lee [via Apache Spark Developers List] [mailto:[hidden 
email]<http://user/SendEmail.jtp?type=node&node=19835&i=2>[hidden 
email]<http://user/SendEmail.jtp?type=node&node=19800&i=0>]




Sent: Wednesday, November 09, 2016 9:54 AM
To: Mendelson, Assaf
Subject: Re: Handling questions in the mailing lists






Agreed that b

Aggregating over sorted data

2016-11-23 Thread assaf.mendelson
Hi,
An issue I have encountered frequently is the need to look at data in an 
ordered manner per key.
A common way of doing this can be seen in the classic map reduce as the shuffle 
stage provides sorted data per key and one can therefore do a lot with that.
It is of course relatively easy to achieve this by using RDDs, but that would 
mean moving to RDDs and back, which has a non-negligible performance penalty 
(beyond the fact that we lose any catalyst optimization).
We can use SQL window functions but that is not an ideal solution either. 
Beyond the fact that window functions are much slower than aggregate functions 
(as we need to generate a result for every record), we also can't join them 
together (i.e. if we have two window functions on the same window, it is still 
two separate scans).

Ideally, I would have liked to see something like: 
df.groupBy(col1).sortBy(col2).agg(...) and have the aggregations work on the 
sorted data. That would enable to use both the existing functions and UDAF 
where we can assume the order (and do any windowing we need as part of the 
function itself which is relatively easy in many cases).
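
For reference, the closest workaround I am aware of today is a sketch only (it 
gives up catalyst optimization for the per-group logic, and the column names, 
master setting and the concatenation step are just placeholders): repartition by 
the key, sort within partitions, and then walk each partition.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("sorted-agg-sketch").master("local[*]").getOrCreate()
import spark.implicits._

// Hypothetical input: (key, time, value).
val df = Seq(("a", 2L, 20.0), ("a", 1L, 10.0), ("b", 1L, 5.0)).toDF("key", "time", "value")

// Co-locate each key in a single partition and sort its rows by time.
val sorted = df.repartition($"key").sortWithinPartitions($"key", $"time")

// Order-aware aggregation per key: here, simply the values concatenated in time order.
val perKey = sorted
  .map(r => (r.getString(0), r.getLong(1), r.getDouble(2)))
  .mapPartitions { rows =>
    rows.toList                                         // materializes the partition in memory
      .groupBy(_._1)                                    // keys do not cross partitions
      .iterator
      .map { case (k, vs) => (k, vs.map(_._3).mkString(",")) }  // vs is already in time order
  }
  .toDF("key", "valuesInTimeOrder")

perKey.show()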

I have tried to look for this and seen many questions on the subject but no 
answers.

I was hoping I missed something (I have seen that the SQL CLUSTER BY command 
repartitions and sorts accordingly but from my understanding it does not 
promise that this would remain true if we do a groupby afterwards). If I 
didn't, I believe this should be a feature to add (I can open a JIRA if people 
think it is a good idea).
Assaf.





--
View this message in context: 
http://apache-spark-developers-list.1001551.n3.nabble.com/Aggregating-over-sorted-data-tp1.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.

RE: structured streaming and window functions

2016-11-17 Thread assaf.mendelson
It is true that this is sessionizing but I brought it as an example for finding 
an ordered pattern in the data.
In general, using a simple window (e.g. 24 hours) in structured streaming is 
explained in the grouping-by-time documentation and is very clear.
What I was trying to figure out is how to do streaming of cases where you 
actually have to have some sorting to find patterns, especially when some of 
the data may come in late.
I was trying to figure out if there is plan to support this and if so, what 
would be the performance implications.
Assaf.

From: Ofir Manor [via Apache Spark Developers List] 
[mailto:ml-node+s1001551n19935...@n3.nabble.com]
Sent: Thursday, November 17, 2016 5:13 PM
To: Mendelson, Assaf
Subject: Re: structured streaming and window functions

Assaf, I think what you are describing is actually sessionizing, by user, where 
a session is ended by a successful login event.
On each session, you want to count number of failed login events.
If so, this is tracked by https://issues.apache.org/jira/browse/SPARK-10816 
(didn't start yet)


Ofir Manor

Co-Founder & CTO | Equalum

Mobile: +972-54-7801286 | Email: [hidden 
email]

On Thu, Nov 17, 2016 at 2:52 PM, assaf.mendelson <[hidden 
email]> wrote:
Is there a plan to support sql window functions?
I will give an example of use: Let’s say we have login logs. What we want to do 
is for each user we would want to add the number of failed logins for each 
successful login. How would you do it with structured streaming?
As this is currently not supported, is there a plan on how to support it in the 
future?
Assaf.

From: Herman van Hövell tot Westerflier-2 [via Apache Spark Developers List] 
[mailto:[hidden email][hidden 
email]<http://user/SendEmail.jtp?type=node&node=19934&i=0>]
Sent: Thursday, November 17, 2016 1:27 PM
To: Mendelson, Assaf
Subject: Re: structured streaming and window functions

What kind of window functions are we talking about? Structured streaming only 
supports time window aggregates, not the more general sql window function 
(sum(x) over (partition by ... order by ...)) aggregates.

The basic idea is that you use incremental aggregation and store the 
aggregation buffer (not the end result) in a state store after each increment. 
When a new batch comes in, you perform aggregation on that batch, merge the 
result of that aggregation with the buffer in the state store, update the state 
store and return the new result.

This is much harder than it sounds, because you need to maintain state in a 
fault tolerant way and you need to have some eviction policy (watermarks for 
instance) for aggregation buffers to prevent the state store from reaching an 
infinite size.
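
To make this concrete, here is a minimal sketch of the kind of time-window 
aggregate that is supported (the socket source, the cast-based parsing and the 
10-minute window are assumptions for illustration only):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.window

val spark = SparkSession.builder().appName("time-window-sketch").getOrCreate()
import spark.implicits._

// Assumed streaming input with an event-time column; a real job would parse its own schema.
val events = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()
  .selectExpr("cast(value as timestamp) as eventTime")

// Incremental aggregation: the engine keeps one buffer per window in the state store
// and merges each new micro-batch into it.
val counts = events
  .groupBy(window($"eventTime", "10 minutes"))
  .count()

val query = counts.writeStream
  .outputMode("complete")
  .format("console")
  .start()

query.awaitTermination()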

On Thu, Nov 17, 2016 at 12:19 AM, assaf.mendelson <[hidden 
email]<http://user/SendEmail.jtp?type=node&node=19933&i=0>> wrote:
Hi,
I have been trying to figure out how structured streaming handles window 
functions efficiently.
The portion I understand is that whenever new data arrives, it is grouped by 
time and the aggregated data is added to the state.
However, unlike operations like sum etc. window functions need the original 
data and can change when data arrives late.
So if I understand correctly, this would mean that we would have to save the 
original data and rerun on it to calculate the window function every time new 
data arrives.
Is this correct? Are there ways to go around this issue?

Assaf.


View this message in context: structured streaming and window 
functions<http://apache-spark-developers-list.1001551.n3.nabble.com/structured-streaming-and-window-functions-tp19930.html>
Sent from the Apache Spark Developers List mailing list 
archive<http://apache-spark-developers-list.1001551.n3.nabble.com/> at 
Nabble.com.





View this message in context: RE: structured streaming and window 
functions<http://apache-spark-developers-list.1001551.n3.nabble.com/structured-streaming-and-window-functions-tp19930p19934.html>

Sent from the Apache Spark Developers Li

RE: structured streaming and window functions

2016-11-17 Thread assaf.mendelson
Is there a plan to support sql window functions?
I will give an example of use: Let’s say we have login logs. What we want to do 
is for each user we would want to add the number of failed logins for each 
successful login. How would you do it with structured streaming?
As this is currently not supported, is there a plan on how to support it in the 
future?
Assaf.
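
To illustrate, the query I have in mind is roughly the following batch version, 
which runs on a static DataFrame today but not on a stream (the column names and 
the running-count logic are placeholders; counting failures since the previous 
success would need one more lag/difference step):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{sum, when}

val spark = SparkSession.builder().appName("login-window-sketch").master("local[*]").getOrCreate()
import spark.implicits._

// Hypothetical login log: (user, time, success).
val logins = Seq(
  ("u1", 1L, false), ("u1", 2L, false), ("u1", 3L, true),
  ("u1", 4L, false), ("u1", 5L, true)
).toDF("user", "time", "success")

// Running count of failed logins per user, in time order, up to the current row.
val byUser = Window.partitionBy($"user").orderBy($"time").rowsBetween(Long.MinValue, 0)

val annotated = logins
  .withColumn("failedSoFar", sum(when($"success", 0).otherwise(1)).over(byUser))
  .where($"success")   // keep one row per successful login

annotated.show()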

From: Herman van Hövell tot Westerflier-2 [via Apache Spark Developers List] 
[mailto:ml-node+s1001551n19933...@n3.nabble.com]
Sent: Thursday, November 17, 2016 1:27 PM
To: Mendelson, Assaf
Subject: Re: structured streaming and window functions

What kind of window functions are we talking about? Structured streaming only 
supports time window aggregates, not the more general sql window function 
(sum(x) over (partition by ... order by ...)) aggregates.

The basic idea is that you use incremental aggregation and store the 
aggregation buffer (not the end result) in a state store after each increment. 
When a new batch comes in, you perform aggregation on that batch, merge the 
result of that aggregation with the buffer in the state store, update the state 
store and return the new result.

This is much harder than it sounds, because you need to maintain state in a 
fault tolerant way and you need to have some eviction policy (watermarks for 
instance) for aggregation buffers to prevent the state store from reaching an 
infinite size.

On Thu, Nov 17, 2016 at 12:19 AM, assaf.mendelson <[hidden 
email]> wrote:
Hi,
I have been trying to figure out how structured streaming handles window 
functions efficiently.
The portion I understand is that whenever new data arrives, it is grouped by 
time and the aggregated data is added to the state.
However, unlike operations like sum etc. window functions need the original 
data and can change when data arrives late.
So if I understand correctly, this would mean that we would have to save the 
original data and rerun on it to calculate the window function every time new 
data arrives.
Is this correct? Are there ways to go around this issue?

Assaf.


View this message in context: structured streaming and window 
functions<http://apache-spark-developers-list.1001551.n3.nabble.com/structured-streaming-and-window-functions-tp19930.html>
Sent from the Apache Spark Developers List mailing list 
archive<http://apache-spark-developers-list.1001551.n3.nabble.com/> at 
Nabble.com.







--
View this message in context: 
http://apache-spark-developers-list.1001551.n3.nabble.com/structured-streaming-and-window-functions-tp19930p19934.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.

structured streaming and window functions

2016-11-17 Thread assaf.mendelson
Hi,
I have been trying to figure out how structured streaming handles window 
functions efficiently.
The portion I understand is that whenever new data arrives, it is grouped by 
time and the aggregated data is added to the state.
However, unlike operations like sum etc. window functions need the original 
data and can change when data arrives late.
So if I understand correctly, this would mean that we would have to save the 
original data and rerun on it to calculate the window function every time new 
data arrives.
Is this correct? Are there ways to go around this issue?

Assaf.




--
View this message in context: 
http://apache-spark-developers-list.1001551.n3.nabble.com/structured-streaming-and-window-functions-tp19930.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.

RE: Handling questions in the mailing lists

2016-11-15 Thread assaf.mendelson
Should probably also update the helping others section in the how to contribute 
section 
(https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark#ContributingtoSpark-ContributingbyHelpingOtherUsers)
Assaf.

From: Denny Lee [via Apache Spark Developers List] 
[mailto:ml-node+s1001551n19835...@n3.nabble.com]
Sent: Sunday, November 13, 2016 8:52 AM
To: Mendelson, Assaf
Subject: Re: Handling questions in the mailing lists

Hey Reynold,

Looks like we got all of the proposed changes into the Proposed Community Mailing 
Lists / StackOverflow 
Changes<https://docs.google.com/document/d/1N0pKatcM15cqBPqFWCqIy6jdgNzIoacZlYDCjufBh2s/edit#heading=h.xshc1bv4sn3p>.
 Anything else we can do to update the Spark Community page / welcome email?

Meanwhile, let's all start answering questions on SO, eh?! :)
Denny

On Thu, Nov 10, 2016 at 1:54 PM Holden Karau <[hidden 
email]> wrote:
That's a good question, looking at 
http://stackoverflow.com/tags/apache-spark/topusers shows a few contributors 
who have already been active on SO including some committers and  PMC members 
with very high overall SO reputations for any administrative needs (as well as 
a number of other contributors besides just PMC/committers).

On Wed, Nov 9, 2016 at 2:18 AM, assaf.mendelson <[hidden 
email]> wrote:
I was just wondering, before we move on to SO.
Do we have enough contributors with enough reputation to manage things in SO?
We would need contributors with enough reputation to have the relevant privileges.
For example: creating tags (requires 1500 reputation), editing questions and 
answers (2000), creating tag synonyms (2500), approving tag wiki edits (5000), 
access to moderator tools (10000, this is required to delete questions etc.), 
protecting questions (15000).
All of these are important if we plan to have SO as a main resource.
I know I originally suggested SO, however, if we do not have contributors with 
the required privileges and the willingness to help manage everything then I am 
not sure this is a good fit.
Assaf.

From: Denny Lee [via Apache Spark Developers List] [mailto:[hidden 
email][hidden 
email]<http://user/SendEmail.jtp?type=node&node=19800&i=0>]
Sent: Wednesday, November 09, 2016 9:54 AM
To: Mendelson, Assaf
Subject: Re: Handling questions in the mailing lists

Agreed that by simply just moving the questions to SO will not solve anything 
but I think the call out about the meta-tags is that we need to abide by SO 
rules and if we were to just jump in and start creating meta-tags, we would be 
violating at minimum the spirit and at maximum the actual conventions around SO.

Saying this, perhaps we could suggest tags that we place in the header of the 
question whether it be SO or the mailing lists that will help us sort through 
all of these questions faster just as you suggested.  The Proposed Community 
Mailing Lists / StackOverflow 
Changes<https://docs.google.com/document/d/1N0pKatcM15cqBPqFWCqIy6jdgNzIoacZlYDCjufBh2s/edit#heading=h.xshc1bv4sn3p>
 has been updated to include suggested tags.  WDYT?

On Tue, Nov 8, 2016 at 11:02 PM assaf.mendelson <[hidden 
email]<http://user/SendEmail.jtp?type=node&node=19799&i=0>> wrote:
I like the document and I think it is good but I still feel like we are missing 
an important part here.

Look at SO today. There are:

- 4658 unanswered questions under the apache-spark tag.
- 394 unanswered questions under the spark-dataframe tag.
- 639 unanswered questions under apache-spark-sql.
- 859 unanswered questions under pyspark.

Just moving people to ask there will not help. The whole issue is having people 
answer the questions.

The problem is that many of these questions do not fit SO (but are already 
there so they are noise), are bad (i.e. unclear or hard to answer), orphaned 
etc. while some are simply harder than what people with some experience in 
spark can handle and require more expertise.
The problem is that people with the relevant expertise are drowning in noise. 
This is true for the mailing list and this is true for SO.

For this reason I believe that just moving people to SO will not solve anything.

My original thought was that if we had different tags then different people 
could watch open questions on these tags and therefore have a much lower noise. 
I thought that we would have a low tier (current one) of people just not 
following the documentation (which would remain as noise), then a beginner tier 
where we could have people downvoting bad questions but in most cases the 
community can answer the questions because they are common, then a “medium” 
tier which would mean harder questions but that can still be answered by 
advanced users and lastly an “advanced” tier to which committers can actually 
subscribed to (and adding sub tags for subsystems would improve this even more).

I was not aware of SO policy for meta tags (the burnination link is about 
removing 

RE: separate spark and hive

2016-11-15 Thread assaf.mendelson
After looking at the code, I found that spark.sql.catalogImplementation is set 
to “hive”. I would propose that it should be set to “in-memory” by default (or 
at least have this in the documentation; the configuration documentation at 
http://spark.apache.org/docs/latest/configuration.html has no mention of hive 
at all).
Assaf.
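
For concreteness, what I have in mind looks roughly like this (a sketch only; 
spark.sql.catalogImplementation is an internal, undocumented setting, so treating 
it as configurable this way is an assumption based on reading the code):

// On the command line (or in spark-defaults.conf):
//   spark-shell --conf spark.sql.catalogImplementation=in-memory
//
// In application code, set it before the session is created and do not enable hive support:
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("no-hive")
  .config("spark.sql.catalogImplementation", "in-memory")
  .getOrCreate()

println(spark.conf.get("spark.sql.catalogImplementation"))  // expected: "in-memory"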

From: Mendelson, Assaf
Sent: Tuesday, November 15, 2016 10:11 AM
To: 'rxin [via Apache Spark Developers List]'
Subject: RE: separate spark and hive

Spark shell (and pyspark) by default create the spark session with hive support 
(this is also true when the session is created using getOrCreate, at least in pyspark).
At a minimum there should be a way to configure it using spark-defaults.conf.
Assaf.

From: rxin [via Apache Spark Developers List] 
[mailto:ml-node+s1001551n19882...@n3.nabble.com]
Sent: Tuesday, November 15, 2016 9:46 AM
To: Mendelson, Assaf
Subject: Re: separate spark and hive

If you just start a SparkSession without calling enableHiveSupport it actually 
won't use the Hive catalog support.
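
A minimal sketch of the two alternatives being compared (treat them as either/or; 
a second getOrCreate in the same JVM would simply return the already-created session):

import org.apache.spark.sql.SparkSession

// Either: the in-memory catalog, which is what you get when hive support is not enabled...
val spark = SparkSession.builder().appName("plain").getOrCreate()

// ...or: explicitly opt in to the Hive catalog / metastore.
// val spark = SparkSession.builder().appName("with-hive").enableHiveSupport().getOrCreate()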


On Mon, Nov 14, 2016 at 11:44 PM, Mendelson, Assaf <[hidden 
email]> wrote:
The default generation of spark context is actually a hive context.
I tried to find on the documentation what are the differences between hive 
context and sql context and couldn’t find it for spark 2.0 (I know for previous 
versions there were a couple of functions which required hive context as well 
as window functions but those seem to have all been fixed for spark 2.0).
Furthermore, I can’t seem to find a way to configure spark not to use hive. I 
can only find how to compile it without hive (and having to build from source 
each time is not a good idea for a production system).

I would suggest that working without hive should be either a simple 
configuration or even the default and that if there is any missing 
functionality it should be documented.
Assaf.


From: Reynold Xin [mailto:[hidden 
email]]
Sent: Tuesday, November 15, 2016 9:31 AM
To: Mendelson, Assaf
Cc: [hidden email]
Subject: Re: separate spark and hive

I agree with the high level idea, and thus 
SPARK-15691<https://issues.apache.org/jira/browse/SPARK-15691>.

In reality, it's a huge amount of work to create & maintain a custom catalog. 
It might actually make sense to do, but it just seems a lot of work to do right 
now and it'd take a toll on interoperability.

If you don't need persistent catalog, you can just run Spark without Hive mode, 
can't you?




On Mon, Nov 14, 2016 at 11:23 PM, assaf.mendelson <[hidden 
email]> wrote:
Hi,
Today, we basically force people to use hive if they want to get the full use 
of spark SQL.
When doing the default installation this means that a derby.log and 
metastore_db directory are created where we run from.
The problem with this is that if we run multiple scripts from the same working 
directory we have a problem.
The solution we employ locally is to always run from different directory as we 
ignore hive in practice (this of course means we lose the ability to use some 
of the catalog options in spark session).
The only other solution is to create a full blown hive installation with proper 
configuration (probably for a JDBC solution).

I would propose that in most cases there shouldn’t be any hive use at all. Even 
for catalog elements such as saving a permanent table, we should be able to 
configure a target directory and simply write to it (doing everything file 
based to avoid the need for locking). Hive should be reserved for those who 
actually use it (probably for backward compatibility).

Am I missing something here?
Assaf.


View this message in context: separate spark and 
hive<http://apache-spark-developers-list.1001551.n3.nabble.com/separate-spark-and-hive-tp19879.html>
Sent from the Apache Spark Developers List mailing list 
archive<http://apache-spark-developers-list.1001551.n3.nabble.com/> at 
Nabble.com.





RE: separate spark and hive

2016-11-15 Thread assaf.mendelson
Spark shell (and pyspark) by default create the spark session with hive support 
(this is also true when the session is created using getOrCreate, at least in pyspark).
At a minimum there should be a way to configure it using spark-defaults.conf.
Assaf.

From: rxin [via Apache Spark Developers List] 
[mailto:ml-node+s1001551n19882...@n3.nabble.com]
Sent: Tuesday, November 15, 2016 9:46 AM
To: Mendelson, Assaf
Subject: Re: separate spark and hive

If you just start a SparkSession without calling enableHiveSupport it actually 
won't use the Hive catalog support.


On Mon, Nov 14, 2016 at 11:44 PM, Mendelson, Assaf <[hidden 
email]> wrote:
The default generation of spark context is actually a hive context.
I tried to find on the documentation what are the differences between hive 
context and sql context and couldn’t find it for spark 2.0 (I know for previous 
versions there were a couple of functions which required hive context as well 
as window functions but those seem to have all been fixed for spark 2.0).
Furthermore, I can’t seem to find a way to configure spark not to use hive. I 
can only find how to compile it without hive (and having to build from source 
each time is not a good idea for a production system).

I would suggest that working without hive should be either a simple 
configuration or even the default and that if there is any missing 
functionality it should be documented.
Assaf.


From: Reynold Xin [mailto:[hidden 
email]]
Sent: Tuesday, November 15, 2016 9:31 AM
To: Mendelson, Assaf
Cc: [hidden email]
Subject: Re: separate spark and hive

I agree with the high level idea, and thus 
SPARK-15691<https://issues.apache.org/jira/browse/SPARK-15691>.

In reality, it's a huge amount of work to create & maintain a custom catalog. 
It might actually make sense to do, but it just seems a lot of work to do right 
now and it'd take a toll on interoperability.

If you don't need persistent catalog, you can just run Spark without Hive mode, 
can't you?




On Mon, Nov 14, 2016 at 11:23 PM, assaf.mendelson <[hidden 
email]> wrote:
Hi,
Today, we basically force people to use hive if they want to get the full use 
of spark SQL.
When doing the default installation this means that a derby.log and 
metastore_db directory are created where we run from.
The problem with this is that if we run multiple scripts from the same working 
directory we have a problem.
The solution we employ locally is to always run from different directory as we 
ignore hive in practice (this of course means we lose the ability to use some 
of the catalog options in spark session).
The only other solution is to create a full blown hive installation with proper 
configuration (probably for a JDBC solution).

I would propose that in most cases there shouldn’t be any hive use at all. Even 
for catalog elements such as saving a permanent table, we should be able to 
configure a target directory and simply write to it (doing everything file 
based to avoid the need for locking). Hive should be reserved for those who 
actually use it (probably for backward compatibility).

Am I missing something here?
Assaf.


View this message in context: separate spark and 
hive<http://apache-spark-developers-list.1001551.n3.nabble.com/separate-spark-and-hive-tp19879.html>
Sent from the Apache Spark Developers List mailing list 
archive<http://apache-spark-developers-list.1001551.n3.nabble.com/> at 
Nabble.com.








--
View this message in context: 
http://apache-spark-developers-list.1001551.n3.nabble.com/separate-spark-and-hive-tp19879p19883.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.

separate spark and hive

2016-11-14 Thread assaf.mendelson
Hi,
Today, we basically force people to use hive if they want to get the full use 
of spark SQL.
When doing the default installation this means that a derby.log and 
metastore_db directory are created where we run from.
The problem with this is that if we run multiple scripts from the same working 
directory we have a problem.
The solution we employ locally is to always run from different directory as we 
ignore hive in practice (this of course means we lose the ability to use some 
of the catalog options in spark session).
The only other solution is to create a full blown hive installation with proper 
configuration (probably for a JDBC solution).

I would propose that in most cases there shouldn't be any hive use at all. Even 
for catalog elements such as saving a permanent table, we should be able to 
configure a target directory and simply write to it (doing everything file 
based to avoid the need for locking). Hive should be reserved for those who 
actually use it (probably for backward compatibility).

Am I missing something here?
Assaf.




--
View this message in context: 
http://apache-spark-developers-list.1001551.n3.nabble.com/separate-spark-and-hive-tp19879.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.

RE: statistics collection and propagation for cost-based optimizer

2016-11-14 Thread assaf.mendelson
I am not sure I understand when the statistics would be calculated. Would they 
always be calculated or just when analyze is called?
Would it be possible to save analysis results as part of dataframe saving (e.g. 
when writing it to parquet) or do we have to have a consistent hive 
installation?
Would it be possible to provide the hints manually? For example for streaming 
if I know the data in the beginning is not a representative of the entire 
stream?
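
For what it's worth, the syntax I would expect, based on my reading of the design 
doc, is roughly the following; the column-level variant is a proposal rather than 
something released at the time of writing, and the table/column names are made up:

// Assuming an existing SparkSession named spark and a table named events:
spark.sql("ANALYZE TABLE events COMPUTE STATISTICS")                        // table-level statistics
spark.sql("ANALYZE TABLE events COMPUTE STATISTICS FOR COLUMNS user, ts")   // proposed column-level statistics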

From: rxin [via Apache Spark Developers List] 
[mailto:ml-node+s1001551n19873...@n3.nabble.com]
Sent: Tuesday, November 15, 2016 8:48 AM
To: Mendelson, Assaf
Subject: Re: statistics collection and propagation for cost-based optimizer

They are not yet complete. The benchmark was done with an implementation of 
cost-based optimizer Huawei had internally for Spark 1.5 (or some even older 
version).

On Mon, Nov 14, 2016 at 10:46 PM, Yogesh Mahajan <[hidden 
email]> wrote:
It looks like the Huawei team has run the TPC-H benchmark and some real-world test 
cases, and their results show a good performance gain of 2X-5X speedup depending 
on data volume.
Can we share the numbers and the query-wise rationale behind the gain? Is there 
anything done on spark master yet? Or is the implementation not yet completed?

Thanks,
Yogesh Mahajan
http://www.snappydata.io/blog

On Tue, Nov 15, 2016 at 12:03 PM, Yogesh Mahajan <[hidden 
email]> wrote:

Thanks Reynold for the detailed proposals. A few questions/clarifications -

1) How will the existing rule-based optimizer co-exist with CBO? The existing rules 
are heuristics/empirical based; I am assuming rules like predicate pushdown or 
project pruning will co-exist with CBO and we just want to accurately estimate 
the filter factor and cardinality to make it more accurate? With predicate 
pushdown, a filter is mostly executed at an early stage of a query plan and the 
cardinality estimate of a predicate can improve the precision of cardinality 
estimates.

2. Will the query transformations be now based on the cost calculation? If yes, 
then what happens when the cost of execution of the transformed statement is 
higher than the cost of untransformed query?

3. Is there any upper limit on space used for storing the frequency histogram? 
255? And in case of more distinct values, we can even consider height balanced 
histogram in Oracle.

4. The first three proposals are new and not mentioned in the CBO design spec. 
CMS is good but it's less accurate compared to the traditional histograms. This is 
a major trade-off we need to consider.

5. Are we going to consider system statistics- such as speed of CPU or disk 
access as a cost function? How about considering shuffle cost, output 
partitioning etc?

6. Like the current rule based optimizer, will this CBO also be an 'extensible 
optimizer'? If yes, what functionality users can extend?

7. Why this CBO will be disabled by default? “spark.sql.cbo" is false by 
default as it's just experimental ?

8. ANALYZE TABLE, analyzeColumns etc ... all look good.

9. From the release point of view, how this is planned ? Will all this be 
implemented in one go or in phases?

Thanks,
Yogesh Mahajan
http://www.snappydata.io/blog

On Mon, Nov 14, 2016 at 11:25 PM, Reynold Xin <[hidden 
email]> wrote:
Historically tpcds and tpch. There is certainly a chance of overfitting one or 
two benchmarks. Note that those will probably be impacted more by the way we 
set the parameters for CBO rather than using x or y for summary statistics.


On Monday, November 14, 2016, Shivaram Venkataraman <[hidden 
email]> wrote:
Do we have any query workloads for which we can benchmark these
proposals in terms of performance ?

Thanks
Shivaram

On Sun, Nov 13, 2016 at 5:53 PM, Reynold Xin <[hidden 
email]> wrote:
> One additional note: in terms of size, the size of a count-min sketch with
> eps = 0.1% and confidence 0.87, uncompressed, is 48k bytes.
>
> To look up what that means, see
> http://spark.apache.org/docs/latest/api/java/org/apache/spark/util/sketch/CountMinSketch.html
>
>
>
>
>
> On Sun, Nov 13, 2016 at 5:30 PM, Reynold Xin <[hidden 
> email]> wrote:
>>
>> I want to bring this discussion to the dev list to gather broader
>> feedback, as there have been some discussions that happened over multiple
>> JIRA tickets (SPARK-16026, etc) and GitHub pull requests about what
>> statistics to collect and how to use them.
>>
>> There are some basic statistics on columns that are obvious to use and we
>> don't need to debate these: estimated size (in bytes), row count, min, max,
>> number of nulls, number of distinct values, average column length, max
>> column length.
>>
>> In addition, we want to be able to estimate selectivity for equality and
>> range predicates better, especially taking into account skewed values and
>> outliers.
>>
>> Before I dive into the different options, let me first explain count-min
>> sketch: Count-min sketch is a common sketch algorithm that tracks frequency
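
A small usage sketch of the CountMinSketch class linked above (the eps, confidence 
and seed values are illustrative only, mirroring the 0.1% / 0.87 figures quoted):

import org.apache.spark.util.sketch.CountMinSketch

val cms = CountMinSketch.create(0.001 /* eps */, 0.87 /* confidence */, 42 /* seed */)

Seq("a", "b", "a", "c", "a").foreach(item => cms.add(item))

// Overestimates are possible, underestimates are not.
println(cms.estimateCount("a"))   // >= 3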

RE: [ANNOUNCE] Apache Spark 2.0.2

2016-11-14 Thread assaf.mendelson
While you can download spark 2.0.2, the description is still spark 2.0.1:
Our latest stable version is Apache Spark 2.0.1, released on Oct 3, 2016 
(release notes) (git 
tag)


From: rxin [via Apache Spark Developers List] 
[mailto:ml-node+s1001551n19870...@n3.nabble.com]
Sent: Tuesday, November 15, 2016 7:15 AM
To: Mendelson, Assaf
Subject: [ANNOUNCE] Apache Spark 2.0.2

We are happy to announce the availability of Spark 2.0.2!

Apache Spark 2.0.2 is a maintenance release containing 90 bug fixes along with 
Kafka 0.10 support and runtime metrics for Structured Streaming. This release 
is based on the branch-2.0 maintenance branch of Spark. We strongly recommend 
all 2.0.x users to upgrade to this stable release.

To download Apache Spark 2.0.2, visit http://spark.apache.org/downloads.html

We would like to acknowledge all community members for contributing patches to 
this release.








--
View this message in context: 
http://apache-spark-developers-list.1001551.n3.nabble.com/RE-ANNOUNCE-Apache-Spark-2-0-2-tp19874.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.

RE: how does isDistinct work on expressions

2016-11-13 Thread assaf.mendelson
Thanks for the pointer. It makes more sense now.
Assaf.

From: Herman van Hövell tot Westerflier-2 [via Apache Spark Developers List] 
[mailto:ml-node+s1001551n19842...@n3.nabble.com]
Sent: Sunday, November 13, 2016 10:03 PM
To: Mendelson, Assaf
Subject: Re: how does isDistinct work on expressions

Hi,

You should take a look at 
https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala

Spark SQL does not directly support the aggregation of multiple distinct 
groups. For example, select count(distinct a), count(distinct b) from tbl_x 
contains the distinct groups a & b. The RewriteDistinctAggregates rule rewrites 
this into two aggregates: the first aggregate takes care of deduplication and 
the second aggregate does the actual aggregation.
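
As a concrete example of the query shape that rule handles (a sketch; it assumes 
an existing SparkSession named spark with a registered table tbl_x):

import org.apache.spark.sql.functions.countDistinct

val viaSql = spark.sql("SELECT count(DISTINCT a), count(DISTINCT b) FROM tbl_x")

// Or equivalently through the DataFrame API:
val viaApi = spark.table("tbl_x").agg(countDistinct("a"), countDistinct("b"))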

HTH

On Sun, Nov 13, 2016 at 11:46 AM, Jacek Laskowski <[hidden 
email]> wrote:
Hi,

I might not have been there yet, but since I'm with the code every day
I might be close...

When you say "aggregate functions", are you about typed or untyped
ones? Just today I reviewed the typed ones and honestly took me some
time to figure out what belongs to where. Are you creating a new UDAF?
What have you done already? GitHub perhaps?

Pozdrawiam,
Jacek Laskowski

https://medium.com/@jaceklaskowski/
Mastering Apache Spark 2.0 https://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski


On Sun, Nov 13, 2016 at 12:03 PM, assaf.mendelson
<[hidden email]> wrote:
> Hi,
>
> I am trying to understand how aggregate functions are implemented
> internally.
>
> I see that the expression is wrapped using toAggregateExpression using
> isDistinct.
>
> I can’t figure out where the code that makes the data distinct is located. I
> am trying to figure out how the input data is converted into a distinct
> version.
>
> Thanks,
>
> Assaf.
>
>
> 
> View this message in context: how does isDistinct work on expressions
> Sent from the Apache Spark Developers List mailing list archive at
> Nabble.com.
-
To unsubscribe e-mail: [hidden 
email]







--
View this message in context: 
http://apache-spark-developers-list.1001551.n3.nabble.com/how-does-isDistinct-work-on-expressions-tp19836p19847.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.

Converting spark types and standard scala types

2016-11-13 Thread assaf.mendelson
Hi,
I am trying to write a new aggregate function 
(https://issues.apache.org/jira/browse/SPARK-17691) and I wanted it to support 
all ordered types.
I have several issues though:

1. How do I convert the type of the child expression to a Scala standard type 
(e.g. I need an Array[Int] for IntegerType and an Array[Double] for DoubleType)? 
The only method I found so far is to do a match for each of the types. Is there 
a better way?

2. What would be the corresponding scala type for DecimalType, TimestampType, 
DateType and BinaryType? I also couldn't figure out how to do a case for 
DecimalType. Do I need to do a specific case for each of its internal types?

3. Should BinaryType be a legal type for such a function?

4. I need to serialize the relevant array of values (i.e. turn it into an 
Array[Byte] for working with TypedImperativeAggregate). Currently, I use 
java.io.{ByteArrayOutputStream, ByteArrayInputStream, ObjectInputStream, 
ObjectOutputStream}. Is there another way which is more standard (e.g. a 
"serialize" function which knows what to use: java serialization, kryo 
serialization etc. based on spark configuration)? (A sketch of items 1 and 4 
follows at the end of this message.)
Thanks,
Assaf.
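
A sketch of the two pieces asked about above (items 1 and 4); the supported-type 
list and the plain java-serialization round trip are assumptions for illustration, 
not a recommendation:

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

import org.apache.spark.sql.types._

// Item 1: the only approach I found as well -- an explicit match on the DataType.
def emptyArrayFor(dt: DataType): Any = dt match {
  case IntegerType => Array.empty[Int]
  case LongType    => Array.empty[Long]
  case DoubleType  => Array.empty[Double]
  case StringType  => Array.empty[String]
  case other       => throw new UnsupportedOperationException(s"unsupported type: $other")
}

// Item 4: what I use today -- a java-serialization round trip to/from Array[Byte].
def serialize(value: AnyRef): Array[Byte] = {
  val bytes = new ByteArrayOutputStream()
  val out = new ObjectOutputStream(bytes)
  try out.writeObject(value) finally out.close()
  bytes.toByteArray
}

def deserialize(bytes: Array[Byte]): AnyRef = {
  val in = new ObjectInputStream(new ByteArrayInputStream(bytes))
  try in.readObject() finally in.close()
}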





--
View this message in context: 
http://apache-spark-developers-list.1001551.n3.nabble.com/Converting-spark-types-and-standard-scala-types-tp19837.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.

how does isDistinct work on expressions

2016-11-13 Thread assaf.mendelson
Hi,
I am trying to understand how aggregate functions are implemented internally.
I see that the expression is wrapped using toAggregateExpression using 
isDistinct.
I can't figure out where the code that makes the data distinct is located. I am 
trying to figure out how the input data is converted into a distinct version.
Thanks,
Assaf.




--
View this message in context: 
http://apache-spark-developers-list.1001551.n3.nabble.com/how-does-isDistinct-work-on-expressions-tp19836.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.

RE: Handling questions in the mailing lists

2016-11-09 Thread assaf.mendelson
I was just wondering, before we move on to SO.
Do we have enough contributors with enough reputation to manage things in SO?
We would need contributors with enough reputation to have the relevant privileges.
For example: creating tags (requires 1500 reputation), editing questions and 
answers (2000), creating tag synonyms (2500), approving tag wiki edits (5000), 
access to moderator tools (10000, this is required to delete questions etc.), 
protecting questions (15000).
All of these are important if we plan to have SO as a main resource.
I know I originally suggested SO, however, if we do not have contributors with 
the required privileges and the willingness to help manage everything then I am 
not sure this is a good fit.
Assaf.

From: Denny Lee [via Apache Spark Developers List] 
[mailto:ml-node+s1001551n19799...@n3.nabble.com]
Sent: Wednesday, November 09, 2016 9:54 AM
To: Mendelson, Assaf
Subject: Re: Handling questions in the mailing lists

Agreed that by simply just moving the questions to SO will not solve anything 
but I think the call out about the meta-tags is that we need to abide by SO 
rules and if we were to just jump in and start creating meta-tags, we would be 
violating at minimum the spirit and at maximum the actual conventions around SO.

Saying this, perhaps we could suggest tags that we place in the header of the 
question whether it be SO or the mailing lists that will help us sort through 
all of these questions faster just as you suggested.  The Proposed Community 
Mailing Lists / StackOverflow 
Changes<https://docs.google.com/document/d/1N0pKatcM15cqBPqFWCqIy6jdgNzIoacZlYDCjufBh2s/edit#heading=h.xshc1bv4sn3p>
 has been updated to include suggested tags.  WDYT?

On Tue, Nov 8, 2016 at 11:02 PM assaf.mendelson <[hidden 
email]> wrote:
I like the document and I think it is good but I still feel like we are missing 
an important part here.

Look at SO today. There are:

- 4658 unanswered questions under the apache-spark tag.
- 394 unanswered questions under the spark-dataframe tag.
- 639 unanswered questions under apache-spark-sql.
- 859 unanswered questions under pyspark.

Just moving people to ask there will not help. The whole issue is having people 
answer the questions.

The problem is that many of these questions do not fit SO (but are already 
there so they are noise), are bad (i.e. unclear or hard to answer), orphaned 
etc. while some are simply harder than what people with some experience in 
spark can handle and require more expertise.
The problem is that people with the relevant expertise are drowning in noise. 
This is true for the mailing list and this is true for SO.

For this reason I believe that just moving people to SO will not solve anything.

My original thought was that if we had different tags then different people 
could watch open questions on these tags and therefore have a much lower noise. 
I thought that we would have a low tier (current one) of people just not 
following the documentation (which would remain as noise), then a beginner tier 
where we could have people downvoting bad questions but in most cases the 
community can answer the questions because they are common, then a “medium” 
tier which would mean harder questions but that can still be answered by 
advanced users and lastly an “advanced” tier to which committers can actually 
subscribed to (and adding sub tags for subsystems would improve this even more).

I was not aware of SO policy for meta tags (the burnination link is about 
removing tags completely so I am not sure how it applies, I believe this link 
https://stackoverflow.blog/2010/08/the-death-of-meta-tags/ is more relevant).
There was actually a discussion along the lines in SO 
(http://meta.stackoverflow.com/questions/253338/filtering-questions-by-difficulty-level).

The fact that SO did not solve this issue, does not mean we shouldn’t either.

The way I see it, some tags can easily be used even with the meta tags 
limitation. For example, using spark-internal-development tag can be used to 
ask questions for development of spark. There are already tags for some spark 
subsystems (there is a apachae-spark-sql tag, a pyspark tag, a spark-streaming 
tag etc.). The main issue I see and the one we can’t seem to get around is 
dividing between simple questions that the community should answer and hard 
questions which only advanced users can answer.

Maybe SO isn’t the correct platform for that but even within it we can try to 
find a non meta name for spark beginner questions vs. spark advanced questions.
Assaf.


From: Denny Lee [via Apache Spark Developers List] [mailto:[hidden 
email][hidden 
email]<http://user/SendEmail.jtp?type=node&node=19798&i=0>]
Sent: Tuesday, November 08, 2016 7:53 AM
To: Mendelson, Assaf

Subject: Re: Handling questions in the mailing lists

To help track and get the verbiage for the Spark community page and welcome 
email jump started, here's a working docum

RE: Handling questions in the mailing lists

2016-11-08 Thread assaf.mendelson
and tools (much 
easier). Isn't it?

On 2 November 2016 at 16:36, Cody Koeninger <[hidden 
email]> wrote:
So concrete things people could do

- users could tag subject lines appropriately to the component they're
asking about

- contributors could monitor user@ for tags relating to components
they've worked on.
I'd be surprised if my miss rate for any mailing list questions
well-labeled as Kafka was higher than 5%

- committers could be more aggressive about soliciting and merging PRs
to improve documentation.
It's a lot easier to answer even poorly-asked questions with a link to
relevant docs.

On Wed, Nov 2, 2016 at 7:39 AM, Sean Owen <[hidden 
email]> wrote:
> There's already reviews@ and issues@. dev@ is for project development itself
> and I think is OK. You're suggesting splitting up user@ and I sympathize
> with the motivation. Experience tells me that we'll have a beginner@ that's
> then totally ignored, and people will quickly learn to post to advanced@ to
> get attention, and we'll be back where we started. Putting it in JIRA
> doesn't help. I don't think this a problem that is merely down to lack of
> process. It actually requires cultivating a culture change on the community
> list.
>
> On Wed, Nov 2, 2016 at 12:11 PM Mendelson, Assaf <[hidden 
> email]>
> wrote:
>>
>> What I am suggesting is basically to fix that.
>>
>> For example, we might say that mailing list A is only for voting, mailing
>> list B is only for PR and have something like stack overflow for developer
>> questions (I would even go as far as to have beginner, intermediate and
>> advanced mailing list for users and beginner/advanced for dev).
>>
>>
>>
>> This can easily be done using stack overflow tags, however, that would
>> probably be harder to manage.
>>
>> Maybe using special jira tags and manage it in jira?
>>
>>
>>
>> Anyway as I said, the main issue is not user questions (except maybe
>> advanced ones) but more for dev questions. It is so easy to get lost in the
>> chatter that it makes it very hard for people to learn spark internals…
>>
>> Assaf.
>>
>>
>>
>> From: Sean Owen [mailto:[hidden 
>> email]]
>> Sent: Wednesday, November 02, 2016 2:07 PM
>> To: Mendelson, Assaf; [hidden 
>> email]
>> Subject: Re: Handling questions in the mailing lists
>>
>>
>>
>> I think that unfortunately mailing lists don't scale well. This one has
>> thousands of subscribers with different interests and levels of experience.
>> For any given person, most messages will be irrelevant. I also find that a
>> lot of questions on user@ are not well-asked, aren't an SSCCE
>> (http://sscce.org/), not something most people are going to bother replying
>> to even if they could answer. I almost entirely ignore user@ because there
>> are higher-priority channels like PRs to deal with, that already have
>> hundreds of messages per day. This is why little of it gets an answer -- too
>> noisy.
>>
>>
>>
>> We have to have official mailing lists, in any event, to have some
>> official channel for things like votes and announcements. It's not wrong to
>> ask questions on user@ of course, but a lot of the questions I see could
>> have been answered with research of existing docs or looking at the code. I
>> think that given the scale of the list, it's not wrong to assert that this
>> is sort of a prerequisite for asking thousands of people to answer one's
>> question. But we can't enforce that.
>>
>>
>>
>> The situation will get better to the extent people ask better questions,
>> help other people ask better questions, and answer good questions. I'd
>> encourage anyone feeling this way to try to help along those dimensions.
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> On Wed, Nov 2, 2016 at 11:32 AM assaf.mendelson <[hidden 
>> email]>
>> wrote:
>>
>> Hi,
>>
>> I know this is a little off topic but I wanted to raise an issue about
>> handling questions in the mailing list (this is true both for the user
>> mailing list and the dev but since there are other options such as stack
>> overflow for user questions, this is more problematic in dev).
>>
>> Let’s say I ask a question (as I recently did). Unfortunately this was
>> during spark summit in Europe so probably people were busy. In any case no
>> one answered.
>>
>> The problem is, that if no one answers very soon, the question will almost

RE: Handling questions in the mailing lists

2016-11-06 Thread assaf.mendelson
e can’t 
change our official communication tools due to those very tools…)

Nick
​

On Wed, Nov 2, 2016 at 12:24 PM Ricardo Almeida <[hidden 
email]> wrote:
I feel Assaf's point is quite relevant if we want to move this project forward 
from the Spark user perspective (as I do). In fact, we're still using 20th 
century tools (mailing lists) with some add-ons (like Stack Overflow).

As usual, Sean and Cody's contributions are very much to the point.
I feel it is indeed a matter of culture (hard to enforce) and tools (much 
easier). Isn't it?

On 2 November 2016 at 16:36, Cody Koeninger <[hidden 
email]> wrote:
So concrete things people could do

- users could tag subject lines appropriately to the component they're
asking about

- contributors could monitor user@ for tags relating to components
they've worked on.
I'd be surprised if my miss rate for any mailing list questions
well-labeled as Kafka was higher than 5%

- committers could be more aggressive about soliciting and merging PRs
to improve documentation.
It's a lot easier to answer even poorly-asked questions with a link to
relevant docs.

On Wed, Nov 2, 2016 at 7:39 AM, Sean Owen <[hidden 
email]> wrote:
> There's already reviews@ and issues@. dev@ is for project development itself
> and I think is OK. You're suggesting splitting up user@ and I sympathize
> with the motivation. Experience tells me that we'll have a beginner@ that's
> then totally ignored, and people will quickly learn to post to advanced@ to
> get attention, and we'll be back where we started. Putting it in JIRA
> doesn't help. I don't think this a problem that is merely down to lack of
> process. It actually requires cultivating a culture change on the community
> list.
>
> On Wed, Nov 2, 2016 at 12:11 PM Mendelson, Assaf <[hidden 
> email]>
> wrote:
>>
>> What I am suggesting is basically to fix that.
>>
>> For example, we might say that mailing list A is only for voting, mailing
>> list B is only for PR and have something like stack overflow for developer
>> questions (I would even go as far as to have beginner, intermediate and
>> advanced mailing list for users and beginner/advanced for dev).
>>
>>
>>
>> This can easily be done using stack overflow tags, however, that would
>> probably be harder to manage.
>>
>> Maybe using special jira tags and manage it in jira?
>>
>>
>>
>> Anyway as I said, the main issue is not user questions (except maybe
>> advanced ones) but more for dev questions. It is so easy to get lost in the
>> chatter that it makes it very hard for people to learn spark internals…
>>
>> Assaf.
>>
>>
>>
>> From: Sean Owen [mailto:[hidden 
>> email]]
>> Sent: Wednesday, November 02, 2016 2:07 PM
>> To: Mendelson, Assaf; [hidden 
>> email]
>> Subject: Re: Handling questions in the mailing lists
>>
>>
>>
>> I think that unfortunately mailing lists don't scale well. This one has
>> thousands of subscribers with different interests and levels of experience.
>> For any given person, most messages will be irrelevant. I also find that a
>> lot of questions on user@ are not well-asked, aren't an SSCCE
>> (http://sscce.org/), not something most people are going to bother replying
>> to even if they could answer. I almost entirely ignore user@ because there
>> are higher-priority channels like PRs to deal with, that already have
>> hundreds of messages per day. This is why little of it gets an answer -- too
>> noisy.
>>
>>
>>
>> We have to have official mailing lists, in any event, to have some
>> official channel for things like votes and announcements. It's not wrong to
>> ask questions on user@ of course, but a lot of the questions I see could
>> have been answered with research of existing docs or looking at the code. I
>> think that given the scale of the list, it's not wrong to assert that this
>> is sort of a prerequisite for asking thousands of people to answer one's
>> question. But we can't enforce that.
>>
>>
>>
>> The situation will get better to the extent people ask better questions,
>> help other people ask better questions, and answer good questions. I'd
>> encourage anyone feeling this way to try to help along those dimensions.
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> On Wed, Nov 2, 2016 at 11:32 AM assaf.mendelson <[hidden 
>> email]>
>> wrote:
>>
>> Hi,
>>
>> I know this is a little off topic but I wanted to raise an issue about
>> handl

Handling questions in the mailing lists

2016-11-02 Thread assaf.mendelson
Hi,
I know this is a little off topic but I wanted to raise an issue about handling 
questions in the mailing list (this is true both for the user mailing list and 
the dev but since there are other options such as stack overflow for user 
questions, this is more problematic in dev).
Let's say I ask a question (as I recently did). Unfortunately this was during 
spark summit in Europe so probably people were busy. In any case no one 
answered.
The problem is, that if no one answers very soon, the question will almost 
certainly remain unanswered because new messages will simply drown it.

This is a common issue not just for questions but for any comment or idea which 
is not immediately picked up.

I believe we should have a method of handling this.
Generally, I would say these types of things belong in stack overflow, after 
all, the way it is built is perfect for this. More seasoned spark contributors 
and committers can periodically check out unanswered questions and answer them.
The problem is that stack overflow (as well as other targets such as the 
databricks forums) tend to have a more user based orientation. This means that 
any spark internal question will almost certainly remain unanswered.

I was wondering if we could come up with a solution for this.

Assaf.





--
View this message in context: 
http://apache-spark-developers-list.1001551.n3.nabble.com/Handling-questions-in-the-mailing-lists-tp19690.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.

RE: Watermarking in Structured Streaming to drop late data

2016-10-27 Thread assaf.mendelson
Thanks.
This article is excellent. It completely explains everything.
I would add it as a reference to any and all explanations of structured 
streaming (and in the case of watermarking, I simply didn’t understand the 
definition before reading this).

Thanks,
Assaf.


From: kostas papageorgopoylos [via Apache Spark Developers List] 
[mailto:ml-node+s1001551n19592...@n3.nabble.com]
Sent: Thursday, October 27, 2016 10:17 AM
To: Mendelson, Assaf
Subject: Re: Watermarking in Structured Streaming to drop late data

Hi all

I would highly recommend that all users and devs interested in the design 
suggestions / discussions for the Structured Streaming watermarking API
take a look at the following links along with the design document. It would 
help to understand the notions of watermark, out-of-order data and possible 
use cases.

https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-101
https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-102

Kind Regards


2016-10-27 9:46 GMT+03:00 assaf.mendelson <[hidden 
email]>:
Hi,
Should comments come here or in the JIRA?
Anyway, I am a little confused about the need to expose this as an API to begin with.
Let’s consider for a second the most basic behavior: We have some input stream 
and we want to aggregate a sum over a time window.
This means that the window we should be looking at would be the maximum time 
across our data and back by the window interval. Everything older can be 
dropped.
When new data arrives, the maximum time cannot move back so we generally drop 
everything too old.
This basically means we save only the latest time window.
This simpler model would only break if we have a secondary aggregation which 
needs the results of multiple windows.
Is this the use case we are trying to solve?
If so, wouldn’t just calculating the bigger time window across the entire 
aggregation solve this?
Am I missing something here?

From: Michael Armbrust [via Apache Spark Developers List] [mailto:[hidden email]]
Sent: Thursday, October 27, 2016 3:04 AM
To: Mendelson, Assaf
Subject: Re: Watermarking in Structured Streaming to drop late data

And the JIRA: https://issues.apache.org/jira/browse/SPARK-18124

On Wed, Oct 26, 2016 at 4:56 PM, Tathagata Das <[hidden email]> wrote:
Hey all,

We are planning to implement watermarking in Structured Streaming that would allow 
us to handle late, out-of-order data better. Specifically, when we are aggregating 
over windows on event time, we currently can end up keeping an unbounded amount of 
data as state. We want to define watermarks on the event time in order to mark and 
drop data that are "too late" and accordingly age out old aggregates that will 
not be updated any more.

To enable the user to specify details like lateness threshold, we are 
considering adding a new method to Dataset. We would like to get more feedback 
on this API. Here is the design doc

https://docs.google.com/document/d/1z-Pazs5v4rA31azvmYhu4I5xwqaNQl6ZLIS03xhkfCQ/

Please comment on the design and proposed APIs.

Thank you very much!

TD





View this message in context: RE: Watermarking in Structured Streaming to drop 
late 
data<http://apache-spark-developers-list.1001551.n3.nabble.com/Watermarking-in-Structured-Streaming-to-drop-late-data-tp19589p19591.html>
Sent from the Apache Spark Developers List mailing list 
archive<http://apache-spark-developers-list.1001551.n3.nabble.com/> at 
Nabble.com.




RE: Watermarking in Structured Streaming to drop late data

2016-10-26 Thread assaf.mendelson
Hi,
Should comments come here or in the JIRA?
Anyway, I am a little confused about the need to expose this as an API to begin with.
Let’s consider for a second the most basic behavior: We have some input stream 
and we want to aggregate a sum over a time window.
This means that the window we should be looking at would be the maximum time 
across our data and back by the window interval. Everything older can be 
dropped.
When new data arrives, the maximum time cannot move back so we generally drop 
everything too old.
This basically means we save only the latest time window.
This simpler model would only break if we have a secondary aggregation which 
needs the results of multiple windows.
Is this the use case we are trying to solve?
If so, wouldn’t just calculating the bigger time window across the entire 
aggregation solve this?
Am I missing something here?
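
To make this concrete, the kind of query I have in mind looks roughly like this 
(a sketch only: the column names are made up, and the lateness call is my guess 
at the API proposed in the design doc):

import org.apache.spark.sql.functions._
import spark.implicits._   // assuming "spark" is the SparkSession, e.g. in the shell

val windowedSums = events                           // hypothetical streaming DataFrame
  .withWatermark("eventTime", "10 minutes")         // proposed lateness threshold
  .groupBy(window($"eventTime", "1 hour"))          // event-time window
  .agg(sum($"value"))

With such a threshold, only windows newer than max(eventTime) minus the threshold 
need to be kept as state; everything older can be finalized and dropped.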

From: Michael Armbrust [via Apache Spark Developers List] 
[mailto:ml-node+s1001551n19590...@n3.nabble.com]
Sent: Thursday, October 27, 2016 3:04 AM
To: Mendelson, Assaf
Subject: Re: Watermarking in Structured Streaming to drop late data

And the JIRA: https://issues.apache.org/jira/browse/SPARK-18124

On Wed, Oct 26, 2016 at 4:56 PM, Tathagata Das <[hidden 
email]> wrote:
Hey all,

We are planning to implement watermarking in Structured Streaming that would allow 
us to handle late, out-of-order data better. Specifically, when we are aggregating 
over windows on event time, we currently can end up keeping an unbounded amount of 
data as state. We want to define watermarks on the event time in order to mark and 
drop data that are "too late" and accordingly age out old aggregates that will 
not be updated any more.

To enable the user to specify details like lateness threshold, we are 
considering adding a new method to Dataset. We would like to get more feedback 
on this API. Here is the design doc

https://docs.google.com/document/d/1z-Pazs5v4rA31azvmYhu4I5xwqaNQl6ZLIS03xhkfCQ/

Please comment on the design and proposed APIs.

Thank you very much!

TD







--
View this message in context: 
http://apache-spark-developers-list.1001551.n3.nabble.com/Watermarking-in-Structured-Streaming-to-drop-late-data-tp19589p19591.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.

Using SPARK_WORKER_INSTANCES and SPARK-15781

2016-10-26 Thread assaf.mendelson
As of applying SPARK-15781, the documentation of SPARK_WORKER_INSTANCES has 
been removed. This was due to a warning in spark-submit which suggested:
WARN SparkConf:
SPARK_WORKER_INSTANCES was detected (set to '4').
This is deprecated in Spark 1.0+.

Please instead use:
- ./spark-submit with --num-executors to specify the number of executors
- Or set SPARK_EXECUTOR_INSTANCES
- spark.executor.instances to configure the number of instances in the spark 
config.



The problem is that there is no replacement method to launch spark standalone 
with multiple workers per node. The options --num-executors and
SPARK_EXECUTOR_INSTANCES configure the job rather than the resource manager 
behavior.

If I look at the spark standalone scripts, the only way to set multiple workers 
per node is the use of SPARK_WORKER_INSTANCES. The fix in SPARK-15781 fixed 
the documentation without solving the problem.
A possible simple solution would be to add a SPARK_STANDALONE_WORKERS variable 
and add it to the start-slave.sh script and update the documentation 
accordingly.

Am I missing something here? Should I open a new JIRA issue?
Thanks,
Assaf




--
View this message in context: 
http://apache-spark-developers-list.1001551.n3.nabble.com/Using-SPARK-WORKER-INSTANCES-and-SPARK-15781-tp19571.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.

Converting spark types and standard scala types

2016-10-25 Thread assaf.mendelson
Hi,
I am trying to write a new aggregate function 
(https://issues.apache.org/jira/browse/SPARK-17691) and I wanted it to support 
all ordered types.
I have several issues though (a sketch of the type mapping I have so far follows the list):

1.   How to convert the type of the child expression to a Scala standard 
type (e.g. I need an Array[Int] for IntegerType and an Array[Double] for 
DoubleType). The only method I found so far is to do a match for each of the 
types. Is there a better way?

2.   What would be the corresponding scala type for DecimalType, 
TimestampType, DateType and BinaryType?

3.   Should BinaryType be a legal type for such a function?

4.   I need to serialize the relevant array of type (i.e. turn it into an 
Array[Byte] for working with TypedImperativeAggregate). I can do a match for 
standard types such as Int and Double but I do not know of a generic way to do 
it.
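
For reference, the mapping I have pieced together so far, as a sketch (the 
external types listed are my assumption based on the Row scaladoc, so please 
correct me if any of them are wrong):

import org.apache.spark.sql.types._

// Spark SQL data type -> external Scala/Java type, as I currently understand it.
def externalType(dt: DataType): String = dt match {
  case IntegerType    => "Int"
  case LongType       => "Long"
  case DoubleType     => "Double"
  case StringType     => "String"
  case BinaryType     => "Array[Byte]"
  case DateType       => "java.sql.Date"
  case TimestampType  => "java.sql.Timestamp"
  case _: DecimalType => "java.math.BigDecimal"
  case other          => s"unhandled: $other"
}
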
Thanks,
Assaf.




--
View this message in context: 
http://apache-spark-developers-list.1001551.n3.nabble.com/Converting-spark-types-and-standard-scala-types-tp19552.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.

RE: StructuredStreaming status

2016-10-20 Thread assaf.mendelson
My thoughts were about handling just the “current” state of the sliding window 
(i.e. the “last” window). The idea is that, at least in the cases I have 
encountered, the sliding window is used to “forget” irrelevant information and 
therefore when a step goes out of date for the “current” window it becomes 
irrelevant.
I agree that this use case is just an example and will also have issues if 
there is a combination of windows. My main issue was that if we need to have a 
relatively large buffer (such as full distinct count) then the memory overhead 
of this can be very high.

As for the example of the map you gave, If I understand correctly how this 
would occur behind the scenes, this just provides the map but the memory cost 
of having multiple versions of the data remain. As I said, my issue is with the 
high memory overhead.

Consider a simple example: I do a sliding window of 1 day with a 1 minute step. 
There are 1440 minutes per day which means the groupby has a cost of 
multiplying all aggregations by 1440. For something such as a count or sum, 
this might not be a big issue but if we have an array of say 100 elements then 
this can quickly become very costly.

As I said, it is just an idea for optimization for specific use cases.
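
To make the concern concrete, this is the shape of query I am worried about 
(a sketch; the column names are made up):

import org.apache.spark.sql.functions._
import spark.implicits._   // assuming "spark" is the SparkSession

// A 1 day window sliding every 1 minute: each event falls into 1440 overlapping
// windows, so a large per-window buffer (e.g. collect_set) is kept 1440 times over.
val dailySets = events
  .groupBy(window($"eventTime", "1 day", "1 minute"))
  .agg(collect_set($"userId"))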


From: Michael Armbrust [via Apache Spark Developers List] 
[mailto:ml-node+s1001551n1952...@n3.nabble.com]
Sent: Thursday, October 20, 2016 11:16 AM
To: Mendelson, Assaf
Subject: Re: StructuredStreaming status

let’s say we would have implemented distinct count by saving a map with the key 
being the distinct value and the value being the last time we saw this value. 
This would mean that we wouldn’t really need to save all the steps in the 
middle and copy the data, we could only save the last portion.

I don't think you can calculate count distinct in each event time window 
correctly using this map if there is late data, which is one of the key 
problems we are trying to solve with this API.  If you are only tracking the 
last time you saw this value, how do you know if a late data item was already 
accounted for in any given window that is earlier than this "last time"?

We would currently need to track the items seen in each window (though much 
less space is required for approx count distinct).  However, the state eviction 
I mentioned above should also let you give us a boundary on how late data can 
be, and thus how many windows we need retain state for.  You should also be 
able to group by processing time instead of event time if you want something 
closer to the semantics of DStreams.

Finally, you can already construct the map you describe using structured 
streaming and use its result to output statistics at each trigger window:

df.groupBy($"value")
  .agg(max($"eventTime") as 'lastSeen)   // aggregate (not select) after groupBy
  .writeStream
  .outputMode("complete")
  .trigger(ProcessingTime("5 minutes"))
  .foreach( /* a user-supplied ForeachWriter */ )






--
View this message in context: 
http://apache-spark-developers-list.1001551.n3.nabble.com/StructuredStreaming-status-tp19490p19521.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.

RE: StructuredStreaming status

2016-10-19 Thread assaf.mendelson
There is one issue I was thinking of.
If I understand correctly, structured streaming basically groups by a time 
bucket in the sliding window (of the step size). My problem is that in some cases 
(e.g. distinct count and any other case where the buffer is relatively large) 
this would mean copying the buffer for each step. This can have a very large 
memory overhead.
There are other solutions for this. For example, let's say we would have 
implemented distinct count by saving a map with the key being the distinct 
value and the value being the last time we saw this value. This would mean that 
we wouldn't really need to save all the steps in the middle and copy the data, 
we could only save the last portion.
This is just an idea for optimization though, certainly nothing of high 
priority.
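
Roughly what I have in mind, written as plain Scala outside of any Spark API just 
to show the shape of the buffer (a sketch, not a proposal for the actual internals):

import scala.collection.mutable

// One buffer per group: distinct value -> last event time at which it was seen.
val lastSeen = mutable.HashMap.empty[String, Long]

def update(value: String, eventTime: Long): Unit =
  lastSeen(value) = math.max(eventTime, lastSeen.getOrElse(value, Long.MinValue))

// Distinct count for the latest window only: values seen since the window start.
def distinctSince(windowStart: Long): Int =
  lastSeen.count { case (_, t) => t >= windowStart }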


From: Matei Zaharia [via Apache Spark Developers List] 
[mailto:ml-node+s1001551n19513...@n3.nabble.com]
Sent: Thursday, October 20, 2016 3:42 AM
To: Mendelson, Assaf
Subject: Re: StructuredStreaming status

I'm also curious whether there are concerns other than latency with the way 
stuff executes in Structured Streaming (now that the time steps don't have to 
act as triggers), as well as what latency people want for various apps.

The stateful operator designs for streaming systems aren't inherently "better" 
than micro-batching -- they lose a lot of stuff that is possible in Spark, such 
as load balancing work dynamically across nodes, speculative execution for 
stragglers, scaling clusters up and down elastically, etc. Moreover, Spark 
itself could execute the current model with much lower latency. The question is 
just what combinations of latency, throughput, fault recovery, etc to target.

Matei

On Oct 19, 2016, at 2:18 PM, Amit Sela <[hidden 
email]> wrote:


On Thu, Oct 20, 2016 at 12:07 AM Shivaram Venkataraman <[hidden 
email]> wrote:
At the AMPLab we've been working on a research project that looks at
just the scheduling latencies and on techniques to get lower
scheduling latency. It moves away from the micro-batch model, but
reuses the fault tolerance etc. in Spark. However we haven't yet
figure out all the parts in integrating this with the rest of
structured streaming. I'll try to post a design doc / SIP about this
soon.

On a related note - are there other problems users face with
micro-batch other than latency ?
I think that the fact that they serve as an output trigger is a problem, but 
Structured Streaming seems to resolve this now.

Thanks
Shivaram

On Wed, Oct 19, 2016 at 1:29 PM, Michael Armbrust
<[hidden email]> wrote:
> I know people are seriously thinking about latency.  So far that has not
> been the limiting factor in the users I've been working with.
>
> On Wed, Oct 19, 2016 at 1:11 PM, Cody Koeninger <[hidden 
> email]> wrote:
>>
>> Is anyone seriously thinking about alternatives to microbatches?
>>
>> On Wed, Oct 19, 2016 at 2:45 PM, Michael Armbrust
>> <[hidden email]> wrote:
>> > Anything that is actively being designed should be in JIRA, and it seems
>> > like you found most of it.  In general, release windows can be found on
>> > the
>> > wiki.
>> >
>> > 2.1 has a lot of stability fixes as well as the kafka support you
>> > mentioned.
>> > It may also include some of the following.
>> >
>> > The items I'd like to start thinking about next are:
>> >  - Evicting state from the store based on event time watermarks
>> >  - Sessionization (grouping together related events by key / eventTime)
>> >  - Improvements to the query planner (remove some of the restrictions on
>> > what queries can be run).
>> >
>> > This is roughly in order based on what I've been hearing users hit the
>> > most.
>> > Would love more feedback on what is blocking real use cases.
>> >
>> > On Tue, Oct 18, 2016 at 1:51 AM, Ofir Manor <[hidden 
>> > email]>
>> > wrote:
>> >>
>> >> Hi,
>> >> I hope it is the right forum.
>> >> I am looking for some information of what to expect from
>> >> StructuredStreaming in its next releases to help me choose when / where
>> >> to
>> >> start using it more seriously (or where to invest in workarounds and
>> >> where
>> >> to wait). I couldn't find a good place where such planning discussed
>> >> for 2.1
>> >> (like, for example ML and SPARK-15581).
>> >> I'm aware of the 2.0 documented limits
>> >>
>> >> (http://spark.apache.org/docs/2.0.1/structured-streaming-programming-guide.html#unsupported-operations),
>> >> like no support for multiple aggregations levels, joins are strictly to
>> >> a
>> >> static dataset (no SCD or stream-stream) etc, limited sources / sinks
>> >> (like
>> >> no sink for interactive queries) etc etc
>> >> I'm also aware of some changes that have landed in master, like the new
>> >> Kafka 0.10 source (and its on-going improvements) in SPARK-15406, the
>> >> metrics in SPARK-17731, and some improvements for the file source.
>> >> If I remember correctly, the discussion on Spark release cadence
>> >> concluded
>> >> with a preference to a four-month cycles, with l

RE: Python Spark Improvements (forked from Spark Improvement Proposals)

2016-10-13 Thread assaf.mendelson
Hi,
We are actually using pyspark heavily.
I agree with all of your points; for me the following are the main 
hurdles:

1.   Pyspark does not have support for UDAF. We have had multiple needs for 
UDAF and needed to go to java/scala to support these. Having python UDAF would 
have made life much easier (especially at earlier stages when we prototype).

2.   Performance. I cannot stress this enough. Currently we have engineers 
who take python UDFs and convert them to scala UDFs for performance. We are 
currently even looking at writing UDFs and UDAFs in a more native way (e.g. 
using expressions) to improve performance but working with pyspark can be 
really problematic.

BTW, other than using jython or arrow, I believe there are a couple of other 
ways to get improve performance:

1.   Python provides a tool to generate an AST for python code 
(https://docs.python.org/2/library/ast.html). This means we can use the AST to 
construct scala code very similar to how expressions are built for native spark 
functions in scala. Of course doing full conversion is very hard but at least 
handling simple cases should be simple.

2.   The above would of course be limited if we use python packages but 
over time it is possible to add some "translation" tools (i.e. take python 
packages and find the appropriate scala equivalent. We can even provide this to 
the user to supply their own conversions thereby looking as a regular python 
code but being converted to scala code behind the scenes).

3.   In scala, it is possible to use codegen to actually generate code from 
a string. There is no reason why we can't write the expression in python and 
provide a scala string. This would mean learning some scala but would mean we 
do not have to create a separate code tree.
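
To illustrate what I mean by point 3, here is a minimal sketch of compiling a 
Scala expression from a string at runtime using the Scala toolbox (this assumes 
scala-compiler and scala-reflect are on the classpath; it is not how Spark's own 
codegen works, which compiles generated Java source via Janino):

import scala.reflect.runtime.currentMirror
import scala.tools.reflect.ToolBox

val tb = currentMirror.mkToolBox()

// Compile a predicate supplied as a string, e.g. one produced from a python AST.
val predicate = tb.eval(tb.parse("(x: Int) => x < 10")).asInstanceOf[Int => Boolean]

predicate(3)    // true
predicate(42)   // false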

BTW, the fact that all of the tools to access java are marked as private has me 
a little worried. Nearly all of our UDFs (and all of our UDAFs) are written in 
scala for performance. The wrapping to provide them in python uses way too many 
private elements for my taste.


From: msukmanowsky [via Apache Spark Developers List] 
[mailto:ml-node+s1001551n19426...@n3.nabble.com]
Sent: Thursday, October 13, 2016 3:51 AM
To: Mendelson, Assaf
Subject: Re: Python Spark Improvements (forked from Spark Improvement Proposals)

As very heavy Spark users at Parse.ly, I just wanted to give a +1 to all of the 
issues raised by Holden and Ricardo. I'm also giving a talk at PyCon Canada on 
PySpark https://2016.pycon.ca/en/schedule/096-mike-sukmanowsky/.

Being a Python shop, we were extremely pleased to learn about PySpark a few 
years ago as our main ETL pipeline used Apache Pig at the time. I was one of 
the only folks who understood Pig and Java so collaborating on this as a team 
was difficult.

Spark provided a means for the entire team to collaborate, but we've hit our 
fair share of issues all of which are enumerated in this thread.

Besides giving a +1 here, I think if I were to force rank these items for us, 
it'd be:

1. Configuration difficulties: we've lost literally weeks to troubleshooting 
memory issues for larger jobs. It took a long time to even understand *why* 
certain jobs were failing since Spark would just report executors being lost. 
Finally we tracked things down to understanding that 
spark.yarn.executor.memoryOverhead controls the portion of memory reserved for 
Python processes, but none of this is documented anywhere as far as I can tell. 
We discovered this via trial and error. Both documentation and better defaults 
for this setting when running a PySpark application are probably sufficient. 
We've also had a number of troubles with saving Parquet output as part of an 
ETL flow, but perhaps we'll save that for a blog post of its own.

2. Dependency management: I've tried to help move the conversation on 
https://issues.apache.org/jira/browse/SPARK-13587 but it seems we're a bit 
stalled. Installing the required dependencies for a PySpark application is a 
really messy ordeal right now.

3. Development workflow: I'd combine both "incomprehensible error messages" and 
"
difficulty using PySpark from outside of spark-submit / pyspark shell" here. 
When teaching PySpark to new users, I'm reminded of how much inside knowledge 
is needed to overcome esoteric errors. As one example is hitting 
"PicklingError: Could not pickle object as excessively deep recursion 
required." errors. New users often do something innocent like try to pickle a 
global logging object and hit this and begin the Google -> stackoverflow search 
to try to comprehend what's going on. You can lose days to errors like these 
and they completely kill the productivity flow and send you hunting for 
alternatives.

4. Speed/performance: we are trying to use DataFrame/DataSets where we can and 
do as much in Java as possible but when we do move to Python, we're well aware 
that we're about to take a hit on performance. We're very keen to see what 
Apache Arro

RE: Official Stance on Not Using Spark Submit

2016-10-13 Thread assaf.mendelson
I actually do not use spark-submit for several use cases; all of them currently 
revolve around running it directly with python.
One of the most important ones is developing in pycharm.
Basically I am using pycharm configured with a remote interpreter which runs on 
the server, while pycharm itself runs on my local windows machine.
In order for me to be able to effectively debug (stepping etc.), I want to 
define a run configuration in pycharm which would integrate fully with its 
debug tools. Unfortunately I couldn’t figure out a way to use spark-submit 
effectively. Instead I chose the following solution:
I defined the project to use the remote interpreter running on the driver in 
the cluster.
I defined environment variables in the run configuration including setting 
PYTHONPATH to include pyspark and py4j manually, set up the relevant 
PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON, set up PYSPARK_SUBMIT_ARGS to include 
relevant configurations (e.g. relevant jars) and made sure it ended with 
pyspark-shell.

By providing this type of behavior I could debug spark remotely as if it was 
local.

Similar use cases include using standard tools that know how to run a “python” 
script but are not aware of spark-submit.

I haven’t found similar reasons for scala/java code though (although I wish 
there was a similar “remote” setup for scala).
Assaf.


From: RussS [via Apache Spark Developers List] 
[mailto:ml-node+s1001551n19384...@n3.nabble.com]
Sent: Monday, October 10, 2016 9:14 PM
To: Mendelson, Assaf
Subject: Re: Official Stance on Not Using Spark Submit

Just folks who don't want to use spark-submit, no real use-cases I've seen yet.

I didn't know about SparkLauncher myself and I don't think there are any 
official docs on that or launching spark as an embedded library for tests.

On Mon, Oct 10, 2016 at 11:09 AM Matei Zaharia <[hidden 
email]> wrote:
What are the main use cases you've seen for this? Maybe we can add a page to 
the docs about how to launch Spark as an embedded library.

Matei

On Oct 10, 2016, at 10:21 AM, Russell Spitzer <[hidden 
email]> wrote:

I actually had not seen SparkLauncher before, that looks pretty great :)

On Mon, Oct 10, 2016 at 10:17 AM Russell Spitzer <[hidden 
email]> wrote:
I'm definitely only talking about non-embedded uses here as I also use embedded 
Spark (cassandra, and kafka) to run tests. This is almost always safe since 
everything is in the same JVM. It's only once we get to launching against a 
real distributed env do we end up with issues.

Since Pyspark uses spark submit in the java gateway i'm not sure if that 
matters :)

The cases I see are usually usually going through main directly, adding jars 
programatically.

Usually ends up with classpath errors (Spark not on the CP, their jar not on 
the CP, dependencies not on the cp),
conf errors (executors have the incorrect environment, executor classpath 
broken, not understanding spark-defaults won't do anything),
Jar version mismatches
Etc ...

On Mon, Oct 10, 2016 at 10:05 AM Sean Owen <[hidden 
email]> wrote:
I have also 'embedded' a Spark driver without much trouble. It isn't that it 
can't work.

The Launcher API is probably the recommended way to do that though. 
spark-submit is the way to go for non programmatic access.
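
For reference, using the Launcher API looks roughly like this (a sketch; the jar 
path, main class and master URL below are made up):

import org.apache.spark.launcher.SparkLauncher

val handle = new SparkLauncher()
  .setAppResource("/path/to/my-app.jar")           // hypothetical application jar
  .setMainClass("com.example.MyApp")               // hypothetical main class
  .setMaster("spark://master:7077")                // hypothetical master URL
  .setConf(SparkLauncher.EXECUTOR_MEMORY, "2g")
  .startApplication()                              // returns a SparkAppHandle

// The embedding application can then poll handle.getState or attach listeners.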

If you're not doing one of those things and it is not working, yeah I think 
people would tell you you're on your own. I think that's consistent with all 
the JIRA discussions I have seen over time.

On Mon, Oct 10, 2016, 17:33 Russell Spitzer <[hidden 
email]> wrote:
I've seen a variety of users attempting to work around using Spark Submit with 
at best middling levels of success. I think it would be helpful if the project 
had a clear statement that submitting an application without using Spark Submit 
is truly for experts only or is unsupported entirely.

I know this is a pretty strong stance and other people have had different 
experiences than me so please let me know what you think :)




RE: Spark Improvement Proposals

2016-10-09 Thread assaf.mendelson
I agree with most of what Cody said.
Two things:
First we can always have other people suggest SIPs but mark them as 
"unreviewed" and have committers basically move them forward. The problem is 
that writing a good document takes time. This way we can leverage non 
committers to do some of this work (it is just another way to contribute).

As for strategy, in many cases the implementation strategy can affect the goals. I 
will give a small example: In the current structured streaming strategy, we 
group by the time to achieve a sliding window. This is definitely an 
implementation decision and not a goal. However, I can think of several 
aggregation functions which have the time inside their calculation buffer. For 
example, let's say we want to return a set of all distinct values. One way to 
implement this would be to make the set into a map and have the value contain 
the last time seen. Multiplying it across the groupby would cost a lot in 
performance. So adding such a strategy would have a great effect on the type of 
aggregations and their performance which does affect the goal. Without adding 
the strategy, it is easy for whoever goes to the design document to not think 
about these cases. Furthermore, it might be decided that these cases are rare 
enough so that the strategy is still good enough but how would we know it 
without user feedback?
I believe this example is exactly what Cody was talking about. Since many times 
implementation strategies have a large effect on the goal, we should have them 
discussed when discussing the goals. In addition, while it is often easy to 
throw out completely infeasible goals, it is often much harder to figure out 
that the goals are infeasible without fine tuning.


Assaf.

From: Cody Koeninger-2 [via Apache Spark Developers List] 
[mailto:ml-node+s1001551n19359...@n3.nabble.com]
Sent: Monday, October 10, 2016 2:25 AM
To: Mendelson, Assaf
Subject: Re: Spark Improvement Proposals

Only committers should formally submit SIPs because in an apache
project only commiters have explicit political power.  If a user can't
find a commiter willing to sponsor an SIP idea, they have no way to
get the idea passed in any case.  If I can't find a committer to
sponsor this meta-SIP idea, I'm out of luck.

I do not believe unrealistic goals can be found solely by inspection.
We've managed to ignore unrealistic goals even after implementation!
Focusing on APIs can allow people to think they've solved something,
when there's really no way of implementing that API while meeting the
goals.  Rapid iteration is clearly the best way to address this, but
we've already talked about why that hasn't really worked.  If adding a
non-binding API section to the template is important to you, I'm not
against it, but I don't think it's sufficient.

On your PRD vs design doc spectrum, I'm saying this is closer to a
PRD.  Clear agreement on goals is the most important thing and that's
why it's the thing I want binding agreement on.  But I cannot agree to
goals unless I have enough minimal technical info to judge whether the
goals are likely to actually be accomplished.



On Sun, Oct 9, 2016 at 5:35 PM, Matei Zaharia <[hidden 
email]> wrote:

> Well, I think there are a few things here that don't make sense. First, why
> should only committers submit SIPs? Development in the project should be
> open to all contributors, whether they're committers or not. Second, I think
> unrealistic goals can be found just by inspecting the goals, and I'm not
> super worried that we'll accept a lot of SIPs that are then infeasible -- we
> can then submit new ones. But this depends on whether you want this process
> to be a "design doc lite", where people also agree on implementation
> strategy, or just a way to agree on goals. This is what I asked earlier
> about PRDs vs design docs (and I'm open to either one but I'd just like
> clarity). Finally, both as a user and designer of software, I always want to
> give feedback on APIs, so I'd really like a culture of having those early.
> People don't argue about prettiness when they discuss APIs, they argue about
> the core concepts to expose in order to meet various goals, and then they're
> stuck maintaining those for a long time.
>
> Matei
>
> On Oct 9, 2016, at 3:10 PM, Cody Koeninger <[hidden 
> email]> wrote:
>
> Users instead of people, sure.  Commiters and contributors are (or at least
> should be) a subset of users.
>
> Non goals, sure. I don't care what the name is, but we need to clearly say
> e.g. 'no we are not maintaining compatibility with XYZ right now'.
>
> API, what I care most about is whether it allows me to accomplish the goals.
> Arguing about how ugly or pretty it is can be saved for design/
> implementation imho.
>
> Strategy, this is necessary because otherwise goals can be out of line with
> reality.  Don't propose goals you don't have at least some idea of how to
> implement.
>
> Rejected strategies, given that commiters are the only ones I'm 

RE: Improving volunteer management / JIRAs (split from Spark Improvement Proposals thread)

2016-10-08 Thread assaf.mendelson
I don’t really have much experience with large open source projects but I have 
some experience with having lots of issues with no one handling them. 
Automation proved a good solution in my experience, but one thing that I found 
which was really important is giving people a chance to say “don’t close this 
please”.
Basically, before closing you can send an email to the reporter (and probably 
people who are watching the issue) and tell them this is going to be closed. 
Allow them an option to ping back saying “don’t close this please” which would 
ping committers for input (as if there were 5+ votes as described by Nick).
The main reason for this is that many times people find solutions and the issue 
does become stale but at other times, the issue is still important, it is just 
that no one noticed it because of the noise of other issues.
Thanks,
Assaf.



From: Nicholas Chammas [via Apache Spark Developers List] 
[mailto:ml-node+s1001551n19310...@n3.nabble.com]
Sent: Saturday, October 08, 2016 12:42 AM
To: Mendelson, Assaf
Subject: Re: Improving volunteer management / JIRAs (split from Spark 
Improvement Proposals thread)


I agree with Cody and others that we need some automation — or at least an 
adjusted process — to help us manage organic contributions better.

The objections about automated closing being potentially abrasive are 
understood, but I wouldn’t accept that as a defeat for automation. Instead, it 
seems like a constraint we should impose on any proposed solution: Make sure it 
doesn’t turn contributors off. Rolling as we have been won’t cut it, and I 
don’t think adding committers will ever be a sufficient solution to this 
particular problem.

To me, it seems like we need a way to filter out viable contributions with 
community support from other contributions when it comes to deciding that 
automated action is appropriate. Our current tooling isn’t perfect, but perhaps 
we can leverage it to create such a filter.

For example, consider the following strawman proposal for how to cut down on 
the number of pending but unviable proposals, and simultaneously help 
contributors organize to promote viable proposals and get the attention of 
committers:
1.  Have a bot scan for stale JIRA issues and PRs—i.e. they haven’t been 
updated in 20+ days (or D+ days, if you prefer).
2.  Depending on the level of community support, either close the item or 
ping specific people for action. Specifically:
a. If the JIRA/PR has no input from a committer and the JIRA/PR has 5+ votes 
(or V+ votes), ping committers for input. (For PRs, you could count comments 
from different people, or thumbs up on the initial PR post.)
b. If the JIRA/PR has no input from a committer and the JIRA/PR has less than V 
votes, close it with a gentle message asking the contributor to solicit support 
from either the community or a committer, and try again later.
c. If the JIRA/PR has input from a committer or committers, ping them for an 
update.

This is just a rough idea. The point is that when contributors have stale 
proposals that they don’t close, committers need to take action. A little 
automation to selectively bring contributions to the attention of committers 
can perhaps help them manage the backlog of stale contributions. The 
“selective” part is implemented in this strawman proposal by using JIRA votes 
as a crude proxy for when the community is interested in something, but it 
could be anything.

Also, this doesn’t have to be used just to clear out stale proposals. Once the 
initial backlog is trimmed down, you could set D to 5 days and use this as a 
regular way to bring contributions to the attention of committers.

I dunno if people think this is perhaps too complex, but at our scale I feel we 
need some kind of loose but automated system for funneling contributions 
through some kind of lifecycle. The status quo is just not that good (e.g. 474 
open PRs against Spark as of this 
moment).

Nick
​

On Fri, Oct 7, 2016 at 4:48 PM Cody Koeninger <[hidden 
email]> wrote:
Matei asked:


> I agree about empowering people interested here to contribute, but I'm 
> wondering, do you think there are technical things that people don't want to 
> work on, or is it a matter of what there's been time to do?


It's a matter of mismanagement and miscommunication.

The structured streaming kafka jira sat with multiple unanswered
requests for someone who was a committer to communicate whether they
were working on it and what the plan was.  I could have done that
implementation and had it in users' hands months ago.  I didn't
pre-emptively do it because I didn't want to then have to argue with
committers about why my code did or did not meet their uncommunicated
expectations.


I don't want to re-hash that particular circumstance, I just want to
make sure it never happens again.


Hopefully the SIP thread results in clearer expectations, but there
are still some id

https://issues.apache.org/jira/browse/SPARK-17691

2016-09-27 Thread assaf.mendelson
Hi,

I wanted to try to implement https://issues.apache.org/jira/browse/SPARK-17691.
So I started by looking at the implementation of collect_list. My idea was to do 
the same as they do, but when adding a new element, if there are already more than 
the threshold, remove one instead.
The problem with this is that since collect_list has no partial aggregation we 
would end up shuffling all the data anyway. So while it would mean the actual 
resulting column might be smaller, the whole process would be as expensive as 
collect_list.
So I thought of adding partial aggregation. The problem is that the merge 
function receives a buffer which is in a row format. Collect_list doesn't use 
the buffer and uses its own data structure for collecting the data.
I can change the implementation to use a spark ArrayType instead, however, 
since ArrayType is immutable it would mean that I would need to copy it 
whenever I do anything.
Consider the simplest implementation of the update function:
If there are few elements => add an element to the array (if I use regular 
Array this would mean copy as I grow it which is fine for this stage)
If there are enough elements => we do not grow the array. Instead we need to 
decide what to replace. If we want to have the top 10 for example and there are 
10 elements, we need to drop the lowest and put the new one.
This means that if we simply loop across the array we would create a new copy 
and pay the copy + loop. If we keep it sorted then adding, sorting and removing 
the low one means 3 copies.
If I had been able to use scala's array then I would basically copy whenever I 
grow, and then once we have grown to the max, all I would need to do is REPLACE 
the relevant element, which is much cheaper.

The only other solution I see is to simply provide "take first N" agg function 
and have the user sort beforehand but this seems a bad solution to me both 
because sort is expensive and because if we do multiple aggregations we can't 
sort in two different ways.


I can't find a way to convert an internal buffer the way collect_list does it 
to an internal buffer before the merge.
I also can't find any way to use an array in the internal buffer as a mutable 
array. If I look at GenericMutableRow implementation then updating an array 
means creating a new one. I thought maybe of adding a function 
update_array_element which would change the relevant element (and similarly 
get_array_element to get an array element) which would allow to easily make the 
array mutable but if I look at the documentation it states this is not allowed.

Can anyone give me a tip on where to try to go from here?
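
For completeness, the closest I can get with the public API today is a UDAF over 
an array buffer, which works but pays exactly the copying cost described above. 
A sketch for IntegerType only, keeping the N largest values:

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

class CollectTopN(n: Int) extends UserDefinedAggregateFunction {
  def inputSchema: StructType = StructType(StructField("value", IntegerType) :: Nil)
  def bufferSchema: StructType = StructType(StructField("top", ArrayType(IntegerType)) :: Nil)
  def dataType: DataType = ArrayType(IntegerType)
  def deterministic: Boolean = true

  def initialize(buffer: MutableAggregationBuffer): Unit = buffer(0) = Seq.empty[Int]

  // Every update rebuilds the buffer array -- this is the copying overhead discussed above.
  def update(buffer: MutableAggregationBuffer, input: Row): Unit =
    if (!input.isNullAt(0)) {
      buffer(0) = (buffer.getSeq[Int](0) :+ input.getInt(0)).sorted(Ordering[Int].reverse).take(n)
    }

  // Partial aggregation: merging two bounded buffers stays bounded.
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit =
    buffer1(0) = (buffer1.getSeq[Int](0) ++ buffer2.getSeq[Int](0)).sorted(Ordering[Int].reverse).take(n)

  def evaluate(buffer: Row): Any = buffer.getSeq[Int](0)
}

It can be used directly as a column, e.g. df.groupBy($"key").agg(new CollectTopN(10)(df("value"))).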




--
View this message in context: 
http://apache-spark-developers-list.1001551.n3.nabble.com/https-issues-apache-org-jira-browse-SPARK-17691-tp19107.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.

RE: Memory usage for spark types

2016-09-20 Thread assaf.mendelson
Thanks for the pointer.

I have been reading the code and trying to understand how to create an 
efficient aggregate function but I must be missing something because it seems 
to me that creating any kind of aggregation function which uses non primitive 
types would have a high overhead.
Consider the following simple example: We have a column which contains the 
numbers 1-10. We want to calculate a histogram for these values.
In an equivalent to the hand written code in 
https://databricks.com/blog/2016/05/23/apache-spark-as-a-compiler-joining-a-billion-rows-per-second-on-a-laptop.html
 The trivial solution (a student solution) would look something like this:
val hist = new Array[Int](10)
for (v <- col) {
  hist(v - 1) += 1   // values 1..10 go into slots 0..9
}

The problem is that as far as I understand, spark wouldn’t create it this way.
Instead I would need to do something like “update hist in position v by +1” 
which in practice would mean the array will be copied at least 3 times:
First it will be copied from its unsafe implementation to a scala sequence 
(even worse, since arrays always use offsets, the copying would have to be done 
element by element instead of a single memcopy), then since the array is 
immutable, we will have to create a new version of it (by copying and changing 
just the relevant element) and then we copy it back to the unsafe version.

I tried to look at examples in the code which have an intermediate buffer which 
is not a simple structure. Basically, I see two such types of examples: 
distinct operations (which, if I understand correctly, somehow internally has a 
hashmap to contain the distinct values but I can’t find the code which 
generates it) and collect functions (collect_list, collect_set) which do not 
appear to do any code generation BUT define their own buffer as they will (the 
buffer is NOT of a regular type).


So I was wondering, what is the right way to implement an efficient logic as 
above.
I see two options:

1.   Using UDAF – In this case I would define the buffer to have 10 integer 
fields and manipulate each. This solution suffers from two problems: First it 
is slow (especially if there are other aggregations which are using spark sql 
expressions) and second it is limited (I can’t change the size of the array in 
the middle. For example, assuming the above histogram is made on a groupby and 
I know beforehand that in 99% of the cases there are 3 values but in 1% of the 
cases there are 100 values. If I would have used an array I would just convert 
to a bigger array the first time I see a value from the 100)

2.   Implement similar to collect_list and collect_set. If I look at the 
documentation for collect class, this uses the slower sort based aggregation 
path because the number of elmenets can not be determined in advance even 
though in the basic case above, we do know the size. (although I am not sure 
how its performance would compare to the UDAF option). This appears to be 
simpler than UDAF because I can use the data types I want directly, however I 
can’t figure out how the code generation is done as I do not see the relevant 
functions when doing debugCodegen on the result
I also believe there should be a third option by actually implementing the 
proper expression, but I have no idea how to do that.


Can anyone point me in the right direction?
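
One more direction I have been playing with is a typed Aggregator whose buffer is 
a kryo-encoded mutable map. It sidesteps the Row-format buffer entirely, although 
the buffer then becomes an opaque binary blob to Spark (a sketch, not benchmarked):

import scala.collection.mutable
import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator

// bin -> count histogram over an Int column.
object HistogramAgg extends Aggregator[Int, mutable.Map[Int, Long], Map[Int, Long]] {
  def zero: mutable.Map[Int, Long] = mutable.Map.empty

  def reduce(buf: mutable.Map[Int, Long], bin: Int): mutable.Map[Int, Long] = {
    buf(bin) = buf.getOrElse(bin, 0L) + 1L
    buf
  }

  def merge(a: mutable.Map[Int, Long], b: mutable.Map[Int, Long]): mutable.Map[Int, Long] = {
    b.foreach { case (k, v) => a(k) = a.getOrElse(k, 0L) + v }
    a
  }

  def finish(buf: mutable.Map[Int, Long]): Map[Int, Long] = buf.toMap

  // Kryo keeps the buffer as a single binary value, so there is no per-row
  // conversion of the map into Spark's internal row format.
  def bufferEncoder: Encoder[mutable.Map[Int, Long]] = Encoders.kryo[mutable.Map[Int, Long]]
  def outputEncoder: Encoder[Map[Int, Long]] = Encoders.kryo[Map[Int, Long]]
}

It is then used on a typed Dataset[Int] via HistogramAgg.toColumn.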


From: rxin [via Apache Spark Developers List] 
[mailto:ml-node+s1001551n18985...@n3.nabble.com]
Sent: Monday, September 19, 2016 12:23 AM
To: Mendelson, Assaf
Subject: Re: Memory usage for spark types

Take a look at UnsafeArrayData and UnsafeMapData.


On Sun, Sep 18, 2016 at 9:06 AM, assaf.mendelson <[hidden 
email]> wrote:
Hi,
I am trying to understand how spark types are kept in memory and accessed.
I tried to look at the code at the definition of MapType and ArrayType for 
example and I can’t seem to find the relevant code for its actual 
implementation.

I am trying to figure out how these two types are implemented to understand how 
they match my needs.
In general, it appears the size of a map is the same as two arrays which is 
about double the naïve array implementation: if I have 1000 rows, each with a 
map from 10K integers to 10K integers, I find through caching the dataframe 
that the total is ~150MB (the naïve implementation of two arrays would cost 
1000*10K*(4+4) bytes, or a total of ~80MB). I see the same size if I use two arrays. 
Second, what would be the performance of updating the map/arrays as they are 
immutable (i.e. some copying is required).

The reason I am asking this is because I wanted to do an aggregate function 
which calculates a variation of a histogram.
The most naïve solution for this would be to have a map from the bin to the 
count. But since we are talking about an immutable map, wouldn’t that cost a 
lot more?
An even further optimization would be to use a mutable array where we combine 
the key and value to a single value (key and value are both int in my case). 
Assuming the maximum num

Memory usage for spark types

2016-09-18 Thread assaf.mendelson
Hi,
I am trying to understand how spark types are kept in memory and accessed.
I tried to look at the code at the definition of MapType and ArrayType for 
example and I can't seem to find the relevant code for its actual 
implementation.

I am trying to figure out how these two types are implemented to understand how 
they match my needs.
In general, it appears the size of a map is the same as two arrays which is 
about double the naïve array implementation: if I have 1000 rows, each with a 
map from 10K integers to 10K integers, I find through caching the dataframe 
that the total is ~150MB (the naïve implementation of two arrays would cost 
1000*10K*(4+4) bytes, or a total of ~80MB). I see the same size if I use two arrays. 
Second, what would be the performance of updating the map/arrays as they are 
immutable (i.e. some copying is required).

The reason I am asking this is because I wanted to do an aggregate function 
which calculates a variation of a histogram.
The most naïve solution for this would be to have a map from the bin to the 
count. But since we are talking about an immutable map, wouldn't that cost a 
lot more?
An even further optimization would be to use a mutable array where we combine 
the key and value to a single value (key and value are both int in my case). 
Assuming the maximum number of bins is small (e.g. less than 10), it is often 
cheaper to just search the array for the right key (and in this case the size 
of the data is expected to be significantly smaller than map). In my case, most 
of the time (90%) there are less than 3 elements in the bin, and if I have more 
than 10 bins I basically do a combination to reduce the number.

For few elements, a map becomes very inefficient - if I create 10M rows with 1 
map from int to int each I get an overall of ~380MB meaning ~38 bytes per 
element (instead of just 8). For array, again it is too large (229MB, i.e. ~23 
bytes per element).

Is there a way to implement a simple mutable array type to use in the 
aggregation buffer? Where is the portion of the code that handles the actual 
type handling?
Thanks,
Assaf.




--
View this message in context: 
http://apache-spark-developers-list.1001551.n3.nabble.com/Memory-usage-for-spark-types-tp18984.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.

RE: UDF and native functions performance

2016-09-12 Thread assaf.mendelson
What is the constraint framework?
How would I add the same optimization to the sample function I created?


From: rxin [via Apache Spark Developers List] 
[mailto:ml-node+s1001551n18932...@n3.nabble.com]
Sent: Tuesday, September 13, 2016 3:37 AM
To: Mendelson, Assaf
Subject: Re: UDF and native functions performance

Not sure if this is why but perhaps the constraint framework?

On Tuesday, September 13, 2016, Mendelson, Assaf <[hidden 
email]> wrote:
I did, they look the same:

scala> my_func.explain(true)
== Parsed Logical Plan ==
Filter smaller#3L < 10
+- Project [id#0L AS smaller#3L]
   +- Range (0, 5, splits=1)

== Analyzed Logical Plan ==
smaller: bigint
Filter smaller#3L < 10
+- Project [id#0L AS smaller#3L]
   +- Range (0, 5, splits=1)

== Optimized Logical Plan ==
Filter smaller#3L < 10
+- InMemoryRelation [smaller#3L], true, 1, StorageLevel(disk, memory, 
deserialized, 1 replicas)
   :  +- *Project [id#0L AS smaller#3L]
   : +- *Range (0, 5, splits=1)

== Physical Plan ==
*Filter smaller#3L < 10
+- InMemoryTableScan [smaller#3L], [smaller#3L < 10]
   :  +- InMemoryRelation [smaller#3L], true, 1, StorageLevel(disk, memory, 
deserialized, 1 replicas)
   : :  +- *Project [id#0L AS smaller#3L]
   : : +- *Range (0, 5, splits=1)

scala> base_filter_df.explain(true)
== Parsed Logical Plan ==
'Filter (smaller#3L < 10)
+- Project [id#0L AS smaller#3L]
   +- Range (0, 5, splits=1)

== Analyzed Logical Plan ==
smaller: bigint
Filter (smaller#3L < cast(10 as bigint))
+- Project [id#0L AS smaller#3L]
   +- Range (0, 5, splits=1)

== Optimized Logical Plan ==
Filter (smaller#3L < 10)
+- InMemoryRelation [smaller#3L], true, 1, StorageLevel(disk, memory, 
deserialized, 1 replicas)
   :  +- *Project [id#0L AS smaller#3L]
   : +- *Range (0, 5, splits=1)

== Physical Plan ==
*Filter (smaller#3L < 10)
+- InMemoryTableScan [smaller#3L], [(smaller#3L < 10)]
   :  +- InMemoryRelation [smaller#3L], true, 1, StorageLevel(disk, memory, 
deserialized, 1 replicas)
   : :  +- *Project [id#0L AS smaller#3L]
   : : +- *Range (0, 5, splits=1)


Also when I do:

import org.apache.spark.sql.execution.debug._
df.debugCodegen

on both of them they are identical.
I did notice that if I change the code to do > instead of < then they have 
almost the same performance so I imagine this has something to do with some 
optimization that understands that range is ordered and therefore once the 
first condition fails, all would fail.
The problem is I don’t see this in the plan, nor can I find it in the code.


From: Takeshi Yamamuro [mailto:linguin.m.s@...]
Sent: Monday, September 12, 2016 7:12 PM
To: Mendelson, Assaf
Cc: dev@...
Subject: Re: UDF and native functions performance

Hi,

I think you'd better off comparing the gen'd code of `df.filter` and your gen'd 
code
by using .debugCodegen().

// maropu

On Mon, Sep 12, 2016 at 7:43 PM, assaf.mendelson <assaf.mendelson@...> wrote:
I am trying to create UDFs with improved performance. So I decided to compare 
several ways of doing it.
In general I created a dataframe using range with 50M elements, cached it and 
counted it to manifest it.

I then implemented a simple predicate (x<10) in 4 different ways, counted the 
elements and timed it.
The 4 ways were:

-  Standard expression (took 90 milliseconds)

-  Udf (took 539 milliseconds)

-  Codegen (took 358 milliseconds)

-  Dataset filter (took 1022 milliseconds)

I understand why filter is so much slower. I also understand why UDF is slower 
(with volcano model taking up processing time).
I do not understand why the codegen I created is so slow. What am I missing?

The code to generate the numbers is followed:

import org.apache.spark.sql.codegenFuncs._
val df = spark.range(50000000).withColumnRenamed("id","smaller")  // 50M rows
df.cache().count()

val base_filter_df = df.filter(df("smaller") < 10)

import org.apache.spark.sql.functions.udf
def asUdf=udf((x: Int) => x < 10)
val udf_filter_df = df.filter(asUdf(df("smaller")))

val my_func = df.filter(genf_func(df("smaller")))

case class tmpclass(smaller: BigInt)

val simpleFilter = df.as[tmpclass].filter((x: tmpclass) => 
(x.smaller < 10))

def time[R](block: => R) = {
val t0 = System.nanoTime()
val result = block  // call-by-name
val t1 = System.nanoTime()
(t1 - t0)/1000000  // nanoseconds to milliseconds
}

def avgTime[R](block: => R) = {
val times = for (i <- 1 to 5) yield time(block)
times.sum / 5
}


println("base " + avgTime(base_filter_df.count()))
//>> got a result of 90
println("udf " + avgTime(udf_filter_df.count()))
//>> got a result of 539
println("codegen " + avgTime(my_func.count()))
//&

UDF and native functions performance

2016-09-12 Thread assaf.mendelson
I am trying to create UDFs with improved performance. So I decided to compare 
several ways of doing it.
In general I created a dataframe using range with 50M elements, cached it and 
counted it to manifest it.

I then implemented a simple predicate (x<10) in 4 different ways, counted the 
elements and timed it.
The 4 ways were:

-  Standard expression (took 90 milliseconds)

-  Udf (took 539 milliseconds)

-  Codegen (took 358 milliseconds)

-  Dataset filter (took 1022 milliseconds)

I understand why filter is so much slower. I also understand why UDF is slower 
(with volcano model taking up processing time).
I do not understand why the codegen I created is so slow. What am I missing?

The code to generate the numbers is followed:

import org.apache.spark.sql.codegenFuncs._
val df = spark.range(50000000).withColumnRenamed("id","smaller")  // 50M rows
df.cache().count()

val base_filter_df = df.filter(df("smaller") < 10)

import org.apache.spark.sql.functions.udf
def asUdf=udf((x: Int) => x < 10)
val udf_filter_df = df.filter(asUdf(df("smaller")))

val my_func = df.filter(genf_func(df("smaller")))

case class tmpclass(smaller: BigInt)

val simpleFilter = df.as[tmpclass].filter((x: tmpclass) => (x.smaller < 10))

def time[R](block: => R) = {
  val t0 = System.nanoTime()
  val result = block // call-by-name
  val t1 = System.nanoTime()
  (t1 - t0) / 1000000 // nanoseconds -> milliseconds
}

def avgTime[R](block: => R) = {
  val times = for (i <- 1 to 5) yield time(block)
  times.sum / 5 // average over 5 runs
}


println("base " + avgTime(base_filter_df.count()))
//>> got a result of 90
println("udf " + avgTime(udf_filter_df.count()))
//>> got a result of 539
println("codegen " + avgTime(my_func.count()))
//>> got a result of 358
println("filter " + avgTime(simpleFilter.count()))
//>> got a result of 1022

And the code for genf_func:

package org.apache.spark.sql

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, 
ExprCode}
import org.apache.spark.sql.types._
import org.apache.spark.sql.catalyst.expressions._

object codegenFuncs {
  // Defined inside the org.apache.spark.sql package so that private[sql]
  // helpers such as the Column(...) factory are accessible.
  case class genf(child: Expression) extends UnaryExpression with Predicate
      with ImplicitCastInputTypes {

    override def inputTypes: Seq[AbstractDataType] = Seq(IntegerType)

    override def toString: String = s"$child < 10"

    // Interpreted fallback, used when codegen is disabled.
    override def eval(input: InternalRow): Any = {
      val value = child.eval(input)
      if (value == null) {
        false
      } else {
        child.dataType match {
          case IntegerType => value.asInstanceOf[Int] < 10
        }
      }
    }

    // Emits "(<child>) < 10" directly into the generated Java, so the predicate
    // participates in (whole-stage) codegen.
    override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
      defineCodeGen(ctx, ev, c => s"($c) < 10")
    }
  }

  private def withExpr(expr: Expression): Column = Column(expr)

  // Exposes the expression through the public Column/DataFrame API.
  def genf_func(v: Column): Column = withExpr { genf(v.expr) }
}
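
A quick way to see where the custom predicate lands in the plan (a sketch,
assuming the jar containing codegenFuncs is on the driver classpath):

import org.apache.spark.sql.codegenFuncs._

val df = spark.range(50000000).withColumnRenamed("id", "smaller")
// explain(true) prints the parsed, analyzed, optimized and physical plans;
// the Filter carrying genf should show up in the physical plan (prefixed with *
// when whole-stage codegen applies).
df.filter(genf_func(df("smaller"))).explain(true)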






--
View this message in context: 
http://apache-spark-developers-list.1001551.n3.nabble.com/UDF-and-native-functions-performance-tp18920.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.

Test fails when compiling spark with tests

2016-09-11 Thread assaf.mendelson
Hi,
I am trying to set up a Spark development environment. I forked the Spark git
project and cloned the fork, then checked out branch-2.0 (which I assume
corresponds to the released 2.0 source code).
I then compiled Spark twice.
The first time with:
mvn -Pyarn -Phadoop-2.6 -Dhadoop.version=2.6.0 -DskipTests clean package
This compiled successfully.
The second time with:
mvn -Pyarn -Phadoop-2.6 -Dhadoop.version=2.6.0 clean package
This failed in Spark Project Core, with the following test failing:
- caching in memory and disk, replicated
- caching in memory and disk, serialized, replicated *** FAILED ***
  java.util.concurrent.TimeoutException: Can't find 2 executors before 3 
milliseconds elapsed
  at 
org.apache.spark.ui.jobs.JobProgressListener.waitUntilExecutorsUp(JobProgressListener.scala:573)
  at 
org.apache.spark.DistributedSuite.org$apache$spark$DistributedSuite$$testCaching(DistributedSuite.scala:154)
  at 
org.apache.spark.DistributedSuite$$anonfun$32$$anonfun$apply$1.apply$mcV$sp(DistributedSuite.scala:191)
  at 
org.apache.spark.DistributedSuite$$anonfun$32$$anonfun$apply$1.apply(DistributedSuite.scala:191)
  at 
org.apache.spark.DistributedSuite$$anonfun$32$$anonfun$apply$1.apply(DistributedSuite.scala:191)
  at 
org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
  at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
  at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
  at org.scalatest.Transformer.apply(Transformer.scala:22)
  at org.scalatest.Transformer.apply(Transformer.scala:20)
  ...
- compute without caching when no partitions fit in memory

I made no changes to the code whatsoever. Can anyone help me figure out what is 
wrong with my environment?
BTW, I am using Maven 3.3.9 and Java 1.8.0_101-b13.

Thanks,
Assaf
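
For what it's worth, a sketch of re-running only the failing suite rather than
the whole build, assuming the wildcardSuites/test properties that Spark's Maven
build exposes for running individual ScalaTest suites:

mvn -Pyarn -Phadoop-2.6 -Dhadoop.version=2.6.0 \
  -DwildcardSuites=org.apache.spark.DistributedSuite -Dtest=none test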




--
View this message in context: 
http://apache-spark-developers-list.1001551.n3.nabble.com/Test-fails-when-compiling-spark-with-tests-tp18919.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.

implement UDF/UDAF supporting whole stage codegen

2016-09-07 Thread assaf.mendelson
Hi,
I want to write a UDF/UDAF which provides native processing performance.
Currently, when a UDF/UDAF is created in the normal manner, performance takes a
hit because it breaks optimizations.
As a simple example, I wanted to create a UDF which tests whether the value is
smaller than 10.
I tried something like this:

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, 
ExprCode}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.util.TypeUtils
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
import org.apache.spark.sql.catalyst.expressions._

case class genf(child: Expression) extends UnaryExpression with Predicate with 
ImplicitCastInputTypes {

  override def inputTypes: Seq[AbstractDataType] = Seq(IntegerType)

  override def toString: String = s"$child < 10"

  override def eval(input: InternalRow): Any = {
val value = child.eval(input)
if (value == null)
{
  false
} else {
  child.dataType match {
case IntegerType => value.asInstanceOf[Int] < 10
  }
}
  }

  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
   defineCodeGen(ctx, ev, c => s"($c) < 10")
  }
}


However, this doesn't work, as some of the underlying classes/traits are private
(e.g. AbstractDataType is private), which makes it problematic to create such a
case class outside of Spark itself.
Is there a way to do it? The idea is to provide a couple of jars with a bunch of
functions our team needs (a possible workaround is sketched after this message).
Thanks,
Assaf.
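
A minimal sketch of that workaround, which is the same trick the codegenFuncs
object earlier in this archive relies on: declare the wrapper inside the
org.apache.spark.sql package so that private[sql] pieces such as the Column(...)
factory are visible. teamFuncs and lessThan10 are hypothetical names, and this
version builds the predicate from existing Catalyst expressions rather than a
custom Expression subclass:

package org.apache.spark.sql

import org.apache.spark.sql.catalyst.expressions.{LessThan, Literal}

object teamFuncs {
  // Column(...) is private[sql], so this only compiles from inside this package.
  // A custom Expression subclass (like genf above) would be wrapped the same way.
  def lessThan10(v: Column): Column = Column(LessThan(v.expr, Literal(10)))
}

// usage: df.filter(teamFuncs.lessThan10(df("smaller")))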





--
View this message in context: 
http://apache-spark-developers-list.1001551.n3.nabble.com/implement-UDF-UDAF-supporting-whole-stage-codegen-tp18874.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.