spark git commit: [SPARK-18120][SPARK-19557][SQL] Call QueryExecutionListener callback methods for DataFrameWriter methods
Repository: spark Updated Branches: refs/heads/master 21fde57f1 -> 54d23599d [SPARK-18120][SPARK-19557][SQL] Call QueryExecutionListener callback methods for DataFrameWriter methods ## What changes were proposed in this pull request? We only notify `QueryExecutionListener` for several `Dataset` operations, e.g. collect, take, etc. We should also do the notification for `DataFrameWriter` operations. ## How was this patch tested? new regression test close https://github.com/apache/spark/pull/16664 Author: Wenchen Fan. Closes #16962 from cloud-fan/insert. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/54d23599 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/54d23599 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/54d23599 Branch: refs/heads/master Commit: 54d23599df7c28a7685416ced6ad8fcde047e534 Parents: 21fde57 Author: Wenchen Fan Authored: Thu Feb 16 21:09:14 2017 -0800 Committer: Wenchen Fan Committed: Thu Feb 16 21:09:14 2017 -0800 -- .../org/apache/spark/sql/DataFrameWriter.scala | 49 - .../datasources/SaveIntoDataSourceCommand.scala | 52 ++ .../spark/sql/util/DataFrameCallbackSuite.scala | 57 +++- 3 files changed, 142 insertions(+), 16 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/54d23599/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index cdae8ea..3939251 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -25,9 +25,9 @@ import org.apache.spark.annotation.InterfaceStability import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, UnresolvedRelation} import org.apache.spark.sql.catalyst.catalog.{BucketSpec, 
CatalogRelation, CatalogTable, CatalogTableType} -import org.apache.spark.sql.catalyst.plans.logical.InsertIntoTable +import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan} import org.apache.spark.sql.execution.command.DDLUtils -import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, LogicalRelation} +import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, LogicalRelation, SaveIntoDataSourceCommand} import org.apache.spark.sql.sources.BaseRelation import org.apache.spark.sql.types.StructType @@ -211,13 +211,15 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { } assertNotBucketed("save") -val dataSource = DataSource( - df.sparkSession, - className = source, - partitionColumns = partitioningColumns.getOrElse(Nil), - options = extraOptions.toMap) -dataSource.write(mode, df) +runCommand(df.sparkSession, "save") { + SaveIntoDataSourceCommand( +query = df.logicalPlan, +provider = source, +partitionColumns = partitioningColumns.getOrElse(Nil), +options = extraOptions.toMap, +mode = mode) +} } /** @@ -260,13 +262,14 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { ) } -df.sparkSession.sessionState.executePlan( +runCommand(df.sparkSession, "insertInto") { InsertIntoTable( table = UnresolvedRelation(tableIdent), partition = Map.empty[String, Option[String]], query = df.logicalPlan, overwrite = mode == SaveMode.Overwrite, -ifNotExists = false)).toRdd +ifNotExists = false) +} } private def getBucketSpec: Option[BucketSpec] = { @@ -389,10 +392,9 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { schema = new StructType, provider = Some(source), partitionColumnNames = partitioningColumns.getOrElse(Nil), - bucketSpec = getBucketSpec -) -df.sparkSession.sessionState.executePlan( - CreateTable(tableDesc, mode, Some(df.logicalPlan))).toRdd + bucketSpec = getBucketSpec) + +runCommand(df.sparkSession, "saveAsTable")(CreateTable(tableDesc, mode, Some(df.logicalPlan))) } /** 
@@ -573,6 +575,25 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { format("csv").save(path) } + /** + * Wrap a DataFrameWriter action to track the QueryExecution and time cost, then report to the + * user-registered callback functions. + */ + private def runCommand(session: SparkSession, name: String)(command: LogicalPlan): Unit = { +val qe =
spark git commit: [SPARK-18352][SQL] Support parsing multiline json files
Repository: spark Updated Branches: refs/heads/master dcc2d540a -> 21fde57f1 [SPARK-18352][SQL] Support parsing multiline json files ## What changes were proposed in this pull request? If a new option `wholeFile` is set to `true` the JSON reader will parse each file (instead of a single line) as a value. This is done with Jackson streaming and it should be capable of parsing very large documents, assuming the row will fit in memory. Because the file is not buffered in memory the corrupt record handling is also slightly different when `wholeFile` is enabled: the corrupt column will contain the filename instead of the literal JSON if there is a parsing failure. It would be easy to extend this to add the parser location (line, column and byte offsets) to the output if desired. These changes have allowed types other than `String` to be parsed. Support for `UTF8String` and `Text` have been added (alongside `String` and `InputFormat`) and no longer require a conversion to `String` just for parsing. I've also included a few other changes that generate slightly better bytecode and (imo) make it more obvious when and where boxing is occurring in the parser. These are included as separate commits, let me know if they should be flattened into this PR or moved to a new one. ## How was this patch tested? New and existing unit tests. No performance or load tests have been run. Author: Nathan Howell. Closes #16386 from NathanHowell/SPARK-18352. 
Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/21fde57f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/21fde57f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/21fde57f Branch: refs/heads/master Commit: 21fde57f15db974b710e7b00e72c744da7c1ac3c Parents: dcc2d54 Author: Nathan Howell Authored: Thu Feb 16 20:51:19 2017 -0800 Committer: Wenchen Fan Committed: Thu Feb 16 20:51:19 2017 -0800 -- .../apache/spark/unsafe/types/UTF8String.java | 20 +- .../apache/spark/input/PortableDataStream.scala | 7 + python/pyspark/sql/readwriter.py| 13 +- python/pyspark/sql/streaming.py | 14 +- python/pyspark/sql/tests.py | 7 + python/test_support/sql/people_array.json | 13 + .../catalyst/expressions/jsonExpressions.scala | 10 +- .../sql/catalyst/json/CreateJacksonParser.scala | 46 +++ .../spark/sql/catalyst/json/JSONOptions.scala | 20 +- .../spark/sql/catalyst/json/JacksonParser.scala | 287 +++ .../org/apache/spark/sql/DataFrameReader.scala | 32 ++- .../execution/datasources/CodecStreams.scala| 17 +- .../datasources/json/JsonDataSource.scala | 216 ++ .../datasources/json/JsonFileFormat.scala | 96 +++ .../datasources/json/JsonInferSchema.scala | 13 +- .../spark/sql/streaming/DataStreamReader.scala | 8 +- .../execution/datasources/json/JsonSuite.scala | 152 +- 17 files changed, 740 insertions(+), 231 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/21fde57f/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java -- diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java index 3800d53..87b9e8e 100644 --- a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java +++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java @@ -147,7 +147,13 @@ public final class UTF8String implements Comparable, Externalizable, 
buffer.position(pos + numBytes); } - public void writeTo(OutputStream out) throws IOException { + /** + * Returns a {@link ByteBuffer} wrapping the base object if it is a byte array + * or a copy of the data if the base object is not a byte array. + * + * Unlike getBytes this will not create a copy the array if this is a slice. + */ + public @Nonnull ByteBuffer getByteBuffer() { if (base instanceof byte[] && offset >= BYTE_ARRAY_OFFSET) { final byte[] bytes = (byte[]) base; @@ -160,12 +166,20 @@ public final class UTF8String implements Comparable, Externalizable, throw new ArrayIndexOutOfBoundsException(); } - out.write(bytes, (int) arrayOffset, numBytes); + return ByteBuffer.wrap(bytes, (int) arrayOffset, numBytes); } else { - out.write(getBytes()); + return ByteBuffer.wrap(getBytes()); } } + public void writeTo(OutputStream out) throws IOException { +final ByteBuffer bb = this.getByteBuffer(); +assert(bb.hasArray()); + +// similar to Utils.writeByteBuffer but without the
spark git commit: [SPARK-19436][SQL] Add missing tests for approxQuantile
Repository: spark Updated Branches: refs/heads/master 3b4376876 -> 54a30c8a7 [SPARK-19436][SQL] Add missing tests for approxQuantile ## What changes were proposed in this pull request? 1, check the behavior with illegal `quantiles` and `relativeError` 2, add tests for `relativeError` > 1 3, update tests for `null` data 4, update some docs for javadoc8 ## How was this patch tested? local test in spark-shell Author: Zheng RuiFeng. Author: Ruifeng Zheng. Closes #16776 from zhengruifeng/fix_approxQuantile. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/54a30c8a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/54a30c8a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/54a30c8a Branch: refs/heads/master Commit: 54a30c8a70c86294059e6eb6b30cb81978b47b54 Parents: 3b43768 Author: Zheng RuiFeng Authored: Thu Feb 16 09:42:13 2017 -0800 Committer: Xiao Li Committed: Thu Feb 16 09:42:13 2017 -0800 -- .../spark/sql/DataFrameStatFunctions.scala | 30 .../sql/execution/stat/StatFunctions.scala | 4 +- .../apache/spark/sql/DataFrameStatSuite.scala | 77 +--- 3 files changed, 88 insertions(+), 23 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/54a30c8a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala index 2b782fd..bdcdf0c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala @@ -58,12 +58,13 @@ final class DataFrameStatFunctions private[sql](df: DataFrame) { * @param probabilities a list of quantile probabilities * Each number must belong to [0, 1]. * For example 0 is the minimum, 0.5 is the median, 1 is the maximum. 
- * @param relativeError The relative target precision to achieve (greater or equal to 0). + * @param relativeError The relative target precision to achieve (greater than or equal to 0). * If set to zero, the exact quantiles are computed, which could be very expensive. * Note that values greater than 1 are accepted but give the same result as 1. * @return the approximate quantiles at the given probabilities * - * @note NaN values will be removed from the numerical column before calculation + * @note null and NaN values will be removed from the numerical column before calculation. If + * the dataframe is empty or all rows contain null or NaN, null is returned. * * @since 2.0.0 */ @@ -71,27 +72,25 @@ final class DataFrameStatFunctions private[sql](df: DataFrame) { col: String, probabilities: Array[Double], relativeError: Double): Array[Double] = { -StatFunctions.multipleApproxQuantiles(df.select(col).na.drop(), - Seq(col), probabilities, relativeError).head.toArray +val res = approxQuantile(Array(col), probabilities, relativeError) +Option(res).map(_.head).orNull } /** * Calculates the approximate quantiles of numerical columns of a DataFrame. - * @see [[DataFrameStatsFunctions.approxQuantile(col:Str* approxQuantile]] for - * detailed description. + * @see `approxQuantile(col:Str* approxQuantile)` for detailed description. * - * Note that rows containing any null or NaN values values will be removed before - * calculation. * @param cols the names of the numerical columns * @param probabilities a list of quantile probabilities * Each number must belong to [0, 1]. * For example 0 is the minimum, 0.5 is the median, 1 is the maximum. - * @param relativeError The relative target precision to achieve (>= 0). + * @param relativeError The relative target precision to achieve (greater than or equal to 0). * If set to zero, the exact quantiles are computed, which could be very expensive. * Note that values greater than 1 are accepted but give the same result as 1. 
* @return the approximate quantiles at the given probabilities of each column * - * @note Rows containing any NaN values will be removed before calculation + * @note Rows containing any null or NaN values will be removed before calculation. If + * the dataframe is empty or all rows contain null or NaN, null is returned. * * @since 2.2.0 */ @@ -99,8 +98,13 @@ final class DataFrameStatFunctions private[sql](df: DataFrame) { cols: Array[String], probabilities: Array[Double], relativeError:
[2/2] spark-website git commit: Remove references to Java 7 and Hadoop support before 2.5
Remove references to Java 7 and Hadoop support before 2.5 Project: http://git-wip-us.apache.org/repos/asf/spark-website/repo Commit: http://git-wip-us.apache.org/repos/asf/spark-website/commit/ae58782b Tree: http://git-wip-us.apache.org/repos/asf/spark-website/tree/ae58782b Diff: http://git-wip-us.apache.org/repos/asf/spark-website/diff/ae58782b Branch: refs/heads/asf-site Commit: ae58782baafff924a904e01197400104d85471f1 Parents: fe49ab1 Author: Sean Owen. Authored: Tue Feb 14 21:33:56 2017 + Committer: Sean Owen Committed: Tue Feb 14 21:35:47 2017 + -- developer-tools.md | 7 ++-- js/downloads.js | 18 +++-- release-process.md | 6 +-- site/committers.html| 40 +++- site/community.html | 8 ++-- site/contributing.html | 20 +- site/developer-tools.html | 21 +- site/documentation.html | 5 ++- site/js/downloads.js| 18 +++-- site/news/index.html| 10 ++--- site/news/spark-0-9-1-released.html | 2 +- site/news/spark-0-9-2-released.html | 2 +- site/news/spark-1-1-0-released.html | 2 +- site/news/spark-1-2-2-released.html | 2 +- site/news/spark-and-shark-in-the-news.html | 2 +- .../spark-summit-east-2015-videos-posted.html | 2 +- site/release-process.html | 8 ++-- site/releases/spark-release-0-8-0.html | 4 +- site/releases/spark-release-0-9-1.html | 20 +- site/releases/spark-release-1-0-1.html | 8 ++-- site/releases/spark-release-1-0-2.html | 2 +- site/releases/spark-release-1-1-0.html | 6 +-- site/releases/spark-release-1-2-0.html | 2 +- site/releases/spark-release-1-3-0.html | 6 +-- site/releases/spark-release-1-3-1.html | 6 +-- site/releases/spark-release-1-4-0.html | 4 +- site/releases/spark-release-1-5-0.html | 30 +++ site/releases/spark-release-1-6-0.html | 20 +- site/releases/spark-release-2-0-0.html | 36 +- site/releases/spark-release-2-1-0.html | 24 ++-- site/sitemap.xml| 12 +++--- site/third-party-projects.html | 2 +- site/versioning-policy.html | 2 +- 33 files changed, 174 insertions(+), 183 deletions(-) --
http://git-wip-us.apache.org/repos/asf/spark-website/blob/ae58782b/developer-tools.md -- diff --git a/developer-tools.md b/developer-tools.md index 77d225f..e8853b8 100644 --- a/developer-tools.md +++ b/developer-tools.md @@ -115,8 +115,7 @@ When running tests for a pull request on Jenkins, you can add special phrases to your pull request to change testing behavior. This includes: - `[test-maven]` - signals to test the pull request using maven -- `[test-hadoop1.0]` - signals to test using Spark's Hadoop 1.0 profile (other options include -Hadoop 2.0, 2.2, and 2.3) +- `[test-hadoop2.7]` - signals to test using Spark's Hadoop 2.7 profile Organizing Imports @@ -143,8 +142,8 @@ automatically update the IntelliJ project. - As documented in http://spark.apache.org/docs/latest/building-spark.html;>Building Spark, some build configurations require specific profiles to be enabled. The same profiles that are enabled with `-P[profile name]` above may be enabled on the -Profiles screen in the Import wizard. For example, if developing for Hadoop 2.4 with YARN support, -enable profiles yarn and hadoop-2.4. These selections can be changed later by accessing the +Profiles screen in the Import wizard. For example, if developing for Hadoop 2.7 with YARN support, +enable profiles `yarn` and `hadoop-2.7`. These selections can be changed later by accessing the "Maven Projects" tool window from the View menu, and expanding the Profiles section. 
Other tips: http://git-wip-us.apache.org/repos/asf/spark-website/blob/ae58782b/js/downloads.js -- diff --git a/js/downloads.js b/js/downloads.js index 36a04c7..04a6b13 100644 --- a/js/downloads.js +++ b/js/downloads.js @@ -16,26 +16,18 @@ var hadoop2p3 = {pretty: "Pre-built for Hadoop 2.3", tag: "hadoop2.3"}; var hadoop2p4 = {pretty: "Pre-built for Hadoop 2.4", tag: "hadoop2.4"}; var hadoop2p6 = {pretty: "Pre-built for Hadoop 2.6", tag: "hadoop2.6"}; var hadoop2p7 = {pretty: "Pre-built for Hadoop 2.7 and later", tag: "hadoop2.7"}; -//var mapr3 = {pretty:
[1/2] spark-website git commit: Remove references to Java 7 and Hadoop support before 2.5
Repository: spark-website Updated Branches: refs/heads/asf-site fe49ab1ef -> ae58782ba http://git-wip-us.apache.org/repos/asf/spark-website/blob/ae58782b/site/releases/spark-release-2-1-0.html -- diff --git a/site/releases/spark-release-2-1-0.html b/site/releases/spark-release-2-1-0.html index 26e65a2..7c7df63 100644 --- a/site/releases/spark-release-2-1-0.html +++ b/site/releases/spark-release-2-1-0.html @@ -200,15 +200,15 @@ To download Apache Spark 2.1.0, visit the downloads page. You can consult JIRA for the https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315420version=12335644;>detailed changes. We have curated a list of high level changes here, grouped by major modules. - Core and Spark SQL - Structured Streaming - MLlib - SparkR - GraphX - Deprecations - Changes of behavior - Known Issues - Credits + Core and Spark SQL + Structured Streaming + MLlib + SparkR + GraphX + Deprecations + Changes of behavior + Known Issues + Credits Core and Spark SQL @@ -216,7 +216,7 @@ API updates - SPARK-17864: Data type APIs are stable APIs. + SPARK-17864: Data type APIs are stable APIs. SPARK-18351: from_json and to_json for parsing JSON for string columns SPARK-16700: When creating a DataFrame in PySpark, Python dictionaries can be used as values of a StructType. @@ -318,9 +318,9 @@ SPARK-18377: spark.sql.warehouse.dir is a static configuration now. Users need to set it before the start of the first SparkSession and its value is shared by sessions in the same application. SPARK-14393: Values generated by non-deterministic functions will not change after coalesce or union. SPARK-18076: Fix default Locale used in DateFormat, NumberFormat to Locale.US - SPARK-16216: CSV and JSON data sources write timestamp and date values in https://www.w3.org/TR/NOTE-datetime;>ISO 8601 formatted string. 
Two options, timestampFormat and dateFormat, are added to these two data sources to let users control the format of timestamp and date value in string representation, respectively. Please refer to the API doc of DataFrameReader and DataFrameWriter for more details about these two configurations. + SPARK-16216: CSV and JSON data sources write timestamp and date values in https://www.w3.org/TR/NOTE-datetime;>ISO 8601 formatted string. Two options, timestampFormat and dateFormat, are added to these two data sources to let users control the format of timestamp and date value in string representation, respectively. Please refer to the API doc of DataFrameReader and DataFrameWriter for more details about these two configurations. SPARK-17427: Function SIZE returns -1 when its input parameter is null. - SPARK-16498: LazyBinaryColumnarSerDe is fixed as the the SerDe for RCFile. + SPARK-16498: LazyBinaryColumnarSerDe is fixed as the the SerDe for RCFile. SPARK-16552: If a user does not specify the schema to a table and relies on schema inference, the inferred schema will be stored in the metastore. The schema will be not inferred again when this table is used. 
http://git-wip-us.apache.org/repos/asf/spark-website/blob/ae58782b/site/sitemap.xml -- diff --git a/site/sitemap.xml b/site/sitemap.xml index 1ed4c74..67a171b 100644 --- a/site/sitemap.xml +++ b/site/sitemap.xml @@ -632,27 +632,27 @@ weekly - http://spark.apache.org/sql/ + http://spark.apache.org/mllib/ weekly - http://spark.apache.org/screencasts/ + http://spark.apache.org/news/ weekly - http://spark.apache.org/streaming/ + http://spark.apache.org/screencasts/ weekly - http://spark.apache.org/ + http://spark.apache.org/sql/ weekly - http://spark.apache.org/mllib/ + http://spark.apache.org/streaming/ weekly - http://spark.apache.org/news/ + http://spark.apache.org/ weekly http://git-wip-us.apache.org/repos/asf/spark-website/blob/ae58782b/site/third-party-projects.html -- diff --git a/site/third-party-projects.html b/site/third-party-projects.html index 74db810..8cb21d1 100644 --- a/site/third-party-projects.html +++ b/site/third-party-projects.html @@ -212,7 +212,7 @@ for details) http://mesos.apache.org/;>Apache Mesos - Cluster management system that supports running Spark http://alluxio.org/;>Alluxio (née Tachyon) - Memory speed virtual distributed -storage system that supports running Spark +storage system that supports running Spark https://github.com/datastax/spark-cassandra-connector;>Spark Cassandra Connector - Easily load your Cassandra data into Spark and Spark SQL; from Datastax http://github.com/tuplejump/FiloDB;>FiloDB - a Spark integrated analytical/columnar
spark git commit: [MINOR][BUILD] Fix javadoc8 break
Repository: spark Updated Branches: refs/heads/master 0e2405490 -> 3b4376876 [MINOR][BUILD] Fix javadoc8 break ## What changes were proposed in this pull request? These error below seems caused by unidoc that does not understand double commented block. ``` [error] .../spark/sql/core/target/java/org/apache/spark/sql/KeyedState.java:69: error: class, interface, or enum expected [error] * MapGroupsWithStateFunctionString, Integer, Integer, String mappingFunction = [error] ^ [error] .../spark/sql/core/target/java/org/apache/spark/sql/KeyedState.java:69: error: class, interface, or enum expected [error] * MapGroupsWithStateFunctionString, Integer, Integer, String mappingFunction = [error] ^ [error] .../spark/sql/core/target/java/org/apache/spark/sql/KeyedState.java:70: error: class, interface, or enum expected [error] *new MapGroupsWithStateFunctionString, Integer, Integer, String() { [error] ^ [error] .../spark/sql/core/target/java/org/apache/spark/sql/KeyedState.java:70: error: class, interface, or enum expected [error] *new MapGroupsWithStateFunctionString, Integer, Integer, String() { [error] ^ [error] .../spark/sql/core/target/java/org/apache/spark/sql/KeyedState.java:72: error: illegal character: '#' [error] * Override [error] ^ [error] .../spark/sql/core/target/java/org/apache/spark/sql/KeyedState.java:72: error: class, interface, or enum expected [error] * Override [error] ^ [error] .../spark/sql/core/target/java/org/apache/spark/sql/KeyedState.java:73: error: class, interface, or enum expected [error] * public String call(String key, IteratorInteger value, KeyedStateInteger state) { [error]^ [error] .../spark/sql/core/target/java/org/apache/spark/sql/KeyedState.java:73: error: class, interface, or enum expected [error] * public String call(String key, IteratorInteger value, KeyedStateInteger state) { [error]^ [error] .../spark/sql/core/target/java/org/apache/spark/sql/KeyedState.java:73: error: class, interface, or enum expected [error] * public String 
call(String key, IteratorInteger value, KeyedStateInteger state) { [error]^ [error] .../spark/sql/core/target/java/org/apache/spark/sql/KeyedState.java:73: error: class, interface, or enum expected [error] * public String call(String key, IteratorInteger value, KeyedStateInteger state) { [error] ^ [error] .../spark/sql/core/target/java/org/apache/spark/sql/KeyedState.java:73: error: class, interface, or enum expected [error] * public String call(String key, IteratorInteger value, KeyedStateInteger state) { [error] ^ [error] .../spark/sql/core/target/java/org/apache/spark/sql/KeyedState.java:76: error: class, interface, or enum expected [error] * boolean shouldRemove = ...; // Decide whether to remove the state [error] ^ [error] .../spark/sql/core/target/java/org/apache/spark/sql/KeyedState.java:77: error: class, interface, or enum expected [error] * if (shouldRemove) { [error] ^ [error] .../spark/sql/core/target/java/org/apache/spark/sql/KeyedState.java:79: error: class, interface, or enum expected [error] * } else { [error] ^ [error] .../spark/sql/core/target/java/org/apache/spark/sql/KeyedState.java:81: error: class, interface, or enum expected [error] *state.update(newState); // Set the new state [error] ^ [error] .../spark/sql/core/target/java/org/apache/spark/sql/KeyedState.java:82: error: class, interface, or enum expected [error] * } [error] ^ [error] .../forked/spark/sql/core/target/java/org/apache/spark/sql/KeyedState.java:85: error: class, interface, or enum expected [error] * state.update(initialState); [error] ^ [error] .../forked/spark/sql/core/target/java/org/apache/spark/sql/KeyedState.java:86: error: class, interface, or enum expected [error] *} [error] ^ [error] .../spark/sql/core/target/java/org/apache/spark/sql/KeyedState.java:90: error: class, interface, or enum expected [error] * [error] ^ [error] .../spark/sql/core/target/java/org/apache/spark/sql/KeyedState.java:92: error: class, interface, or enum expected [error] * tparam S User-defined 
type of the state to be stored for each key. Must be encodable into [error]^ [error] .../spark/sql/core/target/java/org/apache/spark/sql/KeyedState.java:93: error: class, interface, or enum expected [error] * Spark SQL types (see {link Encoder} for more
[6/8] spark git commit: [SPARK-19550][BUILD][CORE][WIP] Remove Java 7 support
http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/core/src/test/java/test/org/apache/spark/JavaAPISuite.java -- diff --git a/core/src/test/java/test/org/apache/spark/JavaAPISuite.java b/core/src/test/java/test/org/apache/spark/JavaAPISuite.java new file mode 100644 index 000..80aab10 --- /dev/null +++ b/core/src/test/java/test/org/apache/spark/JavaAPISuite.java @@ -0,0 +1,1842 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package test.org.apache.spark; + +import java.io.*; +import java.nio.channels.FileChannel; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.*; + +import org.apache.spark.Accumulator; +import org.apache.spark.AccumulatorParam; +import org.apache.spark.Partitioner; +import org.apache.spark.SparkConf; +import org.apache.spark.TaskContext; +import org.apache.spark.TaskContext$; +import scala.Tuple2; +import scala.Tuple3; +import scala.Tuple4; +import scala.collection.JavaConverters; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; +import com.google.common.collect.Iterators; +import com.google.common.collect.Lists; +import com.google.common.base.Throwables; +import com.google.common.io.Files; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.compress.DefaultCodec; +import org.apache.hadoop.mapred.SequenceFileInputFormat; +import org.apache.hadoop.mapred.SequenceFileOutputFormat; +import org.apache.hadoop.mapreduce.Job; +import org.junit.After; +import static org.junit.Assert.*; +import org.junit.Before; +import org.junit.Test; + +import org.apache.spark.api.java.JavaDoubleRDD; +import org.apache.spark.api.java.JavaFutureAction; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.Optional; +import org.apache.spark.api.java.function.*; +import org.apache.spark.input.PortableDataStream; +import org.apache.spark.partial.BoundedDouble; +import 
org.apache.spark.partial.PartialResult; +import org.apache.spark.rdd.RDD; +import org.apache.spark.serializer.KryoSerializer; +import org.apache.spark.storage.StorageLevel; +import org.apache.spark.util.LongAccumulator; +import org.apache.spark.util.StatCounter; + +// The test suite itself is Serializable so that anonymous Function implementations can be +// serialized, as an alternative to converting these anonymous classes to static inner classes; +// see http://stackoverflow.com/questions/758570/. +public class JavaAPISuite implements Serializable { + private transient JavaSparkContext sc; + private transient File tempDir; + + @Before + public void setUp() { +sc = new JavaSparkContext("local", "JavaAPISuite"); +tempDir = Files.createTempDir(); +tempDir.deleteOnExit(); + } + + @After + public void tearDown() { +sc.stop(); +sc = null; + } + + @SuppressWarnings("unchecked") + @Test + public void sparkContextUnion() { +// Union of non-specialized JavaRDDs +List strings = Arrays.asList("Hello", "World"); +JavaRDD s1 = sc.parallelize(strings); +JavaRDD s2 = sc.parallelize(strings); +// Varargs +JavaRDD sUnion = sc.union(s1, s2); +assertEquals(4, sUnion.count()); +// List +Listlist = new ArrayList<>(); +list.add(s2); +sUnion = sc.union(s1, list); +assertEquals(4, sUnion.count()); + +// Union of JavaDoubleRDDs +List doubles = Arrays.asList(1.0, 2.0); +JavaDoubleRDD d1 = sc.parallelizeDoubles(doubles); +JavaDoubleRDD d2 = sc.parallelizeDoubles(doubles); +JavaDoubleRDD dUnion = sc.union(d1, d2); +assertEquals(4, dUnion.count()); + +// Union of JavaPairRDDs +List > pairs = new
[2/8] spark git commit: [SPARK-19550][BUILD][CORE][WIP] Remove Java 7 support
http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/streaming/src/test/java/test/org/apache/spark/streaming/Java8APISuite.java -- diff --git a/streaming/src/test/java/test/org/apache/spark/streaming/Java8APISuite.java b/streaming/src/test/java/test/org/apache/spark/streaming/Java8APISuite.java new file mode 100644 index 000..646cb97 --- /dev/null +++ b/streaming/src/test/java/test/org/apache/spark/streaming/Java8APISuite.java @@ -0,0 +1,887 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package test.org.apache.spark.streaming; + +import java.io.Serializable; +import java.util.*; + +import org.apache.spark.streaming.Duration; +import org.apache.spark.streaming.Durations; +import org.apache.spark.streaming.JavaTestUtils; +import org.apache.spark.streaming.LocalJavaStreamingContext; +import org.apache.spark.streaming.StateSpec; +import org.apache.spark.streaming.Time; +import scala.Tuple2; + +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import org.junit.Assert; +import org.junit.Test; + +import org.apache.spark.HashPartitioner; +import org.apache.spark.api.java.Optional; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.function.PairFunction; +import org.apache.spark.streaming.api.java.JavaDStream; +import org.apache.spark.streaming.api.java.JavaPairDStream; +import org.apache.spark.streaming.api.java.JavaMapWithStateDStream; + +/** + * Most of these tests replicate org.apache.spark.streaming.JavaAPISuite using java 8 + * lambda syntax. + */ +@SuppressWarnings("unchecked") +public class Java8APISuite extends LocalJavaStreamingContext implements Serializable { + + @Test + public void testMap() { +ListinputData = Arrays.asList( + Arrays.asList("hello", "world"), + Arrays.asList("goodnight", "moon")); + +List
expected = Arrays.asList( + Arrays.asList(5, 5), + Arrays.asList(9, 4)); + +JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); +JavaDStream letterCount = stream.map(String::length); +JavaTestUtils.attachTestOutputStream(letterCount); +List
result = JavaTestUtils.runStreams(ssc, 2, 2); + +assertOrderInvariantEquals(expected, result); + } + + @Test + public void testFilter() { +List
inputData = Arrays.asList( + Arrays.asList("giants", "dodgers"), + Arrays.asList("yankees", "red sox")); + +List
expected = Arrays.asList( + Arrays.asList("giants"), + Arrays.asList("yankees")); + +JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); +JavaDStream filtered = stream.filter(s -> s.contains("a")); +JavaTestUtils.attachTestOutputStream(filtered); +List
result = JavaTestUtils.runStreams(ssc, 2, 2); + +assertOrderInvariantEquals(expected, result); + } + + @Test + public void testMapPartitions() { +List
inputData = Arrays.asList( + Arrays.asList("giants", "dodgers"), + Arrays.asList("yankees", "red sox")); + +List
expected = Arrays.asList( + Arrays.asList("GIANTSDODGERS"), + Arrays.asList("YANKEESRED SOX")); + +JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); +JavaDStream mapped = stream.mapPartitions(in -> { + String out = ""; + while (in.hasNext()) { +out = out + in.next().toUpperCase(); + } + return Lists.newArrayList(out).iterator(); +}); +JavaTestUtils.attachTestOutputStream(mapped); +List
result = JavaTestUtils.runStreams(ssc, 2, 2); + +Assert.assertEquals(expected, result); + } + + @Test + public void testReduce() { +List
inputData = Arrays.asList( + Arrays.asList(1, 2, 3), + Arrays.asList(4, 5, 6), + Arrays.asList(7, 8, 9)); + +List
expected = Arrays.asList( + Arrays.asList(6), + Arrays.asList(15), + Arrays.asList(24)); + +JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); +JavaDStream reduced = stream.reduce((x, y) -> x + y); +
[7/8] spark git commit: [SPARK-19550][BUILD][CORE][WIP] Remove Java 7 support
http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/core/src/test/java/org/apache/spark/JavaAPISuite.java -- diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java deleted file mode 100644 index 7bebe06..000 --- a/core/src/test/java/org/apache/spark/JavaAPISuite.java +++ /dev/null @@ -1,1836 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - *http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark; - -import java.io.*; -import java.nio.channels.FileChannel; -import java.nio.ByteBuffer; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.*; - -import scala.Tuple2; -import scala.Tuple3; -import scala.Tuple4; -import scala.collection.JavaConverters; - -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Iterables; -import com.google.common.collect.Iterators; -import com.google.common.collect.Lists; -import com.google.common.base.Throwables; -import com.google.common.io.Files; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.compress.DefaultCodec; -import org.apache.hadoop.mapred.SequenceFileInputFormat; -import org.apache.hadoop.mapred.SequenceFileOutputFormat; -import org.apache.hadoop.mapreduce.Job; -import org.junit.After; -import static org.junit.Assert.*; -import org.junit.Before; -import org.junit.Test; - -import org.apache.spark.api.java.JavaDoubleRDD; -import org.apache.spark.api.java.JavaFutureAction; -import org.apache.spark.api.java.JavaPairRDD; -import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.api.java.Optional; -import org.apache.spark.api.java.function.*; -import org.apache.spark.input.PortableDataStream; -import org.apache.spark.partial.BoundedDouble; -import org.apache.spark.partial.PartialResult; -import org.apache.spark.rdd.RDD; -import org.apache.spark.serializer.KryoSerializer; -import org.apache.spark.storage.StorageLevel; -import org.apache.spark.util.LongAccumulator; -import 
org.apache.spark.util.StatCounter; - -// The test suite itself is Serializable so that anonymous Function implementations can be -// serialized, as an alternative to converting these anonymous classes to static inner classes; -// see http://stackoverflow.com/questions/758570/. -public class JavaAPISuite implements Serializable { - private transient JavaSparkContext sc; - private transient File tempDir; - - @Before - public void setUp() { -sc = new JavaSparkContext("local", "JavaAPISuite"); -tempDir = Files.createTempDir(); -tempDir.deleteOnExit(); - } - - @After - public void tearDown() { -sc.stop(); -sc = null; - } - - @SuppressWarnings("unchecked") - @Test - public void sparkContextUnion() { -// Union of non-specialized JavaRDDs -List strings = Arrays.asList("Hello", "World"); -JavaRDD s1 = sc.parallelize(strings); -JavaRDD s2 = sc.parallelize(strings); -// Varargs -JavaRDD sUnion = sc.union(s1, s2); -assertEquals(4, sUnion.count()); -// List -Listlist = new ArrayList<>(); -list.add(s2); -sUnion = sc.union(s1, list); -assertEquals(4, sUnion.count()); - -// Union of JavaDoubleRDDs -List doubles = Arrays.asList(1.0, 2.0); -JavaDoubleRDD d1 = sc.parallelizeDoubles(doubles); -JavaDoubleRDD d2 = sc.parallelizeDoubles(doubles); -JavaDoubleRDD dUnion = sc.union(d1, d2); -assertEquals(4, dUnion.count()); - -// Union of JavaPairRDDs -List > pairs = new ArrayList<>(); -pairs.add(new Tuple2<>(1, 2)); -pairs.add(new Tuple2<>(3, 4)); -JavaPairRDD p1 = sc.parallelizePairs(pairs); -JavaPairRDD p2 = sc.parallelizePairs(pairs); -JavaPairRDD
[4/8] spark git commit: [SPARK-19550][BUILD][CORE][WIP] Remove Java 7 support
http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/launcher/src/main/java/org/apache/spark/launcher/OutputRedirector.java -- diff --git a/launcher/src/main/java/org/apache/spark/launcher/OutputRedirector.java b/launcher/src/main/java/org/apache/spark/launcher/OutputRedirector.java index c7959ae..ff80453 100644 --- a/launcher/src/main/java/org/apache/spark/launcher/OutputRedirector.java +++ b/launcher/src/main/java/org/apache/spark/launcher/OutputRedirector.java @@ -44,12 +44,7 @@ class OutputRedirector { OutputRedirector(InputStream in, String loggerName, ThreadFactory tf) { this.active = true; this.reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8)); -this.thread = tf.newThread(new Runnable() { - @Override - public void run() { -redirect(); - } -}); +this.thread = tf.newThread(this::redirect); this.sink = Logger.getLogger(loggerName); thread.start(); } http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/launcher/src/main/java/org/apache/spark/launcher/SparkAppHandle.java -- diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkAppHandle.java b/launcher/src/main/java/org/apache/spark/launcher/SparkAppHandle.java index 0aa7bd1..cefb4d1 100644 --- a/launcher/src/main/java/org/apache/spark/launcher/SparkAppHandle.java +++ b/launcher/src/main/java/org/apache/spark/launcher/SparkAppHandle.java @@ -91,9 +91,6 @@ public interface SparkAppHandle { * Tries to kill the underlying application. Implies {@link #disconnect()}. This will not send * a {@link #stop()} message to the application, so it's recommended that users first try to * stop the application cleanly and only resort to this method if that fails. - * - * Note that if the application is running as a child process, this method fail to kill the - * process when using Java 7. This may happen if, for example, the application is deadlocked. 
*/ void kill(); http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java -- diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java b/launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java index 82b593a..8178684 100644 --- a/launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java +++ b/launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java @@ -49,35 +49,44 @@ class SparkClassCommandBuilder extends AbstractCommandBuilder { // Master, Worker, HistoryServer, ExternalShuffleService, MesosClusterDispatcher use // SPARK_DAEMON_JAVA_OPTS (and specific opts) + SPARK_DAEMON_MEMORY. -if (className.equals("org.apache.spark.deploy.master.Master")) { - javaOptsKeys.add("SPARK_DAEMON_JAVA_OPTS"); - javaOptsKeys.add("SPARK_MASTER_OPTS"); - memKey = "SPARK_DAEMON_MEMORY"; -} else if (className.equals("org.apache.spark.deploy.worker.Worker")) { - javaOptsKeys.add("SPARK_DAEMON_JAVA_OPTS"); - javaOptsKeys.add("SPARK_WORKER_OPTS"); - memKey = "SPARK_DAEMON_MEMORY"; -} else if (className.equals("org.apache.spark.deploy.history.HistoryServer")) { - javaOptsKeys.add("SPARK_DAEMON_JAVA_OPTS"); - javaOptsKeys.add("SPARK_HISTORY_OPTS"); - memKey = "SPARK_DAEMON_MEMORY"; -} else if (className.equals("org.apache.spark.executor.CoarseGrainedExecutorBackend")) { - javaOptsKeys.add("SPARK_JAVA_OPTS"); - javaOptsKeys.add("SPARK_EXECUTOR_OPTS"); - memKey = "SPARK_EXECUTOR_MEMORY"; -} else if (className.equals("org.apache.spark.executor.MesosExecutorBackend")) { - javaOptsKeys.add("SPARK_EXECUTOR_OPTS"); - memKey = "SPARK_EXECUTOR_MEMORY"; -} else if (className.equals("org.apache.spark.deploy.mesos.MesosClusterDispatcher")) { - javaOptsKeys.add("SPARK_DAEMON_JAVA_OPTS"); -} else if (className.equals("org.apache.spark.deploy.ExternalShuffleService") || - 
className.equals("org.apache.spark.deploy.mesos.MesosExternalShuffleService")) { - javaOptsKeys.add("SPARK_DAEMON_JAVA_OPTS"); - javaOptsKeys.add("SPARK_SHUFFLE_OPTS"); - memKey = "SPARK_DAEMON_MEMORY"; -} else { - javaOptsKeys.add("SPARK_JAVA_OPTS"); - memKey = "SPARK_DRIVER_MEMORY"; +switch (className) { + case "org.apache.spark.deploy.master.Master": +javaOptsKeys.add("SPARK_DAEMON_JAVA_OPTS"); +javaOptsKeys.add("SPARK_MASTER_OPTS"); +memKey = "SPARK_DAEMON_MEMORY"; +break; + case "org.apache.spark.deploy.worker.Worker": +javaOptsKeys.add("SPARK_DAEMON_JAVA_OPTS"); +javaOptsKeys.add("SPARK_WORKER_OPTS"); +memKey = "SPARK_DAEMON_MEMORY"; +
[5/8] spark git commit: [SPARK-19550][BUILD][CORE][WIP] Remove Java 7 support
http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/docs/streaming-kafka-0-8-integration.md -- diff --git a/docs/streaming-kafka-0-8-integration.md b/docs/streaming-kafka-0-8-integration.md index 58b17aa..24a3e4c 100644 --- a/docs/streaming-kafka-0-8-integration.md +++ b/docs/streaming-kafka-0-8-integration.md @@ -155,33 +155,22 @@ Next, we discuss how to use this approach in your streaming application. // Hold a reference to the current offset ranges, so it can be used downstream - final AtomicReferenceoffsetRanges = new AtomicReference<>(); - - directKafkaStream.transformToPair( - new Function , JavaPairRDD >() { - @Override - public JavaPairRDD call(JavaPairRDD rdd) throws Exception { - OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges(); - offsetRanges.set(offsets); - return rdd; - } - } - ).map( + AtomicReference offsetRanges = new AtomicReference<>(); + + directKafkaStream.transformToPair(rdd -> { + OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges(); + offsetRanges.set(offsets); + return rdd; + }).map( ... - ).foreachRDD( - new Function , Void>() { - @Override - public Void call(JavaPairRDD rdd) throws IOException { - for (OffsetRange o : offsetRanges.get()) { - System.out.println( - o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset() - ); - } - ... - return null; - } - } - ); + ).foreachRDD(rdd -> { + for (OffsetRange o : offsetRanges.get()) { +System.out.println( + o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset() +); + } + ... + }); offsetRanges = [] http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/docs/streaming-programming-guide.md -- diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md index a878971..abd4ac9 100644 --- a/docs/streaming-programming-guide.md +++ b/docs/streaming-programming-guide.md @@ -163,12 +163,7 @@ space into words. 
{% highlight java %} // Split each line into words -JavaDStream words = lines.flatMap( - new FlatMapFunction () { -@Override public Iterator call(String x) { - return Arrays.asList(x.split(" ")).iterator(); -} - }); +JavaDStream words = lines.flatMap(x -> Arrays.asList(x.split(" ")).iterator()); {% endhighlight %} `flatMap` is a DStream operation that creates a new DStream by @@ -183,18 +178,8 @@ Next, we want to count these words. {% highlight java %} // Count each word in each batch -JavaPairDStream pairs = words.mapToPair( - new PairFunction () { -@Override public Tuple2 call(String s) { - return new Tuple2<>(s, 1); -} - }); -JavaPairDStream wordCounts = pairs.reduceByKey( - new Function2 () { -@Override public Integer call(Integer i1, Integer i2) { - return i1 + i2; -} - }); +JavaPairDStream pairs = words.mapToPair(s -> new Tuple2<>(s, 1)); +JavaPairDStream wordCounts = pairs.reduceByKey((i1, i2) -> i1 + i2); // Print the first ten elements of each RDD generated in this DStream to the console wordCounts.print(); @@ -836,11 +821,9 @@ the `(word, 1)` pairs) and the `runningCount` having the previous count. {% highlight java %} Function2 updateFunction = - new Function2
() { -@Override public Optional call(List values, Optional state) { - Integer newSum = ... // add the new values with the previous running count to get the new count - return Optional.of(newSum); -} + (values, state) -> { +Integer newSum = ... // add the new values with the previous running count to get the new count +return Optional.of(newSum); }; {% endhighlight %} @@ -915,15 +898,12 @@ val cleanedDStream = wordCounts.transform { rdd => {% highlight java %} import org.apache.spark.streaming.api.java.*; // RDD containing spam information -final JavaPairRDD
spamInfoRDD =
[3/8] spark git commit: [SPARK-19550][BUILD][CORE][WIP] Remove Java 7 support
http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java -- diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java deleted file mode 100644 index 648a5ab..000 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java +++ /dev/null @@ -1,2000 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - *http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.streaming; - -import java.io.*; -import java.nio.charset.StandardCharsets; -import java.util.*; -import java.util.concurrent.atomic.AtomicBoolean; - -import scala.Tuple2; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; - -import org.junit.Assert; -import org.junit.Test; - -import com.google.common.io.Files; -import com.google.common.collect.Sets; - -import org.apache.spark.HashPartitioner; -import org.apache.spark.SparkConf; -import org.apache.spark.api.java.JavaPairRDD; -import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.api.java.Optional; -import org.apache.spark.api.java.function.*; -import org.apache.spark.storage.StorageLevel; -import org.apache.spark.streaming.api.java.*; -import org.apache.spark.util.LongAccumulator; -import org.apache.spark.util.Utils; - -// The test suite itself is Serializable so that anonymous Function implementations can be -// serialized, as an alternative to converting these anonymous classes to static inner classes; -// see http://stackoverflow.com/questions/758570/. 
-public class JavaAPISuite extends LocalJavaStreamingContext implements Serializable { - - public static void equalIterator(Iterator a, Iterator b) { -while (a.hasNext() && b.hasNext()) { - Assert.assertEquals(a.next(), b.next()); -} -Assert.assertEquals(a.hasNext(), b.hasNext()); - } - - public static void equalIterable(Iterable a, Iterable b) { - equalIterator(a.iterator(), b.iterator()); - } - - @Test - public void testInitialization() { -Assert.assertNotNull(ssc.sparkContext()); - } - - @SuppressWarnings("unchecked") - @Test - public void testContextState() { -ListinputData = Arrays.asList(Arrays.asList(1, 2, 3, 4)); -Assert.assertEquals(StreamingContextState.INITIALIZED, ssc.getState()); -JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); -JavaTestUtils.attachTestOutputStream(stream); -Assert.assertEquals(StreamingContextState.INITIALIZED, ssc.getState()); -ssc.start(); -Assert.assertEquals(StreamingContextState.ACTIVE, ssc.getState()); -ssc.stop(); -Assert.assertEquals(StreamingContextState.STOPPED, ssc.getState()); - } - - @SuppressWarnings("unchecked") - @Test - public void testCount() { -List
inputData = Arrays.asList( -Arrays.asList(1,2,3,4), -Arrays.asList(3,4,5), -Arrays.asList(3)); - -List
expected = Arrays.asList( -Arrays.asList(4L), -Arrays.asList(3L), -Arrays.asList(1L)); - -JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); -JavaDStream count = stream.count(); -JavaTestUtils.attachTestOutputStream(count); -List
result = JavaTestUtils.runStreams(ssc, 3, 3); -assertOrderInvariantEquals(expected, result); - } - - @SuppressWarnings("unchecked") - @Test - public void testMap() { -List
inputData = Arrays.asList( -Arrays.asList("hello", "world"), -Arrays.asList("goodnight", "moon")); - -List
expected = Arrays.asList( -Arrays.asList(5,5), -Arrays.asList(9,4)); - -JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); -JavaDStream letterCount = stream.map(new Function
() { -@Override -public Integer call(String s) { - return s.length(); -} -}); -
[8/8] spark git commit: [SPARK-19550][BUILD][CORE][WIP] Remove Java 7 support
[SPARK-19550][BUILD][CORE][WIP] Remove Java 7 support - Move external/java8-tests tests into core, streaming, sql and remove - Remove MaxPermGen and related options - Fix some reflection / TODOs around Java 8+ methods - Update doc references to 1.7/1.8 differences - Remove Java 7/8 related build profiles - Update some plugins for better Java 8 compatibility - Fix a few Java-related warnings For the future: - Update Java 8 examples to fully use Java 8 - Update Java tests to use lambdas for simplicity - Update Java internal implementations to use lambdas ## How was this patch tested? Existing tests Author: Sean OwenCloses #16871 from srowen/SPARK-19493. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0e240549 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0e240549 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0e240549 Branch: refs/heads/master Commit: 0e2405490f2056728d1353abbac6f3ea177ae533 Parents: 3871d94 Author: Sean Owen Authored: Thu Feb 16 12:32:45 2017 + Committer: Sean Owen Committed: Thu Feb 16 12:32:45 2017 + -- assembly/pom.xml|1 + build/mvn |8 +- build/sbt-launch-lib.bash |2 +- .../spark/network/client/TransportClient.java | 111 +- .../network/crypto/AuthClientBootstrap.java | 16 +- .../spark/network/crypto/AuthRpcHandler.java|3 - .../network/server/TransportRequestHandler.java | 27 +- .../spark/network/crypto/AuthEngineSuite.java |2 - .../shuffle/ExternalShuffleBlockHandler.java|8 +- .../shuffle/ExternalShuffleBlockResolver.java |7 +- .../network/shuffle/ExternalShuffleClient.java | 21 +- .../network/shuffle/RetryingBlockFetcher.java |9 +- common/sketch/pom.xml |2 + common/unsafe/pom.xml |2 + .../java/org/apache/spark/unsafe/Platform.java |9 +- .../spark/unsafe/types/CalendarInterval.java| 88 +- .../org/apache/spark/api/java/Optional.java |7 +- .../api/java/function/CoGroupFunction.java |1 + .../java/function/DoubleFlatMapFunction.java|1 + 
.../spark/api/java/function/DoubleFunction.java |1 + .../spark/api/java/function/FilterFunction.java |1 + .../api/java/function/FlatMapFunction.java |1 + .../api/java/function/FlatMapFunction2.java |1 + .../java/function/FlatMapGroupsFunction.java|1 + .../api/java/function/ForeachFunction.java |1 + .../java/function/ForeachPartitionFunction.java |1 + .../spark/api/java/function/Function.java |1 + .../spark/api/java/function/Function0.java |1 + .../spark/api/java/function/Function2.java |1 + .../spark/api/java/function/Function3.java |1 + .../spark/api/java/function/Function4.java |1 + .../spark/api/java/function/MapFunction.java|1 + .../api/java/function/MapGroupsFunction.java|1 + .../java/function/MapPartitionsFunction.java|1 + .../api/java/function/PairFlatMapFunction.java |1 + .../spark/api/java/function/PairFunction.java |1 + .../spark/api/java/function/ReduceFunction.java |1 + .../spark/api/java/function/VoidFunction.java |1 + .../spark/api/java/function/VoidFunction2.java |1 + .../unsafe/sort/UnsafeExternalSorter.java |9 +- .../unsafe/sort/UnsafeSorterSpillMerger.java| 28 +- .../scala/org/apache/spark/SparkContext.scala |3 - .../spark/launcher/WorkerCommandBuilder.scala |1 - .../scala/org/apache/spark/util/Utils.scala | 44 +- .../java/org/apache/spark/JavaAPISuite.java | 1836 .../test/org/apache/spark/Java8RDDAPISuite.java | 356 .../test/org/apache/spark/JavaAPISuite.java | 1842 .../org/apache/spark/util/UtilsSuite.scala |6 +- dev/appveyor-install-dependencies.ps1 |2 +- dev/create-release/release-build.sh |1 - dev/make-distribution.sh|2 +- dev/mima|1 - dev/run-tests.py|3 - dev/test-dependencies.sh|2 +- docs/building-spark.md | 32 +- docs/index.md |6 +- docs/mllib-linear-methods.md|2 +- docs/mllib-statistics.md|7 +- docs/programming-guide.md | 11 +- docs/quick-start.md |9 +- docs/streaming-custom-receivers.md | 10 +- docs/streaming-kafka-0-10-integration.md| 62 +- docs/streaming-kafka-0-8-integration.md | 41 +-
[1/8] spark git commit: [SPARK-19550][BUILD][CORE][WIP] Remove Java 7 support
Repository: spark Updated Branches: refs/heads/master 3871d94a6 -> 0e2405490 http://git-wip-us.apache.org/repos/asf/spark/blob/0e240549/streaming/src/test/java/test/org/apache/spark/streaming/JavaAPISuite.java -- diff --git a/streaming/src/test/java/test/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/test/org/apache/spark/streaming/JavaAPISuite.java new file mode 100644 index 000..8d24104 --- /dev/null +++ b/streaming/src/test/java/test/org/apache/spark/streaming/JavaAPISuite.java @@ -0,0 +1,2008 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package test.org.apache.spark.streaming; + +import java.io.*; +import java.nio.charset.StandardCharsets; +import java.util.*; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.spark.streaming.Duration; +import org.apache.spark.streaming.JavaCheckpointTestUtils; +import org.apache.spark.streaming.JavaTestUtils; +import org.apache.spark.streaming.LocalJavaStreamingContext; +import org.apache.spark.streaming.Seconds; +import org.apache.spark.streaming.StreamingContextState; +import org.apache.spark.streaming.StreamingContextSuite; +import org.apache.spark.streaming.Time; +import scala.Tuple2; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; + +import org.junit.Assert; +import org.junit.Test; + +import com.google.common.io.Files; +import com.google.common.collect.Sets; + +import org.apache.spark.HashPartitioner; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.Optional; +import org.apache.spark.api.java.function.*; +import org.apache.spark.storage.StorageLevel; +import org.apache.spark.streaming.api.java.*; +import org.apache.spark.util.LongAccumulator; +import org.apache.spark.util.Utils; + +// The test suite itself is Serializable so that anonymous Function implementations can be +// serialized, as an alternative to converting these anonymous classes to static inner classes; +// see http://stackoverflow.com/questions/758570/. 
+public class JavaAPISuite extends LocalJavaStreamingContext implements Serializable { + + public static void equalIterator(Iterator a, Iterator b) { +while (a.hasNext() && b.hasNext()) { + Assert.assertEquals(a.next(), b.next()); +} +Assert.assertEquals(a.hasNext(), b.hasNext()); + } + + public static void equalIterable(Iterable a, Iterable b) { + equalIterator(a.iterator(), b.iterator()); + } + + @Test + public void testInitialization() { +Assert.assertNotNull(ssc.sparkContext()); + } + + @SuppressWarnings("unchecked") + @Test + public void testContextState() { +ListinputData = Arrays.asList(Arrays.asList(1, 2, 3, 4)); +Assert.assertEquals(StreamingContextState.INITIALIZED, ssc.getState()); +JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); +JavaTestUtils.attachTestOutputStream(stream); +Assert.assertEquals(StreamingContextState.INITIALIZED, ssc.getState()); +ssc.start(); +Assert.assertEquals(StreamingContextState.ACTIVE, ssc.getState()); +ssc.stop(); +Assert.assertEquals(StreamingContextState.STOPPED, ssc.getState()); + } + + @SuppressWarnings("unchecked") + @Test + public void testCount() { +List
inputData = Arrays.asList( +Arrays.asList(1,2,3,4), +Arrays.asList(3,4,5), +Arrays.asList(3)); + +List
expected = Arrays.asList( +Arrays.asList(4L), +Arrays.asList(3L), +Arrays.asList(1L)); + +JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); +JavaDStream count = stream.count(); +JavaTestUtils.attachTestOutputStream(count); +List
result = JavaTestUtils.runStreams(ssc, 3, 3); +assertOrderInvariantEquals(expected, result); + } + + @SuppressWarnings("unchecked") + @Test + public void testMap() { +
spark git commit: [SPARK-18871][SQL][TESTS] New test cases for IN/NOT IN subquery 3rd batch
Repository: spark Updated Branches: refs/heads/master f041e55ee -> 3871d94a6 [SPARK-18871][SQL][TESTS] New test cases for IN/NOT IN subquery 3rd batch ## What changes were proposed in this pull request? This is the third batch of test cases for IN/NOT IN subquery. This PR contains these test files: `in-having.sql` `in-joins.sql` `in-multiple-columns.sql` These are the queries and results from running on DB2. [in-having DB2 version](https://github.com/apache/spark/files/772668/in-having.sql.db2.txt) [output of in-having](https://github.com/apache/spark/files/772670/in-having.sql.db2.out.txt) [in-joins DB2 version](https://github.com/apache/spark/files/772672/in-joins.sql.db2.txt) [output of in-joins](https://github.com/apache/spark/files/772673/in-joins.sql.db2.out.txt) [in-multiple-columns DB2 version](https://github.com/apache/spark/files/772678/in-multiple-columns.sql.db2.txt) [output of in-multiple-columns](https://github.com/apache/spark/files/772680/in-multiple-columns.sql.db2.out.txt) ## How was this patch tested? This PR adds new test cases. We compare the result from Spark with the result from another RDBMS (we used DB2 LUW). If the results are the same, we assume the result is correct. Author: Kevin Yu. Closes #16841 from kevinyu98/spark-18871-33. 
Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3871d94a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3871d94a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3871d94a Branch: refs/heads/master Commit: 3871d94a695d47169720e877f77ff1e4bede43ee Parents: f041e55 Author: Kevin Yu Authored: Thu Feb 16 00:02:15 2017 -0800 Committer: Xiao Li Committed: Thu Feb 16 00:02:15 2017 -0800 -- .../inputs/subquery/in-subquery/in-having.sql | 152 .../inputs/subquery/in-subquery/in-joins.sql| 270 ++ .../in-subquery/in-multiple-columns.sql | 127 +++ .../subquery/in-subquery/in-having.sql.out | 217 .../subquery/in-subquery/in-joins.sql.out | 353 +++ .../in-subquery/in-multiple-columns.sql.out | 178 ++ 6 files changed, 1297 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/3871d94a/sql/core/src/test/resources/sql-tests/inputs/subquery/in-subquery/in-having.sql -- diff --git a/sql/core/src/test/resources/sql-tests/inputs/subquery/in-subquery/in-having.sql b/sql/core/src/test/resources/sql-tests/inputs/subquery/in-subquery/in-having.sql new file mode 100644 index 000..8f98ae1 --- /dev/null +++ b/sql/core/src/test/resources/sql-tests/inputs/subquery/in-subquery/in-having.sql @@ -0,0 +1,152 @@ +-- A test suite for IN HAVING in parent side, subquery, and both predicate subquery +-- It includes correlated cases. 
+ +create temporary view t1 as select * from values + ("val1a", 6S, 8, 10L, float(15.0), 20D, 20E2, timestamp '2014-04-04 01:00:00.000', date '2014-04-04'), + ("val1b", 8S, 16, 19L, float(17.0), 25D, 26E2, timestamp '2014-05-04 01:01:00.000', date '2014-05-04'), + ("val1a", 16S, 12, 21L, float(15.0), 20D, 20E2, timestamp '2014-06-04 01:02:00.001', date '2014-06-04'), + ("val1a", 16S, 12, 10L, float(15.0), 20D, 20E2, timestamp '2014-07-04 01:01:00.000', date '2014-07-04'), + ("val1c", 8S, 16, 19L, float(17.0), 25D, 26E2, timestamp '2014-05-04 01:02:00.001', date '2014-05-05'), + ("val1d", null, 16, 22L, float(17.0), 25D, 26E2, timestamp '2014-06-04 01:01:00.000', null), + ("val1d", null, 16, 19L, float(17.0), 25D, 26E2, timestamp '2014-07-04 01:02:00.001', null), + ("val1e", 10S, null, 25L, float(17.0), 25D, 26E2, timestamp '2014-08-04 01:01:00.000', date '2014-08-04'), + ("val1e", 10S, null, 19L, float(17.0), 25D, 26E2, timestamp '2014-09-04 01:02:00.001', date '2014-09-04'), + ("val1d", 10S, null, 12L, float(17.0), 25D, 26E2, timestamp '2015-05-04 01:01:00.000', date '2015-05-04'), + ("val1a", 6S, 8, 10L, float(15.0), 20D, 20E2, timestamp '2014-04-04 01:02:00.001', date '2014-04-04'), + ("val1e", 10S, null, 19L, float(17.0), 25D, 26E2, timestamp '2014-05-04 01:01:00.000', date '2014-05-04') + as t1(t1a, t1b, t1c, t1d, t1e, t1f, t1g, t1h, t1i); + +create temporary view t2 as select * from values + ("val2a", 6S, 12, 14L, float(15), 20D, 20E2, timestamp '2014-04-04 01:01:00.000', date '2014-04-04'), + ("val1b", 10S, 12, 19L, float(17), 25D, 26E2, timestamp '2014-05-04 01:01:00.000', date '2014-05-04'), + ("val1b", 8S, 16, 119L, float(17), 25D, 26E2, timestamp '2015-05-04 01:01:00.000', date '2015-05-04'), + ("val1c", 12S, 16, 219L, float(17), 25D, 26E2, timestamp '2016-05-04 01:01:00.000', date '2016-05-04'), + ("val1b", null, 16, 319L, float(17),