[GitHub] spark issue #19211: [SPARK-18838][core] Add separate listener queues to Live...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19211

**[Test build #81693 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81693/testReport)** for PR 19211 at commit [`4a6858f`](https://github.com/apache/spark/commit/4a6858f57124a8b405e8ce414f61088a68dc27eb).

* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark issue #19180: [SPARK-21967][CORE] org.apache.spark.unsafe.types.UTF8St...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19180 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/81692/
[GitHub] spark issue #19180: [SPARK-21967][CORE] org.apache.spark.unsafe.types.UTF8St...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19180

**[Test build #81692 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81692/testReport)** for PR 19180 at commit [`70c9423`](https://github.com/apache/spark/commit/70c94230286cb8a1f7f86ab0e8d25998dd1586f9).

* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark pull request #18875: [SPARK-21513][SQL] Allow UDF to_json support conv...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/18875#discussion_r138500447

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala ---
@@ -26,20 +26,50 @@
 import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
 import org.apache.spark.sql.catalyst.util.{ArrayData, DateTimeUtils, MapData}
 import org.apache.spark.sql.types._
 
+/**
+ * `JacksonGenerator` can only be initialized with a `StructType` or a `MapType`.
+ * Once it is initialized with a `StructType`, it can be used to write out a struct or an array
+ * of structs. Once it is initialized with a `MapType`, it can be used to write out a map or an
+ * array of maps. An exception will be thrown if trying to write out a struct when it is
+ * initialized with a `MapType`, and vice versa.
+ */
 private[sql] class JacksonGenerator(
-    schema: StructType,
+    dataType: DataType,
     writer: Writer,
     options: JSONOptions) {
 
   // A `ValueWriter` is responsible for writing a field of an `InternalRow` to appropriate
   // JSON data. Here we are using `SpecializedGetters` rather than `InternalRow` so that
   // we can directly access data in `ArrayData` without the help of `SpecificMutableRow`.
   private type ValueWriter = (SpecializedGetters, Int) => Unit
 
+  // `JacksonGenerator` can only be initialized with a `StructType` or a `MapType`.
+  require(dataType.isInstanceOf[StructType] | dataType.isInstanceOf[MapType],

--- End diff --

Hm.. I think we should use `||` for short-circuiting next time.
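For context: on `Boolean`s, Scala's `|` always evaluates both operands, while `||` short-circuits and skips the right-hand side once the result is known. A minimal sketch of the difference (not from the PR; `sideEffect` is a made-up helper):

```scala
// Demo of `|` vs `||` on Booleans.
object ShortCircuitDemo extends App {
  def sideEffect(): Boolean = {
    println("right-hand side evaluated")
    true
  }

  val eager = true | sideEffect()   // prints: right-hand side evaluated
  val lazyOr = true || sideEffect() // prints nothing; the RHS is skipped
  println(s"eager=$eager, lazyOr=$lazyOr")
}
```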
[GitHub] spark pull request #18875: [SPARK-21513][SQL] Allow UDF to_json support conv...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/18875#discussion_r138500520

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala ---
@@ -617,6 +618,14 @@ case class JsonToStructs(
       {"time":"26/08/2015"}
   > SELECT _FUNC_(array(named_struct('a', 1, 'b', 2));
       [{"a":1,"b":2}]
+  > SELECT _FUNC_(map('a',named_struct('b',1)));

--- End diff --

little nit: `... ',n ...` -> `... ', n ...`
[GitHub] spark issue #19212: [SPARK-21988] Add default stats to StreamingExecutionRel...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19212 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/81695/
[GitHub] spark issue #19212: [SPARK-21988] Add default stats to StreamingExecutionRel...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19212 Merged build finished. Test PASSed.
[GitHub] spark issue #19212: [SPARK-21988] Add default stats to StreamingExecutionRel...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19212

**[Test build #81695 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81695/testReport)** for PR 19212 at commit [`80d0268`](https://github.com/apache/spark/commit/80d02681ae5290fefe991a7faef7273d79f5f1dd).

* This patch passes all tests.
* This patch merges cleanly.
* This patch adds the following public classes _(experimental)_:
  * `case class StreamingExecutionRelation(`
[GitHub] spark issue #18731: [SPARK-20990][SQL] Read all JSON documents in files when...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/18731 Build finished. Test FAILed.
[GitHub] spark issue #18731: [SPARK-20990][SQL] Read all JSON documents in files when...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/18731 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/81696/
[GitHub] spark issue #18731: [SPARK-20990][SQL] Read all JSON documents in files when...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/18731

**[Test build #81696 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81696/testReport)** for PR 18731 at commit [`a53404e`](https://github.com/apache/spark/commit/a53404e1d852978297af884970ed5f3fc30e6350).

* This patch **fails Spark unit tests**.
* This patch **does not merge cleanly**.
* This patch adds the following public classes _(experimental)_:
  * `public class NettyMemoryMetrics implements MetricSet `
  * ` class ByteArrayConstructor extends net.razorvine.pickle.objects.ByteArrayConstructor `
  * `public class JavaFeatureHasherExample `
  * `sealed trait LogisticRegressionTrainingSummary extends LogisticRegressionSummary `
  * `sealed trait BinaryLogisticRegressionSummary extends LogisticRegressionSummary `
  * `class ClusteringEvaluator @Since(\"2.3.0\") (@Since(\"2.3.0\") override val uid: String)`
  * ` case class ClusterStats(featureSum: Vector, squaredNormSum: Double, numOfPoints: Long)`
  * `class OneVsRest(Estimator, OneVsRestParams, HasParallelism, JavaMLReadable, JavaMLWritable):`
  * `class HasParallelism(Params):`
  * `case class InsertIntoDir(`
  * `case class InsertIntoDataSourceDirCommand(`
  * `case class DescribeColumnCommand(`
  * `case class InsertIntoHiveDirCommand(`
[GitHub] spark pull request #16422: [SPARK-17642] [SQL] support DESC EXTENDED/FORMATT...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/16422#discussion_r138498310

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala ---
@@ -626,6 +624,74 @@ case class DescribeTableCommand(
   }
 }
 
+/**
+ * A command to list the info for a column, including name, data type, column stats and comment.
+ * This function creates a [[DescribeColumnCommand]] logical plan.

--- End diff --

This comment line doesn't seem needed.
[GitHub] spark issue #19132: [SPARK-21922] Fix duration always updating when task fai...
Github user caneGuy commented on the issue: https://github.com/apache/spark/pull/19132 @jerryshao @HyukjinKwon Jenkins is done. Thanks so much.
[GitHub] spark issue #19188: [SPARK-21973][SQL] Add an new option to filter queries i...
Github user maropu commented on the issue: https://github.com/apache/spark/pull/19188 ok, I'll update in a day! Thanks!
[GitHub] spark issue #18887: [SPARK-20642][core] Store FsHistoryProvider listing data...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/18887 **[Test build #81697 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81697/testReport)** for PR 18887 at commit [`9020184`](https://github.com/apache/spark/commit/9020184bba90fc1c7394ae8ab91877efe0699914).
[GitHub] spark pull request #18887: [SPARK-20642][core] Store FsHistoryProvider listi...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/18887#discussion_r138491445

--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -422,208 +455,101 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       }
     }
 
-    applications.get(appId) match {
-      case Some(appInfo) =>
-        try {
-          // If no attempt is specified, or there is no attemptId for attempts, return all attempts
-          appInfo.attempts.filter { attempt =>
-            attempt.attemptId.isEmpty || attemptId.isEmpty || attempt.attemptId.get == attemptId.get
-          }.foreach { attempt =>
-            val logPath = new Path(logDir, attempt.logPath)
-            zipFileToStream(logPath, attempt.logPath, zipStream)
-          }
-        } finally {
-          zipStream.close()
+    val app = try {
+      load(appId)
+    } catch {
+      case _: NoSuchElementException =>
+        throw new SparkException(s"Logs for $appId not found.")
+    }
+
+    try {
+      // If no attempt is specified, or there is no attemptId for attempts, return all attempts
+      attemptId
+        .map { id => app.attempts.filter(_.info.attemptId == Some(id)) }
+        .getOrElse(app.attempts)
+        .map(_.logPath)
+        .foreach { log =>
+          zipFileToStream(new Path(logDir, log), log, zipStream)
         }
-      case None => throw new SparkException(s"Logs for $appId not found.")
+    } finally {
+      zipStream.close()
     }
   }
 
   /**
-   * Replay the log files in the list and merge the list of old applications with new ones
+   * Replay the given log file, saving the application in the listing db.
    */
   protected def mergeApplicationListing(fileStatus: FileStatus): Unit = {
-    val newAttempts = try {
-      val eventsFilter: ReplayEventsFilter = { eventString =>
-        eventString.startsWith(APPL_START_EVENT_PREFIX) ||
-          eventString.startsWith(APPL_END_EVENT_PREFIX) ||
-          eventString.startsWith(LOG_START_EVENT_PREFIX)
-      }
-
-      val logPath = fileStatus.getPath()
-      val appCompleted = isApplicationCompleted(fileStatus)
-
-      // Use loading time as lastUpdated since some filesystems don't update modifiedTime
-      // each time file is updated. However use modifiedTime for completed jobs so lastUpdated
-      // won't change whenever HistoryServer restarts and reloads the file.
-      val lastUpdated = if (appCompleted) fileStatus.getModificationTime else clock.getTimeMillis()

--- End diff --

Actually, I think the "log cleaner for inProgress files" test might cover this.
[GitHub] spark pull request #18887: [SPARK-20642][core] Store FsHistoryProvider listi...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/18887#discussion_r138491191

--- Diff: core/src/main/scala/org/apache/spark/deploy/history/config.scala ---
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.util.concurrent.TimeUnit
+
+import scala.annotation.meta.getter
+
+import org.apache.spark.internal.config.ConfigBuilder
+import org.apache.spark.util.kvstore.KVIndex
+
+private[spark] object config {
+
+  /** Use this to annotate constructor params to be used as KVStore indices. */
+  type KVIndexParam = KVIndex @getter
+
+  val DEFAULT_LOG_DIR = "file:/tmp/spark-events"
+
+  val EVENT_LOG_DIR = ConfigBuilder("spark.history.fs.logDirectory")
+    .stringConf
+    .createWithDefault(DEFAULT_LOG_DIR)
+
+  val MAX_LOG_AGE_S = ConfigBuilder("spark.history.fs.cleaner.maxAge")
+    .timeConf(TimeUnit.SECONDS)
+    .createWithDefaultString("7d")
+
+  val LOCAL_STORE_DIR = ConfigBuilder("spark.history.store.path")

--- End diff --

Correct, other parameters are the same.
[GitHub] spark pull request #18887: [SPARK-20642][core] Store FsHistoryProvider listi...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/18887#discussion_r138491139

--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -742,53 +698,146 @@ private[history] object FsHistoryProvider {
   private val APPL_END_EVENT_PREFIX = "{\"Event\":\"SparkListenerApplicationEnd\""
 
   private val LOG_START_EVENT_PREFIX = "{\"Event\":\"SparkListenerLogStart\""
+
+  /** Current version of the data written to the listing database. */
+  private val CURRENT_LISTING_VERSION = 1L
 }
 
 /**
- * Application attempt information.
- *
- * @param logPath path to the log file, or, for a legacy log, its directory
- * @param name application name
- * @param appId application ID
- * @param attemptId optional attempt ID
- * @param startTime start time (from playback)
- * @param endTime end time (from playback). -1 if the application is incomplete.
- * @param lastUpdated the modification time of the log file when this entry was built by replaying
- *                    the history.
- * @param sparkUser user running the application
- * @param completed flag to indicate whether or not the application has completed.
- * @param fileSize the size of the log file the last time the file was scanned for changes
+ * A KVStoreSerializer that provides Scala types serialization too, and uses the same options as
+ * the API serializer.
  */
-private class FsApplicationAttemptInfo(
+private class KVStoreScalaSerializer extends KVStoreSerializer {
+
+  mapper.registerModule(DefaultScalaModule)
+  mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL)
+  mapper.setDateFormat(v1.JacksonMessageWriter.makeISODateFormat)
+
+}
+
+private[history] case class KVStoreMetadata(
+    val version: Long,
+    val logDir: String)
+
+private[history] case class LogInfo(
+    @KVIndexParam val logPath: String,
+    val fileSize: Long)
+
+private[history] class AttemptInfoWrapper(
+    val info: v1.ApplicationAttemptInfo,
     val logPath: String,
-    val name: String,
-    val appId: String,
-    attemptId: Option[String],
-    startTime: Long,
-    endTime: Long,
-    lastUpdated: Long,
-    sparkUser: String,
-    completed: Boolean,
-    val fileSize: Long,
-    appSparkVersion: String)
-  extends ApplicationAttemptInfo(
-    attemptId, startTime, endTime, lastUpdated, sparkUser, completed, appSparkVersion) {
-
-  /** extend the superclass string value with the extra attributes of this class */
-  override def toString: String = {
-    s"FsApplicationAttemptInfo($name, $appId," +
-      s" ${super.toString}, source=$logPath, size=$fileSize"
+    val fileSize: Long) {
+
+  def toAppAttemptInfo(): ApplicationAttemptInfo = {
+    ApplicationAttemptInfo(info.attemptId, info.startTime.getTime(),
+      info.endTime.getTime(), info.lastUpdated.getTime(), info.sparkUser,
+      info.completed, info.appSparkVersion)
   }
+
 }
 
-/**
- * Application history information
- * @param id application ID
- * @param name application name
- * @param attempts list of attempts, most recent first.
- */
-private class FsApplicationHistoryInfo(
-    id: String,
-    override val name: String,
-    override val attempts: List[FsApplicationAttemptInfo])
-  extends ApplicationHistoryInfo(id, name, attempts)
+private[history] class ApplicationInfoWrapper(
+    val info: v1.ApplicationInfo,
+    val attempts: List[AttemptInfoWrapper]) {
+
+  @JsonIgnore @KVIndexParam
+  def id: String = info.id
+
+  @JsonIgnore @KVIndexParam("endTime")
+  def endTime(): Long = attempts.head.info.endTime.getTime()
+
+  @JsonIgnore @KVIndexParam("oldestAttempt")
+  def oldestAttempt(): Long = attempts.map(_.info.lastUpdated.getTime()).min
+
+  def toAppHistoryInfo(): ApplicationHistoryInfo = {
+    ApplicationHistoryInfo(info.id, info.name, attempts.map(_.toAppAttemptInfo()))
+  }
+
+  def toApiInfo(): v1.ApplicationInfo = {
+    new v1.ApplicationInfo(info.id, info.name, info.coresGranted, info.maxCores,
+      info.coresPerExecutor, info.memoryPerExecutorMB, attempts.map(_.info))
+  }
+
+}
+
+private[history] class AppListingListener(log: FileStatus, clock: Clock) extends SparkListener {
+
+  private val app = new MutableApplicationInfo()
+  private val attempt = new MutableAttemptInfo(log.getPath().getName(), log.getLen())
+
+  override def onApplicationStart(event: SparkListenerApplicationStart): Unit = {
+    app.id = event.appId.orNull
+    app.name = event.appName
+
+    attempt.attemptId = event.appAttemptId
[GitHub] spark pull request #18887: [SPARK-20642][core] Store FsHistoryProvider listi...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/18887#discussion_r138490678

--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -742,53 +698,146 @@ private[history] object FsHistoryProvider {
   private val APPL_END_EVENT_PREFIX = "{\"Event\":\"SparkListenerApplicationEnd\""
 
   private val LOG_START_EVENT_PREFIX = "{\"Event\":\"SparkListenerLogStart\""
+
+  /** Current version of the data written to the listing database. */
+  private val CURRENT_LISTING_VERSION = 1L

--- End diff --

I'll add a more verbose comment; but this is basically me punting proper versioning to the next Spark release after this code is added. This version number is a "nuclear option": if we break the data that is serialized to disk, we increase this number, and the new SHS will delete all old data and re-generate it from event logs. I'm punting because there is no versioning issue in the first version; there's no existing data that the SHS might try to read. I plan to take a closer look at versioning after all the initial PRs go in, but leaving this here gives us a choice in case there's a more urgent need to break things.
[GitHub] spark issue #18659: [SPARK-21190][PYSPARK][WIP] Simple Python Vectorized UDF...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/18659 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/81691/
[GitHub] spark issue #18659: [SPARK-21190][PYSPARK][WIP] Simple Python Vectorized UDF...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/18659 Merged build finished. Test PASSed.
[GitHub] spark issue #18659: [SPARK-21190][PYSPARK][WIP] Simple Python Vectorized UDF...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/18659

**[Test build #81691 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81691/testReport)** for PR 18659 at commit [`4a2fec2`](https://github.com/apache/spark/commit/4a2fec2aca1998ad04862eeac60d54b088b99de5).

* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark pull request #18887: [SPARK-20642][core] Store FsHistoryProvider listi...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/18887#discussion_r138490176

--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -422,208 +455,101 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       }
     }
 
-    applications.get(appId) match {
-      case Some(appInfo) =>
-        try {
-          // If no attempt is specified, or there is no attemptId for attempts, return all attempts
-          appInfo.attempts.filter { attempt =>
-            attempt.attemptId.isEmpty || attemptId.isEmpty || attempt.attemptId.get == attemptId.get
-          }.foreach { attempt =>
-            val logPath = new Path(logDir, attempt.logPath)
-            zipFileToStream(logPath, attempt.logPath, zipStream)
-          }
-        } finally {
-          zipStream.close()
+    val app = try {
+      load(appId)
+    } catch {
+      case _: NoSuchElementException =>
+        throw new SparkException(s"Logs for $appId not found.")
+    }
+
+    try {
+      // If no attempt is specified, or there is no attemptId for attempts, return all attempts
+      attemptId
+        .map { id => app.attempts.filter(_.info.attemptId == Some(id)) }
+        .getOrElse(app.attempts)
+        .map(_.logPath)
+        .foreach { log =>
+          zipFileToStream(new Path(logDir, log), log, zipStream)
         }
-      case None => throw new SparkException(s"Logs for $appId not found.")
+    } finally {
+      zipStream.close()
     }
   }
 
   /**
-   * Replay the log files in the list and merge the list of old applications with new ones
+   * Replay the given log file, saving the application in the listing db.
    */
   protected def mergeApplicationListing(fileStatus: FileStatus): Unit = {
-    val newAttempts = try {
-      val eventsFilter: ReplayEventsFilter = { eventString =>
-        eventString.startsWith(APPL_START_EVENT_PREFIX) ||
-          eventString.startsWith(APPL_END_EVENT_PREFIX) ||
-          eventString.startsWith(LOG_START_EVENT_PREFIX)
-      }
-
-      val logPath = fileStatus.getPath()
-      val appCompleted = isApplicationCompleted(fileStatus)
-
-      // Use loading time as lastUpdated since some filesystems don't update modifiedTime
-      // each time file is updated. However use modifiedTime for completed jobs so lastUpdated
-      // won't change whenever HistoryServer restarts and reloads the file.
-      val lastUpdated = if (appCompleted) fileStatus.getModificationTime else clock.getTimeMillis()

--- End diff --

Yes, the behavior is the same. It's now handled in `AppListingListener` (`onApplicationStart` and `onApplicationEnd` callbacks). I don't see an explicit test for this behavior.
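To make the preserved behavior concrete, a small illustrative sketch of how the old `lastUpdated` rule maps onto the listener callbacks. The names mirror the diff, but the body is a sketch, not the PR's actual code (`Clock` here is Spark's internal `org.apache.spark.util.Clock`):

```scala
import org.apache.hadoop.fs.FileStatus
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd, SparkListenerApplicationStart}
import org.apache.spark.util.Clock

// Sketch only: completed apps pin lastUpdated to the log's modification time so
// a HistoryServer restart doesn't change it; in-progress apps use the scan time
// because some filesystems don't update modifiedTime on every append.
class AppListingListenerSketch(log: FileStatus, clock: Clock) extends SparkListener {

  private var lastUpdated: Long = 0L

  override def onApplicationStart(event: SparkListenerApplicationStart): Unit = {
    lastUpdated = clock.getTimeMillis() // still in progress: use loading time
  }

  override def onApplicationEnd(event: SparkListenerApplicationEnd): Unit = {
    lastUpdated = log.getModificationTime() // completed: use the file's mod time
  }
}
```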
[GitHub] spark pull request #18887: [SPARK-20642][core] Store FsHistoryProvider listi...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/18887#discussion_r138489215

--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -301,9 +334,13 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
   }
 
   override def stop(): Unit = {
-    if (initThread != null && initThread.isAlive()) {
-      initThread.interrupt()
-      initThread.join()
+    try {
+      if (initThread != null && initThread.isAlive()) {
+        initThread.interrupt()
+        initThread.join()
+      }
+    } finally {
+      listing.close()

--- End diff --

It might leave JNI handles open (a.k.a. a memory leak). Also, this doesn't apply to this change, but later when the UI info is also written to disk, it could prevent the UI db from being replaced with an updated one, since its files would still be open.
[GitHub] spark pull request #18887: [SPARK-20642][core] Store FsHistoryProvider listi...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/18887#discussion_r138488740

--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -117,17 +122,37 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
   // used for logging msgs (logs are re-scanned based on file size, rather than modtime)
   private val lastScanTime = new java.util.concurrent.atomic.AtomicLong(-1)
 
-  // Mapping of application IDs to their metadata, in descending end time order. Apps are inserted
-  // into the map in order, so the LinkedHashMap maintains the correct ordering.
-  @volatile private var applications: mutable.LinkedHashMap[String, FsApplicationHistoryInfo]
-    = new mutable.LinkedHashMap()
+  private val pendingReplayTasksCount = new java.util.concurrent.atomic.AtomicInteger(0)
 
-  val fileToAppInfo = new ConcurrentHashMap[Path, FsApplicationAttemptInfo]()
+  private val storePath = conf.get(LOCAL_STORE_DIR)
 
-  // List of application logs to be deleted by event log cleaner.
-  private var attemptsToClean = new mutable.ListBuffer[FsApplicationAttemptInfo]
+  private val listing: KVStore = storePath.map { path =>
+    val dbPath = new File(path, "listing.ldb")
 
-  private val pendingReplayTasksCount = new java.util.concurrent.atomic.AtomicInteger(0)
+    def openDB(): LevelDB = new LevelDB(dbPath, new KVStoreScalaSerializer())
+
+    try {
+      val db = openDB()
+      val meta = db.getMetadata(classOf[KVStoreMetadata])
+
+      if (meta == null) {
+        db.setMetadata(new KVStoreMetadata(CURRENT_LISTING_VERSION, logDir))
+        db
+      } else if (meta.version != CURRENT_LISTING_VERSION || !logDir.equals(meta.logDir)) {
+        logInfo("Detected mismatched config in existing DB, deleting...")
+        db.close()
+        Utils.deleteRecursively(dbPath)

--- End diff --

Yes; the code will re-create the data from the event logs when that happens.
[GitHub] spark issue #18731: [SPARK-20990][SQL] Read all JSON documents in files when...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/18731 **[Test build #81696 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81696/testReport)** for PR 18731 at commit [`a53404e`](https://github.com/apache/spark/commit/a53404e1d852978297af884970ed5f3fc30e6350).
[GitHub] spark pull request #18266: [SPARK-20427][SQL] Read JDBC table use custom sch...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/18266#discussion_r138473560

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala ---
@@ -80,7 +80,7 @@ object JDBCRDD extends Logging {
    * @return A Catalyst schema corresponding to columns in the given order.
    */
   private def pruneSchema(schema: StructType, columns: Array[String]): StructType = {
-    val fieldMap = Map(schema.fields.map(x => x.metadata.getString("name") -> x): _*)
+    val fieldMap = Map(schema.fields.map(x => x.name -> x): _*)

--- End diff --

I see. Could we just get rid of the line where we put `name` in the metadata? https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala#L304
[GitHub] spark pull request #18266: [SPARK-20427][SQL] Read JDBC table use custom sch...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/18266#discussion_r138475068

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala ---
@@ -111,7 +111,16 @@ private[sql] case class JDBCRelation(
 
   override val needConversion: Boolean = false
 
-  override val schema: StructType = JDBCRDD.resolveTable(jdbcOptions)
+  override val schema: StructType = {
+    val schema = JDBCRDD.resolveTable(jdbcOptions)
+    val customSchema = jdbcOptions.customSchema
+    if (customSchema.isDefined) {
+      JdbcUtils.parseUserSpecifiedColumnTypes(schema, customSchema.get,
+        sqlContext.sessionState.conf.resolver)
+    } else {
+      schema
+    }

--- End diff --

```Scala
val tableSchema = JDBCRDD.resolveTable(jdbcOptions)
jdbcOptions.customSchema match {
  case Some(customSchema) => JdbcUtils.parseUserSpecifiedColumnTypes(
    tableSchema, customSchema, sparkSession.sessionState.conf.resolver)
  case None => tableSchema
}
```
[GitHub] spark pull request #18266: [SPARK-20427][SQL] Read JDBC table use custom sch...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/18266#discussion_r138476182

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala ---
@@ -768,6 +769,35 @@ object JdbcUtils extends Logging {
   }
 
   /**
+   * Parses the user-specified customSchema option value into a DataFrame schema,
+   * and returns it if all of its columns match the default schema's.
+   */
+  def parseUserSpecifiedColumnTypes(
+      schema: StructType,
+      columnTypes: String,
+      nameEquality: Resolver): StructType = {
+    val userSchema = CatalystSqlParser.parseTableSchema(columnTypes)
+
+    SchemaUtils.checkColumnNameDuplication(
+      userSchema.map(_.name), "in the customSchema option value", nameEquality)
+
+    if (userSchema.size != schema.size) {
+      throw new AnalysisException("Please provide all the columns, " +
+        s"all columns are: ${schema.fields.map(_.name).mkString(",")}")
+    }
+
+    // This is resolved by names, only check the column names.
+    userSchema.fieldNames.foreach { col =>
+      schema.find(f => nameEquality(f.name, col)).getOrElse {
+        throw new AnalysisException(
+          s"${JDBCOptions.JDBC_CUSTOM_DATAFRAME_COLUMN_TYPES} option column $col not found in " +
+            s"schema ${schema.catalogString}")

--- End diff --

```Scala
val colNames = tableSchema.fieldNames.mkString(",")
throw new AnalysisException(s"Please provide all the columns, all columns are: $colNames")
```
[GitHub] spark pull request #18266: [SPARK-20427][SQL] Read JDBC table use custom sch...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/18266#discussion_r138464925

--- Diff: examples/src/main/python/sql/datasource.py ---
@@ -177,6 +177,16 @@ def jdbc_dataset_example(spark):
         .jdbc("jdbc:postgresql:dbserver", "schema.tablename",
               properties={"user": "username", "password": "password"})
 
+    # Specifying dataframe column data types on read
+    jdbcDF3 = spark.read \
+        .format("jdbc") \
+        .option("url", "jdbc:postgresql:dbserver") \
+        .option("dbtable", "schema.tablename") \
+        .option("user", "username") \
+        .option("password", "password") \
+        .option("customDataFrameColumnTypes", "id DECIMAL(38, 0), name STRING") \

--- End diff --

Please rename this.
[GitHub] spark pull request #18266: [SPARK-20427][SQL] Read JDBC table use custom sch...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/18266#discussion_r138475397

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala ---
@@ -768,6 +769,35 @@ object JdbcUtils extends Logging {
   }
 
   /**
+   * Parses the user-specified customSchema option value into a DataFrame schema,
+   * and returns it if all of its columns match the default schema's.
+   */
+  def parseUserSpecifiedColumnTypes(
+      schema: StructType,
+      columnTypes: String,
+      nameEquality: Resolver): StructType = {

--- End diff --

```Scala
def getCustomSchema(
    tableSchema: StructType,
    customSchema: String,
    nameEquality: Resolver): StructType = {
```
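For readers following along, a usage sketch of the feature being reviewed here. The option name was still being renamed in this thread, so `customSchema` below is an assumption based on the reviewers' comments, not the final merged spelling; the connection details are placeholders:

```scala
import org.apache.spark.sql.SparkSession

object CustomSchemaExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("jdbc-custom-schema").getOrCreate()

    val df = spark.read
      .format("jdbc")
      .option("url", "jdbc:postgresql:dbserver")
      .option("dbtable", "schema.tablename")
      .option("user", "username")
      .option("password", "password")
      // Override the JDBC-derived types for selected columns.
      .option("customSchema", "id DECIMAL(38, 0), name STRING")
      .load()

    // id should now read as DECIMAL(38,0) instead of the driver's default mapping.
    df.printSchema()
  }
}
```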
[GitHub] spark issue #19212: [SPARK-21988] Add default stats to StreamingExecutionRel...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19212 **[Test build #81695 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81695/testReport)** for PR 19212 at commit [`80d0268`](https://github.com/apache/spark/commit/80d02681ae5290fefe991a7faef7273d79f5f1dd).
[GitHub] spark pull request #19212: [SPARK-21988] Add default stats to StreamingExecu...
GitHub user joseph-torres opened a pull request: https://github.com/apache/spark/pull/19212

[SPARK-21988] Add default stats to StreamingExecutionRelation.

## What changes were proposed in this pull request?

Add default stats to StreamingExecutionRelation.

## How was this patch tested?

existing unit tests and an explain() test to be sure

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/joseph-torres/spark SPARK-21988

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/19212.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #19212

commit 80d02681ae5290fefe991a7faef7273d79f5f1dd
Author: Jose Torres
Date: 2017-09-12T21:44:51Z

    Add default stats to StreamingExecutionRelation.
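A rough sketch of what "default stats" on a streaming leaf node could look like. Everything below (the class shape, the `computeStats` override, the use of `defaultSizeInBytes`) is inferred from the PR title and the planner's stats API, not copied from the PR:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}

// Hypothetical: report a conservative default size so the planner gets an
// answer instead of an UnsupportedOperationException when it asks for stats.
case class StreamingRelationSketch(output: Seq[Attribute])(session: SparkSession)
  extends LeafNode {

  override def computeStats(): Statistics =
    Statistics(sizeInBytes = BigInt(session.sessionState.conf.defaultSizeInBytes))
}
```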
[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...
Github user sitalkedia commented on a diff in the pull request: https://github.com/apache/spark/pull/18317#discussion_r138473231

--- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java ---
@@ -0,0 +1,315 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.io;
+
+import com.google.common.base.Preconditions;
+import org.apache.spark.storage.StorageUtils;
+import org.apache.spark.util.ThreadUtils;
+
+import javax.annotation.concurrent.GuardedBy;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * {@link InputStream} implementation which asynchronously reads ahead from the underlying input
+ * stream when a specified amount of data has been read from the current buffer. It does this by
+ * maintaining two buffers - an active buffer and a read-ahead buffer. The active buffer contains
+ * data which should be returned when a read() call is issued. The read-ahead buffer is used to
+ * asynchronously read from the underlying input stream, and once the current active buffer is
+ * exhausted, we flip the two buffers so that we can start reading from the read-ahead buffer
+ * without being blocked by disk I/O.
+ */
+public class ReadAheadInputStream extends InputStream {
+
+  private ReentrantLock stateChangeLock = new ReentrantLock();
+
+  @GuardedBy("stateChangeLock")
+  private ByteBuffer activeBuffer;
+
+  @GuardedBy("stateChangeLock")
+  private ByteBuffer readAheadBuffer;
+
+  @GuardedBy("stateChangeLock")
+  private boolean endOfStream;
+
+  @GuardedBy("stateChangeLock")
+  // true if async read is in progress
+  private boolean readInProgress;
+
+  @GuardedBy("stateChangeLock")
+  // true if read is aborted due to an exception in reading from underlying input stream.
+  private boolean readAborted;
+
+  @GuardedBy("stateChangeLock")
+  private Exception readException;
+
+  // If the remaining data size in the current buffer is below this threshold,
+  // we issue an async read from the underlying input stream.
+  private final int readAheadThresholdInBytes;
+
+  private final InputStream underlyingInputStream;
+
+  private final ExecutorService executorService = ThreadUtils.newDaemonSingleThreadExecutor("read-ahead");
+
+  private final Condition asyncReadComplete = stateChangeLock.newCondition();
+
+  private static final ThreadLocal<byte[]> oneByte = ThreadLocal.withInitial(() -> new byte[1]);
+
+  /**
+   * Creates a ReadAheadInputStream with the specified buffer size and read-ahead
+   * threshold.
+   *
+   * @param inputStream The underlying input stream.
+   * @param bufferSizeInBytes The buffer size.
+   * @param readAheadThresholdInBytes If the active buffer has less data than the read-ahead
+   *                                  threshold, an async read is triggered.
+   */
+  public ReadAheadInputStream(InputStream inputStream, int bufferSizeInBytes, int readAheadThresholdInBytes) {
+    Preconditions.checkArgument(bufferSizeInBytes > 0,
+        "bufferSizeInBytes should be greater than 0");
+    Preconditions.checkArgument(readAheadThresholdInBytes > 0 &&
+        readAheadThresholdInBytes < bufferSizeInBytes,
+        "readAheadThresholdInBytes should be greater than 0 and less than bufferSizeInBytes");
+    activeBuffer = ByteBuffer.allocate(bufferSizeInBytes);
+    readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes);
+    this.readAheadThresholdInBytes = readAheadThresholdInBytes;
+    this.underlyingInputStream = inputStream;
+    activeBuffer.flip();
+    readAheadBuffer.flip();
+  }
+
+  private boolean isEndOfStream() {
+    if (!activeBuffer.hasRemaining() && !readAheadBuffer.hasRemaining() && endOfStream) {
+      return true;
+    }
+    return false;
+  }
+
+  private void readAsync(final
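To make the two-buffer scheme concrete, here is a heavily simplified Scala sketch of the flip. The real class refills the read-ahead buffer on a background executor under `stateChangeLock`; this version refills inline so only the buffer-swap logic is visible:

```scala
import java.io.InputStream
import java.nio.ByteBuffer

// Sketch of double buffering: serve reads from `active`, refill `readAhead`,
// and swap the two when `active` drains. Refill is synchronous here; the
// point is only the flip, not the concurrency.
class DoubleBufferedStreamSketch(in: InputStream, bufSize: Int) extends InputStream {
  private var active = ByteBuffer.allocate(bufSize)
  private var readAhead = ByteBuffer.allocate(bufSize)
  active.flip()    // both buffers start out empty
  readAhead.flip()

  private def refill(buf: ByteBuffer): Unit = {
    buf.clear()
    val n = in.read(buf.array(), 0, buf.capacity())
    buf.limit(math.max(n, 0)) // n == -1 at EOF leaves the buffer empty
  }

  override def read(): Int = {
    if (!active.hasRemaining) {
      refill(readAhead)                                      // async in the real class
      val tmp = active; active = readAhead; readAhead = tmp  // the flip
      if (!active.hasRemaining) return -1                    // underlying stream exhausted
    }
    active.get() & 0xFF
  }
}
```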
[GitHub] spark issue #18317: [SPARK-21113][CORE] Read ahead input stream to amortize ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/18317 **[Test build #81694 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81694/testReport)** for PR 18317 at commit [`f30117e`](https://github.com/apache/spark/commit/f30117eaf5b8274cc19832c6a36acaf44adc7915).
[GitHub] spark issue #19211: [SPARK-18838][core] Add separate listener queues to Live...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19211 **[Test build #81693 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81693/testReport)** for PR 19211 at commit [`4a6858f`](https://github.com/apache/spark/commit/4a6858f57124a8b405e8ce414f61088a68dc27eb).
[GitHub] spark pull request #19211: [SPARK-18838][core] Add separate listener queues ...
GitHub user vanzin opened a pull request: https://github.com/apache/spark/pull/19211

[SPARK-18838][core] Add separate listener queues to LiveListenerBus.

This change modifies the live listener bus so that all listeners are added to queues; each queue has its own thread to dispatch events, making it possible to separate slow listeners from other more performance-sensitive ones.

The public API has not changed - all listeners added with the existing "addListener" method, which after this change mostly means all user-defined listeners, end up in a default queue. Internally, there's an API allowing listeners to be added to specific queues, and that API is used to separate the internal Spark listeners into 3 categories: application status listeners (e.g. UI), executor management (e.g. dynamic allocation), and the event log.

The queueing logic, while abstracted away in a separate class, is kept as much as possible hidden away from consumers. Aside from choosing their queue, there's no code change needed to take advantage of queues.

Metrics were also simplified a little bit; the live bus now keeps track of metrics per queue instead of individual listeners. This is mostly to make integration with the existing metrics code in `ListenerBus` easier, without having to refactor the code; that can be done later if queue-level metrics are not enough.

Test coverage relies on existing tests; a few tests had to be tweaked because they relied on `LiveListenerBus.postToAll` being synchronous, and the change makes that method asynchronous. Other tests were simplified not to use the asynchronous LiveListenerBus.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/vanzin/spark SPARK-18838

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/19211.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #19211

commit 4a6858f57124a8b405e8ce414f61088a68dc27eb
Author: Marcelo Vanzin
Date: 2017-09-06T00:07:01Z

    [SPARK-18838][core] Add separate listener queues to LiveListenerBus.

    This change modifies the live listener bus so that all listeners are added to queues; each queue has its own thread to dispatch events, making it possible to separate slow listeners from other more performance-sensitive ones.

    The public API has not changed - all listeners added with the existing "addListener" method, which after this change mostly means all user-defined listeners, end up in a default queue. Internally, there's an API allowing listeners to be added to specific queues, and that API is used to separate the internal Spark listeners into 3 categories: application status listeners (e.g. UI), executor management (e.g. dynamic allocation), and the event log.

    The queueing logic, while abstracted away in a separate class, is kept as much as possible hidden away from consumers. Aside from choosing their queue, there's no code change needed to take advantage of queues.

    Metrics were also simplified a little bit; the live bus now keeps track of metrics per queue instead of individual listeners. This is mostly to make integration with the existing metrics code in `ListenerBus` easier, without having to refactor the code; that can be done later if queue-level metrics are not enough.

    Test coverage relies on existing tests; a few tests had to be tweaked because they relied on `LiveListenerBus.postToAll` being synchronous, and the change makes that method asynchronous. Other tests were simplified not to use the asynchronous LiveListenerBus.
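A toy model of the queueing idea described above - one dispatch thread per queue, so a slow listener only delays its own queue. This is an illustrative sketch, not the PR's implementation:

```scala
import java.util.concurrent.LinkedBlockingQueue

// Each queue owns a daemon thread that drains events to its listeners;
// posting is asynchronous. `None` is a poison pill that stops the dispatcher.
class ListenerQueueSketch[E](name: String, listeners: Seq[E => Unit]) {
  private val events = new LinkedBlockingQueue[Option[E]]()

  private val dispatcher = new Thread(s"listener-queue-$name") {
    override def run(): Unit = {
      var next = events.take()
      while (next.isDefined) {
        val event = next.get
        listeners.foreach(l => l(event)) // a slow listener only blocks this queue
        next = events.take()
      }
    }
  }
  dispatcher.setDaemon(true)
  dispatcher.start()

  def post(event: E): Unit = events.put(Some(event)) // returns immediately
  def stop(): Unit = { events.put(None); dispatcher.join() }
}
```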
[GitHub] spark issue #19180: [SPARK-21967][CORE] org.apache.spark.unsafe.types.UTF8St...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19180 **[Test build #81692 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81692/testReport)** for PR 19180 at commit [`70c9423`](https://github.com/apache/spark/commit/70c94230286cb8a1f7f86ab0e8d25998dd1586f9).
[GitHub] spark issue #19180: [SPARK-21967][CORE] org.apache.spark.unsafe.types.UTF8St...
Github user vanzin commented on the issue: https://github.com/apache/spark/pull/19180 ok to test
[GitHub] spark issue #19180: [SPARK-21967][CORE] org.apache.spark.unsafe.types.UTF8St...
Github user original-brownbear commented on the issue: https://github.com/apache/spark/pull/19180 @srowen thanks, all comments addressed I think :)
[GitHub] spark pull request #19180: [SPARK-21967][CORE] org.apache.spark.unsafe.types...
Github user original-brownbear commented on a diff in the pull request: https://github.com/apache/spark/pull/19180#discussion_r138466080

--- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java ---
@@ -1097,8 +1101,21 @@ public UTF8String copy() {
   @Override
   public int compareTo(@Nonnull final UTF8String other) {
     int len = Math.min(numBytes, other.numBytes);
-    // TODO: compare 8 bytes as unsigned long
-    for (int i = 0; i < len; i ++) {
+    int words = len / Longs.BYTES;
+    long roffset = other.getBaseOffset();
+    Object rbase = other.getBaseObject();
+    for (int i = 0; i < words * Longs.BYTES; i += Longs.BYTES) {
+      long left = getLong(base, offset + i);
+      long right = getLong(rbase, roffset + i);
+      if (left != right) {
+        if (!IS_LITTLE_ENDIAN) {

--- End diff --

@vanzin it's a runtime constant, but after the C2 pass it is completely removed by the compiler (I just tried this out with JITWatch).
[GitHub] spark pull request #19180: [SPARK-21967][CORE] org.apache.spark.unsafe.types...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19180#discussion_r138460636

--- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java ---
@@ -1097,8 +1101,21 @@ public UTF8String copy() {
   @Override
   public int compareTo(@Nonnull final UTF8String other) {
     int len = Math.min(numBytes, other.numBytes);
-    // TODO: compare 8 bytes as unsigned long
-    for (int i = 0; i < len; i ++) {
+    int words = len / Longs.BYTES;
+    long roffset = other.getBaseOffset();
+    Object rbase = other.getBaseObject();
+    for (int i = 0; i < words * Longs.BYTES; i += Longs.BYTES) {
+      long left = getLong(base, offset + i);
+      long right = getLong(rbase, roffset + i);
+      if (left != right) {
+        if (!IS_LITTLE_ENDIAN) {

--- End diff --

Constants ("static final") are actually inlined by javac, not the JIT. Try it out.

```
class A {
  static final int CONSTANT = 1;
}

class B {
  B() {
    System.out.println(A.CONSTANT);
  }
}
```

```
$ javap -c B
Compiled from "foo.java"
class B {
  B();
    Code:
       0: aload_0
       1: invokespecial #1    // Method java/lang/Object."<init>":()V
       4: getstatic     #2    // Field java/lang/System.out:Ljava/io/PrintStream;
       7: iconst_1
       8: invokevirtual #4    // Method java/io/PrintStream.println:(I)V
      11: return
}
```
[GitHub] spark pull request #19180: [SPARK-21967][CORE] org.apache.spark.unsafe.types...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/19180#discussion_r138454402

--- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java ---
@@ -1097,8 +1101,21 @@ public UTF8String copy() {
   @Override
   public int compareTo(@Nonnull final UTF8String other) {
     int len = Math.min(numBytes, other.numBytes);
-    // TODO: compare 8 bytes as unsigned long
-    for (int i = 0; i < len; i ++) {
+    int words = len / Longs.BYTES;
+    long roffset = other.getBaseOffset();
+    Object rbase = other.getBaseObject();
+    for (int i = 0; i < words * Longs.BYTES; i += Longs.BYTES) {

--- End diff --

I don't object to `Longs.BYTES` so much, but it's not used elsewhere. I think just writing "8" is clear and consistent.
[GitHub] spark pull request #19180: [SPARK-21967][CORE] org.apache.spark.unsafe.types...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/19180#discussion_r138453195

--- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java ---
@@ -1097,8 +1101,21 @@ public UTF8String copy() {
   @Override
   public int compareTo(@Nonnull final UTF8String other) {
     int len = Math.min(numBytes, other.numBytes);
-    // TODO: compare 8 bytes as unsigned long
-    for (int i = 0; i < len; i ++) {
+    int words = len / Longs.BYTES;
+    long roffset = other.getBaseOffset();
+    Object rbase = other.getBaseObject();
+    for (int i = 0; i < words * Longs.BYTES; i += Longs.BYTES) {
+      long left = getLong(base, offset + i);
+      long right = getLong(rbase, roffset + i);
+      if (left != right) {
+        if (!IS_LITTLE_ENDIAN) {

--- End diff --

(Nit: flip the condition to avoid the negation.) Is the point that if the JIT thinks the value can't change, then either way it can snip out the branch that will never happen? Then yes, checking the condition inside or outside the loop doesn't matter. If that's not always true, then I'm pretty clear that checking the condition inside this hot loop is slower. My instinct would be to check it outside the loop just in case. However, I also note that it only matters once in the execution of the method, because as soon as `left != right`, the loop terminates. I think it's OK this way, therefore.
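Putting the pieces of this review thread together, an illustrative Scala translation of the word-at-a-time technique (also folding in the `wordMax` suggestion from the comment two messages down): compare 8 bytes at a time as unsigned longs, byte-swapping on little-endian hosts so the word comparison matches byte-wise lexicographic order. `java.lang.Long.compareUnsigned` stands in for Guava's `UnsignedLongs.compare`; this is a sketch, not the PR's code:

```scala
import java.nio.{ByteBuffer, ByteOrder}

object UnsignedCompareSketch {
  // Unsigned, word-at-a-time lexicographic comparison of two byte arrays.
  def compareBytes(a: Array[Byte], b: Array[Byte]): Int = {
    val len = math.min(a.length, b.length)
    val wordMax = (len / 8) * 8 // precomputed once, as suggested below
    val littleEndian = ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN
    val ba = ByteBuffer.wrap(a).order(ByteOrder.nativeOrder())
    val bb = ByteBuffer.wrap(b).order(ByteOrder.nativeOrder())

    var i = 0
    while (i < wordMax) {
      val left = ba.getLong(i)
      val right = bb.getLong(i)
      if (left != right) {
        // On little-endian hosts, reverse bytes so the numeric comparison
        // agrees with byte order; then compare as unsigned 64-bit values.
        return if (littleEndian) {
          java.lang.Long.compareUnsigned(
            java.lang.Long.reverseBytes(left), java.lang.Long.reverseBytes(right))
        } else {
          java.lang.Long.compareUnsigned(left, right)
        }
      }
      i += 8
    }
    while (i < len) { // tail: remaining bytes, compared unsigned
      val diff = (a(i) & 0xFF) - (b(i) & 0xFF)
      if (diff != 0) return diff
      i += 1
    }
    a.length - b.length
  }
}
```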
[GitHub] spark pull request #19180: [SPARK-21967][CORE] org.apache.spark.unsafe.types...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/19180#discussion_r138453256

--- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java ---
@@ -1097,8 +1101,21 @@ public UTF8String copy() {
   @Override
   public int compareTo(@Nonnull final UTF8String other) {
     int len = Math.min(numBytes, other.numBytes);
-    // TODO: compare 8 bytes as unsigned long
-    for (int i = 0; i < len; i ++) {
+    int words = len / Longs.BYTES;
+    long roffset = other.getBaseOffset();
+    Object rbase = other.getBaseObject();
+    for (int i = 0; i < words * Longs.BYTES; i += Longs.BYTES) {
+      long left = getLong(base, offset + i);
+      long right = getLong(rbase, roffset + i);
+      if (left != right) {
+        if (!IS_LITTLE_ENDIAN) {
+          return UnsignedLongs.compare(left, right);
+        } else {
+          return UnsignedLongs.compare(Long.reverseBytes(left), Long.reverseBytes(right));
+        }
+      }
+    }
+    for (int i = words * Longs.BYTES; i < len; i ++) {

--- End diff --

Nit: remove the space before `++`. I know it was there before.
[GitHub] spark pull request #19180: [SPARK-21967][CORE] org.apache.spark.unsafe.types...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/19180#discussion_r138454686

--- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java ---
@@ -1097,8 +1101,21 @@ public UTF8String copy() {
   @Override
   public int compareTo(@Nonnull final UTF8String other) {
     int len = Math.min(numBytes, other.numBytes);
-    // TODO: compare 8 bytes as unsigned long
-    for (int i = 0; i < len; i ++) {
+    int words = len / Longs.BYTES;

--- End diff --

While we're optimizing -- you only ever use the value of this as multiplied by 8. You could just define something like `wordMax` as `(len / 8) * 8` and avoid the multiplication below.
[GitHub] spark issue #18659: [SPARK-21190][PYSPARK][WIP] Simple Python Vectorized UDF...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/18659 **[Test build #81691 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81691/testReport)** for PR 18659 at commit [`4a2fec2`](https://github.com/apache/spark/commit/4a2fec2aca1998ad04862eeac60d54b088b99de5).
[GitHub] spark issue #19106: [SPARK-21770][ML] ProbabilisticClassificationModel fix c...
Github user srowen commented on the issue: https://github.com/apache/spark/pull/19106 That's my sentiment too: it is likely patching over another problem. I'd go with Joseph on this, but at the least, maybe issue a warning? I would also support throwing an exception, to fail clearly and early in this case.
[GitHub] spark pull request #19201: [SPARK-21979][SQL]Improve QueryPlanConstraints fr...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/19201
[GitHub] spark issue #19201: [SPARK-21979][SQL]Improve QueryPlanConstraints framework
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/19201 Thanks! Merged to master.
[GitHub] spark issue #19201: [SPARK-21979][SQL]Improve QueryPlanConstraints framework
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19201 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/81689/
[GitHub] spark issue #19201: [SPARK-21979][SQL]Improve QueryPlanConstraints framework
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19201 Merged build finished. Test PASSed.
[GitHub] spark issue #19201: [SPARK-21979][SQL]Improve QueryPlanConstraints framework
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19201

**[Test build #81689 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81689/testReport)** for PR 19201 at commit [`d456876`](https://github.com/apache/spark/commit/d45687653a35431e88d5fbe1ccbb5684fa2794cc).

* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark issue #18659: [SPARK-21190][PYSPARK][WIP] Simple Python Vectorized UDF...
Github user BryanCutler commented on the issue: https://github.com/apache/spark/pull/18659 Thanks @cloud-fan. @ueshin, it would be great to incorporate the tests you have, let me know what is easiest for you. I'll push an update to change to `@pandas_udf` so hopefully the tests would run off the bat, but if not I can help sort it out.
[GitHub] spark pull request #18659: [SPARK-21190][PYSPARK][WIP] Simple Python Vectori...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/18659#discussion_r138447017 --- Diff: python/pyspark/sql/functions.py --- @@ -2112,7 +2113,7 @@ def wrapper(*args): @since(1.3) -def udf(f=None, returnType=StringType()): +def udf(f=None, returnType=StringType(), vectorized=False): --- End diff -- It seems like the consensus is for `pandas_udf` and I'm fine with that too. I'll make that change and the others brought up here. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19202: [SPARK-21980][SQL]References in grouping functions shoul...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19202 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/81688/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19202: [SPARK-21980][SQL]References in grouping functions shoul...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19202 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19202: [SPARK-21980][SQL]References in grouping functions shoul...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19202 **[Test build #81688 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81688/testReport)** for PR 19202 at commit [`b08fd93`](https://github.com/apache/spark/commit/b08fd9301cdbd4c1a29d5eb322eacd1cf2ffc546). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19210: Fix Graphite re-connects for Graphite instances behind E...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19210 Can one of the admins verify this patch? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19210: Fix Graphite re-connects for Graphite instances b...
GitHub user alexmnyc opened a pull request: https://github.com/apache/spark/pull/19210 Fix Graphite re-connects for Graphite instances behind ELBs or auto-scaled load balancers …resolve hosts behind a CNAME with re-tried DNS lookups Upgrade the codahale metrics library so that the Graphite constructor can re-resolve hosts behind a CNAME with re-tried DNS lookups. When Graphite is deployed behind an ELB, the ELB may change IP addresses based on auto-scaling needs. The current approach makes Graphite usage impossible; this fixes that use case. ## What changes were proposed in this pull request? (Please fill in changes proposed in this fix) ## How was this patch tested? (Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests) (If this patch involves UI changes, please attach a screenshot; otherwise, remove this) Please review http://spark.apache.org/contributing.html before opening a pull request. You can merge this pull request into a Git repository by running: $ git pull https://github.com/alexmnyc/spark patch-1 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/19210.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #19210 commit 4693801be78418f1939e07a0d8dc482e3e1efda2 Author: alexmnyc Date: 2017-09-12T18:49:40Z Upgrade the codahale metrics library so that the Graphite constructor can re-resolve hosts behind a CNAME with re-tried DNS lookups. When Graphite is deployed behind an ELB, the ELB may change IP addresses based on auto-scaling needs. The current approach makes Graphite usage impossible; this fixes that use case. commit 5e2daadaa65d4e5f1988937fed050ee7905a42ea Author: alexmnyc Date: 2017-09-12T18:52:41Z Force coda-hale Graphite to re-try DNS resolution for Graphite instances behind auto-scaled load balancers This fix is introduced in codahale 3.1.5 - https://github.com/dropwizard/metrics/compare/v3.1.2...v3.1.5#diff-6916c85d2dd08d89fe771c952e3b8512R120 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
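For readers following along, the codahale change the PR depends on is visible at construction time: handing the `Graphite` sender a hostname and port (instead of a pre-resolved `InetSocketAddress`) defers the DNS lookup to each connect, so an ELB that rotates IPs keeps working across reconnects. A minimal sketch, assuming a hypothetical endpoint `graphite-elb.example.com` and the stock dropwizard metrics API:

```scala
import java.util.concurrent.TimeUnit

import com.codahale.metrics.MetricRegistry
import com.codahale.metrics.graphite.{Graphite, GraphiteReporter}

object GraphiteBehindElb extends App {
  val registry = new MetricRegistry()

  // Hostname/port constructor: resolution happens at connect() time, so a
  // reconnect after an ELB IP rotation picks up the new address.
  // The endpoint name is hypothetical.
  val graphite = new Graphite("graphite-elb.example.com", 2003)

  val reporter = GraphiteReporter.forRegistry(registry)
    .prefixedWith("spark")
    .build(graphite)

  reporter.start(10, TimeUnit.SECONDS)
}
```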
[GitHub] spark pull request #19197: [SPARK-18608][ML] Fix double caching
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/19197 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/18317#discussion_r138430322 --- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java --- @@ -0,0 +1,315 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.io; + +import com.google.common.base.Preconditions; +import org.apache.spark.storage.StorageUtils; +import org.apache.spark.util.ThreadUtils; + +import javax.annotation.concurrent.GuardedBy; +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +/** + * {@link InputStream} implementation which asynchronously reads ahead from the underlying input + * stream when a specified amount of data has been read from the current buffer. It does this by maintaining + * two buffers - an active buffer and a read ahead buffer. The active buffer contains data which should be returned + * when a read() call is issued. The read ahead buffer is used to asynchronously read from the underlying + * input stream and once the current active buffer is exhausted, we flip the two buffers so that we can + * start reading from the read ahead buffer without being blocked in disk I/O. + */ +public class ReadAheadInputStream extends InputStream { + + private ReentrantLock stateChangeLock = new ReentrantLock(); + + @GuardedBy("stateChangeLock") + private ByteBuffer activeBuffer; + + @GuardedBy("stateChangeLock") + private ByteBuffer readAheadBuffer; + + @GuardedBy("stateChangeLock") + private boolean endOfStream; + + @GuardedBy("stateChangeLock") + // true if async read is in progress + private boolean readInProgress; + + @GuardedBy("stateChangeLock") + // true if read is aborted due to an exception in reading from underlying input stream. + private boolean readAborted; + + @GuardedBy("stateChangeLock") + private Exception readException; + + // If the remaining data size in the current buffer is below this threshold, + // we issue an async read from the underlying input stream. + private final int readAheadThresholdInBytes; + + private final InputStream underlyingInputStream; + + private final ExecutorService executorService = ThreadUtils.newDaemonSingleThreadExecutor("read-ahead"); + + private final Condition asyncReadComplete = stateChangeLock.newCondition(); + + private static final ThreadLocal<byte[]> oneByte = ThreadLocal.withInitial(() -> new byte[1]); + + /** + * Creates a ReadAheadInputStream with the specified buffer size and read-ahead + * threshold + * + * @param inputStream The underlying input stream. + * @param bufferSizeInBytes The buffer size. + * @param readAheadThresholdInBytes If the active buffer has less data than the read-ahead + * threshold, an async read is triggered.
+ */ + public ReadAheadInputStream(InputStream inputStream, int bufferSizeInBytes, int readAheadThresholdInBytes) { +Preconditions.checkArgument(bufferSizeInBytes > 0, +"bufferSizeInBytes should be greater than 0"); +Preconditions.checkArgument(readAheadThresholdInBytes > 0 && +readAheadThresholdInBytes < bufferSizeInBytes, +"readAheadThresholdInBytes should be greater than 0 and less than bufferSizeInBytes" ); +activeBuffer = ByteBuffer.allocate(bufferSizeInBytes); +readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes); +this.readAheadThresholdInBytes = readAheadThresholdInBytes; +this.underlyingInputStream = inputStream; +activeBuffer.flip(); +readAheadBuffer.flip(); + } + + private boolean isEndOfStream() { +if (!activeBuffer.hasRemaining() && !readAheadBuffer.hasRemaining() && endOfStream) { + return true; +} +return false; + } + private void readAsync(final
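The Javadoc above is easier to follow in a stripped-down form. This is a single-threaded, illustrative sketch of the buffer flip only — the real class refills the read-ahead buffer on a background executor and guards the swap with `stateChangeLock` and `asyncReadComplete`:

```scala
import java.io.InputStream
import java.nio.ByteBuffer

// Single-threaded sketch of the two-buffer flip; the real ReadAheadInputStream
// performs refill() asynchronously and synchronizes the swap under a lock.
class TwoBufferStream(in: InputStream, bufSize: Int) extends InputStream {
  private var active = ByteBuffer.allocate(bufSize)
  private var spare = ByteBuffer.allocate(bufSize)
  active.flip(); spare.flip() // both buffers start drained

  // Fill `buf` from the underlying stream; an empty buffer signals EOF.
  private def refill(buf: ByteBuffer): Unit = {
    buf.clear()
    val n = in.read(buf.array(), 0, buf.capacity())
    buf.limit(math.max(n, 0)) // n == -1 means EOF: leave the buffer empty
  }

  override def read(): Int = {
    if (!active.hasRemaining) {
      refill(spare) // in the real code this read has already happened in the background
      val tmp = active; active = spare; spare = tmp // flip active and read-ahead buffers
      if (!active.hasRemaining) return -1 // underlying stream exhausted
    }
    active.get() & 0xff
  }
}
```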
[GitHub] spark issue #19197: [SPARK-18608][ML] Fix double caching
Github user jkbradley commented on the issue: https://github.com/apache/spark/pull/19197 having trouble merging...will try again soon --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19197: [SPARK-18608][ML] Fix double caching
Github user jkbradley commented on the issue: https://github.com/apache/spark/pull/19197 LGTM Merging with master and branch-2.2 Thanks! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19209: Branch 2.2 udf nullability
Github user ptkool closed the pull request at: https://github.com/apache/spark/pull/19209 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19209: Branch 2.2 udf nullability
GitHub user ptkool opened a pull request: https://github.com/apache/spark/pull/19209 Branch 2.2 udf nullability ## What changes were proposed in this pull request? When registering a Python UDF, a user may know whether the function can return null values or not. PythonUDF and all related classes should handle nullability. ## How was this patch tested? Existing tests and a few new tests. You can merge this pull request into a Git repository by running: $ git pull https://github.com/Shopify/spark branch-2.2-udf_nullability Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/19209.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #19209 commit cfa6bcbe83b9a4b9607e23ac889963b6aa02f0d9 Author: Ryan Blue Date: 2017-05-01T21:48:02Z [SPARK-20540][CORE] Fix unstable executor requests. There are two problems fixed in this commit. First, the ExecutorAllocationManager sets a timeout to avoid requesting executors too often. However, the timeout is always updated based on its value and a timeout, not the current time. If the call is delayed by locking for more than the ongoing scheduler timeout, the manager will request more executors on every run. This seems to be the main cause of SPARK-20540. The second problem is that the total number of requested executors is not tracked by the CoarseGrainedSchedulerBackend. Instead, it calculates the value based on the current status of 3 variables: the number of known executors, the number of executors that have been killed, and the number of pending executors. But, the number of pending executors is never less than 0, even though there may be more known than requested. When executors are killed and not replaced, this can cause the request sent to YARN to be incorrect because there were too many executors due to the scheduler's state being slightly out of date. This is fixed by tracking the currently requested size explicitly. ## How was this patch tested? Existing tests. Author: Ryan Blue Closes #17813 from rdblue/SPARK-20540-fix-dynamic-allocation. (cherry picked from commit 2b2dd08e975dd7fbf261436aa877f1d7497ed31f) Signed-off-by: Marcelo Vanzin commit 5a0a8b0396df2feadb8333876cc08edf219fa177 Author: Sean Owen Date: 2017-05-02T00:01:05Z [SPARK-20459][SQL] JdbcUtils throws IllegalStateException: Cause already initialized after getting SQLException ## What changes were proposed in this pull request? Avoid failing to initCause on JDBC exception with cause initialized to null ## How was this patch tested? Existing tests Author: Sean Owen Closes #17800 from srowen/SPARK-20459. (cherry picked from commit af726cd6117de05c6e3b9616b8699d884a53651b) Signed-off-by: Xiao Li commit b7c1c2f973635a2ec05aedd89456765d830dfdce Author: Felix Cheung Date: 2017-05-02T04:03:48Z [SPARK-20192][SPARKR][DOC] SparkR migration guide to 2.2.0 ## What changes were proposed in this pull request? Updating R Programming Guide ## How was this patch tested? manually Author: Felix Cheung Closes #17816 from felixcheung/r22relnote. (cherry picked from commit d20a976e8918ca8d607af452301e8014fe14e64a) Signed-off-by: Felix Cheung commit b146481fff1ce529245f9c03b35c73ea604712d0 Author: Kazuaki Ishizaki Date: 2017-05-02T05:56:41Z [SPARK-20537][CORE] Fixing OffHeapColumnVector reallocation ## What changes were proposed in this pull request? As #17773 revealed, `OnHeapColumnVector` may copy a part of the original storage.
`OffHeapColumnVector` reallocation also copies data to the new storage only up to 'elementsAppended'. This variable is only updated when using the `ColumnVector.appendX` API, while `ColumnVector.putX` is more commonly used. This PR copies the new storage data up to the previously-allocated size in `OffHeapColumnVector`. ## How was this patch tested? Existing test suites Author: Kazuaki Ishizaki Closes #17811 from kiszk/SPARK-20537. (cherry picked from commit afb21bf22a59c9416c04637412fb69d1442e6826) Signed-off-by: Wenchen Fan commit ef5e2a0509801f6afced3bc80f8d700acf84e0dd Author: Burak Yavuz Date: 2017-05-02T06:08:16Z [SPARK-20549] java.io.CharConversionException: Invalid UTF-32' in JsonToStructs ## What changes were
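To make the SPARK-20537 description concrete: reallocation previously copied only `elementsAppended` elements, a counter advanced by the `appendX` calls alone, so values written via `putX` could be lost on resize. A hedged sketch of the corrected copy — `Platform` is Spark's real unsafe helper, but the method and sizes here are illustrative:

```scala
import org.apache.spark.unsafe.Platform

// Grow an off-heap region, copying the whole previously-allocated capacity
// rather than just the `elementsAppended` prefix (the essence of the fix).
def grow(oldAddr: Long, oldCapacityBytes: Int, newCapacityBytes: Int): Long = {
  val newAddr = Platform.allocateMemory(newCapacityBytes)
  Platform.copyMemory(null, oldAddr, null, newAddr, oldCapacityBytes)
  Platform.freeMemory(oldAddr)
  newAddr
}
```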
[GitHub] spark issue #18875: [SPARK-21513][SQL] Allow UDF to_json support converting ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/18875 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18875: [SPARK-21513][SQL] Allow UDF to_json support converting ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/18875 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/81684/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18875: [SPARK-21513][SQL] Allow UDF to_json support converting ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/18875 **[Test build #81684 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81684/testReport)** for PR 18875 at commit [`bddf283`](https://github.com/apache/spark/commit/bddf2838868b2b676ae9eb3c595b53f56de07468). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18592: [SPARK-21368][SQL] TPCDSQueryBenchmark can't refe...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/18592 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18592: [SPARK-21368][SQL] TPCDSQueryBenchmark can't refer query...
Github user jiangxb1987 commented on the issue: https://github.com/apache/spark/pull/18592 LGTM --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19188: [SPARK-21973][SQL] Add a new option to filter queries i...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/19188 This has been merged. @maropu Could you update this PR? Thanks! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18592: [SPARK-21368][SQL] TPCDSQueryBenchmark can't refer query...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/18592 Thanks! Merged to master. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18592: [SPARK-21368][SQL] TPCDSQueryBenchmark can't refer query...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/18592 LGTM --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18592: [SPARK-21368][SQL] TPCDSQueryBenchmark can't refer query...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/18592 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/81682/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18592: [SPARK-21368][SQL] TPCDSQueryBenchmark can't refer query...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/18592 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18592: [SPARK-21368][SQL] TPCDSQueryBenchmark can't refer query...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/18592 **[Test build #81682 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81682/testReport)** for PR 18592 at commit [`d2d22d4`](https://github.com/apache/spark/commit/d2d22d4502b8d1bc3ff6c0af207a2b64bc1bb5f6). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19106: [SPARK-21770][ML] ProbabilisticClassificationModel fix c...
Github user sethah commented on the issue: https://github.com/apache/spark/pull/19106 Ok, I guess I'm surprised that someone even noticed this... So, basically, we are changing the behavior of a private function for a specific case which is actually impossible to ever run into. I don't see the need. I know @jkbradley mentioned that it might happen for linear models, but this method is only used for RandomForest and DecisionTree. @srowen mentioned that finding the root cause would be better, and it seems to me the root cause is: there is no root cause. Anyway, if we are analyzing the behavior of this method without considering the context from which it's called, then we need to test for not only all-zero probabilities but negative ones as well. Otherwise, we're shoring up one edge case without attending to others. If it is not currently possible to call this method with all-zero counts, and we don't want it to be possible ever in the future, why don't we just throw an error? I'm also fine with not changing this at all. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
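For concreteness, the stricter option sethah describes — failing fast on negative, NaN, or all-zero counts instead of silently normalizing — might look like the following. This is an illustrative re-implementation of the private helper under discussion, not the merged code:

```scala
import org.apache.spark.ml.linalg.DenseVector

// Normalize class counts to probabilities in place, rejecting the edge
// cases discussed above (negative, NaN, or all-zero counts).
def normalizeToProbabilitiesInPlace(v: DenseVector): Unit = {
  require(v.values.forall(x => !x.isNaN && x >= 0.0),
    "Counts must be non-negative and defined.")
  val sum = v.values.sum
  require(sum > 0.0, "Cannot normalize a vector of all zeros to probabilities.")
  var i = 0
  while (i < v.values.length) {
    v.values(i) /= sum
    i += 1
  }
}
```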
[GitHub] spark issue #19196: [SPARK-21977] SinglePartition optimizations break certai...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19196 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/81690/ Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19196: [SPARK-21977] SinglePartition optimizations break certai...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19196 **[Test build #81690 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81690/testReport)** for PR 19196 at commit [`c5b7f23`](https://github.com/apache/spark/commit/c5b7f230ebbdabc373d8df1478993e08d420c1f3). * This patch **fails to generate documentation**. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19196: [SPARK-21977] SinglePartition optimizations break certai...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19196 Merged build finished. Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19106: [SPARK-21770][ML] ProbabilisticClassificationModel fix c...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19106 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19106: [SPARK-21770][ML] ProbabilisticClassificationModel fix c...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19106 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/81687/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19106: [SPARK-21770][ML] ProbabilisticClassificationModel fix c...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19106 **[Test build #81687 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81687/testReport)** for PR 19106 at commit [`53891ed`](https://github.com/apache/spark/commit/53891ed5c16daebce40e37cc9109b71299a33aca). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19196: [SPARK-21977] SinglePartition optimizations break certai...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19196 **[Test build #81690 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81690/testReport)** for PR 19196 at commit [`c5b7f23`](https://github.com/apache/spark/commit/c5b7f230ebbdabc373d8df1478993e08d420c1f3). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19196: [SPARK-21977] SinglePartition optimizations break...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/19196#discussion_r138414658 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala --- @@ -53,6 +53,8 @@ case class StatefulOperatorStateInfo( trait StatefulOperator extends SparkPlan { def stateInfo: Option[StatefulOperatorStateInfo] + def keyExpressions: Seq[Attribute] --- End diff -- we don't need to expose this if we don't want to --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19196: [SPARK-21977] SinglePartition optimizations break...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/19196#discussion_r138414592 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala --- @@ -117,8 +119,33 @@ class IncrementalExecution( } } - override def preparations: Seq[Rule[SparkPlan]] = state +: super.preparations + override def preparations: Seq[Rule[SparkPlan]] = Seq( +state, +EnsureStatefulOpPartitioning) ++ super.preparations /** No need assert supported, as this check has already been done */ override def assertSupported(): Unit = { } } + +object EnsureStatefulOpPartitioning extends Rule[SparkPlan] { + // Needs to be transformUp to avoid extra shuffles + override def apply(plan: SparkPlan): SparkPlan = plan transformUp { +case ss: StatefulOperator => + val numPartitions = plan.sqlContext.sessionState.conf.numShufflePartitions + val keys = ss.keyExpressions --- End diff -- Another option is to not expose `keyExpressions` in `StatefulOperator` but use the `requiredChildDistribution` field to get the required key expressions and partitioning --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
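A sketch of that alternative, assuming the Spark 2.2 shape of `ClusteredDistribution`; the helper name is hypothetical:

```scala
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, Distribution}

// Hypothetical helper: recover the grouping keys from the distribution the
// operator already declares via requiredChildDistribution, instead of
// exposing a new keyExpressions method on StatefulOperator.
def keysFrom(required: Seq[Distribution]): Seq[Expression] =
  required.collectFirst {
    case ClusteredDistribution(clustering) => clustering
  }.getOrElse(Seq.empty)
```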
[GitHub] spark issue #19180: [SPARK-21967][CORE] org.apache.spark.unsafe.types.UTF8St...
Github user original-brownbear commented on the issue: https://github.com/apache/spark/pull/19180 @srowen got a sec to take a look at this one? :) (should be a quick one) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19201: [SPARK-21979][SQL]Improve QueryPlanConstraints framework
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19201 **[Test build #81689 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81689/testReport)** for PR 19201 at commit [`d456876`](https://github.com/apache/spark/commit/d45687653a35431e88d5fbe1ccbb5684fa2794cc). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19208: [SPARK-21087] [ML] CrossValidator, TrainValidationSplit ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19208 **[Test build #81686 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81686/testReport)** for PR 19208 at commit [`ae13440`](https://github.com/apache/spark/commit/ae13440fd2220e28b58df52836f55fe5ed77c43f). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19208: [SPARK-21087] [ML] CrossValidator, TrainValidationSplit ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19208 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/81686/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19208: [SPARK-21087] [ML] CrossValidator, TrainValidationSplit ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19208 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19202: [SPARK-21980][SQL]References in grouping functions shoul...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19202 **[Test build #81688 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81688/testReport)** for PR 19202 at commit [`b08fd93`](https://github.com/apache/spark/commit/b08fd9301cdbd4c1a29d5eb322eacd1cf2ffc546). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18281: [SPARK-21027][ML][PYTHON] Added tunable paralleli...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/18281 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18704: [SPARK-20783][SQL] Create ColumnVector to abstrac...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/18704#discussion_r138409856 --- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java --- @@ -147,6 +147,11 @@ private void throwUnsupportedException(int requiredCapacity, Throwable cause) { public abstract void putShorts(int rowId, int count, short[] src, int srcIndex); /** + * Sets values from [rowId, rowId + count) to [src[srcIndex], src[srcIndex + count]) --- End diff -- @ueshin The comment at line 145 may be mistaken: `Sets values from [rowId, rowId + count) to [src + srcIndex, src + srcIndex + count)` It should be `Sets values from [src + srcIndex, src + srcIndex + count) to [rowId, rowId + count)` What do you think? If so, should we update it in this PR, or is it better to create another PR? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
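Expressed with plain arrays standing in for the column's storage (illustrative only), the direction the corrected comment should describe is:

```scala
// Copy src[srcIndex, srcIndex + count) into the column at [rowId, rowId + count);
// `dst` stands in for the column vector's backing storage.
def putShorts(dst: Array[Short], rowId: Int, count: Int, src: Array[Short], srcIndex: Int): Unit =
  System.arraycopy(src, srcIndex, dst, rowId, count)
```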
[GitHub] spark pull request #16422: [SPARK-17642] [SQL] support DESC EXTENDED/FORMATT...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/16422 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19201: [SPARK-21979][SQL]Improve QueryPlanConstraints fr...
Github user gengliangwang commented on a diff in the pull request: https://github.com/apache/spark/pull/19201#discussion_r138409695 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/QueryPlanConstraints.scala --- @@ -106,91 +106,48 @@ trait QueryPlanConstraints { self: LogicalPlan => * Infers an additional set of constraints from a given set of equality constraints. * For e.g., if an operator has constraints of the form (`a = 5`, `a = b`), this returns an * additional constraint of the form `b = 5`. - * - * [SPARK-17733] We explicitly prevent producing recursive constraints of the form `a = f(a, b)` - * as they are often useless and can lead to a non-converging set of constraints. */ private def inferAdditionalConstraints(constraints: Set[Expression]): Set[Expression] = { -val constraintClasses = generateEquivalentConstraintClasses(constraints) - +val aliasedConstraints = eliminateAliasedExpressionInConstraints(constraints) var inferredConstraints = Set.empty[Expression] -constraints.foreach { +aliasedConstraints.foreach { case eq @ EqualTo(l: Attribute, r: Attribute) => -val candidateConstraints = constraints - eq -inferredConstraints ++= candidateConstraints.map(_ transform { - case a: Attribute if a.semanticEquals(l) && -!isRecursiveDeduction(r, constraintClasses) => r -}) -inferredConstraints ++= candidateConstraints.map(_ transform { - case a: Attribute if a.semanticEquals(r) && -!isRecursiveDeduction(l, constraintClasses) => l -}) +val candidateConstraints = aliasedConstraints - eq +inferredConstraints ++= replaceConstraints(candidateConstraints, l, r) +inferredConstraints ++= replaceConstraints(candidateConstraints, r, l) case _ => // No inference } inferredConstraints -- constraints } /** - * Generate a sequence of expression sets from constraints, where each set stores an equivalence - * class of expressions. For example, Set(`a = b`, `b = c`, `e = f`) will generate the following - * expression sets: (Set(a, b, c), Set(e, f)). This will be used to search all expressions equal - * to a selected attribute. + * Replace the aliased expression in [[Alias]] with the alias name if both exist in constraints. + * Thus non-converging inference can be prevented. + * E.g. `a = f(a, b)`, `a = f(b, c) && c = g(a, b)`. + * Also, the size of constraints is reduced without losing any information. + * When the inferred filters are pushed down the operators that generate the alias, + * the alias names used in filters are replaced by the aliased expressions. + */ - private def generateEquivalentConstraintClasses( - constraints: Set[Expression]): Seq[Set[Expression]] = { -var constraintClasses = Seq.empty[Set[Expression]] -constraints.foreach { - case eq @ EqualTo(l: Attribute, r: Attribute) => -// Transform [[Alias]] to its child. -val left = aliasMap.getOrElse(l, l) -val right = aliasMap.getOrElse(r, r) -// Get the expression set for an equivalence constraint class. -val leftConstraintClass = getConstraintClass(left, constraintClasses) -val rightConstraintClass = getConstraintClass(right, constraintClasses) -if (leftConstraintClass.nonEmpty && rightConstraintClass.nonEmpty) { - // Combine the two sets. - constraintClasses = constraintClasses -.diff(leftConstraintClass :: rightConstraintClass :: Nil) :+ -(leftConstraintClass ++ rightConstraintClass) -} else if (leftConstraintClass.nonEmpty) { // && rightConstraintClass.isEmpty - // Update equivalence class of `left` expression.
- constraintClasses = constraintClasses -.diff(leftConstraintClass :: Nil) :+ (leftConstraintClass + right) -} else if (rightConstraintClass.nonEmpty) { // && leftConstraintClass.isEmpty - // Update equivalence class of `right` expression. - constraintClasses = constraintClasses -.diff(rightConstraintClass :: Nil) :+ (rightConstraintClass + left) -} else { // leftConstraintClass.isEmpty && rightConstraintClass.isEmpty - // Create new equivalence constraint class since neither expression is present - // in any classes. - constraintClasses = constraintClasses :+ Set(left, right) -} case _ => // Skip + private def eliminateAliasedExpressionInConstraints(constraints: Set[Expression]) +: Set[Expression] = { +val
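Condensed into a standalone sketch, the inference loop in the new code works as follows; this omits the alias-elimination step the PR adds and keeps only the substitution core:

```scala
import org.apache.spark.sql.catalyst.expressions.{Attribute, EqualTo, Expression}

// For each equality l = r among the constraints, substitute r for l (and l
// for r) in all other constraints, so (a = 5, a = b) also yields b = 5.
def inferAdditional(constraints: Set[Expression]): Set[Expression] = {
  var inferred = Set.empty[Expression]
  constraints.foreach {
    case eq @ EqualTo(l: Attribute, r: Attribute) =>
      val candidates = constraints - eq
      inferred ++= candidates.map(_ transform {
        case a: Attribute if a.semanticEquals(l) => r
      })
      inferred ++= candidates.map(_ transform {
        case a: Attribute if a.semanticEquals(r) => l
      })
    case _ => // not an attribute equality; nothing to infer
  }
  inferred -- constraints
}
```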
[GitHub] spark pull request #19110: [SPARK-21027][ML][PYTHON] Added tunable paralleli...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/19110 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19202: [SPARK-21980][SQL]References in grouping functions shoul...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/19202 ok to test --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18704: [SPARK-20783][SQL] Create ColumnVector to abstrac...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/18704#discussion_r138409265 --- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java --- @@ -147,6 +147,11 @@ private void throwUnsupportedException(int requiredCapacity, Throwable cause) { public abstract void putShorts(int rowId, int count, short[] src, int srcIndex); /** + * Sets values from [rowId, rowId + count) to [src[srcIndex], src[srcIndex + count]) --- End diff -- I see. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org