[GitHub] spark issue #17819: [SPARK-20542][ML][SQL] Add an API to Bucketizer that can...

2017-10-10 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/17819
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82583/
Test PASSed.


---




[GitHub] spark issue #17819: [SPARK-20542][ML][SQL] Add an API to Bucketizer that can...

2017-10-10 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/17819
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #17819: [SPARK-20542][ML][SQL] Add an API to Bucketizer that can...

2017-10-10 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/17819
  
**[Test build #82583 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82583/testReport)**
 for PR 17819 at commit 
[`1889995`](https://github.com/apache/spark/commit/1889995c12e55b2420726540756b4b0b69b1bb28).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #19269: [SPARK-22026][SQL][WIP] data source v2 write path

2017-10-10 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19269
  
**[Test build #82589 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82589/testReport)**
 for PR 19269 at commit 
[`3e855a5`](https://github.com/apache/spark/commit/3e855a598c92b58d4abba02ac0f5d90c61b4aeab).


---




[GitHub] spark issue #19269: [SPARK-22026][SQL][WIP] data source v2 write path

2017-10-10 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/19269
  
> I'm not following what you mean here.

I'm answering @steveloughran's question about the semantics of data 
writers. Ideally, a transaction means readers can only see the data after it's 
committed, but there is no restriction preventing the output of data writers from being 
visible to other writers, so it's possible to launch a write task just to 
clean up the data left by other writers.
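
To illustrate the point, here is a minimal Scala sketch. The trait shapes below are hypothetical stand-ins, not the exact (still WIP) interfaces in this PR:

```scala
// Hypothetical interfaces for illustration only; the actual API in this PR may differ.
trait WriterCommitMessage extends Serializable

trait DataWriter[T] {
  def write(record: T): Unit
  def commit(): WriterCommitMessage   // readers see the data only after the job-level commit
  def abort(): Unit
}

// A legal (if unusual) writer: it writes nothing and, on commit, only cleans up
// leftover output of other (failed or speculative) write tasks in a shared temp dir.
class CleanupOnlyWriter(tempDir: java.io.File) extends DataWriter[Any] {
  override def write(record: Any): Unit = ()
  override def commit(): WriterCommitMessage = {
    Option(tempDir.listFiles()).getOrElse(Array.empty[java.io.File]).foreach(_.delete())
    new WriterCommitMessage {}
  }
  override def abort(): Unit = ()
}
```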


---




[GitHub] spark pull request #19438: [SPARK-22208] [SQL] Improve percentile_approx by ...

2017-10-10 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/19438#discussion_r143761026
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/QuantileSummariesSuite.scala
 ---
@@ -58,7 +58,7 @@ class QuantileSummariesSuite extends SparkFunSuite {
 if (data.nonEmpty) {
   val approx = summary.query(quant).get
   // The rank of the approximation.
-  val rank = data.count(_ < approx) // has to be <, not <= to be exact
+  val rank = data.count(_ <= approx)
--- End diff --

Yes I think rounding up the average can solve the problem.


---




[GitHub] spark issue #19181: [SPARK-21907][CORE] oom during spill

2017-10-10 Thread juliuszsompolski
Github user juliuszsompolski commented on the issue:

https://github.com/apache/spark/pull/19181
  
Looks good to me.
What do you think @hvanhovell ?


---




[GitHub] spark issue #18732: [SPARK-20396][SQL][PySpark] groupby().apply() with panda...

2017-10-10 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/18732
  
**[Test build #82587 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82587/testReport)**
 for PR 18732 at commit 
[`9c2b10e`](https://github.com/apache/spark/commit/9c2b10e16da4690afaa72599299346a62a0da668).


---




[GitHub] spark issue #19082: [SPARK-21870][SQL] Split aggregation code into small fun...

2017-10-10 Thread kiszk
Github user kiszk commented on the issue:

https://github.com/apache/spark/pull/19082
  
My best guess is that building a huge method via method inlining will not 
fail JIT compilation in HotSpot; it may only cause method inlining to fail.

According to these blog entries 
[1](https://techblug.wordpress.com/2013/08/19/java-jit-compiler-inlining/), 
[2](http://normanmaurer.me/blog/2014/05/15/Inline-all-the-Things/), HotSpot reports 
the following annotations for method inlining (explanations from [1]):

1. **inline (hot)**: the method was determined hot and inlined
2. **too big**: the method was not inlined because the generated code was 
getting too big (but the method was not hot)
3. **hot method too big**: the method was determined hot but not inlined 
because the resulting code was getting too big.

Annotation 1 shows that inlining succeeded. Annotations 2 and 3 show that the 
callee is too large. Even when 2 or 3 appears in the compilation log [2], the 
method itself still seems to compile successfully; only some of the method 
inlining is disabled.
[Here](https://issues.apache.org/jira/browse/HBASE-15431) is an interesting 
experiment: even after they forced inlining by increasing the threshold, it was 
not easy to see a performance improvement.
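
As an aside, a quick way to see these annotations for yourself (a standalone sketch, not code from this PR) is to run a hot loop under the standard HotSpot diagnostic flags:

```scala
// Compile with scalac, then run with:
//   scala -J-XX:+UnlockDiagnosticVMOptions -J-XX:+PrintInlining InlineDemo
// HotSpot prints per-call-site decisions such as "inline (hot)", "too big"
// and "hot method too big" once the hot loop gets JIT-compiled.
object InlineDemo {
  // A small hot callee. With default settings it is usually reported as
  // "inline (hot)"; lowering -XX:FreqInlineSize makes "hot method too big"
  // show up for larger callees.
  def bigCallee(x: Long): Long = {
    var acc = x
    var i = 0
    while (i < 50) {
      acc += (acc ^ i) * 31 + i
      i += 1
    }
    acc
  }

  def main(args: Array[String]): Unit = {
    var sum = 0L
    var i = 0L
    while (i < 10000000L) {
      sum += bigCallee(i)
      i += 1
    }
    println(sum)
  }
}
```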

@viirya, does this answer your question?



---




[GitHub] spark issue #19424: [SPARK-22197][SQL] push down operators to data source be...

2017-10-10 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19424
  
**[Test build #82586 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82586/testReport)**
 for PR 19424 at commit 
[`e8e8fee`](https://github.com/apache/spark/commit/e8e8feeeb54ae3e4f79157edb8b4f69886036cd0).


---




[GitHub] spark issue #18732: [SPARK-20396][SQL][PySpark] groupby().apply() with panda...

2017-10-10 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/18732
  
**[Test build #82585 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82585/testReport)**
 for PR 18732 at commit 
[`b88a4d8`](https://github.com/apache/spark/commit/b88a4d8fd2c805b883a88eb24100e360c198726a).


---




[GitHub] spark pull request #19443: [SPARK-22212][SQL][PySpark] Some SQL functions in...

2017-10-10 Thread jsnowacki
Github user jsnowacki closed the pull request at:

https://github.com/apache/spark/pull/19443


---




[GitHub] spark issue #18732: [SPARK-20396][SQL][PySpark] groupby().apply() with panda...

2017-10-10 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/18732
  
**[Test build #82584 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82584/testReport)**
 for PR 18732 at commit 
[`a036f70`](https://github.com/apache/spark/commit/a036f70f89b6fadbbc3b8d80feecf2c086e32d18).


---




[GitHub] spark issue #19443: [SPARK-22212][SQL][PySpark] Some SQL functions in Python...

2017-10-10 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/19443
  
Let's resolve it as `Later` for now. Will keep my eyes on similar JIRAs and 
ping / cc you in the future. Thanks for bearing with me @jsnowacki.


---




[GitHub] spark pull request #19438: [SPARK-22208] [SQL] Improve percentile_approx by ...

2017-10-10 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/19438#discussion_r143748535
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/QuantileSummariesSuite.scala
 ---
@@ -58,7 +58,7 @@ class QuantileSummariesSuite extends SparkFunSuite {
 if (data.nonEmpty) {
   val approx = summary.query(quant).get
   // The rank of the approximation.
-  val rank = data.count(_ < approx) // has to be <, not <= to be exact
+  val rank = data.count(_ <= approx)
--- End diff --

It queries the 50% quantile with relativeError 0.1, so the targetError is 0.1 * 100 
= 10 and the expected rank should be within [40, 60].
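
Spelled out, as a quick sketch of the arithmetic (not code from the PR):

```scala
// For n values, target quantile q and relativeError eps, the approximate
// answer's rank is allowed to fall anywhere in [q*n - eps*n, q*n + eps*n].
val n = 100
val q = 0.5
val eps = 0.1
val targetRank = q * n                      // 50
val targetError = eps * n                   // 10
val lowerBound = targetRank - targetError   // 40
val upperBound = targetRank + targetError   // 60
assert(lowerBound == 40.0 && upperBound == 60.0)
```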


---




[GitHub] spark issue #19443: [SPARK-22212][SQL][PySpark] Some SQL functions in Python...

2017-10-10 Thread jsnowacki
Github user jsnowacki commented on the issue:

https://github.com/apache/spark/pull/19443
  
OK, closing then. Should I leave the JIRA issue open, or close it as well?


---




[GitHub] spark issue #18732: [SPARK-20396][SQL][PySpark] groupby().apply() with panda...

2017-10-10 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/18732
  
add to whitelist


---




[GitHub] spark issue #18979: [SPARK-21762][SQL] FileFormatWriter/BasicWriteTaskStatsT...

2017-10-10 Thread steveloughran
Github user steveloughran commented on the issue:

https://github.com/apache/spark/pull/18979
  
Has anyone had a look at this recently? 

The problem still exists, and while downstream filesystems can work around it if 
they recognise the use case and lie about the values, they will then be returning invalid 
values to the caller: Spark will be reporting the wrong values. At least with 
this PR, Spark gets to decide for itself how to react.


---




[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-10 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r143744197
  
--- Diff: python/pyspark/sql/functions.py ---
@@ -2181,30 +2187,66 @@ def udf(f=None, returnType=StringType()):
 @since(2.3)
 def pandas_udf(f=None, returnType=StringType()):
 """
-Creates a :class:`Column` expression representing a user defined 
function (UDF) that accepts
-`Pandas.Series` as input arguments and outputs a `Pandas.Series` of 
the same length.
+Creates a vectorized user defined function (UDF).
 
-:param f: python function if used as a standalone function
+:param f: user-defined function. A python function if used as a 
standalone function
 :param returnType: a :class:`pyspark.sql.types.DataType` object
 
->>> from pyspark.sql.types import IntegerType, StringType
->>> slen = pandas_udf(lambda s: s.str.len(), IntegerType())
->>> @pandas_udf(returnType=StringType())
-... def to_upper(s):
-... return s.str.upper()
-...
->>> @pandas_udf(returnType="integer")
-... def add_one(x):
-... return x + 1
-...
->>> df = spark.createDataFrame([(1, "John Doe", 21)], ("id", "name", 
"age"))
->>> df.select(slen("name").alias("slen(name)"), to_upper("name"), 
add_one("age")) \\
-... .show()  # doctest: +SKIP
-+----------+--------------+------------+
-|slen(name)|to_upper(name)|add_one(age)|
-+----------+--------------+------------+
-|         8|      JOHN DOE|          22|
-+----------+--------------+------------+
+The user-defined function can define one of the following 
transformations:
+
+1. One or more `pandas.Series` -> A `pandas.Series`
+
+   This udf is used with :meth:`pyspark.sql.DataFrame.withColumn` and
+   :meth:`pyspark.sql.DataFrame.select`.
+   The returnType should be a primitive data type, e.g., 
`DoubleType()`.
+   The length of the returned `pandas.Series` must be of the same as 
the input `pandas.Series`.
+
+   >>> from pyspark.sql.types import IntegerType, StringType
+   >>> slen = pandas_udf(lambda s: s.str.len(), IntegerType())
+   >>> @pandas_udf(returnType=StringType())
+   ... def to_upper(s):
+   ... return s.str.upper()
+   ...
+   >>> @pandas_udf(returnType="integer")
+   ... def add_one(x):
+   ... return x + 1
+   ...
+   >>> df = spark.createDataFrame([(1, "John Doe", 21)], ("id", 
"name", "age"))
+   >>> df.select(slen("name").alias("slen(name)"), to_upper("name"), 
add_one("age")) \\
+   ... .show()  # doctest: +SKIP
+   +----------+--------------+------------+
+   |slen(name)|to_upper(name)|add_one(age)|
+   +----------+--------------+------------+
+   |         8|      JOHN DOE|          22|
+   +----------+--------------+------------+
+
+2. A `pandas.DataFrame` -> A `pandas.DataFrame`
+
+   This udf is used with :meth:`pyspark.sql.GroupedData.apply`.
--- End diff --

Changed to `This udf is only used with` and added a `note`.


---




[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-10 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r143741944
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala
 ---
@@ -44,14 +73,18 @@ case class ArrowEvalPythonExec(udfs: Seq[PythonUDF], 
output: Seq[Attribute], chi
 val schemaOut = 
StructType.fromAttributes(output.drop(child.output.length).zipWithIndex
   .map { case (attr, i) => attr.withName(s"_$i") })
 
+val batchSize = conf.arrowMaxRecordsPerBatch
+// DO NOT use iter.grouped(). See BatchIterator.
+val batchIter = if (batchSize > 0) new BatchIterator(iter, batchSize) 
else Iterator(iter)
+
 val columnarBatchIter = new ArrowPythonRunner(
-funcs, conf.arrowMaxRecordsPerBatch, bufferSize, reuseWorker,
+funcs, bufferSize, reuseWorker,
 PythonEvalType.SQL_PANDAS_UDF, argOffsets, schema)
-  .compute(iter, context.partitionId(), context)
+  .compute(batchIter, context.partitionId(), context)
 
 new Iterator[InternalRow] {
 
-  var currentIter = if (columnarBatchIter.hasNext) {
+  private var currentIter = if (columnarBatchIter.hasNext) {
--- End diff --

I think so. The variable is reassigned for each columnar batch.
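
For anyone following along, a simplified sketch of the batching idea (an illustration only, not the PR's actual `BatchIterator`), assuming each batch is fully consumed before the next one is requested:

```scala
// Wraps `underlying` as an iterator of batch iterators of at most `batchSize`
// elements each. Unlike iter.grouped(batchSize), no batch is materialized into
// a Seq up front; elements are pulled lazily from the shared underlying iterator.
class SimpleBatchIterator[A](underlying: Iterator[A], batchSize: Int)
  extends Iterator[Iterator[A]] {
  require(batchSize > 0, "batchSize must be positive")

  override def hasNext: Boolean = underlying.hasNext

  // Caller contract: consume the returned batch completely before calling
  // next() again, since all batches share the single underlying iterator.
  override def next(): Iterator[A] = new Iterator[A] {
    private var served = 0
    override def hasNext: Boolean = served < batchSize && underlying.hasNext
    override def next(): A = {
      served += 1
      underlying.next()
    }
  }
}
```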


---




[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-10 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r143740882
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/pythonLogicalOperators.scala
 ---
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical
+
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, 
Expression}
+
+/**
+ * Logical nodes specific to PySpark.
+ */
+
+/**
+ * FlatMap groups using a udf: pandas.Dataframe -> pandas.DataFrame.
--- End diff --

Fixed.


---




[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-10 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r143740773
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/pythonLogicalOperators.scala
 ---
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical
+
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, 
Expression}
+
+/**
+ * Logical nodes specific to PySpark.
+ */
--- End diff --

Removed


---




[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-10 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r143740636
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
 ---
@@ -519,3 +519,4 @@ case class CoGroup(
 outputObjAttr: Attribute,
 left: LogicalPlan,
 right: LogicalPlan) extends BinaryNode with ObjectProducer
+
--- End diff --

Reverted


---




[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-10 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r143740157
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -3376,6 +3376,151 @@ def test_vectorized_udf_empty_partition(self):
 res = df.select(f(col('id')))
 self.assertEquals(df.collect(), res.collect())
 
+def test_vectorized_udf_varargs(self):
+from pyspark.sql.functions import pandas_udf, col
+df = self.spark.createDataFrame(self.sc.parallelize([Row(id=1)], 
2))
+f = pandas_udf(lambda *v: v[0], LongType())
+res = df.select(f(col('id')))
+self.assertEquals(df.collect(), res.collect())
+
+
+@unittest.skipIf(not _have_pandas or not _have_arrow, "Pandas or Arrow not 
installed")
+class GroupbyApplyTests(ReusedPySparkTestCase):
+@classmethod
+def setUpClass(cls):
+ReusedPySparkTestCase.setUpClass()
+cls.spark = SparkSession(cls.sc)
+
+@classmethod
+def tearDownClass(cls):
+ReusedPySparkTestCase.tearDownClass()
+cls.spark.stop()
+
+def assertFramesEqual(self, expected, result):
+msg = ("DataFrames are not equal: " +
+   ("\n\nExpected:\n%s\n%s" % (expected, expected.dtypes)) +
+   ("\n\nResult:\n%s\n%s" % (result, result.dtypes)))
+self.assertTrue(expected.equals(result), msg=msg)
+
+@property
+def data(self):
+from pyspark.sql.functions import array, explode, col, lit
+return self.spark.range(10).toDF('id') \
+.withColumn("vs", array([lit(i) for i in range(20, 30)])) \
+.withColumn("v", explode(col('vs'))).drop('vs')
+
+def test_simple(self):
+from pyspark.sql.functions import pandas_udf
+df = self.data
+
+foo_udf = pandas_udf(
+lambda df: df.assign(v1=df.v * df.id * 1.0, v2=df.v + df.id),
+StructType(
+[StructField('id', LongType()),
+ StructField('v', IntegerType()),
+ StructField('v1', DoubleType()),
+ StructField('v2', LongType())]))
+
+result = df.groupby('id').apply(foo_udf).sort('id').toPandas()
+expected = 
df.toPandas().groupby('id').apply(foo_udf.func).reset_index(drop=True)
+self.assertFramesEqual(expected, result)
+
+def test_decorator(self):
+from pyspark.sql.functions import pandas_udf
+df = self.data
+
+@pandas_udf(StructType(
+[StructField('id', LongType()),
+ StructField('v', IntegerType()),
+ StructField('v1', DoubleType()),
+ StructField('v2', LongType())]))
+def foo(df):
+return df.assign(v1=df.v * df.id * 1.0, v2=df.v + df.id)
+
+result = df.groupby('id').apply(foo).sort('id').toPandas()
+expected = 
df.toPandas().groupby('id').apply(foo.func).reset_index(drop=True)
+self.assertFramesEqual(expected, result)
+
+def test_coerce(self):
+from pyspark.sql.functions import pandas_udf
+df = self.data
+
+foo = pandas_udf(
+lambda df: df,
+StructType([StructField('id', LongType()), StructField('v', 
DoubleType())]))
+
+result = df.groupby('id').apply(foo).sort('id').toPandas()
+expected = 
df.toPandas().groupby('id').apply(foo.func).reset_index(drop=True)
+expected = expected.assign(v=expected.v.astype('float64'))
+self.assertFramesEqual(expected, result)
+
+def test_complex_groupby(self):
+from pyspark.sql.functions import pandas_udf, col
+df = self.data
+
+@pandas_udf(StructType(
+[StructField('id', LongType()),
+ StructField('v', IntegerType()),
+ StructField('norm', DoubleType())]))
+def normalize(pdf):
+v = pdf.v
+return pdf.assign(norm=(v - v.mean()) / v.std())
+
+result = df.groupby(col('id') % 2 == 
0).apply(normalize).sort('id', 'v').toPandas()
+pdf = df.toPandas()
+expected = pdf.groupby(pdf['id'] % 2 == 0).apply(normalize.func)
+expected = expected.sort_values(['id', 'v']).reset_index(drop=True)
+expected = expected.assign(norm=expected.norm.astype('float64'))
+self.assertFramesEqual(expected, result)
+
+def test_empty_groupby(self):
+from pyspark.sql.functions import pandas_udf, col
+df = self.data
+
+@pandas_udf(StructType(
+[StructField('id', LongType()),
+ StructField('v', IntegerType()),
+

[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-10 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r143740129
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -3376,6 +3376,151 @@ def test_vectorized_udf_empty_partition(self):
 res = df.select(f(col('id')))
 self.assertEquals(df.collect(), res.collect())
 
+def test_vectorized_udf_varargs(self):
+from pyspark.sql.functions import pandas_udf, col
+df = self.spark.createDataFrame(self.sc.parallelize([Row(id=1)], 
2))
+f = pandas_udf(lambda *v: v[0], LongType())
+res = df.select(f(col('id')))
+self.assertEquals(df.collect(), res.collect())
+
+
+@unittest.skipIf(not _have_pandas or not _have_arrow, "Pandas or Arrow not 
installed")
+class GroupbyApplyTests(ReusedPySparkTestCase):
+@classmethod
+def setUpClass(cls):
+ReusedPySparkTestCase.setUpClass()
+cls.spark = SparkSession(cls.sc)
+
+@classmethod
+def tearDownClass(cls):
+ReusedPySparkTestCase.tearDownClass()
+cls.spark.stop()
+
+def assertFramesEqual(self, expected, result):
+msg = ("DataFrames are not equal: " +
+   ("\n\nExpected:\n%s\n%s" % (expected, expected.dtypes)) +
+   ("\n\nResult:\n%s\n%s" % (result, result.dtypes)))
+self.assertTrue(expected.equals(result), msg=msg)
+
+@property
+def data(self):
+from pyspark.sql.functions import array, explode, col, lit
+return self.spark.range(10).toDF('id') \
+.withColumn("vs", array([lit(i) for i in range(20, 30)])) \
+.withColumn("v", explode(col('vs'))).drop('vs')
+
+def test_simple(self):
+from pyspark.sql.functions import pandas_udf
+df = self.data
+
+foo_udf = pandas_udf(
+lambda df: df.assign(v1=df.v * df.id * 1.0, v2=df.v + df.id),
+StructType(
+[StructField('id', LongType()),
+ StructField('v', IntegerType()),
+ StructField('v1', DoubleType()),
+ StructField('v2', LongType())]))
+
+result = df.groupby('id').apply(foo_udf).sort('id').toPandas()
+expected = 
df.toPandas().groupby('id').apply(foo_udf.func).reset_index(drop=True)
+self.assertFramesEqual(expected, result)
+
+def test_decorator(self):
+from pyspark.sql.functions import pandas_udf
+df = self.data
+
+@pandas_udf(StructType(
+[StructField('id', LongType()),
+ StructField('v', IntegerType()),
+ StructField('v1', DoubleType()),
+ StructField('v2', LongType())]))
+def foo(df):
+return df.assign(v1=df.v * df.id * 1.0, v2=df.v + df.id)
+
+result = df.groupby('id').apply(foo).sort('id').toPandas()
+expected = 
df.toPandas().groupby('id').apply(foo.func).reset_index(drop=True)
+self.assertFramesEqual(expected, result)
+
+def test_coerce(self):
+from pyspark.sql.functions import pandas_udf
+df = self.data
+
+foo = pandas_udf(
+lambda df: df,
--- End diff --

Fixed.


---




[GitHub] spark pull request #18732: [SPARK-20396][SQL][PySpark] groupby().apply() wit...

2017-10-10 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/18732#discussion_r143740078
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -3376,6 +3376,151 @@ def test_vectorized_udf_empty_partition(self):
 res = df.select(f(col('id')))
 self.assertEquals(df.collect(), res.collect())
 
+def test_vectorized_udf_varargs(self):
+from pyspark.sql.functions import pandas_udf, col
+df = self.spark.createDataFrame(self.sc.parallelize([Row(id=1)], 
2))
+f = pandas_udf(lambda *v: v[0], LongType())
+res = df.select(f(col('id')))
+self.assertEquals(df.collect(), res.collect())
+
+
+@unittest.skipIf(not _have_pandas or not _have_arrow, "Pandas or Arrow not 
installed")
+class GroupbyApplyTests(ReusedPySparkTestCase):
+@classmethod
+def setUpClass(cls):
+ReusedPySparkTestCase.setUpClass()
+cls.spark = SparkSession(cls.sc)
+
+@classmethod
+def tearDownClass(cls):
+ReusedPySparkTestCase.tearDownClass()
+cls.spark.stop()
+
+def assertFramesEqual(self, expected, result):
+msg = ("DataFrames are not equal: " +
+   ("\n\nExpected:\n%s\n%s" % (expected, expected.dtypes)) +
+   ("\n\nResult:\n%s\n%s" % (result, result.dtypes)))
+self.assertTrue(expected.equals(result), msg=msg)
+
+@property
+def data(self):
+from pyspark.sql.functions import array, explode, col, lit
+return self.spark.range(10).toDF('id') \
+.withColumn("vs", array([lit(i) for i in range(20, 30)])) \
+.withColumn("v", explode(col('vs'))).drop('vs')
+
+def test_simple(self):
+from pyspark.sql.functions import pandas_udf
+df = self.data
+
+foo_udf = pandas_udf(
+lambda df: df.assign(v1=df.v * df.id * 1.0, v2=df.v + df.id),
+StructType(
+[StructField('id', LongType()),
+ StructField('v', IntegerType()),
+ StructField('v1', DoubleType()),
+ StructField('v2', LongType())]))
+
+result = df.groupby('id').apply(foo_udf).sort('id').toPandas()
+expected = 
df.toPandas().groupby('id').apply(foo_udf.func).reset_index(drop=True)
+self.assertFramesEqual(expected, result)
+
+def test_decorator(self):
+from pyspark.sql.functions import pandas_udf
+df = self.data
+
+@pandas_udf(StructType(
+[StructField('id', LongType()),
+ StructField('v', IntegerType()),
+ StructField('v1', DoubleType()),
+ StructField('v2', LongType())]))
+def foo(df):
+return df.assign(v1=df.v * df.id * 1.0, v2=df.v + df.id)
--- End diff --

Fixed.


---




[GitHub] spark issue #19270: [SPARK-21809] : Change Stage Page to use datatables to s...

2017-10-10 Thread pgandhi999
Github user pgandhi999 commented on the issue:

https://github.com/apache/spark/pull/19270
  
@ajbozarth I do not quite understand what you are saying. Everything seems 
to be working fine on my test setup. Can you please let me know how I can 
replicate the issue? Thank you.


---




[GitHub] spark pull request #17819: [SPARK-20542][ML][SQL] Add an API to Bucketizer t...

2017-10-10 Thread MLnick
Github user MLnick commented on a diff in the pull request:

https://github.com/apache/spark/pull/17819#discussion_r143733664
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/Bucketizer.scala 
---
@@ -96,9 +99,71 @@ final class Bucketizer @Since("1.4.0") (@Since("1.4.0") 
override val uid: String
   def setHandleInvalid(value: String): this.type = set(handleInvalid, 
value)
   setDefault(handleInvalid, Bucketizer.ERROR_INVALID)
 
+  /**
+   * Parameter for specifying multiple splits parameters. Each element in 
this array can be used to
+   * map continuous features into buckets.
+   *
+   * @group param
+   */
+  @Since("2.3.0")
+  val splitsArray: DoubleArrayArrayParam = new DoubleArrayArrayParam(this, 
"splitsArray",
+"The array of split points for mapping continuous features into 
buckets for multiple " +
+  "columns. For each input column, with n+1 splits, there are n 
buckets. A bucket defined by " +
+  "splits x,y holds values in the range [x,y) except the last bucket, 
which also includes y. " +
+  "The splits should be of length >= 3 and strictly increasing. Values 
at -inf, inf must be " +
+  "explicitly provided to cover all Double values; otherwise, values 
outside the splits " +
+  "specified will be treated as errors.",
+Bucketizer.checkSplitsArray)
+
+  /**
+   * Param for output column names.
+   * @group param
+   */
+  @Since("2.3.0")
+  final val outputCols: StringArrayParam = new StringArrayParam(this, 
"outputCols",
--- End diff --

I guess similarly to the shared params? I think it makes sense to add a shared 
param, since this, `Imputer` and others will use it.


---




[GitHub] spark pull request #17819: [SPARK-20542][ML][SQL] Add an API to Bucketizer t...

2017-10-10 Thread MLnick
Github user MLnick commented on a diff in the pull request:

https://github.com/apache/spark/pull/17819#discussion_r143730103
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/feature/BucketizerSuite.scala ---
@@ -187,6 +188,196 @@ class BucketizerSuite extends SparkFunSuite with 
MLlibTestSparkContext with Defa
   }
 }
   }
+
+  test("multiple columns: Bucket continuous features, without -inf,inf") {
+// Check a set of valid feature values.
+val splits = Array(Array(-0.5, 0.0, 0.5), Array(-0.1, 0.3, 0.5))
+val validData1 = Array(-0.5, -0.3, 0.0, 0.2)
+val validData2 = Array(0.5, 0.3, 0.0, -0.1)
+val expectedBuckets1 = Array(0.0, 0.0, 1.0, 1.0)
+val expectedBuckets2 = Array(1.0, 1.0, 0.0, 0.0)
+
+val data = (0 until validData1.length).map { idx =>
+  (validData1(idx), validData2(idx), expectedBuckets1(idx), 
expectedBuckets2(idx))
+}
+val dataFrame: DataFrame = data.toSeq.toDF("feature1", "feature2", 
"expected1", "expected2")
+
+val bucketizer1: Bucketizer = new Bucketizer()
+  .setInputCols(Array("feature1", "feature2"))
+  .setOutputCols(Array("result1", "result2"))
+  .setSplitsArray(splits)
+
+assert(bucketizer1.isBucketizeMultipleColumns())
+
+bucketizer1.transform(dataFrame).select("result1", "expected1", 
"result2", "expected2")
+  .collect().foreach {
+case Row(r1: Double, e1: Double, r2: Double, e2: Double) =>
+  assert(r1 === e1,
+s"The feature value is not correct after bucketing. Expected 
$e1 but found $r1")
+  assert(r2 === e2,
+s"The feature value is not correct after bucketing. Expected 
$e2 but found $r2")
+  }
+
+// Check for exceptions when using a set of invalid feature values.
+val invalidData1: Array[Double] = Array(-0.9) ++ validData1
+val invalidData2 = Array(0.51) ++ validData1
+val badDF1 = invalidData1.zipWithIndex.toSeq.toDF("feature", "idx")
+
+val bucketizer2: Bucketizer = new Bucketizer()
+  .setInputCols(Array("feature"))
+  .setOutputCols(Array("result"))
+  .setSplitsArray(Array(splits(0)))
+
+assert(bucketizer2.isBucketizeMultipleColumns())
+
+withClue("Invalid feature value -0.9 was not caught as an invalid 
feature!") {
+  intercept[SparkException] {
+bucketizer2.transform(badDF1).collect()
+  }
+}
+val badDF2 = invalidData2.zipWithIndex.toSeq.toDF("feature", "idx")
+withClue("Invalid feature value 0.51 was not caught as an invalid 
feature!") {
+  intercept[SparkException] {
+bucketizer2.transform(badDF2).collect()
+  }
+}
+  }
+
+  test("multiple columns: Bucket continuous features, with -inf,inf") {
+val splits = Array(
+  Array(Double.NegativeInfinity, -0.5, 0.0, 0.5, 
Double.PositiveInfinity),
+  Array(Double.NegativeInfinity, -0.3, 0.2, 0.5, 
Double.PositiveInfinity))
+
+val validData1 = Array(-0.9, -0.5, -0.3, 0.0, 0.2, 0.5, 0.9)
+val validData2 = Array(-0.1, -0.5, -0.2, 0.0, 0.1, 0.3, 0.5)
+val expectedBuckets1 = Array(0.0, 1.0, 1.0, 2.0, 2.0, 3.0, 3.0)
+val expectedBuckets2 = Array(1.0, 0.0, 1.0, 1.0, 1.0, 2.0, 3.0)
+
+val data = (0 until validData1.length).map { idx =>
+  (validData1(idx), validData2(idx), expectedBuckets1(idx), 
expectedBuckets2(idx))
+}
+val dataFrame: DataFrame = data.toSeq.toDF("feature1", "feature2", 
"expected1", "expected2")
+
+val bucketizer: Bucketizer = new Bucketizer()
+  .setInputCols(Array("feature1", "feature2"))
+  .setOutputCols(Array("result1", "result2"))
+  .setSplitsArray(splits)
+
+assert(bucketizer.isBucketizeMultipleColumns())
+
+bucketizer.transform(dataFrame).select("result1", "expected1", 
"result2", "expected2")
+  .collect().foreach {
+case Row(r1: Double, e1: Double, r2: Double, e2: Double) =>
+  assert(r1 === e1,
+s"The feature value is not correct after bucketing. Expected 
$e1 but found $r1")
+  assert(r2 === e2,
+s"The feature value is not correct after bucketing. Expected 
$e2 but found $r2")
+  }
+  }
+
+  test("multiple columns: Bucket continuous features, with NaN data but 
non-NaN splits") {
+val splits = Array(
+  Array(Double.NegativeInfinity, -0.5, 0.0, 0.5, 
Double.PositiveInfinity),
+  Array(Double.NegativeInfinity, -0.1, 0.2, 0.6, 
Double.PositiveInfinity))
+
+val validData1 = Array(-0.9, -0.5, -0.3, 0.0, 0.2, 0.5, 0.9, 
Double.NaN, Double.NaN, Double.NaN)
+val validData2 = Array(0.2, -0.1, 0.3, 0.0, 0.1, 0.3, 0.5, 0.8, 
Double.NaN, Double.NaN)

[GitHub] spark pull request #17819: [SPARK-20542][ML][SQL] Add an API to Bucketizer t...

2017-10-10 Thread MLnick
Github user MLnick commented on a diff in the pull request:

https://github.com/apache/spark/pull/17819#discussion_r143728324
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/feature/BucketizerSuite.scala ---
@@ -187,6 +188,196 @@ class BucketizerSuite extends SparkFunSuite with 
MLlibTestSparkContext with Defa
   }
 }
   }
+
+  test("multiple columns: Bucket continuous features, without -inf,inf") {
+// Check a set of valid feature values.
+val splits = Array(Array(-0.5, 0.0, 0.5), Array(-0.1, 0.3, 0.5))
+val validData1 = Array(-0.5, -0.3, 0.0, 0.2)
+val validData2 = Array(0.5, 0.3, 0.0, -0.1)
+val expectedBuckets1 = Array(0.0, 0.0, 1.0, 1.0)
+val expectedBuckets2 = Array(1.0, 1.0, 0.0, 0.0)
+
+val data = (0 until validData1.length).map { idx =>
+  (validData1(idx), validData2(idx), expectedBuckets1(idx), 
expectedBuckets2(idx))
+}
+val dataFrame: DataFrame = data.toSeq.toDF("feature1", "feature2", 
"expected1", "expected2")
+
+val bucketizer1: Bucketizer = new Bucketizer()
+  .setInputCols(Array("feature1", "feature2"))
+  .setOutputCols(Array("result1", "result2"))
+  .setSplitsArray(splits)
+
+assert(bucketizer1.isBucketizeMultipleColumns())
+
+bucketizer1.transform(dataFrame).select("result1", "expected1", 
"result2", "expected2")
+  .collect().foreach {
+case Row(r1: Double, e1: Double, r2: Double, e2: Double) =>
+  assert(r1 === e1,
+s"The feature value is not correct after bucketing. Expected 
$e1 but found $r1")
+  assert(r2 === e2,
+s"The feature value is not correct after bucketing. Expected 
$e2 but found $r2")
+  }
+
+// Check for exceptions when using a set of invalid feature values.
+val invalidData1: Array[Double] = Array(-0.9) ++ validData1
+val invalidData2 = Array(0.51) ++ validData1
+val badDF1 = invalidData1.zipWithIndex.toSeq.toDF("feature", "idx")
+
+val bucketizer2: Bucketizer = new Bucketizer()
+  .setInputCols(Array("feature"))
+  .setOutputCols(Array("result"))
+  .setSplitsArray(Array(splits(0)))
+
+assert(bucketizer2.isBucketizeMultipleColumns())
+
+withClue("Invalid feature value -0.9 was not caught as an invalid 
feature!") {
+  intercept[SparkException] {
+bucketizer2.transform(badDF1).collect()
+  }
+}
+val badDF2 = invalidData2.zipWithIndex.toSeq.toDF("feature", "idx")
+withClue("Invalid feature value 0.51 was not caught as an invalid 
feature!") {
+  intercept[SparkException] {
+bucketizer2.transform(badDF2).collect()
+  }
+}
+  }
+
+  test("multiple columns: Bucket continuous features, with -inf,inf") {
+val splits = Array(
+  Array(Double.NegativeInfinity, -0.5, 0.0, 0.5, 
Double.PositiveInfinity),
+  Array(Double.NegativeInfinity, -0.3, 0.2, 0.5, 
Double.PositiveInfinity))
+
+val validData1 = Array(-0.9, -0.5, -0.3, 0.0, 0.2, 0.5, 0.9)
+val validData2 = Array(-0.1, -0.5, -0.2, 0.0, 0.1, 0.3, 0.5)
+val expectedBuckets1 = Array(0.0, 1.0, 1.0, 2.0, 2.0, 3.0, 3.0)
+val expectedBuckets2 = Array(1.0, 0.0, 1.0, 1.0, 1.0, 2.0, 3.0)
+
+val data = (0 until validData1.length).map { idx =>
+  (validData1(idx), validData2(idx), expectedBuckets1(idx), 
expectedBuckets2(idx))
+}
+val dataFrame: DataFrame = data.toSeq.toDF("feature1", "feature2", 
"expected1", "expected2")
+
+val bucketizer: Bucketizer = new Bucketizer()
+  .setInputCols(Array("feature1", "feature2"))
+  .setOutputCols(Array("result1", "result2"))
+  .setSplitsArray(splits)
+
+assert(bucketizer.isBucketizeMultipleColumns())
+
+bucketizer.transform(dataFrame).select("result1", "expected1", 
"result2", "expected2")
+  .collect().foreach {
+case Row(r1: Double, e1: Double, r2: Double, e2: Double) =>
+  assert(r1 === e1,
+s"The feature value is not correct after bucketing. Expected 
$e1 but found $r1")
+  assert(r2 === e2,
+s"The feature value is not correct after bucketing. Expected 
$e2 but found $r2")
+  }
+  }
+
+  test("multiple columns: Bucket continuous features, with NaN data but 
non-NaN splits") {
+val splits = Array(
+  Array(Double.NegativeInfinity, -0.5, 0.0, 0.5, 
Double.PositiveInfinity),
+  Array(Double.NegativeInfinity, -0.1, 0.2, 0.6, 
Double.PositiveInfinity))
+
+val validData1 = Array(-0.9, -0.5, -0.3, 0.0, 0.2, 0.5, 0.9, 
Double.NaN, Double.NaN, Double.NaN)
+val validData2 = Array(0.2, -0.1, 0.3, 0.0, 0.1, 0.3, 0.5, 0.8, 
Double.NaN, Double.NaN)

[GitHub] spark pull request #17819: [SPARK-20542][ML][SQL] Add an API to Bucketizer t...

2017-10-10 Thread MLnick
Github user MLnick commented on a diff in the pull request:

https://github.com/apache/spark/pull/17819#discussion_r143728663
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/feature/BucketizerSuite.scala ---
@@ -187,6 +188,196 @@ class BucketizerSuite extends SparkFunSuite with 
MLlibTestSparkContext with Defa
   }
 }
   }
+
+  test("multiple columns: Bucket continuous features, without -inf,inf") {
+// Check a set of valid feature values.
+val splits = Array(Array(-0.5, 0.0, 0.5), Array(-0.1, 0.3, 0.5))
+val validData1 = Array(-0.5, -0.3, 0.0, 0.2)
+val validData2 = Array(0.5, 0.3, 0.0, -0.1)
+val expectedBuckets1 = Array(0.0, 0.0, 1.0, 1.0)
+val expectedBuckets2 = Array(1.0, 1.0, 0.0, 0.0)
+
+val data = (0 until validData1.length).map { idx =>
+  (validData1(idx), validData2(idx), expectedBuckets1(idx), 
expectedBuckets2(idx))
+}
+val dataFrame: DataFrame = data.toSeq.toDF("feature1", "feature2", 
"expected1", "expected2")
+
+val bucketizer1: Bucketizer = new Bucketizer()
+  .setInputCols(Array("feature1", "feature2"))
+  .setOutputCols(Array("result1", "result2"))
+  .setSplitsArray(splits)
+
+assert(bucketizer1.isBucketizeMultipleColumns())
+
+bucketizer1.transform(dataFrame).select("result1", "expected1", 
"result2", "expected2")
+  .collect().foreach {
+case Row(r1: Double, e1: Double, r2: Double, e2: Double) =>
+  assert(r1 === e1,
+s"The feature value is not correct after bucketing. Expected 
$e1 but found $r1")
+  assert(r2 === e2,
+s"The feature value is not correct after bucketing. Expected 
$e2 but found $r2")
+  }
+
+// Check for exceptions when using a set of invalid feature values.
+val invalidData1: Array[Double] = Array(-0.9) ++ validData1
+val invalidData2 = Array(0.51) ++ validData1
+val badDF1 = invalidData1.zipWithIndex.toSeq.toDF("feature", "idx")
+
+val bucketizer2: Bucketizer = new Bucketizer()
+  .setInputCols(Array("feature"))
+  .setOutputCols(Array("result"))
+  .setSplitsArray(Array(splits(0)))
+
+assert(bucketizer2.isBucketizeMultipleColumns())
+
+withClue("Invalid feature value -0.9 was not caught as an invalid 
feature!") {
+  intercept[SparkException] {
+bucketizer2.transform(badDF1).collect()
+  }
+}
+val badDF2 = invalidData2.zipWithIndex.toSeq.toDF("feature", "idx")
+withClue("Invalid feature value 0.51 was not caught as an invalid 
feature!") {
+  intercept[SparkException] {
+bucketizer2.transform(badDF2).collect()
+  }
+}
+  }
+
+  test("multiple columns: Bucket continuous features, with -inf,inf") {
+val splits = Array(
+  Array(Double.NegativeInfinity, -0.5, 0.0, 0.5, 
Double.PositiveInfinity),
+  Array(Double.NegativeInfinity, -0.3, 0.2, 0.5, 
Double.PositiveInfinity))
+
+val validData1 = Array(-0.9, -0.5, -0.3, 0.0, 0.2, 0.5, 0.9)
+val validData2 = Array(-0.1, -0.5, -0.2, 0.0, 0.1, 0.3, 0.5)
+val expectedBuckets1 = Array(0.0, 1.0, 1.0, 2.0, 2.0, 3.0, 3.0)
+val expectedBuckets2 = Array(1.0, 0.0, 1.0, 1.0, 1.0, 2.0, 3.0)
+
+val data = (0 until validData1.length).map { idx =>
+  (validData1(idx), validData2(idx), expectedBuckets1(idx), 
expectedBuckets2(idx))
+}
+val dataFrame: DataFrame = data.toSeq.toDF("feature1", "feature2", 
"expected1", "expected2")
+
+val bucketizer: Bucketizer = new Bucketizer()
+  .setInputCols(Array("feature1", "feature2"))
+  .setOutputCols(Array("result1", "result2"))
+  .setSplitsArray(splits)
+
+assert(bucketizer.isBucketizeMultipleColumns())
+
+bucketizer.transform(dataFrame).select("result1", "expected1", 
"result2", "expected2")
+  .collect().foreach {
+case Row(r1: Double, e1: Double, r2: Double, e2: Double) =>
+  assert(r1 === e1,
+s"The feature value is not correct after bucketing. Expected 
$e1 but found $r1")
+  assert(r2 === e2,
+s"The feature value is not correct after bucketing. Expected 
$e2 but found $r2")
+  }
+  }
+
+  test("multiple columns: Bucket continuous features, with NaN data but 
non-NaN splits") {
+val splits = Array(
+  Array(Double.NegativeInfinity, -0.5, 0.0, 0.5, 
Double.PositiveInfinity),
+  Array(Double.NegativeInfinity, -0.1, 0.2, 0.6, 
Double.PositiveInfinity))
+
+val validData1 = Array(-0.9, -0.5, -0.3, 0.0, 0.2, 0.5, 0.9, 
Double.NaN, Double.NaN, Double.NaN)
+val validData2 = Array(0.2, -0.1, 0.3, 0.0, 0.1, 0.3, 0.5, 0.8, 
Double.NaN, Double.NaN)

[GitHub] spark pull request #17819: [SPARK-20542][ML][SQL] Add an API to Bucketizer t...

2017-10-10 Thread MLnick
Github user MLnick commented on a diff in the pull request:

https://github.com/apache/spark/pull/17819#discussion_r143728145
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/Bucketizer.scala 
---
@@ -96,9 +99,71 @@ final class Bucketizer @Since("1.4.0") (@Since("1.4.0") 
override val uid: String
   def setHandleInvalid(value: String): this.type = set(handleInvalid, 
value)
--- End diff --

We should make it clear that in the multi column case, the invalid handling 
is applied to all columns (so for `error` it will throw the error if any 
invalids are found in any column, for `skip` it will skip rows with any 
invalids in any column, etc)
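
For example, a hedged usage sketch with the multi-column setters added in this PR (the column names, splits, and DataFrame `df` are made up for illustration):

```scala
import org.apache.spark.ml.feature.Bucketizer

val bucketizer = new Bucketizer()
  .setInputCols(Array("feature1", "feature2"))
  .setOutputCols(Array("result1", "result2"))
  .setSplitsArray(Array(
    Array(Double.NegativeInfinity, 0.0, Double.PositiveInfinity),
    Array(Double.NegativeInfinity, 0.5, Double.PositiveInfinity)))
  // In the multi-column case, "skip" drops a row if any input column holds an
  // invalid value, and "error" (the default) throws on the first invalid value
  // found in any column.
  .setHandleInvalid("skip")

val bucketed = bucketizer.transform(df)   // df: an existing DataFrame with feature1/feature2
```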


---




[GitHub] spark pull request #17819: [SPARK-20542][ML][SQL] Add an API to Bucketizer t...

2017-10-10 Thread MLnick
Github user MLnick commented on a diff in the pull request:

https://github.com/apache/spark/pull/17819#discussion_r143724079
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/feature/BucketizerSuite.scala ---
@@ -187,6 +188,196 @@ class BucketizerSuite extends SparkFunSuite with 
MLlibTestSparkContext with Defa
   }
 }
   }
+
+  test("multiple columns: Bucket continuous features, without -inf,inf") {
+// Check a set of valid feature values.
+val splits = Array(Array(-0.5, 0.0, 0.5), Array(-0.1, 0.3, 0.5))
+val validData1 = Array(-0.5, -0.3, 0.0, 0.2)
+val validData2 = Array(0.5, 0.3, 0.0, -0.1)
+val expectedBuckets1 = Array(0.0, 0.0, 1.0, 1.0)
+val expectedBuckets2 = Array(1.0, 1.0, 0.0, 0.0)
+
+val data = (0 until validData1.length).map { idx =>
+  (validData1(idx), validData2(idx), expectedBuckets1(idx), 
expectedBuckets2(idx))
+}
+val dataFrame: DataFrame = data.toSeq.toDF("feature1", "feature2", 
"expected1", "expected2")
+
+val bucketizer1: Bucketizer = new Bucketizer()
+  .setInputCols(Array("feature1", "feature2"))
+  .setOutputCols(Array("result1", "result2"))
+  .setSplitsArray(splits)
+
+assert(bucketizer1.isBucketizeMultipleColumns())
+
+bucketizer1.transform(dataFrame).select("result1", "expected1", 
"result2", "expected2")
+  .collect().foreach {
+case Row(r1: Double, e1: Double, r2: Double, e2: Double) =>
+  assert(r1 === e1,
+s"The feature value is not correct after bucketing. Expected 
$e1 but found $r1")
+  assert(r2 === e2,
+s"The feature value is not correct after bucketing. Expected 
$e2 but found $r2")
+  }
+
+// Check for exceptions when using a set of invalid feature values.
+val invalidData1: Array[Double] = Array(-0.9) ++ validData1
--- End diff --

Is this type annotation required?


---




[GitHub] spark pull request #17819: [SPARK-20542][ML][SQL] Add an API to Bucketizer t...

2017-10-10 Thread MLnick
Github user MLnick commented on a diff in the pull request:

https://github.com/apache/spark/pull/17819#discussion_r143710289
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/Bucketizer.scala 
---
@@ -96,9 +99,71 @@ final class Bucketizer @Since("1.4.0") (@Since("1.4.0") 
override val uid: String
   def setHandleInvalid(value: String): this.type = set(handleInvalid, 
value)
   setDefault(handleInvalid, Bucketizer.ERROR_INVALID)
 
+  /**
+   * Parameter for specifying multiple splits parameters. Each element in 
this array can be used to
+   * map continuous features into buckets.
+   *
+   * @group param
+   */
+  @Since("2.3.0")
+  val splitsArray: DoubleArrayArrayParam = new DoubleArrayArrayParam(this, 
"splitsArray",
+"The array of split points for mapping continuous features into 
buckets for multiple " +
+  "columns. For each input column, with n+1 splits, there are n 
buckets. A bucket defined by " +
+  "splits x,y holds values in the range [x,y) except the last bucket, 
which also includes y. " +
+  "The splits should be of length >= 3 and strictly increasing. Values 
at -inf, inf must be " +
+  "explicitly provided to cover all Double values; otherwise, values 
outside the splits " +
+  "specified will be treated as errors.",
+Bucketizer.checkSplitsArray)
+
+  /**
+   * Param for output column names.
+   * @group param
+   */
+  @Since("2.3.0")
+  final val outputCols: StringArrayParam = new StringArrayParam(this, 
"outputCols",
--- End diff --

why are we making this `final` (and not others)? (also the `getOutputCols`?)


---




[GitHub] spark pull request #17819: [SPARK-20542][ML][SQL] Add an API to Bucketizer t...

2017-10-10 Thread MLnick
Github user MLnick commented on a diff in the pull request:

https://github.com/apache/spark/pull/17819#discussion_r143724681
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/feature/BucketizerSuite.scala ---
@@ -187,6 +188,196 @@ class BucketizerSuite extends SparkFunSuite with 
MLlibTestSparkContext with Defa
   }
 }
   }
+
+  test("multiple columns: Bucket continuous features, without -inf,inf") {
+// Check a set of valid feature values.
+val splits = Array(Array(-0.5, 0.0, 0.5), Array(-0.1, 0.3, 0.5))
+val validData1 = Array(-0.5, -0.3, 0.0, 0.2)
+val validData2 = Array(0.5, 0.3, 0.0, -0.1)
+val expectedBuckets1 = Array(0.0, 0.0, 1.0, 1.0)
+val expectedBuckets2 = Array(1.0, 1.0, 0.0, 0.0)
+
+val data = (0 until validData1.length).map { idx =>
+  (validData1(idx), validData2(idx), expectedBuckets1(idx), 
expectedBuckets2(idx))
+}
+val dataFrame: DataFrame = data.toSeq.toDF("feature1", "feature2", 
"expected1", "expected2")
+
+val bucketizer1: Bucketizer = new Bucketizer()
+  .setInputCols(Array("feature1", "feature2"))
+  .setOutputCols(Array("result1", "result2"))
+  .setSplitsArray(splits)
+
+assert(bucketizer1.isBucketizeMultipleColumns())
+
+bucketizer1.transform(dataFrame).select("result1", "expected1", 
"result2", "expected2")
+  .collect().foreach {
+case Row(r1: Double, e1: Double, r2: Double, e2: Double) =>
+  assert(r1 === e1,
+s"The feature value is not correct after bucketing. Expected 
$e1 but found $r1")
+  assert(r2 === e2,
+s"The feature value is not correct after bucketing. Expected 
$e2 but found $r2")
+  }
+
+// Check for exceptions when using a set of invalid feature values.
+val invalidData1: Array[Double] = Array(-0.9) ++ validData1
+val invalidData2 = Array(0.51) ++ validData1
+val badDF1 = invalidData1.zipWithIndex.toSeq.toDF("feature", "idx")
+
+val bucketizer2: Bucketizer = new Bucketizer()
+  .setInputCols(Array("feature"))
+  .setOutputCols(Array("result"))
+  .setSplitsArray(Array(splits(0)))
+
+assert(bucketizer2.isBucketizeMultipleColumns())
+
+withClue("Invalid feature value -0.9 was not caught as an invalid 
feature!") {
+  intercept[SparkException] {
+bucketizer2.transform(badDF1).collect()
+  }
+}
+val badDF2 = invalidData2.zipWithIndex.toSeq.toDF("feature", "idx")
+withClue("Invalid feature value 0.51 was not caught as an invalid 
feature!") {
+  intercept[SparkException] {
+bucketizer2.transform(badDF2).collect()
+  }
+}
+  }
+
+  test("multiple columns: Bucket continuous features, with -inf,inf") {
+val splits = Array(
+  Array(Double.NegativeInfinity, -0.5, 0.0, 0.5, 
Double.PositiveInfinity),
+  Array(Double.NegativeInfinity, -0.3, 0.2, 0.5, 
Double.PositiveInfinity))
+
+val validData1 = Array(-0.9, -0.5, -0.3, 0.0, 0.2, 0.5, 0.9)
+val validData2 = Array(-0.1, -0.5, -0.2, 0.0, 0.1, 0.3, 0.5)
+val expectedBuckets1 = Array(0.0, 1.0, 1.0, 2.0, 2.0, 3.0, 3.0)
+val expectedBuckets2 = Array(1.0, 0.0, 1.0, 1.0, 1.0, 2.0, 3.0)
+
+val data = (0 until validData1.length).map { idx =>
+  (validData1(idx), validData2(idx), expectedBuckets1(idx), 
expectedBuckets2(idx))
+}
+val dataFrame: DataFrame = data.toSeq.toDF("feature1", "feature2", 
"expected1", "expected2")
--- End diff --

Same here, `toSeq` unnecessary?


---




[GitHub] spark pull request #17819: [SPARK-20542][ML][SQL] Add an API to Bucketizer t...

2017-10-10 Thread MLnick
Github user MLnick commented on a diff in the pull request:

https://github.com/apache/spark/pull/17819#discussion_r143730302
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/feature/BucketizerSuite.scala ---
@@ -187,6 +188,196 @@ class BucketizerSuite extends SparkFunSuite with 
MLlibTestSparkContext with Defa
   }
 }
   }
+
+  test("multiple columns: Bucket continuous features, without -inf,inf") {
+// Check a set of valid feature values.
+val splits = Array(Array(-0.5, 0.0, 0.5), Array(-0.1, 0.3, 0.5))
+val validData1 = Array(-0.5, -0.3, 0.0, 0.2)
+val validData2 = Array(0.5, 0.3, 0.0, -0.1)
+val expectedBuckets1 = Array(0.0, 0.0, 1.0, 1.0)
+val expectedBuckets2 = Array(1.0, 1.0, 0.0, 0.0)
+
+val data = (0 until validData1.length).map { idx =>
+  (validData1(idx), validData2(idx), expectedBuckets1(idx), 
expectedBuckets2(idx))
+}
+val dataFrame: DataFrame = data.toSeq.toDF("feature1", "feature2", 
"expected1", "expected2")
+
+val bucketizer1: Bucketizer = new Bucketizer()
+  .setInputCols(Array("feature1", "feature2"))
+  .setOutputCols(Array("result1", "result2"))
+  .setSplitsArray(splits)
+
+assert(bucketizer1.isBucketizeMultipleColumns())
+
+bucketizer1.transform(dataFrame).select("result1", "expected1", 
"result2", "expected2")
+  .collect().foreach {
+case Row(r1: Double, e1: Double, r2: Double, e2: Double) =>
+  assert(r1 === e1,
+s"The feature value is not correct after bucketing. Expected 
$e1 but found $r1")
+  assert(r2 === e2,
+s"The feature value is not correct after bucketing. Expected 
$e2 but found $r2")
+  }
+
+// Check for exceptions when using a set of invalid feature values.
+val invalidData1: Array[Double] = Array(-0.9) ++ validData1
+val invalidData2 = Array(0.51) ++ validData1
+val badDF1 = invalidData1.zipWithIndex.toSeq.toDF("feature", "idx")
+
+val bucketizer2: Bucketizer = new Bucketizer()
+  .setInputCols(Array("feature"))
+  .setOutputCols(Array("result"))
+  .setSplitsArray(Array(splits(0)))
+
+assert(bucketizer2.isBucketizeMultipleColumns())
+
+withClue("Invalid feature value -0.9 was not caught as an invalid 
feature!") {
+  intercept[SparkException] {
+bucketizer2.transform(badDF1).collect()
+  }
+}
+val badDF2 = invalidData2.zipWithIndex.toSeq.toDF("feature", "idx")
+withClue("Invalid feature value 0.51 was not caught as an invalid 
feature!") {
+  intercept[SparkException] {
+bucketizer2.transform(badDF2).collect()
+  }
+}
+  }
+
+  test("multiple columns: Bucket continuous features, with -inf,inf") {
+val splits = Array(
+  Array(Double.NegativeInfinity, -0.5, 0.0, 0.5, 
Double.PositiveInfinity),
+  Array(Double.NegativeInfinity, -0.3, 0.2, 0.5, 
Double.PositiveInfinity))
+
+val validData1 = Array(-0.9, -0.5, -0.3, 0.0, 0.2, 0.5, 0.9)
+val validData2 = Array(-0.1, -0.5, -0.2, 0.0, 0.1, 0.3, 0.5)
+val expectedBuckets1 = Array(0.0, 1.0, 1.0, 2.0, 2.0, 3.0, 3.0)
+val expectedBuckets2 = Array(1.0, 0.0, 1.0, 1.0, 1.0, 2.0, 3.0)
+
+val data = (0 until validData1.length).map { idx =>
+  (validData1(idx), validData2(idx), expectedBuckets1(idx), 
expectedBuckets2(idx))
+}
+val dataFrame: DataFrame = data.toSeq.toDF("feature1", "feature2", 
"expected1", "expected2")
+
+val bucketizer: Bucketizer = new Bucketizer()
+  .setInputCols(Array("feature1", "feature2"))
+  .setOutputCols(Array("result1", "result2"))
+  .setSplitsArray(splits)
+
+assert(bucketizer.isBucketizeMultipleColumns())
+
+bucketizer.transform(dataFrame).select("result1", "expected1", 
"result2", "expected2")
+  .collect().foreach {
+case Row(r1: Double, e1: Double, r2: Double, e2: Double) =>
+  assert(r1 === e1,
+s"The feature value is not correct after bucketing. Expected 
$e1 but found $r1")
+  assert(r2 === e2,
+s"The feature value is not correct after bucketing. Expected 
$e2 but found $r2")
+  }
+  }
+
+  test("multiple columns: Bucket continuous features, with NaN data but 
non-NaN splits") {
+val splits = Array(
+  Array(Double.NegativeInfinity, -0.5, 0.0, 0.5, 
Double.PositiveInfinity),
+  Array(Double.NegativeInfinity, -0.1, 0.2, 0.6, 
Double.PositiveInfinity))
+
+val validData1 = Array(-0.9, -0.5, -0.3, 0.0, 0.2, 0.5, 0.9, 
Double.NaN, Double.NaN, Double.NaN)
+val validData2 = Array(0.2, -0.1, 0.3, 0.0, 0.1, 0.3, 0.5, 0.8, 
Double.NaN, Double.NaN)

[GitHub] spark pull request #17819: [SPARK-20542][ML][SQL] Add an API to Bucketizer t...

2017-10-10 Thread MLnick
Github user MLnick commented on a diff in the pull request:

https://github.com/apache/spark/pull/17819#discussion_r143730685
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/Bucketizer.scala 
---
@@ -24,20 +24,23 @@ import org.apache.spark.annotation.Since
 import org.apache.spark.ml.Model
 import org.apache.spark.ml.attribute.NominalAttribute
 import org.apache.spark.ml.param._
-import org.apache.spark.ml.param.shared.{HasHandleInvalid, HasInputCol, 
HasOutputCol}
+import org.apache.spark.ml.param.shared.{HasHandleInvalid, HasInputCol, 
HasInputCols, HasOutputCol}
 import org.apache.spark.ml.util._
 import org.apache.spark.sql._
 import org.apache.spark.sql.expressions.UserDefinedFunction
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types.{DoubleType, StructField, StructType}
 
 /**
- * `Bucketizer` maps a column of continuous features to a column of 
feature buckets.
+ * `Bucketizer` maps a column of continuous features to a column of 
feature buckets. Since 2.3.0,
+ * `Bucketizer` can also map multiple columns at once. Whether it goes to 
map a column or multiple
+ * columns, it depends on which parameter of `inputCol` and `inputCols` is 
set. When both are set,
+ * a log warning will be printed and by default it chooses `inputCol`.
--- End diff --

We should probably also mention that `splits` is only used for single 
column and `splitsArray` for multi column


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17819: [SPARK-20542][ML][SQL] Add an API to Bucketizer t...

2017-10-10 Thread MLnick
Github user MLnick commented on a diff in the pull request:

https://github.com/apache/spark/pull/17819#discussion_r143722947
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/feature/BucketizerSuite.scala ---
@@ -187,6 +188,196 @@ class BucketizerSuite extends SparkFunSuite with 
MLlibTestSparkContext with Defa
   }
 }
   }
+
+  test("multiple columns: Bucket continuous features, without -inf,inf") {
+// Check a set of valid feature values.
+val splits = Array(Array(-0.5, 0.0, 0.5), Array(-0.1, 0.3, 0.5))
+val validData1 = Array(-0.5, -0.3, 0.0, 0.2)
+val validData2 = Array(0.5, 0.3, 0.0, -0.1)
+val expectedBuckets1 = Array(0.0, 0.0, 1.0, 1.0)
+val expectedBuckets2 = Array(1.0, 1.0, 0.0, 0.0)
+
+val data = (0 until validData1.length).map { idx =>
+  (validData1(idx), validData2(idx), expectedBuckets1(idx), 
expectedBuckets2(idx))
+}
+val dataFrame: DataFrame = data.toSeq.toDF("feature1", "feature2", 
"expected1", "expected2")
--- End diff --

`toSeq` not required here?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17819: [SPARK-20542][ML][SQL] Add an API to Bucketizer t...

2017-10-10 Thread MLnick
Github user MLnick commented on a diff in the pull request:

https://github.com/apache/spark/pull/17819#discussion_r143708258
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/Bucketizer.scala 
---
@@ -24,20 +24,23 @@ import org.apache.spark.annotation.Since
 import org.apache.spark.ml.Model
 import org.apache.spark.ml.attribute.NominalAttribute
 import org.apache.spark.ml.param._
-import org.apache.spark.ml.param.shared.{HasHandleInvalid, HasInputCol, 
HasOutputCol}
+import org.apache.spark.ml.param.shared.{HasHandleInvalid, HasInputCol, 
HasInputCols, HasOutputCol}
 import org.apache.spark.ml.util._
 import org.apache.spark.sql._
 import org.apache.spark.sql.expressions.UserDefinedFunction
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types.{DoubleType, StructField, StructType}
 
 /**
- * `Bucketizer` maps a column of continuous features to a column of 
feature buckets.
+ * `Bucketizer` maps a column of continuous features to a column of 
feature buckets. Since 2.3.0,
+ * `Bucketizer` can also map multiple columns at once. Whether it goes to 
map a column or multiple
--- End diff --

Perhaps:

>Since 2.3.0, `Bucketizer` can map multiple columns at once by setting the 
`inputCols` parameter. Note that when both the `inputCol` and `inputCols` 
parameters are set, a log warning will be printed and only `inputCol` will take 
effect, while `inputCols` will be ignored.
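
For readers skimming the thread, a minimal usage sketch of the multi-column path this wording describes, assuming the `setInputCols` / `setOutputCols` / `setSplitsArray` setters added in this PR and an active `SparkSession` named `spark` (column names and split values are illustrative only):

    import org.apache.spark.ml.feature.Bucketizer

    // Two continuous feature columns; both splits arrays cover the full
    // Double range with -inf/inf, as in the test suite.
    val df = spark.createDataFrame(Seq(
      (-0.5, 0.2), (0.1, -0.3), (0.7, 0.4)
    )).toDF("feature1", "feature2")

    val bucketizer = new Bucketizer()
      .setInputCols(Array("feature1", "feature2"))
      .setOutputCols(Array("result1", "result2"))
      .setSplitsArray(Array(
        Array(Double.NegativeInfinity, -0.5, 0.0, 0.5, Double.PositiveInfinity),
        Array(Double.NegativeInfinity, -0.3, 0.2, 0.5, Double.PositiveInfinity)))

    bucketizer.transform(df).show()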


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17819: [SPARK-20542][ML][SQL] Add an API to Bucketizer t...

2017-10-10 Thread MLnick
Github user MLnick commented on a diff in the pull request:

https://github.com/apache/spark/pull/17819#discussion_r143712667
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/Bucketizer.scala 
---
@@ -108,26 +173,53 @@ final class Bucketizer @Since("1.4.0") 
(@Since("1.4.0") override val uid: String
   }
 }
 
-val bucketizer: UserDefinedFunction = udf { (feature: Double) =>
-  Bucketizer.binarySearchForBuckets($(splits), feature, keepInvalid)
-}.withName("bucketizer")
+val seqOfSplits = if (isBucketizeMultipleColumns()) {
+  $(splitsArray).toSeq
+} else {
+  Seq($(splits))
+}
 
-val newCol = bucketizer(filteredDataset($(inputCol)).cast(DoubleType))
-val newField = prepOutputField(filteredDataset.schema)
-filteredDataset.withColumn($(outputCol), newCol, newField.metadata)
+val bucketizers: Seq[UserDefinedFunction] = 
seqOfSplits.zipWithIndex.map { case (splits, idx) =>
+  udf { (feature: Double) =>
+Bucketizer.binarySearchForBuckets(splits, feature, keepInvalid)
+  }.withName(s"bucketizer_$idx")
+}
+
+val (inputColumns, outputColumns) = if (isBucketizeMultipleColumns()) {
+  ($(inputCols).toSeq, $(outputCols).toSeq)
+} else {
+  (Seq($(inputCol)), Seq($(outputCol)))
+}
+val newCols = inputColumns.zipWithIndex.map { case (inputCol, idx) =>
+  bucketizers(idx)(filteredDataset(inputCol).cast(DoubleType))
+}
+val newFields = outputColumns.zipWithIndex.map { case (outputCol, idx) 
=>
--- End diff --

Have we not done this already in `transformSchema`? Can we just re-use the 
result of that?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17819: [SPARK-20542][ML][SQL] Add an API to Bucketizer t...

2017-10-10 Thread MLnick
Github user MLnick commented on a diff in the pull request:

https://github.com/apache/spark/pull/17819#discussion_r143713315
  
--- Diff: 
examples/src/main/java/org/apache/spark/examples/ml/JavaBucketizerExample.java 
---
@@ -33,6 +33,13 @@
 import org.apache.spark.sql.types.StructType;
 // $example off$
 
--- End diff --

No Scala example?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17819: [SPARK-20542][ML][SQL] Add an API to Bucketizer t...

2017-10-10 Thread MLnick
Github user MLnick commented on a diff in the pull request:

https://github.com/apache/spark/pull/17819#discussion_r143723205
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/Bucketizer.scala 
---
@@ -96,9 +99,71 @@ final class Bucketizer @Since("1.4.0") (@Since("1.4.0") 
override val uid: String
   def setHandleInvalid(value: String): this.type = set(handleInvalid, 
value)
   setDefault(handleInvalid, Bucketizer.ERROR_INVALID)
 
+  /**
+   * Parameter for specifying multiple splits parameters. Each element in 
this array can be used to
+   * map continuous features into buckets.
+   *
+   * @group param
+   */
+  @Since("2.3.0")
+  val splitsArray: DoubleArrayArrayParam = new DoubleArrayArrayParam(this, 
"splitsArray",
+"The array of split points for mapping continuous features into 
buckets for multiple " +
+  "columns. For each input column, with n+1 splits, there are n 
buckets. A bucket defined by " +
+  "splits x,y holds values in the range [x,y) except the last bucket, 
which also includes y. " +
+  "The splits should be of length >= 3 and strictly increasing. Values 
at -inf, inf must be " +
+  "explicitly provided to cover all Double values; otherwise, values 
outside the splits " +
+  "specified will be treated as errors.",
+Bucketizer.checkSplitsArray)
+
+  /**
+   * Param for output column names.
+   * @group param
+   */
+  @Since("2.3.0")
+  final val outputCols: StringArrayParam = new StringArrayParam(this, 
"outputCols",
+"output column names")
+
+  /** @group getParam */
+  @Since("2.3.0")
+  def getSplitsArray: Array[Array[Double]] = $(splitsArray)
+
+  /** @group getParam */
+  @Since("2.3.0")
+  final def getOutputCols: Array[String] = $(outputCols)
+
+  /** @group setParam */
+  @Since("2.3.0")
+  def setSplitsArray(value: Array[Array[Double]]): this.type = 
set(splitsArray, value)
+
+  /** @group setParam */
+  @Since("2.3.0")
+  def setInputCols(value: Array[String]): this.type = set(inputCols, value)
+
+  /** @group setParam */
+  @Since("2.3.0")
+  def setOutputCols(value: Array[String]): this.type = set(outputCols, 
value)
+
+  /**
+   * Determines whether this `Bucketizer` is going to map multiple 
columns. If and only if
+   * `inputCols` is set, it will map multiple columns. Otherwise, it just 
maps a column specified
+   * by `inputCol`. A warning will be printed if both are set.
+   */
+  private[ml] def isBucketizeMultipleColumns(): Boolean = {
--- End diff --

`private[feature]`?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19020: [SPARK-3181] [ML] Implement huber loss for Linear...

2017-10-10 Thread WeichenXu123
Github user WeichenXu123 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19020#discussion_r143727850
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala ---
@@ -208,6 +292,26 @@ class LinearRegression @Since("1.3.0") 
(@Since("1.3.0") override val uid: String
   def setAggregationDepth(value: Int): this.type = set(aggregationDepth, 
value)
   setDefault(aggregationDepth -> 2)
 
+  /**
+   * Sets the value of param [[loss]].
+   * Default is "squaredError".
+   *
+   * @group setParam
+   */
+  @Since("2.3.0")
+  def setLoss(value: String): this.type = set(loss, value)
+  setDefault(loss -> SquaredError)
+
+  /**
+   * Sets the value of param [[epsilon]].
+   * Default is 1.35.
+   *
+   * @group setExpertParam
+   */
+  @Since("2.3.0")
+  def setEpsilon(value: Double): this.type = set(epsilon, value)
--- End diff --

Document `epsilon` param more clearly, including the comment that it 
matches sklearn and is "M" from the paper.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19020: [SPARK-3181] [ML] Implement huber loss for Linear...

2017-10-10 Thread WeichenXu123
Github user WeichenXu123 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19020#discussion_r143726258
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala ---
@@ -744,11 +754,20 @@ object LinearRegressionModel extends 
MLReadable[LinearRegressionModel] {
 
   val dataPath = new Path(path, "data").toString
   val data = sparkSession.read.format("parquet").load(dataPath)
-  val Row(intercept: Double, coefficients: Vector) =
-MLUtils.convertVectorColumnsToML(data, "coefficients")
-  .select("intercept", "coefficients")
-  .head()
-  val model = new LinearRegressionModel(metadata.uid, coefficients, 
intercept)
+  val (majorVersion, minorVersion) = 
majorMinorVersion(metadata.sparkVersion)
+  val model = if (majorVersion < 2 || (majorVersion == 2 && 
minorVersion <= 2)) {
+// Spark 2.2 and before
+val Row(intercept: Double, coefficients: Vector) =
+  MLUtils.convertVectorColumnsToML(data, "coefficients")
+.select("intercept", "coefficients")
+.head()
+new LinearRegressionModel(metadata.uid, coefficients, intercept)
+  } else {
+// Spark 2.3 and later
+val Row(intercept: Double, coefficients: Vector, scale: Double) =
+  data.select("intercept", "coefficients", "scale").head()
+new LinearRegressionModel(metadata.uid, coefficients, intercept, 
scale)
+  }
--- End diff --

Have you tested manually, saving a model with Spark 2.2 and then loading it with this PR's code, to check backwards compatibility?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19020: [SPARK-3181] [ML] Implement huber loss for Linear...

2017-10-10 Thread WeichenXu123
Github user WeichenXu123 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19020#discussion_r143727489
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HuberAggregator.scala 
---
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.ml.optim.aggregator
+
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.ml.feature.Instance
+import org.apache.spark.ml.linalg.Vector
+
+/**
+ * HuberAggregator computes the gradient and loss for a huber loss 
function,
+ * as used in robust regression for samples in sparse or dense vector in 
an online fashion.
+ *
+ * The huber loss function based on:
+ * <a href="http://statweb.stanford.edu/~owen/reports/hhu.pdf">Art B. Owen (2006),
+ * A robust hybrid of lasso and ridge regression</a>.
+ *
+ * Two HuberAggregator can be merged together to have a summary of loss 
and gradient of
+ * the corresponding joint dataset.
+ *
+ * The huber loss function is given by
+ *
+ * 
+ *   $$
+ *   \begin{align}
+ *   \min_{w, \sigma}\frac{1}{2n}{\sum_{i=1}^n\left(\sigma +
+ *   H_m\left(\frac{X_{i}w - y_{i}}{\sigma}\right)\sigma\right) + 
\frac{1}{2}\lambda {||w||_2}^2}
+ *   \end{align}
+ *   $$
+ * 
+ *
+ * where
+ *
+ * 
+ *   $$
+ *   \begin{align}
+ *   H_m(z) = \begin{cases}
+ *z^2, & \text {if } |z| < \epsilon, \\
+ *2\epsilon|z| - \epsilon^2, & \text{otherwise}
+ *\end{cases}
+ *   \end{align}
+ *   $$
+ * 
+ *
+ * It is advised to set the parameter $\epsilon$ to 1.35 to achieve 95% 
statistical efficiency
+ * for normally distributed data. Please refer to chapter 2 of
+ * <a href="http://statweb.stanford.edu/~owen/reports/hhu.pdf">
+ * A robust hybrid of lasso and ridge regression</a> for more detail.
+ *
+ * @param fitIntercept Whether to fit an intercept term.
+ * @param epsilon The shape parameter to control the amount of robustness.
--- End diff --

Document `epsilon` param more clearly, including the comment that it 
matches sklearn and is "M" from the paper.
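
To keep the discussion concrete, a minimal Scala sketch (not taken from the PR's aggregator code) of the pointwise loss H_m(z) in the scaladoc above, using the 1.35 default the doc recommends:

    // Quadratic inside the epsilon band, linear outside it; epsilon is the
    // shape parameter ("M" in the paper) being discussed.
    def huberLoss(z: Double, epsilon: Double = 1.35): Double =
      if (math.abs(z) < epsilon) z * z
      else 2.0 * epsilon * math.abs(z) - epsilon * epsilon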


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19337: [SPARK-22114][ML][MLLIB]add epsilon for LDA

2017-10-10 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19337
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82582/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19337: [SPARK-22114][ML][MLLIB]add epsilon for LDA

2017-10-10 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19337
  
Merged build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19337: [SPARK-22114][ML][MLLIB]add epsilon for LDA

2017-10-10 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19337
  
**[Test build #82582 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82582/testReport)**
 for PR 19337 at commit 
[`6a3c6a6`](https://github.com/apache/spark/commit/6a3c6a684b4c7bd6109cf7e228a49341b23e1f2c).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19337: [SPARK-22114][ML][MLLIB]add epsilon for LDA

2017-10-10 Thread mpjlu
Github user mpjlu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19337#discussion_r143723408
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -573,7 +584,8 @@ private[clustering] object OnlineLDAOptimizer {
   expElogbeta: BDM[Double],
   alpha: breeze.linalg.Vector[Double],
   gammaShape: Double,
-  k: Int): (BDV[Double], BDM[Double], List[Int]) = {
+  k: Int,
+  epsilon: Double = 1e-3): (BDV[Double], BDM[Double], List[Int]) = {
--- End diff --

Maybe it does not need to be set. We could add epsilon as a parameter in the model to save it, but since most training parameters are not kept in the model, we don't add epsilon to the model here. Thanks.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19337: [SPARK-22114][ML][MLLIB]add epsilon for LDA

2017-10-10 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19337#discussion_r143720272
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
@@ -573,7 +584,8 @@ private[clustering] object OnlineLDAOptimizer {
   expElogbeta: BDM[Double],
   alpha: breeze.linalg.Vector[Double],
   gammaShape: Double,
-  k: Int): (BDV[Double], BDM[Double], List[Int]) = {
+  k: Int,
+  epsilon: Double = 1e-3): (BDV[Double], BDM[Double], List[Int]) = {
--- End diff --

shouldn't epsilon be set in LDAModel as well to reflect the value which was 
used while training the model?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #17819: [SPARK-20542][ML][SQL] Add an API to Bucketizer that can...

2017-10-10 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/17819
  
**[Test build #82583 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82583/testReport)**
 for PR 17819 at commit 
[`1889995`](https://github.com/apache/spark/commit/1889995c12e55b2420726540756b4b0b69b1bb28).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compression.co...

2017-10-10 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19218
  
Merged build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compression.co...

2017-10-10 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19218
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82580/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compression.co...

2017-10-10 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19218
  
**[Test build #82580 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82580/testReport)**
 for PR 19218 at commit 
[`dd6d635`](https://github.com/apache/spark/commit/dd6d635eee108706296e13a6d09d0c79ff912f13).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19363: [SPARK-22224][Minor]Override toString of KeyValue...

2017-10-10 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19363#discussion_r143717543
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala ---
@@ -564,4 +565,30 @@ class KeyValueGroupedDataset[K, V] private[sql](
   encoder: Encoder[R]): Dataset[R] = {
 cogroup(other)((key, left, right) => f.call(key, left.asJava, 
right.asJava).asScala)(encoder)
   }
+
+  override def toString: String = {
+try {
+  val builder = new StringBuilder
+  val kFields = kExprEnc.schema.map {
+case f => s"${f.name}: ${f.dataType.simpleString(2)}"
+  }
+  val vFields = vExprEnc.schema.map {
+case f => s"${f.name}: ${f.dataType.simpleString(2)}"
+  }
+  builder.append("[key: [")
+  builder.append(kFields.take(2).mkString(", "))
+  if (kFields.length > 2) {
+builder.append(" ... " + (kFields.length - 2) + " more field(s)")
+  }
+  builder.append("], value: [")
+  builder.append(vFields.take(2).mkString(", "))
+  if (vFields.length > 2) {
+builder.append(" ... " + (vFields.length - 2) + " more field(s)")
+  }
+  builder.append("]]").toString()
+} catch {
+  case NonFatal(e) =>
--- End diff --

same question


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17357: [SPARK-20025][CORE] Ignore SPARK_LOCAL* env, whil...

2017-10-10 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/17357


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #17357: [SPARK-20025][CORE] Ignore SPARK_LOCAL* env, while deplo...

2017-10-10 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/17357
  
LGTM, merging to master!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19337: [SPARK-22114][ML][MLLIB]add epsilon for LDA

2017-10-10 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19337
  
Merged build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19337: [SPARK-22114][ML][MLLIB]add epsilon for LDA

2017-10-10 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19337
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82581/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19337: [SPARK-22114][ML][MLLIB]add epsilon for LDA

2017-10-10 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19337
  
**[Test build #82581 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82581/testReport)**
 for PR 19337 at commit 
[`7814968`](https://github.com/apache/spark/commit/78149684e504669bc6a5357de14f339f352ee1ad).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18711: [SPARK-21506][DOC]The description of "spark.execu...

2017-10-10 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/18711


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18711: [SPARK-21506][DOC]The description of "spark.executor.cor...

2017-10-10 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/18711
  
LGTM, merging to master!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19337: [SPARK-22114][ML][MLLIB]add epsilon for LDA

2017-10-10 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19337
  
**[Test build #82582 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82582/testReport)**
 for PR 19337 at commit 
[`6a3c6a6`](https://github.com/apache/spark/commit/6a3c6a684b4c7bd6109cf7e228a49341b23e1f2c).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #17819: [SPARK-20542][ML][SQL] Add an API to Bucketizer that can...

2017-10-10 Thread MLnick
Github user MLnick commented on the issue:

https://github.com/apache/spark/pull/17819
  
Yes, fair enough

On Tue, 10 Oct 2017 at 14:09 Liang-Chi Hsieh 
wrote:

> *@viirya* commented on this pull request.
> --
>
> In sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala:
>
> > @@ -684,6 +684,34 @@ class DataFrameSuite extends QueryTest with 
SharedSQLContext {
>  }
>}
>
>
> I think as withColumn case, we can re-implement it with withColumns for
> metadata too. So this test case can cover it.
>



---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17819: [SPARK-20542][ML][SQL] Add an API to Bucketizer t...

2017-10-10 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/17819#discussion_r143707170
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala 
---
@@ -684,6 +684,34 @@ class DataFrameSuite extends QueryTest with 
SharedSQLContext {
 }
   }
 
--- End diff --

I think as `withColumn` case, we can re-implement it with `withColumns` for 
metadata too. So this test case can cover it.
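
A rough, hypothetical sketch of that re-implementation inside `Dataset` (the multi-column `withColumns` helper is the one introduced by this PR; the exact signatures below are assumed for illustration, not quoted from the patch):

    // Hypothetical: delegate the metadata-carrying single-column variant to
    // the multi-column helper, attaching the metadata via Column.as first.
    private[spark] def withColumn(colName: String, col: Column, metadata: Metadata): DataFrame =
      withColumns(Seq(colName), Seq(col.as(colName, metadata)))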


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17819: [SPARK-20542][ML][SQL] Add an API to Bucketizer t...

2017-10-10 Thread MLnick
Github user MLnick commented on a diff in the pull request:

https://github.com/apache/spark/pull/17819#discussion_r143706308
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala 
---
@@ -684,6 +684,34 @@ class DataFrameSuite extends QueryTest with 
SharedSQLContext {
 }
   }
 
--- End diff --

I noticed we don't have a test for the single column `withColumn: given 
metadata`. I wonder if that should be added (though it's not related to this PR)


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19337: [SPARK-22114][ML][MLLIB]add epsilon for LDA

2017-10-10 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19337#discussion_r143705705
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/clustering/LDASuite.scala ---
@@ -119,6 +121,8 @@ class LDASuite extends SparkFunSuite with 
MLlibTestSparkContext with DefaultRead
 assert(lda.getLearningDecay === 0.53)
 lda.setLearningOffset(1027)
 assert(lda.getLearningOffset === 1027)
+lda.setEpsilon(1e-3)
--- End diff --

Thanks.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19337: [SPARK-22114][ML][MLLIB]add epsilon for LDA

2017-10-10 Thread mpjlu
Github user mpjlu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19337#discussion_r143705102
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/clustering/LDASuite.scala ---
@@ -119,6 +121,8 @@ class LDASuite extends SparkFunSuite with 
MLlibTestSparkContext with DefaultRead
 assert(lda.getLearningDecay === 0.53)
 lda.setLearningOffset(1027)
 assert(lda.getLearningOffset === 1027)
+lda.setEpsilon(1e-3)
--- End diff --

Thanks @mgaido91, I will update the code.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19337: [SPARK-22114][ML][MLLIB]add epsilon for LDA

2017-10-10 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19337#discussion_r143704472
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/clustering/LDASuite.scala ---
@@ -119,6 +121,8 @@ class LDASuite extends SparkFunSuite with 
MLlibTestSparkContext with DefaultRead
 assert(lda.getLearningDecay === 0.53)
 lda.setLearningOffset(1027)
 assert(lda.getLearningOffset === 1027)
+lda.setEpsilon(1e-3)
--- End diff --

here you should test a value different from the default, otherwise this 
tests nothing.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #17968: [SPARK-9792] Make DenseMatrix equality semantical

2017-10-10 Thread WeichenXu123
Github user WeichenXu123 commented on the issue:

https://github.com/apache/spark/pull/17968
  
@gatorsmile Add this to white list!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17968: [SPARK-9792] Make DenseMatrix equality semantical

2017-10-10 Thread WeichenXu123
Github user WeichenXu123 commented on a diff in the pull request:

https://github.com/apache/spark/pull/17968#discussion_r143702830
  
--- Diff: python/pyspark/ml/linalg/__init__.py ---
@@ -976,14 +976,18 @@ def __getitem__(self, indices):
 return self.values[i + j * self.numRows]
 
 def __eq__(self, other):
+
+if isinstance(other, SparseMatrix):
+return np.all(self.toArray() == other.toArray())
 if (not isinstance(other, DenseMatrix) or
--- End diff --

The condition `not isinstance(other, DenseMatrix)` is useless. Remove it.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17968: [SPARK-9792] Make DenseMatrix equality semantical

2017-10-10 Thread WeichenXu123
Github user WeichenXu123 commented on a diff in the pull request:

https://github.com/apache/spark/pull/17968#discussion_r143702719
  
--- Diff: python/pyspark/mllib/linalg/__init__.py ---
@@ -1131,14 +1131,17 @@ def __getitem__(self, indices):
 return self.values[i + j * self.numRows]
 
 def __eq__(self, other):
+if isinstance(other, SparseMatrix):
+return np.all(self.toArray() == other.toArray())
 if (not isinstance(other, DenseMatrix) or
--- End diff --

ditto.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19337: [SPARK-22114][ML][MLLIB]add epsilon for LDA

2017-10-10 Thread mpjlu
Github user mpjlu commented on the issue:

https://github.com/apache/spark/pull/19337
  
Thanks, @hhbyyh.
 I will create a JIRA for the Python API.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19337: [SPARK-22114][ML][MLLIB]add epsilon for LDA

2017-10-10 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19337
  
**[Test build #82581 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82581/testReport)**
 for PR 19337 at commit 
[`7814968`](https://github.com/apache/spark/commit/78149684e504669bc6a5357de14f339f352ee1ad).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #6751: [SPARK-8300] DataFrame hint for broadcast join.

2017-10-10 Thread fjh100456
Github user fjh100456 commented on the issue:

https://github.com/apache/spark/pull/6751
  
@rxin  @marmbrus 
Is there another way to broadcast a table with spark-sql now, other than via `spark.sql.autoBroadcastJoinThreshold`?
And if not, would it be a good idea to broadcast a table through a user conf, such as `spark.sql.autoBroadcastJoin.mytable=true` to broadcast a table named `mytable`?
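
For reference, a small self-contained sketch of the per-table alternatives that exist today: the DataFrame-side `broadcast` hint added by this PR (SPARK-8300), plus a SQL-level hint whose availability depends on the Spark version. Table and column names below are made up:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.broadcast

    val spark = SparkSession.builder().appName("broadcast-hint-sketch").master("local[*]").getOrCreate()
    import spark.implicits._

    val big = Seq((1, "a"), (2, "b"), (3, "c")).toDF("key", "v1")
    val mytable = Seq((1, "x"), (2, "y")).toDF("key", "v2")

    // DataFrame API: mark the small side explicitly, independently of
    // spark.sql.autoBroadcastJoinThreshold.
    big.join(broadcast(mytable), Seq("key")).show()

    // SQL: a broadcast hint on a registered table (hint support depends on the Spark version).
    big.createOrReplaceTempView("big")
    mytable.createOrReplaceTempView("mytable")
    spark.sql("SELECT /*+ BROADCAST(mytable) */ * FROM big JOIN mytable ON big.key = mytable.key").show()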


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18711: [SPARK-21506][DOC]The description of "spark.executor.cor...

2017-10-10 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/18711
  
Merged build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18711: [SPARK-21506][DOC]The description of "spark.executor.cor...

2017-10-10 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/18711
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82578/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18711: [SPARK-21506][DOC]The description of "spark.executor.cor...

2017-10-10 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/18711
  
**[Test build #82578 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82578/testReport)**
 for PR 18711 at commit 
[`440f936`](https://github.com/apache/spark/commit/440f93611e21011eeb29f0889196463c96d03deb).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compression.co...

2017-10-10 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19218
  
Merged build finished. Test FAILed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #16648: [SPARK-18016][SQL][CATALYST] Code Generation: Constant P...

2017-10-10 Thread kiszk
Github user kiszk commented on the issue:

https://github.com/apache/spark/pull/16648
  
@bdrillard Thank you very much


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compression.co...

2017-10-10 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19218
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/82579/
Test FAILed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compression.co...

2017-10-10 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19218
  
**[Test build #82579 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82579/testReport)**
 for PR 19218 at commit 
[`90cbcb3`](https://github.com/apache/spark/commit/90cbcb3c58e115995eaa58f61a9cc818d2f17cdf).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19464: Spark 22233

2017-10-10 Thread kiszk
Github user kiszk commented on the issue:

https://github.com/apache/spark/pull/19464
  
Could you please update the title of this PR appropriately? e.g. 
`[SPARK-22233][core] ...`


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compression.co...

2017-10-10 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19218
  
**[Test build #82580 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82580/testReport)**
 for PR 19218 at commit 
[`dd6d635`](https://github.com/apache/spark/commit/dd6d635eee108706296e13a6d09d0c79ff912f13).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19439: [SPARK-21866][ML][PySpark] Adding spark image rea...

2017-10-10 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19439#discussion_r143689831
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/image/ImageSchema.scala 
---
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.image
+
+import java.awt.Color
+import java.awt.color.ColorSpace
+import java.io.ByteArrayInputStream
+import javax.imageio.ImageIO
+
+import org.apache.spark.annotation.Since
+import org.apache.spark.sql.{DataFrame, Row, SparkSession}
+import org.apache.spark.sql.types._
+
+@Since("2.3.0")
+object ImageSchema {
+
+  val undefinedImageType = "Undefined"
+
+  val ocvTypes = Map(
+undefinedImageType -> -1,
+"CV_8U" -> 0, "CV_8UC1" -> 0, "CV_8UC2" -> 8, "CV_8UC3" -> 16, 
"CV_8UC4" -> 24,
+"CV_8S" -> 1, "CV_8SC1" -> 1, "CV_8SC2" -> 9, "CV_8SC3" -> 17, 
"CV_8SC4" -> 25,
+"CV_16U" -> 2, "CV_16UC1" -> 2, "CV_16UC2" -> 10, "CV_16UC3" -> 18, 
"CV_16UC4" -> 26,
+"CV_16S" -> 3, "CV_16SC1" -> 3, "CV_16SC2" -> 11, "CV_16SC3" -> 19, 
"CV_16SC4" -> 27,
+"CV_32S" -> 4, "CV_32SC1" -> 4, "CV_32SC2" -> 12, "CV_32SC3" -> 20, 
"CV_32SC4" -> 28,
+"CV_32F" -> 5, "CV_32FC1" -> 5, "CV_32FC2" -> 13, "CV_32FC3" -> 21, 
"CV_32FC4" -> 29,
+"CV_64F" -> 6, "CV_64FC1" -> 6, "CV_64FC2" -> 14, "CV_64FC3" -> 22, 
"CV_64FC4" -> 30
+  )
+
+  /** Schema for the image column: Row(String, Int, Int, Int, Array[Byte]) 
*/
+  val columnSchema = StructType(
+StructField("origin", StringType, true) ::
+  StructField("height", IntegerType, false) ::
+  StructField("width", IntegerType, false) ::
+  StructField("nChannels", IntegerType, false) ::
+  // OpenCV-compatible type: CV_8UC3 in most cases
+  StructField("mode", StringType, false) ::
+  // Bytes in OpenCV-compatible order: row-wise BGR in most cases
+  StructField("data", BinaryType, false) :: Nil)
+
+  // Dataframe with a single column of images named "image" (nullable)
+  private val imageDFSchema = StructType(StructField("image", 
columnSchema, true) :: Nil)
+
+  @Since("2.3.0")
+  def getOrigin(row: Row): String = row.getString(0)
+
+  @Since("2.3.0")
+  def getHeight(row: Row): Int = row.getInt(1)
+
+  @Since("2.3.0")
+  def getWidth(row: Row): Int = row.getInt(2)
+
+  @Since("2.3.0")
+  def getNChannels(row: Row): Int = row.getInt(3)
+
+  @Since("2.3.0")
+  def getMode(row: Row): String = row.getString(4)
+
+  @Since("2.3.0")
+  def getData(row: Row): Array[Byte] = row.getAs[Array[Byte]](5)
+
+  /** Check if the dataframe column contains images (i.e. has ImageSchema)
+   *
+   * @param df   Dataframe
+   * @param column   Column name
+   * @return True if the given column matches the image schema
+   */
+  @Since("2.3.0")
+  def isImageColumn(df: DataFrame, column: String): Boolean =
+df.schema(column).dataType == columnSchema
+
+  /** Default values for the invalid image
+   *
+   * @param origin Origin of the invalid image
+   * @return   Row with the default values
+   */
+  private def invalidImageRow(origin: String): Row = Row(Row(origin, -1, 
-1, -1, undefinedImageType,
+Array.ofDim[Byte](0)))
+
+  /** Convert the compressed image (jpeg, png, etc.) into OpenCV
+   * representation and store it in dataframe Row
+   *
+   * @param origin   Arbitrary string that identifies the image
+   * @param bytesImage bytes (for example, jpeg)
+   * @return Dataframe Row or None (if the decompression fails)
+   */
+  private[spark] def decode(origin: String, bytes: Array[Byte]): 
Option[Row] = {
+
+val img = ImageIO.read(new ByteArrayInputStream(bytes))
+
+if (img == null) {
+  None
+} else {
+  val is_gray = img.getColorModel.getColorSpace.getType == 
ColorSpace.TYPE_GRAY
+  
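
For context, a small sketch (not from the PR) showing how the positional getters in the quoted diff line up with `columnSchema`; the Row values below are made up:

    import org.apache.spark.ml.image.ImageSchema
    import org.apache.spark.sql.Row

    // A row laid out per the columnSchema quoted above:
    // (origin, height, width, nChannels, mode, data).
    val image: Row = Row("file:/tmp/example.jpg", 2, 2, 3, "CV_8UC3",
      Array.fill[Byte](2 * 2 * 3)(0.toByte))

    // The accessors read the fields back by position.
    println(s"${ImageSchema.getOrigin(image)}: " +
      s"${ImageSchema.getWidth(image)}x${ImageSchema.getHeight(image)}, " +
      s"${ImageSchema.getNChannels(image)} channel(s), mode=${ImageSchema.getMode(image)}")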

[GitHub] spark pull request #19439: [SPARK-21866][ML][PySpark] Adding spark image rea...

2017-10-10 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19439#discussion_r143689721
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/image/ImageSchema.scala 
---
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.image
+
+import java.awt.Color
+import java.awt.color.ColorSpace
+import java.io.ByteArrayInputStream
+import javax.imageio.ImageIO
+
+import org.apache.spark.annotation.Since
+import org.apache.spark.sql.{DataFrame, Row, SparkSession}
+import org.apache.spark.sql.types._
+
+@Since("2.3.0")
+object ImageSchema {
+
+  val undefinedImageType = "Undefined"
+
+  val ocvTypes = Map(
+undefinedImageType -> -1,
+"CV_8U" -> 0, "CV_8UC1" -> 0, "CV_8UC2" -> 8, "CV_8UC3" -> 16, 
"CV_8UC4" -> 24,
+"CV_8S" -> 1, "CV_8SC1" -> 1, "CV_8SC2" -> 9, "CV_8SC3" -> 17, 
"CV_8SC4" -> 25,
+"CV_16U" -> 2, "CV_16UC1" -> 2, "CV_16UC2" -> 10, "CV_16UC3" -> 18, 
"CV_16UC4" -> 26,
+"CV_16S" -> 3, "CV_16SC1" -> 3, "CV_16SC2" -> 11, "CV_16SC3" -> 19, 
"CV_16SC4" -> 27,
+"CV_32S" -> 4, "CV_32SC1" -> 4, "CV_32SC2" -> 12, "CV_32SC3" -> 20, 
"CV_32SC4" -> 28,
+"CV_32F" -> 5, "CV_32FC1" -> 5, "CV_32FC2" -> 13, "CV_32FC3" -> 21, 
"CV_32FC4" -> 29,
+"CV_64F" -> 6, "CV_64FC1" -> 6, "CV_64FC2" -> 14, "CV_64FC3" -> 22, 
"CV_64FC4" -> 30
+  )
+
+  /** Schema for the image column: Row(String, Int, Int, Int, Array[Byte]) 
*/
+  val columnSchema = StructType(
+StructField("origin", StringType, true) ::
+  StructField("height", IntegerType, false) ::
+  StructField("width", IntegerType, false) ::
+  StructField("nChannels", IntegerType, false) ::
+  // OpenCV-compatible type: CV_8UC3 in most cases
+  StructField("mode", StringType, false) ::
+  // Bytes in OpenCV-compatible order: row-wise BGR in most cases
+  StructField("data", BinaryType, false) :: Nil)
+
+  // Dataframe with a single column of images named "image" (nullable)
+  private val imageDFSchema = StructType(StructField("image", 
columnSchema, true) :: Nil)
+
+  @Since("2.3.0")
+  def getOrigin(row: Row): String = row.getString(0)
+
+  @Since("2.3.0")
+  def getHeight(row: Row): Int = row.getInt(1)
+
+  @Since("2.3.0")
+  def getWidth(row: Row): Int = row.getInt(2)
+
+  @Since("2.3.0")
+  def getNChannels(row: Row): Int = row.getInt(3)
+
+  @Since("2.3.0")
+  def getMode(row: Row): String = row.getString(4)
+
+  @Since("2.3.0")
+  def getData(row: Row): Array[Byte] = row.getAs[Array[Byte]](5)
+
+  /** Check if the dataframe column contains images (i.e. has ImageSchema)
+   *
+   * @param df   Dataframe
+   * @param column   Column name
+   * @return True if the given column matches the image schema
+   */
+  @Since("2.3.0")
+  def isImageColumn(df: DataFrame, column: String): Boolean =
+df.schema(column).dataType == columnSchema
+
+  /** Default values for the invalid image
+   *
+   * @param origin Origin of the invalid image
+   * @return   Row with the default values
+   */
+  private def invalidImageRow(origin: String): Row = Row(Row(origin, -1, 
-1, -1, undefinedImageType,
+Array.ofDim[Byte](0)))
+
+  /** Convert the compressed image (jpeg, png, etc.) into OpenCV
+   * representation and store it in dataframe Row
+   *
+   * @param origin   Arbitrary string that identifies the image
+   * @param bytesImage bytes (for example, jpeg)
+   * @return Dataframe Row or None (if the decompression fails)
+   */
+  private[spark] def decode(origin: String, bytes: Array[Byte]): 
Option[Row] = {
+
+val img = ImageIO.read(new ByteArrayInputStream(bytes))
+
+if (img == null) {
+  None
+} else {
+  val is_gray = img.getColorModel.getColorSpace.getType == 
ColorSpace.TYPE_GRAY
+  

[GitHub] spark pull request #19439: [SPARK-21866][ML][PySpark] Adding spark image rea...

2017-10-10 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19439#discussion_r143689246
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/image/ImageSchema.scala 
---
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.image
+
+import java.awt.Color
+import java.awt.color.ColorSpace
+import java.io.ByteArrayInputStream
+import javax.imageio.ImageIO
+
+import org.apache.spark.annotation.Since
+import org.apache.spark.sql.{DataFrame, Row, SparkSession}
+import org.apache.spark.sql.types._
+
+@Since("2.3.0")
+object ImageSchema {
+
+  val undefinedImageType = "Undefined"
+
+  val ocvTypes = Map(
+undefinedImageType -> -1,
+"CV_8U" -> 0, "CV_8UC1" -> 0, "CV_8UC2" -> 8, "CV_8UC3" -> 16, 
"CV_8UC4" -> 24,
+"CV_8S" -> 1, "CV_8SC1" -> 1, "CV_8SC2" -> 9, "CV_8SC3" -> 17, 
"CV_8SC4" -> 25,
+"CV_16U" -> 2, "CV_16UC1" -> 2, "CV_16UC2" -> 10, "CV_16UC3" -> 18, 
"CV_16UC4" -> 26,
+"CV_16S" -> 3, "CV_16SC1" -> 3, "CV_16SC2" -> 11, "CV_16SC3" -> 19, 
"CV_16SC4" -> 27,
+"CV_32S" -> 4, "CV_32SC1" -> 4, "CV_32SC2" -> 12, "CV_32SC3" -> 20, 
"CV_32SC4" -> 28,
+"CV_32F" -> 5, "CV_32FC1" -> 5, "CV_32FC2" -> 13, "CV_32FC3" -> 21, 
"CV_32FC4" -> 29,
+"CV_64F" -> 6, "CV_64FC1" -> 6, "CV_64FC2" -> 14, "CV_64FC3" -> 22, 
"CV_64FC4" -> 30
+  )
+
+  /** Schema for the image column: Row(String, Int, Int, Int, Array[Byte]) 
*/
+  val columnSchema = StructType(
+StructField("origin", StringType, true) ::
+  StructField("height", IntegerType, false) ::
+  StructField("width", IntegerType, false) ::
+  StructField("nChannels", IntegerType, false) ::
+  // OpenCV-compatible type: CV_8UC3 in most cases
+  StructField("mode", StringType, false) ::
+  // Bytes in OpenCV-compatible order: row-wise BGR in most cases
+  StructField("data", BinaryType, false) :: Nil)
+
+  // Dataframe with a single column of images named "image" (nullable)
+  private val imageDFSchema = StructType(StructField("image", 
columnSchema, true) :: Nil)
+
+  @Since("2.3.0")
+  def getOrigin(row: Row): String = row.getString(0)
+
+  @Since("2.3.0")
+  def getHeight(row: Row): Int = row.getInt(1)
+
+  @Since("2.3.0")
+  def getWidth(row: Row): Int = row.getInt(2)
+
+  @Since("2.3.0")
+  def getNChannels(row: Row): Int = row.getInt(3)
+
+  @Since("2.3.0")
+  def getMode(row: Row): String = row.getString(4)
+
+  @Since("2.3.0")
+  def getData(row: Row): Array[Byte] = row.getAs[Array[Byte]](5)
+
+  /** Check if the dataframe column contains images (i.e. has ImageSchema)
+   *
+   * @param df   Dataframe
+   * @param column   Column name
+   * @return True if the given column matches the image schema
+   */
+  @Since("2.3.0")
+  def isImageColumn(df: DataFrame, column: String): Boolean =
+df.schema(column).dataType == columnSchema
+
+  /** Default values for the invalid image
+   *
+   * @param origin Origin of the invalid image
+   * @return   Row with the default values
+   */
+  private def invalidImageRow(origin: String): Row = Row(Row(origin, -1, 
-1, -1, undefinedImageType,
+Array.ofDim[Byte](0)))
+
+  /** Convert the compressed image (jpeg, png, etc.) into OpenCV
+   * representation and store it in dataframe Row
+   *
+   * @param origin   Arbitrary string that identifies the image
+   * @param bytesImage bytes (for example, jpeg)
+   * @return Dataframe Row or None (if the decompression fails)
+   */
+  private[spark] def decode(origin: String, bytes: Array[Byte]): 
Option[Row] = {
+
+val img = ImageIO.read(new ByteArrayInputStream(bytes))
+
+if (img == null) {
+  None
+} else {
+  val is_gray = img.getColorModel.getColorSpace.getType == 
ColorSpace.TYPE_GRAY
+  

[GitHub] spark pull request #19439: [SPARK-21866][ML][PySpark] Adding spark image rea...

2017-10-10 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19439#discussion_r143688666
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/image/ImageSchema.scala 
---
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.image
+
+import java.awt.Color
+import java.awt.color.ColorSpace
+import java.io.ByteArrayInputStream
+import javax.imageio.ImageIO
+
+import org.apache.spark.annotation.Since
+import org.apache.spark.sql.{DataFrame, Row, SparkSession}
+import org.apache.spark.sql.types._
+
+@Since("2.3.0")
+object ImageSchema {
+
+  val undefinedImageType = "Undefined"
+
+  val ocvTypes = Map(
+undefinedImageType -> -1,
+"CV_8U" -> 0, "CV_8UC1" -> 0, "CV_8UC2" -> 8, "CV_8UC3" -> 16, 
"CV_8UC4" -> 24,
+"CV_8S" -> 1, "CV_8SC1" -> 1, "CV_8SC2" -> 9, "CV_8SC3" -> 17, 
"CV_8SC4" -> 25,
+"CV_16U" -> 2, "CV_16UC1" -> 2, "CV_16UC2" -> 10, "CV_16UC3" -> 18, 
"CV_16UC4" -> 26,
+"CV_16S" -> 3, "CV_16SC1" -> 3, "CV_16SC2" -> 11, "CV_16SC3" -> 19, 
"CV_16SC4" -> 27,
+"CV_32S" -> 4, "CV_32SC1" -> 4, "CV_32SC2" -> 12, "CV_32SC3" -> 20, 
"CV_32SC4" -> 28,
+"CV_32F" -> 5, "CV_32FC1" -> 5, "CV_32FC2" -> 13, "CV_32FC3" -> 21, 
"CV_32FC4" -> 29,
+"CV_64F" -> 6, "CV_64FC1" -> 6, "CV_64FC2" -> 14, "CV_64FC3" -> 22, 
"CV_64FC4" -> 30
+  )
+
+  /** Schema for the image column: Row(String, Int, Int, Int, Array[Byte]) 
*/
+  val columnSchema = StructType(
+StructField("origin", StringType, true) ::
+  StructField("height", IntegerType, false) ::
+  StructField("width", IntegerType, false) ::
+  StructField("nChannels", IntegerType, false) ::
+  // OpenCV-compatible type: CV_8UC3 in most cases
+  StructField("mode", StringType, false) ::
+  // Bytes in OpenCV-compatible order: row-wise BGR in most cases
+  StructField("data", BinaryType, false) :: Nil)
+
+  // Dataframe with a single column of images named "image" (nullable)
+  private val imageDFSchema = StructType(StructField("image", 
columnSchema, true) :: Nil)
+
+  @Since("2.3.0")
+  def getOrigin(row: Row): String = row.getString(0)
+
+  @Since("2.3.0")
+  def getHeight(row: Row): Int = row.getInt(1)
+
+  @Since("2.3.0")
+  def getWidth(row: Row): Int = row.getInt(2)
+
+  @Since("2.3.0")
+  def getNChannels(row: Row): Int = row.getInt(3)
+
+  @Since("2.3.0")
+  def getMode(row: Row): String = row.getString(4)
+
+  @Since("2.3.0")
+  def getData(row: Row): Array[Byte] = row.getAs[Array[Byte]](5)
+
+  /** Check if the dataframe column contains images (i.e. has ImageSchema)
+   *
+   * @param df   Dataframe
+   * @param column   Column name
+   * @return True if the given column matches the image schema
+   */
+  @Since("2.3.0")
+  def isImageColumn(df: DataFrame, column: String): Boolean =
+df.schema(column).dataType == columnSchema
+
+  /** Default values for the invalid image
+   *
+   * @param origin Origin of the invalid image
+   * @return   Row with the default values
+   */
+  private def invalidImageRow(origin: String): Row = Row(Row(origin, -1, 
-1, -1, undefinedImageType,
+Array.ofDim[Byte](0)))
+
+  /** Convert the compressed image (jpeg, png, etc.) into OpenCV
+   * representation and store it in dataframe Row
+   *
+   * @param origin   Arbitrary string that identifies the image
+   * @param bytesImage bytes (for example, jpeg)
+   * @return Dataframe Row or None (if the decompression fails)
+   */
+  private[spark] def decode(origin: String, bytes: Array[Byte]): 
Option[Row] = {
+
+val img = ImageIO.read(new ByteArrayInputStream(bytes))
+
+if (img == null) {
+  None
+} else {
+  val is_gray = img.getColorModel.getColorSpace.getType == 
ColorSpace.TYPE_GRAY
+  

[GitHub] spark pull request #19439: [SPARK-21866][ML][PySpark] Adding spark image rea...

2017-10-10 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19439#discussion_r143688286
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/image/ImageSchema.scala 
---
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.image
+
+import java.awt.Color
+import java.awt.color.ColorSpace
+import java.io.ByteArrayInputStream
+import javax.imageio.ImageIO
+
+import org.apache.spark.annotation.Since
+import org.apache.spark.sql.{DataFrame, Row, SparkSession}
+import org.apache.spark.sql.types._
+
+@Since("2.3.0")
+object ImageSchema {
+
+  val undefinedImageType = "Undefined"
+
+  val ocvTypes = Map(
+undefinedImageType -> -1,
+"CV_8U" -> 0, "CV_8UC1" -> 0, "CV_8UC2" -> 8, "CV_8UC3" -> 16, 
"CV_8UC4" -> 24,
+"CV_8S" -> 1, "CV_8SC1" -> 1, "CV_8SC2" -> 9, "CV_8SC3" -> 17, 
"CV_8SC4" -> 25,
+"CV_16U" -> 2, "CV_16UC1" -> 2, "CV_16UC2" -> 10, "CV_16UC3" -> 18, 
"CV_16UC4" -> 26,
+"CV_16S" -> 3, "CV_16SC1" -> 3, "CV_16SC2" -> 11, "CV_16SC3" -> 19, 
"CV_16SC4" -> 27,
+"CV_32S" -> 4, "CV_32SC1" -> 4, "CV_32SC2" -> 12, "CV_32SC3" -> 20, 
"CV_32SC4" -> 28,
+"CV_32F" -> 5, "CV_32FC1" -> 5, "CV_32FC2" -> 13, "CV_32FC3" -> 21, 
"CV_32FC4" -> 29,
+"CV_64F" -> 6, "CV_64FC1" -> 6, "CV_64FC2" -> 14, "CV_64FC3" -> 22, 
"CV_64FC4" -> 30
+  )
+
+  /** Schema for the image column: Row(String, Int, Int, Int, Array[Byte]) 
*/
+  val columnSchema = StructType(
+StructField("origin", StringType, true) ::
+  StructField("height", IntegerType, false) ::
+  StructField("width", IntegerType, false) ::
+  StructField("nChannels", IntegerType, false) ::
+  // OpenCV-compatible type: CV_8UC3 in most cases
+  StructField("mode", StringType, false) ::
+  // Bytes in OpenCV-compatible order: row-wise BGR in most cases
+  StructField("data", BinaryType, false) :: Nil)
+
+  // Dataframe with a single column of images named "image" (nullable)
+  private val imageDFSchema = StructType(StructField("image", 
columnSchema, true) :: Nil)
+
+  @Since("2.3.0")
+  def getOrigin(row: Row): String = row.getString(0)
+
+  @Since("2.3.0")
+  def getHeight(row: Row): Int = row.getInt(1)
+
+  @Since("2.3.0")
+  def getWidth(row: Row): Int = row.getInt(2)
+
+  @Since("2.3.0")
+  def getNChannels(row: Row): Int = row.getInt(3)
+
+  @Since("2.3.0")
+  def getMode(row: Row): String = row.getString(4)
+
+  @Since("2.3.0")
+  def getData(row: Row): Array[Byte] = row.getAs[Array[Byte]](5)
+
+  /** Check if the dataframe column contains images (i.e. has ImageSchema)
+   *
+   * @param df   Dataframe
+   * @param column   Column name
+   * @return True if the given column matches the image schema
+   */
+  @Since("2.3.0")
+  def isImageColumn(df: DataFrame, column: String): Boolean =
+df.schema(column).dataType == columnSchema
+
+  /** Default values for the invalid image
+   *
+   * @param origin Origin of the invalid image
+   * @return   Row with the default values
+   */
+  private def invalidImageRow(origin: String): Row = Row(Row(origin, -1, 
-1, -1, undefinedImageType,
+Array.ofDim[Byte](0)))
+
+  /** Convert the compressed image (jpeg, png, etc.) into OpenCV
+   * representation and store it in dataframe Row
+   *
+   * @param origin   Arbitrary string that identifies the image
+   * @param bytes   Image bytes (for example, jpeg)
+   * @return Dataframe Row or None (if the decompression fails)
+   */
+  private[spark] def decode(origin: String, bytes: Array[Byte]): 
Option[Row] = {
+
+val img = ImageIO.read(new ByteArrayInputStream(bytes))
+
+if (img == null) {
+  None
+} else {
+  val is_gray = img.getColorModel.getColorSpace.getType == 
ColorSpace.TYPE_GRAY
+  

[GitHub] spark pull request #19439: [SPARK-21866][ML][PySpark] Adding spark image rea...

2017-10-10 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19439#discussion_r143687378
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/image/ImageSchema.scala 
---
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.image
+
+import java.awt.Color
+import java.awt.color.ColorSpace
+import java.io.ByteArrayInputStream
+import javax.imageio.ImageIO
+
+import org.apache.spark.annotation.Since
+import org.apache.spark.sql.{DataFrame, Row, SparkSession}
+import org.apache.spark.sql.types._
+
+@Since("2.3.0")
--- End diff --

Shall we have an appropriate annotation for interface stability? `Unstable` 
or `Evolving`?
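
For illustration only, a minimal sketch of what that might look like, assuming the existing `InterfaceStability` annotations in `org.apache.spark.annotation` are the right fit here (that choice is an assumption on my part, not something in this patch):

```scala
import org.apache.spark.annotation.{InterfaceStability, Since}

// Hypothetical: mark the new public API as evolving rather than stable.
@Since("2.3.0")
@InterfaceStability.Evolving
object ImageSchema {
  // ... members as in the diff above ...
}
```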


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19464: Spark 22233

2017-10-10 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19464
  
Can one of the admins verify this patch?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19464: Spark 22233

2017-10-10 Thread liutang123
GitHub user liutang123 opened a pull request:

https://github.com/apache/spark/pull/19464

Spark 22233

## What changes were proposed in this pull request?
Add a spark.hadoop.filterOutEmptySplit configuration to allow users to filter out empty splits in HadoopRDD.
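
For illustration, a minimal sketch of how such a flag might be consumed when computing partitions; the configuration key comes from this description, while the helper name, parameters, and where the flag is read from are assumptions, not the actual patch:

```scala
import org.apache.hadoop.mapred.{InputFormat, InputSplit, JobConf}
import org.apache.spark.SparkConf

// Hypothetical helper: keep only non-empty splits when the flag is enabled.
def selectSplits(
    sparkConf: SparkConf,
    inputFormat: InputFormat[_, _],
    jobConf: JobConf,
    minPartitions: Int): Seq[InputSplit] = {
  val filterOutEmpty = sparkConf.getBoolean("spark.hadoop.filterOutEmptySplit", false)
  val allSplits = inputFormat.getSplits(jobConf, minPartitions).toSeq
  if (filterOutEmpty) allSplits.filter(_.getLength > 0) else allSplits
}
```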

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/liutang123/spark SPARK-22233

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19464.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19464


commit 2317bfdf18fc1a7b21cd43e0ec12f5e957fb1895
Author: liutang123 
Date:   2017-06-21T04:27:42Z

Merge pull request #1 from apache/master

20170521 pull request

commit e3f993959fabdb80b966a42bf40d1cb5c6b44d95
Author: liulijia 
Date:   2017-09-28T06:12:04Z

Merge branch 'master' of https://github.com/apache/spark

commit 8f57d43b6bf127fc67e3e391d851efae3a859206
Author: liulijia 
Date:   2017-10-10T02:16:18Z

Merge branch 'master' of https://github.com/apache/spark

commit 3610f78837f4a5623f6d47b9feab1e565ed6
Author: liulijia 
Date:   2017-10-10T10:19:29Z

allow user to filter empty split in HadoopRDD




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19439: [SPARK-21866][ML][PySpark] Adding spark image rea...

2017-10-10 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19439#discussion_r143686577
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/image/HadoopUtils.scala 
---
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.image
+
+import scala.language.existentials
+import scala.util.Random
+
+import org.apache.commons.io.FilenameUtils
+import org.apache.hadoop.conf.{Configuration, Configured}
+import org.apache.hadoop.fs.{Path, PathFilter}
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
+
+import org.apache.spark.sql.SparkSession
+
+private object RecursiveFlag {
+
+  /** Sets a value of spark recursive flag
+   *
+   * @param value value to set
+   * @param spark existing spark session
+   * @return previous value of this flag
+   */
+  def setRecursiveFlag(value: Option[String], spark: SparkSession): 
Option[String] = {
+val flagName = FileInputFormat.INPUT_DIR_RECURSIVE
+val hadoopConf = spark.sparkContext.hadoopConfiguration
+val old = Option(hadoopConf.get(flagName))
+
+value match {
+  case Some(v) => hadoopConf.set(flagName, v)
+  case None => hadoopConf.unset(flagName)
+}
+
+old
+  }
+}
+
+/** Filter that allows loading a fraction of HDFS files. */
+private class SamplePathFilter extends Configured with PathFilter {
+  val random = {
+val rd = new Random()
+rd.setSeed(0)
+rd
+  }
+
+  // Ratio of files to be read from disk
+  var sampleRatio: Double = 1
+
+  override def setConf(conf: Configuration): Unit = {
+if (conf != null) {
+  sampleRatio = conf.getDouble(SamplePathFilter.ratioParam, 1)
+}
+  }
+
+  override def accept(path: Path): Boolean = {
+// Note: checking fileSystem.isDirectory is very slow here, so we use 
basic rules instead
+!SamplePathFilter.isFile(path) ||
+  random.nextDouble() < sampleRatio
+  }
+}
+
+private object SamplePathFilter {
+  val ratioParam = "sampleRatio"
+
+  def isFile(path: Path): Boolean = 
FilenameUtils.getExtension(path.toString) != ""
+
+  /** Set/unset  hdfs PathFilter
--- End diff --

Usual comment style is:

```scala
/**
 *  Set/unset  hdfs PathFilter
 *
 *  @param value
 *  ...
 */
```


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19439: [SPARK-21866][ML][PySpark] Adding spark image rea...

2017-10-10 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19439#discussion_r143686181
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/image/HadoopUtils.scala 
---
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.image
+
+import scala.language.existentials
+import scala.util.Random
+
+import org.apache.commons.io.FilenameUtils
+import org.apache.hadoop.conf.{Configuration, Configured}
+import org.apache.hadoop.fs.{Path, PathFilter}
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
+
+import org.apache.spark.sql.SparkSession
+
+private object RecursiveFlag {
+
+  /** Sets a value of spark recursive flag
+   *
+   * @param value value to set
+   * @param spark existing spark session
+   * @return previous value of this flag
+   */
+  def setRecursiveFlag(value: Option[String], spark: SparkSession): 
Option[String] = {
+val flagName = FileInputFormat.INPUT_DIR_RECURSIVE
+val hadoopConf = spark.sparkContext.hadoopConfiguration
+val old = Option(hadoopConf.get(flagName))
+
+value match {
+  case Some(v) => hadoopConf.set(flagName, v)
+  case None => hadoopConf.unset(flagName)
+}
+
+old
+  }
+}
+
+/** Filter that allows loading a fraction of HDFS files. */
+private class SamplePathFilter extends Configured with PathFilter {
+  val random = {
+val rd = new Random()
+rd.setSeed(0)
+rd
+  }
+
+  // Ratio of files to be read from disk
+  var sampleRatio: Double = 1
+
+  override def setConf(conf: Configuration): Unit = {
+if (conf != null) {
+  sampleRatio = conf.getDouble(SamplePathFilter.ratioParam, 1)
+}
+  }
+
+  override def accept(path: Path): Boolean = {
+// Note: checking fileSystem.isDirectory is very slow here, so we use 
basic rules instead
+!SamplePathFilter.isFile(path) ||
+  random.nextDouble() < sampleRatio
+  }
+}
+
+private object SamplePathFilter {
+  val ratioParam = "sampleRatio"
+
+  def isFile(path: Path): Boolean = 
FilenameUtils.getExtension(path.toString) != ""
+
+  /** Set/unset  hdfs PathFilter
+   *
+   * @param value   Filter class that is passed to HDFS
+   * @param sampleRatio Fraction of the files that the filter picks
+   * @param spark   Existing Spark session
+   * @return
+   */
+  def setPathFilter(value: Option[Class[_]], sampleRatio: Option[Double] = 
None,
--- End diff --

Shall we have both set and unset methods instead of one method to do both 
set/unset?
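
For clarity, a rough sketch of what separate methods could look like, reusing the names from the quoted diff (the exact signatures here are just an illustration, not a proposal for the final API):

```scala
import org.apache.hadoop.fs.PathFilter
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.spark.sql.SparkSession

// Hypothetical split of the combined helper into an explicit set and unset.
// SamplePathFilter.ratioParam refers to the object defined in the quoted file.
def setPathFilter(
    value: Class[_],
    sampleRatio: Double,
    spark: SparkSession): Option[Class[_]] = {
  val hadoopConf = spark.sparkContext.hadoopConfiguration
  val old = Option(hadoopConf.getClass(FileInputFormat.PATHFILTER_CLASS, null))
  hadoopConf.setDouble(SamplePathFilter.ratioParam, sampleRatio)
  hadoopConf.setClass(FileInputFormat.PATHFILTER_CLASS, value, classOf[PathFilter])
  old
}

def unsetPathFilter(spark: SparkSession): Unit = {
  val hadoopConf = spark.sparkContext.hadoopConfiguration
  hadoopConf.unset(SamplePathFilter.ratioParam)
  hadoopConf.unset(FileInputFormat.PATHFILTER_CLASS)
}
```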


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19439: [SPARK-21866][ML][PySpark] Adding spark image rea...

2017-10-10 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19439#discussion_r143686017
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/image/HadoopUtils.scala 
---
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.image
+
+import scala.language.existentials
+import scala.util.Random
+
+import org.apache.commons.io.FilenameUtils
+import org.apache.hadoop.conf.{Configuration, Configured}
+import org.apache.hadoop.fs.{Path, PathFilter}
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
+
+import org.apache.spark.sql.SparkSession
+
+private object RecursiveFlag {
+
+  /** Sets a value of spark recursive flag
+   *
+   * @param value value to set
+   * @param spark existing spark session
+   * @return previous value of this flag
+   */
+  def setRecursiveFlag(value: Option[String], spark: SparkSession): 
Option[String] = {
+val flagName = FileInputFormat.INPUT_DIR_RECURSIVE
+val hadoopConf = spark.sparkContext.hadoopConfiguration
+val old = Option(hadoopConf.get(flagName))
+
+value match {
+  case Some(v) => hadoopConf.set(flagName, v)
+  case None => hadoopConf.unset(flagName)
+}
+
+old
+  }
+}
+
+/** Filter that allows loading a fraction of HDFS files. */
+private class SamplePathFilter extends Configured with PathFilter {
+  val random = {
+val rd = new Random()
+rd.setSeed(0)
+rd
+  }
+
+  // Ratio of files to be read from disk
+  var sampleRatio: Double = 1
+
+  override def setConf(conf: Configuration): Unit = {
+if (conf != null) {
+  sampleRatio = conf.getDouble(SamplePathFilter.ratioParam, 1)
+}
+  }
+
+  override def accept(path: Path): Boolean = {
+// Note: checking fileSystem.isDirectory is very slow here, so we use 
basic rules instead
+!SamplePathFilter.isFile(path) ||
+  random.nextDouble() < sampleRatio
+  }
+}
+
+private object SamplePathFilter {
+  val ratioParam = "sampleRatio"
+
+  def isFile(path: Path): Boolean = 
FilenameUtils.getExtension(path.toString) != ""
+
+  /** Set/unset  hdfs PathFilter
+   *
+   * @param value   Filter class that is passed to HDFS
+   * @param sampleRatio Fraction of the files that the filter picks
+   * @param spark   Existing Spark session
+   * @return
+   */
+  def setPathFilter(value: Option[Class[_]], sampleRatio: Option[Double] = 
None,
+spark: SparkSession)
+  : Option[Class[_]] = {
+val flagName = FileInputFormat.PATHFILTER_CLASS
+val hadoopConf = spark.sparkContext.hadoopConfiguration
+val old = Option(hadoopConf.getClass(flagName, null))
+if (sampleRatio.isDefined) {
+  hadoopConf.setDouble(SamplePathFilter.ratioParam, sampleRatio.get)
+} else {
+  hadoopConf.unset(SamplePathFilter.ratioParam)
+  None
--- End diff --

Looks like `None` is redundant.
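
i.e. something like this hypothetical cleanup, where both branches are side effects only and nothing is returned (SamplePathFilter.ratioParam refers to the quoted code above):

```scala
import org.apache.hadoop.conf.Configuration

// Hypothetical shape without the dangling `None`; the value of the branch is never used.
def updateSampleRatio(hadoopConf: Configuration, sampleRatio: Option[Double]): Unit = {
  sampleRatio match {
    case Some(r) => hadoopConf.setDouble(SamplePathFilter.ratioParam, r)
    case None => hadoopConf.unset(SamplePathFilter.ratioParam)
  }
}
```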


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19439: [SPARK-21866][ML][PySpark] Adding spark image rea...

2017-10-10 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19439#discussion_r143685432
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/image/HadoopUtils.scala 
---
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.image
+
+import scala.language.existentials
+import scala.util.Random
+
+import org.apache.commons.io.FilenameUtils
+import org.apache.hadoop.conf.{Configuration, Configured}
+import org.apache.hadoop.fs.{Path, PathFilter}
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
+
+import org.apache.spark.sql.SparkSession
+
+private object RecursiveFlag {
+
+  /** Sets a value of spark recursive flag
+   *
+   * @param value value to set
+   * @param spark existing spark session
+   * @return previous value of this flag
+   */
+  def setRecursiveFlag(value: Option[String], spark: SparkSession): 
Option[String] = {
+val flagName = FileInputFormat.INPUT_DIR_RECURSIVE
+val hadoopConf = spark.sparkContext.hadoopConfiguration
+val old = Option(hadoopConf.get(flagName))
+
+value match {
+  case Some(v) => hadoopConf.set(flagName, v)
+  case None => hadoopConf.unset(flagName)
+}
+
+old
+  }
+}
+
+/** Filter that allows loading a fraction of HDFS files. */
+private class SamplePathFilter extends Configured with PathFilter {
+  val random = {
+val rd = new Random()
+rd.setSeed(0)
+rd
+  }
+
+  // Ratio of files to be read from disk
+  var sampleRatio: Double = 1
+
+  override def setConf(conf: Configuration): Unit = {
+if (conf != null) {
+  sampleRatio = conf.getDouble(SamplePathFilter.ratioParam, 1)
+}
+  }
+
+  override def accept(path: Path): Boolean = {
+// Note: checking fileSystem.isDirectory is very slow here, so we use 
basic rules instead
+!SamplePathFilter.isFile(path) ||
+  random.nextDouble() < sampleRatio
+  }
+}
+
+private object SamplePathFilter {
+  val ratioParam = "sampleRatio"
+
+  def isFile(path: Path): Boolean = 
FilenameUtils.getExtension(path.toString) != ""
+
+  /** Set/unset  hdfs PathFilter
+   *
+   * @param value   Filter class that is passed to HDFS
+   * @param sampleRatio Fraction of the files that the filter picks
+   * @param spark   Existing Spark session
+   * @return
+   */
+  def setPathFilter(value: Option[Class[_]], sampleRatio: Option[Double] = 
None,
+spark: SparkSession)
+  : Option[Class[_]] = {
--- End diff --

Usually we don't break lines like this in Spark code. We would typically do it 
like:

```scala
def setPathFilter(
    value: Option[Class[_]],
    sampleRatio: Option[Double] = None,
    spark: SparkSession): Option[Class[_]] = {
```



---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19439: [SPARK-21866][ML][PySpark] Adding spark image rea...

2017-10-10 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19439#discussion_r143684708
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/image/HadoopUtils.scala 
---
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.image
+
+import scala.language.existentials
+import scala.util.Random
+
+import org.apache.commons.io.FilenameUtils
+import org.apache.hadoop.conf.{Configuration, Configured}
+import org.apache.hadoop.fs.{Path, PathFilter}
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
+
+import org.apache.spark.sql.SparkSession
+
+private object RecursiveFlag {
+
+  /** Sets a value of spark recursive flag
+   *
+   * @param value value to set
+   * @param spark existing spark session
+   * @return previous value of this flag
+   */
+  def setRecursiveFlag(value: Option[String], spark: SparkSession): 
Option[String] = {
+val flagName = FileInputFormat.INPUT_DIR_RECURSIVE
+val hadoopConf = spark.sparkContext.hadoopConfiguration
+val old = Option(hadoopConf.get(flagName))
+
+value match {
+  case Some(v) => hadoopConf.set(flagName, v)
+  case None => hadoopConf.unset(flagName)
+}
+
+old
+  }
+}
+
+/** Filter that allows loading a fraction of HDFS files. */
+private class SamplePathFilter extends Configured with PathFilter {
+  val random = {
+val rd = new Random()
+rd.setSeed(0)
+rd
+  }
+
+  // Ratio of files to be read from disk
+  var sampleRatio: Double = 1
+
+  override def setConf(conf: Configuration): Unit = {
+if (conf != null) {
+  sampleRatio = conf.getDouble(SamplePathFilter.ratioParam, 1)
+}
+  }
+
+  override def accept(path: Path): Boolean = {
+// Note: checking fileSystem.isDirectory is very slow here, so we use 
basic rules instead
+!SamplePathFilter.isFile(path) ||
+  random.nextDouble() < sampleRatio
+  }
+}
+
+private object SamplePathFilter {
+  val ratioParam = "sampleRatio"
+
+  def isFile(path: Path): Boolean = 
FilenameUtils.getExtension(path.toString) != ""
+
+  /** Set/unset  hdfs PathFilter
+   *
+   * @param value   Filter class that is passed to HDFS
+   * @param sampleRatio Fraction of the files that the filter picks
+   * @param spark   Existing Spark session
+   * @return
--- End diff --

I guess we're missing the doc for the return value?
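
For example, something along these lines (the wording is only a guess at the intent):

```scala
/**
 * Sets the HDFS PathFilter flag and the sampling ratio.
 *
 * @param value       Filter class that is passed to HDFS
 * @param sampleRatio Fraction of the files that the filter picks
 * @param spark       Existing Spark session
 * @return            Previously configured PathFilter class, if any
 */
```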


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19438: [SPARK-22208] [SQL] Improve percentile_approx by ...

2017-10-10 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/19438#discussion_r143684083
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/QuantileSummariesSuite.scala
 ---
@@ -58,7 +58,7 @@ class QuantileSummariesSuite extends SparkFunSuite {
 if (data.nonEmpty) {
   val approx = summary.query(quant).get
   // The rank of the approximation.
-  val rank = data.count(_ < approx) // has to be <, not <= to be exact
+  val rank = data.count(_ <= approx)
--- End diff --

@wzhfy that formula is asymmetric, which feels wrong; it may happen to fix 
this case but could fail another one in the future. It would be a little more 
principled to round the average.

Yeah, I know that [1,2,2,2,2,2,2,2,3] can't happen in this test; I'm just 
illustrating a general point.

Hm, what's the case where the quantile falls between 39 and 40? Is the input 
0-99 in that case? I don't see a test for the 40% quantile, so I'm wondering 
whether we really have a problem or are misunderstanding the failure.
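
To make that concrete, here is one hedged reading of "round the average" for the rank used in the test: take the midpoint of the strict and non-strict ranks rather than either bound alone. This is only an illustration, not the actual change in the patch:

```scala
// Illustrative only: average the rank with ties excluded and the rank with
// ties included, then round up, instead of committing to either extreme.
def midpointRank(data: Seq[Double], approx: Double): Int = {
  val lowerRank = data.count(_ < approx)   // ties excluded
  val upperRank = data.count(_ <= approx)  // ties included
  math.ceil((lowerRank + upperRank) / 2.0).toInt
}
```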


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19439: [SPARK-21866][ML][PySpark] Adding spark image reader

2017-10-10 Thread viirya
Github user viirya commented on the issue:

https://github.com/apache/spark/pull/19439
  
I saw there are a few images. Just want to make sure: are those images free 
of license issues so that they can be included in Spark?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19218: [SPARK-21786][SQL] The 'spark.sql.parquet.compression.co...

2017-10-10 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19218
  
**[Test build #82579 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/82579/testReport)**
 for PR 19218 at commit 
[`90cbcb3`](https://github.com/apache/spark/commit/90cbcb3c58e115995eaa58f61a9cc818d2f17cdf).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org


