Re: HashingTFModel/IDFModel in Structured Streaming

2017-11-15 Thread Davis Varghese
Since we are on Spark 2.2, I backported/fixed it. Here is the diff, compared against
https://github.com/apache/spark/blob/73fe1d8087cfc2d59ac5b9af48b4cf5f5b86f920/mllib/src/main/scala/org/apache/spark/ml/feature/VectorSizeHint.scala

24c24
< import org.apache.spark.ml.param.{Param, ParamMap, ParamValidators}
---
> import org.apache.spark.ml.param.{Param, ParamMap, ParamValidators, IntParam}
44c44,46
<   val size = new Param[Int](this, "size", "Size of vectors in column.", {s: Int => s >= 0})
---
>   val size: IntParam =
>     new IntParam(this, "size", "Size of vectors in column.", ParamValidators.gt(0))
> 
57c59
<   @Since("2.3.0")
---
> /*  @Since("2.3.0")
64c66
< ParamValidators.inArray(VectorSizeHint.supportedHandleInvalids))
---
> ParamValidators.inArray(VectorSizeHint.supportedHandleInvalids))*/
134c136
<   override def copy(extra: ParamMap): VectorAssembler = defaultCopy(extra)
---
>   override def copy(extra: ParamMap): VectorSizeHint = defaultCopy(extra)



The first two changes are required so that the model can be saved with the
VectorSizeHint info.
The third is required because the overridden method is final in Spark 2.2.
The fourth fixes wrong code that was throwing a ClassCastException.
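
Consolidated, the backported pieces look roughly like the following sketch (based on the diff above, not on the upstream 2.3 code; transform/transformSchema and the input-column handling are omitted):

// Sketch only: the changed pieces from the diff above, shown in context.
import org.apache.spark.ml.Transformer
import org.apache.spark.ml.param.{IntParam, ParamMap, ParamValidators}
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.types.StructType

class VectorSizeHint(override val uid: String) extends Transformer {

  // An IntParam (rather than Param[Int]) so the value survives ML persistence.
  val size: IntParam =
    new IntParam(this, "size", "Size of vectors in column.", ParamValidators.gt(0))

  def setSize(value: Int): this.type = set(size, value)

  // handleInvalid is not overridden: the inherited member is final in Spark 2.2,
  // so the 2.3-style override stays commented out in the backport.

  // copy must return VectorSizeHint, not VectorAssembler, otherwise it throws
  // a ClassCastException.
  override def copy(extra: ParamMap): VectorSizeHint = defaultCopy(extra)

  // transform/transformSchema omitted here; see the linked upstream file.
  override def transform(dataset: Dataset[_]): DataFrame = ???
  override def transformSchema(schema: StructType): StructType = ???
}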


Here is the working code using this new transformer:

import java.util.Arrays;
import java.util.List;
import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.PipelineModel;
import org.apache.spark.ml.PipelineStage;
import org.apache.spark.ml.classification.LogisticRegression;
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator;
import org.apache.spark.ml.feature.CountVectorizer;
import org.apache.spark.ml.feature.CountVectorizerModel;
import org.apache.spark.ml.feature.IndexToString;
import org.apache.spark.ml.feature.StringIndexer;
import org.apache.spark.ml.feature.StringIndexerModel;
import org.apache.spark.ml.feature.Tokenizer;
import org.apache.spark.ml.feature.VectorAssembler;
import org.apache.spark.ml.feature.VectorSizeHint;
import org.apache.spark.ml.param.ParamMap;
import org.apache.spark.ml.tuning.ParamGridBuilder;
import org.apache.spark.ml.tuning.TrainValidationSplit;
import org.apache.spark.ml.tuning.TrainValidationSplitModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

/**
 * A simple text classification pipeline that classifies tweet sentiment from input text.
 */
public class StreamingIssueCountVectorizerSplitFailed {

  public static void main(String[] args) throws Exception {
    SparkSession sparkSession = SparkSession.builder()
        .appName("StreamingIssueCountVectorizer")
        .master("local[2]")
        .getOrCreate();

    List<Row> _trainData = Arrays.asList(
        RowFactory.create("sunny fantastic day", 1, "Positive"),
        RowFactory.create("fantastic morning match", 1, "Positive"),
        RowFactory.create("good morning", 1, "Positive"),
        RowFactory.create("boring evening", 5, "Negative"),
        RowFactory.create("tragic evening event", 5, "Negative"),
        RowFactory.create("today is bad ", 5, "Negative")
    );
    List<Row> _testData = Arrays.asList(
        RowFactory.create("sunny morning", 1),
        RowFactory.create("bad evening", 5)
    );
    StructType schema = new StructType(new StructField[]{
        new StructField("tweet", DataTypes.StringType, false, Metadata.empty()),
        new StructField("time", DataTypes.IntegerType, false, Metadata.empty()),
        new StructField("sentiment", DataTypes.StringType, true, Metadata.empty())
    });
    StructType testSchema = new StructType(new StructField[]{
        new StructField("tweet", DataTypes.StringType, false, Metadata.empty()),
        new StructField("time", DataTypes.IntegerType, false, Metadata.empty())
    });

    Dataset<Row> trainData = sparkSession.createDataFrame(_trainData, schema);
    Dataset<Row> testData = sparkSession.createDataFrame(_testData, testSchema);
    StringIndexerModel labelIndexerModel = new StringIndexer()
        .setInputCol("sentiment")
        .setOutputCol("label")
        .setHandleInvalid("skip")
        .fit(trainData);
    Tokenizer tokenizer = new Tokenizer()
        .setInputCol("tweet")
        .setOutputCol("words");
    CountVectorizer countVectorizer = new CountVectorizer()
        .setInputCol(tokenizer.getOutputCol())
        .setOutputCol("wordfeatures")
        .setVocabSize(3)
        .setMinDF(2)
        .setMinTF(2)
        .setBinary(true);

    VectorSizeHint wordfeatures = new VectorSizeHint();
    wordfeatures.setInputCol("wordfeatures");
    wordfeatures.setSize(3);

    VectorAssembler vectorAssembler = new VectorAssembler()
        .setInputCols(new String[]{"wordfeatures", "time"}).

Re: [discuss][PySpark] Can we drop support old Pandas (<0.19.2) or what version should we support?

2017-11-15 Thread Takuya UESHIN
Thanks for the feedback.

Hyukjin Kwon:
> My only worry is, users who depends on lower pandas versions

That's what I was worried about, and it's one of the reasons I moved this discussion here.

Li Jin:
> how complicated it is to support pandas < 0.19.2 with old non-Arrow
interops

In my original PR (https://github.com/apache/spark/pull/19607) we will fix
the behavior of timestamp values for Pandas.
If we need to support old Pandas, we will need at least some workarounds
like the ones at the following link:
https://github.com/apache/spark/blob/e919ed55758f75733d56287d5a49326b1067a44c/python/pyspark/sql/types.py#L1718-L1774


Thanks.


On Wed, Nov 15, 2017 at 12:59 AM, Li Jin  wrote:

> I think this makes sense. PySpark/Pandas interops in 2.3 are new anyway, I
> don't think we need to support the new functionality with older version of
> pandas (Takuya's reason 3)
>
> One thing I am not sure is how complicated it is to support pandas <
> 0.19.2 with old non-Arrow interops and require pandas >= 0.19.2 for new
> Arrow interops. Maybe it makes sense to allow user keep using their PySpark
> code if they don't want to use any of the new stuff. If this is still
> complicated, I would be leaning towards not supporting < 0.19.2.
>
>
> On Tue, Nov 14, 2017 at 6:04 AM, Hyukjin Kwon  wrote:
>
>> +0 to drop it as I said in the PR. I am seeing It brings a lot of hard
>> time to get the cool changes through, and is slowing down them to get
>> pushed.
>>
>> My only worry is, users who depends on lower pandas versions (Pandas
>> 0.19.2 seems released less then a year before. In the similar time, Spark
>> 2.1.0 was released).
>>
>> If this worry is less than I expected, I definitely support it. It should
>> speed up those cool changes.
>>
>>
>> On 14 Nov 2017 7:14 pm, "Takuya UESHIN"  wrote:
>>
>> Hi all,
>>
>> I'd like to raise a discussion about Pandas version.
>> Originally we are discussing it at https://github.com/apache/s
>> park/pull/19607 but we'd like to ask for feedback from community.
>>
>>
>> Currently we don't explicitly specify the Pandas version we are
>> supporting but we need to decide what version we should support because:
>>
>>   - There have been a number of API evolutions around extension dtypes
>> that make supporting pandas 0.18.x and lower challenging.
>>
>>   - Sometimes Pandas older than 0.19.2 doesn't handle timestamp values
>> properly. We want to provide properer support for timestamp values.
>>
>>   - If users want to use vectorized UDFs, or toPandas / createDataFrame
>> from Pandas DataFrame with Arrow which will be released in Spark 2.3, users
>> have to upgrade Pandas 0.19.2 or upper anyway because we need pyarrow
>> internally, which supports only 0.19.2 or upper.
>>
>>
>> The point I'd like to ask is:
>>
>> Can we drop support old Pandas (<0.19.2)?
>> If not, what version should we support?
>>
>>
>> References:
>>
>> - vectorized UDF
>>   - https://github.com/apache/spark/pull/18659
>>   - https://github.com/apache/spark/pull/18732
>> - toPandas with Arrow
>>   - https://github.com/apache/spark/pull/18459
>> - createDataFrame from pandas DataFrame with Arrow
>>   - https://github.com/apache/spark/pull/19646
>>
>>
>> Any comments are welcome!
>>
>> Thanks.
>>
>> --
>> Takuya UESHIN
>> Tokyo, Japan
>>
>> http://twitter.com/ueshin
>>
>>
>>
>


-- 
Takuya UESHIN
Tokyo, Japan

http://twitter.com/ueshin


Re: [VOTE] Spark 2.2.1 (RC1)

2017-11-15 Thread Sean Owen
The signature is fine, with your new sig. Updated hashes look fine too.
LICENSE is still fine to my knowledge.

Is anyone else seeing this failure?

- GenerateOrdering with ShortType
*** RUN ABORTED ***
java.lang.StackOverflowError:
at org.codehaus.janino.CodeContext.flowAnalysis(CodeContext.java:370)
at org.codehaus.janino.CodeContext.flowAnalysis(CodeContext.java:541)
at org.codehaus.janino.CodeContext.flowAnalysis(CodeContext.java:541)
at org.codehaus.janino.CodeContext.flowAnalysis(CodeContext.java:541)
at org.codehaus.janino.CodeContext.flowAnalysis(CodeContext.java:541)
at org.codehaus.janino.CodeContext.flowAnalysis(CodeContext.java:541)
at org.codehaus.janino.CodeContext.flowAnalysis(CodeContext.java:541)
at org.codehaus.janino.CodeContext.flowAnalysis(CodeContext.java:541)

This looks like SPARK-16845 again; see
https://issues.apache.org/jira/browse/SPARK-16845?focusedCommentId=16018840&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-16018840


On Wed, Nov 15, 2017 at 12:25 AM Felix Cheung wrote:

Please vote on releasing the following candidate as Apache Spark version
2.2.1. The vote is open until Monday November 20, 2017 at 23:00 UTC and
passes if a majority of at least 3 PMC +1 votes are cast.

[ ] +1 Release this package as Apache Spark 2.2.1

[ ] -1 Do not release this package because ...

To learn more about Apache Spark, please see https://spark.apache.org/

The tag to be voted on is v2.2.1-rc1
https://github.com/apache/spark/tree/v2.2.1-rc1
(41116ab7fca46db7255b01e8727e2e5d571a3e35)

List of JIRA tickets resolved in this release can be found here
https://issues.apache.org/jira/projects/SPARK/versions/12340470

The release files, including signatures, digests, etc. can be found at:
https://dist.apache.org/repos/dist/dev/spark/spark-2.2.1-rc1-bin/

Release artifacts are signed with the following key:
https://dist.apache.org/repos/dist/dev/spark/KEYS

The staging repository for this release can be found at:
https://repository.apache.org/content/repositories/orgapachespark-1256/

The documentation corresponding to this release can be found at:
https://dist.apache.org/repos/dist/dev/spark/spark-2.2.1-rc1-docs/_site/index.html

FAQ

How can I help test this release?

If you are a Spark user, you can help us test this release by taking an
existing Spark workload and running on this release candidate, then
reporting any regressions.

If you're working in PySpark you can set up a virtual env and install the
current RC and see if anything important breaks; in Java/Scala you can
add the staging repository to your project's resolvers and test with the RC
(make sure to clean up the artifact cache before/after so you don't end up
building with an out-of-date RC going forward).
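
For the Java/Scala route, pointing a build at the staging repository looks roughly like the sbt sketch below (the repository URL is the one listed above; the spark-sql dependency is only an example artifact to test against):

// build.sbt sketch: the resolver URL is the staging repository from this email;
// the dependency below is just an example of something to compile against.
resolvers += "Apache Spark 2.2.1 RC1 staging" at
  "https://repository.apache.org/content/repositories/orgapachespark-1256/"

libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.2.1"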


What should happen to JIRA tickets still targeting 2.2.1?

Committers should look at those and triage. Extremely important bug fixes,
documentation, and API tweaks that impact compatibility should be worked on
immediately. Everything else please retarget to 2.2.2.

But my bug isn't fixed!??!

In order to make timely releases, we will typically not hold the release
unless the bug in question is a regression from 2.2.0. That being said, if
there is something which is a regression from 2.2.0 that has not been
correctly targeted, please ping a committer to help target the issue (you
can see the open issues listed as impacting Spark 2.2.1 / 2.2.2 here).

What are the unresolved issues targeted for 2.2.1?

At the time of writing, there is one resolved issue, SPARK-22471, which would
help stability, and one in progress on joins, SPARK-22042.


Re: [VOTE] Spark 2.2.1 (RC1)

2017-11-15 Thread Felix Cheung
Thanks Xiao, please continue to merge them to branch-2.2 and tag them with
TargetVersion 2.2.2.

They look to be fairly isolated. Please continue to test this RC1 as much as
possible, and I think we should hold off on rolling another RC till Sunday.


On Wed, Nov 15, 2017 at 2:15 PM Xiao Li  wrote:

> Another issue https://issues.apache.org/jira/browse/SPARK-22479 is also
> critical for security. We should also merge it to 2.2.1?
>
> 2017-11-15 9:12 GMT-08:00 Xiao Li :
>
>> Hi, Felix,
>>
>> https://issues.apache.org/jira/browse/SPARK-22469
>>
>> Maybe also include this regression of 2.2? It works in 2.1
>>
>> Thanks,
>>
>> Xiao
>>
>>
>>
>> 2017-11-14 22:25 GMT-08:00 Felix Cheung :
>>
>>> Please vote on releasing the following candidate as Apache Spark version
>>> 2.2.1. The vote is open until Monday November 20, 2017 at 23:00 UTC and
>>> passes if a majority of at least 3 PMC +1 votes are cast.
>>>
>>>
>>> [ ] +1 Release this package as Apache Spark 2.2.1
>>>
>>> [ ] -1 Do not release this package because ...
>>>
>>>
>>> To learn more about Apache Spark, please see https://spark.apache.org/
>>>
>>>
>>> The tag to be voted on is v2.2.1-rc1
>>> https://github.com/apache/spark/tree/v2.2.1-rc1
>>> (41116ab7fca46db7255b01e8727e2e5d571a3e35)
>>>
>>> List of JIRA tickets resolved in this release can be found here
>>> https://issues.apache.org/jira/projects/SPARK/versions/12340470
>>>
>>>
>>> The release files, including signatures, digests, etc. can be found at:
>>> https://dist.apache.org/repos/dist/dev/spark/spark-2.2.1-rc1-bin/
>>>
>>> Release artifacts are signed with the following key:
>>> https://dist.apache.org/repos/dist/dev/spark/KEYS
>>>
>>> The staging repository for this release can be found at:
>>> https://repository.apache.org/content/repositories/orgapachespark-1256/
>>>
>>> The documentation corresponding to this release can be found at:
>>>
>>> https://dist.apache.org/repos/dist/dev/spark/spark-2.2.1-rc1-docs/_site/index.html
>>>
>>>
>>> *FAQ*
>>>
>>> *How can I help test this release?*
>>>
>>> If you are a Spark user, you can help us test this release by taking an
>>> existing Spark workload and running on this release candidate, then
>>> reporting any regressions.
>>>
>>> If you're working in PySpark you can set up a virtual env and install
>>> the current RC and see if anything important breaks, in the Java/Scala you
>>> can add the staging repository to your projects resolvers and test with the
>>> RC (make sure to clean up the artifact cache before/after so you don't end
>>> up building with a out of date RC going forward).
>>>
>>> *What should happen to JIRA tickets still targeting 2.2.1?*
>>>
>>> Committers should look at those and triage. Extremely important bug
>>> fixes, documentation, and API tweaks that impact compatibility should be
>>> worked on immediately. Everything else please retarget to 2.2.2.
>>>
>>> *But my bug isn't fixed!??!*
>>>
>>> In order to make timely releases, we will typically not hold the release
>>> unless the bug in question is a regression from 2.2.0. That being said if
>>> there is something which is a regression form 2.2.0 that has not been
>>> correctly targeted please ping a committer to help target the issue (you
>>> can see the open issues listed as impacting Spark 2.2.1 / 2.2.2 here
>>> 
>>> .
>>>
>>> What are the unresolved issues targeted for 2.2.1
>>> 
>>> ?
>>>
>>> At the time of the writing, there is one resolved SPARK-22471
>>>  would help
>>> stability, and one in progress on joins SPARK-22042
>>> 
>>>
>>>
>>>
>>>
>>
>


Re: [VOTE] Spark 2.2.1 (RC1)

2017-11-15 Thread Xiao Li
Another issue, https://issues.apache.org/jira/browse/SPARK-22479, is also
critical for security. Should we also merge it into 2.2.1?

2017-11-15 9:12 GMT-08:00 Xiao Li :

> Hi, Felix,
>
> https://issues.apache.org/jira/browse/SPARK-22469
>
> Maybe also include this regression of 2.2? It works in 2.1
>
> Thanks,
>
> Xiao
>
>
>
> 2017-11-14 22:25 GMT-08:00 Felix Cheung :
>
>> Please vote on releasing the following candidate as Apache Spark version
>> 2.2.1. The vote is open until Monday November 20, 2017 at 23:00 UTC and
>> passes if a majority of at least 3 PMC +1 votes are cast.
>>
>>
>> [ ] +1 Release this package as Apache Spark 2.2.1
>>
>> [ ] -1 Do not release this package because ...
>>
>>
>> To learn more about Apache Spark, please see https://spark.apache.org/
>>
>>
>> The tag to be voted on is v2.2.1-rc1 https://github.com/apache/spar
>> k/tree/v2.2.1-rc1 (41116ab7fca46db7255b01e8727e2e5d571a3e35)
>>
>> List of JIRA tickets resolved in this release can be found here
>> https://issues.apache.org/jira/projects/SPARK/versions/12340470
>>
>>
>> The release files, including signatures, digests, etc. can be found at:
>> https://dist.apache.org/repos/dist/dev/spark/spark-2.2.1-rc1-bin/
>>
>> Release artifacts are signed with the following key:
>> https://dist.apache.org/repos/dist/dev/spark/KEYS
>>
>> The staging repository for this release can be found at:
>> https://repository.apache.org/content/repositories/orgapachespark-1256/
>>
>> The documentation corresponding to this release can be found at:
>> https://dist.apache.org/repos/dist/dev/spark/spark-2.2.1-rc1
>> -docs/_site/index.html
>>
>>
>> *FAQ*
>>
>> *How can I help test this release?*
>>
>> If you are a Spark user, you can help us test this release by taking an
>> existing Spark workload and running on this release candidate, then
>> reporting any regressions.
>>
>> If you're working in PySpark you can set up a virtual env and install the
>> current RC and see if anything important breaks, in the Java/Scala you can
>> add the staging repository to your projects resolvers and test with the RC
>> (make sure to clean up the artifact cache before/after so you don't end up
>> building with a out of date RC going forward).
>>
>> *What should happen to JIRA tickets still targeting 2.2.1?*
>>
>> Committers should look at those and triage. Extremely important bug
>> fixes, documentation, and API tweaks that impact compatibility should be
>> worked on immediately. Everything else please retarget to 2.2.2.
>>
>> *But my bug isn't fixed!??!*
>>
>> In order to make timely releases, we will typically not hold the release
>> unless the bug in question is a regression from 2.2.0. That being said if
>> there is something which is a regression form 2.2.0 that has not been
>> correctly targeted please ping a committer to help target the issue (you
>> can see the open issues listed as impacting Spark 2.2.1 / 2.2.2 here
>> 
>> .
>>
>> What are the unresolved issues targeted for 2.2.1
>> 
>> ?
>>
>> At the time of the writing, there is one resolved SPARK-22471
>>  would help
>> stability, and one in progress on joins SPARK-22042
>> 
>>
>>
>>
>>
>


Re: HashingTFModel/IDFModel in Structured Streaming

2017-11-15 Thread Jorge Sánchez
Hi,

After seeing that IDF needed refactoring to use ML vectors instead of MLlib
ones, I have created a JIRA ticket at
https://issues.apache.org/jira/browse/SPARK-22531
and submitted a PR for it.
If anyone can have a look and suggest any changes, it would be really
appreciated.

Thank you.


2017-11-15 1:11 GMT+00:00 Bago Amirbekian :

> There is a known issue with VectorAssembler which causes it to fail in
> streaming if any of the input columns are of VectorType & don't have size
> information, https://issues.apache.org/jira/browse/SPARK-22346.
>
> This can be fixed by adding size information to the vector columns, I've
> made a PR to add a transformer to spark to help with this,
> https://github.com/apache/spark/pull/19746. It would be awesome if you
> could take a look and see if this would fix your issue.
>
> On Sun, Nov 12, 2017 at 5:37 PM Davis Varghese  wrote:
>
>> Bago,
>>
>> Finally I am able to create one which fails consistently. I think the
>> issue
>> is caused by the VectorAssembler in the model. In the new code, I have 2
>> features(1 text and 1 number) and I have to run through a VectorAssembler
>> before giving to LogisticRegression. Code and test data below
>>
>> import java.util.Arrays;
>> import java.util.List;
>> import org.apache.spark.ml.Pipeline;
>> import org.apache.spark.ml.PipelineModel;
>> import org.apache.spark.ml.PipelineStage;
>> import org.apache.spark.ml.classification.LogisticRegression;
>> import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator;
>> import org.apache.spark.ml.feature.CountVectorizer;
>> import org.apache.spark.ml.feature.CountVectorizerModel;
>> import org.apache.spark.ml.feature.IndexToString;
>> import org.apache.spark.ml.feature.StringIndexer;
>> import org.apache.spark.ml.feature.StringIndexerModel;
>> import org.apache.spark.ml.feature.Tokenizer;
>> import org.apache.spark.ml.feature.VectorAssembler;
>> import org.apache.spark.ml.param.ParamMap;
>> import org.apache.spark.ml.tuning.ParamGridBuilder;
>> import org.apache.spark.ml.tuning.TrainValidationSplit;
>> import org.apache.spark.ml.tuning.TrainValidationSplitModel;
>> import org.apache.spark.sql.Dataset;
>> import org.apache.spark.sql.Row;
>> import org.apache.spark.sql.RowFactory;
>> import org.apache.spark.sql.SparkSession;
>> import org.apache.spark.sql.streaming.StreamingQuery;
>> import org.apache.spark.sql.types.DataTypes;
>> import org.apache.spark.sql.types.Metadata;
>> import org.apache.spark.sql.types.StructField;
>> import org.apache.spark.sql.types.StructType;
>>
>> /**
>>  * A simple text classification pipeline that recognizes "spark" from
>> input
>> text.
>>  */
>> public class StreamingIssueCountVectorizerSplitFailed {
>>
>>   public static void main(String[] args) throws Exception {
>> SparkSession sparkSession =
>> SparkSession.builder().appName("StreamingIssueCountVectorizer")
>> .master("local[2]")
>> .getOrCreate();
>>
>> List _trainData = Arrays.asList(
>> RowFactory.create("sunny fantastic day", 1, "Positive"),
>> RowFactory.create("fantastic morning match", 1, "Positive"),
>> RowFactory.create("good morning", 1, "Positive"),
>> RowFactory.create("boring evening", 5, "Negative"),
>> RowFactory.create("tragic evening event", 5, "Negative"),
>> RowFactory.create("today is bad ", 5, "Negative")
>> );
>> List _testData = Arrays.asList(
>> RowFactory.create("sunny morning", 1),
>> RowFactory.create("bad evening", 5)
>> );
>> StructType schema = new StructType(new StructField[]{
>> new StructField("tweet", DataTypes.StringType, false,
>> Metadata.empty()),
>> new StructField("time", DataTypes.IntegerType, false,
>> Metadata.empty()),
>> new StructField("sentiment", DataTypes.StringType, true,
>> Metadata.empty())
>> });
>> StructType testSchema = new StructType(new StructField[]{
>> new StructField("tweet", DataTypes.StringType, false,
>> Metadata.empty()),
>> new StructField("time", DataTypes.IntegerType, false,
>> Metadata.empty())
>> });
>>
>> Dataset trainData = sparkSession.createDataFrame(_trainData,
>> schema);
>> Dataset testData = sparkSession.createDataFrame(_testData,
>> testSchema);
>> StringIndexerModel labelIndexerModel = new StringIndexer()
>> .setInputCol("sentiment")
>> .setOutputCol("label")
>> .setHandleInvalid("skip")
>> .fit(trainData);
>> Tokenizer tokenizer = new Tokenizer()
>> .setInputCol("tweet")
>> .setOutputCol("words");
>> CountVectorizer countVectorizer = new CountVectorizer()
>> .setInputCol(tokenizer.getOutputCol())
>> .setOutputCol("wordfeatures")
>> .setVocabSize(3)
>> .setMinDF(2)
>> .setMinTF(2)
>> .setBinary(true);
>>
>> VectorAssembler vectorAssembler = new 

[SQL] Why no numOutputRows metric for LocalTableScanExec in webUI?

2017-11-15 Thread Jacek Laskowski
Hi,

I've been playing with LocalTableScanExec and noticed that it defines a
numOutputRows metric, but I couldn't find it in the diagram in the web UI's
Details for Query page (SQL tab). Why?

scala> spark.version
res1: String = 2.3.0-SNAPSHOT

scala> val hello = udf { s: String => s"Hello $s" }
hello: org.apache.spark.sql.expressions.UserDefinedFunction =
UserDefinedFunction(<function1>,StringType,Some(List(StringType)))

scala> Seq("Jacek").toDF("name").select(hello($"name")).show
+-----------+
|  UDF(name)|
+-----------+
|Hello Jacek|
+-----------+

http://localhost:4040/SQL/execution/?id=0 shows no metrics for
LocalTableScan. Is this intended?
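
A quick way to confirm the metric is at least declared on the physical node (a sketch only, using internal non-stable APIs and the hello UDF defined above):

scala> import org.apache.spark.sql.execution.LocalTableScanExec
scala> val df = Seq("Jacek").toDF("name").select(hello($"name"))
scala> df.queryExecution.executedPlan.collect { case s: LocalTableScanExec => s.metrics.keySet }
// expected to include Set(numOutputRows), even though the SQL tab renders no metrics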

Pozdrawiam,
Jacek Laskowski

https://about.me/JacekLaskowski
Spark Structured Streaming https://bit.ly/spark-structured-streaming
Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski


Re: [VOTE] Spark 2.2.1 (RC1)

2017-11-15 Thread Xiao Li
Hi, Felix,

https://issues.apache.org/jira/browse/SPARK-22469

Maybe also include this regression in 2.2? It works in 2.1.

Thanks,

Xiao



2017-11-14 22:25 GMT-08:00 Felix Cheung :

> Please vote on releasing the following candidate as Apache Spark version
> 2.2.1. The vote is open until Monday November 20, 2017 at 23:00 UTC and
> passes if a majority of at least 3 PMC +1 votes are cast.
>
>
> [ ] +1 Release this package as Apache Spark 2.2.1
>
> [ ] -1 Do not release this package because ...
>
>
> To learn more about Apache Spark, please see https://spark.apache.org/
>
>
> The tag to be voted on is v2.2.1-rc1 https://github.com/apache/
> spark/tree/v2.2.1-rc1 (41116ab7fca46db7255b01e8727e2e5d571a3e35)
>
> List of JIRA tickets resolved in this release can be found here
> https://issues.apache.org/jira/projects/SPARK/versions/12340470
>
>
> The release files, including signatures, digests, etc. can be found at:
> https://dist.apache.org/repos/dist/dev/spark/spark-2.2.1-rc1-bin/
>
> Release artifacts are signed with the following key:
> https://dist.apache.org/repos/dist/dev/spark/KEYS
>
> The staging repository for this release can be found at:
> https://repository.apache.org/content/repositories/orgapachespark-1256/
>
> The documentation corresponding to this release can be found at:
> https://dist.apache.org/repos/dist/dev/spark/spark-2.2.1-
> rc1-docs/_site/index.html
>
>
> *FAQ*
>
> *How can I help test this release?*
>
> If you are a Spark user, you can help us test this release by taking an
> existing Spark workload and running on this release candidate, then
> reporting any regressions.
>
> If you're working in PySpark you can set up a virtual env and install the
> current RC and see if anything important breaks, in the Java/Scala you can
> add the staging repository to your projects resolvers and test with the RC
> (make sure to clean up the artifact cache before/after so you don't end up
> building with a out of date RC going forward).
>
> *What should happen to JIRA tickets still targeting 2.2.1?*
>
> Committers should look at those and triage. Extremely important bug fixes,
> documentation, and API tweaks that impact compatibility should be worked on
> immediately. Everything else please retarget to 2.2.2.
>
> *But my bug isn't fixed!??!*
>
> In order to make timely releases, we will typically not hold the release
> unless the bug in question is a regression from 2.2.0. That being said if
> there is something which is a regression form 2.2.0 that has not been
> correctly targeted please ping a committer to help target the issue (you
> can see the open issues listed as impacting Spark 2.2.1 / 2.2.2 here
> 
> .
>
> What are the unresolved issues targeted for 2.2.1
> 
> ?
>
> At the time of the writing, there is one resolved SPARK-22471
>  would help stability,
> and one in progress on joins SPARK-22042
> 
>
>
>
>


Re: SPARK-22267 issue: Spark SQL incorrectly reads ORC file when column order is different

2017-11-15 Thread Mark Petruska
  Hi Dongjoon,
Thanks for the info.
Unfortunately I did not find any means to fix the issue without forcing
CONVERT_METASTORE_ORC or changing the ORC reader implementation.
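
For reference, forcing the conversion amounts to roughly the following (a sketch; the table name is hypothetical):

// Sketch: route reads through Spark's native OrcFileFormat instead of the Hive SerDe.
// The key backs CONVERT_METASTORE_ORC; the table name below is made up.
spark.conf.set("spark.sql.hive.convertMetastoreOrc", "true")
spark.sql("SELECT * FROM my_orc_table").show()
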
Closing the PR, as it was only used to demonstrate the root cause.
Best regards,
Mark

On Tue, Nov 14, 2017 at 6:58 PM, Dongjoon Hyun wrote:

> Hi, Mark.
>
> That is one of the reasons why I left it behind from the previous PR
> (below) and I'm focusing is the second approach; use OrcFileFormat with
> convertMetastoreOrc.
>
> https://github.com/apache/spark/pull/19470
> [SPARK-14387][SPARK-16628][SPARK-18355][SQL] Use Spark schema to read ORC
> table instead of ORC file schema
>
> With `convertMetastoreOrc=true`, Spark 2.3 will become stabler and faster.
> Also, it's the default Spark way to handle Parquet.
>
> BTW, thank you for looking at SPARK-22267. So far, I'm not looking at that
> issue.
> If we have a fix for SPARK-22267 in Spark 2.3, it would be great!
>
> Bests,
> Dongjoon.
>
>
> On Tue, Nov 14, 2017 at 3:46 AM, Mark Petruska 
> wrote:
>
>>   Hi,
>> I'm very new to spark development, and would like to get guidance from
>> more experienced members.
>> Sorry this email will be long as I try to explain the details.
>>
>> Started to investigate the issue SPARK-22267
>> ; added some test
>> cases to highlight the problem in the PR
>> . Here are my findings:
>>
>> - for parquet the test case succeeds as expected
>>
>> - the sql test case for orc:
>> - when CONVERT_METASTORE_ORC is set to "true" the data fields are
>> presented in the desired order
>> - when it is "false" the columns are read in the wrong order
>> - Reason: when `isConvertible` returns true in `RelationConversions`
>> the plan executes `convertToLogicalRelation`, which in turn uses
>> `OrcFileFormat` to read the data; otherwise it uses the classes in
>> "hive-exec:1.2.1".
>>
>> - the HadoopRDD test case was added to further investigate the parameter
>> values to discover a working combination, but unfortunately no combination
>> of "serialization.ddl" and "columns" result in success. It seems that those
>> fields do not have any effect on the order of the resulting data fields.
>>
>>
>> At this point I do not see any option to fix this issue without risking
>> "backward compatibility" problems.
>> The possible actions (as I see them):
>> - link a new version of "hive-exec": surely this bug has been fixed in a
>> newer version
>> - use `OrcFileFormat` for reading orc data regardless of the setting of
>> CONVERT_METASTORE_ORC
>> - also there's an `OrcNewInputFormat` class in "hive-exec", but it
>> implements an InputFormat interface from a different package, hence it is
>> incompatible with HadoopRDD at the moment
>>
>> Please help me. Did I miss some viable options?
>>
>> Thanks,
>> Mark
>>
>>
>