[GitHub] spark pull request #19287: [SPARK-22074][Core] Task killed by other attempt ...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/19287#discussion_r141643992 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala --- @@ -724,6 +724,7 @@ private[spark] class TaskSetManager( logInfo(s"Killing attempt ${attemptInfo.attemptNumber} for task ${attemptInfo.id} " + s"in stage ${taskSet.id} (TID ${attemptInfo.taskId}) on ${attemptInfo.host} " + s"as the attempt ${info.attemptNumber} succeeded on ${info.host}") + attemptInfo.markKilledByOtherAttempt --- End diff -- nit: this has a side-effect, so the normal scala-style is to call this w/ parens: `attemptInfo.markKilledByOtherAttempt()` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19287: [SPARK-22074][Core] Task killed by other attempt ...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/19287#discussion_r141668660 --- Diff: core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala --- @@ -58,4 +57,18 @@ object FakeTask { } new TaskSet(tasks, stageId, stageAttemptId, priority = 0, null) } + + def createShuffleMapTaskSet(numTasks: Int, stageId: Int, stageAttemptId: Int, +prefLocs: Seq[TaskLocation]*): TaskSet = { --- End diff -- nit: multi-line method definitions should have each param on its own line, indented 4 spaces ```scala def createShuffleMapTaskSet( numTasks: Int, stageId: Int, stageAttemptId: Int, prefLocs: Seq[TaskLocation]*): TaskSet = { ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19287: [SPARK-22074][Core] Task killed by other attempt ...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/19287#discussion_r141674264 --- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala --- @@ -744,6 +744,100 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg assert(resubmittedTasks === 0) } + + test("[SPARK-22074] Task killed by other attempt task should not be resubmitted") { +val conf = new SparkConf().set("spark.speculation", "true") +sc = new SparkContext("local", "test", conf) +// Set the speculation multiplier to be 0 so speculative tasks are launched immediately +sc.conf.set("spark.speculation.multiplier", "0.0") +sc.conf.set("spark.speculation.quantile", "0.5") +sc.conf.set("spark.speculation", "true") + +val sched = new FakeTaskScheduler(sc, ("exec1", "host1"), + ("exec2", "host2"), ("exec3", "host3")) +sched.initialize(new FakeSchedulerBackend() { + override def killTask( + taskId: Long, + executorId: String, + interruptThread: Boolean, + reason: String): Unit = {} +}) + +// Keep track of the number of tasks that are resubmitted, +// so that the test can check that no tasks were resubmitted. 
+var resubmittedTasks = 0 +val dagScheduler = new FakeDAGScheduler(sc, sched) { + override def taskEnded( + task: Task[_], + reason: TaskEndReason, + result: Any, + accumUpdates: Seq[AccumulatorV2[_, _]], + taskInfo: TaskInfo): Unit = { +super.taskEnded(task, reason, result, accumUpdates, taskInfo) +reason match { + case Resubmitted => resubmittedTasks += 1 + case _ => +} + } +} +sched.setDAGScheduler(dagScheduler) + +val taskSet = FakeTask.createShuffleMapTaskSet(4, 0, 0, + Seq(TaskLocation("host1", "exec1")), + Seq(TaskLocation("host1", "exec1")), + Seq(TaskLocation("host3", "exec3")), + Seq(TaskLocation("host2", "exec2"))) + +val clock = new ManualClock() +val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock) +val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = taskSet.tasks.map { task => + task.metrics.internalAccums +} +// Offer resources for 4 tasks to start +for ((k, v) <- List( + "exec1" -> "host1", + "exec1" -> "host1", + "exec3" -> "host3", + "exec2" -> "host2")) { + val taskOption = manager.resourceOffer(k, v, NO_PREF) + assert(taskOption.isDefined) + val task = taskOption.get + assert(task.executorId === k) +} +assert(sched.startedTasks.toSet === Set(0, 1, 2, 3)) +clock.advance(1) +// Complete the 2 tasks and leave 2 task in running +for (id <- Set(0, 1)) { + manager.handleSuccessfulTask(id, createTaskResult(id, accumUpdatesByTask(id))) + assert(sched.endedTasks(id) === Success) +} + +// checkSpeculatableTasks checks that the task runtime is greater than the threshold for +// speculating. Since we use a threshold of 0 for speculation, tasks need to be running for +// > 0ms, so advance the clock by 1ms here. 
+clock.advance(1) +assert(manager.checkSpeculatableTasks(0)) +assert(sched.speculativeTasks.toSet === Set(2, 3)) + +// Offer resource to start the speculative attempt for the running task 2.0 +val taskOption = manager.resourceOffer("exec2", "host2", ANY) +assert(taskOption.isDefined) +val task4 = taskOption.get +assert(task4.index === 2) +assert(task4.taskId === 4) +assert(task4.executorId === "exec2") +assert(task4.attemptNumber === 1) +sched.backend = mock(classOf[SchedulerBackend]) --- End diff -- it's really weird to switch `sched.backend` in the middle of the test. I worry that in the future, when something else changes in the scheduler, this swap will no longer work as expected, and neither will the test. Instead, can you just override `killTask` in your subclass of `FakeSchedulerBackend` to track the calls you care about? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19194: [SPARK-20589] Allow limiting task concurrency per stage
Github user squito commented on the issue: https://github.com/apache/spark/pull/19194 Sorry if I was unclear earlier on the issue w/ the active Job ID. So, I agree that if a user actually gets into this situation, where they've got two different jobs for the same stage, with different max concurrent tasks, it's mostly a toss-up which one they'll get, as the user's jobs are probably racing to get to that stage. Still, I think it's important that it pulls the max concurrent tasks from the active job, just so that users can understand what is going on and for consistency and debuggability. The TaskSetManager gets the property from the active job, which actually submitted the stage, so the ExecutorAllocationManager should do the same. I think the best way to ensure that is to add activeJobId to SparkListenerStageSubmitted. Then you'd go back to just keeping a jobIdToMaxConcurrentTasks map when handling onJobStart, and in onStageSubmitted, you'd then figure out the max number of tasks for that stage, given the job which actually submitted it. @tgravescs what do you think? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19194: [SPARK-20589] Allow limiting task concurrency per...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/19194#discussion_r141475205 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala --- @@ -512,6 +535,9 @@ private[spark] class TaskSetManager( serializedTask) } } else { + if (runningTasks >= maxConcurrentTasks) { +logDebug("Already running max. no. of concurrent tasks.") --- End diff -- can you update this msg to include the jobGroup and full config name? also I think it would be good to have this log at the info level one time (not on every `resourceOffer`, just the first time we hit the limit for this TaskSet). I just see users hitting this inadvertently and so it would be good to make it clear. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19194: [SPARK-20589] Allow limiting task concurrency per stage
Github user squito commented on the issue: https://github.com/apache/spark/pull/19194 btw I'm not opposed to this feature, just want to make sure that given (a) complexity and (b) the fact that it doesn't quite meet the original ask anyway (configuring at the stage & no need to break up pipelines), that the folks that wanted it are still for it. I wonder if it should even be marked experimental in case we decide to drop it in favor of something stage level eventually. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19343: [SPARK-22121][SQL] Correct database location for namenod...
Github user squito commented on the issue: https://github.com/apache/spark/pull/19343 OK, closing this and the jira --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19343: [SPARK-22121][SQL] Correct database location for ...
Github user squito closed the pull request at: https://github.com/apache/spark/pull/19343 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19250: [SPARK-12297] Table timezone correction for Timestamps
Github user squito commented on the issue: https://github.com/apache/spark/pull/19250 @HyukjinKwon you might be interested in this one also --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19343: [SPARK-22121][SQL] Correct database location for namenod...
Github user squito commented on the issue: https://github.com/apache/spark/pull/19343 I don't see much point in putting this in the docs ... it seems too fine-grained a detail to be useful there. I just don't see users who encounter this exception going to look at that spot in the docs to figure out exactly what is wrong. I put an example exception in the JIRA, so at least users can find it with a search. Sounds like you feel pretty strongly we should close this as "won't fix"? I'd still prefer to have this in, but will settle for just having the workaround be easily searchable. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19194: [SPARK-20589] Allow limiting task concurrency per stage
Github user squito commented on the issue: https://github.com/apache/spark/pull/19194 one more thought -- have you considered adding stage properties? I know that is more involved since it's an API change, but I don't know how many times I've wanted something like that, e.g. for labelling a stage in the UI, for partitioning hints, etc. And if you had it, then it would remove all the complications in this PR around multiple jobs & threads etc., and allow the property to actually be set on just one stage, as opposed to the entire job, as was originally requested. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19338: [SPARK-22123][CORE] Add latest failure reason for...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/19338#discussion_r141098993 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetBlacklist.scala --- @@ -61,6 +61,16 @@ private[scheduler] class TaskSetBlacklist(val conf: SparkConf, val stageId: Int, private val blacklistedExecs = new HashSet[String]() private val blacklistedNodes = new HashSet[String]() + private var latestFailureReason: String = null + + /** + * Get the most recent failure reason of this TaskSet. + * @return + */ + def getLatestFailureReason: String = { --- End diff -- @jerryshao the whole class is `private[scheduler]`, so I think it is OK. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19343: [SPARK-22121][SQL] Correct database location for namenod...
Github user squito commented on the issue: https://github.com/apache/spark/pull/19343 whoops, sorry I wrote [CORE] out of habit! > Spark SQL might not be deployed in the HDFS system. Conceptually, this HDFS-specific codes should not be part of our HiveExternalCatalog . HiveExternalCatalog is just for using Hive metastore. It does not assume we use HDFS. yes, I totally understand that. Even for users that are on hdfs, this is clear user-error, they should be using hive's metatool to update the database location. Originally I thought this would be unnecessary complication in spark, but with enough complaints I figured maybe spark could just handle it automatically. Is there another place this could go instead? Anyway, if you really feel like this doesn't belong in spark, that is fine. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19343: [SPARK-22121][CORE] Correct database location for...
GitHub user squito opened a pull request: https://github.com/apache/spark/pull/19343 [SPARK-22121][CORE] Correct database location for namenode HA. ## What changes were proposed in this pull request? If hdfs HA is turned on after a hive database is already created, the db location may still reference just one namenode, instead of the nameservice, if users do not properly follow all upgrade instructions. After this change, spark detects the misconfiguration and tries to auto-adjust for it, since this is the behavior from hive as well. ## How was this patch tested? Added unit tests. Also deployed on a cluster with hdfs ha, with the database location set to only one instance, and then I failed over the namenode so the other instance was the active one. After this change, things worked without a problem. You can merge this pull request into a Git repository by running: $ git pull https://github.com/squito/spark SPARK-22121 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/19343.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #19343 commit c2e125eacb48971ee72dd61859a95ca8ae6a9fc8 Author: Imran Rashid Date: 2017-09-26T00:55:58Z [SPARK-22121][CORE] Correct database location for namenode HA. If hdfs HA is turned on after a hive database is already created, the db location may still reference just one namenode, instead of the nameservice, if users do not properly follow all upgrade instructions. After this change, spark detects the misconfiguration and tries to auto-adjust for it, since this is the behavior from hive as well. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19331: [SPARK-22109][SQL] Resolves type conflicts betwee...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/19331#discussion_r140890429 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala --- @@ -470,15 +471,15 @@ object PartitioningUtils { * Given a collection of [[Literal]]s, resolves possible type conflicts by up-casting "lower" * types. */ - private def resolveTypeConflicts(literals: Seq[Literal]): Seq[Literal] = { + private def resolveTypeConflicts(literals: Seq[Literal], timeZone: TimeZone): Seq[Literal] = { val desiredType = { val topType = literals.map(_.dataType).maxBy(upCastingOrder.indexOf(_)) --- End diff -- thanks for such a quick fix! But don't we also need to update [`upCastingOrder`](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala#L430)? It seems this happens to work because `upCastingOrder.indexOf(TimestampType) = -1`. But I think that doesn't compare the right way with NullType, so if you add this to [`ParquetPartitionDiscoverySuite.test("parse partitions")`](https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala#L241), it fails: ``` check(Seq( s"hdfs://host:9000/path/a=$defaultPartitionName/b=blah", s"hdfs://host:9000/path/a=2014-01-01 00%3A00%3A00.0/b=foo"), PartitionSpec( StructType(Seq( StructField("a", TimestampType), StructField("b", StringType))), Seq( Partition(InternalRow(null, "blah"), s"hdfs://host:9000/path/a=$defaultPartitionName/b=blah"), Partition(InternalRow(Timestamp.valueOf("2014-01-01 00:00:00.0"), "foo"), s"hdfs://host:9000/path/a=2014-01-01 00%3A00%3A00.0/b=foo" ``` (I have to admit, I don't totally understand what the ramifications of that failure are -- the behavior in the resulting dataframe seems fine to me, but I figure there is probably some case this would mess up ...) 
--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19338: [SPARK-21539][CORE] Add latest failure reason for...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/19338#discussion_r140823560 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetBlacklist.scala --- @@ -94,7 +96,9 @@ private[scheduler] class TaskSetBlacklist(val conf: SparkConf, val stageId: Int, private[scheduler] def updateBlacklistForFailedTask( host: String, exec: String, - index: Int): Unit = { + index: Int, + failureReason: Option[String] = None): Unit = { --- End diff -- `failureReason` should always be present in this call, so it shouldn't be an `Option` as an arg to this method. (I realize this is a bit of a pain as you have to modify all the call sites in tests, sorry about that). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19338: [SPARK-21539][CORE] Add latest failure reason for...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/19338#discussion_r140823764 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala --- @@ -838,7 +840,7 @@ private[spark] class TaskSetManager( if (!isZombie && reason.countTowardsTaskFailures) { taskSetBlacklistHelperOpt.foreach(_.updateBlacklistForFailedTask( -info.host, info.executorId, index)) +info.host, info.executorId, index, Some(failureReason))) assert (null != failureReason) --- End diff -- move the `assert (null != failureReason)` first, and to go along with the other change, drop the `Some` wrapper around `failureReason`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19338: [SPARK-21539][CORE] Add latest failure reason for task s...
Github user squito commented on the issue: https://github.com/apache/spark/pull/19338 @caneGuy thanks for working on this, looks very reasonable to me, I am going to take a closer look at a couple of details. But can you make a couple of updates in the meantime: 1) Can you open a new jira for this, and put that in the commit summary? SPARK-21539 is referring to something else entirely 2) Can you reformat the new exception to look a bit more like the formatting for when there are too many failures of a specific task? maybe like this: ``` User class threw exception: org.apache.spark.SparkException: Job aborted due to stage failure: Aborting TaskSet 0.0 because task 0 (partition 0) cannot run anywhere due to node and executor blacklist. Most recent failure: Lost task 0.1 in stage 0.0 (TID 3,xxx, executor 1): java.lang.Exception: Fake error! at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:73) at org.apache.spark.scheduler.Task.run(Task.scala:99) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:305) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) Blacklisting behavior can be configured via spark.blacklist.*. Driver Stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1458) ... ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19338: [SPARK-21539][CORE] Add latest failure reason for task s...
Github user squito commented on the issue: https://github.com/apache/spark/pull/19338 Jenkins, ok to test --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19311: [SPARK-22083][CORE] Release locks in MemoryStore....
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/19311#discussion_r140815961 --- Diff: core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala --- @@ -407,4 +407,119 @@ class MemoryStoreSuite }) assert(memoryStore.getSize(blockId) === 1) } + + test("SPARK-22083: Release all locks in evictBlocksToFreeSpace") { +// Setup a memory store with many blocks cached, and then one request which leads to multiple +// blocks getting evicted. We'll make the eviction throw an exception, and make sure that +// all locks are released. +val ct = implicitly[ClassTag[Array[Byte]]] +def testFailureOnNthDrop(failAfterDroppingNBlocks: Int, readLockAfterDrop: Boolean): Unit = { --- End diff -- I don't think `validBlock` captures the intent here -- I don't see anything valid or invalid about it either way. The part of the behavior which changes is whether or not another thread grabs a reader lock on the block after it gets dropped to disk. (To go along with that, we drop the block to disk, rather than just evicting it completely, as otherwise there is nothing to grab a lock on. I could always drop the block to disk instead of having that depend on this flag; it just seemed like another useful thing to check whether the number of blocks was successfully updated in `blockInfoManager` when the block was dropped completely.) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19311: [SPARK-22083][CORE] Release locks in MemoryStore....
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/19311#discussion_r140813788 --- Diff: core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala --- @@ -407,4 +407,119 @@ class MemoryStoreSuite }) assert(memoryStore.getSize(blockId) === 1) } + + test("SPARK-22083: Release all locks in evictBlocksToFreeSpace") { +// Setup a memory store with many blocks cached, and then one request which leads to multiple +// blocks getting evicted. We'll make the eviction throw an exception, and make sure that +// all locks are released. +val ct = implicitly[ClassTag[Array[Byte]]] +def testFailureOnNthDrop(failAfterDroppingNBlocks: Int, readLockAfterDrop: Boolean): Unit = { + val tc = TaskContext.empty() + val memManager = new StaticMemoryManager(conf, Long.MaxValue, 100, numCores = 1) + val blockInfoManager = new BlockInfoManager + blockInfoManager.registerTask(tc.taskAttemptId) + var droppedSoFar = 0 + val blockEvictionHandler = new BlockEvictionHandler { +var memoryStore: MemoryStore = _ + +override private[storage] def dropFromMemory[T: ClassTag]( +blockId: BlockId, +data: () => Either[Array[T], ChunkedByteBuffer]): StorageLevel = { + if (droppedSoFar < failAfterDroppingNBlocks) { +droppedSoFar += 1 +memoryStore.remove(blockId) +if (readLockAfterDrop) { + // for testing purposes, we act like another thread gets the read lock on the new + // block + StorageLevel.DISK_ONLY +} else { + StorageLevel.NONE +} + } else { +throw new RuntimeException(s"Mock error dropping block $droppedSoFar") + } +} + } + val memoryStore = new MemoryStore(conf, blockInfoManager, serializerManager, memManager, + blockEvictionHandler) { +override def afterDropAction(blockId: BlockId): Unit = { + if (readLockAfterDrop) { +// pretend that we get a read lock on the block (now on disk) in another thread +TaskContext.setTaskContext(tc) +blockInfoManager.lockForReading(blockId) +TaskContext.unset() + } +} + } + + blockEvictionHandler.memoryStore = 
memoryStore + memManager.setMemoryStore(memoryStore) + + // Put in some small blocks to fill up the memory store + val initialBlocks = (1 to 10).map { id => +val blockId = BlockId(s"rdd_1_$id") +val blockInfo = new BlockInfo(StorageLevel.MEMORY_ONLY, ct, tellMaster = false) +val initialWriteLock = blockInfoManager.lockNewBlockForWriting(blockId, blockInfo) +assert(initialWriteLock) +val success = memoryStore.putBytes(blockId, 10, MemoryMode.ON_HEAP, () => { + new ChunkedByteBuffer(ByteBuffer.allocate(10)) +}) +assert(success) +blockInfoManager.unlock(blockId, None) + } + assert(blockInfoManager.size === 10) + + + // Add one big block, which will require evicting everything in the memorystore. However our + // mock BlockEvictionHandler will throw an exception -- make sure all locks are cleared. + val largeBlockId = BlockId(s"rdd_2_1") + val largeBlockInfo = new BlockInfo(StorageLevel.MEMORY_ONLY, ct, tellMaster = false) + val initialWriteLock = blockInfoManager.lockNewBlockForWriting(largeBlockId, largeBlockInfo) + assert(initialWriteLock) + if (failAfterDroppingNBlocks < 10) { +val exc = intercept[RuntimeException] { + memoryStore.putBytes(largeBlockId, 100, MemoryMode.ON_HEAP, () => { +new ChunkedByteBuffer(ByteBuffer.allocate(100)) + }) +} +assert(exc.getMessage().startsWith("Mock error dropping block"), exc) +// BlockManager.doPut takes care of releasing the lock for the newly written block -- not +// testing that here, so do it manually +blockInfoManager.removeBlock(largeBlockId) + } else { +memoryStore.putBytes(largeBlockId, 100, MemoryMode.ON_HEAP, () => { + new ChunkedByteBuffer(ByteBuffer.allocate(100)) +}) +// BlockManager.doPut takes care of releasing the lock for the newly written block -- not +// testing that here, so do it manually +blockInfoManager.unlock(largeBlockId) + } + + val largeBlockInMemory = if (failAfterDrop
[GitHub] spark pull request #18853: [SPARK-21646][SQL] CommonType for binary comparis...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/18853#discussion_r140802924 --- Diff: docs/sql-programming-guide.md --- @@ -1460,6 +1460,13 @@ that these options will be deprecated in future release as more optimizations ar Configures the number of partitions to use when shuffling data for joins or aggregations. + +spark.sql.typeCoercion.mode +default + +Whether compatible with Hive. Available options are default and hive. --- End diff -- This description feels inadequate to me. I think most users will think "hive" means "old, legacy way of doing things" and "default" means "new, better, spark way of doing things". But I haven't heard an argument in favor of the "default" behavior, just that we don't want to have a breaking change of behavior. So (a) I'd advocate that we rename "default" to "legacy", or something else along those lines. I do think it should be the default value, to avoid changing behavior. And (b) I think the doc section here should more clearly indicate the difference, eg. "The 'legacy' typeCoercion mode was used in spark prior to 2.3, and so it continues to be the default to avoid breaking behavior. However, it has logical inconsistencies. The 'hive' mode is preferred for most new applications, though it may require additional manual casting." I am even wondering if we should have a 3rd option, to not implicitly cast across type categories, eg. like postgres, as this avoids nasty surprises for the user. While the casts are convenient, when they don't work there is very little indication to the user that anything went wrong -- most likely they'll just continue processing data even though the results don't actually have the semantics they want. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18853: [SPARK-21646][SQL] CommonType for binary comparis...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/18853#discussion_r140799432 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala --- @@ -115,21 +115,46 @@ object TypeCoercion { * is a String and the other is not. It also handles when one op is a Date and the * other is a Timestamp by making the target type to be String. */ - val findCommonTypeForBinaryComparison: (DataType, DataType) => Option[DataType] = { -// We should cast all relative timestamp/date/string comparison into string comparisons -// This behaves as a user would expect because timestamp strings sort lexicographically. -// i.e. TimeStamp(2013-01-01 00:00 ...) < "2014" = true -case (StringType, DateType) => Some(StringType) -case (DateType, StringType) => Some(StringType) -case (StringType, TimestampType) => Some(StringType) -case (TimestampType, StringType) => Some(StringType) -case (TimestampType, DateType) => Some(StringType) -case (DateType, TimestampType) => Some(StringType) -case (StringType, NullType) => Some(StringType) -case (NullType, StringType) => Some(StringType) -case (l: StringType, r: AtomicType) if r != StringType => Some(r) -case (l: AtomicType, r: StringType) if (l != StringType) => Some(l) -case (l, r) => None + private def findCommonTypeForBinaryComparison( + plan: LogicalPlan, + l: DataType, + r: DataType): Option[DataType] = +if (!plan.conf.isHiveTypeCoercionMode) { + (l, r) match { +// We should cast all relative timestamp/date/string comparison into string comparisons +// This behaves as a user would expect because timestamp strings sort lexicographically. +// i.e. TimeStamp(2013-01-01 00:00 ...) < "2014" = true --- End diff -- I think this comment should be updated given the latest investigation, explaining both the original motivation, and how it is flawed. Eg. "Originally spark cast all relative timestamp/date/string comparison into string comparisons. 
The motivation was that this would lead to natural comparisons on simple string inputs for times, eg. TimeStamp(2013-01-01 00:00 ...) < "2014" = true. However, this leads to other logical inconsistencies, eg. TimeStamp(2013-01-01 00:00 ...) < "5" = true. Also, equals is not consistent with other binary comparisons. Furthermore, comparing to a string that does not look like a time at all will still compare, just with an unexpected result." --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18853: [SPARK-21646][SQL] CommonType for binary comparis...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/18853#discussion_r140801036 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala --- @@ -2677,4 +2677,142 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { checkAnswer(df, Row(1, 1, 1)) } } + + test("SPARK-21646: CommonTypeForBinaryComparison: StringType vs NumericType") { +withTempView("v") { + val str1 = Long.MaxValue.toString + "1" + val str2 = Int.MaxValue.toString + "1" + val str3 = "10" + Seq(str1, str2, str3).toDF("c1").createOrReplaceTempView("v") + withSQLConf(SQLConf.typeCoercionMode.key -> "hive") { +checkAnswer(sql("SELECT c1 from v where c1 > 0"), + Row(str1) :: Row(str2) :: Row(str3) :: Nil) +checkAnswer(sql("SELECT c1 from v where c1 > 0L"), + Row(str1) :: Row(str2) :: Row(str3) :: Nil) + } + + withSQLConf(SQLConf.typeCoercionMode.key -> "default") { +checkAnswer(sql("SELECT c1 from v where c1 > 0"), Row(str3) :: Nil) +checkAnswer(sql("SELECT c1 from v where c1 > 0L"), Row(str2) :: Row(str3) :: Nil) + } +} + } + + test("SPARK-21646: CommonTypeForBinaryComparison: DoubleType vs IntegerType") { +withTempView("v") { + Seq(("0", 1), ("-0.4", 2), ("0.6", 3)).toDF("c1", "c2").createOrReplaceTempView("v") + withSQLConf(SQLConf.typeCoercionMode.key -> "hive") { +checkAnswer(sql("SELECT c1 FROM v WHERE c1 = 0"), Seq(Row("0"))) +checkAnswer(sql("SELECT c1 FROM v WHERE c1 = 0L"), Seq(Row("0"))) +checkAnswer(sql("SELECT c1 FROM v WHERE c1 = 0.0"), Seq(Row("0"))) +checkAnswer(sql("SELECT c1 FROM v WHERE c1 = -0.4"), Seq(Row("-0.4"))) +checkAnswer(sql("SELECT count(*) FROM v WHERE c1 > 0"), Row(1) :: Nil) + } + + withSQLConf(SQLConf.typeCoercionMode.key -> "default") { +checkAnswer(sql("SELECT c1 FROM v WHERE c1 = 0"), Seq(Row("0"), Row("-0.4"), Row("0.6"))) +checkAnswer(sql("SELECT c1 FROM v WHERE c1 = 0L"), Seq(Row("0"), Row("-0.4"), Row("0.6"))) +checkAnswer(sql("SELECT c1 FROM v WHERE c1 = 0.0"), Seq(Row("0"))) 
+checkAnswer(sql("SELECT c1 FROM v WHERE c1 = -0.4"), Seq(Row("-0.4"))) +checkAnswer(sql("SELECT count(*) FROM v WHERE c1 > 0"), Row(0) :: Nil) + } +} + } + + test("SPARK-21646: CommonTypeForBinaryComparison: StringType vs DateType") { +withTempView("v") { + val v1 = Date.valueOf("2017-09-22") + val v2 = Date.valueOf("2017-09-09") + Seq(v1, v2).toDF("c1").createTempView("v") + withSQLConf(SQLConf.typeCoercionMode.key -> "hive") { +checkAnswer(sql("select c1 from v where c1 > '2017-8-1'"), Row(v1) :: Row(v2) :: Nil) +checkAnswer(sql("select c1 from v where c1 > cast('2017-8-1' as date)"), + Row(v1) :: Row(v2) :: Nil) + } + + withSQLConf(SQLConf.typeCoercionMode.key -> "default") { +checkAnswer(sql("select c1 from v where c1 > '2017-8-1'"), Nil) +checkAnswer(sql("select c1 from v where c1 > cast('2017-8-1' as date)"), + Row(v1) :: Row(v2) :: Nil) + } +} + } + + test("SPARK-21646: CommonTypeForBinaryComparison: StringType vs TimestampType") { +withTempView("v") { + val v1 = Timestamp.valueOf("2017-07-21 23:42:12.123") + val v2 = Timestamp.valueOf("2017-08-21 23:42:12.123") + Seq(v1, v2).toDF("c1").createTempView("v") + withSQLConf(SQLConf.typeCoercionMode.key -> "hive") { +checkAnswer(sql("select c1 from v where c1 > '2017-8-1'"), Row(v2) :: Nil) +checkAnswer(sql("select c1 from v where c1 > cast('2017-8-1' as timestamp)"), + Row(v2) :: Nil) + } + + withSQLConf(SQLConf.typeCoercionMode.key -> "default") { +checkAnswer(sql("select c1 from v where c1 > '2017-8-1'"), Nil) +checkAnswer(sql("select c1 from v where c1 > cast('2017-8-1' as timestamp)"), + Row(v2) :: Nil) --- End diff -- perhaps there should also be a comparison for a time with more degrees of precision? It seems from the original discussion in https://issues.apache.org/jira/browse/SPARK-8420?focusedCommentId=14592654&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14592654 this was one of the concerns, eg. 
with `'1969-12-31 16:00:00'` vs. `'1969-12-31 16:00:00.0'` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18853: [SPARK-21646][SQL] CommonType for binary comparis...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/18853#discussion_r140797299 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala --- @@ -2677,4 +2677,142 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { checkAnswer(df, Row(1, 1, 1)) } } + + test("SPARK-21646: CommonTypeForBinaryComparison: StringType vs NumericType") { +withTempView("v") { + val str1 = Long.MaxValue.toString + "1" + val str2 = Int.MaxValue.toString + "1" + val str3 = "10" + Seq(str1, str2, str3).toDF("c1").createOrReplaceTempView("v") + withSQLConf(SQLConf.typeCoercionMode.key -> "hive") { +checkAnswer(sql("SELECT c1 from v where c1 > 0"), + Row(str1) :: Row(str2) :: Row(str3) :: Nil) +checkAnswer(sql("SELECT c1 from v where c1 > 0L"), + Row(str1) :: Row(str2) :: Row(str3) :: Nil) + } + + withSQLConf(SQLConf.typeCoercionMode.key -> "default") { +checkAnswer(sql("SELECT c1 from v where c1 > 0"), Row(str3) :: Nil) +checkAnswer(sql("SELECT c1 from v where c1 > 0L"), Row(str2) :: Row(str3) :: Nil) + } +} + } + + test("SPARK-21646: CommonTypeForBinaryComparison: DoubleType vs IntegerType") { +withTempView("v") { + Seq(("0", 1), ("-0.4", 2), ("0.6", 3)).toDF("c1", "c2").createOrReplaceTempView("v") + withSQLConf(SQLConf.typeCoercionMode.key -> "hive") { +checkAnswer(sql("SELECT c1 FROM v WHERE c1 = 0"), Seq(Row("0"))) +checkAnswer(sql("SELECT c1 FROM v WHERE c1 = 0L"), Seq(Row("0"))) +checkAnswer(sql("SELECT c1 FROM v WHERE c1 = 0.0"), Seq(Row("0"))) +checkAnswer(sql("SELECT c1 FROM v WHERE c1 = -0.4"), Seq(Row("-0.4"))) +checkAnswer(sql("SELECT count(*) FROM v WHERE c1 > 0"), Row(1) :: Nil) + } + + withSQLConf(SQLConf.typeCoercionMode.key -> "default") { +checkAnswer(sql("SELECT c1 FROM v WHERE c1 = 0"), Seq(Row("0"), Row("-0.4"), Row("0.6"))) +checkAnswer(sql("SELECT c1 FROM v WHERE c1 = 0L"), Seq(Row("0"), Row("-0.4"), Row("0.6"))) +checkAnswer(sql("SELECT c1 FROM v WHERE c1 = 0.0"), Seq(Row("0"))) 
+checkAnswer(sql("SELECT c1 FROM v WHERE c1 = -0.4"), Seq(Row("-0.4"))) +checkAnswer(sql("SELECT count(*) FROM v WHERE c1 > 0"), Row(0) :: Nil) + } +} + } + + test("SPARK-21646: CommonTypeForBinaryComparison: StringType vs DateType") { +withTempView("v") { + val v1 = Date.valueOf("2017-09-22") + val v2 = Date.valueOf("2017-09-09") + Seq(v1, v2).toDF("c1").createTempView("v") + withSQLConf(SQLConf.typeCoercionMode.key -> "hive") { +checkAnswer(sql("select c1 from v where c1 > '2017-8-1'"), Row(v1) :: Row(v2) :: Nil) +checkAnswer(sql("select c1 from v where c1 > cast('2017-8-1' as date)"), + Row(v1) :: Row(v2) :: Nil) + } + + withSQLConf(SQLConf.typeCoercionMode.key -> "default") { +checkAnswer(sql("select c1 from v where c1 > '2017-8-1'"), Nil) +checkAnswer(sql("select c1 from v where c1 > cast('2017-8-1' as date)"), + Row(v1) :: Row(v2) :: Nil) + } +} + } + + test("SPARK-21646: CommonTypeForBinaryComparison: StringType vs TimestampType") { +withTempView("v") { + val v1 = Timestamp.valueOf("2017-07-21 23:42:12.123") + val v2 = Timestamp.valueOf("2017-08-21 23:42:12.123") + Seq(v1, v2).toDF("c1").createTempView("v") + withSQLConf(SQLConf.typeCoercionMode.key -> "hive") { +checkAnswer(sql("select c1 from v where c1 > '2017-8-1'"), Row(v2) :: Nil) +checkAnswer(sql("select c1 from v where c1 > cast('2017-8-1' as timestamp)"), + Row(v2) :: Nil) + } + + withSQLConf(SQLConf.typeCoercionMode.key -> "default") { +checkAnswer(sql("select c1 from v where c1 > '2017-8-1'"), Nil) +checkAnswer(sql("select c1 from v where c1 > cast('2017-8-1' as timestamp)"), + Row(v2) :: Nil) --- End diff -- I think we should include a comparison which is only the year, eg. ` > '2014'`, as that was listed as the motivation for the "default" behavior in the code comments. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19311: [SPARK-22083][CORE] Release locks in MemoryStore....
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/19311#discussion_r140559549 --- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala --- @@ -544,20 +544,39 @@ private[spark] class MemoryStore( } if (freedMemory >= space) { -logInfo(s"${selectedBlocks.size} blocks selected for dropping " + - s"(${Utils.bytesToString(freedMemory)} bytes)") -for (blockId <- selectedBlocks) { - val entry = entries.synchronized { entries.get(blockId) } - // This should never be null as only one task should be dropping - // blocks and removing entries. However the check is still here for - // future safety. - if (entry != null) { -dropBlock(blockId, entry) +var exceptionWasThrown: Boolean = true +try { + logInfo(s"${selectedBlocks.size} blocks selected for dropping " + +s"(${Utils.bytesToString(freedMemory)} bytes)") + for (blockId <- selectedBlocks) { +val entry = entries.synchronized { + entries.get(blockId) +} +// This should never be null as only one task should be dropping +// blocks and removing entries. However the check is still here for +// future safety. +if (entry != null) { + dropBlock(blockId, entry) +} + } + exceptionWasThrown = false + logInfo(s"After dropping ${selectedBlocks.size} blocks, " + +s"free memory is ${Utils.bytesToString(maxMemory - blocksMemoryUsed)}") + freedMemory +} finally { + // like BlockManager.doPut, we use a finally rather than a catch to avoid having to deal + // with InterruptedException + if (exceptionWasThrown) { +selectedBlocks.foreach { id => + // some of the blocks may have already been unlocked, or completely removed + blockInfoManager.get(id).foreach { info => --- End diff -- good point, thanks, I've handled this now --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19313: [SPARK-21928][CORE] Set classloader on Serializer...
Github user squito closed the pull request at: https://github.com/apache/spark/pull/19313 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19313: [SPARK-21928][CORE] Set classloader on SerializerManager...
Github user squito commented on the issue: https://github.com/apache/spark/pull/19313 thanks @vanzin --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19313: [SPARK-21928][CORE] Set classloader on Serializer...
GitHub user squito opened a pull request: https://github.com/apache/spark/pull/19313 [SPARK-21928][CORE] Set classloader on SerializerManager's private kryo ## What changes were proposed in this pull request? We have to make sure that SerializerManager's private instance of kryo also uses the right classloader, regardless of the current thread classloader. In particular, this fixes serde during remote cache fetches, as those occur in netty threads. ## How was this patch tested? Manual tests & existing suite via jenkins. I haven't been able to reproduce this in a unit test, because when a remote RDD partition cannot be fetched, there is a warning message and then the partition is just recomputed locally. I manually verified the warning message is no longer present. (cherry picked from commit b75bd1777496ce0354458bf85603a8087a6a0ff8) You can merge this pull request into a Git repository by running: $ git pull https://github.com/squito/spark SPARK-21928_2.1 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/19313.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #19313 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19194: [SPARK-20589] Allow limiting task concurrency per...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/19194#discussion_r140337893 --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala --- @@ -619,6 +625,47 @@ private[spark] class ExecutorAllocationManager( // place the executors. private val stageIdToExecutorPlacementHints = new mutable.HashMap[Int, (Int, Map[String, Int])] +override def onJobStart(jobStart: SparkListenerJobStart): Unit = { + jobStart.stageInfos.foreach(stageInfo => stageIdToJobId(stageInfo.stageId) = jobStart.jobId) + + var jobGroupId = if (jobStart.properties != null) { +jobStart.properties.getProperty(SparkContext.SPARK_JOB_GROUP_ID) + } else { +null + } + + val maxConTasks = if (jobGroupId != null && +conf.contains(s"spark.job.$jobGroupId.maxConcurrentTasks")) { +conf.get(s"spark.job.$jobGroupId.maxConcurrentTasks").toInt + } else { +Int.MaxValue + } + + if (maxConTasks <= 0) { +throw new IllegalArgumentException( + "Maximum Concurrent Tasks should be set greater than 0 for the job to progress.") + } + + if (jobGroupId == null || !conf.contains(s"spark.job.$jobGroupId.maxConcurrentTasks")) { +jobGroupId = DEFAULT_JOB_GROUP + } + + jobIdToJobGroup(jobStart.jobId) = jobGroupId + if (!jobGroupToMaxConTasks.contains(jobGroupId)) { --- End diff -- Sorry I am doing a really bad job explaining my concerns. Of course, if you have multiple threads racing to set the value for the same job group, its unpredictable which value you'll get. Nothing we can do about that, its not the scenario I'm talking about. Here's the step-by-step (hope I understand the proposed change correctly): (1) T1: Set jobgroup = "foo". Set maxConTasks=10. (2) T1: Launch job 1.1 Uses maxConTasks=10 (3) T2: Set jobgroup = "foo". (4) T2: Launch job 2.1. Uses maxConTasks=10 (5) T1: Finish job 1.1. 
Do *not* remove the entry for `jobGroupToMaxConTasks("foo")`, because there is still another job running for this job group in T2 (https://github.com/apache/spark/pull/19194/files#diff-b096353602813e47074ace09a3890d56R664) (6) T1: Set maxConTasks=20 (7) T1: Launch job 1.2. Uses maxConTasks=10, because `jobGroupToMaxConTasks.contains("foo")`, so we don't reset the value. (8) T2: Finish job 2.1. Again, do not remove `jobGroupToMaxConTasks("foo")`, as T1 is still running a job in this job group. (9) T2: Set maxConTasks=20 (10) T2: Run job 2.2. Uses maxConTasks=10, because `jobGroupToMaxConTasks.contains("foo")`. As long as there is a job running in T2 when T1 finishes its job (or vice versa), we never remove the prior value, and so never update it when starting a new job. We could be stuck with maxConTasks=10 in both threads indefinitely, even though both threads have set maxConTasks=20. If you remove that `if (!jobGroupToMaxConTasks.contains(jobGroupId))`, then when job 1.2 starts, you'd use the new value of maxConTasks=20. The only weird thing is that job 2.1 suddenly switches mid-flight to using maxConTasks=20 as well. But that seems more reasonable to me. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19311: [SPARK-22083][CORE] Release locks in MemoryStore....
GitHub user squito opened a pull request: https://github.com/apache/spark/pull/19311 [SPARK-22083][CORE] Release locks in MemoryStore.evictBlocksToFreeSpace ## What changes were proposed in this pull request? MemoryStore.evictBlocksToFreeSpace acquires write locks for all the blocks it intends to evict up front. If there is a failure to evict blocks (eg., some failure dropping a block to disk), then we have to release the lock. Otherwise the lock is never released and an executor trying to get the lock will wait forever. ## How was this patch tested? Added unit test. You can merge this pull request into a Git repository by running: $ git pull https://github.com/squito/spark SPARK-22083 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/19311.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #19311 commit 1ff270a1fdbd567965c6c721f0a92bc1b77bc240 Author: Imran Rashid Date: 2017-09-21T19:12:24Z [SPARK-22083][CORE] Release locks in MemoryStore.evictBlocksToFreeSpace MemoryStore.evictBlocksToFreeSpace acquires write locks for all the blocks it intends to evict up front. If there is a failure to evict blocks (eg., some failure dropping a block to disk), then we have to release the lock. Otherwise the lock is never released and an executor trying to get the lock will wait forever. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19194: [SPARK-20589] Allow limiting task concurrency per...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/19194#discussion_r140296863 --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala --- @@ -619,6 +625,47 @@ private[spark] class ExecutorAllocationManager( // place the executors. private val stageIdToExecutorPlacementHints = new mutable.HashMap[Int, (Int, Map[String, Int])] +override def onJobStart(jobStart: SparkListenerJobStart): Unit = { + jobStart.stageInfos.foreach(stageInfo => stageIdToJobId(stageInfo.stageId) = jobStart.jobId) + + var jobGroupId = if (jobStart.properties != null) { +jobStart.properties.getProperty(SparkContext.SPARK_JOB_GROUP_ID) + } else { +null + } + + val maxConTasks = if (jobGroupId != null && +conf.contains(s"spark.job.$jobGroupId.maxConcurrentTasks")) { +conf.get(s"spark.job.$jobGroupId.maxConcurrentTasks").toInt + } else { +Int.MaxValue + } + + if (maxConTasks <= 0) { +throw new IllegalArgumentException( + "Maximum Concurrent Tasks should be set greater than 0 for the job to progress.") + } + + if (jobGroupId == null || !conf.contains(s"spark.job.$jobGroupId.maxConcurrentTasks")) { +jobGroupId = DEFAULT_JOB_GROUP + } + + jobIdToJobGroup(jobStart.jobId) = jobGroupId + if (!jobGroupToMaxConTasks.contains(jobGroupId)) { --- End diff -- You could submit jobs concurrently, from two different threads. I didn't describe it very well -- say both threads are in the same job group. Each thread can only do one job at a time, but maybe between the two threads, there is always some active job for the job group the entire time. There are real use cases which are like this -- in "job-server" style deployments, there is a long-running spark context (most likely with cached RDDs), accepting requests from multiple users (perhaps sitting behind an http server). Maybe some group of users are always put into one job group (eg., there is a low-priority group and a high priority group of users). 
You might process more than one job for each group at a time, and there is a never-ending stream of jobs. (again, it's not the most common scenario, but might as well have it behave correctly.) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19280: [SPARK-21928][CORE] Set classloader on SerializerManager...
Github user squito commented on the issue: https://github.com/apache/spark/pull/19280 > Looks ok to me, assuming the "default serializer" in SerializerManager is configured correctly through other means. I think that part is fine. The serializer is created here: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SparkEnv.scala#L279 The same instance is assigned to `SparkEnv.serializer`: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SparkEnv.scala#L374 Which has its default classloader set in Executor.scala, right by the part I'm changing: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/executor/Executor.scala#L131 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19194: [SPARK-20589] Allow limiting task concurrency per...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/19194#discussion_r140136101 --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala --- @@ -619,6 +625,47 @@ private[spark] class ExecutorAllocationManager( // place the executors. private val stageIdToExecutorPlacementHints = new mutable.HashMap[Int, (Int, Map[String, Int])] +override def onJobStart(jobStart: SparkListenerJobStart): Unit = { + jobStart.stageInfos.foreach(stageInfo => stageIdToJobId(stageInfo.stageId) = jobStart.jobId) + + var jobGroupId = if (jobStart.properties != null) { +jobStart.properties.getProperty(SparkContext.SPARK_JOB_GROUP_ID) + } else { +null + } + + val maxConTasks = if (jobGroupId != null && +conf.contains(s"spark.job.$jobGroupId.maxConcurrentTasks")) { +conf.get(s"spark.job.$jobGroupId.maxConcurrentTasks").toInt + } else { +Int.MaxValue + } + + if (maxConTasks <= 0) { +throw new IllegalArgumentException( + "Maximum Concurrent Tasks should be set greater than 0 for the job to progress.") + } + + if (jobGroupId == null || !conf.contains(s"spark.job.$jobGroupId.maxConcurrentTasks")) { +jobGroupId = DEFAULT_JOB_GROUP + } + + jobIdToJobGroup(jobStart.jobId) = jobGroupId + if (!jobGroupToMaxConTasks.contains(jobGroupId)) { --- End diff -- lemme put my concern another way: why don't you remove the `if (!jobGroupToMaxConTasks.contains(jobGroupId))`, and just unconditionally always make the assignment `jobGroupToMaxConTasks(jobGroupId) = maxConTasks`? that is simpler to reason about, and has all the properties we want. I agree the scenario I'm describing is pretty weird, but the only difference I see between your version and this is in that scenario. And its probably not the behavior we want in that scenario. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19280: [SPARK-21928][CORE] Set classloader on SerializerManager...
Github user squito commented on the issue: https://github.com/apache/spark/pull/19280 reaching out to some potential reviewers: @vanzin @srowen @JoshRosen @mridulm @tgravescs --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19250: [SPARK-12297] Table timezone correction for Timestamps
Github user squito commented on the issue: https://github.com/apache/spark/pull/19250 also cc @yhuai @liancheng would appreciate a review since you've looked at sql & hive compatibility in the past --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19160: [SPARK-21934][CORE] Expose Shuffle Netty memory usage to...
Github user squito commented on the issue: https://github.com/apache/spark/pull/19160 lgtm @jerryshao I didn't merge yet in case you want to keep discussing the naming, but I'm fine with this as is, so feel free to merge. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19160: [SPARK-21934][CORE] Expose Shuffle Netty memory u...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/19160#discussion_r140051678 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala --- @@ -248,6 +251,16 @@ private[spark] class BlockManager( logInfo(s"Initialized BlockManager: $blockManagerId") } + def shuffleMetricsSource: Source = { +import BlockManager._ + +if (externalShuffleServiceEnabled) { + new ShuffleMetricsSource("ExternalShuffle", shuffleClient.shuffleMetrics()) +} else { + new ShuffleMetricsSource("NettyBlockTransfer", shuffleClient.shuffleMetrics()) --- End diff -- ok, I guess I haven't seen enough setups to know how users subscribe to these and whether they'd actually want that distinction. (especially since it seems like some of the distinction probably shouldn't be there, eg. the openBlockLatency metric.) But I don't think there is any clear right answer here, if you think this is the best naming, that is fine with me. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19194: [SPARK-20589] Allow limiting task concurrency per...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/19194#discussion_r139819360 --- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala --- @@ -1255,6 +1255,97 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg assert(taskOption4.get.addedJars === addedJarsMidTaskSet) } + test("limit max concurrent running tasks in a job group when configured ") { +val conf = new SparkConf(). + set(config.BLACKLIST_ENABLED, true). + set("spark.job.testJobGroup.maxConcurrentTasks", "2") // Limit max concurrent tasks to 2 + +sc = new SparkContext("local", "test", conf) +sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2")) +val props = new Properties(); +props.setProperty(SparkContext.SPARK_JOB_GROUP_ID, "testJobGroup") // set the job group + +val tasks = Array.tabulate[Task[_]](10) { i => + new FakeTask(0, i, Nil) +} +val tsm = new TaskSetManager(sched, new TaskSet(tasks, 0, 0, 0, props), 2) + +// make some offers to our taskset +var taskDescs = Seq( + "exec1" -> "host1", + "exec2" -> "host1" +).flatMap { case (exec, host) => + // offer each executor twice (simulating 2 cores per executor) + (0 until 2).flatMap{ _ => tsm.resourceOffer(exec, host, TaskLocality.ANY)} +} +assert(taskDescs.size === 2) // Out of the 4 offers, tsm can accept up to maxConcurrentTasks. 
+ +// make 4 more offers +val taskDescs2 = Seq( + "exec1" -> "host1", + "exec2" -> "host1" +).flatMap { case (exec, host) => + (0 until 2).flatMap{ _ => tsm.resourceOffer(exec, host, TaskLocality.ANY)} +} +assert(taskDescs2.size === 0) // tsm doesn't accept any as it is already running at max tasks + +// inform tsm that one task has completed +val directTaskResult = createTaskResult(0) +tsm.handleSuccessfulTask(taskDescs(0).taskId, directTaskResult) + +// make 4 more offers after previous task completed +taskDescs = Seq( + "exec1" -> "host1", + "exec2" -> "host1" +).flatMap { case (exec, host) => + (0 until 2).flatMap{ _ => tsm.resourceOffer(exec, host, TaskLocality.ANY)} +} +assert(taskDescs.size === 1) // tsm accepts one as it can run one more task + } + + test("do not limit max concurrent running tasks in a job group by default") { +val conf = new SparkConf(). + set(config.BLACKLIST_ENABLED, true) + +sc = new SparkContext("local", "test", conf) +sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2")) + +val tasks = Array.tabulate[Task[_]](10) { i => + new FakeTask(0, i, Nil) +} +val tsm = new TaskSetManager(sched, new TaskSet(tasks, 0, 0, 0, null), 2) + +// make 5 offers to our taskset +var taskDescs = Seq( + "exec1" -> "host1", + "exec2" -> "host2" +).flatMap { case (exec, host) => + // offer each executor twice (simulating 3 cores per executor) --- End diff -- update comments 5 offers -> 6 offers twice -> three times --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19194: [SPARK-20589] Allow limiting task concurrency per...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/19194#discussion_r139819793 --- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala --- @@ -1255,6 +1255,97 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg assert(taskOption4.get.addedJars === addedJarsMidTaskSet) } + test("limit max concurrent running tasks in a job group when configured ") { +val conf = new SparkConf(). + set(config.BLACKLIST_ENABLED, true). + set("spark.job.testJobGroup.maxConcurrentTasks", "2") // Limit max concurrent tasks to 2 + +sc = new SparkContext("local", "test", conf) +sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2")) +val props = new Properties(); +props.setProperty(SparkContext.SPARK_JOB_GROUP_ID, "testJobGroup") // set the job group + +val tasks = Array.tabulate[Task[_]](10) { i => + new FakeTask(0, i, Nil) +} +val tsm = new TaskSetManager(sched, new TaskSet(tasks, 0, 0, 0, props), 2) + +// make some offers to our taskset +var taskDescs = Seq( + "exec1" -> "host1", + "exec2" -> "host1" +).flatMap { case (exec, host) => + // offer each executor twice (simulating 2 cores per executor) + (0 until 2).flatMap{ _ => tsm.resourceOffer(exec, host, TaskLocality.ANY)} +} +assert(taskDescs.size === 2) // Out of the 4 offers, tsm can accept up to maxConcurrentTasks. 
+ +// make 4 more offers +val taskDescs2 = Seq( + "exec1" -> "host1", + "exec2" -> "host1" +).flatMap { case (exec, host) => + (0 until 2).flatMap{ _ => tsm.resourceOffer(exec, host, TaskLocality.ANY)} +} +assert(taskDescs2.size === 0) // tsm doesn't accept any as it is already running at max tasks + +// inform tsm that one task has completed +val directTaskResult = createTaskResult(0) +tsm.handleSuccessfulTask(taskDescs(0).taskId, directTaskResult) + +// make 4 more offers after previous task completed +taskDescs = Seq( + "exec1" -> "host1", + "exec2" -> "host1" +).flatMap { case (exec, host) => + (0 until 2).flatMap{ _ => tsm.resourceOffer(exec, host, TaskLocality.ANY)} +} +assert(taskDescs.size === 1) // tsm accepts one as it can run one more task + } + + test("do not limit max concurrent running tasks in a job group by default") { --- End diff -- I don't think this test really adds anything beyond other tests in this suite. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19194: [SPARK-20589] Allow limiting task concurrency per...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/19194#discussion_r139821899 --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala --- @@ -619,6 +625,47 @@ private[spark] class ExecutorAllocationManager( // place the executors. private val stageIdToExecutorPlacementHints = new mutable.HashMap[Int, (Int, Map[String, Int])] +override def onJobStart(jobStart: SparkListenerJobStart): Unit = { + jobStart.stageInfos.foreach(stageInfo => stageIdToJobId(stageInfo.stageId) = jobStart.jobId) + + var jobGroupId = if (jobStart.properties != null) { +jobStart.properties.getProperty(SparkContext.SPARK_JOB_GROUP_ID) + } else { +null + } + + val maxConTasks = if (jobGroupId != null && +conf.contains(s"spark.job.$jobGroupId.maxConcurrentTasks")) { +conf.get(s"spark.job.$jobGroupId.maxConcurrentTasks").toInt + } else { +Int.MaxValue + } + + if (maxConTasks <= 0) { +throw new IllegalArgumentException( + "Maximum Concurrent Tasks should be set greater than 0 for the job to progress.") + } + + if (jobGroupId == null || !conf.contains(s"spark.job.$jobGroupId.maxConcurrentTasks")) { +jobGroupId = DEFAULT_JOB_GROUP + } + + jobIdToJobGroup(jobStart.jobId) = jobGroupId + if (!jobGroupToMaxConTasks.contains(jobGroupId)) { --- End diff -- this is probably a weird / unusual situation, but is this really the behavior you want if there are multiple jobs submitted for the same job group? Wouldn't you just take the conf for the job group at the time each job was submitted? Worst case with this approach: say you are *always* submitting multiple jobs for each job group; when one finishes, you immediately start another one, so that the new one partially overlaps the old one. Then even if you change the conf, all jobs will keep using the old value forever. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19194: [SPARK-20589] Allow limiting task concurrency per...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/19194#discussion_r139818031 --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala --- @@ -758,11 +825,52 @@ private[spark] class ExecutorAllocationManager( allocationManager.synchronized { stageIdToNumSpeculativeTasks(stageId) = stageIdToNumSpeculativeTasks.getOrElse(stageId, 0) + 1 +maxConcurrentTasks = getMaxConTasks +logDebug(s"Setting max concurrent tasks to $maxConcurrentTasks on spec. task submitted.") allocationManager.onSchedulerBacklogged() } } /** + * Calculate the maximum no. of concurrent tasks that can run currently. + */ +def getMaxConTasks(): Int = { + // We can limit the no. of concurrent tasks by a job group. A job group can have multiple jobs + // with multiple stages. We need to get all the active stages belonging to a job group to + // calculate the total no. of pending + running tasks to decide the maximum no. of executors + // we need at that time to serve the outstanding tasks. This is capped by the minimum no. of + // outstanding tasks and the max concurrent limit specified for the job group if any. + + def getIncompleteTasksForStage(stageId: Int, numTasks: Int): Int = { +totalPendingTasks(stageId) + totalRunningTasks(stageId) + } + + def sumIncompleteTasksForStages: (Int, (Int, Int)) => Int = (totalTasks, stageToNumTasks) => { +val activeTasks = getIncompleteTasksForStage(stageToNumTasks._1, stageToNumTasks._2) +sumOrMax(totalTasks, activeTasks) + } + // Get the total running & pending tasks for all stages in a job group. 
+ def getIncompleteTasksForJobGroup(stagesItr: mutable.HashMap[Int, Int]): Int = { +stagesItr.foldLeft(0)(sumIncompleteTasksForStages) + } + + def sumIncompleteTasksForJobGroup: (Int, (String, mutable.HashMap[Int, Int])) => Int = { +(maxConTasks, x) => { + val totalIncompleteTasksForJobGroup = getIncompleteTasksForJobGroup(x._2) + val maxTasks = Math.min(jobGroupToMaxConTasks(x._1), totalIncompleteTasksForJobGroup) + sumOrMax(maxConTasks, maxTasks) +} + } + + def sumOrMax(a: Int, b: Int): Int = if (doesSumOverflow(a, b)) Int.MaxValue else (a + b) + + def doesSumOverflow(a: Int, b: Int): Boolean = b > (Int.MaxValue - a) + + val stagesByJobGroup = stageIdToNumTasks.groupBy(x => jobIdToJobGroup(stageIdToJobId(x._1))) --- End diff -- you could just store `stageIdToJobGroupId`. Simplifies this a bit, and then you don't need to store `jobIdToJobGroup` at all, I think --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19145: [spark-21933][yarn] Spark Streaming request more executo...
Github user squito commented on the issue: https://github.com/apache/spark/pull/19145 I'm not sure I totally follow the sequence of events, but I get the feeling this should be handled in yarn, not spark. Also, I agree with Jerry, it seems like your `completedContainerIdSet` may grow continuously. You'll remove from it *if* you happen to get a duplicate message. But I think in most cases you will not get a duplicate message, if I understand correctly. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19160: [SPARK-21934][CORE] Expose Shuffle Netty memory u...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/19160#discussion_r139800662 --- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala --- @@ -115,6 +115,7 @@ private[spark] class Executor( if (!isLocal) { env.metricsSystem.registerSource(executorSource) env.blockManager.initialize(conf.getAppId) +env.metricsSystem.registerSource(env.blockManager.shuffleMetricsSource) --- End diff -- super nit: can you put the `registerSource()` calls next to each other? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19160: [SPARK-21934][CORE] Expose Shuffle Netty memory u...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/19160#discussion_r139796177 --- Diff: core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala --- @@ -18,11 +18,14 @@ package org.apache.spark.network.netty import java.nio.ByteBuffer +import java.util --- End diff -- if you are trying to avoid confusion w/ scala's hashmaps, I think our convention is to rename w/ "J" prefix ``` import java.util.{HashMap => JHashMap} ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19160: [SPARK-21934][CORE] Expose Shuffle Netty memory u...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/19160#discussion_r139795417 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala --- @@ -248,6 +251,16 @@ private[spark] class BlockManager( logInfo(s"Initialized BlockManager: $blockManagerId") } + def shuffleMetricsSource: Source = { +import BlockManager._ + +if (externalShuffleServiceEnabled) { + new ShuffleMetricsSource("ExternalShuffle", shuffleClient.shuffleMetrics()) +} else { + new ShuffleMetricsSource("NettyBlockTransfer", shuffleClient.shuffleMetrics()) --- End diff -- do you think we really need to distinguish these two cases? whether or not you have the external shuffle service, this memory is still owned by the executor JVM (it's really only external on the remote end). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19160: [SPARK-21934][CORE] Expose Shuffle Netty memory u...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/19160#discussion_r139799940 --- Diff: core/src/main/scala/org/apache/spark/deploy/ExternalShuffleServiceSource.scala --- @@ -19,19 +19,19 @@ package org.apache.spark.deploy import javax.annotation.concurrent.ThreadSafe -import com.codahale.metrics.MetricRegistry +import com.codahale.metrics.{MetricRegistry, MetricSet} import org.apache.spark.metrics.source.Source -import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler /** * Provides metrics source for external shuffle service */ @ThreadSafe -private class ExternalShuffleServiceSource -(blockHandler: ExternalShuffleBlockHandler) extends Source { +private class ExternalShuffleServiceSource extends Source { --- End diff -- just for my own understanding, not directly related to this change -- I hadn't realized that the ExternalShuffleBlockHandler had its own ShuffleMetrics already. Some of those metrics really seem like they should be part of the regular shuffle server, in the executor. Eg., [openBlockRequestLatencyMillis](https://github.com/apache/spark/blob/master/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java#L89). Do you know why it's separate? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19211: [SPARK-18838][core] Add separate listener queues to Live...
Github user squito commented on the issue: https://github.com/apache/spark/pull/19211 lgtm do you know how much better this makes it? Eg., if we had an existing case where things go haywire in dynamic allocation because of this -- we could see if after this change, the only dropped events are in eventLog, hopefully. Seems like the right change regardless, but it would be good to know the effect. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19211: [SPARK-18838][core] Add separate listener queues ...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/19211#discussion_r139787945 --- Diff: core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala --- @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import java.util.concurrent.LinkedBlockingQueue +import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong} + +import com.codahale.metrics.{Gauge, Timer} + +import org.apache.spark.{SparkConf, SparkContext} +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ +import org.apache.spark.util.Utils + +/** + * An asynchronous queue for events. All events posted to this queue will be delivered to the child + * listeners in a separate thread. + * + * Delivery will only begin when the `start()` method is called. The `stop()` method should be + * called when no more events need to be delivered. 
+ */ +private class AsyncEventQueue(val name: String, conf: SparkConf, metrics: LiveListenerBusMetrics) + extends SparkListenerBus + with Logging { + + import AsyncEventQueue._ + + // Cap the capacity of the queue so we get an explicit error (rather than an OOM exception) if + // it's perpetually being added to more quickly than it's being drained. + private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent]( +conf.get(LISTENER_BUS_EVENT_QUEUE_CAPACITY)) + + // Keep the event count separately, so that waitUntilEmpty() can be implemented properly; + // this allows that method to return only when the events in the queue have been fully + // processed (instead of just dequeued). + private val eventCount = new AtomicLong() + + /** A counter for dropped events. It will be reset every time we log it. */ + private val droppedEventsCounter = new AtomicLong(0L) + + /** When `droppedEventsCounter` was logged last time in milliseconds. */ + @volatile private var lastReportTimestamp = 0L + + private val logDroppedEvent = new AtomicBoolean(false) + + private var sc: SparkContext = null + + private val started = new AtomicBoolean(false) + private val stopped = new AtomicBoolean(false) + + private val droppedEvents = metrics.metricRegistry.counter(s"queue.$name.numDroppedEvents") + private val processingTime = metrics.metricRegistry.timer(s"queue.$name.listenerProcessingTime") + + // Remove the queue size gauge first, in case it was created by a previous incarnation of + // this queue that was removed from the listener bus. 
+ metrics.metricRegistry.remove(s"queue.$name.size") + metrics.metricRegistry.register(s"queue.$name.size", new Gauge[Int] { +override def getValue: Int = eventQueue.size() + }) + + private val dispatchThread = new Thread(s"spark-listener-group-$name") { +setDaemon(true) +override def run(): Unit = Utils.tryOrStopSparkContext(sc) { + dispatch() +} + } + + private def dispatch(): Unit = LiveListenerBus.withinListenerThread.withValue(true) { +try { + var next: SparkListenerEvent = eventQueue.take() + while (next != POISON_PILL) { +val ctx = processingTime.time() +try { + super.postToAll(next) +} finally { + ctx.stop() +} +eventCount.decrementAndGet() +next = eventQueue.take() + } + eventCount.decrementAndGet() +} catch { + case ie: InterruptedException => +logInfo(s"Stopping listener queue $name.", ie) +} + } + + override protected def getTimer(listener: SparkListenerInterface): Option[Timer] = { + metrics.getTimerForListenerClass(listener.getClass.asSubclass(classOf[SparkListenerInterface])) + } +
[GitHub] spark pull request #19280: [SPARK-21928][CORE] Set classloader on Serializer...
GitHub user squito opened a pull request: https://github.com/apache/spark/pull/19280 [SPARK-21928][CORE] Set classloader on SerializerManager private kryo ## What changes were proposed in this pull request? We have to make sure that SerializerManager's private instance of kryo also uses the right classloader, regardless of the current thread classloader. In particular, this fixes serde during remote cache fetches, as those occur in netty threads. ## How was this patch tested? Manual tests & existing suite via jenkins. I haven't been able to reproduce this in a unit test, because when a remote RDD partition can be fetched, there is a warning message and then the partition is just recomputed locally. I manually verified the warning message is no longer present. You can merge this pull request into a Git repository by running: $ git pull https://github.com/squito/spark SPARK-21928_ser_classloader Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/19280.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #19280 commit acbaf8b65629344d760360b768e89f1712af8942 Author: Imran Rashid Date: 2017-09-19T15:19:43Z [SPARK-21928][CORE] Set classloader on SerializerManager private kryo We have to make sure that SerializerManager's private instance of kryo also uses the right classloader, regardless of the current thread classloader. In particular, this fixes serde during remote cache fetches, as those occur in netty threads. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19250: [SPARK-12297] Table timezone correction for Timestamps
Github user squito commented on the issue: https://github.com/apache/spark/pull/19250 Hi @ueshin @rxin, could you please review? thanks! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19250: [SPARK-12297] Table timezone correction for Times...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/19250#discussion_r139234142 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala --- @@ -92,7 +92,7 @@ case class CreateHiveTableAsSelectCommand( } override def argString: String = { -s"[Database:${tableDesc.database}}, " + +s"[Database:${tableDesc.database}, " + --- End diff -- totally unrelated typo fix, but didn't seem worth an independent pr --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19250: [SPARK-12297] Table timezone correction for Times...
GitHub user squito opened a pull request: https://github.com/apache/spark/pull/19250 [SPARK-12297] Table timezone correction for Timestamps ## What changes were proposed in this pull request? When reading and writing data, spark will adjust timestamp data based on the delta between the current session timezone and the table time zone (specified either by a persistent table property, or an option to the DataFrameReader / Writer). This is particularly important for parquet data, so that it can be treated equivalently by other SQL engines (eg. Impala and Hive). Furthermore, this is useful if the same data is processed by multiple clusters in different time zones, and "timestamp without time zone" semantics are desired. ## How was this patch tested? Unit tests. You can merge this pull request into a Git repository by running: $ git pull https://github.com/squito/spark timestamp_all_formats Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/19250.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #19250 commit 54f87c2c1e0ab0645fa5497553cf031f13e98c3b Author: Imran Rashid Date: 2017-08-28T19:52:15Z SPARK-12297. Table timezones. commit 53b9fbe0c6128ec11afdb46d3239c693129f6952 Author: Imran Rashid Date: 2017-09-14T20:18:46Z All dataformats support timezone correction. Move rules & tests to a more appropriate location. Ensure rule works without hive support. Extra checks on when table timezones are set. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18935: [SPARK-9104][CORE] Expose Netty memory metrics in Spark
Github user squito commented on the issue: https://github.com/apache/spark/pull/18935 lgtm any more thoughts @jiangxb1987 @zsxwing ? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #16992: [SPARK-19662][SCHEDULER][TEST] Add Fair Scheduler Unit T...
Github user squito commented on the issue: https://github.com/apache/spark/pull/16992 merged to master thanks @erenavsarogullari , sorry again for the delays --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18950: [SPARK-20589][Core][Scheduler] Allow limiting tas...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/18950#discussion_r134501736 --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala --- @@ -724,6 +777,62 @@ private[spark] class ExecutorAllocationManager( } /** + * Calculate the maximum no. of concurrent tasks that can run currently. + */ +def getMaxConTasks(): Int = { + val stagesByJobGroup = stageIdToNumTasks.groupBy(x => jobIdToJobGroup(stageIdToJobId(x._1))) --- End diff -- I think this needs a comment explaining why you need to look at stages at all -- it's not obvious why it's necessary. (at first I was going to suggest the number of tasks left in a stage shouldn't matter, but then realized that it could in some scenarios) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18950: [SPARK-20589][Core][Scheduler] Allow limiting tas...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/18950#discussion_r134501323 --- Diff: core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala --- @@ -188,10 +192,195 @@ class ExecutorAllocationManagerSuite assert(numExecutorsTarget(manager) === 10) } + test("add executors capped by max concurrent tasks for a job group with single core executors") { +val conf = new SparkConf() + .setMaster("myDummyLocalExternalClusterManager") + .setAppName("test-executor-allocation-manager") + .set("spark.dynamicAllocation.enabled", "true") + .set("spark.dynamicAllocation.testing", "true") + .set("spark.job.group1.maxConcurrentTasks", "2") + .set("spark.job.group2.maxConcurrentTasks", "5") +val sc = new SparkContext(conf) +contexts += sc +sc.setJobGroup("group1", "", false) + +val manager = sc.executorAllocationManager.get +val stages = Seq(createStageInfo(0, 10), createStageInfo(1, 10)) +// Submit the job and stage start/submit events +sc.listenerBus.postToAll(SparkListenerJobStart(0, 0, stages, sc.getLocalProperties)) +sc.listenerBus.postToAll(SparkListenerStageSubmitted(stages(0))) + +// Verify that we're capped at number of max concurrent tasks in the stage +assert(maxNumExecutorsNeeded(manager) === 2) + +// Submit another stage in the same job +sc.listenerBus.postToAll(SparkListenerStageSubmitted(stages(1))) +assert(maxNumExecutorsNeeded(manager) === 2) + +sc.listenerBus.postToAll(SparkListenerStageCompleted(stages(0))) +sc.listenerBus.postToAll(SparkListenerStageCompleted(stages(1))) +sc.listenerBus.postToAll(SparkListenerJobEnd(0, 10, JobSucceeded)) + +// Submit a new job in the same job group +val stage2 = createStageInfo(2, 20) +sc.listenerBus.postToAll(SparkListenerJobStart(1, 0, Seq{stage2}, sc.getLocalProperties)) --- End diff -- nit: still a few `Seq{}` you missed --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18950: [SPARK-20589][Core][Scheduler] Allow limiting tas...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/18950#discussion_r134498890 --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala --- @@ -724,6 +777,62 @@ private[spark] class ExecutorAllocationManager( } /** + * Calculate the maximum no. of concurrent tasks that can run currently. + */ +def getMaxConTasks(): Int = { + val stagesByJobGroup = stageIdToNumTasks.groupBy(x => jobIdToJobGroup(stageIdToJobId(x._1))) + + def getMaxConTasks(maxConTasks: Int, +stagesByJobGroupItr: Iterator[(String, mutable.HashMap[Int, Int])]): Int = { --- End diff -- style nit: with multi-line method definitions, each param goes on its own line, double-indented: ```scala def getMaxConTasks( maxConTasks: Int, stagesByJobGroupItr: Iterator[(String, mutable.HashMap[Int, Int])]): Int = { ``` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18950: [SPARK-20589][Core][Scheduler] Allow limiting task concu...
Github user squito commented on the issue: https://github.com/apache/spark/pull/18950 @dhruve looks like a real test failure --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18935: [SPARK-9104][CORE] Expose Netty memory metrics in Spark
Github user squito commented on the issue: https://github.com/apache/spark/pull/18935 thanks for the added info @jerryshao. (a) ok makes sense now about the use of this -- it is not exposed now, you plan to expose it in future changes. That is fine, however I would like to at least see a *plan* for how you want to expose this before we merge this. I think past efforts have gotten tripped on how we'd expose it in the driver; perhaps the simplest thing to do is expose it to the metric system? There just isn't any point in this unless we're going to do something with it. (b) thanks for including the example of the metrics. But one minor thing I noticed -- are you at all surprised that the direct & heap memory are exactly the same? Are things getting mixed up somewhere? Or maybe those are just the values from the initialization netty always does. I definitely think that is more metrics than most users care about, and would want verbose off by default. I am wondering if those metrics are useful at all ... but I guess they are pretty cheap, and maybe there is some scenario where a user would care about some of them, eg. `numHugeAllocations`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18887: [SPARK-20642][core] Store FsHistoryProvider listing data...
Github user squito commented on the issue: https://github.com/apache/spark/pull/18887 I futzed around for a while with trying to keep the old stuff around, and I realized it really would be quite a mess. The biggest problem is that the old rest api is just way too tied into the UI, eg https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/status/api/v1/ExecutorListResource.scala#L26 , probably my fault in that implementation of the rest apis. You could keep the old code around, but it would involve so much moving and refactoring that it seems pointless. I'm still looking at the other changes in the larger project, and I'd encourage other reviewers to do the same. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #16992: [SPARK-19662][SCHEDULER][TEST] Add Fair Scheduler Unit T...
Github user squito commented on the issue: https://github.com/apache/spark/pull/16992 It's been a while, so let's run tests again just to check: Jenkins, test this please --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16992: [SPARK-19662][SCHEDULER][TEST] Add Fair Scheduler...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/16992#discussion_r134056863 --- Diff: docs/job-scheduling.md --- @@ -235,7 +235,7 @@ properties: of the cluster. By default, each pool's `minShare` is 0. The pool properties can be set by creating an XML file, similar to `conf/fairscheduler.xml.template`, -and setting a `spark.scheduler.allocation.file` property in your +and either setting `fairscheduler.xml` into classpath or a `spark.scheduler.allocation.file` property in your --- End diff -- super nit: ... and either putting a file named `fairscheduler.xml` on the classpath, or setting `spark.scheduler.allocation.file` ... --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18935: [SPARK-9104][CORE] Expose Netty memory metrics in...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/18935#discussion_r134052216 --- Diff: common/network-common/src/main/java/org/apache/spark/network/util/NettyMemoryMetrics.java --- @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.network.util; + +import java.lang.reflect.Method; +import java.lang.reflect.Modifier; +import java.util.*; + +import com.codahale.metrics.Gauge; +import com.codahale.metrics.Metric; +import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.MetricSet; +import com.google.common.annotations.VisibleForTesting; +import io.netty.buffer.PoolArenaMetric; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.buffer.PooledByteBufAllocatorMetric; + +/** + * A Netty memory metrics class to collect metrics from Netty PooledByteBufAllocator. 
+ */ +public class NettyMemoryMetrics implements MetricSet { + + private final PooledByteBufAllocator pooledAllocator; + + private final boolean verboseMetricsEnabled; + + private final Map allMetrics; + + private final String metricPrefix; + + @VisibleForTesting + final static Set VERBOSE_METRICS = new HashSet<>(); + static { +VERBOSE_METRICS.addAll(Arrays.asList( + "numAllocations", + "numTinyAllocations", + "numSmallAllocations", + "numNormalAllocations", + "numHugeAllocations", + "numDeallocations", + "numTinyDeallocations", + "numSmallDeallocations", + "numNormalDeallocations", + "numHugeDeallocations", + "numActiveAllocations", + "numActiveTinyAllocations", + "numActiveSmallAllocations", + "numActiveNormalAllocations", + "numActiveHugeAllocations", + "numActiveBytes")); + } + + public NettyMemoryMetrics(PooledByteBufAllocator pooledAllocator, + String metricPrefix, + TransportConf conf) { +this.pooledAllocator = pooledAllocator; +this.allMetrics = new HashMap<>(); +this.metricPrefix = metricPrefix; +this.verboseMetricsEnabled = conf.verboseMetrics(); + +registerMetrics(this.pooledAllocator); + } + + private void registerMetrics(PooledByteBufAllocator allocator) { +PooledByteBufAllocatorMetric pooledAllocatorMetric = allocator.metric(); + +// Register general metrics. 
+allMetrics.put(MetricRegistry.name(metricPrefix, "usedHeapMemory"), + (Gauge) () -> pooledAllocatorMetric.usedHeapMemory()); +allMetrics.put(MetricRegistry.name(metricPrefix, "usedDirectMemory"), + (Gauge) () -> pooledAllocatorMetric.usedDirectMemory()); + +if (verboseMetricsEnabled) { + int directArenaIndex = 0; + for (PoolArenaMetric metric : pooledAllocatorMetric.directArenas()) { +registerArenaMetric(metric, "directArena" + directArenaIndex); +directArenaIndex++; + } + + int heapArenaIndex = 0; + for (PoolArenaMetric metric : pooledAllocatorMetric.heapArenas()) { +registerArenaMetric(metric, "heapArena" + heapArenaIndex); +heapArenaIndex++; + } +} + } + + private void registerArenaMetric(PoolArenaMetric arenaMetric, String arenaName) { +for (String methodName : VERBOSE_METRICS) { + Method m; + try { +m = PoolArenaMetric.class.getMethod(methodName); + } catch (Exception e) { +// Failed to find metric related method, ignore this metric. +continue; + } + + if (!Modifier.isPublic(m.getModifiers())) { +// Ignore non-public methods.
[GitHub] spark pull request #18935: [SPARK-9104][CORE] Expose Netty memory metrics in...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/18935#discussion_r134051537 --- Diff: common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java --- @@ -106,6 +105,12 @@ public TransportClientFactory( conf.getModuleName() + "-client"); this.pooledAllocator = NettyUtils.createPooledByteBufAllocator( conf.preferDirectBufs(), false /* allowCache */, conf.clientThreads()); +this.metrics = new NettyMemoryMetrics( --- End diff -- it looks to me like this doesn't get exposed at all for the user to query -- eg. I don't see it getting pulled into the MetricSystem. Is that happening somehow? Or is this exposed another way? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18935: [SPARK-9104][CORE] Expose Netty memory metrics in...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/18935#discussion_r134050644 --- Diff: common/network-common/src/main/java/org/apache/spark/network/util/NettyMemoryMetrics.java --- @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.network.util; + +import java.lang.reflect.Method; +import java.lang.reflect.Modifier; +import java.util.*; + +import com.codahale.metrics.Gauge; +import com.codahale.metrics.Metric; +import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.MetricSet; +import com.google.common.annotations.VisibleForTesting; +import io.netty.buffer.PoolArenaMetric; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.buffer.PooledByteBufAllocatorMetric; + +/** + * A Netty memory metrics class to collect metrics from Netty PooledByteBufAllocator. 
+ */ +public class NettyMemoryMetrics implements MetricSet { + + private final PooledByteBufAllocator pooledAllocator; + + private final boolean verboseMetricsEnabled; + + private final Map allMetrics; + + private final String metricPrefix; + + @VisibleForTesting + final static Set VERBOSE_METRICS = new HashSet<>(); + static { +VERBOSE_METRICS.addAll(Arrays.asList( + "numAllocations", + "numTinyAllocations", + "numSmallAllocations", + "numNormalAllocations", + "numHugeAllocations", + "numDeallocations", + "numTinyDeallocations", + "numSmallDeallocations", + "numNormalDeallocations", + "numHugeDeallocations", + "numActiveAllocations", + "numActiveTinyAllocations", + "numActiveSmallAllocations", + "numActiveNormalAllocations", + "numActiveHugeAllocations", + "numActiveBytes")); + } + + public NettyMemoryMetrics(PooledByteBufAllocator pooledAllocator, + String metricPrefix, + TransportConf conf) { +this.pooledAllocator = pooledAllocator; +this.allMetrics = new HashMap<>(); +this.metricPrefix = metricPrefix; +this.verboseMetricsEnabled = conf.verboseMetrics(); + +registerMetrics(this.pooledAllocator); + } + + private void registerMetrics(PooledByteBufAllocator allocator) { +PooledByteBufAllocatorMetric pooledAllocatorMetric = allocator.metric(); + +// Register general metrics. 
+allMetrics.put(MetricRegistry.name(metricPrefix, "usedHeapMemory"), + (Gauge) () -> pooledAllocatorMetric.usedHeapMemory()); +allMetrics.put(MetricRegistry.name(metricPrefix, "usedDirectMemory"), + (Gauge) () -> pooledAllocatorMetric.usedDirectMemory()); + +if (verboseMetricsEnabled) { + int directArenaIndex = 0; + for (PoolArenaMetric metric : pooledAllocatorMetric.directArenas()) { +registerArenaMetric(metric, "directArena" + directArenaIndex); +directArenaIndex++; + } + + int heapArenaIndex = 0; + for (PoolArenaMetric metric : pooledAllocatorMetric.heapArenas()) { +registerArenaMetric(metric, "heapArena" + heapArenaIndex); +heapArenaIndex++; + } +} + } + + private void registerArenaMetric(PoolArenaMetric arenaMetric, String arenaName) { +for (String methodName : VERBOSE_METRICS) { + Method m; + try { +m = PoolArenaMetric.class.getMethod(methodName); + } catch (Exception e) { +// Failed to find metric related method, ignore this metric. +continue; + } + + if (!Modifier.isPublic(m.getModifiers())) { +// Ignore non-public methods.
[GitHub] spark issue #18887: [SPARK-20642][core] Store FsHistoryProvider listing data...
Github user squito commented on the issue: https://github.com/apache/spark/pull/18887 Just to keep others in the loop, Marcelo and I talked about this some offline. I think this PR itself is fine, but to me this is an important point in the larger history server project he's doing, so I'm going to take a look at the rest of the changes as well before I feel comfortable merging. Also, he explained why it is extremely complicated to keep both the old & new versions available, which makes sense to me, though I may poke at a version myself just to see how complex it is. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18887: [SPARK-20642][core] Store FsHistoryProvider listi...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/18887#discussion_r133725104 --- Diff: scalastyle-config.xml --- @@ -86,7 +86,7 @@ This file is divided into 3 sections: - + --- End diff -- OK, makes sense. I wasn't paying enough attention to the actual change at first and thought you were just turning the rule off entirely, but I agree this change makes sense. BTW, I was curious why `package object config` was OK -- it's just a hard-coded special case: https://github.com/scalastyle/scalastyle/blob/master/src/main/scala/org/scalastyle/scalariform/ClassNamesChecker.scala#L74 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18887: [SPARK-20642][core] Store FsHistoryProvider listing data...
Github user squito commented on the issue: https://github.com/apache/spark/pull/18887 >> I feel like we should have one version where the old code is still available, controlled by a feature flag. >I'm not sure exactly what you're suggesting. The default behavior is still, as much as possible, the old one: everything is kept in memory. Keeping the exact old code in place would mean forking FsHistoryProvider, which is not something I see as desirable. Long-term, definitely not desirable. My concern is that there is a lot of new code taking effect here, on some important functionality -- we don't want some bug to prevent adoption of 2.3. For one version, you could leave the old one available, rename it to something else, and put a big disclaimer in the code that it's obsolete; as long as things are smooth for 2.3, delete it entirely for 2.4. The HS isn't as core or tricky as the network module, but I'm thinking of this like netty vs. nio. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18887: [SPARK-20642][core] Store FsHistoryProvider listi...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/18887#discussion_r133512286 --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala --- @@ -316,25 +350,22 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) try { val newLastScanTime = getNewLastScanTime() logDebug(s"Scanning $logDir with lastScanTime==$lastScanTime") - val statusList = Option(fs.listStatus(new Path(logDir))).map(_.toSeq) -.getOrElse(Seq.empty[FileStatus]) + val statusList = Option(fs.listStatus(new Path(logDir))).map(_.toSeq).getOrElse(Nil) // scan for modified applications, replay and merge them - val logInfos: Seq[FileStatus] = statusList + val logInfos = statusList .filter { entry => - val fileInfo = fileToAppInfo.get(entry.getPath()) - val prevFileSize = if (fileInfo != null) fileInfo.fileSize else 0L !entry.isDirectory() && // FsHistoryProvider generates a hidden file which can't be read. Accidentally // reading a garbage file is safe, but we would log an error which can be scary to // the end-user. !entry.getPath().getName().startsWith(".") && -prevFileSize < entry.getLen() && -SparkHadoopUtil.get.checkAccessPermission(entry, FsAction.READ) +SparkHadoopUtil.get.checkAccessPermission(entry, FsAction.READ) && +recordedFileSize(entry.getPath()) < entry.getLen() } .flatMap { entry => Some(entry) } --- End diff -- realize this isn't your change, but what is the point of this? isn't it a no-op? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18887: [SPARK-20642][core] Store FsHistoryProvider listi...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/18887#discussion_r133517697 --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala --- @@ -742,53 +696,146 @@ private[history] object FsHistoryProvider { private val APPL_END_EVENT_PREFIX = "{\"Event\":\"SparkListenerApplicationEnd\"" private val LOG_START_EVENT_PREFIX = "{\"Event\":\"SparkListenerLogStart\"" + + /** Current version of the data written to the listing database. */ + private val CURRENT_LISTING_VERSION = 1L } /** - * Application attempt information. - * - * @param logPath path to the log file, or, for a legacy log, its directory - * @param name application name - * @param appId application ID - * @param attemptId optional attempt ID - * @param startTime start time (from playback) - * @param endTime end time (from playback). -1 if the application is incomplete. - * @param lastUpdated the modification time of the log file when this entry was built by replaying - *the history. - * @param sparkUser user running the application - * @param completed flag to indicate whether or not the application has completed. - * @param fileSize the size of the log file the last time the file was scanned for changes + * A KVStoreSerializer that provides Scala types serialization too, and uses the same options as + * the API serializer. 
*/ -private class FsApplicationAttemptInfo( +private class KVStoreScalaSerializer extends KVStoreSerializer { + + mapper.registerModule(DefaultScalaModule) + mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL) + mapper.setDateFormat(v1.JacksonMessageWriter.makeISODateFormat) + +} + +private[history] case class KVStoreMetadata( + val version: Long, + val logDir: String) + +private[history] case class LogInfo( + @KVIndexParam val logPath: String, + val fileSize: Long) + +private[history] class AttemptInfoWrapper( +val info: v1.ApplicationAttemptInfo, val logPath: String, -val name: String, -val appId: String, -attemptId: Option[String], -startTime: Long, -endTime: Long, -lastUpdated: Long, -sparkUser: String, -completed: Boolean, -val fileSize: Long, -appSparkVersion: String) - extends ApplicationAttemptInfo( - attemptId, startTime, endTime, lastUpdated, sparkUser, completed, appSparkVersion) { - - /** extend the superclass string value with the extra attributes of this class */ - override def toString: String = { -s"FsApplicationAttemptInfo($name, $appId," + - s" ${super.toString}, source=$logPath, size=$fileSize" +val fileSize: Long) { + + def toAppAttemptInfo(): ApplicationAttemptInfo = { +ApplicationAttemptInfo(info.attemptId, info.startTime.getTime(), + info.endTime.getTime(), info.lastUpdated.getTime(), info.sparkUser, + info.completed, info.appSparkVersion) } + } -/** - * Application history information - * @param id application ID - * @param name application name - * @param attempts list of attempts, most recent first. 
- */ -private class FsApplicationHistoryInfo( -id: String, -override val name: String, -override val attempts: List[FsApplicationAttemptInfo]) - extends ApplicationHistoryInfo(id, name, attempts) +private[history] class ApplicationInfoWrapper( +val info: v1.ApplicationInfo, +val attempts: List[AttemptInfoWrapper]) { + + @JsonIgnore @KVIndexParam + def id: String = info.id + + @JsonIgnore @KVIndexParam("endTime") + def endTime(): Long = attempts.head.info.endTime.getTime() + + @JsonIgnore @KVIndexParam("oldestAttempt") + def oldestAttempt(): Long = attempts.map(_.info.lastUpdated.getTime()).min + + def toAppHistoryInfo(): ApplicationHistoryInfo = { +ApplicationHistoryInfo(info.id, info.name, attempts.map(_.toAppAttemptInfo())) + } + + def toApiInfo(): v1.ApplicationInfo = { +new v1.ApplicationInfo(info.id, info.name, info.coresGranted, info.maxCores, + info.coresPerExecutor, info.memoryPerExecutorMB, attempts.map(_.info)) + } + +} + +private[history] class AppListingListener(log: FileStatus, clock: Clock) extends SparkListener { + + private val app = new MutableApplicationInfo() + private val attempt = new MutableAttemptInfo(log.getPath().getName(), log.getLen()) + + override def onApplicationStart(event: SparkListenerApplicationStart): Unit = { +app.id = event.
[GitHub] spark pull request #18887: [SPARK-20642][core] Store FsHistoryProvider listi...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/18887#discussion_r133510977 --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala --- @@ -301,6 +334,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) } override def stop(): Unit = { +listing.close() --- End diff -- if this throws an exception, should we still try to cleanup `initThread`? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18887: [SPARK-20642][core] Store FsHistoryProvider listi...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/18887#discussion_r133511528 --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala --- @@ -117,17 +122,37 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) // used for logging msgs (logs are re-scanned based on file size, rather than modtime) private val lastScanTime = new java.util.concurrent.atomic.AtomicLong(-1) - // Mapping of application IDs to their metadata, in descending end time order. Apps are inserted - // into the map in order, so the LinkedHashMap maintains the correct ordering. - @volatile private var applications: mutable.LinkedHashMap[String, FsApplicationHistoryInfo] -= new mutable.LinkedHashMap() + private val pendingReplayTasksCount = new java.util.concurrent.atomic.AtomicInteger(0) - val fileToAppInfo = new ConcurrentHashMap[Path, FsApplicationAttemptInfo]() + private val storePath = conf.get(LOCAL_STORE_DIR) - // List of application logs to be deleted by event log cleaner. - private var attemptsToClean = new mutable.ListBuffer[FsApplicationAttemptInfo] + private val listing = storePath.map { path => --- End diff -- given the large initializer, could you add an explicit type annotation to `listing` to help the reader? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18950: [SPARK-20589][Core][Scheduler] Allow limiting tas...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/18950#discussion_r133505348 --- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala --- @@ -1214,6 +1214,101 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg verify(taskSetManagerSpy, times(1)).addPendingTask(anyInt()) } + test("limit max concurrent running tasks in a job group when configured ") { +val conf = new SparkConf(). + set(config.BLACKLIST_ENABLED, true). + set("spark.job.testJobGroup.maxConcurrentTasks", "2") // Limit max concurrent tasks to 2 + +sc = new SparkContext("local", "test", conf) +sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2")) +val props = new Properties(); +props.setProperty(SparkContext.SPARK_JOB_GROUP_ID, "testJobGroup") // set the job group + +val tasks = Array.tabulate[Task[_]](10) { i => + new FakeTask(0, i, Nil) +} +val tsm = new TaskSetManager(sched, new TaskSet(tasks, 0, 0, 0, props), 2) + +// make some offers to our taskset +var taskDescs = Seq( + "exec1" -> "host1", + "exec2" -> "host1" +).flatMap { case (exec, host) => + // offer each executor twice (simulating 2 cores per executor) + (0 until 2).flatMap{ _ => tsm.resourceOffer(exec, host, TaskLocality.ANY)} +} +assert(taskDescs.size === 2) // Out of the 4 offers, tsm can accept up to maxConcurrentTasks. + +// make 4 more offers +val taskDescs2 = Seq( + "exec1" -> "host1", + "exec2" -> "host1" +).flatMap { case (exec, host) => + (0 until 2).flatMap{ _ => tsm.resourceOffer(exec, host, TaskLocality.ANY)} +} +assert(taskDescs2.size === 0) // tsm doesn't accept any as it is already running at max tasks + +// inform tsm that one task has completed +val directTaskResult = new DirectTaskResult[String](null, Seq()) { --- End diff -- you can use `createTaskResult` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18950: [SPARK-20589][Core][Scheduler] Allow limiting tas...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/18950#discussion_r133504041 --- Diff: core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala --- @@ -188,6 +188,125 @@ class ExecutorAllocationManagerSuite assert(numExecutorsTarget(manager) === 10) } + test("add executors capped by max concurrent tasks for a job group with single core executors") { +val conf = new SparkConf() + .setMaster("myDummyLocalExternalClusterManager") + .setAppName("test-executor-allocation-manager") + .set("spark.dynamicAllocation.enabled", "true") + .set("spark.dynamicAllocation.testing", "true") + .set("spark.job.group1.maxConcurrentTasks", "2") + .set("spark.job.group2.maxConcurrentTasks", "5") +val sc = new SparkContext(conf) +contexts += sc +sc.setJobGroup("group1", "", false) + +val manager = sc.executorAllocationManager.get +val stage0 = createStageInfo(0, 10) +// Submit the job and stage start/submit events +sc.listenerBus.postToAll(SparkListenerJobStart(0, 0, Seq{stage0}, sc.getLocalProperties)) --- End diff -- nit: parens, not braces: `Seq(stage0)` (throughout this test) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18950: [SPARK-20589][Core][Scheduler] Allow limiting tas...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/18950#discussion_r133504438 --- Diff: core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala --- @@ -188,6 +188,125 @@ class ExecutorAllocationManagerSuite assert(numExecutorsTarget(manager) === 10) } + test("add executors capped by max concurrent tasks for a job group with single core executors") { +val conf = new SparkConf() + .setMaster("myDummyLocalExternalClusterManager") + .setAppName("test-executor-allocation-manager") + .set("spark.dynamicAllocation.enabled", "true") + .set("spark.dynamicAllocation.testing", "true") + .set("spark.job.group1.maxConcurrentTasks", "2") + .set("spark.job.group2.maxConcurrentTasks", "5") +val sc = new SparkContext(conf) +contexts += sc +sc.setJobGroup("group1", "", false) + +val manager = sc.executorAllocationManager.get +val stage0 = createStageInfo(0, 10) +// Submit the job and stage start/submit events +sc.listenerBus.postToAll(SparkListenerJobStart(0, 0, Seq{stage0}, sc.getLocalProperties)) +sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage0)) + +// Verify that we're capped at number of max concurrent tasks in the stage +assert(maxNumExecutorsNeeded(manager) === 2) + +// Submit another stage in the same job +val stage1 = createStageInfo(1, 10) +sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage1)) +assert(maxNumExecutorsNeeded(manager) === 2) + +sc.listenerBus.postToAll(SparkListenerStageCompleted(stage0)) +sc.listenerBus.postToAll(SparkListenerStageCompleted(stage1)) +sc.listenerBus.postToAll(SparkListenerJobEnd(0, 10, JobSucceeded)) + +// Submit a new job in the same job group +val stage2 = createStageInfo(2, 20) +sc.listenerBus.postToAll(SparkListenerJobStart(1, 0, Seq{stage2}, sc.getLocalProperties)) +sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage2)) +assert(maxNumExecutorsNeeded(manager) === 2) + +sc.listenerBus.postToAll(SparkListenerStageCompleted(stage2)) 
+sc.listenerBus.postToAll(SparkListenerJobEnd(1, 10, JobSucceeded)) + +// Set another jobGroup +sc.setJobGroup("group2", "", false) + +val stage3 = createStageInfo(3, 20) +sc.listenerBus.postToAll(SparkListenerJobStart(2, 0, Seq{stage3}, sc.getLocalProperties)) +sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage3)) +assert(maxNumExecutorsNeeded(manager) === 5) + +sc.listenerBus.postToAll(SparkListenerStageCompleted(stage3)) +sc.listenerBus.postToAll(SparkListenerJobEnd(2, 10, JobSucceeded)) + +// Clear jobGroup +sc.clearJobGroup() + +val stage4 = createStageInfo(4, 50) +sc.listenerBus.postToAll(SparkListenerJobStart(2, 0, Seq{stage4}, sc.getLocalProperties)) +sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage4)) +assert(maxNumExecutorsNeeded(manager) === 50) + } + + test("add executors capped by max concurrent tasks for a job group with multi cores executors") { +val conf = new SparkConf() + .setMaster("myDummyLocalExternalClusterManager") + .setAppName("test-executor-allocation-manager") + .set("spark.dynamicAllocation.enabled", "true") + .set("spark.dynamicAllocation.testing", "true") + .set("spark.job.group1.maxConcurrentTasks", "2") + .set("spark.job.group2.maxConcurrentTasks", "5") + .set("spark.executor.cores", "3") +val sc = new SparkContext(conf) +contexts += sc +sc.setJobGroup("group1", "", false) + +val manager = sc.executorAllocationManager.get +val stage0 = createStageInfo(0, 10) +// Submit the job and stage start/submit events +sc.listenerBus.postToAll(SparkListenerJobStart(0, 0, Seq{stage0}, sc.getLocalProperties)) +sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage0)) + +// Verify that we're capped at number of max concurrent tasks in the stage +assert(maxNumExecutorsNeeded(manager) === 1) + +// Submit another stage in the same job +val stage1 = createStageInfo(1, 10) +sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage1)) +assert(maxNumExecutorsNeeded(manager) === 1) + 
+sc.listenerBus.postToAll(SparkListenerStageCompleted(stage0)) +sc
[GitHub] spark pull request #18950: [SPARK-20589][Core][Scheduler] Allow limiting tas...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/18950#discussion_r133502797 --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala --- @@ -314,7 +316,7 @@ private[spark] class ExecutorAllocationManager( // Do not change our target while we are still initializing, // Otherwise the first job may have to ramp up unnecessarily 0 -} else if (maxNeeded < numExecutorsTarget) { +} else if (maxNeeded <= numExecutorsTarget) { // The target number exceeds the number we actually need, so stop adding new --- End diff -- yeah I don't think this change hurts, but unless you have a reason for changing it, I'd prefer to leave it just for when we need to dig through history. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18950: [SPARK-20589][Core][Scheduler] Allow limiting tas...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/18950#discussion_r133499828 --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala --- @@ -602,6 +604,21 @@ private[spark] class ExecutorAllocationManager( // place the executors. private val stageIdToExecutorPlacementHints = new mutable.HashMap[Int, (Int, Map[String, Int])] +override def onJobStart(jobStart: SparkListenerJobStart): Unit = { + val jobGroupId = if (jobStart.properties != null) { +jobStart.properties.getProperty(SparkContext.SPARK_JOB_GROUP_ID) + } else { +"" + } + val maxConcurrentTasks = conf.getInt(s"spark.job.$jobGroupId.maxConcurrentTasks", +Int.MaxValue) + + logInfo(s"Setting maximum concurrent tasks for group: ${jobGroupId} to $maxConcurrentTasks") + allocationManager.synchronized { +allocationManager.maxConcurrentTasks = maxConcurrentTasks --- End diff -- yeah mark is right. after all, that is what separates a job group property from a global property for the entire spark context. I see why this is desirable for the most common case, of just running one job at a time, but to get this to work with multiple concurrent jobs (& job groups) you need to track a map from jobGroup -> maxConcurrency, and then sum that up (handling overflow for Int.MaxValue) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18887: [SPARK-20642][core] Store FsHistoryProvider listi...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/18887#discussion_r133316362 --- Diff: scalastyle-config.xml --- @@ -86,7 +86,7 @@ This file is divided into 3 sections: - + --- End diff -- Agree it makes more sense to make the whole thing `private[spark]`, but couldn't you just turn the rule off around this object, like we do for println? (Also I'm confused why `package object config` doesn't violate the rule). --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18950: [SPARK-20589][Core][Scheduler] Allow limiting tas...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/18950#discussion_r133311545 --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala --- @@ -602,6 +604,21 @@ private[spark] class ExecutorAllocationManager( // place the executors. private val stageIdToExecutorPlacementHints = new mutable.HashMap[Int, (Int, Map[String, Int])] +override def onJobStart(jobStart: SparkListenerJobStart): Unit = { + val jobGroupId = if (jobStart.properties != null) { +jobStart.properties.getProperty(SparkContext.SPARK_JOB_GROUP_ID) + } else { +"" + } + val maxConcurrentTasks = conf.getInt(s"spark.job.$jobGroupId.maxConcurrentTasks", +Int.MaxValue) + + logInfo(s"Setting maximum concurrent tasks for group: ${jobGroupId} to $maxConcurrentTasks") + allocationManager.synchronized { +allocationManager.maxConcurrentTasks = maxConcurrentTasks --- End diff -- if you have multiple jobs running concurrently, you are not keeping a separate maxConcurrentTasks for each one. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18950: [SPARK-20589][Core][Scheduler] Allow limiting tas...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/18950#discussion_r133311882 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala --- @@ -454,64 +477,68 @@ private[spark] class TaskSetManager( } } - dequeueTask(execId, host, allowedLocality).map { case ((index, taskLocality, speculative)) => -// Found a task; do some bookkeeping and return a task description -val task = tasks(index) -val taskId = sched.newTaskId() -// Do various bookkeeping -copiesRunning(index) += 1 -val attemptNum = taskAttempts(index).size -val info = new TaskInfo(taskId, index, attemptNum, curTime, - execId, host, taskLocality, speculative) -taskInfos(taskId) = info -taskAttempts(index) = info :: taskAttempts(index) -// Update our locality level for delay scheduling -// NO_PREF will not affect the variables related to delay scheduling -if (maxLocality != TaskLocality.NO_PREF) { - currentLocalityIndex = getLocalityIndex(taskLocality) - lastLaunchTime = curTime -} -// Serialize and return the task -val serializedTask: ByteBuffer = try { - ser.serialize(task) -} catch { - // If the task cannot be serialized, then there's no point to re-attempt the task, - // as it will always fail. So just abort the whole task-set. - case NonFatal(e) => -val msg = s"Failed to serialize task $taskId, not attempting to retry it." -logError(msg, e) -abort(s"$msg Exception during serialization: $e") -throw new TaskNotSerializableException(e) -} -if (serializedTask.limit > TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024 && - !emittedTaskSizeWarning) { - emittedTaskSizeWarning = true - logWarning(s"Stage ${task.stageId} contains a task of very large size " + -s"(${serializedTask.limit / 1024} KB). The maximum recommended task size is " + -s"${TaskSetManager.TASK_SIZE_TO_WARN_KB} KB.") -} -addRunningTask(taskId) - -// We used to log the time it takes to serialize the task, but task size is already -// a good proxy to task serialization time. 
-// val timeTaken = clock.getTime() - startTime -val taskName = s"task ${info.id} in stage ${taskSet.id}" -logInfo(s"Starting $taskName (TID $taskId, $host, executor ${info.executorId}, " + - s"partition ${task.partitionId}, $taskLocality, ${serializedTask.limit} bytes)") - -sched.dagScheduler.taskStarted(task, info) -new TaskDescription( - taskId, - attemptNum, - execId, - taskName, - index, - addedFiles, - addedJars, - task.localProperties, - serializedTask) + dequeueTask(execId, host, allowedLocality).map { --- End diff -- unintentional reformat? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18874: [SPARK-21656][CORE] spark dynamic allocation should not ...
Github user squito commented on the issue: https://github.com/apache/spark/pull/18874 @srowen you have a good point about a case that becomes worse after this change. Still, I think this change is better on balance. btw, there are even more odd cases with dynamic allocation right now -- the one that I've seen most often is when you run really short tasks, all in sequence: you probably won't release any executors. Say you first run some really large job with 10k tasks, and so you request a bunch of executors. After that, you only ever run 100 tasks at a time, so you could release a bunch of resources. But 60 seconds is enough time for a bunch of short stages to execute, and each stage chooses a random set of executors to run on, so no executor is ever idle for 60 seconds. Perhaps we could also fix that in the larger change Tom was alluding to, which would more tightly integrate dynamic allocation into the scheduler. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18874: [SPARK-21656][CORE] spark dynamic allocation should not ...
Github user squito commented on the issue: https://github.com/apache/spark/pull/18874 This change makes sense to me. I think Tom's last comment, about resetting that timeout every time a task is scheduled, explains how you get into this situation and why you don't actually want to change those confs. A few months back I was looking at that and chatted with Kay about why that was the case -- I forget the details off the top of my head, but I at least remember there was some reason why it wasn't trivial to change. So I think this is the right immediate fix. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18739: [WIP][SPARK-21539][CORE] Job should not be aborte...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/18739#discussion_r129849052 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala --- @@ -665,10 +667,15 @@ private[spark] class TaskSetManager( } } if (blacklistedEverywhere) { -val partition = tasks(indexInTaskSet).partitionId -abort(s"Aborting $taskSet because task $indexInTaskSet (partition $partition) " + - s"cannot run anywhere due to node and executor blacklist. Blacklisting behavior " + - s"can be configured via spark.blacklist.*.") +val dynamicAllocationEnabled = conf.getBoolean("spark.dynamicAllocation.enabled", false) +val mayAllocateNewExecutor = + conf.getInt("spark.executor.instances", -1) > currentExecutorNumber +if (!dynamicAllocationEnabled && !mayAllocateNewExecutor) { --- End diff -- the reason we do wait until the task set has finished is that before that, we have no idea whether the failure is the fault of the user code (or bad input data, etc.) or actually a fault with the node / executor. Our only piece of information on that is when a task fails on one executor and then succeeds elsewhere; in that case we assume that the failure was the fault of the original executor (though this heuristic also has false positives, from what I've seen so far it seems tolerable). I have also thought of having this wait some amount of time rather than killing the taskset immediately, to see if another executor comes up. However, there are some complications with that as well. I think this is all captured in the discussion on SPARK-15815, which actually discusses one of the trickiest cases -- just one task remaining with Dynamic Allocation, and all other executors have been killed b/c they were idle. Take a look at that jira. If it summarizes things, then we can close SPARK-21539 as a duplicate and continue discussion on SPARK-15815. 
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18739: [WIP][SPARK-21539][CORE] Job should not be aborte...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/18739#discussion_r129749281 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala --- @@ -644,10 +644,12 @@ private[spark] class TaskSetManager( } pendingTask.foreach { indexInTaskSet => + var currentExecutorNumber = 0 --- End diff -- `numBlacklistedExecutors` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18739: [WIP][SPARK-21539][CORE] Job should not be aborte...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/18739#discussion_r129749722 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala --- @@ -665,10 +667,15 @@ private[spark] class TaskSetManager( } } if (blacklistedEverywhere) { -val partition = tasks(indexInTaskSet).partitionId -abort(s"Aborting $taskSet because task $indexInTaskSet (partition $partition) " + - s"cannot run anywhere due to node and executor blacklist. Blacklisting behavior " + - s"can be configured via spark.blacklist.*.") +val dynamicAllocationEnabled = conf.getBoolean("spark.dynamicAllocation.enabled", false) +val mayAllocateNewExecutor = + conf.getInt("spark.executor.instances", -1) > currentExecutorNumber +if (!dynamicAllocationEnabled && !mayAllocateNewExecutor) { --- End diff -- but even with dynamic allocation, you might still want this, right? Are you hoping that with dynamic allocation, even if everything is blacklisted, eventually the executors will go idle, get torn down, and then new executors will get created since you still have tasks left? On large clusters, this seems desirable. There are weird cases with small clusters though ... suppose the cluster only has two nodes, and you end up blacklisting both nodes (with such a small cluster, that can happen just because tasks fail from poor user code). Then with this change, you'll go back to having the job sit idle for a long time, just waiting for the blacklist to timeout. I agree the current solution isn't great, but I don't know if this really improves things. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. 
--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #17902: [SPARK-20641][core] Add key-value store abstraction and ...
Github user squito commented on the issue: https://github.com/apache/spark/pull/17902 merged to master --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #17902: [SPARK-20641][core] Add key-value store abstraction and ...
Github user squito commented on the issue: https://github.com/apache/spark/pull/17902 fair enough on jackson. add the log for the seed please, other than that, lgtm --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #17113: [SPARK-13669][SPARK-20898][Core] Improve the blacklist m...
Github user squito commented on the issue: https://github.com/apache/spark/pull/17113 I'm fine with this, just a couple of asks: 1) Is there a test for SPARK-20898? I know at some point in the development of killing blacklisted executors we tested on YARN, and I'm really disappointed (in myself) for merging it even though it didn't actually work. It would be nice to have a regression test. Maybe that would be a huge pain to do; I just wanted to see if you had thought about it. 2) Can you update the jira description for SPARK-13669 now? I think a lot has changed since that was initially opened. Also IIUC, we don't have any particular advice on when users should enable this at the moment; it's just an escape hatch you'd like to leave in place? If there is any advice, putting it in the jira would also make sense. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17902: [SPARK-20641][core] Add key-value store abstracti...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/17902#discussion_r119904509 --- Diff: common/kvstore/src/test/java/org/apache/spark/kvstore/DBIteratorSuite.java --- @@ -0,0 +1,494 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.kvstore; + +import java.util.Arrays; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.Random; + +import com.google.common.base.Predicate; +import com.google.common.collect.Iterables; +import com.google.common.collect.Iterators; +import com.google.common.collect.Lists; +import org.apache.commons.io.FileUtils; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.Test; +import static org.junit.Assert.*; + +public abstract class DBIteratorSuite { + + private static final int MIN_ENTRIES = 42; + private static final int MAX_ENTRIES = 1024; + private static final Random RND = new Random(); + + private static List allEntries; + private static List clashingEntries; + private static KVStore db; + + private static interface BaseComparator extends Comparator { +/** + * Returns a comparator that falls back to natural order if this comparator's ordering + * returns equality for two elements. Used to mimic how the index sorts things internally. + */ +default BaseComparator fallback() { + return (t1, t2) -> { +int result = BaseComparator.this.compare(t1, t2); +if (result != 0) { + return result; +} + +return t1.key.compareTo(t2.key); + }; +} + +/** Reverses the order of this comparator. 
*/ +default BaseComparator reverse() { + return (t1, t2) -> -BaseComparator.this.compare(t1, t2); +} + } + + private static final BaseComparator NATURAL_ORDER = (t1, t2) -> t1.key.compareTo(t2.key); + private static final BaseComparator REF_INDEX_ORDER = (t1, t2) -> t1.id.compareTo(t2.id); + private static final BaseComparator COPY_INDEX_ORDER = (t1, t2) -> t1.name.compareTo(t2.name); + private static final BaseComparator NUMERIC_INDEX_ORDER = (t1, t2) -> t1.num - t2.num; + private static final BaseComparator CHILD_INDEX_ORDER = (t1, t2) -> t1.child.compareTo(t2.child); + + /** + * Implementations should override this method; it is called only once, before all tests are + * run. Any state can be safely stored in static variables and cleaned up in a @AfterClass + * handler. + */ + protected abstract KVStore createStore() throws Exception; + + @AfterClass + public static void cleanupData() throws Exception { +allEntries = null; +db = null; + } + + @Before + public void setup() throws Exception { +if (db != null) { --- End diff -- for later debugging, it would be helpful to log the random generator seed here. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17902: [SPARK-20641][core] Add key-value store abstracti...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/17902#discussion_r119652958 --- Diff: common/kvstore/src/test/java/org/apache/spark/kvstore/DBIteratorSuite.java --- @@ -0,0 +1,500 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.kvstore; + +import java.util.Arrays; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.Random; + +import com.google.common.base.Predicate; +import com.google.common.collect.Iterables; +import com.google.common.collect.Iterators; +import com.google.common.collect.Lists; +import org.apache.commons.io.FileUtils; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.Test; +import static org.junit.Assert.*; + +public abstract class DBIteratorSuite { + + private static final int MIN_ENTRIES = 42; + private static final int MAX_ENTRIES = 1024; + private static final Random RND = new Random(); + + private static List allEntries; + private static List clashingEntries; + private static KVStore db; + + private static interface BaseComparator extends Comparator { +/** + * Returns a comparator that falls back to natural order if this comparator's ordering + * returns equality for two elements. Used to mimic how the index sorts things internally. + */ +default BaseComparator fallback() { + return (t1, t2) -> { +int result = BaseComparator.this.compare(t1, t2); +if (result != 0) { + return result; +} + +return t1.key.compareTo(t2.key); + }; +} + +/** Reverses the order of this comparator. 
*/ +default BaseComparator reverse() { + return (t1, t2) -> -BaseComparator.this.compare(t1, t2); +} + } + + private static final BaseComparator NATURAL_ORDER = (t1, t2) -> t1.key.compareTo(t2.key); + private static final BaseComparator REF_INDEX_ORDER = (t1, t2) -> t1.id.compareTo(t2.id); + private static final BaseComparator COPY_INDEX_ORDER = (t1, t2) -> t1.name.compareTo(t2.name); + private static final BaseComparator NUMERIC_INDEX_ORDER = (t1, t2) -> t1.num - t2.num; + private static final BaseComparator CHILD_INDEX_ORDER = (t1, t2) -> t1.child.compareTo(t2.child); + + /** + * Implementations should override this method; it is called only once, before all tests are + * run. Any state can be safely stored in static variables and cleaned up in a @AfterClass + * handler. + */ + protected abstract KVStore createStore() throws Exception; + + @AfterClass + public static void cleanupData() throws Exception { +allEntries = null; +db = null; + } + + @Before + public void setup() throws Exception { +if (db != null) { + return; +} + +db = createStore(); + +int count = RND.nextInt(MAX_ENTRIES) + MIN_ENTRIES; + +// Instead of generating sequential IDs, generate random unique IDs to avoid the insertion +// order matching the natural ordering. Just in case. +boolean[] usedIDs = new boolean[count]; + +allEntries = new ArrayList<>(count); +for (int i = 0; i < count; i++) { + CustomType1 t = new CustomType1(); + + int id; + do { +id = RND.nextInt(count); + } while (usedIDs[id]); --- End diff -- I know this doesn't really matter, but this is O(n^2), listing the ids and then using https://docs.oracle.com/javase/8/docs/api/java/util/Collections.html#shuffle-java.util.List-java.util.Random- would be O(n) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not h
[GitHub] spark pull request #17902: [SPARK-20641][core] Add key-value store abstracti...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/17902#discussion_r119657514 --- Diff: common/kvstore/src/test/java/org/apache/spark/kvstore/LevelDBTypeInfoSuite.java --- @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.kvstore; + +import static java.nio.charset.StandardCharsets.UTF_8; + +import org.junit.Test; +import static org.junit.Assert.*; + +public class LevelDBTypeInfoSuite { + + @Test + public void testIndexAnnotation() throws Exception { +KVTypeInfo ti = new KVTypeInfo(CustomType1.class); +assertEquals(5, ti.indices().count()); + +CustomType1 t1 = new CustomType1(); +t1.key = "key"; +t1.id = "id"; +t1.name = "name"; +t1.num = 42; +t1.child = "child"; + +assertEquals(t1.key, ti.getIndexValue(KVIndex.NATURAL_INDEX_NAME, t1)); +assertEquals(t1.id, ti.getIndexValue("id", t1)); +assertEquals(t1.name, ti.getIndexValue("name", t1)); +assertEquals(t1.num, ti.getIndexValue("int", t1)); +assertEquals(t1.child, ti.getIndexValue("child", t1)); + } + + @Test(expected = IllegalArgumentException.class) + public void testNoNaturalIndex() throws Exception { +newTypeInfo(NoNaturalIndex.class); + } + + @Test(expected = IllegalArgumentException.class) + public void testDuplicateIndex() throws Exception { +newTypeInfo(DuplicateIndex.class); + } + + @Test(expected = IllegalArgumentException.class) + public void testEmptyIndexName() throws Exception { +newTypeInfo(EmptyIndexName.class); + } + + @Test(expected = IllegalArgumentException.class) + public void testIllegalIndexName() throws Exception { +newTypeInfo(IllegalIndexName.class); + } + + @Test(expected = IllegalArgumentException.class) + public void testIllegalIndexMethod() throws Exception { +newTypeInfo(IllegalIndexMethod.class); + } + + @Test + public void testKeyClashes() throws Exception { +LevelDBTypeInfo ti = newTypeInfo(CustomType1.class); + +CustomType1 t1 = new CustomType1(); +t1.key = "key1"; +t1.name = "a"; + +CustomType1 t2 = new CustomType1(); +t2.key = "key2"; +t2.name = "aa"; + +CustomType1 t3 = new CustomType1(); +t3.key = "key3"; +t3.name = "aaa"; + +// Make sure entries with conflicting names are sorted correctly. 
+assertBefore(ti.index("name").entityKey(null, t1), ti.index("name").entityKey(null, t2)); +assertBefore(ti.index("name").entityKey(null, t1), ti.index("name").entityKey(null, t3)); +assertBefore(ti.index("name").entityKey(null, t2), ti.index("name").entityKey(null, t3)); + } + + @Test + public void testNumEncoding() throws Exception { +LevelDBTypeInfo.Index idx = newTypeInfo(CustomType1.class).indices().iterator().next(); + +assertEquals("+=0001", new String(idx.toKey(1), UTF_8)); +assertEquals("+=0010", new String(idx.toKey(16), UTF_8)); +assertEquals("+=7fff", new String(idx.toKey(Integer.MAX_VALUE), UTF_8)); + +assertBefore(idx.toKey(1), idx.toKey(2)); +assertBefore(idx.toKey(-1), idx.toKey(2)); +assertBefore(idx.toKey(-11), idx.toKey(2)); +assertBefore(idx.toKey(-11), idx.toKey(-1)); +assertBefore(idx.toKey(1), idx.toKey(11)); +assertBefore(idx.toKey(Integer.MIN_VALUE), idx.toKey(Integer.MAX_VALUE)); + +assertBefore(idx.toKey(1L), idx.toKey(2L)); +assertBefore(idx.toKey(-1L), idx.toKey(2L)); +assertBefore(idx.toKey(Long.MIN_VALUE), idx.toKey(Long.MAX_VALUE)); + +
[GitHub] spark pull request #17902: [SPARK-20641][core] Add key-value store abstracti...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/17902#discussion_r119651685 --- Diff: common/kvstore/src/main/java/org/apache/spark/kvstore/LevelDBTypeInfo.java --- @@ -0,0 +1,502 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.kvstore; + +import java.lang.reflect.Array; +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import static java.nio.charset.StandardCharsets.UTF_8; + +import com.google.common.base.Preconditions; +import com.google.common.base.Throwables; +import org.iq80.leveldb.WriteBatch; + +/** + * Holds metadata about app-specific types stored in LevelDB. Serves as a cache for data collected + * via reflection, to make it cheaper to access it multiple times. + * + * + * The hierarchy of keys stored in LevelDB looks roughly like the following. This hierarchy ensures + * that iteration over indices is easy, and that updating values in the store is not overly + * expensive. 
Of note, indices choose using more disk space (one value per key) instead of keeping + * lists of pointers, which would be more expensive to update at runtime. + * + * + * + * Indentation defines when a sub-key lives under a parent key. In LevelDB, this means the full + * key would be the concatenation of everything up to that point in the hierarchy, with each + * component separated by a NULL byte. + * + * + * + * +TYPE_NAME + * NATURAL_INDEX + * +NATURAL_KEY + * - + * -NATURAL_INDEX + * INDEX_NAME + * +INDEX_VALUE + * +NATURAL_KEY + * -INDEX_VALUE + * .INDEX_VALUE + * CHILD_INDEX_NAME + * +CHILD_INDEX_VALUE + * NATURAL_KEY_OR_DATA + * - + * -INDEX_NAME + * + * + * + * Entity data (either the entity's natural key or a copy of the data) is stored in all keys + * that end with "+". A count of all objects that match a particular top-level index + * value is kept at the end marker ("-"). A count is also kept at the natural index's end + * marker, to make it easy to retrieve the number of all elements of a particular type. + * + * + * + * To illustrate, given a type "Foo", with a natural index and a second index called "bar", you'd + * have these keys and values in the store for two instances, one with natural key "key1" and the + * other "key2", both with value "yes" for "bar": + * + * + * + * Foo __main__ +key1 [data for instance 1] + * Foo __main__ +key2 [data for instance 2] + * Foo __main__ - [count of all Foo] + * Foo bar +yes +key1 [instance 1 key or data, depending on index type] + * Foo bar +yes +key2 [instance 2 key or data, depending on index type] + * Foo bar +yes - [count of all Foo with "bar=yes" ] + * + * + * + * Note that all indexed values are prepended with "+", even if the index itself does not have an + * explicit end marker. This allows for easily skipping to the end of an index by telling LevelDB + * to seek to the "phantom" end marker of the index. + * + * + * + * Child indices are stored after their parent index. 
In the example above, let's assume there is + * a child index "child", whose parent is "bar". If both instances have value "no" for this field, + * the data in the store would look something like the following: + * + * + * + * ... + * Foo bar +yes - + * Foo bar .yes .child +no +key1 [instance 1 key or data, depending on index type] + * Foo bar .yes .ch
[GitHub] spark pull request #17902: [SPARK-20641][core] Add key-value store abstracti...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/17902#discussion_r119649224 --- Diff: common/kvstore/src/main/java/org/apache/spark/kvstore/LevelDBTypeInfo.java --- @@ -0,0 +1,502 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.kvstore; + +import java.lang.reflect.Array; +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import static java.nio.charset.StandardCharsets.UTF_8; + +import com.google.common.base.Preconditions; +import com.google.common.base.Throwables; +import org.iq80.leveldb.WriteBatch; + +/** + * Holds metadata about app-specific types stored in LevelDB. Serves as a cache for data collected + * via reflection, to make it cheaper to access it multiple times. + * + * + * The hierarchy of keys stored in LevelDB looks roughly like the following. This hierarchy ensures + * that iteration over indices is easy, and that updating values in the store is not overly + * expensive. 
Of note, indices choose using more disk space (one value per key) instead of keeping + * lists of pointers, which would be more expensive to update at runtime. + * + * + * + * Indentation defines when a sub-key lives under a parent key. In LevelDB, this means the full + * key would be the concatenation of everything up to that point in the hierarchy, with each + * component separated by a NULL byte. + * + * + * + * +TYPE_NAME + * NATURAL_INDEX + * +NATURAL_KEY + * - + * -NATURAL_INDEX + * INDEX_NAME + * +INDEX_VALUE + * +NATURAL_KEY + * -INDEX_VALUE + * .INDEX_VALUE + * CHILD_INDEX_NAME + * +CHILD_INDEX_VALUE + * NATURAL_KEY_OR_DATA + * - + * -INDEX_NAME + * + * + * + * Entity data (either the entity's natural key or a copy of the data) is stored in all keys + * that end with "+". A count of all objects that match a particular top-level index + * value is kept at the end marker ("-"). A count is also kept at the natural index's end + * marker, to make it easy to retrieve the number of all elements of a particular type. + * + * + * + * To illustrate, given a type "Foo", with a natural index and a second index called "bar", you'd + * have these keys and values in the store for two instances, one with natural key "key1" and the + * other "key2", both with value "yes" for "bar": + * + * + * + * Foo __main__ +key1 [data for instance 1] + * Foo __main__ +key2 [data for instance 2] --- End diff -- one thing I had some trouble keeping straight as I read through this was the difference between an index "key" and an index "value". Normally i think of "value" as what you are calling "data" here. It seems like you are calling the index value just final component "+key1", while the key refers to the entire thing "Foo __main__ +key1". Is that right? It might also help adding comments on `entityKey()` and `getValue()` as well --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17902: [SPARK-20641][core] Add key-value store abstracti...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/17902#discussion_r119408662 --- Diff: common/kvstore/src/main/java/org/apache/spark/kvstore/KVStoreView.java --- @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.kvstore; + +import java.util.Iterator; +import java.util.Map; + +import com.google.common.base.Preconditions; + +/** + * A configurable view that allows iterating over values in a {@link KVStore}. + * + * + * The different methods can be used to configure the behavior of the iterator. Calling the same + * method multiple times is allowed; the most recent value will be used. + * + * + * + * The iterators returns by this view are of type {@link KVStoreIterator}; they auto-close + * when used in a for loop that exhausts their contents, but when used manually, they need + * to be closed explicitly unless all elements are read. 
+ * + */ +public abstract class KVStoreView implements Iterable { + + final Class type; + + boolean ascending = true; + String index = KVIndex.NATURAL_INDEX_NAME; + Object first = null; + Object last = null; + Object parent = null; + long skip = 0L; + long max = Long.MAX_VALUE; + + public KVStoreView(Class type) { +this.type = type; + } + + /** + * Reverses the order of iteration. By default, iterates in ascending order. + */ + public KVStoreView reverse() { +ascending = !ascending; +return this; + } + + /** + * Iterates according to the given index. + */ + public KVStoreView index(String name) { +this.index = Preconditions.checkNotNull(name); +return this; + } + + /** + * Defines the value of the parent index when iterating over a child index. Only elements that + * match the parent index's value will be included in the iteration. + * + * + * Required for iterating over child indices, will generate an error if iterating over a + * parent-less index. + * + */ + public KVStoreView parent(Object value) { +this.parent = value; +return this; + } + + /** + * Iterates starting at the given value of the chosen index. + */ + public KVStoreView first(Object value) { +this.first = value; +return this; + } + + /** + * Stops iteration at the given value of the chosen index. --- End diff -- would be nice to clarify whether the matching element is included or not. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17902: [SPARK-20641][core] Add key-value store abstracti...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/17902#discussion_r119409655 --- Diff: common/kvstore/src/main/java/org/apache/spark/kvstore/KVTypeInfo.java --- @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.kvstore; + +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.stream.Stream; + +import com.google.common.base.Preconditions; + +/** + * Wrapper around types managed in a KVStore, providing easy access to their indexed fields. 
+ */ +public class KVTypeInfo { + + private final Class type; + private final Map indices; + private final Map accessors; + + public KVTypeInfo(Class type) throws Exception { +this.type = type; +this.accessors = new HashMap<>(); +this.indices = new HashMap<>(); + +for (Field f : type.getFields()) { + KVIndex idx = f.getAnnotation(KVIndex.class); + if (idx != null) { +checkIndex(idx, indices); +indices.put(idx.value(), idx); +accessors.put(idx.value(), new FieldAccessor(f)); + } +} + +for (Method m : type.getMethods()) { + KVIndex idx = m.getAnnotation(KVIndex.class); + if (idx != null) { +checkIndex(idx, indices); +Preconditions.checkArgument(m.getParameterTypes().length == 0, + "Annotated method %s::%s should not have any parameters.", type.getName(), m.getName()); +indices.put(idx.value(), idx); +accessors.put(idx.value(), new MethodAccessor(m)); + } +} + + Preconditions.checkArgument(indices.containsKey(KVIndex.NATURAL_INDEX_NAME), +"No natural index defined for type %s.", type.getName()); + Preconditions.checkArgument(indices.get(KVIndex.NATURAL_INDEX_NAME).parent().isEmpty(), +"Natural index of %s cannot have a parent.", type.getName()); + +for (KVIndex idx : indices.values()) { + if (!idx.parent().isEmpty()) { +KVIndex parent = indices.get(idx.parent()); +Preconditions.checkArgument(parent != null, + "Cannot find parent %s of index %s.", idx.parent(), idx.value()); +Preconditions.checkArgument(parent.parent().isEmpty(), + "Parent index %s of index %s cannot be itself a child index.", idx.parent(), idx.value()); + } +} + } + + private void checkIndex(KVIndex idx, Map indices) { +Preconditions.checkArgument(idx.value() != null && !idx.value().isEmpty(), + "No name provided for index in type %s.", type.getName()); +Preconditions.checkArgument( + !idx.value().startsWith("_") || idx.value().equals(KVIndex.NATURAL_INDEX_NAME), + "Index name %s (in type %s) is not allowed.", idx.value(), type.getName()); +Preconditions.checkArgument(idx.parent().isEmpty() || 
!idx.parent().equals(idx.value()), + "Index %s cannot be parent of itself.", idx.value()); +Preconditions.checkArgument(!indices.containsKey(idx.value()), + "Duplicate index %s for type %s.", idx.value(), type.getName()); + } + + public Class getType() { +return type; + } + + public Object getIndexValue(String indexName, Object instance) throws Exception { +return getAccessor(indexName).get(instance); + } + + public Stream indices() { +return indices.values().stream(); + } + + Accessor getAccessor(String indexName) { +Accessor a = accessors.get(indexName); +Preconditions.checkArgument(a != null, "No index %s.", indexName); +return a; + } + + Accesso
[GitHub] spark pull request #17902: [SPARK-20641][core] Add key-value store abstracti...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/17902#discussion_r119411351 --- Diff: common/kvstore/src/main/java/org/apache/spark/kvstore/KVStore.java --- @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.kvstore; + +import java.io.Closeable; +import java.util.Iterator; +import java.util.Map; + +/** + * Abstraction for a local key/value store for storing app data. + * + * + * There are two main features provided by the implementations of this interface: + * + * + * Serialization + * + * + * Data will be serialized to and deserialized from the underlying data store using a + * {@link KVStoreSerializer}, which can be customized by the application. The serializer is + * based on Jackson, so it supports all the Jackson annotations for controlling the serialization + * of app-defined types. + * + * + * + * Data is also automatically compressed to save disk space. + * + * + * Automatic Key Management + * + * + * When using the built-in key management, the implementation will automatically create unique + * keys for each type written to the store. 
Keys are based on the type name, and always start + * with the "+" prefix character (so that it's easy to use both manual and automatic key + * management APIs without conflicts). + * + * + * + * Another feature of automatic key management is indexing; by annotating fields or methods of + * objects written to the store with {@link KVIndex}, indices are created to sort the data + * by the values of those properties. This makes it possible to provide sorting without having + * to load all instances of those types from the store. + * + * + * + * KVStore instances are thread-safe for both reads and writes. + * + */ +public interface KVStore extends Closeable { + + /** + * Returns app-specific metadata from the store, or null if it's not currently set. + * + * + * The metadata type is application-specific. This is a convenience method so that applications + * don't need to define their own keys for this information. + * + */ + T getMetadata(Class klass) throws Exception; + + /** + * Writes the given value in the store metadata key. + */ + void setMetadata(Object value) throws Exception; + + /** + * Read a specific instance of an object. + */ + T read(Class klass, Object naturalKey) throws Exception; --- End diff -- please document that the key cannot be null, and that if the key does not exist, a NoSuchElementException is thrown (otherwise I might think you'd return null). --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17902: [SPARK-20641][core] Add key-value store abstracti...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/17902#discussion_r119420304 --- Diff: common/kvstore/src/main/java/org/apache/spark/kvstore/LevelDB.java --- @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.kvstore; + +import java.io.File; +import java.io.IOException; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicReference; +import static java.nio.charset.StandardCharsets.UTF_8; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Objects; +import com.google.common.base.Preconditions; +import com.google.common.base.Throwables; +import org.fusesource.leveldbjni.JniDBFactory; +import org.iq80.leveldb.DB; +import org.iq80.leveldb.Options; +import org.iq80.leveldb.WriteBatch; + +/** + * Implementation of KVStore that uses LevelDB as the underlying data store. 
+ */ +public class LevelDB implements KVStore { + + @VisibleForTesting + static final long STORE_VERSION = 1L; + + @VisibleForTesting + static final byte[] STORE_VERSION_KEY = "__version__".getBytes(UTF_8); + + /** DB key where app metadata is stored. */ + private static final byte[] METADATA_KEY = "__meta__".getBytes(UTF_8); + + /** DB key where type aliases are stored. */ + private static final byte[] TYPE_ALIASES_KEY = "__types__".getBytes(UTF_8); + + final AtomicReference _db; + final KVStoreSerializer serializer; + + private final ConcurrentMap typeAliases; --- End diff -- this `typeAliases` thing is pretty confusing. IIUC, the idea is to replace a long fully qualified type name with a shorter numeric id, and this holds the mapping? I'd include a comment about it, and maybe even rename it to `typeToID`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17902: [SPARK-20641][core] Add key-value store abstracti...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/17902#discussion_r119407982 --- Diff: common/kvstore/src/main/java/org/apache/spark/kvstore/KVStoreView.java --- @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.kvstore; + +import java.util.Iterator; +import java.util.Map; + +import com.google.common.base.Preconditions; + +/** + * A configurable view that allows iterating over values in a {@link KVStore}. + * + * + * The different methods can be used to configure the behavior of the iterator. Calling the same + * method multiple times is allowed; the most recent value will be used. + * + * + * + * The iterators returns by this view are of type {@link KVStoreIterator}; they auto-close --- End diff -- typo: returned --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. 
--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17902: [SPARK-20641][core] Add key-value store abstracti...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/17902#discussion_r119412048 --- Diff: common/kvstore/src/main/java/org/apache/spark/kvstore/LevelDB.java --- @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.kvstore; + +import java.io.File; +import java.io.IOException; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicReference; +import static java.nio.charset.StandardCharsets.UTF_8; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Objects; +import com.google.common.base.Preconditions; +import com.google.common.base.Throwables; +import org.fusesource.leveldbjni.JniDBFactory; +import org.iq80.leveldb.DB; +import org.iq80.leveldb.Options; +import org.iq80.leveldb.WriteBatch; + +/** + * Implementation of KVStore that uses LevelDB as the underlying data store. 
+ */ +public class LevelDB implements KVStore { + + @VisibleForTesting + static final long STORE_VERSION = 1L; + + @VisibleForTesting + static final byte[] STORE_VERSION_KEY = "__version__".getBytes(UTF_8); + + /** DB key where app metadata is stored. */ + private static final byte[] METADATA_KEY = "__meta__".getBytes(UTF_8); + + /** DB key where type aliases are stored. */ + private static final byte[] TYPE_ALIASES_KEY = "__types__".getBytes(UTF_8); + + final AtomicReference _db; + final KVStoreSerializer serializer; + + private final ConcurrentMap typeAliases; + private final ConcurrentMap, LevelDBTypeInfo> types; + + public LevelDB(File path) throws Exception { +this(path, new KVStoreSerializer()); + } + + public LevelDB(File path, KVStoreSerializer serializer) throws Exception { +this.serializer = serializer; +this.types = new ConcurrentHashMap<>(); + +Options options = new Options(); +options.createIfMissing(!path.exists()); +this._db = new AtomicReference<>(JniDBFactory.factory.open(path, options)); + +byte[] versionData = db().get(STORE_VERSION_KEY); +if (versionData != null) { + long version = serializer.deserializeLong(versionData); + if (version != STORE_VERSION) { +throw new UnsupportedStoreVersionException(); + } +} else { + db().put(STORE_VERSION_KEY, serializer.serialize(STORE_VERSION)); +} + +Map aliases; +try { + aliases = get(TYPE_ALIASES_KEY, TypeAliases.class).aliases; +} catch (NoSuchElementException e) { + aliases = new HashMap<>(); +} +typeAliases = new ConcurrentHashMap<>(aliases); + } + + @Override + public T getMetadata(Class klass) throws Exception { +try { + return get(METADATA_KEY, klass); +} catch (NoSuchElementException nsee) { + return null; +} + } + + @Override + public void setMetadata(Object value) throws Exception { +if (value != null) { + put(METADATA_KEY, value); +} else { + db().delete(METADATA_KEY); +} + } + + T get(byte[] key, Class klass) throws Exception { +byte[] data = db().get(key); +if (data == null) { + throw new 
NoSuchElementException(new String(key, UTF_8)); +} +return serializer.deserialize(data, klass); + } + + private void put(byte[] key, Object value) throws Exception { +Preconditions.checkArgument(value != null, "Null values are not allowed."); +db().put(key, serializer.serialize(value)); + } +