Re: Performance regression for partitioned parquet data

2017-06-13 Thread Michael Allman
Hi Bertrand,

I encourage you to create a ticket for this and submit a PR if you have time. 
Please add me as a listener, and I'll try to contribute/review.

Michael

> On Jun 6, 2017, at 5:18 AM, Bertrand Bossy  
> wrote:
> 
> Hi,
> 
> Since moving to Spark 2.1 from 2.0, we have experienced a performance regression 
> when reading a large, partitioned Parquet dataset: 
> 
> We observe many (hundreds of) very short jobs executing before the job that 
> actually reads the data starts. I looked into this issue and pinned it down to 
> PartitioningAwareFileIndex: While recursively listing the directories, if a 
> directory contains more than 
> "spark.sql.sources.parallelPartitionDiscovery.threshold" (default: 32) paths, 
> the children are listed using a spark job. Because the tree is listed 
> serially, this can result in a lot of small spark jobs executed one after the 
> other and the overhead dominates. Performance can be improved by tuning 
> "spark.sql.sources.parallelPartitionDiscovery.threshold". However, this is 
> not a satisfactory solution.
> 
> I think that the current behaviour could be improved by walking the directory 
> tree in breadth-first order and launching a single Spark job to list files in 
> parallel only when the number of paths to be listed at a given level exceeds 
> spark.sql.sources.parallelPartitionDiscovery.threshold.
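As a stopgap while the listing strategy stays as-is, the threshold mentioned above can be raised so that the recursive walk launches fewer (ideally one) listing jobs; a minimal sketch, where the value and path are arbitrary examples:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()

// Raise the threshold so far fewer listing jobs are launched while walking the
// partition directory tree. 1024 is an arbitrary example value.
spark.conf.set("spark.sql.sources.parallelPartitionDiscovery.threshold", "1024")

// Subsequent reads of the partitioned dataset pick up the new setting.
val df = spark.read.parquet("/path/to/partitioned/dataset") // illustrative path
```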
> 
> Does this approach make sense? I have found "Regression in file listing 
> performance" (https://issues.apache.org/jira/browse/SPARK-18679) as the most 
> closely related ticket.
> 
> Unless there is a reason for the current behaviour, I will create a ticket on 
> this soon. I might have some time in the coming days to work on this.
> 
> Regards,
> Bertrand
> 
> -- 
> Bertrand Bossy | TERALYTICS
> 
> software engineer
> 
> Teralytics AG | Zollstrasse 62 | 8005 Zurich | Switzerland 
> www.teralytics.net 
> Company registration number: CH-020.3.037.709-7 | Trade register Canton Zurich
> Board of directors: Georg Polzer, Luciano Franceschina, Mark Schmitz, Yann de 
> Vries
> This e-mail message contains confidential information which is for the sole 
> attention and use of the intended recipient. Please notify us at once if you 
> think that it may not be intended for you and delete it immediately.
> 
> 



Re: SQL TIMESTAMP semantics vs. SPARK-18350

2017-06-02 Thread Michael Allman
Hi Zoltan,

I don't fully understand your proposal for table-specific timestamp type 
semantics. I think it will be helpful to everyone in this conversation if you 
can identify the expected behavior for a few concrete scenarios.

Suppose we have a Hive metastore table hivelogs with a column named hts with the 
hive timestamp type as described here: 
https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Types#LanguageManualTypes-timestamp
 
<https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Types#LanguageManualTypes-timestamp>.
 This table was created by Hive and is usually accessed through Hive or Presto.

Suppose again we have a Hive metastore table sparklogs with a column named sts 
with the Spark SQL timestamp type as described here: 
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.types.TimestampType$
 
<http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.types.TimestampType$>.
 This table was created by Spark SQL and is usually accessed through Spark SQL.

Let's say Spark SQL sets and reads a table property called timestamp_interp to 
determine timestamp type semantics for that table. Consider a dataframe df 
defined by sql("SELECT sts as ts FROM sparklogs UNION ALL SELECT hts as ts FROM 
hivelogs"). Suppose the timestamp_interp table property is absent from 
hivelogs. For each possible value of timestamp_interp set on the table 
sparklogs,

1. does df successfully pass analysis (i.e. is it a valid query)?
2. if it's a valid dataframe, what is the type of the ts column?
3. if it's a valid dataframe, what are the semantics of the type of the ts 
column?

Suppose further that Spark SQL sets the timestamp_interp on hivelogs. Can you 
answer the same three questions for each combination of timestamp_interp on 
hivelogs and sparklogs?
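To make the scenario concrete, here is a sketch in Spark SQL of the setup being asked about. The timestamp_interp property is the hypothetical one proposed above, 'INSTANT' is a made-up example value, and the DDL is illustrative only:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().enableHiveSupport().getOrCreate()

// Hypothetical property proposed above; "INSTANT" is a made-up example value.
spark.sql("ALTER TABLE sparklogs SET TBLPROPERTIES ('timestamp_interp' = 'INSTANT')")
// hivelogs deliberately has no timestamp_interp property set.

// The dataframe whose analysis, column type, and semantics are in question:
val df = spark.sql(
  "SELECT sts AS ts FROM sparklogs UNION ALL SELECT hts AS ts FROM hivelogs")
df.printSchema()
```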

Thank you.

Michael


> On Jun 2, 2017, at 8:33 AM, Zoltan Ivanfi <z...@cloudera.com> wrote:
> 
> Hi,
> 
> We would like to solve the problem of interoperability of existing data, and 
> that is the main use case for having table-level control. Spark should be 
> able to read timestamps written by Impala or Hive and at the same time read 
> back its own data. These have different semantics, so having a single flag is 
> not enough.
> 
> Two separate types will solve this problem indeed, but only once every 
> component involved supports them. Unfortunately, adding these separate SQL 
> types is a larger effort that is only feasible in the long term and we would 
> like to provide a short-term solution for interoperability in the meantime.
> 
> Br,
> 
> Zoltan
> 
> On Fri, Jun 2, 2017 at 1:32 AM Reynold Xin <r...@databricks.com 
> <mailto:r...@databricks.com>> wrote:
> Yea I don't see why this needs to be per table config. If the user wants to 
> configure it per table, can't they just declare the data type on a per table 
> basis, once we have separate types for timestamp w/ tz and w/o tz? 
> 
> On Thu, Jun 1, 2017 at 4:14 PM, Michael Allman <mich...@videoamp.com 
> <mailto:mich...@videoamp.com>> wrote:
> I would suggest that making timestamp type behavior configurable and 
> persisted per-table could introduce some real confusion, e.g. in queries 
> involving tables with different timestamp type semantics.
> 
> I suggest starting with the assumption that timestamp type behavior is a 
> per-session flag that can be set in a global `spark-defaults.conf` and 
> consider more granular levels of configuration as people identify solid use 
> cases.
> 
> Cheers,
> 
> Michael
> 
> 
> 
>> On May 30, 2017, at 7:41 AM, Zoltan Ivanfi <z...@cloudera.com 
>> <mailto:z...@cloudera.com>> wrote:
>> 
>> Hi,
>> 
>> If I remember correctly, the TIMESTAMP type had UTC-normalized local time 
>> semantics even before Spark 2, so I can understand that Spark considers it 
>> to be the "established" behavior that must not be broken. Unfortunately, 
>> this behavior does not provide interoperability with other SQL engines of 
>> the Hadoop stack.
>> 
>> Let me summarize the findings of this e-mail thread so far:
>> - Timezone-agnostic TIMESTAMP semantics would be beneficial for 
>> interoperability and SQL compliance.
>> - Spark can not make a breaking change. For backward-compatibility with 
>> existing data, timestamp semantics should be user-configurable on a 
>> per-table level.
>> Before going into the specifics of a possible solution, do we all agree on 
>> these points?
>> 
>> Thanks,
>> 
>> Zoltan
>> 
>> On Sat, May 27, 2017 at 8:57 PM Imran Rashid <iras...@cloudera.com 
>> <mailto:iras...@cloudera.com>> wrote:
>> I had asked zoltan to bring this 

Re: SQL TIMESTAMP semantics vs. SPARK-18350

2017-06-01 Thread Michael Allman
I would suggest that making timestamp type behavior configurable and persisted 
per-table could introduce some real confusion, e.g. in queries involving tables 
with different timestamp type semantics.

I suggest starting with the assumption that timestamp type behavior is a 
per-session flag that can be set in a global `spark-defaults.conf` and consider 
more granular levels of configuration as people identify solid use cases.
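To make that concrete, a sketch of what a session-level setting might look like. The key and value below are purely hypothetical placeholders (no such setting exists in Spark today), and the same entry could equally live in spark-defaults.conf:

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical key and value, for illustration only.
val spark = SparkSession.builder()
  .config("spark.sql.timestampSemantics", "WITHOUT_TIMEZONE")
  .getOrCreate()
```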

Cheers,

Michael


> On May 30, 2017, at 7:41 AM, Zoltan Ivanfi  wrote:
> 
> Hi,
> 
> If I remember correctly, the TIMESTAMP type had UTC-normalized local time 
> semantics even before Spark 2, so I can understand that Spark considers it to 
> be the "established" behavior that must not be broken. Unfortunately, this 
> behavior does not provide interoperability with other SQL engines of the 
> Hadoop stack.
> 
> Let me summarize the findings of this e-mail thread so far:
> - Timezone-agnostic TIMESTAMP semantics would be beneficial for 
> interoperability and SQL compliance.
> - Spark can not make a breaking change. For backward-compatibility with 
> existing data, timestamp semantics should be user-configurable on a per-table 
> level.
> Before going into the specifics of a possible solution, do we all agree on 
> these points?
> 
> Thanks,
> 
> Zoltan
> 
> On Sat, May 27, 2017 at 8:57 PM Imran Rashid  > wrote:
> I had asked zoltan to bring this discussion to the dev list because I think 
> it's a question that extends beyond a single jira (we can't figure out the 
> semantics of timestamp in parquet if we don't know the overall goal of the 
> timestamp type) and since it's a design question the entire community should 
> be involved.
> 
> I think that a lot of the confusion comes because we're talking about 
> different ways time zones affect behavior: (1) parsing and (2) behavior when 
> changing time zones for processing data.
> 
> It seems we agree that spark should eventually provide a timestamp type which 
> does conform to the standard.   The question is, how do we get there?  Has 
> spark already broken compliance so much that it's impossible to go back 
> without breaking user behavior?  Or perhaps spark already has inconsistent 
> behavior / broken compatibility within the 2.x line, so it's not unthinkable 
> to have another breaking change?
> 
> (Another part of the confusion is on me -- I believed the behavior change was 
> in 2.2, but actually it looks like it's in 2.0.1.  That changes how we think 
> about this in context of what goes into a 2.2 release.  SPARK-18350 isn't the 
> origin of the difference in behavior.)
> 
> First: consider processing data that is already stored in tables, and then 
> accessing it from machines in different time zones.  The standard is clear 
> that "timestamp" should be just like "timestamp without time zone": it does 
> not represent one instant in time, rather it's always displayed the same, 
> regardless of time zone.  This was the behavior in spark 2.0.0 (and 1.6),  
> for hive tables stored as text files, and for spark's json formats.
> 
> Spark 2.0.1  changed the behavior of the json format (I believe with 
> SPARK-16216), so that it behaves more like timestamp *with* time zone.  It 
> also makes csv behave the same (timestamp in csv was basically broken in 
> 2.0.0).  However it did *not* change the behavior of a hive textfile; it 
> still behaves like "timestamp with*out* time zone".  Here's some experiments 
> I tried -- there are a bunch of files there for completeness, but mostly 
> focus on the difference between query_output_2_0_0.txt vs. 
> query_output_2_0_1.txt
> 
> https://gist.github.com/squito/f348508ca7903ec2e1a64f4233e7aa70 
> 
> 
> Given that spark has changed this behavior post 2.0.0, is it still out of the 
> question to change this behavior to bring it back in line with the sql 
> standard for timestamp (without time zone) in the 2.x line?  Or, as reynold 
> proposes, is the only option at this point to add an off-by-default feature 
> flag to get "timestamp without time zone" semantics?
> 
> 
> Second, there is the question of parsing strings into timestamp type.   I'm 
> far less knowledgeable about this, so I mostly just have questions:
> 
> * does the standard dictate what the parsing behavior should be for timestamp 
> (without time zone) when a time zone is present?
> 
> * if it does and spark violates this standard is it worth trying to retain 
> the *other* semantics of timestamp without time zone, even if we violate the 
> parsing part?
> 
> I did look at what postgres does for comparison:
> 
> https://gist.github.com/squito/cb81a1bb07e8f67e9d27eaef44cc522c 
> 
> 
> spark's timestamp certainly does not match postgres's timestamp for parsing, 
> it seems closer to postgres's "timestamp with timezone" -- though 

Re: [VOTE] Apache Spark 2.2.0 (RC2)

2017-05-25 Thread Michael Allman
PR is here: https://github.com/apache/spark/pull/18112


> On May 25, 2017, at 10:28 AM, Michael Allman <mich...@videoamp.com> wrote:
> 
> Michael,
> 
> If you haven't started cutting the new RC, I'm working on a documentation PR 
> right now I'm hoping we can get into Spark 2.2 as a migration note, even if 
> it's just a mention: https://issues.apache.org/jira/browse/SPARK-20888 
> <https://issues.apache.org/jira/browse/SPARK-20888>.
> 
> Michael
> 
> 
>> On May 22, 2017, at 11:39 AM, Michael Armbrust <mich...@databricks.com 
>> <mailto:mich...@databricks.com>> wrote:
>> 
>> I'm waiting for SPARK-20814 
>> <https://issues.apache.org/jira/browse/SPARK-20814> at Marcelo's request and 
>> I'd also like to include SPARK-20844 
>> <https://issues.apache.org/jira/browse/SPARK-20844>.  I think we should be 
>> able to cut another RC midweek.
>> 
>> On Fri, May 19, 2017 at 11:53 AM, Nick Pentreath <nick.pentre...@gmail.com 
>> <mailto:nick.pentre...@gmail.com>> wrote:
>> All the outstanding ML QA doc and user guide items are done for 2.2 so from 
>> that side we should be good to cut another RC :)
>> 
>> 
>> On Thu, 18 May 2017 at 00:18 Russell Spitzer <russell.spit...@gmail.com 
>> <mailto:russell.spit...@gmail.com>> wrote:
>> Seeing an issue with the DataSourceScanExec and some of our integration tests for 
>> the SCC. Running dataframe reads and writes from the shell seems fine but the 
>> Redaction code seems to get a "None" when doing 
>> SparkSession.getActiveSession.get in our integration tests. I'm not sure why 
>> but I'll dig into this later if I get a chance.
>> 
>> Example Failed Test
>> https://github.com/datastax/spark-cassandra-connector/blob/v2.0.1/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/sql/CassandraSQLSpec.scala#L311
>>  
>> <https://github.com/datastax/spark-cassandra-connector/blob/v2.0.1/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/sql/CassandraSQLSpec.scala#L311>
>> 
>> ```[info]   org.apache.spark.SparkException: Job aborted due to stage 
>> failure: Task serialization failed: java.util.NoSuchElementException: 
>> None.get
>> [info] java.util.NoSuchElementException: None.get
>> [info]   at scala.None$.get(Option.scala:347)
>> [info]   at scala.None$.get(Option.scala:345)
>> [info]   at org.apache.spark.sql.execution.DataSourceScanExec$class.org$apache$spark$sql$execution$DataSourceScanExec$$redact(DataSourceScanExec.scala:70)
>> [info]   at 
>> org.apache.spark.sql.execution.DataSourceScanExec$$anonfun$4.apply(DataSourceScanExec.scala:54)
>> [info]   at 
>> org.apache.spark.sql.execution.DataSourceScanExec$$anonfun$4.apply(DataSourceScanExec.scala:52)
>> ``` 
>> 
>> Again this only seems to repro in our IT suite so I'm not sure if this is a 
>> real issue. 
>> 
>> 
>> On Tue, May 16, 2017 at 1:40 PM Joseph Bradley <jos...@databricks.com 
>> <mailto:jos...@databricks.com>> wrote:
>> All of the ML/Graph/SparkR QA blocker JIRAs have been resolved.  Thanks 
>> everyone who helped out on those!
>> 
>> We still have open ML/Graph/SparkR JIRAs targeted at 2.2, but they are 
>> essentially all for documentation.
>> 
>> Joseph
>> 
>> On Thu, May 11, 2017 at 3:08 PM, Marcelo Vanzin <van...@cloudera.com 
>> <mailto:van...@cloudera.com>> wrote:
>> Since you'll be creating a new RC, I'd wait until SPARK-20666 is
>> fixed, since the change that caused it is in branch-2.2. Probably a
>> good idea to raise it to blocker at this point.
>> 
>> On Thu, May 11, 2017 at 2:59 PM, Michael Armbrust
>> <mich...@databricks.com <mailto:mich...@databricks.com>> wrote:
>> > I'm going to -1 given the outstanding issues and lack of +1s.  I'll create
>> > another RC once ML has had time to take care of the more critical problems.
>> > In the meantime please keep testing this release!
>> >
>> > On Tue, May 9, 2017 at 2:00 AM, Kazuaki Ishizaki <ishiz...@jp.ibm.com 
>> > <mailto:ishiz...@jp.ibm.com>>
>> > wrote:
>> >>
>> >> +1 (non-binding)
>> >>
>> >> I tested it on Ubuntu 16.04 and OpenJDK8 on ppc64le. All of the tests for
>> >> core have passed.
>> >>
>> >> $ java -version
>> >> openjdk version "1.8.0_111"
>> >> OpenJDK Runtime Environment (build
>> >> 1

Re: [VOTE] Apache Spark 2.2.0 (RC2)

2017-05-25 Thread Michael Allman
Michael,

If you haven't started cutting the new RC, I'm working on a documentation PR 
right now I'm hoping we can get into Spark 2.2 as a migration note, even if 
it's just a mention: https://issues.apache.org/jira/browse/SPARK-20888.

Michael


> On May 22, 2017, at 11:39 AM, Michael Armbrust  wrote:
> 
> I'm waiting for SPARK-20814 
>  at Marcelo's request and 
> I'd also like to include SPARK-20844 
> .  I think we should be 
> able to cut another RC midweek.
> 
> On Fri, May 19, 2017 at 11:53 AM, Nick Pentreath  > wrote:
> All the outstanding ML QA doc and user guide items are done for 2.2 so from 
> that side we should be good to cut another RC :)
> 
> 
> On Thu, 18 May 2017 at 00:18 Russell Spitzer  > wrote:
> Seeing an issue with the DataSourceScanExec and some of our integration tests for 
> the SCC. Running dataframe reads and writes from the shell seems fine but the 
> Redaction code seems to get a "None" when doing 
> SparkSession.getActiveSession.get in our integration tests. I'm not sure why 
> but I'll dig into this later if I get a chance.
> 
> Example Failed Test
> https://github.com/datastax/spark-cassandra-connector/blob/v2.0.1/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/sql/CassandraSQLSpec.scala#L311
>  
> 
> 
> ```[info]   org.apache.spark.SparkException: Job aborted due to stage 
> failure: Task serialization failed: java.util.NoSuchElementException: None.get
> [info] java.util.NoSuchElementException: None.get
> [info]at scala.None$.get(Option.scala:347)
> [info]at scala.None$.get(Option.scala:345)
> [info]   at org.apache.spark.sql.execution.DataSourceScanExec$class.org$apache$spark$sql$execution$DataSourceScanExec$$redact(DataSourceScanExec.scala:70)
> [info]at 
> org.apache.spark.sql.execution.DataSourceScanExec$$anonfun$4.apply(DataSourceScanExec.scala:54)
> [info]at 
> org.apache.spark.sql.execution.DataSourceScanExec$$anonfun$4.apply(DataSourceScanExec.scala:52)
> ``` 
> 
> Again this only seems to repro in our IT suite so I'm not sure if this is a 
> real issue. 
> 
> 
> On Tue, May 16, 2017 at 1:40 PM Joseph Bradley  > wrote:
> All of the ML/Graph/SparkR QA blocker JIRAs have been resolved.  Thanks 
> everyone who helped out on those!
> 
> We still have open ML/Graph/SparkR JIRAs targeted at 2.2, but they are 
> essentially all for documentation.
> 
> Joseph
> 
> On Thu, May 11, 2017 at 3:08 PM, Marcelo Vanzin  > wrote:
> Since you'll be creating a new RC, I'd wait until SPARK-20666 is
> fixed, since the change that caused it is in branch-2.2. Probably a
> good idea to raise it to blocker at this point.
> 
> On Thu, May 11, 2017 at 2:59 PM, Michael Armbrust
> > wrote:
> > I'm going to -1 given the outstanding issues and lack of +1s.  I'll create
> > another RC once ML has had time to take care of the more critical problems.
> > In the meantime please keep testing this release!
> >
> > On Tue, May 9, 2017 at 2:00 AM, Kazuaki Ishizaki  > >
> > wrote:
> >>
> >> +1 (non-binding)
> >>
> >> I tested it on Ubuntu 16.04 and OpenJDK8 on ppc64le. All of the tests for
> >> core have passed.
> >>
> >> $ java -version
> >> openjdk version "1.8.0_111"
> >> OpenJDK Runtime Environment (build
> >> 1.8.0_111-8u111-b14-2ubuntu0.16.04.2-b14)
> >> OpenJDK 64-Bit Server VM (build 25.111-b14, mixed mode)
> >> $ build/mvn -DskipTests -Phive -Phive-thriftserver -Pyarn -Phadoop-2.7
> >> package install
> >> $ build/mvn -Phive -Phive-thriftserver -Pyarn -Phadoop-2.7 test -pl core
> >> ...
> >> Run completed in 15 minutes, 12 seconds.
> >> Total number of tests run: 1940
> >> Suites: completed 206, aborted 0
> >> Tests: succeeded 1940, failed 0, canceled 4, ignored 8, pending 0
> >> All tests passed.
> >> [INFO]
> >> 
> >> [INFO] BUILD SUCCESS
> >> [INFO]
> >> 
> >> [INFO] Total time: 16:51 min
> >> [INFO] Finished at: 2017-05-09T17:51:04+09:00
> >> [INFO] Final Memory: 53M/514M
> >> [INFO]
> >> 
> >> [WARNING] The requested profile "hive" could not be activated because it
> >> does not exist.
> >>
> >>
> >> Kazuaki Ishizaki,
> >>
> >>
> >>
> >> 

Re: Parquet vectorized reader DELTA_BYTE_ARRAY

2017-05-22 Thread Michael Allman
Hi AndreiL,

Were these files written with the Parquet V2 writer? The Spark 2.1 vectorized 
reader does not appear to support that format.
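For reference, the workaround mentioned in the report below (turning off the vectorized reader) can be applied per session; a minimal sketch, with an illustrative path:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()

// Fall back to the row-based Parquet reader, which handles DELTA_BYTE_ARRAY
// encoded columns at some cost in scan performance.
spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")

val df = spark.read.parquet("/path/to/parquet-v2-files") // illustrative path
```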

Michael


> On May 9, 2017, at 11:04 AM, andreiL  wrote:
> 
> Hi, I am getting an exception in Spark 2.1 reading parquet files where some
> columns are DELTA_BYTE_ARRAY encoded.
> 
> java.lang.UnsupportedOperationException: Unsupported encoding:
> DELTA_BYTE_ARRAY
> 
> Is this exception by design, or am I missing something?
> 
> If I turn off the vectorized reader, reading these files works fine.
> 
> AndreiL
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-developers-list.1001551.n3.nabble.com/Parquet-vectorized-reader-DELTA-BYTE-ARRAY-tp21538.html
> Sent from the Apache Spark Developers List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
> 


-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org



Re: Method for gracefully terminating a driver on a standalone master in Spark 2.1+

2017-05-22 Thread Michael Allman
As I cannot find a way to gracefully kill an app which takes longer than 10 
seconds to shut down, I have reported this issue as a bug:

https://issues.apache.org/jira/browse/SPARK-20843

Michael

> On May 4, 2017, at 4:15 PM, Michael Allman <mich...@videoamp.com> wrote:
> 
> Hello,
> 
> In performing our prod cluster upgrade, we've noticed that the behavior for 
> killing a driver is more aggressive. Whereas pre-2.1 the driver runner would 
> only call `Process.destroy`, in 2.1+ it now calls `Process.destroyForcibly` 
> (on Java 8) if the previous `destroy` call does not return within 10 seconds. 
> (Commit in which this change was made: 
> https://github.com/apache/spark/commit/1c9a386c6b6812a3931f3fb0004249894a01f657
>  
> <https://github.com/apache/spark/commit/1c9a386c6b6812a3931f3fb0004249894a01f657>)
> 
> In light of this change, is there still a way to gracefully kill a spark 
> driver running on a standalone cluster that takes longer than 10 seconds to 
> die gracefully? For example, if we have a streaming job with 1 minute batches 
> we want a much longer timeout before force-killing a driver. We'd rather do a 
> `kill -9` ourselves—if necessary—so we can control the termination procedure.
> 
> Thank you.
> 
> Michael



Method for gracefully terminating a driver on a standalone master in Spark 2.1+

2017-05-04 Thread Michael Allman
Hello,

In performing our prod cluster upgrade, we've noticed that the behavior for 
killing a driver is more aggressive. Whereas pre-2.1 the driver runner would 
only call `Process.destroy`, in 2.1+ it now calls `Process.destroyForcibly` (on 
Java 8) if the previous `destroy` call does not return within 10 seconds. 
(Commit in which this change was made: 
https://github.com/apache/spark/commit/1c9a386c6b6812a3931f3fb0004249894a01f657)

In light of this change, is there still a way to gracefully kill a spark driver 
running on a standalone cluster that takes longer than 10 seconds to die 
gracefully? For example, if we have a streaming job with 1 minute batches we 
want a much longer timeout before force-killing a driver. We'd rather do a 
`kill -9` ourselves—if necessary—so we can control the termination procedure.
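For context, a minimal sketch of the kind of controlled shutdown we would like to keep, assuming a streaming driver submitted via spark-submit; note that with the current behavior the worker may still force-kill the JVM 10 seconds after the initial destroy call:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Master and deploy settings are assumed to come from spark-submit.
val conf = new SparkConf().setAppName("graceful-shutdown-sketch")
val ssc = new StreamingContext(conf, Seconds(60)) // 1-minute batches, as above

// Trivial job so the sketch is complete; the real pipeline goes here.
ssc.socketTextStream("localhost", 9999).count().print()

sys.addShutdownHook {
  // Let in-flight batches finish before the process exits. Under the new
  // driver-kill behavior, this hook may be cut short by Process.destroyForcibly
  // roughly 10 seconds after the initial destroy.
  ssc.stop(stopSparkContext = true, stopGracefully = true)
}

ssc.start()
ssc.awaitTermination()
```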

Thank you.

Michael

Re: [VOTE] Apache Spark 2.1.1 (RC3)

2017-04-24 Thread Michael Allman
The trouble we ran into is that this upgrade was blocking access to our tables, 
and we didn't know why. This sounds like a kind of migration operation, but it 
was not apparent that this was the case. It took an expert examining a stack 
trace and source code to figure this out. Would a more naive end user be able 
to debug this issue? Maybe we're an unusual case, but our particular experience 
was pretty bad. I have my doubts that the schema inference on our largest 
tables would ever complete without throwing some kind of timeout (which we were 
in fact receiving) or the end user just giving up and killing our job. We ended 
up doing a rollback while we investigated the source of the issue. In our case, 
NEVER_INFER is clearly the best configuration. We're going to add that to our 
default configuration files.
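For reference, a minimal sketch of pinning that setting programmatically; the same key and value can equally go into spark-defaults.conf:

```scala
import org.apache.spark.sql.SparkSession

// Skip the case-sensitive schema inference scan entirely; equivalent to setting
// spark.sql.hive.caseSensitiveInferenceMode=NEVER_INFER in spark-defaults.conf.
val spark = SparkSession.builder()
  .config("spark.sql.hive.caseSensitiveInferenceMode", "NEVER_INFER")
  .enableHiveSupport()
  .getOrCreate()
```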

My expectation is that a minor point release is a pretty safe bug fix release. 
We were a bit hasty in not doing better due diligence pre-upgrade.

One suggestion the Spark team might consider is releasing 2.1.1 with 
NEVER_INFER and 2.2.0 with INFER_AND_SAVE. Clearly some kind of up-front 
migration notes would help in identifying this new behavior in 2.2.

Thanks,

Michael


> On Apr 24, 2017, at 2:09 AM, Wenchen Fan <wenc...@databricks.com> wrote:
> 
> see https://issues.apache.org/jira/browse/SPARK-19611 
> <https://issues.apache.org/jira/browse/SPARK-19611>
> 
> On Mon, Apr 24, 2017 at 2:22 PM, Holden Karau <hol...@pigscanfly.ca 
> <mailto:hol...@pigscanfly.ca>> wrote:
> Whats the regression this fixed in 2.1 from 2.0?
> 
> On Fri, Apr 21, 2017 at 7:45 PM, Wenchen Fan <wenc...@databricks.com 
> <mailto:wenc...@databricks.com>> wrote:
> IIRC, the new "spark.sql.hive.caseSensitiveInferenceMode" stuff will only 
> scan all table files only once, and write back the inferred schema to 
> metastore so that we don't need to do the schema inference again.
> 
> So technically this will introduce a performance regression for the first 
> query, but compared to branch-2.0, it's not performance regression. And this 
> patch fixed a regression in branch-2.1, which can run in branch-2.0. 
> Personally, I think we should keep INFER_AND_SAVE as the default mode.
> 
> + [Eric], what do you think?
> 
> On Sat, Apr 22, 2017 at 1:37 AM, Michael Armbrust <mich...@databricks.com 
> <mailto:mich...@databricks.com>> wrote:
> Thanks for pointing this out, Michael.  Based on the conversation on the PR 
> <https://github.com/apache/spark/pull/16944#issuecomment-285529275> this 
> seems like a risky change to include in a release branch with a default other 
> than NEVER_INFER.
> 
> +Wenchen?  What do you think?
> 
> On Thu, Apr 20, 2017 at 4:14 PM, Michael Allman <mich...@videoamp.com 
> <mailto:mich...@videoamp.com>> wrote:
> We've identified the cause of the change in behavior. It is related to the 
> SQL conf key "spark.sql.hive.caseSensitiveInferenceMode". This key and its 
> related functionality was absent from our previous build. The default setting 
> in the current build was causing Spark to attempt to scan all table files 
> during query analysis. Changing this setting to NEVER_INFER disabled this 
> operation and resolved the issue we had.
> 
> Michael
> 
> 
>> On Apr 20, 2017, at 3:42 PM, Michael Allman <mich...@videoamp.com 
>> <mailto:mich...@videoamp.com>> wrote:
>> 
>> I want to caution that in testing a build from this morning's branch-2.1 we 
>> found that Hive partition pruning was not working. We found that Spark SQL 
>> was fetching all Hive table partitions for a very simple query whereas in a 
>> build from several weeks ago it was fetching only the required partitions. I 
>> cannot currently think of a reason for the regression outside of some 
>> difference between branch-2.1 from our previous build and branch-2.1 from 
>> this morning.
>> 
>> That's all I know right now. We are actively investigating to find the root 
>> cause of this problem, and specifically whether this is a problem in the 
>> Spark codebase or not. I will report back when I have an answer to that 
>> question.
>> 
>> Michael
>> 
>> 
>>> On Apr 18, 2017, at 11:59 AM, Michael Armbrust <mich...@databricks.com 
>>> <mailto:mich...@databricks.com>> wrote:
>>> 
>>> Please vote on releasing the following candidate as Apache Spark version 
>>> 2.1.1. The vote is open until Fri, April 21st, 2017 at 13:00 PST and passes 
>>> if a majority of at least 3 +1 PMC votes are cast.
>>> 
>>> [ ] +1 Release this package as Apache Spark 2.1.1
>>> [ ] -1 Do not release this package because ...
>>> 

Re: [VOTE] Apache Spark 2.1.1 (RC3)

2017-04-20 Thread Michael Allman
We've identified the cause of the change in behavior. It is related to the SQL 
conf key "spark.sql.hive.caseSensitiveInferenceMode". This key and its related 
functionality was absent from our previous build. The default setting in the 
current build was causing Spark to attempt to scan all table files during query 
analysis. Changing this setting to NEVER_INFER disabled this operation and 
resolved the issue we had.

Michael


> On Apr 20, 2017, at 3:42 PM, Michael Allman <mich...@videoamp.com> wrote:
> 
> I want to caution that in testing a build from this morning's branch-2.1 we 
> found that Hive partition pruning was not working. We found that Spark SQL 
> was fetching all Hive table partitions for a very simple query whereas in a 
> build from several weeks ago it was fetching only the required partitions. I 
> cannot currently think of a reason for the regression outside of some 
> difference between branch-2.1 from our previous build and branch-2.1 from 
> this morning.
> 
> That's all I know right now. We are actively investigating to find the root 
> cause of this problem, and specifically whether this is a problem in the 
> Spark codebase or not. I will report back when I have an answer to that 
> question.
> 
> Michael
> 
> 
>> On Apr 18, 2017, at 11:59 AM, Michael Armbrust <mich...@databricks.com 
>> <mailto:mich...@databricks.com>> wrote:
>> 
>> Please vote on releasing the following candidate as Apache Spark version 
>> 2.1.1. The vote is open until Fri, April 21st, 2017 at 13:00 PST and passes 
>> if a majority of at least 3 +1 PMC votes are cast.
>> 
>> [ ] +1 Release this package as Apache Spark 2.1.1
>> [ ] -1 Do not release this package because ...
>> 
>> 
>> To learn more about Apache Spark, please see http://spark.apache.org/ 
>> <http://spark.apache.org/>
>> 
>> The tag to be voted on is v2.1.1-rc3 
>> <https://github.com/apache/spark/tree/v2.1.1-rc3> 
>> (2ed19cff2f6ab79a718526e5d16633412d8c4dd4)
>> 
>> List of JIRA tickets resolved can be found with this filter 
>> <https://issues.apache.org/jira/browse/SPARK-20134?jql=project%20%3D%20SPARK%20AND%20fixVersion%20%3D%202.1.1>.
>> 
>> The release files, including signatures, digests, etc. can be found at:
>> http://home.apache.org/~pwendell/spark-releases/spark-2.1.1-rc3-bin/ 
>> <http://home.apache.org/~pwendell/spark-releases/spark-2.1.1-rc3-bin/>
>> 
>> Release artifacts are signed with the following key:
>> https://people.apache.org/keys/committer/pwendell.asc 
>> <https://people.apache.org/keys/committer/pwendell.asc>
>> 
>> The staging repository for this release can be found at:
>> https://repository.apache.org/content/repositories/orgapachespark-1230/ 
>> <https://repository.apache.org/content/repositories/orgapachespark-1230/>
>> 
>> The documentation corresponding to this release can be found at:
>> http://people.apache.org/~pwendell/spark-releases/spark-2.1.1-rc3-docs/ 
>> <http://people.apache.org/~pwendell/spark-releases/spark-2.1.1-rc3-docs/>
>> 
>> 
>> FAQ
>> 
>> How can I help test this release?
>> 
>> If you are a Spark user, you can help us test this release by taking an 
>> existing Spark workload and running on this release candidate, then 
>> reporting any regressions.
>> 
>> What should happen to JIRA tickets still targeting 2.1.1?
>> 
>> Committers should look at those and triage. Extremely important bug fixes, 
>> documentation, and API tweaks that impact compatibility should be worked on 
>> immediately. Everything else please retarget to 2.1.2 or 2.2.0.
>> 
>> But my bug isn't fixed!??!
>> 
>> In order to make timely releases, we will typically not hold the release 
>> unless the bug in question is a regression from 2.1.0.
>> 
>> What happened to RC1?
>> 
>> There were issues with the release packaging and as a result was skipped.
> 



Re: [VOTE] Apache Spark 2.1.1 (RC3)

2017-04-20 Thread Michael Allman
I want to caution that in testing a build from this morning's branch-2.1 we 
found that Hive partition pruning was not working. We found that Spark SQL was 
fetching all Hive table partitions for a very simple query whereas in a build 
from several weeks ago it was fetching only the required partitions. I cannot 
currently think of a reason for the regression outside of some difference 
between branch-2.1 from our previous build and branch-2.1 from this morning.

That's all I know right now. We are actively investigating to find the root 
cause of this problem, and specifically whether this is a problem in the Spark 
codebase or not. I will report back when I have an answer to that question.

Michael


> On Apr 18, 2017, at 11:59 AM, Michael Armbrust  wrote:
> 
> Please vote on releasing the following candidate as Apache Spark version 
> 2.1.1. The vote is open until Fri, April 21st, 2017 at 13:00 PST and passes 
> if a majority of at least 3 +1 PMC votes are cast.
> 
> [ ] +1 Release this package as Apache Spark 2.1.1
> [ ] -1 Do not release this package because ...
> 
> 
> To learn more about Apache Spark, please see http://spark.apache.org/ 
> 
> 
> The tag to be voted on is v2.1.1-rc3 
>  
> (2ed19cff2f6ab79a718526e5d16633412d8c4dd4)
> 
> List of JIRA tickets resolved can be found with this filter 
> .
> 
> The release files, including signatures, digests, etc. can be found at:
> http://home.apache.org/~pwendell/spark-releases/spark-2.1.1-rc3-bin/ 
> 
> 
> Release artifacts are signed with the following key:
> https://people.apache.org/keys/committer/pwendell.asc 
> 
> 
> The staging repository for this release can be found at:
> https://repository.apache.org/content/repositories/orgapachespark-1230/ 
> 
> 
> The documentation corresponding to this release can be found at:
> http://people.apache.org/~pwendell/spark-releases/spark-2.1.1-rc3-docs/ 
> 
> 
> 
> FAQ
> 
> How can I help test this release?
> 
> If you are a Spark user, you can help us test this release by taking an 
> existing Spark workload and running on this release candidate, then reporting 
> any regressions.
> 
> What should happen to JIRA tickets still targeting 2.1.1?
> 
> Committers should look at those and triage. Extremely important bug fixes, 
> documentation, and API tweaks that impact compatibility should be worked on 
> immediately. Everything else please retarget to 2.1.2 or 2.2.0.
> 
> But my bug isn't fixed!??!
> 
> In order to make timely releases, we will typically not hold the release 
> unless the bug in question is a regression from 2.1.0.
> 
> What happened to RC1?
> 
> There were issues with the release packaging and as a result was skipped.



Re: Implementation of RNN/LSTM in Spark

2017-02-28 Thread Michael Allman
Hi Yuhao,

BigDL looks very promising and it's a framework we're considering using. It 
seems the general approach to high performance DL is via GPUs. Your project 
mentions performance on a Xeon comparable to that of a GPU, but where does this 
claim come from? Can you provide benchmarks?

Thanks,

Michael

> On Feb 27, 2017, at 11:11 PM, Yuhao Yang  wrote:
> 
> Welcome to try and contribute to our BigDL: 
> https://github.com/intel-analytics/BigDL 
>  
> 
> It's native on Spark and fast by leveraging Intel MKL. 
> 
> 2017-02-23 4:51 GMT-08:00 Joeri Hermans  >:
> Hi Nikita,
> 
> We are actively working on this: https://github.com/cerndb/dist-keras 
>  This will allow you to run Keras on 
> Spark (with distributed optimization algorithms) through pyspark. I recommend 
> you to check the examples 
> https://github.com/cerndb/dist-keras/tree/master/examples 
> . However, you 
> need to be aware that distributed optimization is a research topic, and has 
> several approaches and caveats you need to be aware of. I wrote a blog post 
> on this if you like to have some additional information on this topic 
> https://db-blog.web.cern.ch/blog/joeri-hermans/2017-01-distributed-deep-learning-apache-spark-and-keras
>  
> 
> 
> However, if you don't want to use a distributed optimization algorithm, we 
> also support a "sequential trainer" which allows you to train a model on 
> Spark dataframes.
> 
> Kind regards,
> 
> Joeri
> .
> From: Nick Pentreath [nick.pentre...@gmail.com 
> ]
> Sent: 23 February 2017 13:39
> To: dev@spark.apache.org 
> Subject: Re: Implementation of RNN/LSTM in Spark
> 
> The short answer is there is none and highly unlikely to be inside of Spark 
> MLlib any time in the near future.
> 
> The best bets are to look at other DL libraries - for JVM there is 
> Deeplearning4J and BigDL (there are others but these seem to be the most 
> comprehensive I have come across) - that run on Spark. Also there are various 
> flavours of TensorFlow / Caffe on Spark. And of course the libs such as 
> Torch, Keras, Tensorflow, MXNet, Caffe etc. Some of them have Java or Scala 
> APIs and some form of Spark integration out there in the community (in 
> varying states of development).
> 
> Integrations with Spark are a bit patchy currently but include the "XOnSpark" 
> flavours mentioned above and TensorFrames (again, there may be others).
> 
> On Thu, 23 Feb 2017 at 14:23 n1kt0    >> wrote:
> Hi,
> can anyone tell me what the current status about RNNs in Spark is?
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-developers-list.1001551.n3.nabble.com/Implementation-of-RNN-LSTM-in-Spark-tp14866p21060.html
>  
> 
> Sent from the Apache Spark Developers List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org 
>   >
> 
> 
> -
> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org 
> 
> 
> 



Simple bug fix PR looking for love

2017-02-09 Thread Michael Allman
Hi Guys,

Can someone help move https://github.com/apache/spark/pull/16499 along in the 
review process? This PR fixes replicated off-heap storage.

Thanks!

Michael

Re: Unique Partition Id per partition

2017-01-31 Thread Michael Allman
Hi Sumit,

Can you use 
http://spark.apache.org/docs/latest/api/python/pyspark.html?highlight=rdd#pyspark.RDD.mapPartitionsWithIndex 
to solve your problem?
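For illustration, a minimal sketch of the idea, shown in Scala (the PySpark method linked above behaves the same way):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()
val sc = spark.sparkContext

val rdd = sc.parallelize(1 to 100, numSlices = 8)

// The partition index Spark passes in is unique within the RDD, so it can serve
// as the bookkeeping key instead of id(part) plus a timestamp.
val tagged = rdd.mapPartitionsWithIndex { (partitionIndex, records) =>
  records.map(record => (partitionIndex, record))
}

tagged.take(5).foreach(println)
```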

Michael

> On Jan 31, 2017, at 9:08 AM, Chawla,Sumit  wrote:
> 
> Hi All
> 
> I have an RDD, which I partition based on some key, and then call sc.runJob for 
> each partition. Inside this function, I assign each partition a unique key using 
> the following:
> 
> "%s_%s" % (id(part), int(round(time.time())))
> 
> This is to make sure that each partition produces separate bookkeeping data, 
> which can be aggregated by an external system. However, I sometimes notice 
> multiple partition results pointing to the same partition_id. Is this an issue 
> with the way the above code is serialized by PySpark? What's the best way to 
> define a unique id for each partition? I understand that the same executor gets 
> multiple partitions to process, but I would expect the above code to produce a 
> unique id for each partition.
> 
> 
> Regards
> Sumit Chawla
> 



Re: Error Saving Dataframe to Hive with Spark 2.0.0

2017-01-31 Thread Michael Allman
That's understandable. Maybe I can help. :)

What happens if you set `HIVE_TABLE_NAME = "default.employees"`?

Also, does that table exist before you call 
`filtered_output_timestamp.write.mode("append").saveAsTable(HIVE_TABLE_NAME)`?
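For illustration, a minimal sketch of the first suggestion, assuming the filtered_output_timestamp DataFrame from the quoted code below:

```scala
// Qualify the target table with its database so saveAsTable resolves it against
// the intended Hive metastore database ("default") rather than whichever
// database happens to be current.
val HIVE_TABLE_NAME = "default.employees"
filtered_output_timestamp.write.mode("append").saveAsTable(HIVE_TABLE_NAME)
```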

Cheers,

Michael

> On Jan 29, 2017, at 9:52 PM, Chetan Khatri  
> wrote:
> 
> Okay, you are saying that 2.0.0 doesn't have that patch fixed? @dev cc -- 
> I don't like changing the service versions every time!
> 
> Thanks.
> 
> On Mon, Jan 30, 2017 at 1:10 AM, Jacek Laskowski  > wrote:
> Hi, 
> 
> I think you have to upgrade to 2.1.0. There were few changes wrt the ERROR 
> since. 
> 
> Jacek 
> 
> 
> On 29 Jan 2017 9:24 a.m., "Chetan Khatri"  > wrote:
> Hello Spark Users,
> 
> I am getting error while saving Spark Dataframe to Hive Table:
> Hive 1.2.1
> Spark 2.0.0
> Local environment.
> Note: The job executes successfully and does what I want, but the exception is 
> still raised.
> Source Code:
> 
> package com.chetan.poc.hbase
> 
> /**
>   * Created by chetan on 24/1/17.
>   */
> import org.apache.hadoop.hbase.{CellUtil, HBaseConfiguration}
> import org.apache.hadoop.hbase.mapreduce.TableInputFormat
> import org.apache.hadoop.hbase.util.Bytes
> import org.apache.hadoop.hbase.KeyValue.Type
> import org.apache.spark.sql.SparkSession
> import scala.collection.JavaConverters._
> import java.util.Date
> import java.text.SimpleDateFormat
> 
> 
> object IncrementalJob {
> val APP_NAME: String = "SparkHbaseJob"
> var HBASE_DB_HOST: String = null
> var HBASE_TABLE: String = null
> var HBASE_COLUMN_FAMILY: String = null
> var HIVE_DATA_WAREHOUSE: String = null
> var HIVE_TABLE_NAME: String = null
>   def main(args: Array[String]) {
> // Initializing HBASE Configuration variables
> HBASE_DB_HOST="127.0.0.1"
> HBASE_TABLE="university"
> HBASE_COLUMN_FAMILY="emp"
> // Initializing Hive Metastore configuration
> HIVE_DATA_WAREHOUSE = "/usr/local/hive/warehouse"
> // Initializing Hive table name - Target table
> HIVE_TABLE_NAME = "employees"
> // setting spark application
> // val sparkConf = new SparkConf().setAppName(APP_NAME).setMaster("local")
> //initialize the spark context
> //val sparkContext = new SparkContext(sparkConf)
> //val sqlContext = new org.apache.spark.sql.SQLContext(sparkContext)
> // Enable Hive with Hive warehouse in SparkSession
> val spark = 
> SparkSession.builder().appName(APP_NAME).config("hive.metastore.warehouse.dir",
>  HIVE_DATA_WAREHOUSE).config("spark.sql.warehouse.dir", 
> HIVE_DATA_WAREHOUSE).enableHiveSupport().getOrCreate()
> import spark.implicits._
> import spark.sql
> 
> val conf = HBaseConfiguration.create()
> conf.set(TableInputFormat.INPUT_TABLE, HBASE_TABLE)
> conf.set(TableInputFormat.SCAN_COLUMNS, HBASE_COLUMN_FAMILY)
> // Load an RDD of rowkey, result(ImmutableBytesWritable, Result) tuples 
> from the table
> val hBaseRDD = spark.sparkContext.newAPIHadoopRDD(conf, 
> classOf[TableInputFormat],
>   classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
>   classOf[org.apache.hadoop.hbase.client.Result])
> 
> println(hBaseRDD.count())
> //hBaseRDD.foreach(println)
> 
> //keyValue is a RDD[java.util.list[hbase.KeyValue]]
> val keyValue = hBaseRDD.map(x => x._2).map(_.list)
> 
> //outPut is a RDD[String], in which each line represents a record in HBase
> val outPut = keyValue.flatMap(x =>  x.asScala.map(cell =>
> 
>   HBaseResult(
> Bytes.toInt(CellUtil.cloneRow(cell)),
> Bytes.toStringBinary(CellUtil.cloneFamily(cell)),
> Bytes.toStringBinary(CellUtil.cloneQualifier(cell)),
> cell.getTimestamp,
> new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS").format(new 
> Date(cell.getTimestamp.toLong)),
> Bytes.toStringBinary(CellUtil.cloneValue(cell)),
> Type.codeToType(cell.getTypeByte).toString
> )
>   )
> ).toDF()
> // Output dataframe
> outPut.show
> 
> // get timestamp
> val datetimestamp_threshold = "2016-08-25 14:27:02:001"
> val datetimestampformat = new SimpleDateFormat("yyyy-MM-dd 
> HH:mm:ss:SSS").parse(datetimestamp_threshold).getTime()
> 
> // Resultset filteration based on timestamp
> val filtered_output_timestamp = outPut.filter($"colDatetime" >= 
> datetimestampformat)
> // Resultset filteration based on rowkey
> val filtered_output_row = 
> outPut.filter($"colDatetime".between(1668493360,1668493365))
> 
> 
> // Saving Dataframe to Hive Table Successfully.
> 
> filtered_output_timestamp.write.mode("append").saveAsTable(HIVE_TABLE_NAME)
>   }
>   case class HBaseResult(rowkey: Int, colFamily: String, colQualifier: 
> String, colDatetime: Long, colDatetimeStr: String, colValue: String, colType: 
> String)
> }
> 
> 

Re: Executors exceed maximum memory defined with `--executor-memory` in Spark 2.1.0

2017-01-23 Thread Michael Allman
Hi Stan,

What OS/version are you using?

Michael

> On Jan 22, 2017, at 11:36 PM, StanZhai  wrote:
> 
> I'm using Parallel GC.
> rxin wrote
>> Are you using G1 GC? G1 sometimes uses a lot more memory than the size
>> allocated.
>> 
>> 
>> On Sun, Jan 22, 2017 at 12:58 AM StanZhai 
> 
>> mail@
> 
>>  wrote:
>> 
>>> Hi all,
>>> 
>>> 
>>> 
>>> We just upgraded our Spark from 1.6.2 to 2.1.0.
>>> 
>>> 
>>> 
>>> Our Spark application is started by spark-submit with config of
>>> 
>>> `--executor-memory 35G` in standalone model, but the actual use of memory
>>> up
>>> 
>>> to 65G after a full gc(jmap -histo:live $pid) as follow:
>>> 
>>> 
>>> 
>>> test@c6 ~ $ ps aux | grep CoarseGrainedExecutorBackend
>>> 
>>> test  181941  181 34.7 94665384 68836752 ?   Sl   09:25 711:21
>>> 
>>> /home/test/service/jdk/bin/java -cp
>>> 
>>> 
>>> /home/test/service/hadoop/share/hadoop/common/hadoop-lzo-0.4.20-SNAPSHOT.jar:/home/test/service/hadoop/share/hadoop/common/hadoop-lzo-0.4.20-SNAPSHOT.jar:/home/test/service/spark/conf/:/home/test/service/spark/jars/*:/home/test/service/hadoop/etc/hadoop/
>>> 
>>> -Xmx35840M -Dspark.driver.port=47781 -XX:+PrintGCDetails
>>> 
>>> -XX:+PrintGCDateStamps -Xloggc:./gc.log -verbose:gc
>>> 
>>> org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url
>>> 
>>> spark://
> 
>> CoarseGrainedScheduler@.xxx
> 
>> :47781 --executor-id 1
>>> 
>>> --hostname test-192 --cores 36 --app-id app-20170122092509-0017
>>> --worker-url
>>> 
>>> spark://Worker@test-192:33890
>>> 
>>> 
>>> 
>>> Our Spark jobs are all sql.
>>> 
>>> 
>>> 
>>> The exceed memory looks like off-heap memory, but the default value of
>>> 
>>> `spark.memory.offHeap.enabled` is `false`.
>>> 
>>> 
>>> 
>>> We didn't find the problem in Spark 1.6.x, what causes this in Spark
>>> 2.1.0?
>>> 
>>> 
>>> 
>>> Any help is greatly appreicated!
>>> 
>>> 
>>> 
>>> Best,
>>> 
>>> Stan
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> --
>>> 
>>> View this message in context:
>>> http://apache-spark-developers-list.1001551.n3.nabble.com/Executors-exceed-maximum-memory-defined-with-executor-memory-in-Spark-2-1-0-tp20697.html
>>> 
>>> Sent from the Apache Spark Developers List mailing list archive at
>>> Nabble.com .
>>> 
>>> 
>>> 
>>> -
>>> 
>>> To unsubscribe e-mail: 
> 
>> dev-unsubscribe@.apache
> 
>>> 
>>> 
>>> 
>>> 
> 
> 
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-developers-list.1001551.n3.nabble.com/Executors-exceed-maximum-memory-defined-with-executor-memory-in-Spark-2-1-0-tp20697p20707.html
>  
> 
> Sent from the Apache Spark Developers List mailing list archive at Nabble.com 
> .
> 
> -
> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org 
> 


Re: GraphX-related "open" issues

2017-01-20 Thread Michael Allman
Yes, SPARK-10335 is a bug that will be fixed when SPARK-5484 is fixed.

> On Jan 19, 2017, at 10:36 PM, Takeshi Yamamuro <linguin@gmail.com> wrote:
> 
> IMO SPARK-10335 should be tagged with "Bug"? If so, I think we should not 
> close it and fix in future.
> 
> // maropu
> 
> On Fri, Jan 20, 2017 at 1:27 PM, Michael Allman <mich...@videoamp.com 
> <mailto:mich...@videoamp.com>> wrote:
> That sounds fine to me. I think that in closing the issues, we should mention 
> that we're closing them because these algorithms can be implemented using the 
> existing API.
> 
> Michael
> 
> 
> 
>> On Jan 19, 2017, at 5:34 PM, Dongjin Lee <dong...@apache.org 
>> <mailto:dong...@apache.org>> wrote:
>> 
>> Thanks for your comments. Then, How about change following issues (see 
>> below) into 'won't fix'? After Implementing & uploading them as Spark 
>> Packages, commenting on those issues would be a reasonable solution. It 
>> would also be better for the potential users of those graph algorithms.
>> 
>> - SPARK-15880: PREGEL Based Semi-Clustering Algorithm Implementation using 
>> Spark GraphX API <https://issues.apache.org/jira/browse/SPARK-15880>
>> - SPARK-7244: Find vertex sequences satisfying predicates 
>> <https://issues.apache.org/jira/browse/SPARK-7244>
>> - SPARK-7257: Find nearest neighbor satisfying predicate 
>> <https://issues.apache.org/jira/browse/SPARK-7257>
>> - SPARK-8497: Graph Clique(Complete Connected Sub-graph) Discovery Algorithm 
>> <https://issues.apache.org/jira/browse/SPARK-8497>
>> 
>> Best,
>> Dongjin
>> 
>> On Fri, Jan 20, 2017 at 2:48 AM, Michael Allman <mich...@videoamp.com 
>> <mailto:mich...@videoamp.com>> wrote:
>> Regarding new GraphX algorithms, I am in agreement with the idea of 
>> publishing algorithms which are implemented using the existing API as 
>> outside packages.
>> 
>> Regarding SPARK-10335, we have a PR for SPARK-5484 which should address the 
>> problem described in that ticket. I've reviewed that PR, but because it 
>> touches the ML codebase I'd like to get an ML committer to review that PR. 
>> It's a relatively simple change and fixes a significant barrier to scaling 
>> in GraphX.
>> 
>> https://github.com/apache/spark/pull/15125 
>> <https://github.com/apache/spark/pull/15125>
>> 
>> Cheers,
>> 
>> Michael
>> 
>> 
>>> On Jan 19, 2017, at 8:09 AM, Takeshi Yamamuro <linguin@gmail.com 
>>> <mailto:linguin@gmail.com>> wrote:
>>> 
>>> Thanks for your comment, Dongjin!
>>> I have a pretty basic and also important question; why do you implement 
>>> these features as  a third-party library (and then upload them to the spark 
>>> packages https://spark-packages.org/ <https://spark-packages.org/>)? ISTM 
>>> graphx has already necessary and sufficient APIs for these third-party ones.
>>> 
>>> On Thu, Jan 19, 2017 at 12:21 PM, Dongjin Lee <dong...@apache.org 
>>> <mailto:dong...@apache.org>> wrote:
>>> Hi all,
>>> 
>>> I am currently working on SPARK-15880[^1] and also have some interest on 
>>> SPARK-7244[^2] and SPARK-7257[^3]. In fact, SPARK-7244 and SPARK-7257 have 
>>> some importance on graph analysis field.
>>> Could you make them an exception? Since I am working on graph analysis, I 
>>> hope to take them.
>>> 
>>> If needed, I can take SPARK-10335 and SPARK-8497 after them.
>>> 
>>> Thanks,
>>> Dongjin
>>> 
>>> On Wed, Jan 18, 2017 at 2:40 AM, Sean Owen <so...@cloudera.com 
>>> <mailto:so...@cloudera.com>> wrote:
>>> WontFix or Later is fine. There's not really any practical distinction. I 
>>> figure that if something times out and is closed, it's very unlikely to be 
>>> looked at again. Therefore marking it as something to do 'later' seemed 
>>> less accurate.
>>> 
>>> On Tue, Jan 17, 2017 at 5:30 PM Takeshi Yamamuro <linguin@gmail.com 
>>> <mailto:linguin@gmail.com>> wrote:
>>> Thank for your comment!
>>> I'm just thinking I'll set "Won't Fix" though, "Later" is also okay.
>>> But, I re-checked "Contributing to JIRA Maintenance" in the contribution 
>>> guide (http://spark.apache.org/contributing.html 
>>> <http://spark.apache.org/contributing.html>) and
>>> I couldn't find any setting policy about "Later".
>>> So

Re: GraphX-related "open" issues

2017-01-19 Thread Michael Allman
That sounds fine to me. I think that in closing the issues, we should mention 
that we're closing them because these algorithms can be implemented using the 
existing API.

Michael


> On Jan 19, 2017, at 5:34 PM, Dongjin Lee <dong...@apache.org> wrote:
> 
> Thanks for your comments. Then, How about change following issues (see below) 
> into 'won't fix'? After Implementing & uploading them as Spark Packages, 
> commenting on those issues would be a reasonable solution. It would also be 
> better for the potential users of those graph algorithms.
> 
> - SPARK-15880: PREGEL Based Semi-Clustering Algorithm Implementation using 
> Spark GraphX API <https://issues.apache.org/jira/browse/SPARK-15880>
> - SPARK-7244: Find vertex sequences satisfying predicates 
> <https://issues.apache.org/jira/browse/SPARK-7244>
> - SPARK-7257: Find nearest neighbor satisfying predicate 
> <https://issues.apache.org/jira/browse/SPARK-7257>
> - SPARK-8497: Graph Clique(Complete Connected Sub-graph) Discovery Algorithm 
> <https://issues.apache.org/jira/browse/SPARK-8497>
> 
> Best,
> Dongjin
> 
> On Fri, Jan 20, 2017 at 2:48 AM, Michael Allman <mich...@videoamp.com 
> <mailto:mich...@videoamp.com>> wrote:
> Regarding new GraphX algorithms, I am in agreement with the idea of 
> publishing algorithms which are implemented using the existing API as outside 
> packages.
> 
> Regarding SPARK-10335, we have a PR for SPARK-5484 which should address the 
> problem described in that ticket. I've reviewed that PR, but because it 
> touches the ML codebase I'd like to get an ML committer to review that PR. 
> It's a relatively simple change and fixes an significant barrier to scaling 
> in GraphX.
> 
> https://github.com/apache/spark/pull/15125 
> <https://github.com/apache/spark/pull/15125>
> 
> Cheers,
> 
> Michael
> 
> 
>> On Jan 19, 2017, at 8:09 AM, Takeshi Yamamuro <linguin@gmail.com 
>> <mailto:linguin@gmail.com>> wrote:
>> 
>> Thanks for your comment, Dongjin!
>> I have a pretty basic and also important question; why do you implement 
>> these features as  a third-party library (and then upload them to the spark 
>> packages https://spark-packages.org/ <https://spark-packages.org/>)? ISTM 
>> graphx has already necessary and sufficient APIs for these third-party ones.
>> 
>> On Thu, Jan 19, 2017 at 12:21 PM, Dongjin Lee <dong...@apache.org 
>> <mailto:dong...@apache.org>> wrote:
>> Hi all,
>> 
>> I am currently working on SPARK-15880[^1] and also have some interest on 
>> SPARK-7244[^2] and SPARK-7257[^3]. In fact, SPARK-7244 and SPARK-7257 have 
>> some importance on graph analysis field.
>> Could you make them an exception? Since I am working on graph analysis, I 
>> hope to take them.
>> 
>> If needed, I can take SPARK-10335 and SPARK-8497 after them.
>> 
>> Thanks,
>> Dongjin
>> 
>> On Wed, Jan 18, 2017 at 2:40 AM, Sean Owen <so...@cloudera.com 
>> <mailto:so...@cloudera.com>> wrote:
>> WontFix or Later is fine. There's not really any practical distinction. I 
>> figure that if something times out and is closed, it's very unlikely to be 
>> looked at again. Therefore marking it as something to do 'later' seemed less 
>> accurate.
>> 
>> On Tue, Jan 17, 2017 at 5:30 PM Takeshi Yamamuro <linguin@gmail.com 
>> <mailto:linguin@gmail.com>> wrote:
>> Thank for your comment!
>> I'm just thinking I'll set "Won't Fix" though, "Later" is also okay.
>> But, I re-checked "Contributing to JIRA Maintenance" in the contribution 
>> guide (http://spark.apache.org/contributing.html 
>> <http://spark.apache.org/contributing.html>) and
>> I couldn't find any setting policy about "Later".
>> So, IMO it's okay to set "Won't Fix" for now and those who'd like to make 
>> prs feel free to (re?-)open tickets.
>> 
>> 
>> On Wed, Jan 18, 2017 at 1:48 AM, Dongjoon Hyun <dongj...@apache.org 
>> <mailto:dongj...@apache.org>> wrote:
>> Hi, Takeshi.
>> 
>> > So, IMO it seems okay to close tickets about "Improvement" and "New 
>> > Feature" for now.
>> 
>> I'm just wondering about what kind of field value you want to fill in the 
>> `Resolution` field for those issues.
>> 
>> Maybe, 'Later'? Or, 'Won't Fix'?
>> 
>> Bests,
>> Dongjoon.
>> 
>> -
>> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org 
>&

Re: GraphX-related "open" issues

2017-01-19 Thread Michael Allman
Regarding new GraphX algorithms, I am in agreement with the idea of publishing 
algorithms which are implemented using the existing API as outside packages.

Regarding SPARK-10335, we have a PR for SPARK-5484 which should address the 
problem described in that ticket. I've reviewed that PR, but because it touches 
the ML codebase I'd like to get an ML committer to review that PR. It's a 
relatively simple change and fixes a significant barrier to scaling in GraphX.

https://github.com/apache/spark/pull/15125

Cheers,

Michael


> On Jan 19, 2017, at 8:09 AM, Takeshi Yamamuro  wrote:
> 
> Thanks for your comment, Dongjin!
> I have a pretty basic and also important question; why do you implement these 
> features as  a third-party library (and then upload them to the spark 
> packages https://spark-packages.org/ )? ISTM 
> graphx has already necessary and sufficient APIs for these third-party ones.
> 
> On Thu, Jan 19, 2017 at 12:21 PM, Dongjin Lee  > wrote:
> Hi all,
> 
> I am currently working on SPARK-15880[^1] and also have some interest on 
> SPARK-7244[^2] and SPARK-7257[^3]. In fact, SPARK-7244 and SPARK-7257 have 
> some importance on graph analysis field.
> Could you make them an exception? Since I am working on graph analysis, I 
> hope to take them.
> 
> If needed, I can take SPARK-10335 and SPARK-8497 after them.
> 
> Thanks,
> Dongjin
> 
> On Wed, Jan 18, 2017 at 2:40 AM, Sean Owen  > wrote:
> WontFix or Later is fine. There's not really any practical distinction. I 
> figure that if something times out and is closed, it's very unlikely to be 
> looked at again. Therefore marking it as something to do 'later' seemed less 
> accurate.
> 
> On Tue, Jan 17, 2017 at 5:30 PM Takeshi Yamamuro  > wrote:
> Thanks for your comment!
> I'm just thinking I'll set "Won't Fix" though, "Later" is also okay.
> But, I re-checked "Contributing to JIRA Maintenance" in the contribution 
> guide (http://spark.apache.org/contributing.html 
> ) and
> I couldn't find any setting policy about "Later".
> So, IMO it's okay to set "Won't Fix" for now and those who'd like to make prs 
> feel free to (re?-)open tickets.
> 
> 
> On Wed, Jan 18, 2017 at 1:48 AM, Dongjoon Hyun  > wrote:
> Hi, Takeshi.
> 
> > So, IMO it seems okay to close tickets about "Improvement" and "New 
> > Feature" for now.
> 
> I'm just wondering about what kind of field value you want to fill in the 
> `Resolution` field for those issues.
> 
> Maybe, 'Later'? Or, 'Won't Fix'?
> 
> Bests,
> Dongjoon.
> 
> -
> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org 
> 
> 
> 
> 
> 
> -- 
> ---
> Takeshi Yamamuro
> 
> 
> 
> -- 
> Dongjin Lee
> 
> Software developer in Line+.
> So interested in massive-scale machine learning.
> 
> facebook: www.facebook.com/dongjin.lee.kr 
> 
> linkedin: kr.linkedin.com/in/dongjinleekr 
> 
> github:  github.com/dongjinleekr 
> 
> twitter: www.twitter.com/dongjinleekr 
> 
> 
> -- 
> ---
> Takeshi Yamamuro



Re: [Spark SQL] Making InferSchema and JacksonParser public

2017-01-18 Thread Michael Allman
Personally I'd love to see some kind of pluggability, configurability in the 
JSON schema parsing, maybe as an option in the DataFrameReader. Perhaps you can 
propose an API?
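
For reference, the only public way to get at the inference today is indirectly, through the reader itself; a minimal sketch of that workaround (paths are placeholders):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("json-schema-example").getOrCreate()

// schema inference happens inside the reader; the inferred schema is only
// visible once a DataFrame has been built over a sample of the data
val inferred = spark.read.json("s3://bucket/logs/2017-01-01.json").schema

// reusing that schema avoids re-inference on later reads
val df = spark.read.schema(inferred).json("s3://bucket/logs/2017-01-02.json")

Anything finer-grained than that (custom sampling, per-log-type schemas, reusing JacksonParser directly) currently means reaching into the private API, which is exactly the hack described in the quoted message below.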

> On Jan 18, 2017, at 5:51 AM, Brian Hong  wrote:
> 
> I work for a mobile game company. I'm solving a simple question: "Can we 
> efficiently/cheaply query for the log of a particular user within a given date 
> period?"
> 
> I've created a special JSON text-based file format that has these traits:
>  - Snappy compressed, saved in AWS S3
>  - Partitioned by date. ie. 2017-01-01.sz , 
> 2017-01-02.sz , ...
>  - Sorted by a primary key (log_type) and a secondary key (user_id), Snappy 
> block compressed by 5MB blocks
>  - Blocks are indexed with primary/secondary key in file 2017-01-01.json
>  - Efficient block based random access on primary key (log_type) and 
> secondary key (user_id) using the index
> 
> I've created a Spark SQL DataFrame relation that can query this file format.  
> Since the schema of each log type is fairly consistent, I've reused the 
> `InferSchema.inferSchema` method and `JacksonParser` in the Spark SQL code to 
> support structured querying.  I've also implemented filter push-down to 
> optimize the file access.
> 
> It is very fast when querying for a single user or querying for a single log 
> type with a sampling ratio of 1 to 1 compared to parquet file format.  
> (We do use parquet for some log types when we need batch analysis.)
> 
> One of the problems we face is that the methods we use above are private API. 
>  So we are forced to use hacks to use these methods.  (Things like copying 
> the code or using the org.apache.spark.sql package namespace)
> 
> I've been following Spark SQL code since 1.4, and the JSON schema inferencing 
> code and JacksonParser seem to be relatively stable recently.  Can the 
> core-devs make these APIs public?
> 
> We are willing to open source this file format because it is very excellent 
> for archiving user related logs in S3.  The key dependency of private APIs in 
> Spark SQL is the main hurdle in making this a reality.
> 
> Thank you for reading!
> 



Re: Spark sql query plan contains all the partitions from hive table even though filtering of partitions is provided

2017-01-18 Thread Michael Allman
Based on what you've described, I think you should be able to use Spark's 
parquet reader plus partition pruning in 2.1.

> On Jan 17, 2017, at 10:44 PM, Raju Bairishetti <r...@apache.org> wrote:
> 
> Thanks for the detailed explanation. Is it completely fixed in spark-2.1.0?
> 
>   We are giving very high memory to the spark driver to avoid the OOM (heap space / 
> GC overhead limit) errors in the spark app. But when we run two or three jobs 
> together, these bring down the Hive metastore. We had to forcefully 
> drop older partitions to avoid frequent outages of the Hive Metastore.
> 
> 
> On Wed, Jan 18, 2017 at 2:09 PM, Michael Allman <mich...@videoamp.com 
> <mailto:mich...@videoamp.com>> wrote:
> I think I understand. Partition pruning for the case where 
> spark.sql.hive.convertMetastoreParquet is true was not added to Spark until 
> 2.1.0. I think that in previous versions it only worked when 
> spark.sql.hive.convertMetastoreParquet is false. Unfortunately, that 
> configuration gives you data decoding errors. If it's possible for you to 
> write all of your data with Hive, then you should be able to read it without 
> decoding errors and with partition pruning turned on. Another possibility is 
> running your Spark app with a very large maximum heap configuration, like 8g 
> or even 16g. However, loading all of that partition metadata can be quite 
> slow for very large tables. I'm sorry I can't think of a better solution for 
> you.
> 
> Michael
> 
> 
> 
> 
>> On Jan 17, 2017, at 8:59 PM, Raju Bairishetti <r...@apache.org 
>> <mailto:r...@apache.org>> wrote:
>> 
>> Tested on both 1.5.2 and 1.6.1.
>> 
>> On Wed, Jan 18, 2017 at 12:52 PM, Michael Allman <mich...@videoamp.com 
>> <mailto:mich...@videoamp.com>> wrote:
>> What version of Spark are you running?
>> 
>>> On Jan 17, 2017, at 8:42 PM, Raju Bairishetti <r...@apache.org 
>>> <mailto:r...@apache.org>> wrote:
>>> 
>>>  describe dummy;
>>> 
>>> OK
>>> 
>>> sample  string 
>>> 
>>> yearstring 
>>> 
>>> month   string  
>>> 
>>> # Partition Information  
>>> 
>>> # col_namedata_type   comment
>>> 
>>> yearstring 
>>> 
>>> 
>>> month   string 
>>> 
>>> 
>>> 
>>> val df = sqlContext.sql("select count(1) from rajub.dummy where 
>>> year='2017'")
>>> 
>>> df: org.apache.spark.sql.DataFrame = [_c0: bigint]
>>> 
>>> scala> df.explain
>>> 
>>> == Physical Plan ==
>>> 
>>> TungstenAggregate(key=[], 
>>> functions=[(count(1),mode=Final,isDistinct=false)], output=[_c0#3070L])
>>> 
>>> +- TungstenExchange SinglePartition, None
>>> 
>>>+- TungstenAggregate(key=[], 
>>> functions=[(count(1),mode=Partial,isDistinct=false)], output=[count#3076L])
>>> 
>>>   +- Scan ParquetRelation: rajub.dummy[] InputPaths: 
>>> maprfs:/user/rajub/dummy/sample/year=2016/month=10, 
>>> maprfs:/user/rajub/dummy/sample/year=2016/month=11, 
>>> maprfs:/user/rajub/dummy/sample/year=2016/month=9, 
>>> maprfs:/user/rajub/dummy/sample/year=2017/month=10, 
>>> maprfs:/user/rajub/dummy/sample/year=2017/month=11, 
>>> maprfs:/user/rajub/dummy/sample/year=2017/month=9
>>> 
>>> 
>>> On Wed, Jan 18, 2017 at 12:25 PM, Michael Allman <mich...@videoamp.com 
>>> <mailto:mich...@videoamp.com>> wrote:
>>> Can you paste the actual query plan here, please?
>>> 
>>>> On Jan 17, 2017, at 7:38 PM, Raju Bairishetti <r...@apache.org 
>>>> <mailto:r...@apache.org>> wrote:
>>>> 
>>>> 
>>>> On Wed, Jan 18, 2017 at 11:13 AM, Michael Allman <mich...@videoamp.com 
>>>> <mailto:mich...@videoamp.com>> wrote:
>>>> What is the physical query plan after you set 
>>>> spark.sql.hive.convertMetastoreParquet to true?
>>>> Physical plan contains all the partition locations 
>>>> 
>>>> Michael
>>>> 
>>>>> On Jan 17, 2017, at 6:51 PM, Raju Bairishetti <r...@apache.org 
>>>>> <mailto:r...@apache.org>> wrote:
>>>>> 
>>>>> Thanks Michael for the response.
>&

Re: Spark sql query plan contains all the partitions from hive table even though filtering of partitions is provided

2017-01-17 Thread Michael Allman
I think I understand. Partition pruning for the case where 
spark.sql.hive.convertMetastoreParquet is true was not added to Spark until 
2.1.0. I think that in previous versions it only worked when 
spark.sql.hive.convertMetastoreParquet is false. Unfortunately, that 
configuration gives you data decoding errors. If it's possible for you to write 
all of your data with Hive, then you should be able to read it without decoding 
errors and with partition pruning turned on. Another possibility is running 
your Spark app with a very large maximum heap configuration, like 8g or even 
16g. However, loading all of that partition metadata can be quite slow for very 
large tables. I'm sorry I can't think of a better solution for you.

Michael



> On Jan 17, 2017, at 8:59 PM, Raju Bairishetti <r...@apache.org> wrote:
> 
> Tested on both 1.5.2 and 1.6.1.
> 
> On Wed, Jan 18, 2017 at 12:52 PM, Michael Allman <mich...@videoamp.com 
> <mailto:mich...@videoamp.com>> wrote:
> What version of Spark are you running?
> 
>> On Jan 17, 2017, at 8:42 PM, Raju Bairishetti <r...@apache.org 
>> <mailto:r...@apache.org>> wrote:
>> 
>>  describe dummy;
>> 
>> OK
>> 
>> sample  string  
>> 
>> yearstring  
>> 
>> month   string   
>> 
>> # Partition Information   
>> 
>> # col_namedata_type   comment
>> 
>> yearstring  
>> 
>> 
>> month   string 
>> 
>> 
>> 
>> val df = sqlContext.sql("select count(1) from rajub.dummy where year='2017'")
>> 
>> df: org.apache.spark.sql.DataFrame = [_c0: bigint]
>> 
>> scala> df.explain
>> 
>> == Physical Plan ==
>> 
>> TungstenAggregate(key=[], 
>> functions=[(count(1),mode=Final,isDistinct=false)], output=[_c0#3070L])
>> 
>> +- TungstenExchange SinglePartition, None
>> 
>>+- TungstenAggregate(key=[], 
>> functions=[(count(1),mode=Partial,isDistinct=false)], output=[count#3076L])
>> 
>>   +- Scan ParquetRelation: rajub.dummy[] InputPaths: 
>> maprfs:/user/rajub/dummy/sample/year=2016/month=10, 
>> maprfs:/user/rajub/dummy/sample/year=2016/month=11, 
>> maprfs:/user/rajub/dummy/sample/year=2016/month=9, 
>> maprfs:/user/rajub/dummy/sample/year=2017/month=10, 
>> maprfs:/user/rajub/dummy/sample/year=2017/month=11, 
>> maprfs:/user/rajub/dummy/sample/year=2017/month=9
>> 
>> 
>> On Wed, Jan 18, 2017 at 12:25 PM, Michael Allman <mich...@videoamp.com 
>> <mailto:mich...@videoamp.com>> wrote:
>> Can you paste the actual query plan here, please?
>> 
>>> On Jan 17, 2017, at 7:38 PM, Raju Bairishetti <r...@apache.org 
>>> <mailto:r...@apache.org>> wrote:
>>> 
>>> 
>>> On Wed, Jan 18, 2017 at 11:13 AM, Michael Allman <mich...@videoamp.com 
>>> <mailto:mich...@videoamp.com>> wrote:
>>> What is the physical query plan after you set 
>>> spark.sql.hive.convertMetastoreParquet to true?
>>> Physical plan contains all the partition locations 
>>> 
>>> Michael
>>> 
>>>> On Jan 17, 2017, at 6:51 PM, Raju Bairishetti <r...@apache.org 
>>>> <mailto:r...@apache.org>> wrote:
>>>> 
>>>> Thanks Michael for the response.
>>>> 
>>>> 
>>>> On Wed, Jan 18, 2017 at 2:45 AM, Michael Allman <mich...@videoamp.com 
>>>> <mailto:mich...@videoamp.com>> wrote:
>>>> Hi Raju,
>>>> 
>>>> I'm sorry this isn't working for you. I helped author this functionality 
>>>> and will try my best to help.
>>>> 
>>>> First, I'm curious why you set spark.sql.hive.convertMetastoreParquet to 
>>>> false? 
>>>> I had set it as suggested in SPARK-6910 and corresponding pull reqs. It did 
>>>> not work for me without setting the spark.sql.hive.convertMetastoreParquet 
>>>> property. 
>>>> 
>>>> Can you link specifically to the jira issue or spark pr you referred to? 
>>>> The first thing I would try is setting 
>>>> spark.sql.hive.convertMetastoreParquet to true. Setting that to false 
>>>> might also explain why you're getting parquet decode errors. If you're 
>>>> writing your table data with Spark's parquet file writer and reading with 
>>>> Hive's parquet file reader, there ma

Re: Spark sql query plan contains all the partitions from hive table even though filtering of partitions is provided

2017-01-17 Thread Michael Allman
What version of Spark are you running?

> On Jan 17, 2017, at 8:42 PM, Raju Bairishetti <r...@apache.org> wrote:
> 
>  describe dummy;
> 
> OK
> 
> sample  string   
> 
> yearstring   
> 
> month   string
> 
> # Partition Information
> 
> # col_namedata_type   comment
> 
> yearstring   
> 
> 
> month   string 
> 
> 
> 
> val df = sqlContext.sql("select count(1) from rajub.dummy where year='2017'")
> 
> df: org.apache.spark.sql.DataFrame = [_c0: bigint]
> 
> scala> df.explain
> 
> == Physical Plan ==
> 
> TungstenAggregate(key=[], functions=[(count(1),mode=Final,isDistinct=false)], 
> output=[_c0#3070L])
> 
> +- TungstenExchange SinglePartition, None
> 
>+- TungstenAggregate(key=[], 
> functions=[(count(1),mode=Partial,isDistinct=false)], output=[count#3076L])
> 
>   +- Scan ParquetRelation: rajub.dummy[] InputPaths: 
> maprfs:/user/rajub/dummy/sample/year=2016/month=10, 
> maprfs:/user/rajub/dummy/sample/year=2016/month=11, 
> maprfs:/user/rajub/dummy/sample/year=2016/month=9, 
> maprfs:/user/rajub/dummy/sample/year=2017/month=10, 
> maprfs:/user/rajub/dummy/sample/year=2017/month=11, 
> maprfs:/user/rajub/dummy/sample/year=2017/month=9
> 
> 
> On Wed, Jan 18, 2017 at 12:25 PM, Michael Allman <mich...@videoamp.com 
> <mailto:mich...@videoamp.com>> wrote:
> Can you paste the actual query plan here, please?
> 
>> On Jan 17, 2017, at 7:38 PM, Raju Bairishetti <r...@apache.org 
>> <mailto:r...@apache.org>> wrote:
>> 
>> 
>> On Wed, Jan 18, 2017 at 11:13 AM, Michael Allman <mich...@videoamp.com 
>> <mailto:mich...@videoamp.com>> wrote:
>> What is the physical query plan after you set 
>> spark.sql.hive.convertMetastoreParquet to true?
>> Physical plan contains all the partition locations 
>> 
>> Michael
>> 
>>> On Jan 17, 2017, at 6:51 PM, Raju Bairishetti <r...@apache.org 
>>> <mailto:r...@apache.org>> wrote:
>>> 
>>> Thanks Michael for the response.
>>> 
>>> 
>>> On Wed, Jan 18, 2017 at 2:45 AM, Michael Allman <mich...@videoamp.com 
>>> <mailto:mich...@videoamp.com>> wrote:
>>> Hi Raju,
>>> 
>>> I'm sorry this isn't working for you. I helped author this functionality 
>>> and will try my best to help.
>>> 
>>> First, I'm curious why you set spark.sql.hive.convertMetastoreParquet to 
>>> false? 
>>> I had set it as suggested in SPARK-6910 and corresponding pull reqs. It did 
>>> not work for me without setting the spark.sql.hive.convertMetastoreParquet 
>>> property. 
>>> 
>>> Can you link specifically to the jira issue or spark pr you referred to? 
>>> The first thing I would try is setting 
>>> spark.sql.hive.convertMetastoreParquet to true. Setting that to false might 
>>> also explain why you're getting parquet decode errors. If you're writing 
>>> your table data with Spark's parquet file writer and reading with Hive's 
>>> parquet file reader, there may be an incompatibility accounting for the 
>>> decode errors you're seeing. 
>>> 
>>>  https://issues.apache.org/jira/browse/SPARK-6910 
>>> <https://issues.apache.org/jira/browse/SPARK-6910> . My main motivation is 
>>> to avoid fetching all the partitions. We reverted the 
>>> spark.sql.hive.convertMetastoreParquet setting to true due to decoding errors. 
>>> After reverting this, it is fetching all partitions from the table.
>>> 
>>> Can you reply with your table's Hive metastore schema, including partition 
>>> schema?
>>>  col1 string
>>>  col2 string
>>>  year int
>>>  month int
>>>  day int
>>>  hour int   
>>> # Partition Information  
>>> 
>>> # col_namedata_type   comment
>>> 
>>> year  int
>>> 
>>> month int
>>> 
>>> day int
>>> 
>>> hour int
>>> 
>>> venture string
>>> 
>>>  
>>> Where are the table's files located?
>>> In hadoop. Under some user directory. 
>>> If you do a "show partitions ." in the spark-sql shell, 
>>> does it show the partitions you expect to see? If not, run "ms

Re: Spark sql query plan contains all the partitions from hive table even though filtering of partitions is provided

2017-01-17 Thread Michael Allman
Can you paste the actual query plan here, please?

> On Jan 17, 2017, at 7:38 PM, Raju Bairishetti <r...@apache.org> wrote:
> 
> 
> On Wed, Jan 18, 2017 at 11:13 AM, Michael Allman <mich...@videoamp.com 
> <mailto:mich...@videoamp.com>> wrote:
> What is the physical query plan after you set 
> spark.sql.hive.convertMetastoreParquet to true?
> Physical plan contains all the partition locations 
> 
> Michael
> 
>> On Jan 17, 2017, at 6:51 PM, Raju Bairishetti <r...@apache.org 
>> <mailto:r...@apache.org>> wrote:
>> 
>> Thanks Michael for the response.
>> 
>> 
>> On Wed, Jan 18, 2017 at 2:45 AM, Michael Allman <mich...@videoamp.com 
>> <mailto:mich...@videoamp.com>> wrote:
>> Hi Raju,
>> 
>> I'm sorry this isn't working for you. I helped author this functionality and 
>> will try my best to help.
>> 
>> First, I'm curious why you set spark.sql.hive.convertMetastoreParquet to 
>> false? 
>> I had set it as suggested in SPARK-6910 and corresponding pull reqs. It did not 
>> work for me without setting the spark.sql.hive.convertMetastoreParquet 
>> property. 
>> 
>> Can you link specifically to the jira issue or spark pr you referred to? The 
>> first thing I would try is setting spark.sql.hive.convertMetastoreParquet to 
>> true. Setting that to false might also explain why you're getting parquet 
>> decode errors. If you're writing your table data with Spark's parquet file 
>> writer and reading with Hive's parquet file reader, there may be an 
>> incompatibility accounting for the decode errors you're seeing. 
>> 
>>  https://issues.apache.org/jira/browse/SPARK-6910 
>> <https://issues.apache.org/jira/browse/SPARK-6910> . My main motivation is 
>> to avoid fetching all the partitions. We reverted the 
>> spark.sql.hive.convertMetastoreParquet setting to true due to decoding errors. 
>> After reverting this, it is fetching all partitions from the table.
>> 
>> Can you reply with your table's Hive metastore schema, including partition 
>> schema?
>>  col1 string
>>  col2 string
>>  year int
>>  month int
>>  day int
>>  hour int   
>> # Partition Information   
>> 
>> # col_namedata_type   comment
>> 
>> year  int
>> 
>> month int
>> 
>> day int
>> 
>> hour int
>> 
>> venture string
>> 
>>  
>> Where are the table's files located?
>> In hadoop. Under some user directory. 
>> If you do a "show partitions ." in the spark-sql shell, 
>> does it show the partitions you expect to see? If not, run "msck repair 
>> table .".
>> Yes. It is listing the partitions
>> Cheers,
>> 
>> Michael
>> 
>> 
>>> On Jan 17, 2017, at 12:02 AM, Raju Bairishetti <r...@apache.org 
>>> <mailto:r...@apache.org>> wrote:
>>> 
>>> Had a high level look into the code. Seems getHiveQlPartitions  method from 
>>> HiveMetastoreCatalog is getting called irrespective of 
>>> metastorePartitionPruning conf value.
>>> 
>>>  It should not fetch all partitions if we set metastorePartitionPruning to 
>>> true (Default value for this is false) 
>>> def getHiveQlPartitions(predicates: Seq[Expression] = Nil): Seq[Partition] 
>>> = {
>>>   val rawPartitions = if (sqlContext.conf.metastorePartitionPruning) {
>>> table.getPartitions(predicates)
>>>   } else {
>>> allPartitions
>>>   }
>>> ...
>>> def getPartitions(predicates: Seq[Expression]): Seq[HivePartition] =
>>>   client.getPartitionsByFilter(this, predicates)
>>> lazy val allPartitions = table.getAllPartitions
>>> But somehow getAllPartitions is getting called even after setting 
>>> metastorePartitionPruning to true.
>>> Am I missing something or looking at wrong place?
>>> 
>>> On Tue, Jan 17, 2017 at 4:01 PM, Raju Bairishetti <r...@apache.org 
>>> <mailto:r...@apache.org>> wrote:
>>> Hello,
>>>   
>>> Spark SQL is generating a query plan with all partition information even 
>>> when we apply filters on partitions in the query. Due to this, the 
>>> spark driver/hive metastore is hitting OOM as each table has lots 
>>> of partitions.
>>> 
>>> We can confirm from hive audit logs that it tries to fetch all partitions 
>>> from hive metastore.
>>> 
>>>  2016-12-28 07:18:33,749 INFO  [pool-4-thre

Re: Spark sql query plan contains all the partitions from hive table even though filtering of partitions is provided

2017-01-17 Thread Michael Allman
What is the physical query plan after you set 
spark.sql.hive.convertMetastoreParquet to true?

Michael

> On Jan 17, 2017, at 6:51 PM, Raju Bairishetti <r...@apache.org> wrote:
> 
> Thanks Michael for the response.
> 
> 
> On Wed, Jan 18, 2017 at 2:45 AM, Michael Allman <mich...@videoamp.com 
> <mailto:mich...@videoamp.com>> wrote:
> Hi Raju,
> 
> I'm sorry this isn't working for you. I helped author this functionality and 
> will try my best to help.
> 
> First, I'm curious why you set spark.sql.hive.convertMetastoreParquet to 
> false? 
> I had set it as suggested in SPARK-6910 and corresponding pull reqs. It did not 
> work for me without setting the spark.sql.hive.convertMetastoreParquet property. 
> 
> Can you link specifically to the jira issue or spark pr you referred to? The 
> first thing I would try is setting spark.sql.hive.convertMetastoreParquet to 
> true. Setting that to false might also explain why you're getting parquet 
> decode errors. If you're writing your table data with Spark's parquet file 
> writer and reading with Hive's parquet file reader, there may be an 
> incompatibility accounting for the decode errors you're seeing. 
> 
>  https://issues.apache.org/jira/browse/SPARK-6910 
> <https://issues.apache.org/jira/browse/SPARK-6910> . My main motivation is to 
> avoid fetching all the partitions. We reverted the 
> spark.sql.hive.convertMetastoreParquet setting to true due to decoding errors. 
> After reverting this, it is fetching all partitions from the table.
> 
> Can you reply with your table's Hive metastore schema, including partition 
> schema?
>  col1 string
>  col2 string
>  year int
>  month int
>  day int
>  hour int   
> # Partition Information
> 
> # col_namedata_type   comment
> 
> year  int
> 
> month int
> 
> day int
> 
> hour int
> 
> venture string
> 
>  
> Where are the table's files located?
> In hadoop. Under some user directory. 
> If you do a "show partitions ." in the spark-sql shell, 
> does it show the partitions you expect to see? If not, run "msck repair table 
> .".
> Yes. It is listing the partitions
> Cheers,
> 
> Michael
> 
> 
>> On Jan 17, 2017, at 12:02 AM, Raju Bairishetti <r...@apache.org 
>> <mailto:r...@apache.org>> wrote:
>> 
>> Had a high level look into the code. Seems getHiveQlPartitions  method from 
>> HiveMetastoreCatalog is getting called irrespective of 
>> metastorePartitionPruning conf value.
>> 
>>  It should not fetch all partitions if we set metastorePartitionPruning to 
>> true (Default value for this is false) 
>> def getHiveQlPartitions(predicates: Seq[Expression] = Nil): Seq[Partition] = 
>> {
>>   val rawPartitions = if (sqlContext.conf.metastorePartitionPruning) {
>> table.getPartitions(predicates)
>>   } else {
>> allPartitions
>>   }
>> ...
>> def getPartitions(predicates: Seq[Expression]): Seq[HivePartition] =
>>   client.getPartitionsByFilter(this, predicates)
>> lazy val allPartitions = table.getAllPartitions
>> But somehow getAllPartitions is getting called even after setting 
>> metastorePartitionPruning to true.
>> Am I missing something or looking at wrong place?
>> 
>> On Tue, Jan 17, 2017 at 4:01 PM, Raju Bairishetti <r...@apache.org 
>> <mailto:r...@apache.org>> wrote:
>> Hello,
>>   
>> Spark SQL is generating a query plan with all partition information even 
>> when we apply filters on partitions in the query. Due to this, the 
>> spark driver/hive metastore is hitting OOM as each table has lots of 
>> partitions.
>> 
>> We can confirm from hive audit logs that it tries to fetch all partitions 
>> from hive metastore.
>> 
>>  2016-12-28 07:18:33,749 INFO  [pool-4-thread-184]: HiveMetaStore.audit 
>> (HiveMetaStore.java:logAuditEvent(371)) - ugi=rajubip=/x.x.x.x   
>> cmd=get_partitions : db= tbl=x
>> 
>> 
>> Configured the following parameters in the spark conf to fix the above 
>> issue(source: from spark-jira & github pullreq):
>> spark.sql.hive.convertMetastoreParquet   false
>> spark.sql.hive.metastorePartitionPruning   true
>> 
>>plan:  rdf.explain
>>== Physical Plan ==
>>HiveTableScan [rejection_reason#626], MetastoreRelation dbname, 
>> tablename, None,   [(year#314 = 2016),(month#315 = 12),(day#316 = 
>> 28),(hour#317 = 2),(venture#318 = DEFAULT)]
>> 
>> get_partitions_by_filter method is called and fetching only re

Re: Spark sql query plan contains all the partitions from hive table even though filtering of partitions is provided

2017-01-17 Thread Michael Allman
Hi Raju,

I'm sorry this isn't working for you. I helped author this functionality and 
will try my best to help.

First, I'm curious why you set spark.sql.hive.convertMetastoreParquet to false? 
Can you link specifically to the jira issue or spark pr you referred to? The 
first thing I would try is setting spark.sql.hive.convertMetastoreParquet to 
true. Setting that to false might also explain why you're getting parquet 
decode errors. If you're writing your table data with Spark's parquet file 
writer and reading with Hive's parquet file reader, there may be an 
incompatibility accounting for the decode errors you're seeing. 

Can you reply with your table's Hive metastore schema, including partition 
schema? Where are the table's files located? If you do a "show partitions 
<database>.<table>" in the spark-sql shell, does it show the partitions you 
expect to see? If not, run "msck repair table <database>.<table>".
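
For example, these checks look roughly like the following from a spark-shell with a HiveContext-backed sqlContext (database/table names and the partition value are placeholders):

// use Spark's own Parquet reader path, as suggested above
sqlContext.setConf("spark.sql.hive.convertMetastoreParquet", "true")

// does the metastore know about the partitions you expect?
sqlContext.sql("show partitions mydb.mytable").show(100, false)

// register partitions that exist on disk but not in the metastore
// (run this from the Hive CLI instead if the Spark parser rejects it)
sqlContext.sql("msck repair table mydb.mytable")

// then re-check the physical plan for the partition filter
sqlContext.sql("select count(1) from mydb.mytable where year = '2017'").explain()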

Cheers,

Michael


> On Jan 17, 2017, at 12:02 AM, Raju Bairishetti  wrote:
> 
> Had a high level look into the code. Seems getHiveQlPartitions  method from 
> HiveMetastoreCatalog is getting called irrespective of 
> metastorePartitionPruning conf value.
> 
>  It should not fetch all partitions if we set metastorePartitionPruning to 
> true (Default value for this is false) 
> def getHiveQlPartitions(predicates: Seq[Expression] = Nil): Seq[Partition] = {
>   val rawPartitions = if (sqlContext.conf.metastorePartitionPruning) {
> table.getPartitions(predicates)
>   } else {
> allPartitions
>   }
> ...
> def getPartitions(predicates: Seq[Expression]): Seq[HivePartition] =
>   client.getPartitionsByFilter(this, predicates)
> lazy val allPartitions = table.getAllPartitions
> But somehow getAllPartitions is getting called even after setting 
> metastorePartitionPruning to true.
> Am I missing something or looking at wrong place?
> 
> On Tue, Jan 17, 2017 at 4:01 PM, Raju Bairishetti  > wrote:
> Hello,
>   
> Spark SQL is generating a query plan with all partition information even 
> when we apply filters on partitions in the query. Due to this, the 
> spark driver/hive metastore is hitting OOM as each table has lots of 
> partitions.
> 
> We can confirm from hive audit logs that it tries to fetch all partitions 
> from hive metastore.
> 
>  2016-12-28 07:18:33,749 INFO  [pool-4-thread-184]: HiveMetaStore.audit 
> (HiveMetaStore.java:logAuditEvent(371)) - ugi=rajubip=/x.x.x.x   
> cmd=get_partitions : db= tbl=x
> 
> 
> Configured the following parameters in the spark conf to fix the above 
> issue(source: from spark-jira & github pullreq):
> spark.sql.hive.convertMetastoreParquet   false
> spark.sql.hive.metastorePartitionPruning   true
> 
>plan:  rdf.explain
>== Physical Plan ==
>HiveTableScan [rejection_reason#626], MetastoreRelation dbname, 
> tablename, None,   [(year#314 = 2016),(month#315 = 12),(day#316 = 
> 28),(hour#317 = 2),(venture#318 = DEFAULT)]
> 
> get_partitions_by_filter method is called and fetching only required 
> partitions.
> 
> But we are seeing parquetDecode errors in our applications frequently 
>> after this. Looks like these decoding errors were because of changing the serde 
>> from spark-builtin to the hive serde.
> 
> I feel like, fixing query plan generation in the spark-sql is the right 
> approach instead of forcing users to use hive serde.
> 
> Is there any workaround/way to fix this issue? I would like to hear more 
> thoughts on this :)
> 
> 
> On Tue, Jan 17, 2017 at 4:00 PM, Raju Bairishetti  > wrote:
> Had a high level look into the code. Seems getHiveQlPartitions  method from 
> HiveMetastoreCatalog is getting called irrespective of 
> metastorePartitionPruning conf value.
> 
>  It should not fetch all partitions if we set metastorePartitionPruning to 
> true (Default value for this is false) 
> def getHiveQlPartitions(predicates: Seq[Expression] = Nil): Seq[Partition] = {
>   val rawPartitions = if (sqlContext.conf.metastorePartitionPruning) {
> table.getPartitions(predicates)
>   } else {
> allPartitions
>   }
> ...
> def getPartitions(predicates: Seq[Expression]): Seq[HivePartition] =
>   client.getPartitionsByFilter(this, predicates)
> lazy val allPartitions = table.getAllPartitions
> But somehow getAllPartitions is getting called even after setting 
> metastorePartitionPruning to true.
> Am I missing something or looking at wrong place?
> 
> On Mon, Jan 16, 2017 at 12:53 PM, Raju Bairishetti  > wrote:
> Waiting for suggestions/help on this... 
> 
> On Wed, Jan 11, 2017 at 12:14 PM, Raju Bairishetti  > wrote:
> Hello,
>   
> Spark SQL is generating a query plan with all partition information even 
> when we apply filters on partitions in the query. Due to this, the spark 
> driver/hive metastore is hitting OOM as 

Re: [VOTE] Apache Spark 2.1.0 (RC2)

2016-12-08 Thread Michael Allman
I believe https://github.com/apache/spark/pull/16122 
needs to be included in Spark 2.1. 
It's a simple bug fix to some functionality that was introduced in 2.1. 
Unfortunately, it has only been verified manually. There's no unit test that 
covers it, and building one is far from trivial.

Michael



> On Dec 8, 2016, at 12:39 AM, Reynold Xin  wrote:
> 
> Please vote on releasing the following candidate as Apache Spark version 
> 2.1.0. The vote is open until Sun, December 11, 2016 at 1:00 PT and passes if 
> a majority of at least 3 +1 PMC votes are cast.
> 
> [ ] +1 Release this package as Apache Spark 2.1.0
> [ ] -1 Do not release this package because ...
> 
> 
> To learn more about Apache Spark, please see http://spark.apache.org/ 
> 
> 
> The tag to be voted on is v2.1.0-rc2 
> (080717497365b83bc202ab16812ced93eb1ea7bd)
> 
> List of JIRA tickets resolved are:  
> https://issues.apache.org/jira/issues/?jql=project%20%3D%20SPARK%20AND%20fixVersion%20%3D%202.1.0
>  
> 
> 
> The release files, including signatures, digests, etc. can be found at:
> http://people.apache.org/~pwendell/spark-releases/spark-2.1.0-rc2-bin/ 
> 
> 
> Release artifacts are signed with the following key:
> https://people.apache.org/keys/committer/pwendell.asc 
> 
> 
> The staging repository for this release can be found at:
> https://repository.apache.org/content/repositories/orgapachespark-1217 
> 
> 
> The documentation corresponding to this release can be found at:
> http://people.apache.org/~pwendell/spark-releases/spark-2.1.0-rc2-docs/ 
> 
> 
> 
> (Note that the docs and staging repo are still being uploaded and will be 
> available soon)
> 
> 
> ===
> How can I help test this release?
> ===
> If you are a Spark user, you can help us test this release by taking an 
> existing Spark workload and running on this release candidate, then reporting 
> any regressions.
> 
> ===
> What should happen to JIRA tickets still targeting 2.1.0?
> ===
> Committers should look at those and triage. Extremely important bug fixes, 
> documentation, and API tweaks that impact compatibility should be worked on 
> immediately. Everything else please retarget to 2.1.1 or 2.2.0.



Re: Can't read tables written in Spark 2.1 in Spark 2.0 (and earlier)

2016-11-29 Thread Michael Allman
This is not an issue with all tables created in Spark 2.1, though I'm not sure 
why some work and some do not. I have found that a table created as follows

sql("create table test stored as parquet as select 1")

in Spark 2.1 cannot be read in previous versions of Spark.

Michael


> On Nov 29, 2016, at 5:15 PM, Michael Allman <mich...@videoamp.com> wrote:
> 
> Hello,
> 
> When I try to read from a Hive table created by Spark 2.1 in Spark 2.0 or 
> earlier, I get an error:
> 
> java.lang.ClassNotFoundException: Failed to load class for data source: hive.
> 
> Is there a way to get previous versions of Spark to read tables written with 
> Spark 2.1?
> 
> Cheers,
> 
> Michael


-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org



Can't read tables written in Spark 2.1 in Spark 2.0 (and earlier)

2016-11-29 Thread Michael Allman
Hello,

When I try to read from a Hive table created by Spark 2.1 in Spark 2.0 or 
earlier, I get an error:

java.lang.ClassNotFoundException: Failed to load class for data source: hive.

Is there a way to get previous versions of Spark to read tables written with 
Spark 2.1?

Cheers,

Michael
-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org



Jackson Spark/app incompatibility and how to resolve it

2016-11-17 Thread Michael Allman
Hello,

I'm running into an issue with a Spark app I'm building: it depends on a 
library that requires Jackson 2.8 and fails at runtime because Spark 
brings in Jackson 2.6. I'm looking for a solution. As a workaround, I've 
patched our build of Spark to use Jackson 2.8. That's working; however, given 
all the trouble associated with attempting a Jackson upgrade in the past (see 
https://issues.apache.org/jira/browse/SPARK-14989 

 and https://github.com/apache/spark/pull/13417 
), I'm wondering if I should submit 
a PR for that. Is shading Spark's Jackson deps another option? Any other 
suggestions for an acceptable way to fix this incompatibility with apps using a 
newer version of Jackson?
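
On the shading idea: one app-side option (rather than shading inside Spark) is to relocate Jackson within the application's assembly jar. A rough sketch with sbt-assembly, assuming the app is built with sbt; the rename target is arbitrary:

// build.sbt -- requires the sbt-assembly plugin.
// Relocate the app's Jackson classes so they cannot collide with the
// Jackson 2.6 that Spark puts on the classpath.
assemblyShadeRules in assembly := Seq(
  ShadeRule.rename("com.fasterxml.jackson.**" -> "shadedjackson.@1").inAll
)

Shading Jackson has its own pitfalls (jackson-module-scala in particular can be touchy), and the experimental spark.driver.userClassPathFirst / spark.executor.userClassPathFirst settings are another, blunter knob, so treat the above only as a starting point.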

FWIW, Jackson claims to support backward compatibility within minor releases 
(https://github.com/FasterXML/jackson-docs#on-jackson-versioning 
). So in 
theory, apps that depend on an upgraded Spark version should work even if they 
ask for an older version.

Cheers,

Michael

Re: Updating Parquet dep to 1.9

2016-11-02 Thread Michael Allman
Sounds great. Regarding the min/max stats issue, is that an issue with the way 
the files are written or read? What's the Parquet project issue for that bug? 
What does the 1.9.1 release timeline look like?

I will aim to have a PR in by the end of the week. I feel strongly that either 
this or https://github.com/apache/spark/pull/15538 
<https://github.com/apache/spark/pull/15538> needs to make it into 2.1. The 
logging output issue is really bad. I would probably call it a blocker.

Michael


> On Nov 1, 2016, at 1:22 PM, Ryan Blue <rb...@netflix.com> wrote:
> 
> I can when I'm finished with a couple other issues if no one gets to it first.
> 
> Michael, if you're interested in updating to 1.9.0 I'm happy to help review 
> that PR.
> 
> On Tue, Nov 1, 2016 at 1:03 PM, Reynold Xin <r...@databricks.com 
> <mailto:r...@databricks.com>> wrote:
> Ryan want to submit a pull request?
> 
> 
> On Tue, Nov 1, 2016 at 9:05 AM, Ryan Blue <rb...@netflix.com.invalid 
> <mailto:rb...@netflix.com.invalid>> wrote:
> 1.9.0 includes some fixes intended specifically for Spark:
> 
> * PARQUET-389: Evaluates push-down predicates for missing columns as though 
> they are null. This is to address Spark's work-around that requires reading 
> and merging file schemas, even for metastore tables.
> * PARQUET-654: Adds an option to disable record-level predicate push-down, 
> but keep row group evaluation. This allows Spark to skip row groups based on 
> stats and dictionaries, but implement its own vectorized record filtering.
> 
> The Parquet community also evaluated performance to ensure no performance 
> regressions from moving to the ByteBuffer read path.
> 
> There is one concern about 1.9.0 that will be addressed in 1.9.1, which is 
> that stats calculations were incorrectly using unsigned byte order for string 
> comparison. This means that min/max stats can't be used if the data contains 
> (or may contain) UTF8 characters with the msb set. 1.9.0 won't return the bad 
> min/max values for correctness, but there is a property to override this 
> behavior for data that doesn't use the affected code points.
> 
> Upgrading to 1.9.0 depends on how the community wants to handle the sort 
> order bug: whether correctness or performance should be the default.
> 
> rb
> 
> On Tue, Nov 1, 2016 at 2:22 AM, Sean Owen <so...@cloudera.com 
> <mailto:so...@cloudera.com>> wrote:
> Yes this came up from a different direction: 
> https://issues.apache.org/jira/browse/SPARK-18140 
> <https://issues.apache.org/jira/browse/SPARK-18140>
> 
> I think it's fine to pursue an upgrade to fix these several issues. The 
> question is just how well it will play with other components, so bears some 
> testing and evaluation of the changes from 1.8, but yes this would be good.
> 
> On Mon, Oct 31, 2016 at 9:07 PM Michael Allman <mich...@videoamp.com 
> <mailto:mich...@videoamp.com>> wrote:
> Hi All,
> 
> Is anyone working on updating Spark's Parquet library dep to 1.9? If not, I 
> can at least get started on it and publish a PR.
> 
> Cheers,
> 
> Michael
> -
> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org 
> <mailto:dev-unsubscr...@spark.apache.org>
> 
> 
> 
> 
> -- 
> Ryan Blue
> Software Engineer
> Netflix
> 
> 
> 
> 
> -- 
> Ryan Blue
> Software Engineer
> Netflix



Updating Parquet dep to 1.9

2016-10-31 Thread Michael Allman
Hi All,

Is anyone working on updating Spark's Parquet library dep to 1.9? If not, I can 
at least get started on it and publish a PR.

Cheers,

Michael
-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org



Re: help from other committers on getting started

2016-09-02 Thread Michael Allman
Hi Dayne,

Have a look at 
https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark 
. I 
think you'll find answers to most of your questions there.

Cheers,

Michael


> On Sep 2, 2016, at 8:53 AM, Dayne Sorvisto  
> wrote:
> 
> Hi,
> 
> I'd like to request help from committers/contributors to work on some trivial 
> bug fixes or documentation for the Spark project. I'm very interested in the 
> machine learning side of things as I have a math background. I recently 
> passed the databricks cert and feel I have a decent understanding of the key 
> concepts I need to get started as a beginner contributor. My github is 
> DayneSorvisto (Dayne )  and I've signed up 
> for a Jira account.
> 
> 
> Thank you,
> Dayne Sorvisto



Re: Anyone else having trouble with replicated off heap RDD persistence?

2016-08-24 Thread Michael Allman
FYI, I've updated the issue's description to include a very simple program 
which reproduces the issue for me.
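
The shape of that reproduction, run from a spark-shell, is roughly the following (a sketch only; the exact program is in the SPARK-17204 description, and off-heap storage must be enabled in the Spark conf):

import org.apache.spark.storage.StorageLevel

// assumes spark.memory.offHeap.enabled=true and a nonzero
// spark.memory.offHeap.size in the application's configuration
val level = StorageLevel(
  useDisk = true, useMemory = true, useOffHeap = true,
  deserialized = false, replication = 2)

val rdd = sc.parallelize(1 to 1000000).map(i => (i, i.toString))
rdd.persist(level)
rdd.count()   // with replication = 2, jobs of this shape hit the Kryo errors quoted below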

Cheers,

Michael

> On Aug 23, 2016, at 4:54 PM, Michael Allman <mich...@videoamp.com> wrote:
> 
> I've replied on the issue's page, but in a word, "yes". See 
> https://issues.apache.org/jira/browse/SPARK-17204 
> <https://issues.apache.org/jira/browse/SPARK-17204>.
> 
> Michael
> 
> 
>> On Aug 23, 2016, at 11:55 AM, Reynold Xin <r...@databricks.com 
>> <mailto:r...@databricks.com>> wrote:
>> 
>> Does this problem still exist on today's master/branch-2.0? 
>> 
>> SPARK-16550 was merged. It might be fixed already.
>> 
>> On Tue, Aug 23, 2016 at 9:37 AM, Michael Allman <mich...@videoamp.com 
>> <mailto:mich...@videoamp.com>> wrote:
>> FYI, I posted this to user@ and have followed up with a bug report: 
>> https://issues.apache.org/jira/browse/SPARK-17204 
>> <https://issues.apache.org/jira/browse/SPARK-17204>
>> 
>> Michael
>> 
>>> Begin forwarded message:
>>> 
>>> From: Michael Allman <mich...@videoamp.com <mailto:mich...@videoamp.com>>
>>> Subject: Anyone else having trouble with replicated off heap RDD 
>>> persistence?
>>> Date: August 16, 2016 at 3:45:14 PM PDT
>>> To: user <u...@spark.apache.org <mailto:u...@spark.apache.org>>
>>> 
>>> Hello,
>>> 
>>> A coworker was having a problem with a big Spark job failing after several 
>>> hours when one of the executors would segfault. That problem aside, I 
>>> speculated that her job would be more robust against these kinds of 
>>> executor crashes if she used replicated RDD storage. She's using off heap 
>>> storage (for good reason), so I asked her to try running her job with the 
>>> following storage level: `StorageLevel(useDisk = true, useMemory = true, 
>>> useOffHeap = true, deserialized = false, replication = 2)`. The job would 
>>> immediately fail with a rather suspicious looking exception. For example:
>>> 
>>> com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 
>>> 9086
>>> at 
>>> com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:137)
>>> at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:670)
>>> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:781)
>>> at 
>>> org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:229)
>>> at 
>>> org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:169)
>>> at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
>>> at 
>>> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
>>> at 
>>> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>>> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:461)
>>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>>> at 
>>> org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificColumnarIterator.hasNext(Unknown
>>>  Source)
>>> at 
>>> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithoutKey$(Unknown
>>>  Source)
>>> at 
>>> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
>>>  Source)
>>> at 
>>> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>>> at 
>>> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
>>> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>>> at 
>>> org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
>>> at 
>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
>>> at 
>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
>>> at org.apache.spark.scheduler.Task.run(Task.scala:85)
>>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>>> at 
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>> at 
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>> at 

Re: Anyone else having trouble with replicated off heap RDD persistence?

2016-08-23 Thread Michael Allman
I've replied on the issue's page, but in a word, "yes". See 
https://issues.apache.org/jira/browse/SPARK-17204 
<https://issues.apache.org/jira/browse/SPARK-17204>.

Michael


> On Aug 23, 2016, at 11:55 AM, Reynold Xin <r...@databricks.com> wrote:
> 
> Does this problem still exist on today's master/branch-2.0? 
> 
> SPARK-16550 was merged. It might be fixed already.
> 
> On Tue, Aug 23, 2016 at 9:37 AM, Michael Allman <mich...@videoamp.com 
> <mailto:mich...@videoamp.com>> wrote:
> FYI, I posted this to user@ and have followed up with a bug report: 
> https://issues.apache.org/jira/browse/SPARK-17204 
> <https://issues.apache.org/jira/browse/SPARK-17204>
> 
> Michael
> 
>> Begin forwarded message:
>> 
>> From: Michael Allman <mich...@videoamp.com <mailto:mich...@videoamp.com>>
>> Subject: Anyone else having trouble with replicated off heap RDD persistence?
>> Date: August 16, 2016 at 3:45:14 PM PDT
>> To: user <u...@spark.apache.org <mailto:u...@spark.apache.org>>
>> 
>> Hello,
>> 
>> A coworker was having a problem with a big Spark job failing after several 
>> hours when one of the executors would segfault. That problem aside, I 
>> speculated that her job would be more robust against these kinds of executor 
>> crashes if she used replicated RDD storage. She's using off heap storage 
>> (for good reason), so I asked her to try running her job with the following 
>> storage level: `StorageLevel(useDisk = true, useMemory = true, useOffHeap = 
>> true, deserialized = false, replication = 2)`. The job would immediately 
>> fail with a rather suspicious looking exception. For example:
>> 
>> com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 
>> 9086
>>  at 
>> com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:137)
>>  at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:670)
>>  at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:781)
>>  at 
>> org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:229)
>>  at 
>> org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:169)
>>  at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
>>  at 
>> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
>>  at 
>> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>>  at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:461)
>>  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>>  at 
>> org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificColumnarIterator.hasNext(Unknown
>>  Source)
>>  at 
>> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithoutKey$(Unknown
>>  Source)
>>  at 
>> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
>>  Source)
>>  at 
>> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>>  at 
>> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
>>  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>>  at 
>> org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
>>  at 
>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
>>  at 
>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
>>  at org.apache.spark.scheduler.Task.run(Task.scala:85)
>>  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>>  at 
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>  at 
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>  at java.lang.Thread.run(Thread.java:745)
>> 
>> or
>> 
>> java.lang.IndexOutOfBoundsException: Index: 6, Size: 0
>>  at java.util.ArrayList.rangeCheck(ArrayList.java:653)
>>  at java.util.ArrayList.get(ArrayList.java:429)
>>  at 
>> com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:60)
>>  at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:834)
>>  at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:788)
>>  at 
>> org.apache.spark.serializer

Re: Serving Spark ML models via a regular Python web app

2016-08-11 Thread Michael Allman
Hi Chris,

I was just checking out your project. I mentioned we use MLeap to serve 
predictions from a trained Spark ML RandomForest model. How would I do that 
with pipeline.io <http://pipeline.io/>? It isn't clear to me.

Thanks!

Michael

> On Aug 11, 2016, at 9:42 AM, Chris Fregly <ch...@fregly.com> wrote:
> 
> And here's a recent slide deck on the pipeline.io <http://pipeline.io/> that 
> summarizes what we're working on (all open source):  
> 
> https://www.slideshare.net/mobile/cfregly/advanced-spark-and-tensorflow-meetup-08042016-one-click-spark-ml-pipeline-deploy-to-production
>  
> <https://www.slideshare.net/mobile/cfregly/advanced-spark-and-tensorflow-meetup-08042016-one-click-spark-ml-pipeline-deploy-to-production>
> 
> mleap is heading the wrong direction and reinventing the wheel.  not quite 
> sure where that project will go.  doesn't seem like it will have a long 
> shelf-life in my opinion.
> 
> check out pipeline.io <http://pipeline.io/>.  some cool stuff in there.
> 
> On Aug 11, 2016, at 9:35 AM, Chris Fregly <ch...@fregly.com 
> <mailto:ch...@fregly.com>> wrote:
> 
>> this is exactly what my http://pipeline.io <http://pipeline.io/> project is 
>> addressing.  check it out and send me feedback or create issues at that 
>> github location.
>> 
>> On Aug 11, 2016, at 7:42 AM, Nicholas Chammas <nicholas.cham...@gmail.com 
>> <mailto:nicholas.cham...@gmail.com>> wrote:
>> 
>>> Thanks Michael for the reference, and thanks Nick for the comprehensive 
>>> overview of existing JIRA discussions about this. I've added myself as a 
>>> watcher on the various tasks.
>>> 
>>> On Thu, Aug 11, 2016 at 3:02 AM Nick Pentreath <nick.pentre...@gmail.com 
>>> <mailto:nick.pentre...@gmail.com>> wrote:
>>> Currently there is no direct way in Spark to serve models without bringing 
>>> in all of Spark as a dependency.
>>> 
>>> For Spark ML, there is actually no way to do it independently of DataFrames 
>>> either (which for single-instance prediction makes things sub-optimal). 
>>> That is covered here: https://issues.apache.org/jira/browse/SPARK-10413 
>>> <https://issues.apache.org/jira/browse/SPARK-10413>
>>> 
>>> So, your options are (in Scala) things like MLeap, PredictionIO, or "roll 
>>> your own". Or you can try to export to some other format such as PMML or 
>>> PFA. Some MLlib models support PMML export, but for ML it is still missing 
>>> (see https://issues.apache.org/jira/browse/SPARK-11171 
>>> <https://issues.apache.org/jira/browse/SPARK-11171>).
>>> 
>>> There is an external project for PMML too (note licensing) - 
>>> https://github.com/jpmml/jpmml-sparkml 
>>> <https://github.com/jpmml/jpmml-sparkml> - which is by now actually quite 
>>> comprehensive. It shows that PMML can represent a pretty large subset of 
>>> typical ML pipeline functionality.
>>> 
>>> On the Python side sadly there is even less - I would say your options are 
>>> pretty much "roll your own" currently, or export in PMML or PFA.
>>> 
>>> Finally, part of the "mllib-local" idea was around enabling this local 
>>> model-serving (for some initial discussion about the future see 
>>> https://issues.apache.org/jira/browse/SPARK-16365 
>>> <https://issues.apache.org/jira/browse/SPARK-16365>).
>>> 
>>> N
>>> 
>>> 
>>> On Thu, 11 Aug 2016 at 06:28 Michael Allman <mich...@videoamp.com 
>>> <mailto:mich...@videoamp.com>> wrote:
>>> Nick,
>>> 
>>> Check out MLeap: https://github.com/TrueCar/mleap 
>>> <https://github.com/TrueCar/mleap>. It's not python, but we use it in 
>>> production to serve a random forest model trained by a Spark ML pipeline.
>>> 
>>> Thanks,
>>> 
>>> Michael
>>> 
>>>> On Aug 10, 2016, at 7:50 PM, Nicholas Chammas <nicholas.cham...@gmail.com 
>>>> <mailto:nicholas.cham...@gmail.com>> wrote:
>>>> 
>>>> Are there any existing JIRAs covering the possibility of serving up Spark 
>>>> ML models via, for example, a regular Python web app?
>>>> 
>>>> The story goes like this: You train your model with Spark on several TB of 
>>>> data, and now you want to use it in a prediction service that you’re 
>>>> building, say with Flask <http://flask.pocoo.org/>. In principle, you 
>>>> don’t need Spark anymore since you’re just passing individual data points 
>>>> to your model and looking for it to spit some prediction back.
>>>> 
>>>> I assume this is something people do today, right? I presume Spark needs 
>>>> to run in their web service to serve up the model. (Sorry, I’m new to the 
>>>> ML side of Spark. )
>>>> 
>>>> Are there any JIRAs discussing potential improvements to this story? I did 
>>>> a search, but I’m not sure what exactly to look for. SPARK-4587 
>>>> <https://issues.apache.org/jira/browse/SPARK-4587> (model import/export) 
>>>> looks relevant, but doesn’t address the story directly.
>>>> 
>>>> Nick
>>>> 
>>> 



Re: Serving Spark ML models via a regular Python web app

2016-08-10 Thread Michael Allman
Nick,

Check out MLeap: https://github.com/TrueCar/mleap 
. It's not python, but we use it in 
production to serve a random forest model trained by a Spark ML pipeline.
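
For contrast, the baseline that keeps Spark in the serving path loads the fitted pipeline and scores single records through one-row DataFrames; a minimal sketch, where the model path and feature columns are placeholders:

import org.apache.spark.ml.PipelineModel
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("model-serving").getOrCreate()

// placeholder path to a previously saved fitted pipeline
val model = PipelineModel.load("hdfs:///models/rf-pipeline")

// score one record by wrapping it in a one-row DataFrame
val input = spark.createDataFrame(Seq((0.5, 1.2, 3.4))).toDF("f1", "f2", "f3")
// "prediction" is the default output column of the final stage
val prediction = model.transform(input).select("prediction").head().getDouble(0)

That works, but it drags a SparkSession (and its latency) into the web service, which is exactly what the question quoted below is trying to avoid; hence the interest in MLeap.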

Thanks,

Michael

> On Aug 10, 2016, at 7:50 PM, Nicholas Chammas  
> wrote:
> 
> Are there any existing JIRAs covering the possibility of serving up Spark ML 
> models via, for example, a regular Python web app?
> 
> The story goes like this: You train your model with Spark on several TB of 
> data, and now you want to use it in a prediction service that you’re 
> building, say with Flask . In principle, you don’t 
> need Spark anymore since you’re just passing individual data points to your 
> model and looking for it to spit some prediction back.
> 
> I assume this is something people do today, right? I presume Spark needs to 
> run in their web service to serve up the model. (Sorry, I’m new to the ML 
> side of Spark. )
> 
> Are there any JIRAs discussing potential improvements to this story? I did a 
> search, but I’m not sure what exactly to look for. SPARK-4587 
>  (model import/export) 
> looks relevant, but doesn’t address the story directly.
> 
> Nick
> 



Re: Scaling partitioned Hive table support

2016-08-09 Thread Michael Allman
Hi Eric,

I've rebased my first patch to master and created a Jira issue for tracking: 
https://issues.apache.org/jira/browse/SPARK-16980 
<https://issues.apache.org/jira/browse/SPARK-16980>. As mentioned in the issue, 
I will open a PR for discussion and design review, and include you in the 
conversation.
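
As a quick reminder of the scenario the proposal (quoted below) targets, a minimal sketch; database, table, and partition names are placeholders:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().enableHiveSupport().getOrCreate()

// a fresh app that touches a single partition of a heavily partitioned table
spark.sql("select count(1) from mydb.events where ds = '2016-08-01'").show()

// today, planning this query loads and merges metadata for *all* partitions
// of mydb.events; the proposal is to fetch only the partitions matching
// ds = '2016-08-01', on demand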

Cheers,

Michael


> On Aug 8, 2016, at 12:51 PM, Eric Liang <e...@databricks.com> wrote:
> 
> I like the former approach -- it seems more generally applicable to other 
> catalogs and IIUC would let you defer pruning until execution time. Pruning 
> is work that should be done by the catalog anyways, as is the case when 
> querying over an (unconverted) hive table.
> 
> You might also want to look at https://github.com/apache/spark/pull/14241 
> <https://github.com/apache/spark/pull/14241> , which refactors some of the 
> file scan execution to defer pruning.
> 
> 
> On Mon, Aug 8, 2016, 11:53 AM Michael Allman <mich...@videoamp.com 
> <mailto:mich...@videoamp.com>> wrote:
> Hello,
> 
> I'd like to propose a modification in the way Hive table partition metadata 
> are loaded and cached. Currently, when a user reads from a partitioned Hive 
> table whose metadata are not cached (and for which Hive table conversion is 
> enabled and supported), all partition metadata is fetched from the metastore:
> 
> https://github.com/apache/spark/blob/5effc016c893ce917d535cc1b5026d8e4c846721/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala#L252-L260
>  
> <https://github.com/apache/spark/blob/5effc016c893ce917d535cc1b5026d8e4c846721/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala#L252-L260>
> 
> This is highly inefficient in some scenarios. In the most extreme case, a 
> user starts a new Spark app, runs a query which reads from a single partition 
> in a table with a large number of partitions and terminates their app. All 
> partition metadata are loaded and their files' schema are merged, but only a 
> single partition is read. Instead, I propose we load and cache partition 
> metadata on-demand, as needed to build query plans.
> 
> We've long encountered this performance problem at VideoAmp and have taken 
> different approaches to address it. In addition to the load time, we've found 
> that loading all of a table's partition metadata can require a significant 
> amount of JVM heap space. Our largest tables OOM our Spark drivers unless we 
> allocate several GB of heap space.
> 
> Certainly one could argue that our situation is pathological and rare, and 
> that the problem in our scenario is with the design of our tables—not Spark. 
> However, even in tables with more modest numbers of partitions, loading only 
> the necessary partition metadata and file schema can significantly reduce the 
> query planning time, and is definitely more memory efficient.
> 
> I've written POCs for a couple of different implementation approaches. Though 
> incomplete, both have been successful in their basic goal. The first extends 
> `org.apache.spark.sql.catalyst.catalog.ExternalCatalog` and as such is more 
> general. It requires some new abstractions and refactoring of 
> `HadoopFsRelation` and `FileCatalog`, among others. It places a greater 
> burden on other implementations of `ExternalCatalog`. Currently the only 
> other implementation of `ExternalCatalog` is `InMemoryCatalog`, and my code 
> throws an `UnsupportedOperationException` on that implementation.
> 
> The other approach is simpler and only touches code in the codebase's `hive` 
> project. Basically, conversion of `MetastoreRelation` to `HadoopFsRelation` 
> is deferred to physical planning when the metastore relation is partitioned. 
> During physical planning, the partition pruning filters in a logical query 
> plan are used to select the required partition metadata and a 
> `HadoopFsRelation` is built from those. The new logical plan is then 
> re-injected into the planner.
> 
> I'd like to get the community's thoughts on my proposal and implementation 
> approaches.
> 
> Thanks!
> 
> Michael



Re: Scaling partitioned Hive table support

2016-08-08 Thread Michael Allman
Hi Eric,

Thanks for your feedback. I'm rebasing my code for the first approach on a more 
recent Spark master and am resolving some conflicts. I'll have a better 
understanding of the relationship to your PR once my rebase is complete.

Cheers,

Michael

> On Aug 8, 2016, at 12:51 PM, Eric Liang <e...@databricks.com> wrote:
> 
> I like the former approach -- it seems more generally applicable to other 
> catalogs and IIUC would let you defer pruning until execution time. Pruning 
> is work that should be done by the catalog anyways, as is the case when 
> querying over an (unconverted) hive table.
> 
> You might also want to look at https://github.com/apache/spark/pull/14241 
> <https://github.com/apache/spark/pull/14241> , which refactors some of the 
> file scan execution to defer pruning.
> 
> 
> On Mon, Aug 8, 2016, 11:53 AM Michael Allman <mich...@videoamp.com 
> <mailto:mich...@videoamp.com>> wrote:
> Hello,
> 
> I'd like to propose a modification in the way Hive table partition metadata 
> are loaded and cached. Currently, when a user reads from a partitioned Hive 
> table whose metadata are not cached (and for which Hive table conversion is 
> enabled and supported), all partition metadata is fetched from the metastore:
> 
> https://github.com/apache/spark/blob/5effc016c893ce917d535cc1b5026d8e4c846721/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala#L252-L260
>  
> <https://github.com/apache/spark/blob/5effc016c893ce917d535cc1b5026d8e4c846721/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala#L252-L260>
> 
> This is highly inefficient in some scenarios. In the most extreme case, a 
> user starts a new Spark app, runs a query which reads from a single partition 
> in a table with a large number of partitions and terminates their app. All 
> partition metadata are loaded and their files' schema are merged, but only a 
> single partition is read. Instead, I propose we load and cache partition 
> metadata on-demand, as needed to build query plans.
> 
> We've long encountered this performance problem at VideoAmp and have taken 
> different approaches to address it. In addition to the load time, we've found 
> that loading all of a table's partition metadata can require a significant 
> amount of JVM heap space. Our largest tables OOM our Spark drivers unless we 
> allocate several GB of heap space.
> 
> Certainly one could argue that our situation is pathological and rare, and 
> that the problem in our scenario is with the design of our tables—not Spark. 
> However, even in tables with more modest numbers of partitions, loading only 
> the necessary partition metadata and file schema can significantly reduce the 
> query planning time, and is definitely more memory efficient.
> 
> I've written POCs for a couple of different implementation approaches. Though 
> incomplete, both have been successful in their basic goal. The first extends 
> `org.apache.spark.sql.catalyst.catalog.ExternalCatalog` and as such is more 
> general. It requires some new abstractions and refactoring of 
> `HadoopFsRelation` and `FileCatalog`, among others. It places a greater 
> burden on other implementations of `ExternalCatalog`. Currently the only 
> other implementation of `ExternalCatalog` is `InMemoryCatalog`, and my code 
> throws an `UnsupportedOperationException` on that implementation.
> 
> The other approach is simpler and only touches code in the codebase's `hive` 
> project. Basically, conversion of `MetastoreRelation` to `HadoopFsRelation` 
> is deferred to physical planning when the metastore relation is partitioned. 
> During physical planning, the partition pruning filters in a logical query 
> plan are used to select the required partition metadata and a 
> `HadoopFsRelation` is built from those. The new logical plan is then 
> re-injected into the planner.
> 
> I'd like to get the community's thoughts on my proposal and implementation 
> approaches.
> 
> Thanks!
> 
> Michael



Scaling partitioned Hive table support

2016-08-08 Thread Michael Allman
Hello,

I'd like to propose a modification in the way Hive table partition metadata are 
loaded and cached. Currently, when a user reads from a partitioned Hive table 
whose metadata are not cached (and for which Hive table conversion is enabled 
and supported), all partition metadata is fetched from the metastore:

https://github.com/apache/spark/blob/5effc016c893ce917d535cc1b5026d8e4c846721/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala#L252-L260
 


This is highly inefficient in some scenarios. In the most extreme case, a user 
starts a new Spark app, runs a query which reads from a single partition in a 
table with a large number of partitions and terminates their app. All partition 
metadata are loaded and their files' schema are merged, but only a single 
partition is read. Instead, I propose we load and cache partition metadata 
on-demand, as needed to build query plans.
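
To make the pathological case concrete, the usage pattern is essentially the
following (the table and partition names are made up for illustration):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().enableHiveSupport().getOrCreate()
// Only the ds='2016-08-01' partition is read, yet with eager listing the
// metadata for every partition of `events` is loaded and its file schemas merged.
spark.sql("SELECT count(*) FROM events WHERE ds = '2016-08-01'").show()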

We've long encountered this performance problem at VideoAmp and have taken 
different approaches to address it. In addition to the load time, we've found 
that loading all of a table's partition metadata can require a significant 
amount of JVM heap space. Our largest tables OOM our Spark drivers unless we 
allocate several GB of heap space.

Certainly one could argue that our situation is pathological and rare, and that 
the problem in our scenario is with the design of our tables—not Spark. 
However, even in tables with more modest numbers of partitions, loading only 
the necessary partition metadata and file schema can significantly reduce the 
query planning time, and is definitely more memory efficient.

I've written POCs for a couple of different implementation approaches. Though 
incomplete, both have been successful in their basic goal. The first extends 
`org.apache.spark.sql.catalyst.catalog.ExternalCatalog` and as such is more 
general. It requires some new abstractions and refactoring of 
`HadoopFsRelation` and `FileCatalog`, among others. It places a greater burden 
on other implementations of `ExternalCatalog`. Currently the only other 
implementation of `ExternalCatalog` is `InMemoryCatalog`, and my code throws an 
`UnsupportedOperationException` on that implementation.

The other approach is simpler and only touches code in the codebase's `hive` 
project. Basically, conversion of `MetastoreRelation` to `HadoopFsRelation` is 
deferred to physical planning when the metastore relation is partitioned. 
During physical planning, the partition pruning filters in a logical query plan 
are used to select the required partition metadata and a `HadoopFsRelation` is 
built from those. The new logical plan is then re-injected into the planner.

I'd like to get the community's thoughts on my proposal and implementation 
approaches.

Thanks!

Michael

Re: Build speed

2016-07-22 Thread Michael Allman
I use sbt. Rebuilds are super fast.

Michael

> On Jul 22, 2016, at 7:54 AM, Mikael Ståldal  wrote:
> 
> Is there any way to speed up an incremental build of Spark?
> 
> For me it takes 8 minutes to build the project with just a few code changes.
> 
> -- 
>  
> 
> Mikael Ståldal
> Senior software developer 
> 
> Magine TV
> mikael.stal...@magine.com 
> Grev Turegatan 3  | 114 46 Stockholm, Sweden  |   www.magine.com  
> 
> 
> Privileged and/or Confidential Information may be contained in this message. 
> If you are not the addressee indicated in this message
> (or responsible for delivery of the message to such a person), you may not 
> copy or deliver this message to anyone. In such case, 
> you should destroy this message and kindly notify the sender by reply email.  
>  



Re: [VOTE] Release Apache Spark 2.0.0 (RC5)

2016-07-20 Thread Michael Allman
I've run some tests with some real and some synthetic parquet data with nested 
columns with and without the hive metastore on our Spark 1.5, 1.6 and 2.0 
versions. I haven't seen any unexpected performance surprises, except that 
Spark 2.0 now does schema inference across all files in a partitioned parquet 
metastore table. Granted, you aren't using a metastore table, but maybe Spark 
does that for partitioned non-metastore tables as well.

Michael

> On Jul 20, 2016, at 2:16 PM, Maciej Bryński  wrote:
> 
> @Michael,
> I answered in Jira and could repeat here.
> I think that my problem is unrelated to Hive, because I'm using read.parquet 
> method.
> I also attached some VisualVM snapshots to SPARK-16321 (I think I should 
> merge both issues)
> And code profiling suggest bottleneck when reading parquet file.
> 
> I wonder if there are any other benchmarks related to parquet performance.
> 
> Regards,
> -- 
> Maciek Bryński





Re: [VOTE] Release Apache Spark 2.0.0 (RC5)

2016-07-20 Thread Michael Allman
Marcin,

I'm not sure what you're referring to. Can you be more specific?

Cheers,

Michael

> On Jul 20, 2016, at 9:10 AM, Marcin Tustin  wrote:
> 
> Whatever happened with the query regarding benchmarks? Is that resolved?
> 
> On Tue, Jul 19, 2016 at 10:35 PM, Reynold Xin  > wrote:
> Please vote on releasing the following candidate as Apache Spark version 
> 2.0.0. The vote is open until Friday, July 22, 2016 at 20:00 PDT and passes 
> if a majority of at least 3 +1 PMC votes are cast.
> 
> [ ] +1 Release this package as Apache Spark 2.0.0
> [ ] -1 Do not release this package because ...
> 
> 
> The tag to be voted on is v2.0.0-rc5 
> (13650fc58e1fcf2cf2a26ba11c819185ae1acc1f).
> 
> This release candidate resolves ~2500 issues: 
> https://s.apache.org/spark-2.0.0-jira 
> 
> The release files, including signatures, digests, etc. can be found at:
> http://people.apache.org/~pwendell/spark-releases/spark-2.0.0-rc5-bin/ 
> 
> 
> Release artifacts are signed with the following key:
> https://people.apache.org/keys/committer/pwendell.asc 
> 
> 
> The staging repository for this release can be found at:
> https://repository.apache.org/content/repositories/orgapachespark-1195/ 
> 
> 
> The documentation corresponding to this release can be found at:
> http://people.apache.org/~pwendell/spark-releases/spark-2.0.0-rc5-docs/ 
> 
> 
> 
> =
> How can I help test this release?
> =
> If you are a Spark user, you can help us test this release by taking an 
> existing Spark workload and running on this release candidate, then reporting 
> any regressions from 1.x.
> 
> ==
> What justifies a -1 vote for this release?
> ==
> Critical bugs impacting major functionalities.
> 
> Bugs already present in 1.x, missing features, or bugs related to new 
> features will not necessarily block this release. Note that historically 
> Spark documentation has been published on the website separately from the 
> main release so we do not need to block the release due to documentation 
> errors either.
> 
> 
> 
> Want to work at Handy? Check out our culture deck and open roles 
> 
> Latest news  at Handy
> Handy just raised $50m led by Fidelity
> 
> 



Re: transition SQLContext to SparkSession

2016-07-19 Thread Michael Allman
Hi Reynold,

So far we've been able to transition everything to `SparkSession`. I was just 
following up on behalf of Maciej.
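
For anyone making the same transition, a minimal sketch of what it can look
like, assuming SparkSession.sqlContext ends up public as discussed below and
using the Catalog API in place of HiveContext.refreshTable (the table name is
a placeholder):

import org.apache.spark.sql.SparkSession

val session = SparkSession.builder()
  .enableHiveSupport()
  .getOrCreate()
val sqlc = session.sqlContext               // a plain SQLContext wrapping the session
session.catalog.refreshTable("some_table")  // in place of HiveContext.refreshTable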

Michael

> On Jul 19, 2016, at 11:02 AM, Reynold Xin <r...@databricks.com> wrote:
> 
> dropping user list
> 
> Yup I just took a look -- you are right.
> 
> What's the reason you'd need a HiveContext? The only method that HiveContext 
> has and SQLContext does not have is refreshTable. Given this is meant for 
> helping code transition, it might be easier to just use SQLContext and change 
> the places that use refreshTable?
> 
> In order for SparkSession.sqlContext to return an actual HiveContext, we'd 
> need to use reflection to create a HiveContext, which is pretty hacky.
> 
> 
> 
> On Tue, Jul 19, 2016 at 10:58 AM, Michael Allman <mich...@videoamp.com 
> <mailto:mich...@videoamp.com>> wrote:
> Sorry Reynold, I want to triple check this with you. I'm looking at the 
> `SparkSession.sqlContext` field in the latest 2.0 branch, and it appears that 
> that val is set specifically to an instance of the `SQLContext` class. A cast 
> to `HiveContext` will fail. Maybe there's a misunderstanding here. This is 
> what I'm looking at:
> 
> https://github.com/apache/spark/blob/24ea875198ffcef4a4c3ba28aba128d6d7d9a395/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala#L122
>  
> <https://github.com/apache/spark/blob/24ea875198ffcef4a4c3ba28aba128d6d7d9a395/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala#L122>
> 
> Michael
> 
> 
> 
>> On Jul 19, 2016, at 10:01 AM, Reynold Xin <r...@databricks.com 
>> <mailto:r...@databricks.com>> wrote:
>> 
>> Yes. But in order to access methods available only in HiveContext a user 
>> cast is required. 
>> 
>> On Tuesday, July 19, 2016, Maciej Bryński <mac...@brynski.pl 
>> <mailto:mac...@brynski.pl>> wrote:
>> @Reynold Xin,
>> How this will work with Hive Support ?
>> SparkSession.sqlContext return HiveContext ?
>> 
>> 2016-07-19 0:26 GMT+02:00 Reynold Xin <r...@databricks.com <>>:
>> > Good idea.
>> >
>> > https://github.com/apache/spark/pull/14252 
>> > <https://github.com/apache/spark/pull/14252>
>> >
>> >
>> >
>> > On Mon, Jul 18, 2016 at 12:16 PM, Michael Armbrust <mich...@databricks.com 
>> > <>>
>> > wrote:
>> >>
>> >> + dev, reynold
>> >>
>> >> Yeah, thats a good point.  I wonder if SparkSession.sqlContext should be
>> >> public/deprecated?
>> >>
>> >> On Mon, Jul 18, 2016 at 8:37 AM, Koert Kuipers <ko...@tresata.com <>> 
>> >> wrote:
>> >>>
>> >>> in my codebase i would like to gradually transition to SparkSession, so
>> >>> while i start using SparkSession i also want a SQLContext to be 
>> >>> available as
>> >>> before (but with a deprecated warning when i use it). this should be easy
>> >>> since SQLContext is now a wrapper for SparkSession.
>> >>>
>> >>> so basically:
>> >>> val session = SparkSession.builder.set(..., ...).getOrCreate()
>> >>> val sqlc = new SQLContext(session)
>> >>>
>> >>> however this doesnt work, the SQLContext constructor i am trying to use
>> >>> is private. SparkSession.sqlContext is also private.
>> >>>
>> >>> am i missing something?
>> >>>
>> >>> a non-gradual switch is not very realistic in any significant codebase,
>> >>> and i do not want to create SparkSession and SQLContext independendly 
>> >>> (both
>> >>> from same SparkContext) since that can only lead to confusion and
>> >>> inconsistent settings.
>> >>
>> >>
>> >
>> 
>> 
>> 
>> --
>> Maciek Bryński
> 
> 



Re: transition SQLContext to SparkSession

2016-07-19 Thread Michael Allman
Sorry Reynold, I want to triple check this with you. I'm looking at the 
`SparkSession.sqlContext` field in the latest 2.0 branch, and it appears that 
that val is set specifically to an instance of the `SQLContext` class. A cast 
to `HiveContext` will fail. Maybe there's a misunderstanding here. This is what 
I'm looking at:

https://github.com/apache/spark/blob/24ea875198ffcef4a4c3ba28aba128d6d7d9a395/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala#L122

Michael


> On Jul 19, 2016, at 10:01 AM, Reynold Xin  wrote:
> 
> Yes. But in order to access methods available only in HiveContext a user cast 
> is required. 
> 
> On Tuesday, July 19, 2016, Maciej Bryński  > wrote:
> @Reynold Xin,
> How this will work with Hive Support ?
> SparkSession.sqlContext return HiveContext ?
> 
> 2016-07-19 0:26 GMT+02:00 Reynold Xin >:
> > Good idea.
> >
> > https://github.com/apache/spark/pull/14252 
> > 
> >
> >
> >
> > On Mon, Jul 18, 2016 at 12:16 PM, Michael Armbrust  > >
> > wrote:
> >>
> >> + dev, reynold
> >>
> >> Yeah, thats a good point.  I wonder if SparkSession.sqlContext should be
> >> public/deprecated?
> >>
> >> On Mon, Jul 18, 2016 at 8:37 AM, Koert Kuipers  >> > wrote:
> >>>
> >>> in my codebase i would like to gradually transition to SparkSession, so
> >>> while i start using SparkSession i also want a SQLContext to be available 
> >>> as
> >>> before (but with a deprecated warning when i use it). this should be easy
> >>> since SQLContext is now a wrapper for SparkSession.
> >>>
> >>> so basically:
> >>> val session = SparkSession.builder.set(..., ...).getOrCreate()
> >>> val sqlc = new SQLContext(session)
> >>>
> >>> however this doesnt work, the SQLContext constructor i am trying to use
> >>> is private. SparkSession.sqlContext is also private.
> >>>
> >>> am i missing something?
> >>>
> >>> a non-gradual switch is not very realistic in any significant codebase,
> >>> and i do not want to create SparkSession and SQLContext independendly 
> >>> (both
> >>> from same SparkContext) since that can only lead to confusion and
> >>> inconsistent settings.
> >>
> >>
> >
> 
> 
> 
> --
> Maciek Bryński



Re: Spark 2.0.0 performance; potential large Spark core regression

2016-07-08 Thread Michael Allman
Here are some settings we use for some very large GraphX jobs. These are based 
on using EC2 c3.8xl workers:

.set("spark.sql.shuffle.partitions", "1024")
.set("spark.sql.tungsten.enabled", "true")
.set("spark.executor.memory", "24g")
.set("spark.kryoserializer.buffer.max","1g")
.set("spark.sql.codegen.wholeStage", "true")
.set("spark.memory.offHeap.enabled", "true")
.set("spark.memory.offHeap.size", "25769803776") // 24 GB

Some of these are in fact default configurations. Some are not.
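
For completeness, a minimal sketch of wiring a few of these into a SparkConf;
the app name and master URL are placeholders:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("graphx-job")                        // placeholder
  .setMaster("spark://master-host:7077")           // placeholder
  .set("spark.executor.memory", "24g")
  .set("spark.memory.offHeap.enabled", "true")
  .set("spark.memory.offHeap.size", "25769803776") // 24 GB
val sc = new SparkContext(conf)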

Michael


> On Jul 8, 2016, at 9:01 AM, Michael Allman <mich...@videoamp.com> wrote:
> 
> Hi Adam,
> 
> From our experience we've found the default Spark 2.0 configuration to be 
> highly suboptimal. I don't know if this affects your benchmarks, but I would 
> consider running some tests with tuned and alternate configurations.
> 
> Michael
> 
> 
>> On Jul 8, 2016, at 8:58 AM, Adam Roberts <arobe...@uk.ibm.com 
>> <mailto:arobe...@uk.ibm.com>> wrote:
>> 
>> Hi Michael, the two Spark configuration files aren't very exciting 
>> 
>> spark-env.sh 
>> Same as the template apart from a JAVA_HOME setting 
>> 
>> spark-defaults.conf 
>> spark.io.compression.codec lzf 
>> 
>> config.py has the Spark home set, is running Spark standalone mode, we run 
>> and prep Spark tests only, driver 8g, executor memory 16g, Kryo, 0.66 memory 
>> fraction, 100 trials 
>> 
>> We can post the 1.6.2 comparison early next week, running lots of iterations 
>> over the weekend once we get the dedicated time again 
>> 
>> Cheers, 
>> 
>> 
>> 
>> 
>> 
>> From:Michael Allman <mich...@videoamp.com 
>> <mailto:mich...@videoamp.com>> 
>> To:Adam Roberts/UK/IBM@IBMGB 
>> Cc:dev <dev@spark.apache.org <mailto:dev@spark.apache.org>> 
>> Date:08/07/2016 16:44 
>> Subject:Re: Spark 2.0.0 performance; potential large Spark core 
>> regression 
>> 
>> 
>> 
>> Hi Adam, 
>> 
>> Do you have your spark confs and your spark-env.sh somewhere where we can 
>> see them? If not, can you make them available? 
>> 
>> Cheers, 
>> 
>> Michael 
>> 
>> On Jul 8, 2016, at 3:17 AM, Adam Roberts <arobe...@uk.ibm.com 
>> <mailto:arobe...@uk.ibm.com>> wrote: 
>> 
>> Hi, we've been testing the performance of Spark 2.0 compared to previous 
>> releases, unfortunately there are no Spark 2.0 compatible versions of 
>> HiBench and SparkPerf apart from those I'm working on (see 
>> https://github.com/databricks/spark-perf/issues/108 
>> <https://github.com/databricks/spark-perf/issues/108>) 
>> 
>> With the Spark 2.0 version of SparkPerf we've noticed a 30% geomean 
>> regression with a very small scale factor and so we've generated a couple of 
>> profiles comparing 1.5.2 vs 2.0.0. Same JDK version and same platform. We 
>> will gather a 1.6.2 comparison and increase the scale factor. 
>> 
>> Has anybody noticed a similar problem? My changes for SparkPerf and Spark 
>> 2.0 are very limited and AFAIK don't interfere with Spark core 
>> functionality, so any feedback on the changes would be much appreciated and 
>> welcome, I'd much prefer it if my changes are the problem. 
>> 
>> A summary for your convenience follows (this matches what I've mentioned on 
>> the SparkPerf issue above) 
>> 
>> 1. spark-perf/config/config.py : SCALE_FACTOR=0.05
>> No. Of Workers: 1
>> Executor per Worker : 1
>> Executor Memory: 18G
>> Driver Memory : 8G
>> Serializer: kryo 
>> 
>> 2. $SPARK_HOME/conf/spark-defaults.conf: executor Java Options: 
>> -Xdisableexplicitgc -Xcompressedrefs 
>> 
>> Main changes I made for the benchmark itself
>> Use Scala 2.11.8 and Spark 2.0.0 RC2 on our local filesystem
>> MLAlgorithmTests use Vectors.fromML
>> For streaming-tests in HdfsRecoveryTest we use wordStream.foreachRDD not 
>> wordStream.foreach
>> KVDataTest uses awaitTerminationOrTimeout in a SparkStreamingContext instead 
>> of awaitTermination
>> Trivial: we use compact not compact.render for outputting json
>> 
>> In Spark 2.0 the top five methods where we spend our time is as follows, the 
>> percentage is how much of the overall processing time was spent in this 
>> particular method: 
>> 1.AppendOnlyMap.changeValue 44% 
>> 2.SortShuffleWriter.write 19% 

Re: Spark 2.0.0 performance; potential large Spark core regression

2016-07-08 Thread Michael Allman
Hi Adam,

From our experience we've found the default Spark 2.0 configuration to be 
highly suboptimal. I don't know if this affects your benchmarks, but I would 
consider running some tests with tuned and alternate configurations.

Michael


> On Jul 8, 2016, at 8:58 AM, Adam Roberts <arobe...@uk.ibm.com> wrote:
> 
> Hi Michael, the two Spark configuration files aren't very exciting 
> 
> spark-env.sh 
> Same as the template apart from a JAVA_HOME setting 
> 
> spark-defaults.conf 
> spark.io.compression.codec lzf 
> 
> config.py has the Spark home set, is running Spark standalone mode, we run 
> and prep Spark tests only, driver 8g, executor memory 16g, Kryo, 0.66 memory 
> fraction, 100 trials 
> 
> We can post the 1.6.2 comparison early next week, running lots of iterations 
> over the weekend once we get the dedicated time again 
> 
> Cheers, 
> 
> 
> 
> 
> 
> From:Michael Allman <mich...@videoamp.com> 
> To:Adam Roberts/UK/IBM@IBMGB 
> Cc:dev <dev@spark.apache.org> 
> Date:08/07/2016 16:44 
> Subject:Re: Spark 2.0.0 performance; potential large Spark core 
> regression 
> 
> 
> 
> Hi Adam, 
> 
> Do you have your spark confs and your spark-env.sh somewhere where we can see 
> them? If not, can you make them available? 
> 
> Cheers, 
> 
> Michael 
> 
> On Jul 8, 2016, at 3:17 AM, Adam Roberts <arobe...@uk.ibm.com 
> <mailto:arobe...@uk.ibm.com>> wrote: 
> 
> Hi, we've been testing the performance of Spark 2.0 compared to previous 
> releases, unfortunately there are no Spark 2.0 compatible versions of HiBench 
> and SparkPerf apart from those I'm working on (see 
> https://github.com/databricks/spark-perf/issues/108 
> <https://github.com/databricks/spark-perf/issues/108>) 
> 
> With the Spark 2.0 version of SparkPerf we've noticed a 30% geomean 
> regression with a very small scale factor and so we've generated a couple of 
> profiles comparing 1.5.2 vs 2.0.0. Same JDK version and same platform. We 
> will gather a 1.6.2 comparison and increase the scale factor. 
> 
> Has anybody noticed a similar problem? My changes for SparkPerf and Spark 2.0 
> are very limited and AFAIK don't interfere with Spark core functionality, so 
> any feedback on the changes would be much appreciated and welcome, I'd much 
> prefer it if my changes are the problem. 
> 
> A summary for your convenience follows (this matches what I've mentioned on 
> the SparkPerf issue above) 
> 
> 1. spark-perf/config/config.py : SCALE_FACTOR=0.05
> No. Of Workers: 1
> Executor per Worker : 1
> Executor Memory: 18G
> Driver Memory : 8G
> Serializer: kryo 
> 
> 2. $SPARK_HOME/conf/spark-defaults.conf: executor Java Options: 
> -Xdisableexplicitgc -Xcompressedrefs 
> 
> Main changes I made for the benchmark itself
> Use Scala 2.11.8 and Spark 2.0.0 RC2 on our local filesystem
> MLAlgorithmTests use Vectors.fromML
> For streaming-tests in HdfsRecoveryTest we use wordStream.foreachRDD not 
> wordStream.foreach
> KVDataTest uses awaitTerminationOrTimeout in a SparkStreamingContext instead 
> of awaitTermination
> Trivial: we use compact not compact.render for outputting json
> 
> In Spark 2.0 the top five methods where we spend our time is as follows, the 
> percentage is how much of the overall processing time was spent in this 
> particular method: 
> 1.AppendOnlyMap.changeValue 44% 
> 2.SortShuffleWriter.write 19% 
> 3.SizeTracker.estimateSize 7.5% 
> 4.SizeEstimator.estimate 5.36% 
> 5.Range.foreach 3.6% 
> 
> and in 1.5.2 the top five methods are: 
> 1.AppendOnlyMap.changeValue 38% 
> 2.ExternalSorter.insertAll 33% 
> 3.Range.foreach 4% 
> 4.SizeEstimator.estimate 2% 
> 5.SizeEstimator.visitSingleObject 2% 
> 
> I see the following scores, on the left I have the test name followed by the 
> 1.5.2 time and then the 2.0.0 time
> scheduling throughput: 5.2s vs 7.08s
> agg by key; 0.72s vs 1.01s
> agg by key int: 0.93s vs 1.19s
> agg by key naive: 1.88s vs 2.02
> sort by key: 0.64s vs 0.8s
> sort by key int: 0.59s vs 0.64s
> scala count: 0.09s vs 0.08s
> scala count w fltr: 0.31s vs 0.47s 
> 
> This is only running the Spark core tests (scheduling throughput through 
> scala-count-w-filtr, including all inbetween). 
> 
> Cheers, 
> 
> 
> Unless stated otherwise above:
> IBM United Kingdom Limited - Registered in England and Wales with number 
> 741598. 
> Registered office: PO Box 41, North Harbour, Portsmouth, Hampshire PO6 3AU 
> 
> 
> Unless stated otherwise above:
> IBM United Kingdom Limited - Registered in England and Wales with number 
> 741598. 
> Registered office: PO Box 41, North Harbour, Portsmouth, Hampshire PO6 3AU



Spark performance regression test suite

2016-07-08 Thread Michael Allman
Hello,

I've seen a few messages on the mailing list regarding Spark performance 
concerns, especially regressions from previous versions. It got me thinking 
that perhaps an automated performance regression suite would be a worthwhile 
contribution. Is anyone working on this? Do we have a Jira issue for it?

I cannot commit to taking charge of such a project. I just thought it would be 
a great contribution for someone who does have the time and the chops to build 
it.

Cheers,

Michael



Re: Spark 2.0.0 performance; potential large Spark core regression

2016-07-08 Thread Michael Allman
Hi Adam,

Do you have your spark confs and your spark-env.sh somewhere where we can see 
them? If not, can you make them available?

Cheers,

Michael

> On Jul 8, 2016, at 3:17 AM, Adam Roberts  wrote:
> 
> Hi, we've been testing the performance of Spark 2.0 compared to previous 
> releases, unfortunately there are no Spark 2.0 compatible versions of HiBench 
> and SparkPerf apart from those I'm working on (see 
> https://github.com/databricks/spark-perf/issues/108 
> ) 
> 
> With the Spark 2.0 version of SparkPerf we've noticed a 30% geomean 
> regression with a very small scale factor and so we've generated a couple of 
> profiles comparing 1.5.2 vs 2.0.0. Same JDK version and same platform. We 
> will gather a 1.6.2 comparison and increase the scale factor. 
> 
> Has anybody noticed a similar problem? My changes for SparkPerf and Spark 2.0 
> are very limited and AFAIK don't interfere with Spark core functionality, so 
> any feedback on the changes would be much appreciated and welcome, I'd much 
> prefer it if my changes are the problem. 
> 
> A summary for your convenience follows (this matches what I've mentioned on 
> the SparkPerf issue above) 
> 
> 1. spark-perf/config/config.py : SCALE_FACTOR=0.05
> No. Of Workers: 1
> Executor per Worker : 1
> Executor Memory: 18G
> Driver Memory : 8G
> Serializer: kryo 
> 
> 2. $SPARK_HOME/conf/spark-defaults.conf: executor Java Options: 
> -Xdisableexplicitgc -Xcompressedrefs 
> 
> Main changes I made for the benchmark itself
> Use Scala 2.11.8 and Spark 2.0.0 RC2 on our local filesystem
> MLAlgorithmTests use Vectors.fromML
> For streaming-tests in HdfsRecoveryTest we use wordStream.foreachRDD not 
> wordStream.foreach
> KVDataTest uses awaitTerminationOrTimeout in a SparkStreamingContext instead 
> of awaitTermination
> Trivial: we use compact not compact.render for outputting json
> 
> In Spark 2.0 the top five methods where we spend our time is as follows, the 
> percentage is how much of the overall processing time was spent in this 
> particular method: 
> 1.AppendOnlyMap.changeValue 44% 
> 2.SortShuffleWriter.write 19% 
> 3.SizeTracker.estimateSize 7.5% 
> 4.SizeEstimator.estimate 5.36% 
> 5.Range.foreach 3.6% 
> 
> and in 1.5.2 the top five methods are: 
> 1.AppendOnlyMap.changeValue 38% 
> 2.ExternalSorter.insertAll 33% 
> 3.Range.foreach 4% 
> 4.SizeEstimator.estimate 2% 
> 5.SizeEstimator.visitSingleObject 2% 
> 
> I see the following scores, on the left I have the test name followed by the 
> 1.5.2 time and then the 2.0.0 time
> scheduling throughput: 5.2s vs 7.08s
> agg by key; 0.72s vs 1.01s
> agg by key int: 0.93s vs 1.19s
> agg by key naive: 1.88s vs 2.02
> sort by key: 0.64s vs 0.8s
> sort by key int: 0.59s vs 0.64s
> scala count: 0.09s vs 0.08s
> scala count w fltr: 0.31s vs 0.47s 
> 
> This is only running the Spark core tests (scheduling throughput through 
> scala-count-w-filtr, including all inbetween). 
> 
> Cheers, 
> 
> 
> Unless stated otherwise above:
> IBM United Kingdom Limited - Registered in England and Wales with number 
> 741598. 
> Registered office: PO Box 41, North Harbour, Portsmouth, Hampshire PO6 3AU



Re: Anyone knows the hive repo for spark-2.0?

2016-07-07 Thread Michael Allman
FYI if you just want to look at the source code, there are source jars for 
those binary versions in maven central. I was just looking at the metastore 
source code last night.

Michael

> On Jul 7, 2016, at 12:13 PM, Jonathan Kelly  wrote:
> 
> I'm not sure, but I think it's 
> https://github.com/JoshRosen/hive/tree/release-1.2.1-spark2 
> .
> 
> It would be really nice though to have this whole process better documented 
> and more "official" than just building from somebody's personal fork of Hive.
> 
> Or is there some way that the Spark community could contribute back these 
> changes to Hive in such a way that they would accept them into trunk? Then 
> Spark could depend upon an official version of Hive rather than this fork.
> 
> ~ Jonathan
> 
> On Thu, Jul 7, 2016 at 11:46 AM Marcelo Vanzin  > wrote:
> (Actually that's "spark" and not "spark2", so yeah, that doesn't
> really answer the question.)
> 
> On Thu, Jul 7, 2016 at 11:38 AM, Marcelo Vanzin  > wrote:
> > My guess would be https://github.com/pwendell/hive/tree/release-1.2.1-spark 
> > 
> >
> > On Thu, Jul 7, 2016 at 11:37 AM, Zhan Zhang  > > wrote:
> >> I saw the pom file having hive version as
> >> 1.2.1.spark2. But I cannot find the branch in
> >> https://github.com/pwendell/ 
> >>
> >> Does anyone know where the repo is?
> >>
> >> Thanks.
> >>
> >> Zhan Zhang
> >>
> >>
> >>
> >>
> >> --
> >> View this message in context: 
> >> http://apache-spark-developers-list.1001551.n3.nabble.com/Anyone-knows-the-hive-repo-for-spark-2-0-tp18234.html
> >>  
> >> 
> >> Sent from the Apache Spark Developers List mailing list archive at 
> >> Nabble.com.
> >>
> >> -
> >> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org 
> >> 
> >>
> >
> >
> >
> > --
> > Marcelo
> 
> 
> 
> --
> Marcelo
> 
> -
> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org 
> 
> 



Re: SparkSession replace SQLContext

2016-07-05 Thread Michael Allman
These topics have been included in the documentation for recent builds of Spark 
2.0.

Michael

> On Jul 5, 2016, at 3:49 AM, Romi Kuntsman  wrote:
> 
> You can also claim that there's a whole section of "Migrating from 1.6 to 
> 2.0" missing there:
> https://spark.apache.org/docs/2.0.0-preview/sql-programming-guide.html#migration-guide
>  
> 
> 
> Romi Kuntsman, Big Data Engineer
> http://www.totango.com 
> 
> On Tue, Jul 5, 2016 at 12:24 PM, nihed mbarek  > wrote:
> Hi,
> 
> I just discover that that SparkSession will replace SQLContext for spark 2.0
> JavaDoc is clear 
> https://spark.apache.org/docs/2.0.0-preview/api/java/org/apache/spark/sql/SparkSession.html
>  
> 
> but there is no mention in sql programming guide
> https://spark.apache.org/docs/2.0.0-preview/sql-programming-guide.html#starting-point-sqlcontext
>  
> 
> 
> Is it possible to update documentation before the release ?
> 
> 
> Thank you
> 
> -- 
> 
> MBAREK Med Nihed,
> Fedora Ambassador, TUNISIA, Northern Africa
> http://www.nihed.com 
> 
>  
> 
> 



Can't build scala unidoc since Kafka 0.10 support was added

2016-07-02 Thread Michael Allman
Hello,

I'm no longer able to successfully run `sbt unidoc` in branch-2.0, and the 
problem seems to stem from the addition of Kafka 0.10 support. If I remove 
either the Kafka 0.8 or 0.10 projects from the build then unidoc works. If I 
keep both in I get two dozen inexplicable compilation errors as part of the 
unidoc task execution. Here's the first few:

[error] 
/Users/msa/workspace/spark-2.0/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala:50:
 value assign is not a member of 
org.apache.kafka.clients.consumer.KafkaConsumer[K,V]
[error] c.assign(tps)
[error]   ^
[error] 
/Users/msa/workspace/spark-2.0/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala:95:
 too many arguments for method seek: (x$1: 
java.util.Map[org.apache.kafka.common.TopicPartition,Long])Unit
[error] consumer.seek(topicPartition, offset)
[error]  ^
[error] 
/Users/msa/workspace/spark-2.0/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala:100:
 value records is not a member of 
java.util.Map[String,org.apache.kafka.clients.consumer.ConsumerRecords[K,V]]
[error] val r = p.records(topicPartition)

Running `sbt compile` completes without error.

Has anyone else seen this behavior? Any ideas? This seems to be an issue around 
dependency management, but I'm otherwise stumped.
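
In case it helps with reproducing the configuration that does work (one Kafka
project excluded), with the sbt-unidoc plugin that exclusion can be expressed
roughly as follows; the project id below is a placeholder and this is only a
sketch, not a description of how Spark's build is actually organized:

// In the sbt build definition, with sbt-unidoc's keys in scope:
unidocProjectFilter in (ScalaUnidoc, unidoc) :=
  inAnyProject -- inProjects(streamingKafka010)  // `streamingKafka010` is a placeholder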

Cheers,

Michael



Re: Bitmap Indexing to increase OLAP query performance

2016-06-30 Thread Michael Allman
Hi Nishadi,

I have not seen bloom filters in Spark. They are mentioned as part of the Orc 
file format, but I don't know if Spark uses them: 
https://orc.apache.org/docs/spec-index.html. Parquet has block-level min/max 
values, null counts, etc for leaf columns in its metadata. I don't believe 
Spark uses those directly either, though the underlying column reader may. See 
https://github.com/apache/parquet-mr/tree/master/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata
 and 
https://github.com/apache/parquet-mr/tree/master/parquet-column/src/main/java/org/apache/parquet/column/statistics.
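
On the bitmap indexing side, for anyone following the thread, the core
structure is just a bit vector of row positions per distinct value; a toy,
non-Spark sketch (all names are hypothetical):

import scala.collection.immutable.BitSet

// Build one bitmap (set of row positions) per distinct value of a column.
def buildBitmapIndex[T](column: Seq[T]): Map[T, BitSet] =
  column.zipWithIndex
    .groupBy { case (value, _) => value }
    .map { case (value, rows) => value -> BitSet(rows.map(_._2): _*) }

val index = buildBitmapIndex(Seq("US", "DE", "US", "FR"))
// Predicates combine with cheap bitwise set operations:
val usOrDe = index("US") | index("DE")   // BitSet(0, 1, 2)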

Michael


> On Jun 29, 2016, at 11:27 PM, Nishadi Kirielle  wrote:
> 
> Thank you for the response. 
> Can I please know the reason why bit map indexes are not appropriate for big 
> data. 
> Rather than using the traditional bitmap indexing techniques we are planning 
> to implement a combination of novel bitmap indexing techniques like bit 
> sliced indexes and projection indexes. 
> Furthermore, can I please know whether bloom filters have already been 
> implemented in Spark.
> 
> Thank you
> 
> On Thu, Jun 30, 2016 at 12:51 AM, Jörn Franke  > wrote:
> 
> Is it the traditional bitmap indexing? I would not recommend it for big data. 
> You could use bloom filters and min/max indexes in-memory which look to be 
> more appropriate. However, if you want to use bitmap indexes then you would 
> have to do it as you say. However, bitmap indexes may consume a lot of 
> memory, so I am not sure that simply caching them in-memory is desired.
> 
> > On 29 Jun 2016, at 19:49, Nishadi Kirielle  > > wrote:
> >
> > Hi All,
> >
> > I am a CSE undergraduate and as for our final year project, we are 
> > expecting to construct a cluster based, bit-oriented analytic platform 
> > (storage engine) to provide fast query performance when used for OLAP with 
> > the use of novel bitmap indexing techniques when and where appropriate.
> >
> > For that we are expecting to use Spark SQL. We will need to implement a way 
> > to cache the bit map indexes and in-cooperate the use of bitmap indexing at 
> > the catalyst optimizer level when it is possible.
> >
> > I would highly appreciate your feedback regarding the proposed approach.
> >
> > Thank you & Regards
> >
> > Nishadi Kirielle
> > Department of Computer Science and Engineering
> > University of Moratuwa
> > Sri Lanka
> 



Re: Spark 2.0 Performance drop

2016-06-29 Thread Michael Allman
The patch we use in production is for 1.5. We're porting the patch to master 
(and downstream to 2.0, which is presently very similar) with the intention of 
submitting a PR "soon". We'll push it here when it's ready: 
https://github.com/VideoAmp/spark-public.

Regarding benchmarking, we have a suite of Spark SQL regression tests which we 
run to check correctness and performance. I can share our findings when I have 
them.

Cheers,

Michael

> On Jun 29, 2016, at 2:39 PM, Maciej Bryński <mac...@brynski.pl> wrote:
> 
> 2016-06-29 23:22 GMT+02:00 Michael Allman <mich...@videoamp.com>:
>> I'm sorry I don't have any concrete advice for you, but I hope this helps 
>> shed some light on the current support in Spark for projection pushdown.
>> 
>> Michael
> 
> Michael,
> Thanks for the answer. This resolves one of my questions.
> Which Spark version you have patched ? 1.6 ? Are you planning to
> public this patch or just for 2.0 branch ?
> 
> I gladly help with some benchmark in my environment.
> 
> Regards,
> -- 
> Maciek Bryński





Re: Spark 2.0 Performance drop

2016-06-29 Thread Michael Allman
Hi Maciej,

In Spark, projection pushdown is currently limited to top-level columns 
(StructFields). VideoAmp has very large parquet-based tables (many billions of 
records accumulated per day) with deeply nested schema (four or five levels), 
and we've spent a considerable amount of time optimizing query performance on 
these tables.
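
To illustrate the limitation in stock Spark (the column names follow the
earlier example in this thread; the path is a placeholder):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()
val df = spark.read.parquet("/path/to/table")
df.select("id").count()                // top-level column: only `id` is scanned
df.select("nested_column.id").count()  // nested field: the whole struct is still read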

We have a patch internally that extends Spark to support projection pushdown 
for arbitrarily nested fields. This has resulted in a *huge* performance 
improvement for many of our queries, like 10x to 100x in some cases.

I'm still putting the finishing touches on our port of this patch to Spark 
master and 2.0. We haven't done any specific benchmarking between versions, but 
I will do that when our patch is complete. We hope to contribute this 
functionality to the Spark project at some point in the near future, but it is 
not ready yet.

I'm sorry I don't have any concrete advice for you, but I hope this helps shed 
some light on the current support in Spark for projection pushdown.

Michael

> On Jun 29, 2016, at 1:48 PM, Maciej Bryński  wrote:
> 
> Hi,
> Did anyone measure performance of Spark 2.0 vs Spark 1.6 ?
> 
> I did some test on parquet file with many nested columns (about 30G in
> 400 partitions) and Spark 2.0 is sometimes 2x slower.
> 
> I tested following queries:
> 1) select count(*) where id > some_id
> In this query we have PPD and performance is similar. (about 1 sec)
> 
> 2) select count(*) where nested_column.id > some_id
> Spark 1.6 -> 1.6 min
> Spark 2.0 -> 2.1 min
> Is it normal that both version didn't do PPD ?
> 
> 3) Spark connected with python
> df.where('id > some_id').rdd.flatMap(lambda r: [r.id] if not r.id %
> 10 else []).collect()
> Spark 1.6 -> 2.3 min
> Spark 2.0 -> 4.6 min (2x slower)
> 
> I used BasicProfiler for this task and cumulative time was:
> Spark 1.6 - 4300 sec
> Spark 2.0 - 5800 sec
> 
> Should I expect such a drop in performance ?
> 
> BTW: why in Spark 2.0 Dataframe lost map and flatmap method ?
> 
> I don't know how to prepare sample data to show the problem.
> Any ideas ? Or public data with many nested columns ?
> 
> I'd like to create Jira for it but Apache server is down at the moment.
> 
> Regards,
> -- 
> Maciek Bryński
> 
> -
> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
> 





Re: Spark SQL PR looking for love...

2016-06-28 Thread Michael Allman
I should briefly mention what the PR is about... This is a patch to address a 
problem where non-empty partitioned Hive metastore tables are never returned in 
a cache lookup in HiveMetastoreCatalog.getCached.

Thanks,

Michael


> On Jun 28, 2016, at 3:27 PM, Michael Allman <mich...@videoamp.com> wrote:
> 
> Hello,
> 
> Do any Spark SQL committers/experts have bandwidth to review a PR I submitted 
> a week ago, https://github.com/apache/spark/pull/13818 
> <https://github.com/apache/spark/pull/13818>? The associated Jira ticket is 
> https://issues.apache.org/jira/browse/SPARK-15968 
> <https://issues.apache.org/jira/browse/SPARK-15968>.
> 
> Thank you!
> 
> Michael



Spark SQL PR looking for love...

2016-06-28 Thread Michael Allman
Hello,

Do any Spark SQL committers/experts have bandwidth to review a PR I submitted a 
week ago, https://github.com/apache/spark/pull/13818 
? The associated Jira ticket is 
https://issues.apache.org/jira/browse/SPARK-15968 
.

Thank you!

Michael

Re: reading/writing parquet decimal type

2014-10-23 Thread Michael Allman
Hi Matei,

Another thing occurred to me. Will the binary format you're writing sort the 
data in numeric order? Or would the decimals have to be decoded for comparison?

Cheers,

Michael


 On Oct 12, 2014, at 10:48 PM, Matei Zaharia matei.zaha...@gmail.com wrote:
 
 The fixed-length binary type can hold fewer bytes than an int64, though many 
 encodings of int64 can probably do the right thing. We can look into 
 supporting multiple ways to do this -- the spec does say that you should at 
 least be able to read int32s and int64s.
 
 Matei
 
 On Oct 12, 2014, at 8:20 PM, Michael Allman mich...@videoamp.com wrote:
 
 Hi Matei,
 
 Thanks, I can see you've been hard at work on this! I examined your patch 
 and do have a question. It appears you're limiting the precision of decimals 
 written to parquet to those that will fit in a long, yet you're writing the 
 values as a parquet binary type. Why not write them using the int64 parquet 
 type instead?
 
 Cheers,
 
 Michael
 
 On Oct 12, 2014, at 3:32 PM, Matei Zaharia matei.zaha...@gmail.com wrote:
 
 Hi Michael,
 
 I've been working on this in my repo: 
 https://github.com/mateiz/spark/tree/decimal. I'll make some pull requests 
 with these features soon, but meanwhile you can try this branch. See 
 https://github.com/mateiz/spark/compare/decimal for the individual commits 
 that went into it. It has exactly the precision stuff you need, plus some 
 optimizations for working on decimals.
 
 Matei
 
 On Oct 12, 2014, at 1:51 PM, Michael Allman mich...@videoamp.com wrote:
 
 Hello,
 
 I'm interested in reading/writing parquet SchemaRDDs that support the 
 Parquet Decimal converted type. The first thing I did was update the Spark 
 parquet dependency to version 1.5.0, as this version introduced support 
 for decimals in parquet. However, conversion between the catalyst decimal 
 type and the parquet decimal type is complicated by the fact that the 
 catalyst type does not specify a decimal precision and scale but the 
 parquet type requires them.
 
 I'm wondering if perhaps we could add an optional precision and scale to 
 the catalyst decimal type? The catalyst decimal type would have 
 unspecified precision and scale by default for backwards compatibility, 
 but users who want to serialize a SchemaRDD with decimal(s) to parquet 
 would have to narrow their decimal type(s) by specifying a precision and 
 scale.
 
 Thoughts?
 
 Michael
 -
 To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
 For additional commands, e-mail: dev-h...@spark.apache.org
 
 
 
 





Receiver/DStream storage level

2014-10-23 Thread Michael Allman
I'm implementing a custom ReceiverInputDStream and I'm not sure how to 
initialize the Receiver with the storage level. The storage level is set on the 
DStream, but there doesn't seem to be a way to pass it to the Receiver. At the 
same time, setting the storage level separately on the Receiver seems to 
introduce potential confusion as the storage level of the DStream can be set 
separately. Is this desired behavior---to have distinct DStream and Receiver 
storage levels? Perhaps I'm missing something? Also, the storageLevel property 
of the Receiver[T] class is undocumented.
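
For context, a sketch of what I mean (class names and method bodies are just
placeholders); the workaround seems to be threading a single StorageLevel
through both the DStream and its Receiver by hand:

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.receiver.Receiver

class MyReceiver(storageLevel: StorageLevel) extends Receiver[String](storageLevel) {
  def onStart(): Unit = { /* start a thread that calls store(...) */ }
  def onStop(): Unit = { /* tear down */ }
}

class MyInputDStream(ssc: StreamingContext, storageLevel: StorageLevel)
    extends ReceiverInputDStream[String](ssc) {
  def getReceiver(): Receiver[String] = new MyReceiver(storageLevel)
}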

Cheers,

Michael



reading/writing parquet decimal type

2014-10-12 Thread Michael Allman
Hello,

I'm interested in reading/writing parquet SchemaRDDs that support the Parquet 
Decimal converted type. The first thing I did was update the Spark parquet 
dependency to version 1.5.0, as this version introduced support for decimals in 
parquet. However, conversion between the catalyst decimal type and the parquet 
decimal type is complicated by the fact that the catalyst type does not specify 
a decimal precision and scale but the parquet type requires them.

I'm wondering if perhaps we could add an optional precision and scale to the 
catalyst decimal type? The catalyst decimal type would have unspecified 
precision and scale by default for backwards compatibility, but users who want 
to serialize a SchemaRDD with decimal(s) to parquet would have to narrow their 
decimal type(s) by specifying a precision and scale.
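
As a sketch of what the narrowed type could look like from the user's side
(package and type names here are illustrative of a hypothetical future API,
since today's catalyst decimal type carries no precision or scale):

import org.apache.spark.sql.types._

val schema = StructType(Seq(
  StructField("id", LongType, nullable = false),
  // precision 18, scale 2: small enough to fit in a long and map to Parquet DECIMAL
  StructField("amount", DecimalType(18, 2), nullable = true)
))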

Thoughts?

Michael



Re: reading/writing parquet decimal type

2014-10-12 Thread Michael Allman
Hi Matei,

Thanks, I can see you've been hard at work on this! I examined your patch and 
do have a question. It appears you're limiting the precision of decimals 
written to parquet to those that will fit in a long, yet you're writing the 
values as a parquet binary type. Why not write them using the int64 parquet 
type instead?

Cheers,

Michael

On Oct 12, 2014, at 3:32 PM, Matei Zaharia matei.zaha...@gmail.com wrote:

 Hi Michael,
 
 I've been working on this in my repo: 
 https://github.com/mateiz/spark/tree/decimal. I'll make some pull requests 
 with these features soon, but meanwhile you can try this branch. See 
 https://github.com/mateiz/spark/compare/decimal for the individual commits 
 that went into it. It has exactly the precision stuff you need, plus some 
 optimizations for working on decimals.
 
 Matei
 
 On Oct 12, 2014, at 1:51 PM, Michael Allman mich...@videoamp.com wrote:
 
 Hello,
 
 I'm interested in reading/writing parquet SchemaRDDs that support the 
 Parquet Decimal converted type. The first thing I did was update the Spark 
 parquet dependency to version 1.5.0, as this version introduced support for 
 decimals in parquet. However, conversion between the catalyst decimal type 
 and the parquet decimal type is complicated by the fact that the catalyst 
 type does not specify a decimal precision and scale but the parquet type 
 requires them.
 
 I'm wondering if perhaps we could add an optional precision and scale to the 
 catalyst decimal type? The catalyst decimal type would have unspecified 
 precision and scale by default for backwards compatibility, but users who 
 want to serialize a SchemaRDD with decimal(s) to parquet would have to 
 narrow their decimal type(s) by specifying a precision and scale.
 
 Thoughts?
 
 Michael
 -
 To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
 For additional commands, e-mail: dev-h...@spark.apache.org
 
 

