spark git commit: [SPARK-21070][PYSPARK] Attempt to update cloudpickle again

2017-08-21 Thread gurwls223
Repository: spark
Updated Branches:
  refs/heads/master c108a5d30 -> 751f51336


[SPARK-21070][PYSPARK] Attempt to update cloudpickle again

## What changes were proposed in this pull request?

Based on https://github.com/apache/spark/pull/18282 by rgbkrk this PR attempts 
to update to the current released cloudpickle and minimize the difference 
between Spark cloudpickle and "stock" cloud pickle with the goal of eventually 
using the stock cloud pickle.

Some notable changes:
* Import submodules accessed by pickled functions (cloudpipe/cloudpickle#80)
* Support recursive functions inside closures (cloudpipe/cloudpickle#89, 
cloudpipe/cloudpickle#90)
* Fix ResourceWarnings and DeprecationWarnings (cloudpipe/cloudpickle#88)
* Assume modules with __file__ attribute are not dynamic 
(cloudpipe/cloudpickle#85)
* Make cloudpickle Python 3.6 compatible (cloudpipe/cloudpickle#72)
* Allow pickling of builtin methods (cloudpipe/cloudpickle#57)
* Add ability to pickle dynamically created modules (cloudpipe/cloudpickle#52)
* Support method descriptor (cloudpipe/cloudpickle#46)
* No more pickling of closed files, was broken on Python 3 
(cloudpipe/cloudpickle#32)
* ** Remove non-standard __transient__check (cloudpipe/cloudpickle#110)** -- 
while we don't use this internally, and have no tests or documentation for its 
use, downstream code may use __transient__, although it has never been part of 
the API, if we merge this we should include a note about this in the release 
notes.
* Support for pickling loggers (yay!) (cloudpipe/cloudpickle#96)
* BUG: Fix crash when pickling dynamic class cycles. (cloudpipe/cloudpickle#102)

## How was this patch tested?

Existing PySpark unit tests + the unit tests from the cloudpickle project on 
their own.

Author: Holden Karau 
Author: Kyle Kelley 

Closes #18734 from holdenk/holden-rgbkrk-cloudpickle-upgrades.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/751f5133
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/751f5133
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/751f5133

Branch: refs/heads/master
Commit: 751f513367ae776c6d6815e1ce138078924872eb
Parents: c108a5d
Author: Kyle Kelley 
Authored: Tue Aug 22 11:17:53 2017 +0900
Committer: hyukjinkwon 
Committed: Tue Aug 22 11:17:53 2017 +0900

--
 python/pyspark/cloudpickle.py | 599 +++--
 1 file changed, 446 insertions(+), 153 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/751f5133/python/pyspark/cloudpickle.py
--
diff --git a/python/pyspark/cloudpickle.py b/python/pyspark/cloudpickle.py
index 389bee7..40e91a2 100644
--- a/python/pyspark/cloudpickle.py
+++ b/python/pyspark/cloudpickle.py
@@ -9,10 +9,10 @@ The goals of it follow:
 It does not include an unpickler, as standard python unpickling suffices.
 
 This module was extracted from the `cloud` package, developed by `PiCloud, Inc.
-`_.
+`_.
 
 Copyright (c) 2012, Regents of the University of California.
-Copyright (c) 2009 `PiCloud, Inc. `_.
+Copyright (c) 2009 `PiCloud, Inc. 
`_.
 All rights reserved.
 
 Redistribution and use in source and binary forms, with or without
@@ -42,18 +42,19 @@ SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 """
 from __future__ import print_function
 
-import operator
-import opcode
-import os
+import dis
+from functools import partial
+import imp
 import io
+import itertools
+import logging
+import opcode
+import operator
 import pickle
 import struct
 import sys
-import types
-from functools import partial
-import itertools
-import dis
 import traceback
+import types
 import weakref
 
 from pyspark.util import _exception_message
@@ -71,6 +72,92 @@ else:
 from io import BytesIO as StringIO
 PY3 = True
 
+
+def _make_cell_set_template_code():
+"""Get the Python compiler to emit LOAD_FAST(arg); STORE_DEREF
+
+Notes
+-
+In Python 3, we could use an easier function:
+
+.. code-block:: python
+
+   def f():
+   cell = None
+
+   def _stub(value):
+   nonlocal cell
+   cell = value
+
+   return _stub
+
+_cell_set_template_code = f()
+
+This function is _only_ a LOAD_FAST(arg); STORE_DEREF, but that is
+invalid syntax on Python 2. If we use this function we also don't need
+to do the weird freevars/cellvars swap below
+"""
+def inner(value):
+lambda: cell  # make ``cell`` a closure so that we get a STORE_DEREF
+cell 

spark git commit: [SPARK-19762][ML][FOLLOWUP] Add necessary comments to L2Regularization.

2017-08-21 Thread yliang
Repository: spark
Updated Branches:
  refs/heads/master 84b5b16ea -> c108a5d30


[SPARK-19762][ML][FOLLOWUP] Add necessary comments to L2Regularization.

## What changes were proposed in this pull request?
MLlib ```LinearRegression/LogisticRegression/LinearSVC``` always standardize 
the data during training to improve the rate of convergence regardless of 
_standardization_ is true or false. If _standardization_ is false, we perform 
reverse standardization by penalizing each component differently to get 
effectively the same objective function when the training dataset is not 
standardized. We should keep these comments in the code to let developers 
understand how we handle it correctly.

## How was this patch tested?
Existing tests, only adding some comments in code.

Author: Yanbo Liang 

Closes #18992 from yanboliang/SPARK-19762.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c108a5d3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c108a5d3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c108a5d3

Branch: refs/heads/master
Commit: c108a5d30e821fef23709681fca7da22bc507129
Parents: 84b5b16
Author: Yanbo Liang 
Authored: Tue Aug 22 08:43:18 2017 +0800
Committer: Yanbo Liang 
Committed: Tue Aug 22 08:43:18 2017 +0800

--
 .../ml/optim/loss/DifferentiableRegularization.scala  | 14 --
 1 file changed, 12 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/c108a5d3/mllib/src/main/scala/org/apache/spark/ml/optim/loss/DifferentiableRegularization.scala
--
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/optim/loss/DifferentiableRegularization.scala
 
b/mllib/src/main/scala/org/apache/spark/ml/optim/loss/DifferentiableRegularization.scala
index 7ac7c22..929374e 100644
--- 
a/mllib/src/main/scala/org/apache/spark/ml/optim/loss/DifferentiableRegularization.scala
+++ 
b/mllib/src/main/scala/org/apache/spark/ml/optim/loss/DifferentiableRegularization.scala
@@ -39,9 +39,13 @@ private[ml] trait DifferentiableRegularization[T] extends 
DiffFunction[T] {
  *
  * @param regParam The magnitude of the regularization.
  * @param shouldApply A function (Int => Boolean) indicating whether a given 
index should have
- *regularization applied to it.
+ *regularization applied to it. Usually we don't apply 
regularization to
+ *the intercept.
  * @param applyFeaturesStd Option for a function which maps coefficient index 
(column major) to the
- * feature standard deviation. If `None`, no 
standardization is applied.
+ * feature standard deviation. Since we always 
standardize the data during
+ * training, if `standardization` is false, we have to 
reverse
+ * standardization by penalizing each component 
differently by this param.
+ * If `standardization` is true, this should be `None`.
  */
 private[ml] class L2Regularization(
 override val regParam: Double,
@@ -57,6 +61,11 @@ private[ml] class L2Regularization(
   val coef = coefficients(j)
   applyFeaturesStd match {
 case Some(getStd) =>
+  // If `standardization` is false, we still standardize the data
+  // to improve the rate of convergence; as a result, we have to
+  // perform this reverse standardization by penalizing each 
component
+  // differently to get effectively the same objective function 
when
+  // the training dataset is not standardized.
   val std = getStd(j)
   if (std != 0.0) {
 val temp = coef / (std * std)
@@ -66,6 +75,7 @@ private[ml] class L2Regularization(
 0.0
   }
 case None =>
+  // If `standardization` is true, compute L2 regularization 
normally.
   sum += coef * coef
   gradient(j) = coef * regParam
   }


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-21617][SQL] Store correct table metadata when altering schema in Hive metastore.

2017-08-21 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 0f640e96c -> 526087f9e


[SPARK-21617][SQL] Store correct table metadata when altering schema in Hive 
metastore.

For Hive tables, the current "replace the schema" code is the correct
path, except that an exception in that path should result in an error, and
not in retrying in a different way.

For data source tables, Spark may generate a non-compatible Hive table;
but for that to work with Hive 2.1, the detection of data source tables needs
to be fixed in the Hive client, to also consider the raw tables used by code
such as `alterTableSchema`.

Tested with existing and added unit tests (plus internal tests with a 2.1 
metastore).

Author: Marcelo Vanzin 

Closes #18849 from vanzin/SPARK-21617.

(cherry picked from commit 84b5b16ea6c9816c70f7471a50eb5e4acb7fb1a1)
Signed-off-by: gatorsmile 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/526087f9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/526087f9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/526087f9

Branch: refs/heads/branch-2.2
Commit: 526087f9ebca90f77f78d699c5f8d0243dd8ab61
Parents: 0f640e9
Author: Marcelo Vanzin 
Authored: Mon Aug 21 15:09:02 2017 -0700
Committer: gatorsmile 
Committed: Mon Aug 21 15:09:12 2017 -0700

--
 .../spark/sql/execution/command/DDLSuite.scala  |  15 +--
 .../spark/sql/hive/HiveExternalCatalog.scala|  55 +---
 .../spark/sql/hive/client/HiveClientImpl.scala  |   3 +-
 .../sql/hive/execution/Hive_2_1_DDLSuite.scala  | 126 +++
 4 files changed, 171 insertions(+), 28 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/526087f9/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index e4dd077..56d2937 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -2281,18 +2281,9 @@ abstract class DDLSuite extends QueryTest with 
SQLTestUtils {
 }.getMessage
 assert(e.contains("Found duplicate column(s)"))
   } else {
-if (isUsingHiveMetastore) {
-  // hive catalog will still complains that c1 is duplicate column 
name because hive
-  // identifiers are case insensitive.
-  val e = intercept[AnalysisException] {
-sql("ALTER TABLE t1 ADD COLUMNS (C1 string)")
-  }.getMessage
-  assert(e.contains("HiveException"))
-} else {
-  sql("ALTER TABLE t1 ADD COLUMNS (C1 string)")
-  assert(spark.table("t1").schema
-.equals(new StructType().add("c1", IntegerType).add("C1", 
StringType)))
-}
+sql("ALTER TABLE t1 ADD COLUMNS (C1 string)")
+assert(spark.table("t1").schema ==
+  new StructType().add("c1", IntegerType).add("C1", StringType))
   }
 }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/526087f9/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
--
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 9fea0c6..2ea4e15 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -114,7 +114,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, 
hadoopConf: Configurat
* should interpret these special data source properties and restore the 
original table metadata
* before returning it.
*/
-  private def getRawTable(db: String, table: String): CatalogTable = 
withClient {
+  private[hive] def getRawTable(db: String, table: String): CatalogTable = 
withClient {
 client.getTable(db, table)
   }
 
@@ -386,6 +386,12 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, 
hadoopConf: Configurat
* can be used as table properties later.
*/
   private def tableMetaToTableProps(table: CatalogTable): mutable.Map[String, 
String] = {
+tableMetaToTableProps(table, table.schema)
+  }
+
+  private def tableMetaToTableProps(
+  table: CatalogTable,
+  schema: StructType): mutable.Map[String, String] = {
 val partitionColumns = 

spark git commit: [SPARK-21617][SQL] Store correct table metadata when altering schema in Hive metastore.

2017-08-21 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/master ba843292e -> 84b5b16ea


[SPARK-21617][SQL] Store correct table metadata when altering schema in Hive 
metastore.

For Hive tables, the current "replace the schema" code is the correct
path, except that an exception in that path should result in an error, and
not in retrying in a different way.

For data source tables, Spark may generate a non-compatible Hive table;
but for that to work with Hive 2.1, the detection of data source tables needs
to be fixed in the Hive client, to also consider the raw tables used by code
such as `alterTableSchema`.

Tested with existing and added unit tests (plus internal tests with a 2.1 
metastore).

Author: Marcelo Vanzin 

Closes #18849 from vanzin/SPARK-21617.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/84b5b16e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/84b5b16e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/84b5b16e

Branch: refs/heads/master
Commit: 84b5b16ea6c9816c70f7471a50eb5e4acb7fb1a1
Parents: ba84329
Author: Marcelo Vanzin 
Authored: Mon Aug 21 15:09:02 2017 -0700
Committer: gatorsmile 
Committed: Mon Aug 21 15:09:02 2017 -0700

--
 .../spark/sql/execution/command/DDLSuite.scala  |  15 +--
 .../spark/sql/hive/HiveExternalCatalog.scala|  55 +---
 .../spark/sql/hive/client/HiveClientImpl.scala  |   3 +-
 .../sql/hive/execution/Hive_2_1_DDLSuite.scala  | 126 +++
 4 files changed, 171 insertions(+), 28 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/84b5b16e/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 9332f77..ad6fc20 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -2357,18 +2357,9 @@ abstract class DDLSuite extends QueryTest with 
SQLTestUtils {
 }.getMessage
 assert(e.contains("Found duplicate column(s)"))
   } else {
-if (isUsingHiveMetastore) {
-  // hive catalog will still complains that c1 is duplicate column 
name because hive
-  // identifiers are case insensitive.
-  val e = intercept[AnalysisException] {
-sql("ALTER TABLE t1 ADD COLUMNS (C1 string)")
-  }.getMessage
-  assert(e.contains("HiveException"))
-} else {
-  sql("ALTER TABLE t1 ADD COLUMNS (C1 string)")
-  assert(spark.table("t1").schema
-.equals(new StructType().add("c1", IntegerType).add("C1", 
StringType)))
-}
+sql("ALTER TABLE t1 ADD COLUMNS (C1 string)")
+assert(spark.table("t1").schema ==
+  new StructType().add("c1", IntegerType).add("C1", StringType))
   }
 }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/84b5b16e/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
--
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 547447b..bdbb8bc 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -114,7 +114,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, 
hadoopConf: Configurat
* should interpret these special data source properties and restore the 
original table metadata
* before returning it.
*/
-  private def getRawTable(db: String, table: String): CatalogTable = 
withClient {
+  private[hive] def getRawTable(db: String, table: String): CatalogTable = 
withClient {
 client.getTable(db, table)
   }
 
@@ -386,6 +386,12 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, 
hadoopConf: Configurat
* can be used as table properties later.
*/
   private def tableMetaToTableProps(table: CatalogTable): mutable.Map[String, 
String] = {
+tableMetaToTableProps(table, table.schema)
+  }
+
+  private def tableMetaToTableProps(
+  table: CatalogTable,
+  schema: StructType): mutable.Map[String, String] = {
 val partitionColumns = table.partitionColumnNames
 val bucketSpec = table.bucketSpec
 
@@ -397,7 +403,7 @@ private[spark] class HiveExternalCatalog(conf: 

spark git commit: [SPARK-21790][TESTS][FOLLOW-UP] Add filter pushdown verification back.

2017-08-21 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/master 988b84d7e -> ba843292e


[SPARK-21790][TESTS][FOLLOW-UP] Add filter pushdown verification back.

## What changes were proposed in this pull request?

The previous PR(https://github.com/apache/spark/pull/19000) removed filter 
pushdown verification, This PR add them back.

## How was this patch tested?
manual tests

Author: Yuming Wang 

Closes #19002 from wangyum/SPARK-21790-follow-up.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ba843292
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ba843292
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ba843292

Branch: refs/heads/master
Commit: ba843292e37368e1f5e4ae5c99ba1f5f90ca6025
Parents: 988b84d
Author: Yuming Wang 
Authored: Mon Aug 21 10:16:56 2017 -0700
Committer: gatorsmile 
Committed: Mon Aug 21 10:16:56 2017 -0700

--
 .../apache/spark/sql/jdbc/OracleIntegrationSuite.scala | 13 +
 1 file changed, 13 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/ba843292/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala
--
diff --git 
a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala
 
b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala
index 80a129a..1b2c1b9 100644
--- 
a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala
+++ 
b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/OracleIntegrationSuite.scala
@@ -22,6 +22,7 @@ import java.util.Properties
 import java.math.BigDecimal
 
 import org.apache.spark.sql.Row
+import org.apache.spark.sql.execution.{WholeStageCodegenExec, 
RowDataSourceScanExec}
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
 import org.apache.spark.tags.DockerTest
@@ -255,6 +256,18 @@ class OracleIntegrationSuite extends 
DockerJDBCIntegrationSuite with SharedSQLCo
 val df = dfRead.filter(dfRead.col("date_type").lt(dt))
   .filter(dfRead.col("timestamp_type").lt(ts))
 
+val parentPlan = df.queryExecution.executedPlan
+assert(parentPlan.isInstanceOf[WholeStageCodegenExec])
+val node = parentPlan.asInstanceOf[WholeStageCodegenExec]
+val metadata = node.child.asInstanceOf[RowDataSourceScanExec].metadata
+// The "PushedFilters" part should exist in Dataframe's
+// physical plan and the existence of right literals in
+// "PushedFilters" is used to prove that the predicates
+// pushing down have been effective.
+assert(metadata.get("PushedFilters").isDefined)
+assert(metadata("PushedFilters").contains(dt.toString))
+assert(metadata("PushedFilters").contains(ts.toString))
+
 val row = df.collect()(0)
 assert(row.getDate(0).equals(dateVal))
 assert(row.getTimestamp(1).equals(timestampVal))


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-21468][PYSPARK][ML] Python API for FeatureHasher

2017-08-21 Thread mlnick
Repository: spark
Updated Branches:
  refs/heads/master b3a07526f -> 988b84d7e


[SPARK-21468][PYSPARK][ML] Python API for FeatureHasher

Add Python API for `FeatureHasher` transformer.

## How was this patch tested?

New doc test.

Author: Nick Pentreath 

Closes #18970 from MLnick/SPARK-21468-pyspark-hasher.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/988b84d7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/988b84d7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/988b84d7

Branch: refs/heads/master
Commit: 988b84d7ed43bea2616527ff050dffcf20548ab2
Parents: b3a0752
Author: Nick Pentreath 
Authored: Mon Aug 21 14:35:38 2017 +0200
Committer: Nick Pentreath 
Committed: Mon Aug 21 14:35:38 2017 +0200

--
 .../apache/spark/ml/feature/FeatureHasher.scala | 16 ++--
 python/pyspark/ml/feature.py| 77 
 2 files changed, 85 insertions(+), 8 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/988b84d7/mllib/src/main/scala/org/apache/spark/ml/feature/FeatureHasher.scala
--
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/feature/FeatureHasher.scala 
b/mllib/src/main/scala/org/apache/spark/ml/feature/FeatureHasher.scala
index d22bf16..4b91fa9 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/FeatureHasher.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/FeatureHasher.scala
@@ -64,17 +64,17 @@ import org.apache.spark.util.collection.OpenHashMap
  *   ).toDF("real", "bool", "stringNum", "string")
  *
  *   val hasher = new FeatureHasher()
- *.setInputCols("real", "bool", "stringNum", "num")
+ *.setInputCols("real", "bool", "stringNum", "string")
  *.setOutputCol("features")
  *
- *   hasher.transform(df).show()
+ *   hasher.transform(df).show(false)
  *
- *   ++-+-+--++
- *   |real| bool|stringNum|string|features|
- *   ++-+-+--++
- *   | 2.0| true|1|   foo|(262144,[51871,63...|
- *   | 3.0|false|2|   bar|(262144,[6031,806...|
- *   ++-+-+--++
+ *   
++-+-+--+--+
+ *   |real|bool |stringNum|string|features 
 |
+ *   
++-+-+--+--+
+ *   |2.0 |true |1|foo   
|(262144,[51871,63643,174475,253195],[1.0,1.0,2.0,1.0])|
+ *   |3.0 |false|2|bar   
|(262144,[6031,80619,140467,174475],[1.0,1.0,1.0,3.0]) |
+ *   
++-+-+--+--+
  * }}}
  */
 @Experimental

http://git-wip-us.apache.org/repos/asf/spark/blob/988b84d7/python/pyspark/ml/feature.py
--
diff --git a/python/pyspark/ml/feature.py b/python/pyspark/ml/feature.py
index 54b4026..050537b 100755
--- a/python/pyspark/ml/feature.py
+++ b/python/pyspark/ml/feature.py
@@ -34,6 +34,7 @@ __all__ = ['Binarizer',
'CountVectorizer', 'CountVectorizerModel',
'DCT',
'ElementwiseProduct',
+   'FeatureHasher',
'HashingTF',
'IDF', 'IDFModel',
'Imputer', 'ImputerModel',
@@ -697,6 +698,82 @@ class ElementwiseProduct(JavaTransformer, HasInputCol, 
HasOutputCol, JavaMLReada
 
 
 @inherit_doc
+class FeatureHasher(JavaTransformer, HasInputCols, HasOutputCol, 
HasNumFeatures, JavaMLReadable,
+JavaMLWritable):
+"""
+.. note:: Experimental
+
+Feature hashing projects a set of categorical or numerical features into a 
feature vector of
+specified dimension (typically substantially smaller than that of the 
original feature
+space). This is done using the hashing trick 
(https://en.wikipedia.org/wiki/Feature_hashing)
+to map features to indices in the feature vector.
+
+The FeatureHasher transformer operates on multiple columns. Each column 
may contain either
+numeric or categorical features. Behavior and handling of column data 
types is as follows:
+
+* Numeric columns:
+For numeric features, the hash value of the column name is used to map 
the
+feature value to its index in the feature vector. Numeric features are 
never
+treated as categorical, even when they are integers. You must 
explicitly
+convert numeric columns containing categorical features to strings 
first.
+
+* String columns:
+For categorical features, the hash value of the string 
"column_name=value"
+is used to map to the vector 

spark git commit: [SPARK-21718][SQL] Heavy log of type: "Skipping partition based on stats ..."

2017-08-21 Thread hvanhovell
Repository: spark
Updated Branches:
  refs/heads/master 77d046ec4 -> b3a07526f


[SPARK-21718][SQL] Heavy log of type: "Skipping partition based on stats ..."

## What changes were proposed in this pull request?

Reduce 'Skipping partitions' message to debug

## How was this patch tested?

Existing tests

Author: Sean Owen 

Closes #19010 from srowen/SPARK-21718.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b3a07526
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b3a07526
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b3a07526

Branch: refs/heads/master
Commit: b3a07526fe774fd64fe3a2b9a2381eff9a3c49a3
Parents: 77d046e
Author: Sean Owen 
Authored: Mon Aug 21 14:20:40 2017 +0200
Committer: Herman van Hovell 
Committed: Mon Aug 21 14:20:40 2017 +0200

--
 .../sql/execution/columnar/InMemoryTableScanExec.scala  | 9 +
 1 file changed, 5 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/b3a07526/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
index 1d60137..c7ddec5 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
@@ -166,12 +166,13 @@ case class InMemoryTableScanExec(
 if (inMemoryPartitionPruningEnabled) {
   cachedBatchIterator.filter { cachedBatch =>
 if (!partitionFilter.eval(cachedBatch.stats)) {
-  def statsString: String = schemaIndex.map {
-case (a, i) =>
+  logDebug {
+val statsString = schemaIndex.map { case (a, i) =>
   val value = cachedBatch.stats.get(i, a.dataType)
   s"${a.name}: $value"
-  }.mkString(", ")
-  logInfo(s"Skipping partition based on stats $statsString")
+}.mkString(", ")
+s"Skipping partition based on stats $statsString"
+  }
   false
 } else {
   true


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-21782][CORE] Repartition creates skews when numPartitions is a power of 2

2017-08-21 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/master 28a6cca7d -> 77d046ec4


[SPARK-21782][CORE] Repartition creates skews when numPartitions is a power of 2

## Problem
When an RDD (particularly with a low item-per-partition ratio) is repartitioned 
to numPartitions = power of 2, the resulting partitions are very uneven-sized, 
due to using fixed seed to initialize PRNG, and using the PRNG only once. See 
details in https://issues.apache.org/jira/browse/SPARK-21782

## What changes were proposed in this pull request?
Instead of directly using `0, 1, 2,...` seeds to initialize `Random`, hash them 
with `scala.util.hashing.byteswap32()`.

## How was this patch tested?
`build/mvn -Dtest=none -DwildcardSuites=org.apache.spark.rdd.RDDSuite test`

Author: Sergey Serebryakov 

Closes #18990 from megaserg/repartition-skew.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/77d046ec
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/77d046ec
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/77d046ec

Branch: refs/heads/master
Commit: 77d046ec47a9bfa6323aa014869844c28e18e049
Parents: 28a6cca
Author: Sergey Serebryakov 
Authored: Mon Aug 21 08:21:25 2017 +0100
Committer: Sean Owen 
Committed: Mon Aug 21 08:21:25 2017 +0100

--
 core/src/main/scala/org/apache/spark/rdd/RDD.scala  | 3 ++-
 core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala | 6 --
 2 files changed, 6 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/77d046ec/core/src/main/scala/org/apache/spark/rdd/RDD.scala
--
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala 
b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 5435f59..8798dfc 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -24,6 +24,7 @@ import scala.collection.mutable.ArrayBuffer
 import scala.io.Codec
 import scala.language.implicitConversions
 import scala.reflect.{classTag, ClassTag}
+import scala.util.hashing
 
 import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus
 import org.apache.hadoop.io.{BytesWritable, NullWritable, Text}
@@ -448,7 +449,7 @@ abstract class RDD[T: ClassTag](
 if (shuffle) {
   /** Distributes elements evenly across output partitions, starting from 
a random partition. */
   val distributePartition = (index: Int, items: Iterator[T]) => {
-var position = (new Random(index)).nextInt(numPartitions)
+var position = (new 
Random(hashing.byteswap32(index))).nextInt(numPartitions)
 items.map { t =>
   // Note that the hash code of the key will just be the key itself. 
The HashPartitioner
   // will mod it with the number of total partitions.

http://git-wip-us.apache.org/repos/asf/spark/blob/77d046ec/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
--
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala 
b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index 386c006..e994d72 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -347,16 +347,18 @@ class RDDSuite extends SparkFunSuite with 
SharedSparkContext {
   val partitions = repartitioned.glom().collect()
   // assert all elements are present
   assert(repartitioned.collect().sortWith(_ > _).toSeq === 
input.toSeq.sortWith(_ > _).toSeq)
-  // assert no bucket is overloaded
+  // assert no bucket is overloaded or empty
   for (partition <- partitions) {
 val avg = input.size / finalPartitions
 val maxPossible = avg + initialPartitions
-assert(partition.length <=  maxPossible)
+assert(partition.length <= maxPossible)
+assert(!partition.isEmpty)
   }
 }
 
 testSplitPartitions(Array.fill(100)(1), 10, 20)
 testSplitPartitions(Array.fill(1)(1) ++ Array.fill(1)(2), 20, 100)
+testSplitPartitions(Array.fill(1000)(1), 250, 128)
   }
 
   test("coalesced RDDs") {


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org