[GitHub] spark issue #23075: [SPARK-26084][SQL] Fixes unresolved AggregateExpression....

2018-11-18 Thread ssimeonov
Github user ssimeonov commented on the issue:

https://github.com/apache/spark/pull/23075
  
@MaxGekk done


---




[GitHub] spark issue #23075: [SPARK-26084][SQL] Fixes unresolved AggregateExpression....

2018-11-18 Thread ssimeonov
Github user ssimeonov commented on the issue:

https://github.com/apache/spark/pull/23075
  
@MaxGekk I see no mention of `AggregateExpression` in the 
`sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions` 
subtree. Which suite would you like me to add the test to?


---




[GitHub] spark pull request #23075: [SPARK-26084][SQL] Fixes unresolved AggregateExpr...

2018-11-17 Thread ssimeonov
GitHub user ssimeonov opened a pull request:

https://github.com/apache/spark/pull/23075

[SPARK-26084][SQL] Fixes unresolved AggregateExpression.references exception

## What changes were proposed in this pull request?

This PR fixes an exception in `AggregateExpression.references` called on 
unresolved expressions. It implements the solution proposed in 
[SPARK-26084](https://issues.apache.org/jira/browse/SPARK-26084), a minor 
refactoring that removes the unnecessary dependence on `AttributeSet.toSeq`, 
which requires expression IDs and, therefore, can only execute successfully for 
resolved expressions.

The refactored implementation is both simpler and faster, eliminating the 
conversion of a `Set` to a
`Seq` and back to `Set`.
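
For context, here is a minimal sketch of the failure mode, constructed for illustration 
against the Catalyst internals of the time (not code from this PR):

```scala
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Complete, Count}

// An aggregate expression over an attribute that has not been resolved yet.
val agg = AggregateExpression(Count(Seq(UnresolvedAttribute("x"))), Complete, isDistinct = false)

// Before this change, references went through AttributeSet.toSeq, which needs
// expression IDs; an unresolved attribute has none, so this could throw.
agg.references
```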

## How was this patch tested?

Local tests pass. I added no new tests as (a) the new behavior has no 
failing case and (b) this is a simple refactoring.

@hvanhovell 



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/swoop-inc/spark ss_SPARK-26084

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/23075.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #23075


commit 178f0a5dff9f7eb8887ed711727b2f83af40ae8a
Author: Simeon Simeonov 
Date:   2018-11-18T01:05:07Z

[SPARK-26084][SQL] Fixes unresolved AggregateExpression.references exception

Implements the solution proposed in 
[SPARK-26084](https://issues.apache.org/jira/browse/SPARK-26084),
a minor refactoring that removes the unnecessary dependence on 
`AttributeSet.toSeq`,
which requires expression IDs and, therefore, can only execute successfully 
for resolved expressions.

The refactored implementation is both simpler and faster, eliminating the 
conversion of a `Set` to a
`Seq` and back to `Set`.

I added no new tests as (a) the new behavior has no failing case and (b) 
this is a simple refactoring.




---




[GitHub] spark pull request #21840: [WIP] New copy() method for Column of StructType

2018-07-22 Thread ssimeonov
Github user ssimeonov commented on a diff in the pull request:

https://github.com/apache/spark/pull/21840#discussion_r204250360
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Column.scala ---
@@ -1234,6 +1234,8 @@ class Column(val expr: Expression) extends Logging {
*/
   def over(): Column = over(Window.spec)
 
+  def copy(field: String, value: Column): Column = 
withExpr(StructCopy(expr, field, value.expr))
--- End diff --

> Can .cast() be expressed via the .copy() method if the former one will 
support add/delete/update operations?

Are you suggesting `Dataset` adds `copy()` as opposed to `cast()`?

Otherwise, certainly, `Dataset.cast()` can be implemented with 
`Dataset.select()` but it's not easy even in the simple cases and becomes 
noticeably more complicated when you consider correctly setting nullability & 
metadata in the edge cases.


---




[GitHub] spark pull request #21840: Initial implementation of copy function

2018-07-22 Thread ssimeonov
Github user ssimeonov commented on a diff in the pull request:

https://github.com/apache/spark/pull/21840#discussion_r204246252
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Column.scala ---
@@ -1234,6 +1234,8 @@ class Column(val expr: Expression) extends Logging {
*/
   def over(): Column = over(Window.spec)
 
+  def copy(field: String, value: Column): Column = 
withExpr(StructCopy(expr, field, value.expr))
--- End diff --

BTW, to fully unify top-level and nested column manipulation, we should add 
`Dataset.cast(schema: StructType)` as `cast()` can be used effectively to 
change the schema of a struct, e.g.,

```scala
spark.range(1, 2).toDF("x")
  .withColumn("st", struct('x.as("x"), 'x.cast(DoubleType).as("y")))
  .select('st.cast(new StructType().add("y", LongType).add("x", IntegerType)))
```

For `Dataset`, outside of `DataFrameReader.schema()`, which may be too 
early in the transformation flow, there is no easy way for end users to change 
a schema. Experienced Spark developers will have no problem coming up with 
`spark.createDataFrame(df.rdd, newSchema)` but that's neither consistent with 
how `cast` works nor chainable as a transformation unless used with 
`transform()` which takes us quite far away from simple Spark user code.
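
For reference, a hedged sketch of the `spark.createDataFrame(df.rdd, newSchema)` workaround 
mentioned above, wrapped so that it stays chainable via `transform()` (the helper name is 
illustrative, not a Spark API):

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.StructType

// Illustrative helper: rebuild a DataFrame with a caller-supplied schema.
def withSchema(newSchema: StructType)(df: DataFrame): DataFrame =
  df.sparkSession.createDataFrame(df.rdd, newSchema)

// Usage, assuming df and newSchema are in scope:
// df.transform(withSchema(newSchema))
```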


---




[GitHub] spark pull request #21840: Initial implementation of copy function

2018-07-22 Thread ssimeonov
Github user ssimeonov commented on a diff in the pull request:

https://github.com/apache/spark/pull/21840#discussion_r204243887
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -3858,3 +3858,29 @@ object ArrayUnion {
 new GenericArrayData(arrayBuffer)
   }
 }
+
+case class StructCopy(
+struct: Expression,
+fieldName: String,
+fieldValue: Expression) extends Expression with CodegenFallback {
+
+  override def children: Seq[Expression] = Seq(struct, fieldValue)
+  override def nullable: Boolean = struct.nullable
+
+  lazy val fieldIndex = 
struct.dataType.asInstanceOf[StructType].fieldIndex(fieldName)
+
+  override def dataType: DataType = {
+val structType = struct.dataType.asInstanceOf[StructType]
+val field = structType.fields(fieldIndex).copy(dataType = 
fieldValue.dataType)
--- End diff --

Nullability information should come from `fieldValue`.

Also, I would suggest extending the constructor to allow for `Metadata` 
associated with the `fieldValue` expression, which can be added to the schema. 
It is not reasonable to assume that (a) any existing metadata should be 
preserved or (b) that there will be no need to associate new metadata.
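
One possible shape of that computation, sketched for discussion only (the helper name and 
the optional-metadata parameter are illustrative):

```scala
import org.apache.spark.sql.types.{DataType, Metadata, StructField}

// Illustrative only: derive the replaced field from the new value's type and
// nullability, with metadata supplied explicitly rather than silently preserved.
def replacedField(
    existing: StructField,
    newType: DataType,
    newNullable: Boolean,
    newMetadata: Option[Metadata]): StructField =
  existing.copy(
    dataType = newType,
    nullable = newNullable,
    metadata = newMetadata.getOrElse(Metadata.empty))
```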


---




[GitHub] spark pull request #21840: Initial implementation of copy function

2018-07-22 Thread ssimeonov
Github user ssimeonov commented on a diff in the pull request:

https://github.com/apache/spark/pull/21840#discussion_r204244143
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -3858,3 +3858,29 @@ object ArrayUnion {
 new GenericArrayData(arrayBuffer)
   }
 }
+
+case class StructCopy(
+struct: Expression,
+fieldName: String,
+fieldValue: Expression) extends Expression with CodegenFallback {
+
+  override def children: Seq[Expression] = Seq(struct, fieldValue)
+  override def nullable: Boolean = struct.nullable
+
+  lazy val fieldIndex = 
struct.dataType.asInstanceOf[StructType].fieldIndex(fieldName)
+
+  override def dataType: DataType = {
+val structType = struct.dataType.asInstanceOf[StructType]
--- End diff --

Can refactor `struct.dataType.asInstanceOf[StructType]` into a method and 
add a meaningful error message if the cast fails.


---




[GitHub] spark pull request #21840: Initial implementation of copy function

2018-07-22 Thread ssimeonov
Github user ssimeonov commented on a diff in the pull request:

https://github.com/apache/spark/pull/21840#discussion_r204245778
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Column.scala ---
@@ -1234,6 +1234,8 @@ class Column(val expr: Expression) extends Logging {
*/
   def over(): Column = over(Window.spec)
 
+  def copy(field: String, value: Column): Column = 
withExpr(StructCopy(expr, field, value.expr))
--- End diff --

Some things to consider about the API:

- How is custom metadata associated with the updated field?
- How can a field be deleted?
- How can a field be added?
- When a field is added, where does it go in the schema? The only 
logical place is at the end but that may not be what's desired in some cases.

Simply for discussion purposes (overloaded methods are not shown):

```scala
class Column(val expr: Expression) extends Logging {

  // ...

  // matches Dataset.schema semantics; errors on non-struct columns
  def schema: StructType

  // matches Dataset.select() semantics, errors on non-struct columns
  // '* support allows multiple new fields to be added easily,
  // saving cumbersome repeated withColumn() calls
  def select(cols: Column*): Column

  // matches Dataset.withColumn() semantics of add or replace
  def withColumn(colName: String, col: Column): Column

  // matches Dataset.drop() semantics
  def drop(colName: String): Column

}
```

The benefit of the above API is that it unifies manipulating top-level & 
nested columns, which I would argue is very desirable. The addition of `schema` 
and `select()` allows for nested field reordering, casting, etc., which is 
important in data exchange scenarios where field position matters.
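
For contrast, a spark-shell-style sketch (illustrative, in the spirit of the example above) 
of what the same nested manipulation requires today without such an API:

```scala
import org.apache.spark.sql.functions._

// Without a nested-column API, reordering or adding a field means rebuilding the
// whole struct by hand (assumes a spark-shell session with implicits in scope).
spark.range(1, 2).toDF("x")
  .withColumn("st", struct('x.as("a"), 'x.as("b")))
  .withColumn("st", struct($"st.b", $"st.a", lit(0L).as("c"))) // reorder + add a field
```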

/cc @rxin


---




[GitHub] spark pull request #21840: Initial implementation of copy function

2018-07-22 Thread ssimeonov
Github user ssimeonov commented on a diff in the pull request:

https://github.com/apache/spark/pull/21840#discussion_r204244400
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -3858,3 +3858,29 @@ object ArrayUnion {
 new GenericArrayData(arrayBuffer)
   }
 }
+
+case class StructCopy(
+struct: Expression,
+fieldName: String,
+fieldValue: Expression) extends Expression with CodegenFallback {
+
+  override def children: Seq[Expression] = Seq(struct, fieldValue)
+  override def nullable: Boolean = struct.nullable
+
+  lazy val fieldIndex = 
struct.dataType.asInstanceOf[StructType].fieldIndex(fieldName)
--- End diff --

If the semantics are "copy", i.e., only existing fields must be used, it 
would be nice to add a user-friendly error message here.


---




[GitHub] spark issue #21589: [SPARK-24591][CORE] Number of cores and executors in the...

2018-07-18 Thread ssimeonov
Github user ssimeonov commented on the issue:

https://github.com/apache/spark/pull/21589
  
> Repartitioning based upon a snapshot of the number of cores available 
cluster-wide is clearly not the correct thing to do in many instances and use 
cases.

I wholeheartedly agree and I can't wait for the better approach(es) you 
proposed. In the meantime, repartitioning to a constant number of partitions, 
which is what people do today, is a lot worse in most instances and use cases 
(obviously excluding the situations where a fixed number of partitions is 
driven by a requirement).

In the end, your objections provide absolutely no immediate & practical 
alternative to an immediate & common problem that faces any Spark user whose 
jobs execute on clusters of varying size, a problem that meaningfully affects 
performance and cost.

> ... I don't appreciate being pinned ...

None of us do, @markhamstra, but that's sometimes how we help others, in 
this case, the broader Spark user community.

> I don't accept your assertions of what constitutes the majority and 
minority of Spark users or use cases or their relative importance.

My claims are based on (a) the constitution of data engineering/science 
teams at all non-ISV companies whose engineering structures/head counts I know 
well (7), (b) what multiple recruiters are telling me about hiring trends (East 
Coast-biased but consistently confirmed when talking to West Coast colleagues) 
and (c) the audiences at Spark meetups and the Spark Summit where I speak 
frequently. What is your non-acceptance based on?

> As a long-time maintainer of the Spark scheduler, it is also not my 
concern to define which Spark users are important or not, but rather to foster 
system internals and a public API that benefit all users.

I still do not understand how you evaluate an API. Do you mean you have a 
way of knowing when a public API benefits all users _without_ understanding how 
user personas break down by volume and/or by importance? Or, perhaps, you 
evaluate an API according to how well it serves the "average" user, who must be 
some strange cross between a Scala Spark committer, a Java data engineer and a 
Python/R data scientist, or the "average" Spark job, which must be a mix 
between batch ETL, streaming and ML/AI training? Or, just based on what you 
feel is right?

Your work on the Spark scheduler and its APIs is much appreciated as is 
your expertise in evolving these APIs over time. However, this PR is NOT about 
the scheduler API. It is about the public `SparkContext`/`SparkSession` APIs 
that are exposed to the end users of Spark. @MaxGekk spends his days talking to 
end users of Spark across dozens if not hundreds of companies. I would argue he 
has an excellent, mostly unbiased perspective of the life and needs of people 
using Spark. Do you have an excellent and mostly unbiased perspective of how 
Spark is used in the real world? You work on Spark internals, which means that 
you do not spend your days using Spark. Your users are internal Spark 
developers, not the end users of Spark. You work at a top-notch ISV, a highly 
technical organization, which is not representative of the broader Spark 
community.

I strongly feel that you are trying to do what's right but have you 
considered the possibility that @MaxGekk has a much more accurate perspective 
of Spark user needs, and the urgency of addressing those needs, and that the 
way you judge this PR is biased by your rather unique perspective and 
environment?

I have nothing more to say on the topic of this PR. No matter which way it 
goes, I thank @MaxGekk for looking out for Spark users and @mridulm + 
@markhamstra for trying to do the right thing, as they see it.


---




[GitHub] spark issue #21589: [SPARK-24591][CORE] Number of cores and executors in the...

2018-07-18 Thread ssimeonov
Github user ssimeonov commented on the issue:

https://github.com/apache/spark/pull/21589
  
@markhamstra I am confused about your API evaluation criteria. 

You are not arguing about the specific benefits these changes can provide 
immediately to an increasing majority of Spark users. Great.

You have some concerns about a minority audience of Spark users and you are 
using those concerns to argue against immediate, simple and specific 
improvements for the majority of Spark users. No problem, except that the 
details of your concerns are rather fuzzy. Can you please make explicit the 
specific harm you see in these APIs, as opposed to just arguing that there is a 
theoretical, yet-to-be-defined-but-surely-just-right way to improve job 
execution performance at an unspecified point in the future that will work well 
for both majority and minority users?

BTW, the package argument is irrelevant here. Tons of things that are in 
Spark can be done with Spark packages but, instead, we add them to the core 
project because this increases the likelihood that they will benefit the most 
users. The use cases discussed here are about essentially any type of job that 
repartitions or coalesces, which clearly falls under the umbrella of 
benefitting the most users.


---




[GitHub] spark issue #21589: [SPARK-24591][CORE] Number of cores and executors in the...

2018-07-18 Thread ssimeonov
Github user ssimeonov commented on the issue:

https://github.com/apache/spark/pull/21589
  
@markhamstra even the words you are using indicate that you are missing the 
intended audience.

> high-level, declarative abstraction that can be used to specify requested 
Job resource-usage policy

How exactly do you imagine data scientists using something like this as 
they hack in a Jupyter or Databricks notebook in Python to sample data from a 
10Tb dataset?


---




[GitHub] spark issue #21589: [SPARK-24591][CORE] Number of cores and executors in the...

2018-07-18 Thread ssimeonov
Github user ssimeonov commented on the issue:

https://github.com/apache/spark/pull/21589
  
@markhamstra the purpose of this PR is not to address the topic of dynamic 
resource management in arbitrarily complex Spark environments. Most Spark users 
do not operate in such environments. It is to help simple Spark users refactor 
code such as

```scala
df.repartition(25) // and related repartition() + coalesce() variants
```

to make job execution take advantage of additional cores, when they are 
available. 
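
Concretely, the refactoring would look something like the sketch below; `sc.numCores` is 
assumed here to be roughly what this PR proposes, and the multiplier is illustrative:

```scala
// Assumes df and sc are in scope, e.g. in a notebook or spark-shell session.

// Before: a constant that ignores the size of the cluster the job lands on.
val before = df.repartition(25)

// After (sketch): scale parallelism with a snapshot of the available cores;
// sc.numCores is assumed to be one of the methods proposed by this PR.
val after = df.repartition(sc.numCores * 2)
```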

Asking for a greater degree of parallelism than the cores a job has 
available rarely has significant negative effects (for reasonable values). 
Asking for a low degree of parallelism when there are lots of cores available 
has significant negative effects, especially in the common real-world use cases 
where there is lots of data skew. That's the point that both you and @mridulm 
seem to be missing. The arguments about resources flexing during job execution 
do not change this.

My team has used this simple technique for years on both static and 
autoscaling clusters and we've seen meaningful performance improvements in both 
ETL and ML/AI-related data production for data ranging from gigabytes to 
petabytes. The idea is simple enough that even data scientists can (and do) 
easily use it. That's the benefit of this PR and that's why I like it. The cost 
of this PR is adding two simple & clear methods. The cost-benefit analysis 
seems obvious.

I agree with you that lots more can be done to handle the general case of 
better matching job resource needs to cluster/pool resources. This work is 
going to take forever given the current priorities. Let's not deny the majority 
of Spark users simple & real execution benefits while we dream about amazing 
architectural improvements. 

When looking at the net present value of performance, the discount factor 
is large. Performance improvements now are a lot more valuable than performance 
improvements in the far future.


---




[GitHub] spark issue #21589: [SPARK-24591][CORE] Number of cores and executors in the...

2018-07-18 Thread ssimeonov
Github user ssimeonov commented on the issue:

https://github.com/apache/spark/pull/21589
  
@mridulm your comments make an implicit assumption, which is quite 
incorrect: that Spark users read the Spark codebase and/or are aware of Spark 
internals. Please, consider this PR in the context of its intended audience who 
(a) do not read the source code and (b) hardly look at the API docs. What they 
read are things like Stack Overflow, the Databricks Guide, blog posts and 
(quite rarely) the occasional how-to-with-Spark book. The fact that something 
is possible with Spark doesn't make it easy or intuitive. The value of this PR 
is that it makes a common use case easy and intuitive.

Let's consider the practicality of your suggestions:

> Rely on defaultParallelism - this gives the expected result, unless 
explicitly overridden by user.

That doesn't address the core use case as the scope of change & effect is 
very different. In the targeted use cases, a user wants to explicitly control 
the level of parallelism relative to the current cluster physical state for 
potentially a single stage. Relying on `defaultParallelism` exposes the user to 
undesired side-effects as the setting can be changed by other, potentially 
unrelated code the user has no control over. Introducing unintended side 
effects, which your suggestion does, is poor design.

> If you need fine grained information about executors, use spark listener 
(it is trivial to keep a count with onExecutorAdded/onExecutorRemoved).

I'd suggest you reconsider your definition of "trivial". Normal Spark 
users, not people who work on Spark or at companies like Hortonworks whose job 
is to be Spark experts, have no idea what a listener is, have never hooked one 
and never will. Not to mention how much fun it is to do this from, say, R.
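
For reference, the listener approach under discussion looks roughly like this minimal sketch:

```scala
import java.util.concurrent.atomic.AtomicInteger
import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorAdded, SparkListenerExecutorRemoved}

// Keep a live executor count via the standard listener callbacks.
class ExecutorCounter extends SparkListener {
  val count = new AtomicInteger(0)
  override def onExecutorAdded(event: SparkListenerExecutorAdded): Unit = count.incrementAndGet()
  override def onExecutorRemoved(event: SparkListenerExecutorRemoved): Unit = count.decrementAndGet()
}

// val counter = new ExecutorCounter
// sc.addSparkListener(counter)
```

Simple enough for someone who works on Spark internals; not something a notebook user would write.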

> If you simply want a current value without own listener - use REST api to 
query for current executors.

This type of suggestion is a prime example of ignoring Spark user concerns. 
You are comparing `sc.numExecutors` with:

1. Knowing that a REST API exists that can produce this result.
2. Learning the details of the API.
3. Picking a synchronous REST client in the language they are using Spark 
with.
4. Initializing the REST client with the correct endpoint which they 
obtain... somehow.
5. Formulating the request.
6. Parsing the response.

I don't think there is any need to say more about this suggestion.

Taking a step back, it is important to acknowledge that Spark has become a 
mass-market data platform product and start designing user-facing APIs with 
this in mind. If the teams I know are any indication, the majority of Spark 
users are not experienced backend/data engineers. They are data scientists and 
data hackers: people who are getting into big data via Spark. The imbalance is 
only going to grow. The criteria by which user-focused Spark APIs are evaluated 
should evolve accordingly. 

From an ease-of-use perspective, I'd argue the two new methods should be 
exposed to `SparkSession` also as this is the typical new user "entry point". 
For example, the data scientists on my team never use `SparkContext` but they 
do adjust stage parallelism via implicits equivalent to the ones proposed in 
this PR (to significant benefit in query execution performance).
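
For illustration only (not this PR's API), the kind of implicit referred to above might look 
like this, using `defaultParallelism` as a stand-in for the proposed core-count methods:

```scala
import org.apache.spark.sql.Dataset

// Illustrative implicit: let notebook users scale a stage's parallelism without
// touching SparkContext directly.
implicit class DatasetParallelismOps[T](ds: Dataset[T]) {
  def repartitionPerCore(factor: Int = 1): Dataset[T] =
    ds.repartition(ds.sparkSession.sparkContext.defaultParallelism * factor)
}

// usage in a notebook: df.repartitionPerCore(2)
```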


---




[GitHub] spark issue #20276: [SPARK-14948][SQL] disambiguate attributes in join condi...

2018-07-08 Thread ssimeonov
Github user ssimeonov commented on the issue:

https://github.com/apache/spark/pull/20276
  
@cloud-fan do you expect to resolve conflict + merge at some point?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21730: [SPARK-24761][SQL] Adding of isModifiable() to Ru...

2018-07-08 Thread ssimeonov
Github user ssimeonov commented on a diff in the pull request:

https://github.com/apache/spark/pull/21730#discussion_r200852355
  
--- Diff: python/pyspark/sql/conf.py ---
@@ -63,6 +63,12 @@ def _checkType(self, obj, identifier):
 raise TypeError("expected %s '%s' to be a string (was '%s')" %
 (identifier, obj, type(obj).__name__))
 
+@ignore_unicode_prefix
+@since(2.4)
+def isModifiable(self, key):
+"""Is the configuration property modifiable or not."""
--- End diff --

Minor language tweak suggestion:

```
Indicates whether the configuration property with the given key is 
modifiable in the current session.
```
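
For context, the behavior being documented, shown via the Scala `RuntimeConfig.isModifiable` 
this PR adds (the specific keys and results are illustrative expectations):

```scala
// Session-level SQL configurations should be modifiable...
spark.conf.isModifiable("spark.sql.shuffle.partitions") // expected: true
// ...while static, launch-time settings should not be.
spark.conf.isModifiable("spark.executor.cores")         // expected: false
```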


---




[GitHub] spark pull request #21730: [SPARK-24761][SQL] Adding of isModifiable() to Ru...

2018-07-08 Thread ssimeonov
Github user ssimeonov commented on a diff in the pull request:

https://github.com/apache/spark/pull/21730#discussion_r200851248
  
--- Diff: python/pyspark/sql/conf.py ---
@@ -63,6 +63,12 @@ def _checkType(self, obj, identifier):
 raise TypeError("expected %s '%s' to be a string (was '%s')" %
 (identifier, obj, type(obj).__name__))
 
+@ignore_unicode_prefix
+@since(2.4)
+def isModifiable(self, key):
+"""Is the configuration property modifiable or not."""
--- End diff --

Why not make all doc comments across languages identical?


---




[GitHub] spark issue #21539: [SPARK-24500][SQL] Make sure streams are materialized du...

2018-06-12 Thread ssimeonov
Github user ssimeonov commented on the issue:

https://github.com/apache/spark/pull/21539
  
👍 


---




[GitHub] spark issue #21410: [SPARK-24366][SQL] Improving of error messages for type ...

2018-05-23 Thread ssimeonov
Github user ssimeonov commented on the issue:

https://github.com/apache/spark/pull/21410
  
This is an excellent start and a worthy improvement. 

Is there a way to identify where in the schema the issue is occurring? For 
example, when you have a schema with many nested fields, the failing value is 
helpful but the breadcrumb trail, e.g., `a.b.c` where this is happening, is 
required to easily isolate the issue in the input data and resolve it. 


---




[GitHub] spark issue #20894: [SPARK-23786][SQL] Checking column names of csv headers

2018-05-14 Thread ssimeonov
Github user ssimeonov commented on the issue:

https://github.com/apache/spark/pull/20894
  
@HyukjinKwon we are one of the Spark users experiencing this problem in the 
real world: dealing with dirty data produced by a variety of third party 
systems. Documentation doesn't solve anything here: it simply punts a 
complicated series of traversals and manual header checks to users. We've had to 
implement them ourselves to address the issue in the absence of this capability.

I agree that there are (too) many reader (and writer) options in Spark. 
That's fundamentally a side effect of architecture choices: Spark offers no 
user-accessible lifecycle hooks for custom loading/validation (or saving, for 
that matter). This is a fundamental limitation, something I've discussed with 
@rxin and @marmbrus in the past. (For example, at Swoop, we've had to 
completely re-implement `DataFrameWriter` as `OpenDataFrameWriter` to expose 
options & the data being written so that the API can be extended via 
implicits.) Let's not blame this PR based on a Spark architecture choice. In the 
absence of the abovementioned hooks, the only way to meaningfully extend 
Spark's loading behavior is by jamming more code into data sources and adding 
more options to control it. Given that (1) this is the Spark 2.x approach to 
data sources, (2) the issue this PR addresses can lead to silent correctness 
problems and (3) users have no easy way, within Spark, to identify and correct 
the issue, I believe this PR serves a very useful purpose in the Spark ecosystem.
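
To make the cost concrete, a hedged sketch (not our actual code) of the kind of manual header 
check users currently have to write before trusting the CSV reader:

```scala
import org.apache.spark.sql.SparkSession

// Naive sketch: read the raw first line and compare it with the expected column
// names. Real implementations must also handle quoting, delimiters and multi-file
// inputs, which is exactly the complexity this PR moves into the data source.
def headerMatches(spark: SparkSession, path: String, expectedColumns: Seq[String]): Boolean = {
  val firstLine = spark.read.textFile(path).head()
  firstLine.split(",").map(_.trim).toSeq == expectedColumns
}
```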

By analogy, consider SQL's UNION, which can cause problems because columns 
are combined not by name but by position, hence, `(x: Int, y: Int)` can be 
happily UNIONed with `(y: Int, x: Int)` to create, very likely, gobbledygook. 
The difference in this case is that (a) the SQL UNION behavior is standardized, 
(b) it has been well-documented for decades and (c) the operands' schema can be 
easily inspected via Spark. This is a great situation where users who end up in 
trouble can be told to RTFM. By contrast, the Spark CSV data source behavior is 
(a) arbitrary, (b) undocumented and (c) Spark provides no useful tools for 
users to check whether they'll get in trouble.
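
In Spark terms, the analogy looks like this (illustrative, assuming a spark-shell session):

```scala
import spark.implicits._

val a = Seq((1, 100)).toDF("x", "y")
val b = Seq((2, 200)).toDF("y", "x") // same columns, different order
a.union(b).show() // combined by position, so b's "y" values land under a's "x"
```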

Great job, @MaxGekk!


---




[GitHub] spark pull request #16204: [SPARK-18775][SQL] Limit the max number of record...

2016-12-07 Thread ssimeonov
Github user ssimeonov commented on a diff in the pull request:

https://github.com/apache/spark/pull/16204#discussion_r91437338
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
 ---
@@ -225,32 +228,50 @@ object FileFormatWriter extends Logging {
   taskAttemptContext: TaskAttemptContext,
   committer: FileCommitProtocol) extends ExecuteWriteTask {
 
-private[this] var outputWriter: OutputWriter = {
+private[this] var currentWriter: OutputWriter = _
--- End diff --

Looking through the code, three things stand out:

1. There is code duplication between `SingleDirectoryWriteTask` and 
`DynamicPartitionWriteTask` when it comes to current writer management and 
cleanup.

2. There is duplication within `releaseResources()` and `newOutputWriter()` 
of the write tasks when it comes to releasing resources. 

3. Write task state management is leaky because `releaseResources()` is 
called explicitly by `executeTask()`. Also, `releaseResources()` will be called 
twice when there are no exceptions and once if there is an exception in 
`execute()`, which is a bit confusing. 

What about asking the base trait to do a bit more work and present a 
stronger contract to its users, e.g.:

```scala
  private trait ExecuteWriteTask {

protected[this] var currentWriter: OutputWriter = null

def execute(iterator: Iterator[InternalRow]): Set[String] = {
  try {
executeImp(iterator)
  } finally {
releaseResources()
  }
}

/**
 * Writes data out to files, and then returns the list of partition strings written out.
 * The list of partitions is sent back to the driver and used to update the catalog.
 */
protected def executeImp(iterator: Iterator[InternalRow]): Set[String]

protected def resetCurrentWriter(): Unit = {
  if (currentWriter != null) {
currentWriter.close()
currentWriter = null
  }
}

protected def releaseResources(): Unit = {
  resetCurrentWriter()
}
  }
```


---



[GitHub] spark issue #15724: [SPARK-18216][SQL] Make Column.expr public

2016-11-01 Thread ssimeonov
Github user ssimeonov commented on the issue:

https://github.com/apache/spark/pull/15724
  
👍 


---



[GitHub] spark issue #13623: [SPARK-15895][SQL] Filters out metadata files while doin...

2016-06-12 Thread ssimeonov
Github user ssimeonov commented on the issue:

https://github.com/apache/spark/pull/13623
  
👍 


---



[GitHub] spark pull request #13623: [SPARK-15895][SQL] Filters out metadata files whi...

2016-06-12 Thread ssimeonov
Github user ssimeonov commented on a diff in the pull request:

https://github.com/apache/spark/pull/13623#discussion_r66724148
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
 ---
@@ -96,7 +96,10 @@ abstract class PartitioningAwareFileCatalog(
 
   protected def inferPartitioning(): PartitionSpec = {
 // We use leaf dirs containing data files to discover the schema.
-val leafDirs = leafDirToChildrenFiles.keys.toSeq
+val leafDirs = leafDirToChildrenFiles.filterNot {
+  // SPARK-15895: Metadata files like Parquet summary files should not 
be counted as data files.
+  case (_, files) => files.forall(_.getPath.getName.startsWith("_"))
--- End diff --

For compatibility with 
https://github.com/rxin/spark/blob/705a76f84605f1d54e98caae7f81631bf80c8feb/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala#L347
 would it be helpful to also exclude any `.`-prefixed files?
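
A tiny sketch of the combined exclusion being suggested (illustrative only):

```scala
import org.apache.hadoop.fs.FileStatus

// Treat both underscore-prefixed metadata files and dot-prefixed hidden files as non-data.
def isMetadataOrHiddenFile(f: FileStatus): Boolean = {
  val name = f.getPath.getName
  name.startsWith("_") || name.startsWith(".")
}
```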


---



[GitHub] spark pull request: [SPARK-10574] [ML] [MLlib] HashingTF supports ...

2016-04-19 Thread ssimeonov
Github user ssimeonov commented on the pull request:

https://github.com/apache/spark/pull/12498#issuecomment-211949147
  
When the "hashing trick" is used in practice, it is important to do things 
such as monitor, manage or randomize collisions. If that show there are 
problems with the chosen hashing approach, then one would experiment with the 
hashing function. All this suggests that a hashing function should be treated 
as an object with a simple interface, perhaps as simple as `Function1[Any, 
Int]`. Collision monitoring can then be performed with a decorator with an 
accumulator. Collision management would be performed by varying the seed or 
adding salt. Collision randomization would be performed by varying the 
seed/salt with each run and/or running multiple models in production which are 
identical expect for the different seed/salt used.
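
A sketch of that shape, purely for discussion (not an MLlib API):

```scala
import scala.collection.mutable
import scala.util.hashing.MurmurHash3

// The hash function as a first-class value.
type HashFn = Any => Int

// Vary the seed to manage or randomize collisions.
def seededHash(seed: Int): HashFn =
  (x: Any) => MurmurHash3.stringHash(x.toString, seed)

// Decorator that records bucket hit counts so collisions can be monitored.
def withBucketCounts(inner: HashFn, counts: mutable.Map[Int, Long]): HashFn =
  (x: Any) => {
    val h = inner(x)
    counts(h) = counts.getOrElse(h, 0L) + 1L
    h
  }
```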

The hashing trick is very important in ML and quite... tricky... to get 
working well for complex, high-dimension spaces, which Spark is perfect for. An 
implementation that does not treat the hashing function as a first class object 
would substantially hinder MLlib's capabilities in practice.


---



[GitHub] spark pull request: [SPARK-10724] [SQL] SQL's floor() returns DOUB...

2015-09-24 Thread ssimeonov
Github user ssimeonov commented on a diff in the pull request:

https://github.com/apache/spark/pull/8893#discussion_r40337580
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/mathExpressions.scala
 ---
@@ -52,11 +52,12 @@ abstract class LeafMathExpression(c: Double, name: 
String)
  * @param f The math function.
  * @param name The short name of the function
  */
-abstract class UnaryMathExpression(f: Double => Double, name: String)
+abstract class UnaryMathExpression[T <: DataType](
--- End diff --

I am no Scala expert either but I think what you want is something along 
the lines of:

```scala
// Add to imports
import scala.reflect.runtime.universe.TypeTag

abstract class UnaryMathExpression[T <: NumericType](
  f: Double => Any, name: String)(implicit ttag: TypeTag[T], longttag: 
TypeTag[LongType])
  extends UnaryExpression with Serializable with ImplicitCastInputTypes {

  override def inputTypes: Seq[DataType] = Seq(DoubleType)
  // Compare the underlying types; TypeTag instances do not define structural equality.
  override def dataType: NumericType = if (ttag.tpe =:= longttag.tpe) LongType else DoubleType
  override def nullable: Boolean = true
  override def toString: String = s"$name($child)"

  protected override def nullSafeEval(input: Any): Any = {
f(input.asInstanceOf[Double])
  }

  // name of function in java.lang.Math
  def funcName: String = name.toLowerCase

  override final def genCode(ctx: CodeGenContext, ev: 
GeneratedExpressionCode): String = {
defineCodeGen(ctx, ev, codeBuilder(ctx, ev))
  }

  protected def codeBuilder(ctx: CodeGenContext, ev: 
GeneratedExpressionCode): (String) => String =
c => s"(${dataType.typeName})java.lang.Math.$funcName($c)"
}
```

Notes:

- Eliminates the need for creating a subclass just for the purpose of 
`LongType` return specialization.

- Uses tighter constraint: `NumericType` instead of `DataType`, which makes 
sense intuitively for math functions.

- Sets up type to handle the universe of cases: `LongType` vs. `DoubleType` 
returns. Since we no longer subclass, that's an OK approach. It is better to 
use automatic companion object discovery but that would require modifying all 
data type case objects, which may not be a bad idea but belongs in a separate 
PR. See [this](http://stackoverflow.com/a/9173117/622495) for more information.

- Uses `dataType.typeName` instead of hard-coding Java casts.

This would require changing all references of `UnaryMathExpression` to 
`UnaryMathExpression[DoubleType]` and then using 
`UnaryMathExpression[LongType]` for `ceil` and `floor`.

The code compiles and should behave as intended.

Hope this helps.



---



[GitHub] spark pull request: [SPARK-10724] [SQL] SQL's floor() returns DOUB...

2015-09-23 Thread ssimeonov
Github user ssimeonov commented on a diff in the pull request:

https://github.com/apache/spark/pull/8893#discussion_r40279751
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/mathExpressions.scala
 ---
@@ -72,6 +72,18 @@ abstract class UnaryMathExpression(f: Double => Double, 
name: String)
   }
 }
 
+// for floor and ceil which returns bigint instead of double
+abstract class UnaryMathExpressionWithBigIntRet(f: Double => Double, name: 
String)
--- End diff --

Did you consider parameterizing `UnaryMathExpression` by the return type as 
opposed to using inheritance?


---



[GitHub] spark pull request: SPARK-9210 corrects aggregate function name in...

2015-08-12 Thread ssimeonov
Github user ssimeonov commented on the pull request:

https://github.com/apache/spark/pull/7557#issuecomment-130440391
  
@yhual great


---



[GitHub] spark pull request: SPARK-9210 corrects aggregate function name in...

2015-07-21 Thread ssimeonov
Github user ssimeonov commented on the pull request:

https://github.com/apache/spark/pull/7557#issuecomment-123526501
  
@marmbrus can you please provide a complete example that can execute in 
`spark-shell`?

You can find a standalone runnable example with complete shell output in 
[this gist](https://gist.github.com/ssimeonov/72c8a9b01f99e35ba470). Here is 
the summary of what happens:

```scala
// ERROR RetryingHMSHandler: MetaException(message:NoSuchObjectException(message:Function default.first does not exist))
// INFO FunctionRegistry: Unable to lookup UDF in metastore: org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:NoSuchObjectException(message:Function default.first does not exist))
// java.lang.RuntimeException: Couldn't find function first
ctx.sql("select first(num) from test_first group by category").show

// OK
ctx.sql("select first_value(num) from test_first group by category").show
```

Perhaps the difference is that I'm using `HiveContext`?


---



[GitHub] spark pull request: SPARK-9210 corrects aggregate function name in...

2015-07-21 Thread ssimeonov
Github user ssimeonov commented on the pull request:

https://github.com/apache/spark/pull/7557#issuecomment-123543273
  
@marmbrus you can see the version and full INFO-level shell output in the 
gist. I'm running 1.4.1.


---



[GitHub] spark pull request: SPARK-9210 corrects aggregate function name in...

2015-07-20 Thread ssimeonov
GitHub user ssimeonov opened a pull request:

https://github.com/apache/spark/pull/7557

SPARK-9210 corrects aggregate function name in exception message

[JIRA issue](https://issues.apache.org/jira/browse/SPARK-9210)

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/swoop-inc/spark SPARK-9210

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/7557.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #7557


commit e0b8386f6da85e713a81c3978380c4c76e886402
Author: Simeon Simeonov s...@fastignite.com
Date:   2015-07-21T03:33:40Z

SPARK-9210 corrects aggregate function name in exception message




---