[1/2] spark git commit: [SPARK-22789] Map-only continuous processing execution

2017-12-22 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/master d23dc5b8e -> 8941a4abc


http://git-wip-us.apache.org/repos/asf/spark/blob/8941a4ab/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala
index 94c5dd6..972248d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala
@@ -25,6 +25,8 @@ import scala.util.control.NonFatal
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
 import org.apache.spark.sql.catalyst.streaming.InternalOutputModes.{Append, Complete, Update}
 import org.apache.spark.sql.execution.streaming.Sink
 import org.apache.spark.sql.sources.v2.{ContinuousWriteSupport, DataSourceV2, DataSourceV2Options, MicroBatchWriteSupport}
@@ -177,3 +179,14 @@ class MemoryDataWriter(partition: Int, outputMode: OutputMode)
 
   override def abort(): Unit = {}
 }
+
+
+/**
+ * Used to query the data that has been written into a [[MemorySink]].
+ */
+case class MemoryPlanV2(sink: MemorySinkV2, override val output: Seq[Attribute]) extends LeafNode {
+  private val sizePerRow = output.map(_.dataType.defaultSize).sum
+
+  override def computeStats(): Statistics = Statistics(sizePerRow * sink.allData.size)
+}
+
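
For orientation, a minimal sketch of how the memory sink behind this plan is
typically queried (an active SparkSession `spark`, a streaming DataFrame `df`,
and the query name "out" are assumptions for illustration, not part of this patch):

  val query = df.writeStream
    .format("memory")          // in-memory sink (MemorySinkV2 on the V2 write path)
    .queryName("out")          // name under which the sink's rows are registered
    .outputMode("append")
    .start()
  // MemoryPlanV2 above is the logical node that lets the accumulated rows be read back:
  spark.sql("SELECT * FROM out").show()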

http://git-wip-us.apache.org/repos/asf/spark/blob/8941a4ab/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
index 41aa02c..f17935e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
@@ -26,8 +26,10 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset, SparkSession}
 import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.DataSource
-import org.apache.spark.sql.execution.streaming.StreamingRelation
+import org.apache.spark.sql.execution.streaming.{StreamingRelation, StreamingRelationV2}
+import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, DataSourceV2Options, MicroBatchReadSupport}
 import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.Utils
 
 /**
  * Interface used to load a streaming `Dataset` from external storage systems (e.g. file systems,
@@ -153,13 +155,33 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
 "read files of Hive data source directly.")
 }
 
-val dataSource =
-  DataSource(
-sparkSession,
-userSpecifiedSchema = userSpecifiedSchema,
-className = source,
-options = extraOptions.toMap)
-Dataset.ofRows(sparkSession, StreamingRelation(dataSource))
+val ds = DataSource.lookupDataSource(source, sparkSession.sqlContext.conf).newInstance()
+val options = new DataSourceV2Options(extraOptions.asJava)
+// We need to generate the V1 data source so we can pass it to the V2 relation as a shim.
+// We can't be sure at this point whether we'll actually want to use V2, since we don't know the
+// writer or whether the query is continuous.
+val v1DataSource = DataSource(
+  sparkSession,
+  userSpecifiedSchema = userSpecifiedSchema,
+  className = source,
+  options = extraOptions.toMap)
+ds match {
+  case s: ContinuousReadSupport =>
+val tempReader = s.createContinuousReader(
+  java.util.Optional.ofNullable(userSpecifiedSchema.orNull),
+  Utils.createTempDir(namePrefix = s"temporaryReader").getCanonicalPath,
+  options)
+// Generate the V1 node to catch errors thrown within generation.
+StreamingRelation(v1DataSource)
+Dataset.ofRows(
+  sparkSession,
+  StreamingRelationV2(
+s, source, extraOptions.toMap,
+tempReader.readSchema().toAttributes, v1DataSource)(sparkSession))
+  case _ =>
+// Code path for data source v1.
+Dataset.ofRows(sparkSession, StreamingRelation(v1DataSource))
+}
   }
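
In user-facing terms, the dispatch above means a plain readStream call now resolves
to the V2 relation whenever the source implements ContinuousReadSupport, and keeps
the original V1 path otherwise. A hedged illustration (the "rate" source is assumed
to expose ContinuousReadSupport after this patch):

  // Resolves to StreamingRelationV2 for a ContinuousReadSupport source,
  // falls back to StreamingRelation (v1) for everything else.
  val df = spark.readStream.format("rate").load()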
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/8941a4ab/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
--
diff --git 

[2/2] spark git commit: [SPARK-22789] Map-only continuous processing execution

2017-12-22 Thread zsxwing
[SPARK-22789] Map-only continuous processing execution

## What changes were proposed in this pull request?

Basic continuous execution, supporting map/flatMap/filter, with commits and 
advancement through RPC.
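
A hedged sketch of what such a map-only continuous query looks like with the APIs
touched here (Trigger.Continuous, the rate source and the memory sink are part of
this patch; the option values, query name and an active SparkSession `spark` are
illustrative assumptions):

  import org.apache.spark.sql.streaming.Trigger

  val query = spark.readStream
    .format("rate")                            // continuous-capable source
    .option("rowsPerSecond", "5")
    .load()
    .selectExpr("value * 2 AS value")          // map-only: no aggregation or shuffle
    .writeStream
    .format("memory")
    .queryName("continuous_out")
    .trigger(Trigger.Continuous("1 second"))   // epochs committed and advanced through RPC
    .start()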

## How was this patch tested?

New unit-style tests exercising execution end to end.

Author: Jose Torres 

Closes #19984 from jose-torres/continuous-impl.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8941a4ab
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8941a4ab
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8941a4ab

Branch: refs/heads/master
Commit: 8941a4abcada873c26af924e129173dc33d66d71
Parents: d23dc5b
Author: Jose Torres 
Authored: Fri Dec 22 23:05:03 2017 -0800
Committer: Shixiong Zhu 
Committed: Fri Dec 22 23:05:03 2017 -0800

--
 project/MimaExcludes.scala  |   5 +
 .../analysis/UnsupportedOperationChecker.scala  |  25 +-
 .../org/apache/spark/sql/internal/SQLConf.scala |  21 ++
 .../sql/sources/v2/reader/ContinuousReader.java |   6 +
 .../sql/sources/v2/reader/MicroBatchReader.java |   6 +
 .../org/apache/spark/sql/streaming/Trigger.java |  54 +++
 .../spark/sql/execution/SparkStrategies.scala   |   7 +
 .../datasources/v2/DataSourceV2ScanExec.scala   |  20 +-
 .../datasources/v2/WriteToDataSourceV2.scala|  60 +++-
 .../streaming/BaseStreamingSource.java  |   8 -
 .../execution/streaming/HDFSMetadataLog.scala   |  14 +
 .../streaming/MicroBatchExecution.scala |  44 ++-
 .../sql/execution/streaming/OffsetSeq.scala |   2 +-
 .../execution/streaming/ProgressReporter.scala  |  10 +-
 .../streaming/RateSourceProvider.scala  |   9 +-
 .../execution/streaming/RateStreamOffset.scala  |   5 +-
 .../spark/sql/execution/streaming/Sink.scala|   2 +-
 .../spark/sql/execution/streaming/Source.scala  |   2 +-
 .../execution/streaming/StreamExecution.scala   |  20 +-
 .../execution/streaming/StreamProgress.scala|  19 +-
 .../execution/streaming/StreamingRelation.scala |  47 +++
 .../ContinuousDataSourceRDDIter.scala   | 217 
 .../continuous/ContinuousExecution.scala| 349 +++
 .../continuous/ContinuousRateStreamSource.scala |  11 +-
 .../continuous/ContinuousTrigger.scala  |  70 
 .../streaming/continuous/EpochCoordinator.scala | 191 ++
 .../streaming/sources/RateStreamSourceV2.scala  |  19 +-
 .../execution/streaming/sources/memoryV2.scala  |  13 +
 .../spark/sql/streaming/DataStreamReader.scala  |  38 +-
 .../spark/sql/streaming/DataStreamWriter.scala  |  19 +-
 .../sql/streaming/StreamingQueryManager.scala   |  45 ++-
 .../scala/org/apache/spark/sql/QueryTest.scala  |  56 +--
 .../execution/streaming/RateSourceV2Suite.scala |  30 +-
 .../spark/sql/streaming/StreamSuite.scala   |  17 +-
 .../apache/spark/sql/streaming/StreamTest.scala |  55 ++-
 .../streaming/continuous/ContinuousSuite.scala  | 316 +
 36 files changed, 1682 insertions(+), 150 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/8941a4ab/project/MimaExcludes.scala
--
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 9902fed..81584af 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -36,6 +36,11 @@ object MimaExcludes {
 
   // Exclude rules for 2.3.x
   lazy val v23excludes = v22excludes ++ Seq(
+// SPARK-22789: Map-only continuous processing execution
+ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.streaming.StreamingQueryManager.startQuery$default$8"),
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQueryManager.startQuery$default$6"),
+ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.streaming.StreamingQueryManager.startQuery$default$9"),
+
 // SPARK-22372: Make cluster submission use SparkApplication.
 ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.deploy.SparkHadoopUtil.getSecretKeyFromUserCredentials"),
 ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.deploy.SparkHadoopUtil.isYarnMode"),

http://git-wip-us.apache.org/repos/asf/spark/blob/8941a4ab/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
--
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
index 04502d0..b55043c 100644
--- 

svn commit: r23873 - in /dev/spark/2.3.0-SNAPSHOT-2017_12_22_16_01-d23dc5b-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s

2017-12-22 Thread pwendell
Author: pwendell
Date: Sat Dec 23 00:14:44 2017
New Revision: 23873

Log:
Apache Spark 2.3.0-SNAPSHOT-2017_12_22_16_01-d23dc5b docs


[This commit notification would consist of 1418 parts, which exceeds the limit of 50, so it was shortened to this summary.]

-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-22346][ML] VectorSizeHint Transformer for using VectorAssembler in StructuredStreaming

2017-12-22 Thread jkbradley
Repository: spark
Updated Branches:
  refs/heads/master 13190a4f6 -> d23dc5b8e


[SPARK-22346][ML] VectorSizeHint Transformer for using VectorAssembler in StructuredStreaming

## What changes were proposed in this pull request?

A new VectorSizeHint transformer was added. This transformer is meant to be 
used as a pipeline stage ahead of VectorAssembler, on vector columns, so that 
VectorAssembler can join vectors in a streaming context where the size of the 
input vectors is otherwise not known.
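
A hedged usage sketch (column names and the size value are made-up assumptions;
the setters follow the params defined in the new transformer below):

  import org.apache.spark.ml.Pipeline
  import org.apache.spark.ml.feature.{VectorAssembler, VectorSizeHint}

  val sizeHint = new VectorSizeHint()
    .setInputCol("userFeatures")   // vector column whose length is being declared
    .setSize(3)                    // the known vector size
    .setHandleInvalid("error")     // skip | error | optimistic
  val assembler = new VectorAssembler()
    .setInputCols(Array("userFeatures", "clicks"))
    .setOutputCol("features")
  // Placing the hint ahead of the assembler lets the pipeline run on streaming input.
  val pipeline = new Pipeline().setStages(Array(sizeHint, assembler))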

## How was this patch tested?

Unit tests.

Please review http://spark.apache.org/contributing.html before opening a pull 
request.

Author: Bago Amirbekian 

Closes #19746 from MrBago/vector-size-hint.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d23dc5b8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d23dc5b8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d23dc5b8

Branch: refs/heads/master
Commit: d23dc5b8ef6c6aee0a31a304eefeb6ddb1c26c0f
Parents: 13190a4
Author: Bago Amirbekian 
Authored: Fri Dec 22 14:05:57 2017 -0800
Committer: Joseph K. Bradley 
Committed: Fri Dec 22 14:05:57 2017 -0800

--
 .../spark/ml/feature/VectorSizeHint.scala   | 195 +++
 .../spark/ml/feature/VectorSizeHintSuite.scala  | 189 ++
 2 files changed, 384 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/d23dc5b8/mllib/src/main/scala/org/apache/spark/ml/feature/VectorSizeHint.scala
--
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/VectorSizeHint.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/VectorSizeHint.scala
new file mode 100644
index 000..1fe3cfc
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/VectorSizeHint.scala
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.feature
+
+import org.apache.spark.SparkException
+import org.apache.spark.annotation.{Experimental, Since}
+import org.apache.spark.ml.Transformer
+import org.apache.spark.ml.attribute.AttributeGroup
+import org.apache.spark.ml.linalg.{Vector, VectorUDT}
+import org.apache.spark.ml.param.{IntParam, Param, ParamMap, ParamValidators}
+import org.apache.spark.ml.param.shared.{HasHandleInvalid, HasInputCol}
+import org.apache.spark.ml.util.{DefaultParamsReadable, DefaultParamsWritable, Identifiable}
+import org.apache.spark.sql.{Column, DataFrame, Dataset}
+import org.apache.spark.sql.functions.{col, udf}
+import org.apache.spark.sql.types.StructType
+
+/**
+ * :: Experimental ::
+ * A feature transformer that adds size information to the metadata of a vector column.
+ * VectorAssembler needs size information for its input columns and cannot be used on streaming
+ * dataframes without this metadata.
+ * dataframes without this metadata.
+ *
+ */
+@Experimental
+@Since("2.3.0")
+class VectorSizeHint @Since("2.3.0") (@Since("2.3.0") override val uid: String)
+  extends Transformer with HasInputCol with HasHandleInvalid with DefaultParamsWritable {
+
+  @Since("2.3.0")
+  def this() = this(Identifiable.randomUID("vectSizeHint"))
+
+  /**
+   * The size of Vectors in `inputCol`.
+   * @group param
+   */
+  @Since("2.3.0")
+  val size: IntParam = new IntParam(
+this,
+"size",
+"Size of vectors in column.",
+{s: Int => s >= 0})
+
+  /** @group getParam */
+  @Since("2.3.0")
+  def getSize: Int = getOrDefault(size)
+
+  /** @group setParam */
+  @Since("2.3.0")
+  def setSize(value: Int): this.type = set(size, value)
+
+  /** @group setParam */
+  @Since("2.3.0")
+  def setInputCol(value: String): this.type = set(inputCol, value)
+
+  /**
+   * Param for how to handle invalid entries. Invalid vectors include nulls and vectors with the
+   * wrong size. The options are `skip` (filter out rows with invalid vectors), `error` (throw an
+   * error) and `optimistic` (do not check the vector size, and keep all rows). `error` by default.
+   *
+ 

svn commit: r23865 - in /dev/spark/2.3.0-SNAPSHOT-2017_12_22_04_01-13190a4-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s

2017-12-22 Thread pwendell
Author: pwendell
Date: Fri Dec 22 12:18:50 2017
New Revision: 23865

Log:
Apache Spark 2.3.0-SNAPSHOT-2017_12_22_04_01-13190a4 docs


[This commit notification would consist of 1415 parts, which exceeds the limit of 50, so it was shortened to this summary.]

-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-22874][PYSPARK][SQL] Modify checking pandas version to use LooseVersion.

2017-12-22 Thread gurwls223
Repository: spark
Updated Branches:
  refs/heads/master 8df1da396 -> 13190a4f6


[SPARK-22874][PYSPARK][SQL] Modify checking pandas version to use LooseVersion.

## What changes were proposed in this pull request?

Currently we check the pandas version by catching whether an `ImportError` is raised for
specific imports. Instead, we can compare `LooseVersion` of the version strings, the same
way we already check the pyarrow version.

## How was this patch tested?

Existing tests.

Author: Takuya UESHIN 

Closes #20054 from ueshin/issues/SPARK-22874.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/13190a4f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/13190a4f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/13190a4f

Branch: refs/heads/master
Commit: 13190a4f60c081a68812df6df1d8262779cd6fcb
Parents: 8df1da3
Author: Takuya UESHIN 
Authored: Fri Dec 22 20:09:51 2017 +0900
Committer: hyukjinkwon 
Committed: Fri Dec 22 20:09:51 2017 +0900

--
 python/pyspark/sql/dataframe.py |  4 ++--
 python/pyspark/sql/session.py   | 15 +++
 python/pyspark/sql/tests.py |  7 ---
 python/pyspark/sql/types.py | 33 +
 python/pyspark/sql/udf.py   |  4 ++--
 python/pyspark/sql/utils.py | 11 ++-
 6 files changed, 38 insertions(+), 36 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/13190a4f/python/pyspark/sql/dataframe.py
--
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index 440684d..95eca76 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -1906,9 +1906,9 @@ class DataFrame(object):
 if self.sql_ctx.getConf("spark.sql.execution.arrow.enabled", "false").lower() == "true":
 try:
 from pyspark.sql.types import _check_dataframe_localize_timestamps
-from pyspark.sql.utils import _require_minimum_pyarrow_version
+from pyspark.sql.utils import require_minimum_pyarrow_version
 import pyarrow
-_require_minimum_pyarrow_version()
+require_minimum_pyarrow_version()
 tables = self._collectAsArrow()
 if tables:
 table = pyarrow.concat_tables(tables)

http://git-wip-us.apache.org/repos/asf/spark/blob/13190a4f/python/pyspark/sql/session.py
--
diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py
index 86db16e..6e5eec4 100644
--- a/python/pyspark/sql/session.py
+++ b/python/pyspark/sql/session.py
@@ -493,15 +493,14 @@ class SparkSession(object):
 data types will be used to coerce the data in Pandas to Arrow conversion.
 """
 from pyspark.serializers import ArrowSerializer, _create_batch
-from pyspark.sql.types import from_arrow_schema, to_arrow_type, \
-_old_pandas_exception_message, TimestampType
-from pyspark.sql.utils import _require_minimum_pyarrow_version
-try:
-from pandas.api.types import is_datetime64_dtype, is_datetime64tz_dtype
-except ImportError as e:
-raise ImportError(_old_pandas_exception_message(e))
+from pyspark.sql.types import from_arrow_schema, to_arrow_type, TimestampType
+from pyspark.sql.utils import require_minimum_pandas_version, \
+require_minimum_pyarrow_version
+
+require_minimum_pandas_version()
+require_minimum_pyarrow_version()
 
-_require_minimum_pyarrow_version()
+from pandas.api.types import is_datetime64_dtype, is_datetime64tz_dtype
 
 # Determine arrow types to coerce data when creating batches
 if isinstance(schema, StructType):

http://git-wip-us.apache.org/repos/asf/spark/blob/13190a4f/python/pyspark/sql/tests.py
--
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index 6fdfda1..b977160 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -53,7 +53,8 @@ _have_old_pandas = False
 try:
 import pandas
 try:
-import pandas.api
+from pyspark.sql.utils import require_minimum_pandas_version
+require_minimum_pandas_version()
 _have_pandas = True
 except:
 _have_old_pandas = True
@@ -2600,7 +2601,7 @@ class SQLTests(ReusedSQLTestCase):
 @unittest.skipIf(not _have_old_pandas, "Old Pandas not installed")
 def test_to_pandas_old(self):
 with QuietTest(self.sc):
-with self.assertRaisesRegexp(ImportError, 

svn commit: r23863 - in /dev/spark/2.3.0-SNAPSHOT-2017_12_22_00_01-8df1da3-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s

2017-12-22 Thread pwendell
Author: pwendell
Date: Fri Dec 22 08:15:11 2017
New Revision: 23863

Log:
Apache Spark 2.3.0-SNAPSHOT-2017_12_22_00_01-8df1da3 docs


[This commit notification would consist of 1415 parts, which exceeds the limit of 50, so it was shortened to this summary.]

-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org