[1/2] spark git commit: [SPARK-22789] Map-only continuous processing execution
Repository: spark
Updated Branches:
  refs/heads/master d23dc5b8e -> 8941a4abc

http://git-wip-us.apache.org/repos/asf/spark/blob/8941a4ab/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala
index 94c5dd6..972248d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala
@@ -25,6 +25,8 @@ import scala.util.control.NonFatal
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
 import org.apache.spark.sql.catalyst.streaming.InternalOutputModes.{Append, Complete, Update}
 import org.apache.spark.sql.execution.streaming.Sink
 import org.apache.spark.sql.sources.v2.{ContinuousWriteSupport, DataSourceV2, DataSourceV2Options, MicroBatchWriteSupport}
@@ -177,3 +179,14 @@ class MemoryDataWriter(partition: Int, outputMode: OutputMode)
 
   override def abort(): Unit = {}
 }
+
+
+/**
+ * Used to query the data that has been written into a [[MemorySink]].
+ */
+case class MemoryPlanV2(sink: MemorySinkV2, override val output: Seq[Attribute]) extends LeafNode {
+  private val sizePerRow = output.map(_.dataType.defaultSize).sum
+
+  override def computeStats(): Statistics = Statistics(sizePerRow * sink.allData.size)
+}
+

http://git-wip-us.apache.org/repos/asf/spark/blob/8941a4ab/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
index 41aa02c..f17935e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
@@ -26,8 +26,10 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset, SparkSession}
 import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.DataSource
-import org.apache.spark.sql.execution.streaming.StreamingRelation
+import org.apache.spark.sql.execution.streaming.{StreamingRelation, StreamingRelationV2}
+import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, DataSourceV2Options, MicroBatchReadSupport}
 import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.Utils
 
 /**
  * Interface used to load a streaming `Dataset` from external storage systems (e.g. file systems,
@@ -153,13 +155,33 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
         "read files of Hive data source directly.")
     }
 
-    val dataSource =
-      DataSource(
-        sparkSession,
-        userSpecifiedSchema = userSpecifiedSchema,
-        className = source,
-        options = extraOptions.toMap)
-    Dataset.ofRows(sparkSession, StreamingRelation(dataSource))
+    val ds = DataSource.lookupDataSource(source, sparkSession.sqlContext.conf).newInstance()
+    val options = new DataSourceV2Options(extraOptions.asJava)
+    // We need to generate the V1 data source so we can pass it to the V2 relation as a shim.
+    // We can't be sure at this point whether we'll actually want to use V2, since we don't know the
+    // writer or whether the query is continuous.
+    val v1DataSource = DataSource(
+      sparkSession,
+      userSpecifiedSchema = userSpecifiedSchema,
+      className = source,
+      options = extraOptions.toMap)
+    ds match {
+      case s: ContinuousReadSupport =>
+        val tempReader = s.createContinuousReader(
+          java.util.Optional.ofNullable(userSpecifiedSchema.orNull),
+          Utils.createTempDir(namePrefix = s"temporaryReader").getCanonicalPath,
+          options)
+        // Generate the V1 node to catch errors thrown within generation.
+        StreamingRelation(v1DataSource)
+        Dataset.ofRows(
+          sparkSession,
+          StreamingRelationV2(
+            s, source, extraOptions.toMap,
+            tempReader.readSchema().toAttributes, v1DataSource)(sparkSession))
+      case _ =>
+        // Code path for data source v1.
+        Dataset.ofRows(sparkSession, StreamingRelation(v1DataSource))
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/8941a4ab/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
--
diff --git
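For orientation, a minimal sketch of how the new read path above is exercised; the source name and option are illustrative assumptions, and any source class that implements ContinuousReadSupport takes the V2 branch while other sources keep the V1 StreamingRelation:

import org.apache.spark.sql.SparkSession

object ReaderDispatchSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("reader-dispatch-sketch")
      .getOrCreate()

    // "rate" is used only as an example of a built-in streaming source name.
    val df = spark.readStream
      .format("rate")
      .option("rowsPerSecond", "10")
      .load()

    // The analyzed plan shows which relation node the reader produced:
    // StreamingRelationV2 for V2-capable sources, StreamingRelation otherwise.
    println(df.queryExecution.analyzed)
  }
}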
[2/2] spark git commit: [SPARK-22789] Map-only continuous processing execution
[SPARK-22789] Map-only continuous processing execution

## What changes were proposed in this pull request?

Basic continuous execution, supporting map/flatMap/filter, with commits and advancement through RPC.

## How was this patch tested?

new unit-ish tests (exercising execution end to end)

Author: Jose Torres
Closes #19984 from jose-torres/continuous-impl.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8941a4ab
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8941a4ab
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8941a4ab

Branch: refs/heads/master
Commit: 8941a4abcada873c26af924e129173dc33d66d71
Parents: d23dc5b
Author: Jose Torres
Authored: Fri Dec 22 23:05:03 2017 -0800
Committer: Shixiong Zhu
Committed: Fri Dec 22 23:05:03 2017 -0800

--
 project/MimaExcludes.scala                       |   5 +
 .../analysis/UnsupportedOperationChecker.scala   |  25 +-
 .../org/apache/spark/sql/internal/SQLConf.scala  |  21 ++
 .../sql/sources/v2/reader/ContinuousReader.java  |   6 +
 .../sql/sources/v2/reader/MicroBatchReader.java  |   6 +
 .../org/apache/spark/sql/streaming/Trigger.java  |  54 +++
 .../spark/sql/execution/SparkStrategies.scala    |   7 +
 .../datasources/v2/DataSourceV2ScanExec.scala    |  20 +-
 .../datasources/v2/WriteToDataSourceV2.scala     |  60 +++-
 .../streaming/BaseStreamingSource.java           |   8 -
 .../execution/streaming/HDFSMetadataLog.scala    |  14 +
 .../streaming/MicroBatchExecution.scala          |  44 ++-
 .../sql/execution/streaming/OffsetSeq.scala      |   2 +-
 .../execution/streaming/ProgressReporter.scala   |  10 +-
 .../streaming/RateSourceProvider.scala           |   9 +-
 .../execution/streaming/RateStreamOffset.scala   |   5 +-
 .../spark/sql/execution/streaming/Sink.scala     |   2 +-
 .../spark/sql/execution/streaming/Source.scala   |   2 +-
 .../execution/streaming/StreamExecution.scala    |  20 +-
 .../execution/streaming/StreamProgress.scala     |  19 +-
 .../execution/streaming/StreamingRelation.scala  |  47 +++
 .../ContinuousDataSourceRDDIter.scala            | 217
 .../continuous/ContinuousExecution.scala         | 349 +++
 .../continuous/ContinuousRateStreamSource.scala  |  11 +-
 .../continuous/ContinuousTrigger.scala           |  70
 .../streaming/continuous/EpochCoordinator.scala  | 191 ++
 .../streaming/sources/RateStreamSourceV2.scala   |  19 +-
 .../execution/streaming/sources/memoryV2.scala   |  13 +
 .../spark/sql/streaming/DataStreamReader.scala   |  38 +-
 .../spark/sql/streaming/DataStreamWriter.scala   |  19 +-
 .../sql/streaming/StreamingQueryManager.scala    |  45 ++-
 .../scala/org/apache/spark/sql/QueryTest.scala   |  56 +--
 .../execution/streaming/RateSourceV2Suite.scala  |  30 +-
 .../spark/sql/streaming/StreamSuite.scala        |  17 +-
 .../apache/spark/sql/streaming/StreamTest.scala  |  55 ++-
 .../streaming/continuous/ContinuousSuite.scala   | 316 +
 36 files changed, 1682 insertions(+), 150 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/8941a4ab/project/MimaExcludes.scala
--
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 9902fed..81584af 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -36,6 +36,11 @@ object MimaExcludes {
 
   // Exclude rules for 2.3.x
   lazy val v23excludes = v22excludes ++ Seq(
+    // SPARK-22789: Map-only continuous processing execution
+    ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.streaming.StreamingQueryManager.startQuery$default$8"),
+    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQueryManager.startQuery$default$6"),
+    ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.streaming.StreamingQueryManager.startQuery$default$9"),
+
     // SPARK-22372: Make cluster submission use SparkApplication.
     ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.deploy.SparkHadoopUtil.getSecretKeyFromUserCredentials"),
     ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.deploy.SparkHadoopUtil.isYarnMode"),

http://git-wip-us.apache.org/repos/asf/spark/blob/8941a4ab/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
--
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
index 04502d0..b55043c 100644
---
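To make the intent of the patch concrete, here is a hedged end-to-end sketch of a map-only query run under the new continuous trigger; the source, sink, option values and checkpoint path are illustrative assumptions rather than code taken from the patch:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

object ContinuousQuerySketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[4]")
      .appName("continuous-processing-sketch")
      .getOrCreate()
    import spark.implicits._

    // Map-only pipeline: projections and filters only; aggregations and joins are
    // not supported by the continuous mode introduced here.
    val transformed = spark.readStream
      .format("rate")                          // built-in source, assumed here to expose ContinuousReadSupport
      .option("rowsPerSecond", "5")
      .load()
      .select($"timestamp", $"value")
      .filter($"value" % 2 === 0)

    val query = transformed.writeStream
      .format("memory")                        // assumed sink choice; the patch also adds a V2 memory sink
      .queryName("continuous_sketch")
      .option("checkpointLocation", "/tmp/continuous-sketch-checkpoint")  // hypothetical path
      .trigger(Trigger.Continuous("1 second")) // how often progress is committed, not a micro-batch interval
      .start()

    query.awaitTermination()
  }
}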
svn commit: r23873 - in /dev/spark/2.3.0-SNAPSHOT-2017_12_22_16_01-d23dc5b-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell
Date: Sat Dec 23 00:14:44 2017
New Revision: 23873

Log:
Apache Spark 2.3.0-SNAPSHOT-2017_12_22_16_01-d23dc5b docs

[This commit notification would consist of 1418 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.]
spark git commit: [SPARK-22346][ML] VectorSizeHint Transformer for using VectorAssembler in StructuredStreaming
Repository: spark
Updated Branches:
  refs/heads/master 13190a4f6 -> d23dc5b8e

[SPARK-22346][ML] VectorSizeHint Transformer for using VectorAssembler in StructuredStreaming

## What changes were proposed in this pull request?

A new VectorSizeHint transformer was added. This transformer is meant to be used as a pipeline stage ahead of VectorAssembler, on vector columns, so that VectorAssembler can join vectors in a streaming context where the size of the input vectors is otherwise not known.

## How was this patch tested?

Unit tests.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Bago Amirbekian
Closes #19746 from MrBago/vector-size-hint.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d23dc5b8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d23dc5b8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d23dc5b8

Branch: refs/heads/master
Commit: d23dc5b8ef6c6aee0a31a304eefeb6ddb1c26c0f
Parents: 13190a4
Author: Bago Amirbekian
Authored: Fri Dec 22 14:05:57 2017 -0800
Committer: Joseph K. Bradley
Committed: Fri Dec 22 14:05:57 2017 -0800

--
 .../spark/ml/feature/VectorSizeHint.scala      | 195 +++
 .../spark/ml/feature/VectorSizeHintSuite.scala | 189 ++
 2 files changed, 384 insertions(+)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/d23dc5b8/mllib/src/main/scala/org/apache/spark/ml/feature/VectorSizeHint.scala
--
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/VectorSizeHint.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/VectorSizeHint.scala
new file mode 100644
index 000..1fe3cfc
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/VectorSizeHint.scala
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.feature
+
+import org.apache.spark.SparkException
+import org.apache.spark.annotation.{Experimental, Since}
+import org.apache.spark.ml.Transformer
+import org.apache.spark.ml.attribute.AttributeGroup
+import org.apache.spark.ml.linalg.{Vector, VectorUDT}
+import org.apache.spark.ml.param.{IntParam, Param, ParamMap, ParamValidators}
+import org.apache.spark.ml.param.shared.{HasHandleInvalid, HasInputCol}
+import org.apache.spark.ml.util.{DefaultParamsReadable, DefaultParamsWritable, Identifiable}
+import org.apache.spark.sql.{Column, DataFrame, Dataset}
+import org.apache.spark.sql.functions.{col, udf}
+import org.apache.spark.sql.types.StructType
+
+/**
+ * :: Experimental ::
+ * A feature transformer that adds size information to the metadata of a vector column.
+ * VectorAssembler needs size information for its input columns and cannot be used on streaming
+ * dataframes without this metadata.
+ *
+ */
+@Experimental
+@Since("2.3.0")
+class VectorSizeHint @Since("2.3.0") (@Since("2.3.0") override val uid: String)
+  extends Transformer with HasInputCol with HasHandleInvalid with DefaultParamsWritable {
+
+  @Since("2.3.0")
+  def this() = this(Identifiable.randomUID("vectSizeHint"))
+
+  /**
+   * The size of Vectors in `inputCol`.
+   * @group param
+   */
+  @Since("2.3.0")
+  val size: IntParam = new IntParam(
+    this,
+    "size",
+    "Size of vectors in column.",
+    {s: Int => s >= 0})
+
+  /** group getParam */
+  @Since("2.3.0")
+  def getSize: Int = getOrDefault(size)
+
+  /** @group setParam */
+  @Since("2.3.0")
+  def setSize(value: Int): this.type = set(size, value)
+
+  /** @group setParam */
+  @Since("2.3.0")
+  def setInputCol(value: String): this.type = set(inputCol, value)
+
+  /**
+   * Param for how to handle invalid entries. Invalid vectors include nulls and vectors with the
+   * wrong size. The options are `skip` (filter out rows with invalid vectors), `error` (throw an
+   * error) and `optimistic` (do not check the vector size, and keep all rows). `error` by default.
+   *
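A minimal usage sketch built from the setters shown above; the column names, the size value, the surrounding VectorAssembler pipeline, and the setHandleInvalid setter (inferred from the handleInvalid param documented above) are illustrative assumptions:

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.{VectorAssembler, VectorSizeHint}

object VectorSizeHintSketch {
  // Declare that vectors in "userFeatures" always have 3 elements; rows whose vectors
  // are null or the wrong size are dropped ("skip", per the handleInvalid options above).
  val sizeHint = new VectorSizeHint()
    .setInputCol("userFeatures")
    .setSize(3)
    .setHandleInvalid("skip")

  // With the size metadata in place, VectorAssembler can be applied to a streaming
  // DataFrame even though it cannot inspect the data to learn the vector width.
  val assembler = new VectorAssembler()
    .setInputCols(Array("userFeatures", "hourOfDay"))
    .setOutputCol("features")

  // The hint stage must run before the assembler stage.
  val pipeline = new Pipeline().setStages(Array(sizeHint, assembler))
}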
svn commit: r23865 - in /dev/spark/2.3.0-SNAPSHOT-2017_12_22_04_01-13190a4-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell
Date: Fri Dec 22 12:18:50 2017
New Revision: 23865

Log:
Apache Spark 2.3.0-SNAPSHOT-2017_12_22_04_01-13190a4 docs

[This commit notification would consist of 1415 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.]
spark git commit: [SPARK-22874][PYSPARK][SQL] Modify checking pandas version to use LooseVersion.
Repository: spark
Updated Branches:
  refs/heads/master 8df1da396 -> 13190a4f6

[SPARK-22874][PYSPARK][SQL] Modify checking pandas version to use LooseVersion.

## What changes were proposed in this pull request?

Currently we check pandas version by capturing if `ImportError` for the specific imports is raised or not but we can compare `LooseVersion` of the version strings as the same as we're checking pyarrow version.

## How was this patch tested?

Existing tests.

Author: Takuya UESHIN
Closes #20054 from ueshin/issues/SPARK-22874.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/13190a4f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/13190a4f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/13190a4f

Branch: refs/heads/master
Commit: 13190a4f60c081a68812df6df1d8262779cd6fcb
Parents: 8df1da3
Author: Takuya UESHIN
Authored: Fri Dec 22 20:09:51 2017 +0900
Committer: hyukjinkwon
Committed: Fri Dec 22 20:09:51 2017 +0900

--
 python/pyspark/sql/dataframe.py |  4 ++--
 python/pyspark/sql/session.py   | 15 +++
 python/pyspark/sql/tests.py     |  7 ---
 python/pyspark/sql/types.py     | 33 +
 python/pyspark/sql/udf.py       |  4 ++--
 python/pyspark/sql/utils.py     | 11 ++-
 6 files changed, 38 insertions(+), 36 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/13190a4f/python/pyspark/sql/dataframe.py
--
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index 440684d..95eca76 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -1906,9 +1906,9 @@ class DataFrame(object):
         if self.sql_ctx.getConf("spark.sql.execution.arrow.enabled", "false").lower() == "true":
             try:
                 from pyspark.sql.types import _check_dataframe_localize_timestamps
-                from pyspark.sql.utils import _require_minimum_pyarrow_version
+                from pyspark.sql.utils import require_minimum_pyarrow_version
                 import pyarrow
-                _require_minimum_pyarrow_version()
+                require_minimum_pyarrow_version()
                 tables = self._collectAsArrow()
                 if tables:
                     table = pyarrow.concat_tables(tables)

http://git-wip-us.apache.org/repos/asf/spark/blob/13190a4f/python/pyspark/sql/session.py
--
diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py
index 86db16e..6e5eec4 100644
--- a/python/pyspark/sql/session.py
+++ b/python/pyspark/sql/session.py
@@ -493,15 +493,14 @@ class SparkSession(object):
         data types will be used to coerce the data in Pandas to Arrow conversion.
         """
         from pyspark.serializers import ArrowSerializer, _create_batch
-        from pyspark.sql.types import from_arrow_schema, to_arrow_type, \
-            _old_pandas_exception_message, TimestampType
-        from pyspark.sql.utils import _require_minimum_pyarrow_version
-        try:
-            from pandas.api.types import is_datetime64_dtype, is_datetime64tz_dtype
-        except ImportError as e:
-            raise ImportError(_old_pandas_exception_message(e))
+        from pyspark.sql.types import from_arrow_schema, to_arrow_type, TimestampType
+        from pyspark.sql.utils import require_minimum_pandas_version, \
+            require_minimum_pyarrow_version
+
+        require_minimum_pandas_version()
+        require_minimum_pyarrow_version()
 
-        _require_minimum_pyarrow_version()
+        from pandas.api.types import is_datetime64_dtype, is_datetime64tz_dtype
 
         # Determine arrow types to coerce data when creating batches
         if isinstance(schema, StructType):

http://git-wip-us.apache.org/repos/asf/spark/blob/13190a4f/python/pyspark/sql/tests.py
--
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index 6fdfda1..b977160 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -53,7 +53,8 @@ _have_old_pandas = False
 try:
     import pandas
     try:
-        import pandas.api
+        from pyspark.sql.utils import require_minimum_pandas_version
+        require_minimum_pandas_version()
         _have_pandas = True
     except:
         _have_old_pandas = True
@@ -2600,7 +2601,7 @@ class SQLTests(ReusedSQLTestCase):
     @unittest.skipIf(not _have_old_pandas, "Old Pandas not installed")
     def test_to_pandas_old(self):
         with QuietTest(self.sc):
-            with self.assertRaisesRegexp(ImportError,
svn commit: r23863 - in /dev/spark/2.3.0-SNAPSHOT-2017_12_22_00_01-8df1da3-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell
Date: Fri Dec 22 08:15:11 2017
New Revision: 23863

Log:
Apache Spark 2.3.0-SNAPSHOT-2017_12_22_00_01-8df1da3 docs

[This commit notification would consist of 1415 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.]