Re: StructuredStreaming Custom Sinks (motivated by Structured Streaming Machine Learning)

2016-10-13 Thread Shivaram Venkataraman
Thanks Fred for the detailed reply. The stability points are
especially interesting as a goal for the streaming component in Spark.
In terms of next steps, one approach that might be helpful is trying
to create benchmarks or situations that mimic real-life workloads, and
then we can work on isolating the specific changes that are required.
It'd also be great to hear other approaches / next steps to concretize
some of these goals.

Thanks
Shivaram

On Thu, Oct 13, 2016 at 8:39 AM, Fred Reiss  wrote:
> On Tue, Oct 11, 2016 at 11:02 AM, Shivaram Venkataraman
>  wrote:
>>
>> >
>> Could you expand a little bit more on stability ? Is it just bursty
>> workloads in terms of peak vs. average throughput ? Also what level of
>> latencies do you find users care about ? Is it on the order of 2-3
>> seconds vs. 1 second vs. 100s of milliseconds ?
>> >
>
>
> Regarding stability, I've seen two levels of concrete requirements.
>
> The first is "don't bring down my Spark cluster". That is to say, regardless
> of the input data rate, Spark shouldn't thrash or crash outright. Processing
> may lag behind the data arrival rate, but the cluster should stay up and
> remain fully functional.
>
> The second level is "don't bring down my application". A common use for
> streaming systems is to handle heavyweight computations that are part of a
> larger application, like a web application, a mobile app, or a plant control
> system. For example, an online application for car insurance might need to
> do some pretty involved machine learning to produce an accurate quote and
> suggest good upsells to the customer. If the heavyweight portion times out,
> the whole application times out, and you lose a customer.
>
> In terms of bursty vs. non-bursty, the "don't bring down my Spark cluster"
> case is more about handling bursts, while the "don't bring down my
> application" case is more about delivering acceptable end-to-end response
> times under typical load.
>
> Regarding latency: One group I talked to mentioned requirements in the
> 100-200 msec range, driven by the need to display a web page on a browser or
> mobile device. Another group in the Internet of Things space mentioned times
> ranging from 5 seconds to 30 seconds throughout the conversation. But most
> people I've talked to have been pretty vague about specific numbers.
>
> My impression is that these groups are not motivated by anxiety about
> meeting a particular latency target for a particular application. Rather,
> they want to make low latency the norm so that they can stop having to think
> about latency. Today, low latency is a special requirement of special
> applications. But that policy imposes a lot of hidden costs. IT architects
> have to spend time estimating the latency requirements of every application
> and lobbying for special treatment when those requirements are strict.
> Managers have to spend time engineering business processes around latency.
> Data scientists have to spend time packaging up models and negotiating how
> those models will be shipped over to the low-latency serving tier. And
> customers who are accustomed to Google and smartphones end up with an
> experience that is functional but unsatisfying.
>
> It's best to think of latency as a sliding scale. A given level of latency
> imposes a given level of cost enterprise-wide. Someone who is making a
> decision on middleware policy will balance this cost against other costs:
> How much does it cost to deploy the middleware? How much does it cost to
> train developers to use the system? The winner will be the system that
> minimizes the overall cost.
>
> Fred

-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org



Re: Regularized Logistic regression

2016-10-13 Thread Seth Hendrickson
Spark MLlib provides a cross-validation toolkit for selecting
hyperparameters. I think you'll find the documentation quite helpful:

http://spark.apache.org/docs/latest/ml-tuning.html#example-model-selection-via-cross-validation

There is actually a python example for logistic regression there. If you
still have questions after reading it, then please post back again.
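
In case it saves you a lookup, the core of that example looks roughly like
the sketch below. It is not a drop-in for your data: the data_train_df
DataFrame and its 'features'/'label' columns are assumptions based on your
snippet, and the grid values are only illustrative.

# Sketch: hyperparameter selection via cross-validation (PySpark ML).
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

lr = LogisticRegression(maxIter=500, standardization=True)
grid = (ParamGridBuilder()
        .addGrid(lr.regParam, [0.001, 0.01, 0.1, 1.0])
        .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
        .build())
cv = CrossValidator(estimator=lr,
                    estimatorParamMaps=grid,
                    evaluator=BinaryClassificationEvaluator(),  # areaUnderROC by default
                    numFolds=3)
cv_model = cv.fit(data_train_df)       # fits one model per fold and grid point
best_lr_model = cv_model.bestModel     # refit on the full data with the best params

CrossValidator keeps the grid point with the best average metric across the
folds, so you do not have to tune regParam by hand.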

Hope that helps.

On Thu, Oct 13, 2016 at 12:58 PM, aditya1702  wrote:

> Ok so I tried setting the regParam and tried lowering it. How do I evaluate
> which regParam is best? Do I have to do it by trial and error? I am
> currently calculating the log_loss for the model. Is that a good way to find
> the best regParam value? Here is my code:
>
> from math import exp,log
> #from pyspark.sql.functions import log
> epsilon = 1e-16
> def sigmoid_log_loss(w,x):
>   # sigmoid of the linear predictor
>   ans=float(1/(1+exp(-(w.dot(x.features)))))
>   # clamp away from 0/1 so log() stays finite
>   if ans==0:
>     ans=ans+epsilon
>   if ans==1:
>     ans=ans-epsilon
>   log_loss=-((x.label)*log(ans)+(1-x.label)*log(1-ans))
>   return ((ans,x.label),log_loss)
>
> ---
> reg=0.02
> from pyspark.ml.classification import LogisticRegression
> lr=LogisticRegression(regParam=reg,maxIter=500,standardization=True,
> elasticNetParam=0.5)
> model=lr.fit(data_train_df)
>
> w=model.coefficients
> intercept=model.intercept
> data_predicted_df=data_val_df.map(lambda x:(sigmoid_log_loss(w,x)))
> log_loss=data_predicted_df.map(lambda x:x[1]).mean()
> print log_loss
>
>
>
> --
> View this message in context:
> http://apache-spark-developers-list.1001551.n3.nabble.com/Regularized-Logistic-regression-tp19432p19444.html
> Sent from the Apache Spark Developers List mailing list archive at
> Nabble.com.
>
> -
> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>
>


Re: DataFrameReader Schema Supersedes Schema Provided by Encoder, Renders Fields Nullable

2016-10-13 Thread Michael Armbrust
There is a lot of confusion around nullable in StructType and we should definitely
come up with a consistent story and make sure we have better
documentation.  This might even mean deprecating this field.  At a high
level, I think the key problem is that internally, nullable is used as an
optimization, not an enforcement mechanism.  This is a lot different than
NOT NULL in a traditional database. Specifically, we take "nullable =
false" as a promise that that column will never be null and use that fact
to do optimizations like skipping null checks.  This means that if you lie
to us, you actually can get wrong answers (i.e. 0 instead of null).  This
is clearly confusing and not ideal.

A little bit of explanation for some of the specific cases you brought up:
the reason that we call asNullable on file sources is that even if that
column is never null in the data, there are cases that can still produce a
null result.  For example, when parsing JSON, we null out all columns other
than _corrupt_record when we fail to parse a line.  The fact that it's
different in streaming is a bug.
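
For anyone who wants to see the batch behavior concretely, here is a minimal
sketch (the file path and field names are just placeholders):

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, LongType, StringType

spark = SparkSession.builder.appName("nullable-demo").getOrCreate()
schema = StructType([
    StructField("id", LongType(), nullable=False),
    StructField("name", StringType(), nullable=False)])
df = spark.read.schema(schema).json("/tmp/people.json")
df.printSchema()
# Both fields print as nullable = true, because the non-streaming file-based
# path applies asNullable to the user-supplied schema.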

Would you mind opening up a JIRA ticket, and we discuss the right path
forward there?

On Thu, Oct 13, 2016 at 12:35 PM, Aleksander Eskilson <
aleksander...@gmail.com> wrote:

> Hi there,
>
> Working in the space of custom Encoders/ExpressionEncoders, I've noticed
> that the StructType schema as set when creating an object of the
> ExpressionEncoder[T] class [1] is not the schema actually used to set types
> for the columns of a Dataset, as created by using the .as(encoder) method
> [2] on read data. Instead, what occurs is that the schema is either
> inferred through analysis of the data, or a schema can be provided using
> the .schema(structType) method [3] of the DataFrameReader. However, when
> using the .schema(..) method of DataFrameReader, potentially undesirable
> behaviour occurs: while the DataSource is being resolved, all FieldTypes of
> a StructType schema have their nullability set to *true* (using the
> asNullable function of StructTypes) [4] when the data is read from a local
> file, as opposed to a streaming source.
>
> Of course, allowing null-values where they shouldn't exist can weaken the
> type-guarantees for DataSets over certain types of encoded data.
>
> Thinking on how this might be resolved, first, if it's a legitimate bug,
> I'm not sure why "non-streaming file based" datasources need to have their
> StructFields all rendered nullable. Simply removing the call to asNullable
> would fix the issue. Second, if it's actually necessary for most
> filesystem-read data-sources to have their StructFields potentially
> nullable in this manner, we could instead let the StructType schema
> provided to the Encoder have the final say in the DataSet's schema.
>
> This latter option seems sensible to me: if a client is willing to provide
> a custom Encoder via the .as(..) method on the reader, presumably in
> setting the schema field of the encoder they have some legitimate notion of
> how their object's types should be mapped to DataSet column types. Any
> failure when resolving their data to a DataSet by means of their Encoder
> can then be traced to their Encoder for their own debugging.
>
> Thoughts? Thanks,
> Alek Eskilson
>
> [1] - https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala#L213
> [2] - https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L374
> [3] - https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala#L62
> [4] - https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala#L426
>


RE: Regularized Logistic regression

2016-10-13 Thread aditya1702
Ok so I tried setting the regParam and tried lowering it. How do I evaluate
which regParam is best? Do I have to do it by trial and error? I am
currently calculating the log_loss for the model. Is that a good way to find
the best regParam value? Here is my code:

from math import exp,log
#from pyspark.sql.functions import log
epsilon = 1e-16
def sigmoid_log_loss(w,x):
  # sigmoid of the linear predictor
  ans=float(1/(1+exp(-(w.dot(x.features)))))
  # clamp away from 0/1 so log() stays finite
  if ans==0:
    ans=ans+epsilon
  if ans==1:
    ans=ans-epsilon
  log_loss=-((x.label)*log(ans)+(1-x.label)*log(1-ans))
  return ((ans,x.label),log_loss)

---
reg=0.02
from pyspark.ml.classification import LogisticRegression
lr=LogisticRegression(regParam=reg,maxIter=500,standardization=True,elasticNetParam=0.5)
model=lr.fit(data_train_df)

w=model.coefficients
intercept=model.intercept
data_predicted_df=data_val_df.map(lambda x:(sigmoid_log_loss(w,x)))
log_loss=data_predicted_df.map(lambda x:x[1]).mean()
print log_loss
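
One way to turn this into a regParam search is a simple sweep over candidate
values, keeping the one with the lowest validation log loss. A rough sketch
reusing the function above (the candidate values are arbitrary, and
data_train_df / data_val_df are as defined earlier):

from pyspark.ml.classification import LogisticRegression

best_reg, best_loss = None, float("inf")
for reg in [0.001, 0.01, 0.02, 0.1, 0.5]:
  lr = LogisticRegression(regParam=reg, maxIter=500,
                          standardization=True, elasticNetParam=0.5)
  m = lr.fit(data_train_df)
  w = m.coefficients
  # mean validation log loss for this regParam
  loss = data_val_df.map(lambda x: sigmoid_log_loss(w, x)[1]).mean()
  if loss < best_loss:
    best_reg, best_loss = reg, loss
print best_reg, best_loss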



--
View this message in context: 
http://apache-spark-developers-list.1001551.n3.nabble.com/Regularized-Logistic-regression-tp19432p19444.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org



RE: Regularized Logistic regression

2016-10-13 Thread aditya1702
Thank you Anurag Verma for replying. I tried increasing the iterations.
However I still get underfitted results. I am checking the model's
prediction by seeing how many pairs of labels and predictions it gets right

from pyspark.sql.functions import col

data_predict_with_model=best_model.transform(data_test_df)
final_pred_df=data_predict_with_model.select(col('label'),col('prediction'))
# count how many rows fall into each (label, prediction) pair
ans=final_pred_df.map(lambda x:((x[0],x[1]),1)).reduceByKey(lambda a,b:a+b).toDF()
ans.show()

+---------+---+
|       _1| _2|
+---------+---+
|[1.0,1.0]|  5|
|[0.0,1.0]| 12|
+---------+---+
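
For comparison, the built-in evaluators can score the same predictions
directly; a rough sketch (assumes the data_predict_with_model DataFrame from
the transform above, and metric names as listed in the ML docs for your Spark
version):

from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

auc = BinaryClassificationEvaluator().evaluate(data_predict_with_model)  # areaUnderROC
f1 = MulticlassClassificationEvaluator(metricName="f1").evaluate(data_predict_with_model)
print auc, f1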

Do you know any other methods by which I can check the model? And what is it
that I am doing wrong? I have filtered the data and arranged it into
features and label columns, so now I guess only the model creation part is
wrong. Can anyone help me please? I am still learning machine learning.



--
View this message in context: 
http://apache-spark-developers-list.1001551.n3.nabble.com/Regularized-Logistic-regression-tp19432p19443.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org



DataFrameReader Schema Supersedes Schema Provided by Encoder, Renders Fields Nullable

2016-10-13 Thread Aleksander Eskilson
Hi there,

Working in the space of custom Encoders/ExpressionEncoders, I've noticed
that the StructType schema as set when creating an object of the
ExpressionEncoder[T] class [1] is not the schema actually used to set types
for the columns of a Dataset, as created by using the .as(encoder) method
[2] on read data. Instead, what occurs is that the schema is either
inferred through analysis of the data, or a schema can be provided using
the .schema(structType) method [3] of the DataFrameReader. However, when
using the .schema(..) method of DataFrameReader, potentially undesirable
behaviour occurs: while the DataSource is being resolved, all FieldTypes of
a StructType schema have their nullability set to *true* (using the
asNullable function of StructTypes) [4] when the data is read from a local
file, as opposed to a streaming source.

Of course, allowing null-values where they shouldn't exist can weaken the
type-guarantees for DataSets over certain types of encoded data.

Thinking on how this might be resolved, first, if it's a legitimate bug,
I'm not sure why "non-streaming file based" datasources need to have their
StructFields all rendered nullable. Simply removing the call to asNullable
would fix the issue. Second, if it's actually necessary for most
filesystem-read data-sources to have their StructFields potentially
nullable in this manner, we could instead let the StructType schema
provided to the Encoder have the final say in the DataSet's schema.

This latter option seems sensible to me: if a client is willing to provide
a custom Encoder via the .as(..) method on the reader, presumably in
setting the schema field of the encoder they have some legitimate notion of
how their object's types should be mapped to DataSet column types. Any
failure when resolving their data to a DataSet by means of their Encoder
can then be traced to their Encoder for their own debugging.

Thoughts? Thanks,
Alek Eskilson

[1] -
https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala#L213
[2] -
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L374
[3] -
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala#L62
[4] -
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala#L426


Re: DAGScheduler.handleJobCancellation uses jobIdToStageIds for verification while jobIdToActiveJob for lookup?

2016-10-13 Thread Mark Hamstra
There were at least a couple of ideas behind the original thinking on using
both of those Maps: 1) We needed the ability to efficiently get from jobId
to both ActiveJob objects and to sets of associated Stages, and using both
Maps here was an opportunity to do a little sanity checking to make sure
that the Maps looked at least minimally consistent for the Job at issue; 2)
Similarly, it could serve as a kind of hierarchical check -- first, for the
Job which we are being asked to cancel, that we ever knew enough to even
register its existence; second, that for a JobId that passes the first
test, we still have an ActiveJob that can be canceled.

Now, without doing a bunch of digging into the code archives, I can't tell
you for sure whether those ideas were ever implemented completely correctly
or whether they still make valid sense in the current code, but from
looking at the lines that you highlighted, I can tell you that even if the
ideas still make sense and are worth carrying forward, the current code
doesn't implement them correctly.  In particular, if it is possible for the
`jobId` to not be in `jobIdToActiveJob`, we're going to produce a
`NoSuchElementException` for the missing key instead of handling it or even
reporting it in a more useful way.

On Thu, Oct 13, 2016 at 8:11 AM, Jacek Laskowski  wrote:

> Thanks Imran! Not only did the response come so promptly, but also
> it's something I could work on (and have another Spark contributor
> badge unlocked)! Thanks.
>
> Pozdrawiam,
> Jacek Laskowski
> 
> https://medium.com/@jaceklaskowski/
> Mastering Apache Spark 2.0 https://bit.ly/mastering-apache-spark
> Follow me at https://twitter.com/jaceklaskowski
>
>
> On Thu, Oct 13, 2016 at 5:02 PM, Imran Rashid 
> wrote:
> > Hi Jacek,
> >
> > doesn't look like there is any good reason -- Mark Hamstra might know
> this
> > best.  Feel free to open a jira & pr for it, you can ping Mark, Kay
> > Ousterhout, and me (@squito) for review.
> >
> > Imran
> >
> > On Thu, Oct 13, 2016 at 7:56 AM, Jacek Laskowski 
> wrote:
> >>
> >> Hi,
> >>
> >> Is there a reason why DAGScheduler.handleJobCancellation checks the
> >> active job id in jobIdToStageIds [1] while looking the job up in
> >> jobIdToActiveJob [2]? Perhaps synchronized earlier yet still
> >> inconsistent.
> >>
> >> [1]
> >> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1372
> >> [2]
> >> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1376
> >>
> >> Pozdrawiam,
> >> Jacek Laskowski
> >> 
> >> https://medium.com/@jaceklaskowski/
> >> Mastering Apache Spark 2.0 https://bit.ly/mastering-apache-spark
> >> Follow me at https://twitter.com/jaceklaskowski
> >>
> >> -
> >> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
> >>
> >
>


Re: Python Spark Improvements (forked from Spark Improvement Proposals)

2016-10-13 Thread Holden Karau
Awesome, good points everyone. The ranking of the issues is super useful
and I'd also completely forgotten about the lack of built in UDAF support
which is rather important. There is a PR to make it easier to call/register
JVM UDFs from Python which will hopefully help a bit there too. I'm getting
on a flight to London for OSCON but I want to continue to encourage users to
chime in with their experiences (to that end I'm trying to re-include user@
since it doesn't seem to have been posted there despite my initial attempt
to do so.)

On Thursday, October 13, 2016, assaf.mendelson 
wrote:

> Hi,
>
> We are actually using pyspark heavily.
>
> I agree with all of your points,  for me I see the following as the main
> hurdles:
>
> 1.   Pyspark does not have support for UDAF. We have had multiple
> needs for UDAF and needed to go to java/scala to support these. Having
> python UDAF would have made life much easier (especially at earlier stages
> when we prototype).
>
> 2.   Performance. I cannot stress this enough. Currently we have
> engineers who take python UDFs and convert them to scala UDFs for
> performance. We are currently even looking at writing UDFs and UDAFs in a
> more native way (e.g. using expressions) to improve performance but working
> with pyspark can be really problematic.
>
>
>
> BTW, other than using jython or arrow, I believe there are a couple of
> other ways to improve performance:
>
> 1.   Python provides tools to generate an AST for python code (
> https://docs.python.org/2/library/ast.html). This means we can use the
> AST to construct scala code very similar to how expressions are built for
> native spark functions in scala. Of course doing full conversion is very
> hard but at least handling simple cases should be simple.
>
> 2.   The above would of course be limited if we use python packages
> but over time it is possible to add some “translation” tools (i.e. take
> python packages and find the appropriate scala equivalent. We can even
> provide this to the user to supply their own conversions thereby looking like
> regular python code but being converted to scala code behind the scenes).
>
> 3.   In scala, it is possible to use codegen to actually generate
> code from a string. There is no reason why we can’t write the expression in
> python and provide a scala string. This would mean learning some scala but
> would mean we do not have to create a separate code tree.
>
>
>
> BTW, the fact that all of the tools to access java are marked as private
> has me a little worried. Nearly all of our UDFs (and all of our UDAFs) are
> written in scala for performance. The wrapping to provide them in python
> uses way too many private elements for my taste.
>
>
>
>
>
> *From:* msukmanowsky [via Apache Spark Developers List] [mailto:ml-node+
> [hidden email]
> ]
> *Sent:* Thursday, October 13, 2016 3:51 AM
> *To:* Mendelson, Assaf
> *Subject:* Re: Python Spark Improvements (forked from Spark Improvement
> Proposals)
>
>
>
> As very heavy Spark users at Parse.ly, I just wanted to give a +1 to all
> of the issues raised by Holden and Ricardo. I'm also giving a talk at PyCon
> Canada on PySpark https://2016.pycon.ca/en/schedule/096-mike-sukmanowsky/.
>
>
> Being a Python shop, we were extremely pleased to learn about PySpark a
> few years ago as our main ETL pipeline used Apache Pig at the time. I was
> one of the only folks who understood Pig and Java so collaborating on this
> as a team was difficult.
>
> Spark provided a means for the entire team to collaborate, but we've hit
> our fair share of issues all of which are enumerated in this thread.
>
> Besides giving a +1 here, I think if I were to force rank these items for
> us, it'd be:
>
> 1. Configuration difficulties: we've lost literally weeks to
> troubleshooting memory issues for larger jobs. It took a long time to even
> understand *why* certain jobs were failing since Spark would just report
> executors being lost. Finally we tracked things down to understanding that
> spark.yarn.executor.memoryOverhead controls the portion of memory
> reserved for Python processes, but none of this is documented anywhere as
> far as I can tell. We discovered this via trial and error. Both
> documentation and better defaults for this setting when running a PySpark
> application are probably sufficient. We've also had a number of troubles
> with saving Parquet output as part of an ETL flow, but perhaps we'll save
> that for a blog post of its own.
>
> 2. Dependency management: I've tried to help move the conversation on
> https://issues.apache.org/jira/browse/SPARK-13587 but it seems we're a
> bit stalled. Installing the required dependencies for a PySpark application
> is a really messy ordeal right now.
>
> 3. Development workflow: I'd combine both "incomprehensible error
> messages" and "
> difficulty using PySpark from 

Re: StructuredStreaming Custom Sinks (motivated by Structured Streaming Machine Learning)

2016-10-13 Thread Holden Karau
This is a thing I often have people ask me about, and then I do my best to
dissuade them from using Spark in the "hot path", and it's normally
something which most people eventually accept. Fred might have more
information for people for whom this is a hard requirement though.

On Thursday, October 13, 2016, Cody Koeninger  wrote:

> I've always been confused as to why it would ever be a good idea to
> put any streaming query system on the critical path for synchronous  <
> 100msec requests.  It seems to make a lot more sense to have a
> streaming system do asynch updates of a store that has better latency
> and quality of service characteristics for multiple users.  Then your
> only latency concerns are event to update, not request to response.
>
> On Thu, Oct 13, 2016 at 10:39 AM, Fred Reiss  > wrote:
> > On Tue, Oct 11, 2016 at 11:02 AM, Shivaram Venkataraman
> > > wrote:
> >>
> >> >
> >> Could you expand a little bit more on stability ? Is it just bursty
> >> workloads in terms of peak vs. average throughput ? Also what level of
> >> latencies do you find users care about ? Is it on the order of 2-3
> >> seconds vs. 1 second vs. 100s of milliseconds ?
> >> >
> >
> >
> > Regarding stability, I've seen two levels of concrete requirements.
> >
> > The first is "don't bring down my Spark cluster". That is to say,
> regardless
> > of the input data rate, Spark shouldn't thrash or crash outright.
> Processing
> > may lag behind the data arrival rate, but the cluster should stay up and
> > remain fully functional.
> >
> > The second level is "don't bring down my application". A common use for
> > streaming systems is to handle heavyweight computations that are part of
> a
> > larger application, like a web application, a mobile app, or a plant
> control
> > system. For example, an online application for car insurance might need
> to
> > do some pretty involved machine learning to produce an accurate quote and
> > suggest good upsells to the customer. If the heavyweight portion times
> out,
> > the whole application times out, and you lose a customer.
> >
> > In terms of bursty vs. non-bursty, the "don't bring down my Spark cluster"
> > case is more about handling bursts, while the "don't bring down my
> > application" case is more about delivering acceptable end-to-end response
> > times under typical load.
> >
> > Regarding latency: One group I talked to mentioned requirements in the
> > 100-200 msec range, driven by the need to display a web page on a
> browser or
> > mobile device. Another group in the Internet of Things space mentioned
> times
> > ranging from 5 seconds to 30 seconds throughout the conversation. But
> most
> > people I've talked to have been pretty vague about specific numbers.
> >
> > My impression is that these groups are not motivated by anxiety about
> > meeting a particular latency target for a particular application. Rather,
> > they want to make low latency the norm so that they can stop having to
> think
> > about latency. Today, low latency is a special requirement of special
> > applications. But that policy imposes a lot of hidden costs. IT
> architects
> > have to spend time estimating the latency requirements of every
> application
> > and lobbying for special treatment when those requirements are strict.
> > Managers have to spend time engineering business processes around
> latency.
> > Data scientists have to spend time packaging up models and negotiating
> how
> > those models will be shipped over to the low-latency serving tier. And
> > customers who are accustomed to Google and smartphones end up with an
> > experience that is functional but unsatisfying.
> >
> > It's best to think of latency as a sliding scale. A given level of
> latency
> > imposes a given level of cost enterprise-wide. Someone who is making a
> > decision on middleware policy will balance this cost against other costs:
> > How much does it cost to deploy the middleware? How much does it cost to
> > train developers to use the system? The winner will be the system that
> > minimizes the overall cost.
> >
> > Fred
>
> -
> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org 
>
>

-- 
Cell : 425-233-8271
Twitter: https://twitter.com/holdenkarau


Re: StructuredStreaming Custom Sinks (motivated by Structured Streaming Machine Learning)

2016-10-13 Thread Cody Koeninger
I've always been confused as to why it would ever be a good idea to
put any streaming query system on the critical path for synchronous  <
100msec requests.  It seems to make a lot more sense to have a
streaming system do asynch updates of a store that has better latency
and quality of service characteristics for multiple users.  Then your
only latency concerns are event to update, not request to response.

On Thu, Oct 13, 2016 at 10:39 AM, Fred Reiss  wrote:
> On Tue, Oct 11, 2016 at 11:02 AM, Shivaram Venkataraman
>  wrote:
>>
>> >
>> Could you expand a little bit more on stability ? Is it just bursty
>> workloads in terms of peak vs. average throughput ? Also what level of
>> latencies do you find users care about ? Is it on the order of 2-3
>> seconds vs. 1 second vs. 100s of milliseconds ?
>> >
>
>
> Regarding stability, I've seen two levels of concrete requirements.
>
> The first is "don't bring down my Spark cluster". That is to say, regardless
> of the input data rate, Spark shouldn't thrash or crash outright. Processing
> may lag behind the data arrival rate, but the cluster should stay up and
> remain fully functional.
>
> The second level is "don't bring down my application". A common use for
> streaming systems is to handle heavyweight computations that are part of a
> larger application, like a web application, a mobile app, or a plant control
> system. For example, an online application for car insurance might need to
> do some pretty involved machine learning to produce an accurate quote and
> suggest good upsells to the customer. If the heavyweight portion times out,
> the whole application times out, and you lose a customer.
>
> In terms of bursty vs. non-bursty, the "don't bring down my Spark cluster"
> case is more about handling bursts, while the "don't bring down my
> application" case is more about delivering acceptable end-to-end response
> times under typical load.
>
> Regarding latency: One group I talked to mentioned requirements in the
> 100-200 msec range, driven by the need to display a web page on a browser or
> mobile device. Another group in the Internet of Things space mentioned times
> ranging from 5 seconds to 30 seconds throughout the conversation. But most
> people I've talked to have been pretty vague about specific numbers.
>
> My impression is that these groups are not motivated by anxiety about
> meeting a particular latency target for a particular application. Rather,
> they want to make low latency the norm so that they can stop having to think
> about latency. Today, low latency is a special requirement of special
> applications. But that policy imposes a lot of hidden costs. IT architects
> have to spend time estimating the latency requirements of every application
> and lobbying for special treatment when those requirements are strict.
> Managers have to spend time engineering business processes around latency.
> Data scientists have to spend time packaging up models and negotiating how
> those models will be shipped over to the low-latency serving tier. And
> customers who are accustomed to Google and smartphones end up with an
> experience that is functional but unsatisfying.
>
> It's best to think of latency as a sliding scale. A given level of latency
> imposes a given level of cost enterprise-wide. Someone who is making a
> decision on middleware policy will balance this cost against other costs:
> How much does it cost to deploy the middleware? How much does it cost to
> train developers to use the system? The winner will be the system that
> minimizes the overall cost.
>
> Fred

-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org



Re: StructuredStreaming Custom Sinks (motivated by Structured Streaming Machine Learning)

2016-10-13 Thread Fred Reiss
On Tue, Oct 11, 2016 at 11:02 AM, Shivaram Venkataraman <
shiva...@eecs.berkeley.edu> wrote:
>
> >
> Could you expand a little bit more on stability ? Is it just bursty
> workloads in terms of peak vs. average throughput ? Also what level of
> latencies do you find users care about ? Is it on the order of 2-3
> seconds vs. 1 second vs. 100s of milliseconds ?
> >
>

Regarding stability, I've seen two levels of concrete requirements.

The first is "don't bring down my Spark cluster". That is to say,
regardless of the input data rate, Spark shouldn't thrash or crash
outright. Processing may lag behind the data arrival rate, but the cluster
should stay up and remain fully functional.

The second level is "don't bring down my application". A common use for
streaming systems is to handle heavyweight computations that are part of a
larger application, like a web application, a mobile app, or a plant
control system. For example, an online application for car insurance might
need to do some pretty involved machine learning to produce an accurate
quote and suggest good upsells to the customer. If the heavyweight portion
times out, the whole application times out, and you lose a customer.

In terms of bursty vs. non-bursty, the "don't bring down my Spark cluster"
case is more about handling bursts, while the "don't bring down my
application" case is more about delivering acceptable end-to-end response
times under typical load.

Regarding latency: One group I talked to mentioned requirements in the
100-200 msec range, driven by the need to display a web page on a browser
or mobile device. Another group in the Internet of Things space mentioned
times ranging from 5 seconds to 30 seconds throughout the conversation. But
most people I've talked to have been pretty vague about specific numbers.

My impression is that these groups are not motivated by anxiety about
meeting a particular latency target for a particular application. Rather,
they want to make low latency the norm so that they can stop having to
think about latency. Today, low latency is a special requirement of special
applications. But that policy imposes a lot of hidden costs. IT architects
have to spend time estimating the latency requirements of every application
and lobbying for special treatment when those requirements are strict.
Managers have to spend time engineering business processes around latency.
Data scientists have to spend time packaging up models and negotiating how
those models will be shipped over to the low-latency serving tier. And
customers who are accustomed to Google and smartphones end up with an
experience that is functional but unsatisfying.

It's best to think of latency as a sliding scale. A given level of latency
imposes a given level of cost enterprise-wide. Someone who is making a
decision on middleware policy will balance this cost against other costs:
How much does it cost to deploy the middleware? How much does it cost to
train developers to use the system? The winner will be the system that
minimizes the overall cost.

Fred


Re: DAGScheduler.handleJobCancellation uses jobIdToStageIds for verification while jobIdToActiveJob for lookup?

2016-10-13 Thread Imran Rashid
Hi Jacek,

doesn't look like there is any good reason -- Mark Hamstra might know this
best.  Feel free to open a jira & pr for it, you can ping Mark, Kay
Ousterhout, and me (@squito) for review.

Imran

On Thu, Oct 13, 2016 at 7:56 AM, Jacek Laskowski  wrote:

> Hi,
>
> Is there a reason why DAGScheduler.handleJobCancellation checks the
> active job id in jobIdToStageIds [1] while looking the job up in
> jobIdToActiveJob [2]? Perhaps synchronized earlier yet still
> inconsistent.
>
> [1] https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1372
> [2] https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1376
>
> Pozdrawiam,
> Jacek Laskowski
> 
> https://medium.com/@jaceklaskowski/
> Mastering Apache Spark 2.0 https://bit.ly/mastering-apache-spark
> Follow me at https://twitter.com/jaceklaskowski
>
> -
> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>
>


DAGScheduler.handleJobCancellation uses jobIdToStageIds for verification while jobIdToActiveJob for lookup?

2016-10-13 Thread Jacek Laskowski
Hi,

Is there a reason why DAGScheduler.handleJobCancellation checks the
active job id in jobIdToStageIds [1] while looking the job up in
jobIdToActiveJob [2]? Perhaps synchronized earlier yet still
inconsistent.

[1] 
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1372
[2] 
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1376

Pozdrawiam,
Jacek Laskowski

https://medium.com/@jaceklaskowski/
Mastering Apache Spark 2.0 https://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski

-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org



RE: Regularized Logistic regression

2016-10-13 Thread Anurag Verma
Probably your regularization parameter is set too high. Try regParam=0.1 or
0.2. Also you should probably increase the number of iterations to something
like 500. Additionally you can specify elasticNetParam (between 0 and 1).
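
Concretely, something along these lines (the values here are only starting
points to tune from, and data_train_df / data_test_df are your DataFrames):

from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(regParam=0.1, elasticNetParam=0.5,
                        maxIter=500, standardization=True)
model = lr.fit(data_train_df)
data_predict_with_model = model.transform(data_test_df)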

-Original Message-
From: aditya1702 [mailto:adityavya...@gmail.com] 
Sent: Thursday, October 13, 2016 3:45 PM
To: dev@spark.apache.org
Subject: Regularized Logistic regression

Hello, I am trying to solve a problem using regularized logistic regression
in spark. I am using the model created by LogisticRegression():

lr=LogisticRegression(regParam=10.0,maxIter=10,standardization=True)
model=lr.fit(data_train_df)
data_predict_with_model=model.transform(data_test_df)

However I am not able to get proper results. Can anyone tell me whether we
have to pass any other parameters in the model?



--
View this message in context:
http://apache-spark-developers-list.1001551.n3.nabble.com/Regularized-Logistic-regression-tp19432.html
Sent from the Apache Spark Developers List mailing list archive at
Nabble.com.

-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org


-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org



Regularized Logistic regression

2016-10-13 Thread aditya1702
Hello, I am trying to solve a problem using regularized logistic regression
in spark. I am using the model created by LogisticRegression():

lr=LogisticRegression(regParam=10.0,maxIter=10,standardization=True)
model=lr.fit(data_train_df)
data_predict_with_model=model.transform(data_test_df)

However I am not able to get proper results. Can anyone tell me whether we
have to pass any other parameters in the model?



--
View this message in context: 
http://apache-spark-developers-list.1001551.n3.nabble.com/Regularized-Logistic-regression-tp19432.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org



RE: Python Spark Improvements (forked from Spark Improvement Proposals)

2016-10-13 Thread assaf.mendelson
Hi,
We are actually using pyspark heavily.
I agree with all of your points,  for me I see the following as the main 
hurdles:

1.   Pyspark does not have support for UDAF. We have had multiple needs for 
UDAF and needed to go to java/scala to support these. Having python UDAF would 
have made life much easier (especially at earlier stages when we prototype).

2.   Performance. I cannot stress this enough. Currently we have engineers 
who take python UDFs and convert them to scala UDFs for performance. We are 
currently even looking at writing UDFs and UDAFs in a more native way (e.g. 
using expressions) to improve performance but working with pyspark can be 
really problematic.

BTW, other than using jython or arrow, I believe there are a couple of other 
ways to improve performance:

1.   Python provides tools to generate an AST for python code 
(https://docs.python.org/2/library/ast.html). This means we can use the AST to 
construct scala code very similar to how expressions are built for native spark 
functions in scala. Of course doing a full conversion is very hard but at least 
handling simple cases should be simple (see the toy sketch after this list).

2.   The above would of course be limited if we use python packages but 
over time it is possible to add some "translation" tools (i.e. take python 
packages and find the appropriate scala equivalent. We can even provide this to 
the user to supply their own conversions thereby looking like regular python 
code but being converted to scala code behind the scenes).

3.   In scala, it is possible to use codegen to actually generate code from 
a string. There is no reason why we can't write the expression in python and 
provide a scala string. This would mean learning some scala but would mean we 
do not have to create a separate code tree.
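
As a toy illustration of point 1 (nothing Spark-specific, just the standard
ast module), this is the kind of tree a translator for the simple cases would
walk; the column names and the tiny operator set are of course made up:

import ast

tree = ast.parse("col_a * 2 + col_b", mode="eval")
print ast.dump(tree)

class ExprPrinter(ast.NodeVisitor):
  # Render a tiny arithmetic subset as an infix string; the same kind of walk
  # could emit a scala / Spark SQL expression string instead.
  def visit_BinOp(self, node):
    op = {ast.Add: "+", ast.Mult: "*"}[type(node.op)]
    return "(%s %s %s)" % (self.visit(node.left), op, self.visit(node.right))
  def visit_Name(self, node):
    return node.id
  def visit_Num(self, node):
    return str(node.n)

print ExprPrinter().visit(tree.body)   # prints ((col_a * 2) + col_b)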

BTW, the fact that all of the tools to access java are marked as private has me 
a little worried. Nearly all of our UDFs (and all of our UDAFs) are written in 
scala for performance. The wrapping to provide them in python uses way too many 
private elements for my taste.


From: msukmanowsky [via Apache Spark Developers List] 
[mailto:ml-node+s1001551n19426...@n3.nabble.com]
Sent: Thursday, October 13, 2016 3:51 AM
To: Mendelson, Assaf
Subject: Re: Python Spark Improvements (forked from Spark Improvement Proposals)

As very heavy Spark users at Parse.ly, I just wanted to give a +1 to all of the 
issues raised by Holden and Ricardo. I'm also giving a talk at PyCon Canada on 
PySpark https://2016.pycon.ca/en/schedule/096-mike-sukmanowsky/.

Being a Python shop, we were extremely pleased to learn about PySpark a few 
years ago as our main ETL pipeline used Apache Pig at the time. I was one of 
the only folks who understood Pig and Java so collaborating on this as a team 
was difficult.

Spark provided a means for the entire team to collaborate, but we've hit our 
fair share of issues all of which are enumerated in this thread.

Besides giving a +1 here, I think if I were to force rank these items for us, 
it'd be:

1. Configuration difficulties: we've lost literally weeks to troubleshooting 
memory issues for larger jobs. It took a long time to even understand *why* 
certain jobs were failing since Spark would just report executors being lost. 
Finally we tracked things down to understanding that 
spark.yarn.executor.memoryOverhead controls the portion of memory reserved for 
Python processes, but none of this is documented anywhere as far as I can tell. 
We discovered this via trial and error. Both documentation and better defaults 
for this setting when running a PySpark application are probably sufficient. 
We've also had a number of troubles with saving Parquet output as part of an 
ETL flow, but perhaps we'll save that for a blog post of its own.

2. Dependency management: I've tried to help move the conversation on 
https://issues.apache.org/jira/browse/SPARK-13587 but it seems we're a bit 
stalled. Installing the required dependencies for a PySpark application is a 
really messy ordeal right now.

3. Development workflow: I'd combine both "incomprehensible error messages" and 
"
difficulty using PySpark from outside of spark-submit / pyspark shell" here. 
When teaching PySpark to new users, I'm reminded of how much inside knowledge 
is needed to overcome esoteric errors. As one example is hitting 
"PicklingError: Could not pickle object as excessively deep recursion 
required." errors. New users often do something innocent like try to pickle a 
global logging object and hit this and begin the Google -> stackoverflow search 
to try to comprehend what's going on. You can lose days to errors like these 
and they completely kill the productivity flow and send you hunting for 
alternatives.

4. Speed/performance: we are trying to use DataFrame/DataSets where we can and 
do as much in Java as possible but when we do move to Python, we're well aware 
that we're about to take a hit on performance. We're very keen to see what 
Apache 

RE: Official Stance on Not Using Spark Submit

2016-10-13 Thread assaf.mendelson
I actually do not use spark-submit for several use cases, all of them currently 
revolving around running Spark directly with python.
One of the most important ones is developing in PyCharm.
Basically I am using PyCharm configured with a remote interpreter 
which runs on the server, while PyCharm itself runs on my local windows machine.
In order for me to be able to effectively debug (stepping etc.), I want to 
define a run configuration in pycharm which would integrate fully with its 
debug tools. Unfortunately I couldn’t figure out a way to use spark-submit 
effectively. Instead I chose the following solution:
I defined the project to use the remote interpreter running on the driver in 
the cluster.
I defined environment variables in the run configuration including setting 
PYTHONPATH to include pyspark and py4j manually, set up the relevant 
PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON, set up PYSPARK_SUBMIT_ARGS to include 
relevant configurations (e.g. relevant jars) and made sure it ended with 
pyspark-shell.

With this setup I could debug Spark remotely as if it were 
local.
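
For reference, the run configuration boils down to something like the
following when written out as plain python (all paths, the py4j version and
the submit args below are placeholders for whatever your cluster uses):

import os, sys

os.environ["PYSPARK_PYTHON"] = "/usr/bin/python"
os.environ["PYSPARK_DRIVER_PYTHON"] = "/usr/bin/python"
os.environ["PYSPARK_SUBMIT_ARGS"] = "--master yarn --jars /path/to/our-udfs.jar pyspark-shell"
sys.path.append("/opt/spark/python")
sys.path.append("/opt/spark/python/lib/py4j-0.10.1-src.zip")  # match your Spark's py4j

from pyspark import SparkContext
sc = SparkContext(appName="pycharm-remote-debug")
print sc.parallelize(range(10)).sum()

With that in place the script runs (and can be stepped through) from the IDE
like any other python program.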

Similar use cases include using standard tools that know how to run a “python” 
script but are not aware of spark-submit.

I haven’t found similar reasons for scala/java code though (although I wish 
there was a similar “remote” setup for scala).
Assaf.


From: RussS [via Apache Spark Developers List] 
[mailto:ml-node+s1001551n19384...@n3.nabble.com]
Sent: Monday, October 10, 2016 9:14 PM
To: Mendelson, Assaf
Subject: Re: Official Stance on Not Using Spark Submit

Just folks who don't want to use spark-submit, no real use-cases I've seen yet.

I didn't know about SparkLauncher myself and I don't think there are any 
official docs on that or launching spark as an embedded library for tests.

On Mon, Oct 10, 2016 at 11:09 AM Matei Zaharia <[hidden 
email]> wrote:
What are the main use cases you've seen for this? Maybe we can add a page to 
the docs about how to launch Spark as an embedded library.

Matei

On Oct 10, 2016, at 10:21 AM, Russell Spitzer <[hidden 
email]> wrote:

I actually had not seen SparkLauncher before, that looks pretty great :)

On Mon, Oct 10, 2016 at 10:17 AM Russell Spitzer <[hidden 
email]> wrote:
I'm definitely only talking about non-embedded uses here as I also use embedded 
Spark (Cassandra and Kafka) to run tests. This is almost always safe since 
everything is in the same JVM. It's only once we get to launching against a 
real distributed env do we end up with issues.

Since PySpark uses spark-submit in the java gateway I'm not sure if that 
matters :)

The cases I see are usually going through main directly, adding jars 
programmatically.

This usually ends up with classpath errors (Spark not on the CP, their jar not on 
the CP, dependencies not on the CP),
conf errors (executors have the incorrect environment, executor classpath 
broken, not understanding that spark-defaults won't do anything),
jar version mismatches,
etc ...

On Mon, Oct 10, 2016 at 10:05 AM Sean Owen <[hidden 
email]> wrote:
I have also 'embedded' a Spark driver without much trouble. It isn't that it 
can't work.

The Launcher API is probably the recommended way to do that though. 
spark-submit is the way to go for non-programmatic access.

If you're not doing one of those things and it is not working, yeah I think 
people would tell you you're on your own. I think that's consistent with all 
the JIRA discussions I have seen over time.

On Mon, Oct 10, 2016, 17:33 Russell Spitzer <[hidden 
email]> wrote:
I've seen a variety of users attempting to work around using Spark Submit with 
at best middling levels of success. I think it would be helpful if the project 
had a clear statement that submitting an application without using Spark Submit 
is truly for experts only or is unsupported entirely.

I know this is a pretty strong stance and other people have had different 
experiences than me so please let me know what you think :)



If you reply to this email, your message will be added to the discussion below:
http://apache-spark-developers-list.1001551.n3.nabble.com/Official-Stance-on-Not-Using-Spark-Submit-tp19376p19384.html




--
View this message in