Re: Correlated subqueries in the DataFrame API

2018-04-27 Thread Nicholas Chammas
What about exposing transforms that make it easy to coerce data to what the
method needs? Instead of passing a dataframe, you’d pass df.toSet to isin

Assuming toSet returns a local list, wouldn’t that have the problem of not
being able to handle extremely large lists? In contrast, I believe SQL’s IN
is implemented in such a way that the inner query being referenced by IN
does not need to be collected locally. Did I understand your suggestion
correctly?
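
For reference, a minimal sketch (in Scala, with illustrative names) of what is
possible with isin today: the inner result has to be collected to the driver
first, which is exactly the scaling concern above.

// collect the inner query to the driver, then pass the values to isin as varargs
val names: Array[Any] = table2.select("name").distinct().collect().map(_.get(0))
val filtered = table1.where(table1("name").isin(names: _*))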

I think having .isin() accept a Column potentially makes more sense, since
that matches what happens in SQL in terms of semantics, and would hopefully
also preserve the distributed nature of the operation.

For example, I believe in most cases we’d want this

(table1
    .where(
        table1['name'].isin(
            table2.select('name')
            # table2['name']  # per Reynold's suggestion
        )))

and this

(table1
    .join(table2, on='name')
    .select(table1['*']))

to compile down to the same physical plan. No?
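
For what it's worth, the closest distributed equivalent available today is a
left-semi join, which is how an uncorrelated SQL IN is typically planned. A
minimal Scala sketch, assuming both tables have a name column:

// keeps the rows of table1 whose name appears in table2, without collecting
// table2 to the driver
val filtered = table1.join(table2.select("name"), Seq("name"), "left_semi")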

Nick
​

On Thu, Apr 19, 2018 at 7:13 PM Reynold Xin  wrote:

> Perhaps we can just have a function that turns a DataFrame into a Column?
> That'd work for both correlated and uncorrelated case, although in the
> correlated case we'd need to turn off eager analysis (otherwise there is no
> way to construct a valid DataFrame).
>
>
> On Thu, Apr 19, 2018 at 4:08 PM, Ryan Blue 
> wrote:
>
>> Nick, thanks for raising this.
>>
>> It looks useful to have something in the DF API that behaves like
>> sub-queries, but I’m not sure that passing a DF works. Making every method
>> accept a DF that may contain matching data seems like it puts a lot of work
>> on the API — which now has to accept a DF all over the place.
>>
>> What about exposing transforms that make it easy to coerce data to what
>> the method needs? Instead of passing a dataframe, you’d pass df.toSet to
>> isin:
>>
>> val subQ = spark.sql("select distinct filter_col from source")
>> val df = table.filter($"col".isin(subQ.toSet))
>>
>> That also distinguishes between a sub-query and a correlated sub-query
>> that uses values from the outer query. We would still need to come up with
>> syntax for the correlated case, unless there’s a proposal already.
>>
>> rb
>> ​
>>
>> On Mon, Apr 9, 2018 at 3:56 PM, Nicholas Chammas <
>> nicholas.cham...@gmail.com> wrote:
>>
>>> I just submitted SPARK-23945
>>>  but wanted to
>>> double check here to make sure I didn't miss something fundamental.
>>>
>>> Correlated subqueries are tracked at a high level in SPARK-18455
>>> , but it's not clear
>>> to me whether they are "design-appropriate" for the DataFrame API.
>>>
>>> Are correlated subqueries a thing we can expect to have in the DataFrame
>>> API?
>>>
>>> Nick
>>>
>>>
>>
>>
>> --
>> Ryan Blue
>> Software Engineer
>> Netflix
>>
>
>


Re: [MLLib] Logistic Regression and standardization

2018-04-27 Thread Valeriy Avanesov

Hi all,

maybe I'm missing something, but from what was discussed here I've 
gathered that the current mllib implementation returns exactly the same 
model whether standardization is turned on or off.


I suggest considering the R script below, which trains two penalized
logistic regression models (with glmnet), with and without
standardization. The models are clearly different.


BTW. If penalization is turned off, the models are exactly the same.

Therefore, the current mllib implementation doesn't follow glmnet. So, 
does that make it a bug?
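
One way to see why penalization makes standardization matter: with standardized
features, the penalized objective expressed on the original coefficient scale is
roughly

    min_beta  logloss(beta) + lambda * sum_j | sigma_j * beta_j |

whereas without standardization it is

    min_beta  logloss(beta) + lambda * sum_j | beta_j |

Unless lambda = 0, these are different problems with generally different
solutions, which matches the observation that the models coincide exactly when
penalization is turned off.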


library(glmnet)
library(e1071)

set.seed(13)

# generate synthetic data
X = cbind(-500:500, (-500:500)*1000)/10

y = sigmoid(X %*% c(1, 1))   # class probabilities from a logistic model
y = rbinom(length(y), 1, y)  # draw Bernoulli labels with those probabilities

# define two testing points
xTest = rbind(c(-10, -10), c(-20, -20))/1000

# train two models: with and without standardization
lambda = 0.01

model = glmnet(X, y, family="binomial", standardize=TRUE, lambda=lambda)
print(predict(model, xTest, type="link"))

model = glmnet(X, y, family="binomial", standardize=FALSE, lambda=lambda)
print(predict(model, xTest, type="link"))

Best,

Valeriy.


On 04/25/2018 12:32 AM, DB Tsai wrote:

As one of the original authors, let me chime in with some comments.

Without standardization, LBFGS can be numerically unstable. For example,
if a feature is scaled by 10x, the corresponding coefficient should be
divided by 10 to make the same prediction. But without standardization,
LBFGS may converge to a different solution due to numerical stability
issues.


TLDR, this can be implemented in the optimizer or in the trainer. We 
choose to implement in the trainer as LBFGS optimizer in breeze 
suffers this issue. As an user, you don’t need to care much even you 
have one-hot encoding features, and the result should match R.
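
A minimal sketch of the trainer-side scale-back that Weichen describes
further down the thread (illustrative names, not the actual Spark
internals): coefficients fit against standardized features are mapped back
to the original feature scale by dividing by each feature's standard
deviation.

// rawCoefficients were fit on features divided by their standard deviations;
// dividing by featuresStd returns them on the original scale
val featuresStd: Array[Double] = Array(0.5, 2.0, 10.0)       // per-feature std devs
val rawCoefficients: Array[Double] = Array(1.2, -0.3, 0.04)  // fit on scaled features
val coefficients = rawCoefficients.zip(featuresStd).map {
  case (raw, std) => if (std != 0.0) raw / std else 0.0
}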


DB Tsai  |  Siri Open Source Technologies [not a contribution]  |   
Apple, Inc


On Apr 20, 2018, at 5:56 PM, Weichen Xu wrote:


Right. If the regularization term isn't zero, then enabling/disabling
standardization will give different results.
But if we compare results between R's glmnet and mllib with the same
parameters for regularization/standardization/..., then we should get
the same result. If not, then maybe there's a bug. In that case you can
paste your testing code and I can help fix it.


On Sat, Apr 21, 2018 at 1:06 AM, Valeriy Avanesov wrote:


Hi all.

Filipp, do you use l1/l2/elastic-net penalization? I believe in
this case standardization matters.

Best,

Valeriy.


On 04/17/2018 11:40 AM, Weichen Xu wrote:

Not a bug.

When standardization is disabled, mllib LR will still standardize
the features internally, but it will scale the coefficients back at
the end (after training has finished). So it will get the same
result as training without standardization. The purpose of this is
to improve the rate of convergence. So the result should always be
exactly the same as R's glmnet, whether standardization is enabled
or disabled.

Thanks!

On Sat, Apr 14, 2018 at 2:21 AM, Yanbo Liang <yblia...@gmail.com> wrote:

Hi Filipp,

MLlib's LR implementation handles standardization the same way as
R's glmnet does.
Actually you don't need to care about the implementation detail,
as the coefficients are always returned on the original scale, so
it should return the same result as other popular ML libraries.
Could you point me to where glmnet doesn't scale features?
I suspect other issues caused the drop in your prediction quality.
If you can share the code and data, I can help to check it.

Thanks
Yanbo



On Apr 8, 2018, at 1:09 PM, Filipp Zhinkin <filipp.zhin...@gmail.com> wrote:

Hi all,

While migrating from a custom LR implementation to MLLib's LR
implementation, my colleagues noticed that prediction quality
dropped (according to different business metrics). It turned out
that this issue is caused by the feature standardization performed
by MLLib's LR: regardless of the 'standardization' option's value,
all features are scaled during loss and gradient computation (as
well as in a few other places):

https://github.com/apache/spark/blob/6cc7021a40b64c41a51f337ec4be9545a25e838c/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregator.scala#L229



According to comments in the code, standardization should be
implemented the same way it is implemented in R's glmnet package.
I've looked through the corresponding Fortran code, and it seems
like glmnet doesn't scale features when standardization is
disabled (but MLLib still does).

   

Re: Datasource API V2 and checkpointing

2018-04-27 Thread Thakrar, Jayesh
Thanks Joseph!

From: Joseph Torres 
Date: Friday, April 27, 2018 at 11:23 AM
To: "Thakrar, Jayesh" 
Cc: "dev@spark.apache.org" 
Subject: Re: Datasource API V2 and checkpointing

The precise interactions with the DataSourceV2 API haven't yet been hammered 
out in design. But much of this comes down to the core of Structured Streaming 
rather than the API details.

The execution engine handles checkpointing and recovery. It asks the streaming 
data source for offsets, and then determines that batch N contains the data 
between offset A and offset B. On recovery, if batch N needs to be re-run, the 
execution engine just asks the source for the same offset range again. Sources 
also get a handle to their own subfolder of the checkpoint, which they can use 
as scratch space if they need. For example, Spark's FileStreamReader keeps a 
log of all the files it's seen, so its offsets can be simply indices into the 
log rather than huge strings containing all the paths.

SPARK-23323 is orthogonal. That commit coordinator is responsible for ensuring 
that, within a single Spark job, two different tasks can't commit the same 
partition.

On Fri, Apr 27, 2018 at 8:53 AM, Thakrar, Jayesh
<jthak...@conversantmedia.com> wrote:
Wondering if this issue is related to SPARK-23323?

Any pointers will be greatly appreciated….

Thanks,
Jayesh

From: "Thakrar, Jayesh" 
mailto:jthak...@conversantmedia.com>>
Date: Monday, April 23, 2018 at 9:49 PM
To: "dev@spark.apache.org" 
mailto:dev@spark.apache.org>>
Subject: Datasource API V2 and checkpointing

I was wondering when checkpointing is enabled, who does the actual work?
The streaming datasource or the execution engine/driver?

I have written a small/trivial datasource that just generates strings.
After enabling checkpointing, I do see a folder being created under the 
checkpoint folder, but there's nothing else in there.

Same question for write-ahead and recovery?
And on a restart from a failed streaming session - who should set the offsets?
The driver/Spark or the datasource?

Any pointers to design docs would also be greatly appreciated.

Thanks,
Jayesh




Re: Datasource API V2 and checkpointing

2018-04-27 Thread Joseph Torres
The precise interactions with the DataSourceV2 API haven't yet been
hammered out in design. But much of this comes down to the core of
Structured Streaming rather than the API details.

The execution engine handles checkpointing and recovery. It asks the
streaming data source for offsets, and then determines that batch N
contains the data between offset A and offset B. On recovery, if batch N
needs to be re-run, the execution engine just asks the source for the same
offset range again. Sources also get a handle to their own subfolder of the
checkpoint, which they can use as scratch space if they need. For example,
Spark's FileStreamReader keeps a log of all the files it's seen, so its
offsets can be simply indices into the log rather than huge strings
containing all the paths.
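
A hypothetical sketch of that offset-as-index idea (this is not the actual
DataSourceV2 interface; names are illustrative): the source appends everything
it discovers to its own log under the checkpoint subfolder it was handed, so an
offset is just a position in that log and replaying a batch is a slice.

// Hypothetical source-side state, kept under the source's checkpoint subfolder.
case class LogOffset(position: Long)

class DiscoveredFileLog {
  private val seen = scala.collection.mutable.ArrayBuffer.empty[String]

  // record a newly discovered file and return the latest offset
  def append(path: String): LogOffset = {
    seen += path
    LogOffset(seen.size)
  }

  // files making up the batch (start, end]; re-running a batch with the same
  // offsets always yields the same slice
  def filesBetween(start: LogOffset, end: LogOffset): Seq[String] =
    seen.slice(start.position.toInt, end.position.toInt).toList
}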

SPARK-23323 is orthogonal. That commit coordinator is responsible for
ensuring that, within a single Spark job, two different tasks can't commit
the same partition.

On Fri, Apr 27, 2018 at 8:53 AM, Thakrar, Jayesh <
jthak...@conversantmedia.com> wrote:

> Wondering if this issue is related to SPARK-23323?
>
>
>
> Any pointers will be greatly appreciated….
>
>
>
> Thanks,
>
> Jayesh
>
>
>
> *From: *"Thakrar, Jayesh" 
> *Date: *Monday, April 23, 2018 at 9:49 PM
> *To: *"dev@spark.apache.org" 
> *Subject: *Datasource API V2 and checkpointing
>
>
>
> I was wondering when checkpointing is enabled, who does the actual work?
>
> The streaming datasource or the execution engine/driver?
>
>
>
> I have written a small/trivial datasource that just generates strings.
>
> After enabling checkpointing, I do see a folder being created under the
> checkpoint folder, but there's nothing else in there.
>
>
>
> Same question for write-ahead and recovery?
>
> And on a restart from a failed streaming session - who should set the
> offsets?
>
> The driver/Spark or the datasource?
>
>
>
> Any pointers to design docs would also be greatly appreciated.
>
>
>
> Thanks,
>
> Jayesh
>
>
>


Re: Datasource API V2 and checkpointing

2018-04-27 Thread Thakrar, Jayesh
Wondering if this issue is related to SPARK-23323?

Any pointers will be greatly appreciated….

Thanks,
Jayesh

From: "Thakrar, Jayesh" 
Date: Monday, April 23, 2018 at 9:49 PM
To: "dev@spark.apache.org" 
Subject: Datasource API V2 and checkpointing

I was wondering when checkpointing is enabled, who does the actual work?
The streaming datasource or the execution engine/driver?

I have written a small/trivial datasource that just generates strings.
After enabling checkpointing, I do see a folder being created under the 
checkpoint folder, but there's nothing else in there.

Same question for write-ahead and recovery?
And on a restart from a failed streaming session - who should set the offsets?
The driver/Spark or the datasource?

Any pointers to design docs would also be greatly appreciated.

Thanks,
Jayesh



unsubscribe

2018-04-27 Thread Deepesh Maheshwari



unsubscribe

2018-04-27 Thread hari haran
-- 
- Hariharan M K


Re: Sorting on a streaming dataframe

2018-04-27 Thread Hemant Bhanawat
I see.

monotonically_increasing_id on streaming DataFrames would be really helpful
to me, and I believe to many more users. Adding this functionality in Spark
would be more efficient in terms of performance than implementing it inside
each application.

Hemant
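
For reference, a minimal sketch of the partition-id + offset approach Michael
describes below, assuming a Kafka source (whose schema already exposes
partition and offset columns); spark is the SparkSession, and the broker and
topic names are placeholders:

import org.apache.spark.sql.functions._

val kafkaDf = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:9092")
  .option("subscribe", "events")
  .load()

// (partition, offset) is unique and stable per record, so it survives recovery
val withId = kafkaDf.select(
  concat_ws("-", col("partition").cast("string"), col("offset").cast("string"))
    .as("record_id"),
  col("value"))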

On Thu, Apr 26, 2018 at 11:59 PM, Michael Armbrust 
wrote:

> The basic tenet of structured streaming is that a query should return the
> same answer in streaming or batch mode. We support sorting in complete mode
> because we have all the data and can sort it correctly and return the full
> answer.  In update or append mode, sorting would only return a correct
> answer if we could promise that records that sort lower are going to arrive
> later (and we can't).  Therefore, it is disallowed.
>
> If you are just looking for a unique, stable id and you are already using
> kafka as the source, you could just combine the partition id and the
> offset. The structured streaming connector to Kafka
> 
> exposes both of these in the schema of the streaming DataFrame. (similarly
> for kinesis you can use the shard id and sequence number)
>
> If you need the IDs to be contiguous, then this is a somewhat
> fundamentally hard problem.  I think the best we could do is add support
> for monotonically_increasing_id() in streaming dataframes.
>
> On Tue, Apr 24, 2018 at 1:38 PM, Chayapan Khannabha 
> wrote:
>
>> Perhaps your use case fits Apache Kafka better.
>>
>> More info at:
>> https://kafka.apache.org/documentation/streams/
>>
>> Everything really comes down to the architecture design and algorithm
>> spec. However, from my experience with Spark, there are many good reasons
>> why this requirement is not supported ;)
>>
>> Best,
>>
>> Chayapan (A)
>>
>>
>> On Apr 24, 2018, at 2:18 PM, Hemant Bhanawat 
>> wrote:
>>
>> Thanks Chris. There are many ways in which I can solve this problem but
>> they are cumbersome. The easiest way would have been to sort the streaming
>> dataframe. The reason I asked this question is because I could not find a
>> reason why sorting on streaming dataframe is disallowed.
>>
>> Hemant
>>
>> On Mon, Apr 16, 2018 at 6:09 PM, Bowden, Chris <
>> chris.bow...@microfocus.com> wrote:
>>
>>> You can happily sort the underlying RDD of InternalRow(s) inside a sink,
>>> assuming you are willing to implement and maintain your own sink(s). That
>>> is, just grabbing the parquet sink, etc. isn’t going to work out of the
>>> box. Alternatively map/flatMapGroupsWithState is probably sufficient and
>>> requires less working knowledge to make effective reuse of internals. Just
>>> group by foo and then sort accordingly and assign ids. The id counter can
>>> be stateful per group. Sometimes this problem may not need to be solved at
>>> all. For example, if you are using kafka, a proper partitioning scheme and
>>> message offsets may be “good enough”.
>>> --
>>> *From:* Hemant Bhanawat 
>>> *Sent:* Thursday, April 12, 2018 11:42:59 PM
>>> *To:* Reynold Xin
>>> *Cc:* dev
>>> *Subject:* Re: Sorting on a streaming dataframe
>>>
>>> Well, we want to assign snapshot ids (incrementing counters) to the
>>> incoming records. For that, we are zipping the streaming rdds with that
>>> counter using a modified version of ZippedWithIndexRDD. We are ok if the
>>> records in the streaming dataframe get counters in random order but the
>>> counter should always be incrementing.
>>>
>>> This is working fine until we have a failure. When we have a failure, we
>>> re-assign the records to snapshot ids, and this time the same snapshot id can
>>> get assigned to a different record. This is a problem because the primary
>>> key in our storage engine is . So we want to sort the
>>> dataframe so that the records always get the same snapshot id.
>>>
>>>
>>>
>>> On Fri, Apr 13, 2018 at 11:43 AM, Reynold Xin 
>>> wrote:
>>>
>>> Can you describe your use case more?
>>>
>>> On Thu, Apr 12, 2018 at 11:12 PM Hemant Bhanawat 
>>> wrote:
>>>
>>> Hi Guys,
>>>
>>> Why is sorting on streaming dataframes not supported(unless it is
>>> complete mode)? My downstream needs me to sort the streaming dataframe.
>>>
>>> Hemant
>>>
>>>
>>>
>>
>>
>