Re: Maintenance releases for SPARK-23852?

2018-04-17 Thread Dongjoon Hyun
Since the ORC 1.4.3 upgrade needs to be backported from master to branch-2.3,
I made a backport PR.

https://github.com/apache/spark/pull/21093

Thank you for raising this issue and confirming, Henry and Xiao. :)

Bests,
Dongjoon.
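
For anyone hit by SPARK-23852 before a fixed Parquet version is in use, one
possible interim mitigation is to disable Parquet filter pushdown so the
affected predicate-evaluation path is not exercised. A minimal sketch, assuming
the standard spark.sql.parquet.filterPushdown SQL conf applies to your Spark
version:

// `spark` is the active SparkSession (e.g. in spark-shell). Turning off
// Parquet filter pushdown trades some scan performance for correct results
// until a fixed Parquet release is picked up.
spark.conf.set("spark.sql.parquet.filterPushdown", "false")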


On Tue, Apr 17, 2018 at 12:01 AM, Xiao Li  wrote:

> Yes, it sounds good to me. We can upgrade both Parquet (1.8.2 to 1.8.3) and
> ORC (1.4.1 to 1.4.3) in our upcoming Spark 2.3.1 release.
>
> Thanks for your efforts! @Henry and @Dongjoon
>
> Xiao
>
> 2018-04-16 14:41 GMT-07:00 Henry Robinson :
>
>> Seems like there aren't any objections. I'll pick this thread back up
>> when a Parquet maintenance release has happened.
>>
>> Henry
>>
>> On 11 April 2018 at 14:00, Dongjoon Hyun  wrote:
>>
>>> Great.
>>>
>>> If we can upgrade the Parquet dependency from 1.8.2 to 1.8.3 in Apache
>>> Spark 2.3.1, let's upgrade the ORC dependency from 1.4.1 to 1.4.3 as well.
>>>
>>> Currently, the patch is only merged into the master branch; ORC 1.4.1 has
>>> the following issue.
>>>
>>> https://issues.apache.org/jira/browse/SPARK-23340
>>>
>>> Bests,
>>> Dongjoon.
>>>
>>>
>>>
>>> On Wed, Apr 11, 2018 at 1:23 PM, Reynold Xin 
>>> wrote:
>>>
 Seems like this would make sense... we usually make maintenance
 releases for bug fixes after a month anyway.


 On Wed, Apr 11, 2018 at 12:52 PM, Henry Robinson 
 wrote:

>
>
> On 11 April 2018 at 12:47, Ryan Blue 
> wrote:
>
>> I think a 1.8.3 Parquet release makes sense for the 2.3.x releases of
>> Spark.
>>
>> To be clear though, this only affects Spark when reading data written
>> by Impala, right? Or does Parquet CPP also produce data like this?
>>
>
> I don't know about parquet-cpp, but yeah, the only implementation I've
> seen writing the half-completed stats is Impala (as you know, that's
> compliant with the spec, just an unusual choice).
>
>
>>
>> On Wed, Apr 11, 2018 at 12:35 PM, Henry Robinson 
>> wrote:
>>
>>> Hi all -
>>>
>>> SPARK-23852 (where a query can silently give wrong results thanks to a
>>> predicate pushdown bug in Parquet) is a fairly bad bug. In other projects
>>> I've been involved with, we've released maintenance releases for bugs of
>>> this severity.
>>>
>>> Since Spark 2.4.0 is probably a while away, I wanted to see if there
>>> was any consensus over whether we should consider (at least) a 2.3.1.
>>>
>>> The reason this particular issue is a bit tricky is that the Parquet
>>> community haven't yet produced a maintenance release that fixes the
>>> underlying bug, but they are in the process of releasing a new minor
>>> version, 1.10, which includes a fix. Having spoken to a couple of Parquet
>>> developers, I believe they'd be willing to consider a maintenance release,
>>> but would probably only bother if we (or another affected project) asked
>>> them to.
>>>
>>> My guess is that we wouldn't want to upgrade to a new minor version
>>> of Parquet for a Spark maintenance release, so asking for a Parquet
>>> maintenance release makes sense.
>>>
>>> What does everyone think?
>>>
>>> Best,
>>> Henry
>>>
>>
>>
>>
>> --
>> Ryan Blue
>> Software Engineer
>> Netflix
>>
>
>

>>>
>>
>




Sort-merge join improvement

2018-04-17 Thread Petar Zecevic

Hello everybody,

We (at the University of Zagreb and the University of Washington) have
implemented an optimization of Spark's sort-merge join (SMJ) which has
improved the performance of our jobs considerably, and we would like to know
if the Spark community thinks it would be useful to include it in the main
distribution.


The problem we are solving is the case where you have two big tables
partitioned by a column X but also sorted by a column Y (within partitions),
and you need to calculate an expensive function on the joined rows.
During a sort-merge join, Spark will cross-join all rows that have the same
X value and evaluate the function on every resulting pair. If the two tables
have a large number of rows per X, this can result in a huge number of
calculations.


Our optimization allows you to reduce the number of matching rows per X by
using a range condition on the Y columns of the two tables, something like:


... WHERE t1.X = t2.X AND t1.Y BETWEEN t2.Y - d AND t2.Y + d

The way SMJ is currently implemented, these extra conditions have no
influence on the number of rows (per X) being checked, because they are
evaluated in the same block as the function itself, after the cross-join has
already produced the candidate pairs.


Our optimization changes the sort-merge join so that, when these extra 
conditions are specified, a queue is used instead of the 
ExternalAppendOnlyUnsafeRowArray class. This queue is then used as a 
moving window across the values from the right relation as the left row 
changes. You could call this a combination of an equi-join and a theta 
join (we call it "sort-merge inner range join").
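
To make the idea concrete, here is a minimal, self-contained Scala sketch of
the moving-window merge, with simplified record types and names that are
illustrative only (not the actual Spark internals), assuming both relations
are sorted by (X, Y) within each partition:

import scala.collection.mutable

// Simplified record: x is the equi-join key, y is the secondary sort column.
case class Rec(x: Int, y: Double)

// Emits pairs with equal x whose y values differ by at most d, assuming both
// iterators are sorted by (x, y).
def rangeMergeJoin(left: Iterator[Rec], right: Iterator[Rec], d: Double): Iterator[(Rec, Rec)] = {
  val rs = right.buffered
  val window = mutable.Queue.empty[Rec]   // moving window over the right side
  left.flatMap { l =>
    // Drop right rows that can no longer match this or any later left row.
    while (window.nonEmpty && (window.head.x < l.x || window.head.y < l.y - d))
      window.dequeue()
    // Pull right rows forward up to the current upper bound l.y + d.
    while (rs.hasNext && (rs.head.x < l.x || (rs.head.x == l.x && rs.head.y <= l.y + d))) {
      val r = rs.next()
      if (r.x == l.x && r.y >= l.y - d) window.enqueue(r)
    }
    window.toList.map(r => (l, r))        // every row in the window matches l
  }
}

// Example: with d = 1.0, Rec(1, 5.0) matches Rec(1, 4.5) and Rec(1, 5.5).
val leftRows  = Iterator(Rec(1, 5.0), Rec(1, 9.0), Rec(2, 3.0))
val rightRows = Iterator(Rec(1, 4.5), Rec(1, 5.5), Rec(1, 8.0), Rec(2, 7.0))
rangeMergeJoin(leftRows, rightRows, d = 1.0).foreach(println)

The point is that each right row is enqueued and dequeued at most once, so the
work per X value is proportional to the number of actually matching pairs
rather than to the full cross product.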


Potential use-cases for this are joins based on spatial or temporal 
distance calculations.


The optimization is triggered automatically when an equi-join expression 
is present AND lower and upper range conditions on a secondary column 
are specified. If the tables aren't sorted by both columns, appropriate 
sorts will be added.



We have several questions:

1. Do you see any other way to optimize queries like these (eliminate 
unnecessary calculations) without changing the sort-merge join algorithm?


2. We believe there is a more general pattern here and that this could 
help in other similar situations where secondary sorting is available. 
Would you agree?


3. Would you like us to open a JIRA ticket and create a pull request?

Thanks,

Petar Zecevic






Re: [MLLib] Logistic Regression and standardization

2018-04-17 Thread Weichen Xu
Not a bug.

When standardization is disabled, MLlib LR will still standardize the
features internally, but it will scale the coefficients back at the end
(after training finishes), so it gives the same result as training without
standardization. The purpose of this is to improve the rate of convergence.
So the result should always be exactly the same as R's glmnet, whether
standardization is enabled or disabled.
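
As a concrete illustration of the scale-back step, here is a minimal sketch
with made-up numbers (not the actual MLlib code): the coefficients found in
the standardized feature space are divided by each feature's standard
deviation before being returned, so the model is reported on the original
scale.

// Hypothetical per-feature standard deviations and the coefficients the
// optimizer found in the standardized (scaled) feature space.
val featuresStd = Array(2.0, 0.5, 1.0)
val coefScaled  = Array(0.8, -1.2, 0.3)

// Scale the coefficients back to the original feature scale.
val coefOriginal = coefScaled.zip(featuresStd).map {
  case (c, std) => if (std != 0.0) c / std else 0.0
}
println(coefOriginal.mkString(", "))   // 0.4, -2.4, 0.3

On the user-facing API, the relevant knob is setStandardization on
org.apache.spark.ml.classification.LogisticRegression.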

Thanks!

On Sat, Apr 14, 2018 at 2:21 AM, Yanbo Liang  wrote:

> Hi Filipp,
>
> MLlib’s LR implementation handles standardization the same way as R’s
> glmnet does.
> Actually you don’t need to care about the implementation detail, as the
> coefficients are always returned on the original scale, so it should
> return the same result as other popular ML libraries.
> Could you point me to where glmnet doesn’t scale features?
> I suspect other issues caused the drop in your prediction quality. If you
> can share the code and data, I can help to check it.
>
> Thanks
> Yanbo
>
>
> On Apr 8, 2018, at 1:09 PM, Filipp Zhinkin 
> wrote:
>
> Hi all,
>
> While migrating from a custom LR implementation to MLlib's LR implementation,
> my colleagues noticed that prediction quality dropped (according to
> different business metrics).
> It turned out that this issue is caused by the feature standardization
> performed by MLlib's LR: regardless of the 'standardization' option's value,
> all features are scaled during loss and gradient computation (as well as in a
> few other places): https://github.com/apache/spark/blob/6cc7021a40b64c41a51f337ec4be9545a25e838c/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregator.scala#L229
>
> According to comments in the code, standardization should be implemented
> the same way it is implemented in R's glmnet package. I've looked through the
> corresponding Fortran code, and it seems like glmnet doesn't scale features
> when standardization is disabled (but MLlib still does).
>
> Our models contain multiple one-hot encoded features and scaling them is
> a pretty bad idea.
>
> Why does MLlib's LR always scale all features? From my POV it's a bug.
>
> Thanks in advance,
> Filipp.
>
>
>


Re: [discuss][data source v2] remove type parameter in DataReader/WriterFactory

2018-04-17 Thread Wenchen Fan
Yea, definitely not. The only requirement is that the DataReader/WriterFactory
must support at least one DataFormat.

>  how are we going to express the capability of a given reader, i.e. its
supported format(s), or specific support for each of “real-time data in row
format, and history data in columnar format”?

When the DataSourceReader/Writer creates factories, each factory must contain
enough information to decide the data format. Let's take ORC as an example:
OrcReaderFactory knows which files to read and which columns to output. Since
Spark currently only supports columnar scan for simple types, OrcReaderFactory
will only output ColumnarBatch if the columns to scan are all of simple types.
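
As a rough, self-contained sketch of how a factory could expose its data
format and how the engine would dispatch on it (placeholder names like
ReaderFactorySketch and OrcLikeFactory are illustrative only, not the proposed
Spark interfaces):

sealed trait DataFormat
case object RowFormat extends DataFormat
case object ColumnarBatchFormat extends DataFormat

trait ReaderFactorySketch {
  def dataFormat: DataFormat
  def createRowReader(): Iterator[String] =
    throw new IllegalStateException("row format not supported")
  def createColumnarBatchReader(): Iterator[Array[String]] =
    throw new IllegalStateException("columnar format not supported")
}

// An ORC-like factory: it only advertises columnar output when all columns
// to scan are simple types, mirroring the OrcReaderFactory example above.
class OrcLikeFactory(allSimpleTypes: Boolean) extends ReaderFactorySketch {
  override def dataFormat: DataFormat =
    if (allSimpleTypes) ColumnarBatchFormat else RowFormat
  override def createRowReader(): Iterator[String] = Iterator("row1", "row2")
  override def createColumnarBatchReader(): Iterator[Array[String]] =
    Iterator(Array("row1", "row2"))
}

// The engine looks at dataFormat and calls the matching create method.
def scan(factory: ReaderFactorySketch): Unit = factory.dataFormat match {
  case RowFormat           => factory.createRowReader().foreach(println)
  case ColumnarBatchFormat =>
    factory.createColumnarBatchReader().foreach(b => println(b.mkString(", ")))
}

scan(new OrcLikeFactory(allSimpleTypes = true))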

On Tue, Apr 17, 2018 at 11:38 AM, Felix Cheung 
wrote:

> Is it required for DataReader to support all known DataFormats?
>
> Hopefully not, as assumed by the ‘throw’ in the interface. Then,
> specifically, how are we going to express the capability of a given reader,
> i.e. its supported format(s), or specific support for each of “real-time data
> in row format, and history data in columnar format”?
>
>
> --
> *From:* Wenchen Fan 
> *Sent:* Sunday, April 15, 2018 7:45:01 PM
> *To:* Spark dev list
> *Subject:* [discuss][data source v2] remove type parameter in
> DataReader/WriterFactory
>
> Hi all,
>
> I'd like to propose an API change to the data source v2.
>
> One design goal of data source v2 is API type safety. The FileFormat API
> is a bad example: it asks the implementation to return InternalRow even
> when it's actually a ColumnarBatch. In data source v2 we added a type
> parameter to DataReader/WriterFactory and DataReader/Writer, so that a data
> source supporting columnar scan returns ColumnarBatch at the API level.
>
> However, we met some problems when migrating the streaming and file-based
> data sources to data source v2.
>
> For the streaming side, we need a variant of DataReader/WriterFactory to
> add streaming-specific concepts like epoch id and offset. For details please
> see ContinuousDataReaderFactory and https://docs.google.com/document/d/1PJYfb68s2AG7joRWbhrgpEWhrsPqbhyRwUVl9V1wPOE/edit#
>
> But this conflicts with the special format mixin traits like
> SupportsScanColumnarBatch. We have to make the streaming variant of
> DataReader/WriterFactory extend the original DataReader/WriterFactory,
> and do type casts at runtime, which is unnecessary and violates type
> safety.
>
> For the file-based data source side, we have a problem with code
> duplication. Let's take the ORC data source as an example. To support both
> unsafe row and columnar batch scans, we need something like:
>
> // A lot of parameters to carry to the executor side
> class OrcUnsafeRowFactory(...) extends DataReaderFactory[UnsafeRow] {
>   def createDataReader ...
> }
>
> class OrcColumnarBatchFactory(...) extends DataReaderFactory[ColumnarBatch] {
>   def createDataReader ...
> }
>
> class OrcDataSourceReader extends DataSourceReader {
>   def createUnsafeRowFactories = ... // logic to prepare the parameters and create factories
>
>   def createColumnarBatchFactories = ... // logic to prepare the parameters and create factories
> }
>
> You can see that we have duplicated logic for preparing parameters and
> defining the factory.
>
> Here I propose to remove all the special format mixin traits and change
> the factory interface to
>
> public enum DataFormat {
>   ROW,
>   INTERNAL_ROW,
>   UNSAFE_ROW,
>   COLUMNAR_BATCH
> }
>
> interface DataReaderFactory {
>   DataFormat dataFormat();
>
>   default DataReader createRowDataReader() {
> throw new IllegalStateException();
>   }
>
>   default DataReader createUnsafeRowDataReader() {
> throw new IllegalStateException();
>   }
>
>   default DataReader createColumnarBatchDataReader() {
> throw new IllegalStateException();
>   }
> }
>
> Spark will look at the dataFormat and decide which create data reader
> method to call.
>
> Now we don't have the problem on the streaming side, as these special
> format mixin traits go away, and the ORC data source can also be simplified
> to:
>
> class OrcReaderFactory(...) extends DataReaderFactory {
>   def createUnsafeRowReader ...
>
>   def createColumnarBatchReader ...
> }
>
> class OrcDataSourceReader extends DataSourceReader {
>   def createReadFactories = ... // logic to prepare the parameters and create factories
> }
>
> We also get a potential benefit of supporting hybrid storage data sources,
> which may keep real-time data in row format and historical data in columnar
> format. They can then make some DataReaderFactory instances output
> InternalRow and others output ColumnarBatch.
>
> Thoughts?
>