spark git commit: [SPARK-21583][SQL] Create a ColumnarBatch from ArrowColumnVectors
Repository: spark Updated Branches: refs/heads/master ecf437a64 -> 964b507c7 [SPARK-21583][SQL] Create a ColumnarBatch from ArrowColumnVectors ## What changes were proposed in this pull request? This PR allows the creation of a `ColumnarBatch` from `ReadOnlyColumnVectors` where previously a columnar batch could only allocate vectors internally. This is useful for using `ArrowColumnVectors` in a batch form to do row-based iteration. Also added `ArrowConverter.fromPayloadIterator` which converts `ArrowPayload` iterator to `InternalRow` iterator and uses a `ColumnarBatch` internally. ## How was this patch tested? Added a new unit test for creating a `ColumnarBatch` with `ReadOnlyColumnVectors` and a test to verify the roundtrip of rows -> ArrowPayload -> rows, using `toPayloadIterator` and `fromPayloadIterator`. Author: Bryan CutlerCloses #18787 from BryanCutler/arrow-ColumnarBatch-support-SPARK-21583. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/964b507c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/964b507c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/964b507c Branch: refs/heads/master Commit: 964b507c7511cf3f4383cb0fc4026a573034b8cc Parents: ecf437a Author: Bryan Cutler Authored: Thu Aug 31 13:08:52 2017 +0900 Committer: Takuya UESHIN Committed: Thu Aug 31 13:08:52 2017 +0900 -- .../sql/execution/arrow/ArrowConverters.scala | 76 +++- .../execution/arrow/ArrowConvertersSuite.scala | 29 +++- .../vectorized/ColumnarBatchSuite.scala | 54 ++ 3 files changed, 157 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/964b507c/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala index fa45822..561a067 100644 --- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala @@ -20,6 +20,8 @@ package org.apache.spark.sql.execution.arrow import java.io.ByteArrayOutputStream import java.nio.channels.Channels +import scala.collection.JavaConverters._ + import org.apache.arrow.memory.BufferAllocator import org.apache.arrow.vector._ import org.apache.arrow.vector.file._ @@ -28,6 +30,7 @@ import org.apache.arrow.vector.util.ByteArrayReadableSeekableByteChannel import org.apache.spark.TaskContext import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.vectorized.{ArrowColumnVector, ColumnarBatch, ColumnVector} import org.apache.spark.sql.types._ import org.apache.spark.util.Utils @@ -35,7 +38,7 @@ import org.apache.spark.util.Utils /** * Store Arrow data in a form that can be serialized by Spark and served to a Python process. */ -private[sql] class ArrowPayload private[arrow] (payload: Array[Byte]) extends Serializable { +private[sql] class ArrowPayload private[sql] (payload: Array[Byte]) extends Serializable { /** * Convert the ArrowPayload to an ArrowRecordBatch. @@ -50,6 +53,17 @@ private[sql] class ArrowPayload private[arrow] (payload: Array[Byte]) extends Se def asPythonSerializable: Array[Byte] = payload } +/** + * Iterator interface to iterate over Arrow record batches and return rows + */ +private[sql] trait ArrowRowIterator extends Iterator[InternalRow] { + + /** + * Return the schema loaded from the Arrow record batch being iterated over + */ + def schema: StructType +} + private[sql] object ArrowConverters { /** @@ -111,6 +125,66 @@ private[sql] object ArrowConverters { } /** + * Maps Iterator from ArrowPayload to InternalRow. Returns a pair containing the row iterator + * and the schema from the first batch of Arrow data read. 
+ */ + private[sql] def fromPayloadIterator( + payloadIter: Iterator[ArrowPayload], + context: TaskContext): ArrowRowIterator = { +val allocator = + ArrowUtils.rootAllocator.newChildAllocator("fromPayloadIterator", 0, Long.MaxValue) + +new ArrowRowIterator { + private var reader: ArrowFileReader = null + private var schemaRead = StructType(Seq.empty) + private var rowIter = if (payloadIter.hasNext) nextBatch() else Iterator.empty + + context.addTaskCompletionListener { _ => +closeReader() +allocator.close() + } + + override def schema: StructType = schemaRead + + override def hasNext: Boolean = rowIter.hasNext || { +
spark git commit: [SPARK-21534][SQL][PYSPARK] PickleException when creating dataframe from python row with empty bytearray
Repository: spark Updated Branches: refs/heads/master 4482ff23a -> ecf437a64 [SPARK-21534][SQL][PYSPARK] PickleException when creating dataframe from python row with empty bytearray ## What changes were proposed in this pull request? `PickleException` is thrown when creating dataframe from python row with empty bytearray spark.createDataFrame(spark.sql("select unhex('') as xx").rdd.map(lambda x: {"abc": x.xx})).show() net.razorvine.pickle.PickleException: invalid pickle data for bytearray; expected 1 or 2 args, got 0 at net.razorvine.pickle.objects.ByteArrayConstructor.construct(ByteArrayConstructor.java ... `ByteArrayConstructor` doesn't deal with empty byte array pickled by Python3. ## How was this patch tested? Added test. Author: Liang-Chi HsiehCloses #19085 from viirya/SPARK-21534. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ecf437a6 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ecf437a6 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ecf437a6 Branch: refs/heads/master Commit: ecf437a64874a31328f4e28c6b24f37557fbe07d Parents: 4482ff2 Author: Liang-Chi Hsieh Authored: Thu Aug 31 12:55:38 2017 +0900 Committer: hyukjinkwon Committed: Thu Aug 31 12:55:38 2017 +0900 -- .../scala/org/apache/spark/api/python/SerDeUtil.scala | 14 ++ python/pyspark/sql/tests.py | 4 +++- 2 files changed, 17 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/ecf437a6/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala -- diff --git a/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala b/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala index aaf8e7a..01e64b6 100644 --- a/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala +++ b/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala @@ -35,6 +35,16 @@ import org.apache.spark.rdd.RDD /** Utilities for serialization / deserialization between Python and Java, 
using Pickle. */ private[spark] object SerDeUtil extends Logging { + class ByteArrayConstructor extends net.razorvine.pickle.objects.ByteArrayConstructor { +override def construct(args: Array[Object]): Object = { + // Deal with an empty byte array pickled by Python 3. + if (args.length == 0) { +Array.emptyByteArray + } else { +super.construct(args) + } +} + } // Unpickle array.array generated by Python 2.6 class ArrayConstructor extends net.razorvine.pickle.objects.ArrayConstructor { // /* Description of types */ @@ -108,6 +118,10 @@ private[spark] object SerDeUtil extends Logging { synchronized{ if (!initialized) { Unpickler.registerConstructor("array", "array", new ArrayConstructor()) +Unpickler.registerConstructor("__builtin__", "bytearray", new ByteArrayConstructor()) +Unpickler.registerConstructor("builtins", "bytearray", new ByteArrayConstructor()) +Unpickler.registerConstructor("__builtin__", "bytes", new ByteArrayConstructor()) +Unpickler.registerConstructor("_codecs", "encode", new ByteArrayConstructor()) initialized = true } } http://git-wip-us.apache.org/repos/asf/spark/blob/ecf437a6/python/pyspark/sql/tests.py -- diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 1ecde68..b310285 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -2383,9 +2383,11 @@ class SQLTests(ReusedPySparkTestCase): def test_BinaryType_serialization(self): # Pyrolite version <= 4.9 could not serialize BinaryType with Python3 SPARK-17808 +# The empty bytearray is test for SPARK-21534. schema = StructType([StructField('mybytes', BinaryType())]) data = [[bytearray(b'here is my data')], -[bytearray(b'and here is some more')]] +[bytearray(b'and here is some more')], +[bytearray(b'')]] df = self.spark.createDataFrame(data, schema=schema) df.collect() - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-17321][YARN] Avoid writing shuffle metadata to disk if NM recovery is disabled
Repository: spark Updated Branches: refs/heads/master cd5d0f337 -> 4482ff23a [SPARK-17321][YARN] Avoid writing shuffle metadata to disk if NM recovery is disabled In the current code, if NM recovery is not enabled then `YarnShuffleService` will write shuffle metadata to NM local dir-1, if this local dir-1 is on bad disk, then `YarnShuffleService` will be failed to start. So to solve this issue, in Spark side if NM recovery is not enabled, then Spark will not persist data into leveldb, in that case yarn shuffle service can still be served but lose the ability for recovery, (it is fine because the failure of NM will kill the containers as well as applications). Tested in the local cluster with NM recovery off and on to see if folder is created or not. MiniCluster UT isn't added because in MiniCluster NM will always set port to 0, but NM recovery requires non-ephemeral port. Author: jerryshaoCloses #19032 from jerryshao/SPARK-17321. Change-Id: I8f2fe73d175e2ad2c4e380caede3873e0192d027 Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4482ff23 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4482ff23 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4482ff23 Branch: refs/heads/master Commit: 4482ff23ad984335b0d477100ac0815d5db8d532 Parents: cd5d0f3 Author: jerryshao Authored: Thu Aug 31 09:26:20 2017 +0800 Committer: jerryshao Committed: Thu Aug 31 09:26:20 2017 +0800 -- .../spark/network/yarn/YarnShuffleService.java | 82 ++-- .../yarn/YarnShuffleIntegrationSuite.scala | 33 ++-- .../network/yarn/YarnShuffleServiceSuite.scala | 32 +--- 3 files changed, 86 insertions(+), 61 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/4482ff23/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java -- diff --git a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java 
b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java index cd67eb2..d8b2ed6 100644 --- a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java +++ b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java @@ -29,6 +29,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Objects; +import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -160,7 +161,9 @@ public class YarnShuffleService extends AuxiliaryService { // If we don't find one, then we choose a file to use to save the state next time. Even if // an application was stopped while the NM was down, we expect yarn to call stopApplication() // when it comes back - registeredExecutorFile = initRecoveryDb(RECOVERY_FILE_NAME); + if (_recoveryPath != null) { +registeredExecutorFile = initRecoveryDb(RECOVERY_FILE_NAME); + } TransportConf transportConf = new TransportConf("shuffle", new HadoopConfigProvider(conf)); blockHandler = new ExternalShuffleBlockHandler(transportConf, registeredExecutorFile); @@ -170,7 +173,10 @@ public class YarnShuffleService extends AuxiliaryService { List bootstraps = Lists.newArrayList(); boolean authEnabled = conf.getBoolean(SPARK_AUTHENTICATE_KEY, DEFAULT_SPARK_AUTHENTICATE); if (authEnabled) { -createSecretManager(); +secretManager = new ShuffleSecretManager(); +if (_recoveryPath != null) { + loadSecretsFromDb(); +} bootstraps.add(new AuthServerBootstrap(transportConf, secretManager)); } @@ -194,13 +200,12 @@ public class YarnShuffleService extends AuxiliaryService { } } - private void createSecretManager() throws IOException { -secretManager = new ShuffleSecretManager(); + private void loadSecretsFromDb() throws IOException { 
secretsFile = initRecoveryDb(SECRETS_RECOVERY_FILE_NAME); // Make sure this is protected in case its not in the NM recovery dir FileSystem fs = FileSystem.getLocal(_conf); -fs.mkdirs(new Path(secretsFile.getPath()), new FsPermission((short)0700)); +fs.mkdirs(new Path(secretsFile.getPath()), new FsPermission((short) 0700)); db = LevelDBProvider.initLevelDB(secretsFile, CURRENT_VERSION, mapper); logger.info("Recovery location is: " + secretsFile.getPath()); @@ -317,10 +322,10 @@ public class YarnShuffleService
spark git commit: [SPARK-11574][CORE] Add metrics StatsD sink
Repository: spark Updated Branches: refs/heads/master 313c6ca43 -> cd5d0f337 [SPARK-11574][CORE] Add metrics StatsD sink This patch adds statsd sink to the current metrics system in spark core. Author: Xiaofeng LinCloses #9518 from xflin/statsd. Change-Id: Ib8720e86223d4a650df53f51ceb963cd95b49a44 Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cd5d0f33 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cd5d0f33 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cd5d0f33 Branch: refs/heads/master Commit: cd5d0f3379b1a9fa0940ffd98bfff33f8cbcdeb0 Parents: 313c6ca Author: Xiaofeng Lin Authored: Thu Aug 31 08:57:15 2017 +0800 Committer: jerryshao Committed: Thu Aug 31 08:57:15 2017 +0800 -- conf/metrics.properties.template| 12 ++ .../spark/metrics/sink/StatsdReporter.scala | 163 +++ .../apache/spark/metrics/sink/StatsdSink.scala | 75 + .../spark/metrics/sink/StatsdSinkSuite.scala| 161 ++ docs/monitoring.md | 1 + 5 files changed, 412 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/cd5d0f33/conf/metrics.properties.template -- diff --git a/conf/metrics.properties.template b/conf/metrics.properties.template index aeb76c9..4c008a1 100644 --- a/conf/metrics.properties.template +++ b/conf/metrics.properties.template @@ -118,6 +118,14 @@ # prefixEMPTY STRING Prefix to prepend to every metric's name # protocol tcp Protocol ("tcp" or "udp") to use +# org.apache.spark.metrics.sink.StatsdSink +# Name: Default: Description: +# host 127.0.0.1 Hostname or IP of StatsD server +# port 8125 Port of StatsD server +# period10Poll period +# unit seconds Units of poll period +# prefixEMPTY STRING Prefix to prepend to metric name + ## Examples # Enable JmxSink for all instances by class name #*.sink.jmx.class=org.apache.spark.metrics.sink.JmxSink @@ -125,6 +133,10 @@ # Enable ConsoleSink for all instances by class name #*.sink.console.class=org.apache.spark.metrics.sink.ConsoleSink +# Enable 
StatsdSink for all instances by class name +#*.sink.statsd.class=org.apache.spark.metrics.sink.StatsdSink +#*.sink.statsd.prefix=spark + # Polling period for the ConsoleSink #*.sink.console.period=10 # Unit of the polling period for the ConsoleSink http://git-wip-us.apache.org/repos/asf/spark/blob/cd5d0f33/core/src/main/scala/org/apache/spark/metrics/sink/StatsdReporter.scala -- diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/StatsdReporter.scala b/core/src/main/scala/org/apache/spark/metrics/sink/StatsdReporter.scala new file mode 100644 index 000..ba75aa1 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/metrics/sink/StatsdReporter.scala @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.metrics.sink + +import java.io.IOException +import java.net.{DatagramPacket, DatagramSocket, InetSocketAddress} +import java.nio.charset.StandardCharsets.UTF_8 +import java.util.SortedMap +import java.util.concurrent.TimeUnit + +import scala.collection.JavaConverters._ +import scala.util.{Failure, Success, Try} + +import com.codahale.metrics._ +import org.apache.hadoop.net.NetUtils + +import org.apache.spark.internal.Logging + +/** + * @see https://github.com/etsy/statsd/blob/master/docs/metric_types.md;> + *StatsD metric types + */ +private[spark] object StatsdMetricType { + val COUNTER = "c" + val GAUGE = "g" + val TIMER = "ms" + val Set = "s" +} + +private[spark] class StatsdReporter( +registry: MetricRegistry, +host: String = "127.0.0.1", +port: Int = 8125, +prefix: String = "", +filter: MetricFilter = MetricFilter.ALL, +rateUnit: TimeUnit =
spark git commit: [SPARK-21875][BUILD] Fix Java style bugs
Repository: spark Updated Branches: refs/heads/master d8f454086 -> 313c6ca43 [SPARK-21875][BUILD] Fix Java style bugs ## What changes were proposed in this pull request? Fix Java code style so `./dev/lint-java` succeeds ## How was this patch tested? Ran `./dev/lint-java` Author: Andrew Ash Closes #19088 from ash211/spark-21875-lint-java. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/313c6ca4 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/313c6ca4 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/313c6ca4 Branch: refs/heads/master Commit: 313c6ca43593e247ab8cedac15c77d13e2830d6b Parents: d8f4540 Author: Andrew Ash Authored: Thu Aug 31 09:26:11 2017 +0900 Committer: hyukjinkwon Committed: Thu Aug 31 09:26:11 2017 +0900 -- core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java | 3 ++- .../src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/313c6ca4/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java -- diff --git a/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java b/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java index 0f1e902..44b60c1 100644 --- a/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java +++ b/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java @@ -74,7 +74,8 @@ public class TaskMemoryManager { * Maximum supported data page size (in bytes). In principle, the maximum addressable page size is * (1L << OFFSET_BITS) bytes, which is 2+ petabytes. However, the on-heap allocator's * maximum page size is limited by the maximum amount of data that can be stored in a long[] - * array, which is (2^31 - 1) * 8 bytes (or about 17 gigabytes). Therefore, we cap this at 17 gigabytes. + * array, which is (2^31 - 1) * 8 bytes (or about 17 gigabytes).
Therefore, we cap this at 17 + * gigabytes. */ public static final long MAXIMUM_PAGE_SIZE_BYTES = ((1L << 31) - 1) * 8L; http://git-wip-us.apache.org/repos/asf/spark/blob/313c6ca4/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java -- diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java index 3e57403..13b006f 100644 --- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java +++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java @@ -1337,7 +1337,8 @@ public class JavaDatasetSuite implements Serializable { public boolean equals(Object other) { if (other instanceof BeanWithEnum) { BeanWithEnum beanWithEnum = (BeanWithEnum) other; -return beanWithEnum.regularField.equals(regularField) && beanWithEnum.enumField.equals(enumField); +return beanWithEnum.regularField.equals(regularField) + && beanWithEnum.enumField.equals(enumField); } return false; } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-21839][SQL] Support SQL config for ORC compression
Repository: spark Updated Branches: refs/heads/master 6949a9c5c -> d8f454086 [SPARK-21839][SQL] Support SQL config for ORC compression ## What changes were proposed in this pull request? This PR aims to support `spark.sql.orc.compression.codec` like Parquet's `spark.sql.parquet.compression.codec`. Users can use SQLConf to control ORC compression, too. ## How was this patch tested? Pass the Jenkins with new and updated test cases. Author: Dongjoon HyunCloses #19055 from dongjoon-hyun/SPARK-21839. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d8f45408 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d8f45408 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d8f45408 Branch: refs/heads/master Commit: d8f45408635d4fccac557cb1e877dfe9267fb326 Parents: 6949a9c Author: Dongjoon Hyun Authored: Thu Aug 31 08:16:58 2017 +0900 Committer: hyukjinkwon Committed: Thu Aug 31 08:16:58 2017 +0900 -- python/pyspark/sql/readwriter.py| 5 ++-- .../org/apache/spark/sql/internal/SQLConf.scala | 10 +++ .../org/apache/spark/sql/DataFrameWriter.scala | 8 -- .../spark/sql/hive/orc/OrcFileFormat.scala | 2 +- .../apache/spark/sql/hive/orc/OrcOptions.scala | 18 +++- .../spark/sql/hive/orc/OrcSourceSuite.scala | 29 ++-- 6 files changed, 57 insertions(+), 15 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/d8f45408/python/pyspark/sql/readwriter.py -- diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py index 01da0dc..cb847a0 100644 --- a/python/pyspark/sql/readwriter.py +++ b/python/pyspark/sql/readwriter.py @@ -851,8 +851,9 @@ class DataFrameWriter(OptionUtils): :param partitionBy: names of partitioning columns :param compression: compression codec to use when saving to file. This can be one of the known case-insensitive shorten names (none, snappy, zlib, and lzo). -This will override ``orc.compress``. If None is set, it uses the -default value, ``snappy``. 
+This will override ``orc.compress`` and +``spark.sql.orc.compression.codec``. If None is set, it uses the value +specified in ``spark.sql.orc.compression.codec``. >>> orc_df = spark.read.orc('python/test_support/sql/orc_partitioned') >>> orc_df.write.orc(os.path.join(tempfile.mkdtemp(), 'data')) http://git-wip-us.apache.org/repos/asf/spark/blob/d8f45408/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index a685099..c407874 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -322,6 +322,14 @@ object SQLConf { .booleanConf .createWithDefault(true) + val ORC_COMPRESSION = buildConf("spark.sql.orc.compression.codec") +.doc("Sets the compression codec use when writing ORC files. Acceptable values include: " + + "none, uncompressed, snappy, zlib, lzo.") +.stringConf +.transform(_.toLowerCase(Locale.ROOT)) +.checkValues(Set("none", "uncompressed", "snappy", "zlib", "lzo")) +.createWithDefault("snappy") + val ORC_FILTER_PUSHDOWN_ENABLED = buildConf("spark.sql.orc.filterPushdown") .doc("When true, enable filter pushdown for ORC files.") .booleanConf @@ -998,6 +1006,8 @@ class SQLConf extends Serializable with Logging { def useCompression: Boolean = getConf(COMPRESS_CACHED) + def orcCompressionCodec: String = getConf(ORC_COMPRESSION) + def parquetCompressionCodec: String = getConf(PARQUET_COMPRESSION) def parquetCacheMetadata: Boolean = getConf(PARQUET_CACHE_METADATA) http://git-wip-us.apache.org/repos/asf/spark/blob/d8f45408/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index cca9352..07347d2 100644 --- 
a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -517,9 +517,11 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { *
spark git commit: [SPARK-21834] Incorrect executor request in case of dynamic allocation
Repository: spark Updated Branches: refs/heads/branch-2.1 576975356 -> 041eccb4f [SPARK-21834] Incorrect executor request in case of dynamic allocation ## What changes were proposed in this pull request? killExecutor api currently does not allow killing an executor without updating the total number of executors needed. When dynamic allocation is turned on and the allocator tries to kill an executor, the scheduler reduces the total number of executors needed (see https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala#L635), which is incorrect because the allocator already takes care of setting the required number of executors itself. ## How was this patch tested? Ran a job on the cluster and made sure the executor request is correct. Author: Sital Kedia Closes #19081 from sitalkedia/skedia/oss_fix_executor_allocation. (cherry picked from commit 6949a9c5c6120fdde1b63876ede661adbd1eb15e) Signed-off-by: Marcelo Vanzin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/041eccb4 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/041eccb4 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/041eccb4 Branch: refs/heads/branch-2.1 Commit: 041eccb4fa35a2778996c052dbcfd09f779b64a6 Parents: 5769753 Author: Sital Kedia Authored: Wed Aug 30 14:19:13 2017 -0700 Committer: Marcelo Vanzin Committed: Wed Aug 30 14:19:33 2017 -0700 -- .../main/scala/org/apache/spark/ExecutorAllocationManager.scala | 3 +++ 1 file changed, 3 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/041eccb4/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala -- diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index f054a78..d25ab61 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ 
b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -427,6 +427,9 @@ private[spark] class ExecutorAllocationManager( } else { client.killExecutors(executorIdsToBeRemoved) } +// [SPARK-21834] killExecutors api reduces the target number of executors. +// So we need to update the target with desired value. +client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, hostToLocalTaskCount) // reset the newExecutorTotal to the existing number of executors newExecutorTotal = numExistingExecutors if (testing || executorsRemoved.nonEmpty) { - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-21834] Incorrect executor request in case of dynamic allocation
Repository: spark Updated Branches: refs/heads/branch-2.2 d10c9dc3f -> 14054ffc5 [SPARK-21834] Incorrect executor request in case of dynamic allocation ## What changes were proposed in this pull request? killExecutor api currently does not allow killing an executor without updating the total number of executors needed. When dynamic allocation is turned on and the allocator tries to kill an executor, the scheduler reduces the total number of executors needed (see https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala#L635), which is incorrect because the allocator already takes care of setting the required number of executors itself. ## How was this patch tested? Ran a job on the cluster and made sure the executor request is correct. Author: Sital Kedia Closes #19081 from sitalkedia/skedia/oss_fix_executor_allocation. (cherry picked from commit 6949a9c5c6120fdde1b63876ede661adbd1eb15e) Signed-off-by: Marcelo Vanzin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/14054ffc Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/14054ffc Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/14054ffc Branch: refs/heads/branch-2.2 Commit: 14054ffc5fd3399d04d69e26efb31d8b24b60bdc Parents: d10c9dc Author: Sital Kedia Authored: Wed Aug 30 14:19:13 2017 -0700 Committer: Marcelo Vanzin Committed: Wed Aug 30 14:19:22 2017 -0700 -- .../main/scala/org/apache/spark/ExecutorAllocationManager.scala | 3 +++ 1 file changed, 3 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/14054ffc/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala -- diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index bb5eb7f..632d5f2 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ 
b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -430,6 +430,9 @@ private[spark] class ExecutorAllocationManager( } else { client.killExecutors(executorIdsToBeRemoved) } +// [SPARK-21834] killExecutors api reduces the target number of executors. +// So we need to update the target with desired value. +client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, hostToLocalTaskCount) // reset the newExecutorTotal to the existing number of executors newExecutorTotal = numExistingExecutors if (testing || executorsRemoved.nonEmpty) { - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-21834] Incorrect executor request in case of dynamic allocation
Repository: spark Updated Branches: refs/heads/master 235d28333 -> 6949a9c5c [SPARK-21834] Incorrect executor request in case of dynamic allocation ## What changes were proposed in this pull request? killExecutor api currently does not allow killing an executor without updating the total number of executors needed. When dynamic allocation is turned on and the allocator tries to kill an executor, the scheduler reduces the total number of executors needed (see https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala#L635), which is incorrect because the allocator already takes care of setting the required number of executors itself. ## How was this patch tested? Ran a job on the cluster and made sure the executor request is correct. Author: Sital Kedia Closes #19081 from sitalkedia/skedia/oss_fix_executor_allocation. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6949a9c5 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6949a9c5 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6949a9c5 Branch: refs/heads/master Commit: 6949a9c5c6120fdde1b63876ede661adbd1eb15e Parents: 235d283 Author: Sital Kedia Authored: Wed Aug 30 14:19:13 2017 -0700 Committer: Marcelo Vanzin Committed: Wed Aug 30 14:19:13 2017 -0700 -- .../main/scala/org/apache/spark/ExecutorAllocationManager.scala | 3 +++ 1 file changed, 3 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/6949a9c5/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala -- diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index 3350326..7a5fb9a 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -446,6 +446,9 @@ private[spark] 
class ExecutorAllocationManager( } else { client.killExecutors(executorIdsToBeRemoved) } +// [SPARK-21834] killExecutors api reduces the target number of executors. +// So we need to update the target with desired value. +client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, hostToLocalTaskCount) // reset the newExecutorTotal to the existing number of executors newExecutorTotal = numExistingExecutors if (testing || executorsRemoved.nonEmpty) { - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-21714][CORE][BACKPORT-2.2] Avoiding re-uploading remote resources in yarn client mode
Repository: spark Updated Branches: refs/heads/branch-2.2 a6a994414 -> d10c9dc3f [SPARK-21714][CORE][BACKPORT-2.2] Avoiding re-uploading remote resources in yarn client mode ## What changes were proposed in this pull request? This is a backport PR to fix issue of re-uploading remote resource in yarn client mode. The original PR is #18962. ## How was this patch tested? Tested in local UT. Author: jerryshaoCloses #19074 from jerryshao/SPARK-21714-2.2-backport. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d10c9dc3 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d10c9dc3 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d10c9dc3 Branch: refs/heads/branch-2.2 Commit: d10c9dc3f631a26dbbbd8f5c601ca2001a5d7c80 Parents: a6a9944 Author: jerryshao Authored: Wed Aug 30 12:30:24 2017 -0700 Committer: Marcelo Vanzin Committed: Wed Aug 30 12:30:24 2017 -0700 -- .../org/apache/spark/deploy/SparkSubmit.scala | 66 --- .../apache/spark/internal/config/package.scala | 2 +- .../scala/org/apache/spark/util/Utils.scala | 25 --- .../apache/spark/deploy/SparkSubmitSuite.scala | 68 .../org/apache/spark/repl/SparkILoop.scala | 2 +- .../main/scala/org/apache/spark/repl/Main.scala | 2 +- 6 files changed, 116 insertions(+), 49 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/d10c9dc3/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index c60a2a1..86d578e 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -208,14 +208,20 @@ object SparkSubmit extends CommandLineUtils { /** * Prepare the environment for submitting an application. 
- * This returns a 4-tuple: - * (1) the arguments for the child process, - * (2) a list of classpath entries for the child, - * (3) a map of system properties, and - * (4) the main class for the child + * + * @param args the parsed SparkSubmitArguments used for environment preparation. + * @param conf the Hadoop Configuration, this argument will only be set in unit test. + * @return a 4-tuple: + *(1) the arguments for the child process, + *(2) a list of classpath entries for the child, + *(3) a map of system properties, and + *(4) the main class for the child + * * Exposed for testing. */ - private[deploy] def prepareSubmitEnvironment(args: SparkSubmitArguments) + private[deploy] def prepareSubmitEnvironment( + args: SparkSubmitArguments, + conf: Option[HadoopConfiguration] = None) : (Seq[String], Seq[String], Map[String, String], String) = { // Return values val childArgs = new ArrayBuffer[String]() @@ -311,12 +317,16 @@ object SparkSubmit extends CommandLineUtils { } // In client mode, download remote files. 
+var localPrimaryResource: String = null +var localJars: String = null +var localPyFiles: String = null +var localFiles: String = null if (deployMode == CLIENT) { - val hadoopConf = new HadoopConfiguration() - args.primaryResource = Option(args.primaryResource).map(downloadFile(_, hadoopConf)).orNull - args.jars = Option(args.jars).map(downloadFileList(_, hadoopConf)).orNull - args.pyFiles = Option(args.pyFiles).map(downloadFileList(_, hadoopConf)).orNull - args.files = Option(args.files).map(downloadFileList(_, hadoopConf)).orNull + val hadoopConf = conf.getOrElse(new HadoopConfiguration()) + localPrimaryResource = Option(args.primaryResource).map(downloadFile(_, hadoopConf)).orNull + localJars = Option(args.jars).map(downloadFileList(_, hadoopConf)).orNull + localPyFiles = Option(args.pyFiles).map(downloadFileList(_, hadoopConf)).orNull + localFiles = Option(args.files).map(downloadFileList(_, hadoopConf)).orNull } // Require all python files to be local, so we can add them to the PYTHONPATH @@ -366,7 +376,7 @@ object SparkSubmit extends CommandLineUtils { // If a python file is provided, add it to the child arguments and list of files to deploy. // Usage: PythonAppRunner [app arguments] args.mainClass = "org.apache.spark.deploy.PythonRunner" -args.childArgs = ArrayBuffer(args.primaryResource, args.pyFiles) ++ args.childArgs +args.childArgs = ArrayBuffer(localPrimaryResource, localPyFiles) ++ args.childArgs if (clusterManager !=
spark git commit: [MINOR][SQL][TEST] Test shuffle hash join while is not expected
Repository: spark Updated Branches: refs/heads/master 32d6d9d72 -> 235d28333 [MINOR][SQL][TEST] Test shuffle hash join while is not expected ## What changes were proposed in this pull request? ignore("shuffle hash join") is meant to test _case class ShuffledHashJoinExec_. But when 'ignore' is changed to 'test', the test actually exercises _case class BroadcastHashJoinExec_. Before the modification, this happens because canBroadcast is true. Print information in _canBroadcast(plan: LogicalPlan)_: ``` canBroadcast plan.stats.sizeInBytes:6710880 canBroadcast conf.autoBroadcastJoinThreshold:1000 ``` After the modification, plan.stats.sizeInBytes is 11184808. Print information in _canBuildLocalHashMap(plan: LogicalPlan)_ and _muchSmaller(a: LogicalPlan, b: LogicalPlan)_: ``` canBuildLocalHashMap plan.stats.sizeInBytes:11184808 canBuildLocalHashMap conf.autoBroadcastJoinThreshold:1000 canBuildLocalHashMap conf.numShufflePartitions:2 ``` ``` muchSmaller a.stats.sizeInBytes * 3:33554424 muchSmaller b.stats.sizeInBytes:33554432 ``` ## How was this patch tested? Existing test case. Author: caoxuewen Closes #19069 from heary-cao/shuffle_hash_join.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/235d2833 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/235d2833 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/235d2833 Branch: refs/heads/master Commit: 235d28333c63719008ee755138db5c964237f526 Parents: 32d6d9d Author: caoxuewen Authored: Wed Aug 30 10:10:24 2017 -0700 Committer: gatorsmile Committed: Wed Aug 30 10:10:24 2017 -0700 -- .../sql/execution/benchmark/JoinBenchmark.scala | 56 +--- 1 file changed, 37 insertions(+), 19 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/235d2833/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/JoinBenchmark.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/JoinBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/JoinBenchmark.scala index 46db41a..5a25d72 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/JoinBenchmark.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/JoinBenchmark.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.execution.benchmark +import org.apache.spark.sql.execution.joins._ import org.apache.spark.sql.functions._ import org.apache.spark.sql.types.IntegerType @@ -35,7 +36,9 @@ class JoinBenchmark extends BenchmarkBase { val dim = broadcast(sparkSession.range(M).selectExpr("id as k", "cast(id as string) as v")) runBenchmark("Join w long", N) { - sparkSession.range(N).join(dim, (col("id") % M) === col("k")).count() + val df = sparkSession.range(N).join(dim, (col("id") % M) === col("k")) + assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[BroadcastHashJoinExec]).isDefined) + df.count() } /* @@ -55,7 +58,9 @@ class JoinBenchmark extends BenchmarkBase { val dim = broadcast(sparkSession.range(M).selectExpr("id as k", "cast(id as string) as v")) runBenchmark("Join w long duplicated", N) { val dim = 
broadcast(sparkSession.range(M).selectExpr("cast(id/10 as long) as k")) - sparkSession.range(N).join(dim, (col("id") % M) === col("k")).count() + val df = sparkSession.range(N).join(dim, (col("id") % M) === col("k")) + assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[BroadcastHashJoinExec]).isDefined) + df.count() } /* @@ -75,9 +80,11 @@ class JoinBenchmark extends BenchmarkBase { .selectExpr("cast(id as int) as k1", "cast(id as int) as k2", "cast(id as string) as v")) runBenchmark("Join w 2 ints", N) { - sparkSession.range(N).join(dim2, + val df = sparkSession.range(N).join(dim2, (col("id") % M).cast(IntegerType) === col("k1") - && (col("id") % M).cast(IntegerType) === col("k2")).count() + && (col("id") % M).cast(IntegerType) === col("k2")) + assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[BroadcastHashJoinExec]).isDefined) + df.count() } /* @@ -97,9 +104,10 @@ class JoinBenchmark extends BenchmarkBase { .selectExpr("id as k1", "id as k2", "cast(id as string) as v")) runBenchmark("Join w 2 longs", N) { - sparkSession.range(N).join(dim3, + val df = sparkSession.range(N).join(dim3, (col("id") % M) === col("k1") && (col("id") % M) === col("k2")) -.count() + assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[BroadcastHashJoinExec]).isDefined) + df.count() } /* @@ -119,9 +127,10 @@
spark git commit: Revert "[SPARK-21845][SQL] Make codegen fallback of expressions configurable"
Repository: spark Updated Branches: refs/heads/master 4133c1b0a -> 32d6d9d72 Revert "[SPARK-21845][SQL] Make codegen fallback of expressions configurable" This reverts commit 3d0e174244bc293f11dff0f11ef705ba6cd5fe3a. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/32d6d9d7 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/32d6d9d7 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/32d6d9d7 Branch: refs/heads/master Commit: 32d6d9d72019404ebd47f6aa64197d9f574bac8b Parents: 4133c1b Author: gatorsmileAuthored: Wed Aug 30 09:08:40 2017 -0700 Committer: gatorsmile Committed: Wed Aug 30 09:08:40 2017 -0700 -- .../org/apache/spark/sql/internal/SQLConf.scala | 6 +++--- .../org/apache/spark/sql/execution/SparkPlan.scala | 15 ++- .../spark/sql/execution/WholeStageCodegenExec.scala | 2 +- .../apache/spark/sql/DataFrameFunctionsSuite.scala | 2 +- .../scala/org/apache/spark/sql/DataFrameSuite.scala | 12 +--- .../org/apache/spark/sql/test/SharedSQLContext.scala | 2 -- .../org/apache/spark/sql/hive/test/TestHive.scala| 1 - 7 files changed, 16 insertions(+), 24 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/32d6d9d7/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 24f51ef..a685099 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -551,9 +551,9 @@ object SQLConf { .intConf .createWithDefault(100) - val CODEGEN_FALLBACK = buildConf("spark.sql.codegen.fallback") + val WHOLESTAGE_FALLBACK = buildConf("spark.sql.codegen.fallback") .internal() -.doc("When true, (whole stage) codegen could be temporary disabled for the part of query that" + +.doc("When true, whole stage codegen could be 
temporary disabled for the part of query that" + " fail to compile generated code") .booleanConf .createWithDefault(true) @@ -1041,7 +1041,7 @@ class SQLConf extends Serializable with Logging { def wholeStageMaxNumFields: Int = getConf(WHOLESTAGE_MAX_NUM_FIELDS) - def codegenFallback: Boolean = getConf(CODEGEN_FALLBACK) + def wholeStageFallback: Boolean = getConf(WHOLESTAGE_FALLBACK) def maxCaseBranchesForCodegen: Int = getConf(MAX_CASES_BRANCHES) http://git-wip-us.apache.org/repos/asf/spark/blob/32d6d9d7/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala index b1db9dd..c7277c2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala @@ -56,10 +56,14 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ protected def sparkContext = sqlContext.sparkContext - // whether we should fallback when hitting compilation errors caused by codegen - private val codeGenFallBack = sqlContext.conf.codegenFallback - - protected val subexpressionEliminationEnabled = sqlContext.conf.subexpressionEliminationEnabled + // sqlContext will be null when we are being deserialized on the slaves. In this instance + // the value of subexpressionEliminationEnabled will be set by the deserializer after the + // constructor has run. + val subexpressionEliminationEnabled: Boolean = if (sqlContext != null) { +sqlContext.conf.subexpressionEliminationEnabled + } else { +false + } /** Overridden make copy also propagates sqlContext to copied plan. 
*/ override def makeCopy(newArgs: Array[AnyRef]): SparkPlan = { @@ -366,7 +370,8 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ try { GeneratePredicate.generate(expression, inputSchema) } catch { - case _ @ (_: JaninoRuntimeException | _: CompileException) if codeGenFallBack => + case e @ (_: JaninoRuntimeException | _: CompileException) + if sqlContext == null || sqlContext.conf.wholeStageFallback => genInterpretedPredicate(expression, inputSchema) } } http://git-wip-us.apache.org/repos/asf/spark/blob/32d6d9d7/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala --
spark git commit: [SPARK-21469][ML][EXAMPLES] Adding Examples for FeatureHasher
Repository: spark Updated Branches: refs/heads/master b30a11a6a -> 4133c1b0a [SPARK-21469][ML][EXAMPLES] Adding Examples for FeatureHasher ## What changes were proposed in this pull request? This PR adds ML examples for the FeatureHasher transform in Scala, Java, Python. ## How was this patch tested? Manually ran examples and verified that output is consistent for different APIs Author: Bryan CutlerCloses #19024 from BryanCutler/ml-examples-FeatureHasher-SPARK-21810. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4133c1b0 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4133c1b0 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4133c1b0 Branch: refs/heads/master Commit: 4133c1b0abb22f728fbff287f4f77a06ab88bbe8 Parents: b30a11a Author: Bryan Cutler Authored: Wed Aug 30 16:00:29 2017 +0200 Committer: Nick Pentreath Committed: Wed Aug 30 16:00:29 2017 +0200 -- docs/ml-features.md | 91 +++- .../examples/ml/JavaFeatureHasherExample.java | 69 +++ .../main/python/ml/feature_hasher_example.py| 46 ++ .../examples/ml/FeatureHasherExample.scala | 50 +++ .../apache/spark/ml/feature/FeatureHasher.scala | 7 +- 5 files changed, 256 insertions(+), 7 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/4133c1b0/docs/ml-features.md -- diff --git a/docs/ml-features.md b/docs/ml-features.md index e19fba2..86a0e09 100644 --- a/docs/ml-features.md +++ b/docs/ml-features.md @@ -53,9 +53,9 @@ are calculated based on the mapped indices. This approach avoids the need to com term-to-index map, which can be expensive for a large corpus, but it suffers from potential hash collisions, where different raw features may become the same term after hashing. To reduce the chance of collision, we can increase the target feature dimension, i.e. the number of buckets -of the hash table. 
Since a simple modulo is used to transform the hash function to a column index, -it is advisable to use a power of two as the feature dimension, otherwise the features will -not be mapped evenly to the columns. The default feature dimension is `$2^{18} = 262,144$`. +of the hash table. Since a simple modulo on the hashed value is used to determine the vector index, +it is advisable to use a power of two as the feature dimension, otherwise the features will not +be mapped evenly to the vector indices. The default feature dimension is `$2^{18} = 262,144$`. An optional binary toggle parameter controls term frequency counts. When set to true all nonzero frequency counts are set to 1. This is especially useful for discrete probabilistic models that model binary, rather than integer, counts. @@ -65,7 +65,7 @@ model binary, rather than integer, counts. **IDF**: `IDF` is an `Estimator` which is fit on a dataset and produces an `IDFModel`. The `IDFModel` takes feature vectors (generally created from `HashingTF` or `CountVectorizer`) and -scales each column. Intuitively, it down-weights columns which appear frequently in a corpus. +scales each feature. Intuitively, it down-weights features which appear frequently in a corpus. **Note:** `spark.ml` doesn't provide tools for text segmentation. We refer users to the [Stanford NLP Group](http://nlp.stanford.edu/) and @@ -211,6 +211,89 @@ for more details on the API. +## FeatureHasher + +Feature hashing projects a set of categorical or numerical features into a feature vector of +specified dimension (typically substantially smaller than that of the original feature +space). This is done using the [hashing trick](https://en.wikipedia.org/wiki/Feature_hashing) +to map features to indices in the feature vector. + +The `FeatureHasher` transformer operates on multiple columns. Each column may contain either +numeric or categorical features. 
Behavior and handling of column data types is as follows: + +- Numeric columns: For numeric features, the hash value of the column name is used to map the +feature value to its index in the feature vector. Numeric features are never treated as +categorical, even when they are integers. You must explicitly convert numeric columns containing +categorical features to strings first. +- String columns: For categorical features, the hash value of the string "column_name=value" +is used to map to the vector index, with an indicator value of `1.0`. Thus, categorical features +are "one-hot" encoded (similarly to using [OneHotEncoder](ml-features.html#onehotencoder) with +`dropLast=false`). +- Boolean columns: Boolean values are treated in the same way as string columns. That is, +boolean
spark git commit: [SPARK-21764][TESTS] Fix tests failures on Windows: resources not being closed and incorrect paths
Repository: spark Updated Branches: refs/heads/master 734ed7a7b -> b30a11a6a [SPARK-21764][TESTS] Fix tests failures on Windows: resources not being closed and incorrect paths ## What changes were proposed in this pull request? `org.apache.spark.deploy.RPackageUtilsSuite` ``` - jars without manifest return false *** FAILED *** (109 milliseconds) java.io.IOException: Unable to delete file: C:\projects\spark\target\tmp\1500266936418-0\dep1-c.jar ``` `org.apache.spark.deploy.SparkSubmitSuite` ``` - download one file to local *** FAILED *** (16 milliseconds) java.net.URISyntaxException: Illegal character in authority at index 6: s3a://C:\projects\spark\target\tmp\test2630198944759847458.jar - download list of files to local *** FAILED *** (0 milliseconds) java.net.URISyntaxException: Illegal character in authority at index 6: s3a://C:\projects\spark\target\tmp\test2783551769392880031.jar ``` `org.apache.spark.scheduler.ReplayListenerSuite` ``` - Replay compressed inprogress log file succeeding on partial read (156 milliseconds) Exception encountered when attempting to run a suite with class name: org.apache.spark.scheduler.ReplayListenerSuite *** ABORTED *** (1 second, 391 milliseconds) java.io.IOException: Failed to delete: C:\projects\spark\target\tmp\spark-8f3cacd6-faad-4121-b901-ba1bba8025a0 - End-to-end replay *** FAILED *** (62 milliseconds) java.io.IOException: No FileSystem for scheme: C - End-to-end replay with compression *** FAILED *** (110 milliseconds) java.io.IOException: No FileSystem for scheme: C ``` `org.apache.spark.sql.hive.StatisticsSuite` ``` - SPARK-21079 - analyze table with location different than that of individual partitions *** FAILED *** (875 milliseconds) org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:java.lang.IllegalArgumentException: Can not create a Path from an empty string); - SPARK-21079 - analyze partitioned table with only a subset of partitions visible *** FAILED 
*** (47 milliseconds) org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:java.lang.IllegalArgumentException: Can not create a Path from an empty string); ``` **Note:** this PR does not fix: `org.apache.spark.deploy.SparkSubmitSuite` ``` - launch simple application with spark-submit with redaction *** FAILED *** (172 milliseconds) java.util.NoSuchElementException: next on empty iterator ``` I can't reproduce this on my Windows machine, but it apparently fails consistently on AppVeyor. The cause is still unclear to me and it is hard to debug, so I did not include a fix for it for now. **Note:** it looks like there are more instances, but they are hard to identify, partly due to flakiness and partly due to the flood of logs and errors. I will probably do one more pass if this is fine. ## How was this patch tested? Manually via AppVeyor: **Before** - `org.apache.spark.deploy.RPackageUtilsSuite`: https://ci.appveyor.com/project/spark-test/spark/build/771-windows-fix/job/8t8ra3lrljuir7q4 - `org.apache.spark.deploy.SparkSubmitSuite`: https://ci.appveyor.com/project/spark-test/spark/build/771-windows-fix/job/taquy84yudjjen64 - `org.apache.spark.scheduler.ReplayListenerSuite`: https://ci.appveyor.com/project/spark-test/spark/build/771-windows-fix/job/24omrfn2k0xfa9xq - `org.apache.spark.sql.hive.StatisticsSuite`: https://ci.appveyor.com/project/spark-test/spark/build/771-windows-fix/job/2079y1plgj76dc9l **After** - `org.apache.spark.deploy.RPackageUtilsSuite`: https://ci.appveyor.com/project/spark-test/spark/build/775-windows-fix/job/3803dbfn89ne1164 - `org.apache.spark.deploy.SparkSubmitSuite`: https://ci.appveyor.com/project/spark-test/spark/build/775-windows-fix/job/m5l350dp7u9a4xjr - `org.apache.spark.scheduler.ReplayListenerSuite`: https://ci.appveyor.com/project/spark-test/spark/build/775-windows-fix/job/565vf74pp6bfdk18 - `org.apache.spark.sql.hive.StatisticsSuite`: 
https://ci.appveyor.com/project/spark-test/spark/build/775-windows-fix/job/qm78tsk8c37jb6s4 Jenkins tests are required and AppVeyor tests will be triggered. Author: hyukjinkwonCloses #18971 from HyukjinKwon/windows-fixes. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b30a11a6 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b30a11a6 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b30a11a6 Branch: refs/heads/master Commit: b30a11a6acf4b1512b5759f21ae58e69662ba455 Parents: 734ed7a Author: hyukjinkwon Authored: Wed Aug 30 21:35:52 2017 +0900 Committer: hyukjinkwon Committed: Wed Aug 30 21:35:52 2017 +0900 -- .../spark/deploy/RPackageUtilsSuite.scala | 7 +-- .../apache/spark/deploy/SparkSubmitSuite.scala | 4 +-
spark git commit: [SPARK-21806][MLLIB] BinaryClassificationMetrics pr(): first point (0.0, 1.0) is misleading
Repository: spark Updated Branches: refs/heads/master 8f0df6bc1 -> 734ed7a7b [SPARK-21806][MLLIB] BinaryClassificationMetrics pr(): first point (0.0, 1.0) is misleading ## What changes were proposed in this pull request? Prepend (0,p) to precision-recall curve not (0,1) where p matches lowest recall point ## How was this patch tested? Updated tests. Author: Sean OwenCloses #19038 from srowen/SPARK-21806. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/734ed7a7 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/734ed7a7 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/734ed7a7 Branch: refs/heads/master Commit: 734ed7a7b397578f16549070f350215bde369b3c Parents: 8f0df6b Author: Sean Owen Authored: Wed Aug 30 11:36:00 2017 +0100 Committer: Sean Owen Committed: Wed Aug 30 11:36:00 2017 +0100 -- .../BinaryClassificationMetrics.scala | 8 +++ .../BinaryClassificationMetricsSuite.scala | 22 +--- 2 files changed, 14 insertions(+), 16 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/734ed7a7/mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala index 9b7cd04..2cfcf38 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.scala @@ -98,16 +98,16 @@ class BinaryClassificationMetrics @Since("1.3.0") ( /** * Returns the precision-recall curve, which is an RDD of (recall, precision), - * NOT (precision, recall), with (0.0, 1.0) prepended to it. + * NOT (precision, recall), with (0.0, p) prepended to it, where p is the precision + * associated with the lowest recall on the curve. 
* @see http://en.wikipedia.org/wiki/Precision_and_recall;> * Precision and recall (Wikipedia) */ @Since("1.0.0") def pr(): RDD[(Double, Double)] = { val prCurve = createCurve(Recall, Precision) -val sc = confusions.context -val first = sc.makeRDD(Seq((0.0, 1.0)), 1) -first.union(prCurve) +val (_, firstPrecision) = prCurve.first() +confusions.context.parallelize(Seq((0.0, firstPrecision)), 1).union(prCurve) } /** http://git-wip-us.apache.org/repos/asf/spark/blob/734ed7a7/mllib/src/test/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetricsSuite.scala -- diff --git a/mllib/src/test/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetricsSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetricsSuite.scala index 99d52fa..a08917a 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetricsSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/evaluation/BinaryClassificationMetricsSuite.scala @@ -23,18 +23,16 @@ import org.apache.spark.mllib.util.TestingUtils._ class BinaryClassificationMetricsSuite extends SparkFunSuite with MLlibTestSparkContext { - private def areWithinEpsilon(x: (Double, Double)): Boolean = x._1 ~= (x._2) absTol 1E-5 - - private def pairsWithinEpsilon(x: ((Double, Double), (Double, Double))): Boolean = -(x._1._1 ~= x._2._1 absTol 1E-5) && (x._1._2 ~= x._2._2 absTol 1E-5) - - private def assertSequencesMatch(left: Seq[Double], right: Seq[Double]): Unit = { - assert(left.zip(right).forall(areWithinEpsilon)) + private def assertSequencesMatch(actual: Seq[Double], expected: Seq[Double]): Unit = { +actual.zip(expected).foreach { case (a, e) => assert(a ~== e absTol 1.0e-5) } } - private def assertTupleSequencesMatch(left: Seq[(Double, Double)], - right: Seq[(Double, Double)]): Unit = { -assert(left.zip(right).forall(pairsWithinEpsilon)) + private def assertTupleSequencesMatch(actual: Seq[(Double, Double)], + expected: Seq[(Double, Double)]): Unit = { 
+actual.zip(expected).foreach { case ((ax, ay), (ex, ey)) => + assert(ax ~== ex absTol 1.0e-5) + assert(ay ~== ey absTol 1.0e-5) +} } private def validateMetrics(metrics: BinaryClassificationMetrics, @@ -44,7 +42,7 @@ class BinaryClassificationMetricsSuite extends SparkFunSuite with MLlibTestSpark expectedFMeasures1: Seq[Double], expectedFmeasures2: Seq[Double], expectedPrecisions: Seq[Double], - expectedRecalls: Seq[Double]) = { + expectedRecalls:
spark git commit: [SPARK-21873][SS] - Avoid using `return` inside `CachedKafkaConsumer.get`
Repository: spark Updated Branches: refs/heads/master d4895c9de -> 8f0df6bc1 [SPARK-21873][SS] - Avoid using `return` inside `CachedKafkaConsumer.get` During profiling of a structured streaming application with Kafka as the source, I came across this exception: ![Structured Streaming Kafka Exceptions](https://user-images.githubusercontent.com/3448320/29743366-4149ef78-8a99-11e7-94d6-f0cbb691134a.png) This is a 1-minute sample, which caused 106K `NonLocalReturnControl` exceptions to be thrown. This happens because `CachedKafkaConsumer.get` is run inside `private def runUninterruptiblyIfPossible[T](body: => T): T`, where `body: => T` is the body of the `get` method. Because that body is passed as a by-name argument, it is compiled into a function (closure), so a `return` that escapes the `while` loop defined in `get` is a non-local return, which the runtime implements by throwing the above exception. ## What changes were proposed in this pull request? Instead of using `return` (which is generally not recommended in Scala), we place the result of the `fetchData` method inside a local variable and use a boolean flag to indicate the status of fetching data, which we monitor as our predicate to the `while` loop. ## How was this patch tested? I've run the `KafkaSourceSuite` to make sure the regression tests pass. Since the exception isn't visible from user code, there is no way (at least that I could think of) to add this as a test to the existing suite. Author: Yuval Itzchakov Closes #19059 from YuvalItzchakov/master.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8f0df6bc Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8f0df6bc Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8f0df6bc Branch: refs/heads/master Commit: 8f0df6bc1092c0c75b41e91e4ffc41a5525c8274 Parents: d4895c9 Author: Yuval Itzchakov Authored: Wed Aug 30 10:33:23 2017 +0100 Committer: Sean Owen Committed: Wed Aug 30 10:33:23 2017 +0100 -- .../spark/sql/kafka010/CachedKafkaConsumer.scala | 19 +++ 1 file changed, 15 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/8f0df6bc/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala -- diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala index 7c4f38e..90ed7b1 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala @@ -112,9 +112,15 @@ private[kafka010] case class CachedKafkaConsumer private( // we will move to the next available offset within `[offset, untilOffset)` and retry. // If `failOnDataLoss` is `true`, the loop body will be executed only once. var toFetchOffset = offset -while (toFetchOffset != UNKNOWN_OFFSET) { +var consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]] = null +// We want to break out of the while loop on a successful fetch to avoid using "return" +// which may causes a NonLocalReturnControl exception when this method is used as a function. 
+var isFetchComplete = false + +while (toFetchOffset != UNKNOWN_OFFSET && !isFetchComplete) { try { -return fetchData(toFetchOffset, untilOffset, pollTimeoutMs, failOnDataLoss) +consumerRecord = fetchData(toFetchOffset, untilOffset, pollTimeoutMs, failOnDataLoss) +isFetchComplete = true } catch { case e: OffsetOutOfRangeException => // When there is some error thrown, it's better to use a new consumer to drop all cached @@ -125,8 +131,13 @@ private[kafka010] case class CachedKafkaConsumer private( toFetchOffset = getEarliestAvailableOffsetBetween(toFetchOffset, untilOffset) } } -resetFetchedData() -null + +if (isFetchComplete) { + consumerRecord +} else { + resetFetchedData() + null +} } /** - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [MINOR][TEST] Off-heap memory leaks for unit tests
Repository: spark Updated Branches: refs/heads/master e47f48c73 -> d4895c9de [MINOR][TEST] Off-heap memory leaks for unit tests ## What changes were proposed in this pull request? Free off-heap memory. I have checked all the unit tests. ## How was this patch tested? N/A Author: liuxian Closes #19075 from 10110346/memleak. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d4895c9d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d4895c9d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d4895c9d Branch: refs/heads/master Commit: d4895c9de6ca9c3ac4461cf6f86cd88eb63e0720 Parents: e47f48c Author: liuxian Authored: Wed Aug 30 10:16:11 2017 +0100 Committer: Sean Owen Committed: Wed Aug 30 10:16:11 2017 +0100 -- .../src/test/java/org/apache/spark/unsafe/PlatformUtilSuite.java| 1 + .../test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java | 1 + 2 files changed, 2 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/d4895c9d/common/unsafe/src/test/java/org/apache/spark/unsafe/PlatformUtilSuite.java -- diff --git a/common/unsafe/src/test/java/org/apache/spark/unsafe/PlatformUtilSuite.java b/common/unsafe/src/test/java/org/apache/spark/unsafe/PlatformUtilSuite.java index a77ba82..4ae49d8 100644 --- a/common/unsafe/src/test/java/org/apache/spark/unsafe/PlatformUtilSuite.java +++ b/common/unsafe/src/test/java/org/apache/spark/unsafe/PlatformUtilSuite.java @@ -73,5 +73,6 @@ public class PlatformUtilSuite { Assert.assertEquals( Platform.getByte(offheap.getBaseObject(), offheap.getBaseOffset()), MemoryAllocator.MEMORY_DEBUG_FILL_CLEAN_VALUE); +MemoryAllocator.UNSAFE.free(offheap); } } http://git-wip-us.apache.org/repos/asf/spark/blob/d4895c9d/core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java -- diff --git a/core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java b/core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java index 
f53bc0b..46b0516 100644 --- a/core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java +++ b/core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java @@ -54,6 +54,7 @@ public class TaskMemoryManagerSuite { final long encodedAddress = manager.encodePageNumberAndOffset(dataPage, offset); Assert.assertEquals(null, manager.getPage(encodedAddress)); Assert.assertEquals(offset, manager.getOffsetInPage(encodedAddress)); +manager.freePage(dataPage, c); } @Test - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-21254][WEBUI] History UI performance fixes
Repository: spark Updated Branches: refs/heads/branch-2.2 917fe6635 -> a6a994414 [SPARK-21254][WEBUI] History UI performance fixes ## This is a backport of PR #18783 to the latest released branch 2.2. ## What changes were proposed in this pull request? As described in the JIRA ticket, the History page takes ~1min to load when the number of jobs is 10k+. Most of the time is currently spent on DOM manipulations and all the additional costs they imply (browser repaints and reflows). The PR's goal is not to change any behavior but to reduce History UI rendering time: 1. The most costly operation is setting `innerHTML` for the `duration` column within a loop, which is [extremely slow](https://jsperf.com/jquery-append-vs-html-list-performance/24). [Refactoring](https://github.com/criteo-forks/spark/commit/b7e56eef4d66af977bd05af58a81e14faf33c211) this helped get page load time **down to 10-15s** 2. The second big gain, bringing page load time **down to 4s**, was [achieved](https://github.com/criteo-forks/spark/commit/3630ca212baa94d60c5fe7e4109cf6da26288cec) by detaching the table's DOM before parsing it with the DataTables jQuery plugin. 3. Another chunk of improvements ([1](https://github.com/criteo-forks/spark/commit/ab520d156a7293a707aa6bc053a2f83b9ac2), [2](https://github.com/criteo-forks/spark/commit/e25be9a66b018ba0cc53884f242469b515cb2bf4), [3](https://github.com/criteo-forks/spark/commit/91697079a29138b7581e64f2aa79247fa1a4e4af)) focused on removing unnecessary DOM manipulations that in total contributed ~250ms to page load time. ## How was this patch tested? Tested by existing Selenium tests in `org.apache.spark.deploy.history.HistoryServerSuite`. Changes were also tested on Criteo's spark-2.1 fork with 20k+ rows in the table, reducing load time to 4s. Author: Dmitry Parfenchik Closes #18860 from 2ooom/history-ui-perf-fix-2.2.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a6a99441 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a6a99441 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a6a99441 Branch: refs/heads/branch-2.2 Commit: a6a9944140bbb336146d0d868429cb01839375c7 Parents: 917fe66 Author: Dmitry Parfenchik Authored: Wed Aug 30 09:42:15 2017 +0100 Committer: Sean Owen Committed: Wed Aug 30 09:42:15 2017 +0100 -- .../spark/ui/static/historypage-template.html | 22 ++-- .../org/apache/spark/ui/static/historypage.js | 112 +-- 2 files changed, 71 insertions(+), 63 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/a6a99441/core/src/main/resources/org/apache/spark/ui/static/historypage-template.html -- diff --git a/core/src/main/resources/org/apache/spark/ui/static/historypage-template.html b/core/src/main/resources/org/apache/spark/ui/static/historypage-template.html index bfe31aa..20cd7bf 100644 --- a/core/src/main/resources/org/apache/spark/ui/static/historypage-template.html +++ b/core/src/main/resources/org/apache/spark/ui/static/historypage-template.html @@ -29,21 +29,25 @@ App Name - + {{#hasMultipleAttempts}} + Attempt ID + {{/hasMultipleAttempts}} Started - + {{#showCompletedColumn}} + Completed + {{/showCompletedColumn}} Duration @@ -68,13 +72,17 @@ {{#applications}} - {{id}} - {{name}} + {{id}} + {{name}} {{#attempts}} - {{attemptId}} + {{#hasMultipleAttempts}} + {{attemptId}} + {{/hasMultipleAttempts}} {{startTime}} - {{endTime}} - {{duration}} + {{#showCompletedColumn}} + {{endTime}} + {{/showCompletedColumn}} + {{duration}} {{sparkUser}} {{lastUpdated}} Download http://git-wip-us.apache.org/repos/asf/spark/blob/a6a99441/core/src/main/resources/org/apache/spark/ui/static/historypage.js -- diff --git a/core/src/main/resources/org/apache/spark/ui/static/historypage.js b/core/src/main/resources/org/apache/spark/ui/static/historypage.js index 5ec1ce1..3e2bba8 100644 --- 
a/core/src/main/resources/org/apache/spark/ui/static/historypage.js +++ b/core/src/main/resources/org/apache/spark/ui/static/historypage.js @@ -48,6 +48,18 @@ function getParameterByName(name, searchString) { return results === null ? "" : decodeURIComponent(results[1].replace(/\+/g, " ")); } +function removeColumnByName(columns, columnName) { +