[GitHub] spark pull request #19287: [SPARK-22074][Core] Task killed by other attempt ...

2017-09-28 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/19287#discussion_r141643992
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -724,6 +724,7 @@ private[spark] class TaskSetManager(
   logInfo(s"Killing attempt ${attemptInfo.attemptNumber} for task 
${attemptInfo.id} " +
 s"in stage ${taskSet.id} (TID ${attemptInfo.taskId}) on 
${attemptInfo.host} " +
 s"as the attempt ${info.attemptNumber} succeeded on ${info.host}")
+  attemptInfo.markKilledByOtherAttempt
--- End diff --

nit: this has a side-effect, so the normal scala-style is to call this w/ 
parens:
`attemptInfo.markKilledByOtherAttempt()`


---




[GitHub] spark pull request #19287: [SPARK-22074][Core] Task killed by other attempt ...

2017-09-28 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/19287#discussion_r141668660
  
--- Diff: core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala ---
@@ -58,4 +57,18 @@ object FakeTask {
 }
 new TaskSet(tasks, stageId, stageAttemptId, priority = 0, null)
   }
+
+  def createShuffleMapTaskSet(numTasks: Int, stageId: Int, stageAttemptId: 
Int,
+prefLocs: Seq[TaskLocation]*): TaskSet = {
--- End diff --

nit: multi-line method definitions should have each param on its own line, 
indented 4 spaces

```scala
def createShuffleMapTaskSet(
    numTasks: Int,
    stageId: Int,
    stageAttemptId: Int,
    prefLocs: Seq[TaskLocation]*): TaskSet = {
```


---




[GitHub] spark pull request #19287: [SPARK-22074][Core] Task killed by other attempt ...

2017-09-28 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/19287#discussion_r141674264
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala ---
@@ -744,6 +744,100 @@ class TaskSetManagerSuite extends SparkFunSuite with 
LocalSparkContext with Logg
 assert(resubmittedTasks === 0)
   }
 
+
+  test("[SPARK-22074] Task killed by other attempt task should not be 
resubmitted") {
+val conf = new SparkConf().set("spark.speculation", "true")
+sc = new SparkContext("local", "test", conf)
+// Set the speculation multiplier to be 0 so speculative tasks are 
launched immediately
+sc.conf.set("spark.speculation.multiplier", "0.0")
+sc.conf.set("spark.speculation.quantile", "0.5")
+sc.conf.set("spark.speculation", "true")
+
+val sched = new FakeTaskScheduler(sc, ("exec1", "host1"),
+  ("exec2", "host2"), ("exec3", "host3"))
+sched.initialize(new FakeSchedulerBackend() {
+  override def killTask(
+   taskId: Long,
+   executorId: String,
+   interruptThread: Boolean,
+   reason: String): Unit = {}
+})
+
+// Keep track of the number of tasks that are resubmitted,
+// so that the test can check that no tasks were resubmitted.
+var resubmittedTasks = 0
+val dagScheduler = new FakeDAGScheduler(sc, sched) {
+  override def taskEnded(
+  task: Task[_],
+  reason: TaskEndReason,
+  result: Any,
+  accumUpdates: Seq[AccumulatorV2[_, _]],
+  taskInfo: TaskInfo): Unit = {
+super.taskEnded(task, reason, result, accumUpdates, taskInfo)
+reason match {
+  case Resubmitted => resubmittedTasks += 1
+  case _ =>
+}
+  }
+}
+sched.setDAGScheduler(dagScheduler)
+
+val taskSet = FakeTask.createShuffleMapTaskSet(4, 0, 0,
+  Seq(TaskLocation("host1", "exec1")),
+  Seq(TaskLocation("host1", "exec1")),
+  Seq(TaskLocation("host3", "exec3")),
+  Seq(TaskLocation("host2", "exec2")))
+
+val clock = new ManualClock()
+val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, 
clock = clock)
+val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = 
taskSet.tasks.map { task =>
+  task.metrics.internalAccums
+}
+// Offer resources for 4 tasks to start
+for ((k, v) <- List(
+  "exec1" -> "host1",
+  "exec1" -> "host1",
+  "exec3" -> "host3",
+  "exec2" -> "host2")) {
+  val taskOption = manager.resourceOffer(k, v, NO_PREF)
+  assert(taskOption.isDefined)
+  val task = taskOption.get
+  assert(task.executorId === k)
+}
+assert(sched.startedTasks.toSet === Set(0, 1, 2, 3))
+clock.advance(1)
+// Complete the 2 tasks and leave 2 task in running
+for (id <- Set(0, 1)) {
+  manager.handleSuccessfulTask(id, createTaskResult(id, 
accumUpdatesByTask(id)))
+  assert(sched.endedTasks(id) === Success)
+}
+
+// checkSpeculatableTasks checks that the task runtime is greater than 
the threshold for
+// speculating. Since we use a threshold of 0 for speculation, tasks 
need to be running for
+// > 0ms, so advance the clock by 1ms here.
+clock.advance(1)
+assert(manager.checkSpeculatableTasks(0))
+assert(sched.speculativeTasks.toSet === Set(2, 3))
+
+// Offer resource to start the speculative attempt for the running 
task 2.0
+val taskOption = manager.resourceOffer("exec2", "host2", ANY)
+assert(taskOption.isDefined)
+val task4 = taskOption.get
+assert(task4.index === 2)
+assert(task4.taskId === 4)
+assert(task4.executorId === "exec2")
+assert(task4.attemptNumber === 1)
+sched.backend = mock(classOf[SchedulerBackend])
--- End diff --

It's really weird to switch `sched.backend` in the middle of the test. I worry that the test will quietly stop working as expected when something else changes in the scheduler.

Instead, can you just override `killTask` in your subclass of `FakeSchedulerBackend` to track the calls you care about?
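For example, a minimal sketch of what I have in mind (the `killedTaskIds` buffer and the final assert are just illustrative names, not the exact code I'd expect):

```scala
// Record kill requests in the fake backend itself, instead of swapping in a
// mock backend halfway through the test.
val killedTaskIds = scala.collection.mutable.ArrayBuffer.empty[Long]
sched.initialize(new FakeSchedulerBackend() {
  override def killTask(
      taskId: Long,
      executorId: String,
      interruptThread: Boolean,
      reason: String): Unit = {
    killedTaskIds += taskId
  }
})

// ... rest of the test unchanged, then assert on killedTaskIds at the end, e.g.
// assert(killedTaskIds.nonEmpty)
```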


---




[GitHub] spark issue #19194: [SPARK-20589] Allow limiting task concurrency per stage

2017-09-28 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/19194
  
Sorry if I was unclear earlier on the issue w/ the active job ID. So, I agree that if a user actually gets into this situation, where they've got two different jobs for the same stage with different max concurrent tasks, it's mostly a toss-up which one they'll get, as the user's jobs are probably racing to get to that stage. Still, I think it's important that it pulls the max concurrent tasks from the active job, just so that users can understand what is going on, and for consistency and debuggability. The TaskSetManager gets the property from the active job, which actually submitted the stage, so the ExecutorAllocationManager should do the same.

I think the best way to ensure that is to add `activeJobId` to `SparkListenerStageSubmitted`. Then you'd go back to just keeping a `jobIdToMaxConcurrentTasks` map when handling `onJobStart`, and in `onStageSubmitted` you'd figure out the max number of tasks for that stage from the job which actually submitted it (rough sketch below).
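Rough sketch of what I mean (the listener and map names are illustrative, and `activeJobId` is the proposed new field on `SparkListenerStageSubmitted`, so this does not compile against today's listener API):

```scala
import scala.collection.mutable
import org.apache.spark.scheduler._

private[spark] class MaxConcurrentTasksTracker extends SparkListener {
  private val jobIdToMaxConcurrentTasks = new mutable.HashMap[Int, Int]
  private val stageIdToMaxConcurrentTasks = new mutable.HashMap[Int, Int]

  override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
    // record the limit for this job when it starts
    jobIdToMaxConcurrentTasks(jobStart.jobId) = maxConcurrentTasksFor(jobStart)
  }

  override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = {
    // use the job which actually submitted the stage, consistent with the TaskSetManager
    val jobId = stageSubmitted.activeJobId  // proposed new field
    stageIdToMaxConcurrentTasks(stageSubmitted.stageInfo.stageId) =
      jobIdToMaxConcurrentTasks.getOrElse(jobId, Int.MaxValue)
  }

  // stand-in for reading spark.job.<jobGroup>.maxConcurrentTasks from the job's properties
  private def maxConcurrentTasksFor(jobStart: SparkListenerJobStart): Int = Int.MaxValue
}
```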

@tgravescs what do you think?


---




[GitHub] spark pull request #19194: [SPARK-20589] Allow limiting task concurrency per...

2017-09-27 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/19194#discussion_r141475205
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -512,6 +535,9 @@ private[spark] class TaskSetManager(
   serializedTask)
   }
 } else {
+  if (runningTasks >= maxConcurrentTasks) {
+logDebug("Already running max. no. of concurrent tasks.")
--- End diff --

can you update this msg to include the jobGroup and full config name?

Also, I think it would be good to log this at the info level one time (not on every `resourceOffer`, just the first time we hit the limit for this TaskSet). I can see users hitting this inadvertently, so it would be good to make it clear.
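A rough sketch of what I mean (the flag name and the message wording are just illustrative, not the exact patch):

```scala
// log once per TaskSet at info level the first time we hit the limit,
// then fall back to the existing debug log on subsequent offers
private var loggedMaxConcurrentTasks = false

if (runningTasks >= maxConcurrentTasks) {
  if (!loggedMaxConcurrentTasks) {
    logInfo(s"TaskSet ${taskSet.id} in job group $jobGroupId has reached its limit of " +
      s"$maxConcurrentTasks concurrent tasks (spark.job.$jobGroupId.maxConcurrentTasks); " +
      "further resource offers will be declined until running tasks finish.")
    loggedMaxConcurrentTasks = true
  }
  logDebug(s"Already running the maximum number of concurrent tasks ($maxConcurrentTasks).")
}
```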


---




[GitHub] spark issue #19194: [SPARK-20589] Allow limiting task concurrency per stage

2017-09-27 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/19194
  
btw I'm not opposed to this feature, just want to make sure that given (a) 
complexity and (b) the fact that it doesn't quite meet the original ask anyway 
(configuring at the stage & no need to break up pipelines), that the folks that 
wanted it are still for it.  I wonder if it should even be marked experimental 
in case we decide to drop it in favor of something stage level eventually.


---




[GitHub] spark issue #19343: [SPARK-22121][SQL] Correct database location for namenod...

2017-09-26 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/19343
  
OK, closing this and the jira


---




[GitHub] spark pull request #19343: [SPARK-22121][SQL] Correct database location for ...

2017-09-26 Thread squito
Github user squito closed the pull request at:

https://github.com/apache/spark/pull/19343


---




[GitHub] spark issue #19250: [SPARK-12297] Table timezone correction for Timestamps

2017-09-26 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/19250
  
@HyukjinKwon you might be interested in this one also


---




[GitHub] spark issue #19343: [SPARK-22121][SQL] Correct database location for namenod...

2017-09-26 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/19343
  
I don't see much point in putting this in the docs ... it seems too fine-grained a detail to be useful there. I just don't see the users who encounter this exception going to look at that spot in the docs to figure out exactly what is wrong. I put an example exception in the JIRA, so at least users can find it with a search.

Sounds like you feel pretty strongly that we should close this as "won't fix"? I'd still prefer to have this in, but I'll settle for just having the workaround be easily searchable.


---




[GitHub] spark issue #19194: [SPARK-20589] Allow limiting task concurrency per stage

2017-09-26 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/19194
  
One more thought -- have you considered adding stage properties? I know that is more involved since it's an API change, but I've lost count of how many times I've wanted something like that, e.g. for labelling a stage in the UI, for partitioning hints, etc. And if you had it, it would remove all the complications here around multiple jobs & threads, and allow the property to actually be set on just one stage, as opposed to the entire job, as was originally requested.


---




[GitHub] spark pull request #19338: [SPARK-22123][CORE] Add latest failure reason for...

2017-09-26 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/19338#discussion_r141098993
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSetBlacklist.scala ---
@@ -61,6 +61,16 @@ private[scheduler] class TaskSetBlacklist(val conf: 
SparkConf, val stageId: Int,
   private val blacklistedExecs = new HashSet[String]()
   private val blacklistedNodes = new HashSet[String]()
 
+  private var latestFailureReason: String = null
+
+  /**
+   * Get the most recent failure reason of this TaskSet.
+   * @return
+   */
+  def getLatestFailureReason: String = {
--- End diff --

@jerryshao the whole class is `private[scheduler]`, so I think it is OK.


---




[GitHub] spark issue #19343: [SPARK-22121][SQL] Correct database location for namenod...

2017-09-26 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/19343
  
whoops, sorry I wrote [CORE] out of habit!

> Spark SQL might not be deployed in the HDFS system. Conceptually, this 
HDFS-specific codes should not be part of our HiveExternalCatalog . 
HiveExternalCatalog is just for using Hive metastore. It does not assume we use 
HDFS.

Yes, I totally understand that. Even for users that are on hdfs, this is clear user error; they should be using hive's metatool to update the database location. Originally I thought this would be an unnecessary complication in spark, but with enough complaints I figured maybe spark could just handle it automatically.

Is there another place this could go instead?

Anyway, if you really feel like this doesn't belong in spark, that is fine.


---




[GitHub] spark pull request #19343: [SPARK-22121][CORE] Correct database location for...

2017-09-25 Thread squito
GitHub user squito opened a pull request:

https://github.com/apache/spark/pull/19343

[SPARK-22121][CORE] Correct database location for namenode HA.

## What changes were proposed in this pull request?

If hdfs HA is turned on after a hive database is already created, the db
location may still reference just one namenode, instead of the
nameservice, if users do not properly follow all upgrade instructions.
After this change, spark detects the misconfiguration and tries to
auto-adjust for it, since this is the behavior from hive as well.
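A rough sketch of the kind of adjustment described, only to illustrate the idea; it assumes the standard HDFS HA configuration keys, and the helper name and exact logic are not the actual patch:

```scala
import java.net.URI
import org.apache.hadoop.conf.Configuration

// If the configured db location points at a single namenode of an HA nameservice,
// rewrite it to use the nameservice so that failover keeps working; otherwise
// leave the location untouched.
def adjustForNamenodeHA(location: URI, hadoopConf: Configuration): URI = {
  val authority = location.getAuthority

  // all rpc addresses of the namenodes belonging to HA nameservice `ns`
  def namenodeAddresses(ns: String): Seq[String] =
    hadoopConf.getTrimmedStrings(s"dfs.ha.namenodes.$ns").toSeq
      .flatMap(nnId => Option(hadoopConf.get(s"dfs.namenode.rpc-address.$ns.$nnId")))

  val matchingNameservice = hadoopConf.getTrimmedStrings("dfs.nameservices").toSeq
    .find(ns => namenodeAddresses(ns).contains(authority))

  matchingNameservice
    .map(ns => new URI(location.getScheme, ns, location.getPath, null, null))
    .getOrElse(location)
}
```

For example, a location like hdfs://nn1.example.com:8020/user/hive/warehouse/foo.db would become hdfs://mycluster/user/hive/warehouse/foo.db when nn1 is one of the namenodes of the "mycluster" nameservice.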

## How was this patch tested?

Added unit tests.  Also deployed on a cluster with hdfs ha, with the 
database location set to only one instance, and then I failed over the namenode 
so the other instance was the active one.  After this change, things worked 
without a problem.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/squito/spark SPARK-22121

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19343.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19343


commit c2e125eacb48971ee72dd61859a95ca8ae6a9fc8
Author: Imran Rashid 
Date:   2017-09-26T00:55:58Z

[SPARK-22121][CORE] Correct database location for namenode HA.

If hdfs HA is turned on after a hive database is already created, the db
location may still reference just one namenode, instead of the
nameservice, if users do not properly follow all upgrade instructions.
After this change, spark detects the misconfiguration and tries to
auto-adjust for it, since this is the behavior from hive as well.




---




[GitHub] spark pull request #19331: [SPARK-22109][SQL] Resolves type conflicts betwee...

2017-09-25 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/19331#discussion_r140890429
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
 ---
@@ -470,15 +471,15 @@ object PartitioningUtils {
* Given a collection of [[Literal]]s, resolves possible type conflicts 
by up-casting "lower"
* types.
*/
-  private def resolveTypeConflicts(literals: Seq[Literal]): Seq[Literal] = 
{
+  private def resolveTypeConflicts(literals: Seq[Literal], timeZone: 
TimeZone): Seq[Literal] = {
 val desiredType = {
   val topType = 
literals.map(_.dataType).maxBy(upCastingOrder.indexOf(_))
--- End diff --

thanks for such a quick fix!

but, don't we also need to update [`upCastingOrder`](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala#L430)? It seems this happens to work because `upcastingOrder.indexOf(TimestampType) = -1`. But I think that doesn't compare the right way with NullType, so if you add this to [`ParquetPartitionDiscoverySuite.test("parse partitions")`](https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala#L241), it fails:

```
check(Seq(
  s"hdfs://host:9000/path/a=$defaultPartitionName/b=blah",
  s"hdfs://host:9000/path/a=2014-01-01 00%3A00%3A00.0/b=foo"),
  PartitionSpec(
    StructType(Seq(
      StructField("a", TimestampType),
      StructField("b", StringType))),
    Seq(
      Partition(InternalRow(null, "blah"),
        s"hdfs://host:9000/path/a=$defaultPartitionName/b=blah"),
      Partition(InternalRow(Timestamp.valueOf("2014-01-01 00:00:00.0"), "foo"),
        s"hdfs://host:9000/path/a=2014-01-01 00%3A00%3A00.0/b=foo"))))
```

(I have to admit, I don't totally understand what the ramifications of that failure are -- the behavior in the resulting dataframe seems fine to me, but I figure there is probably some case this would mess up ...)


---




[GitHub] spark pull request #19338: [SPARK-21539][CORE] Add latest failure reason for...

2017-09-25 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/19338#discussion_r140823560
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSetBlacklist.scala ---
@@ -94,7 +96,9 @@ private[scheduler] class TaskSetBlacklist(val conf: 
SparkConf, val stageId: Int,
   private[scheduler] def updateBlacklistForFailedTask(
   host: String,
   exec: String,
-  index: Int): Unit = {
+  index: Int,
+  failureReason: Option[String] = None): Unit = {
--- End diff --

`failureReason` should always be present in this call, so it shouldn't be 
an `Option` as an arg to this method.

(I realize this is a bit of a pain as you have to modify all the call sites 
in tests, sorry about that).
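Roughly, something like this (just a sketch; I'm assuming the new `latestFailureReason` field from this PR is what gets set here):

```scala
// take the reason directly, since every real call site has one
private[scheduler] def updateBlacklistForFailedTask(
    host: String,
    exec: String,
    index: Int,
    failureReason: String): Unit = {
  latestFailureReason = failureReason
  // ... existing blacklist bookkeeping ...
}
```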


---




[GitHub] spark pull request #19338: [SPARK-21539][CORE] Add latest failure reason for...

2017-09-25 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/19338#discussion_r140823764
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -838,7 +840,7 @@ private[spark] class TaskSetManager(
 
 if (!isZombie && reason.countTowardsTaskFailures) {
   taskSetBlacklistHelperOpt.foreach(_.updateBlacklistForFailedTask(
-info.host, info.executorId, index))
+info.host, info.executorId, index, Some(failureReason)))
   assert (null != failureReason)
--- End diff --

move the `assert (null != failureReason)` first, and to go along with the 
other change, drop the `Some` wrapper around `failureReason`.
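i.e. roughly (sketch only):

```scala
// check the reason before using it, and pass it through unwrapped
assert(null != failureReason)
if (!isZombie && reason.countTowardsTaskFailures) {
  taskSetBlacklistHelperOpt.foreach(_.updateBlacklistForFailedTask(
    info.host, info.executorId, index, failureReason))
}
```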


---




[GitHub] spark issue #19338: [SPARK-21539][CORE] Add latest failure reason for task s...

2017-09-25 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/19338
  
@caneGuy thanks for working on this; it looks very reasonable to me, and I am going to take a closer look at a couple of details. But can you make a couple of updates in the meantime:

1) Can you open a new jira for this, and put that in the commit summary? SPARK-21539 refers to something else entirely.
2) Can you reformat the new exception to look a bit more like the formatting used when there are too many failures of a specific task? Maybe like this:

```
User class threw exception: org.apache.spark.SparkException: Job aborted due to stage failure: Aborting TaskSet 0.0 because task 0 (partition 0) cannot run anywhere due to node and executor blacklist. Most recent failure:
Lost task 0.1 in stage 0.0 (TID 3,xxx, executor 1): java.lang.Exception: Fake error!
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:73)
 at org.apache.spark.scheduler.Task.run(Task.scala:99)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:305)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:745)

Blacklisting behavior can be configured via spark.blacklist.*.

Driver Stacktrace:
 at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1458)
...
```


---




[GitHub] spark issue #19338: [SPARK-21539][CORE] Add latest failure reason for task s...

2017-09-25 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/19338
  
Jenkins, ok to test


---




[GitHub] spark pull request #19311: [SPARK-22083][CORE] Release locks in MemoryStore....

2017-09-25 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/19311#discussion_r140815961
  
--- Diff: 
core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala ---
@@ -407,4 +407,119 @@ class MemoryStoreSuite
 })
 assert(memoryStore.getSize(blockId) === 1)
   }
+
+  test("SPARK-22083: Release all locks in evictBlocksToFreeSpace") {
+// Setup a memory store with many blocks cached, and then one request 
which leads to multiple
+// blocks getting evicted.  We'll make the eviction throw an 
exception, and make sure that
+// all locks are released.
+val ct = implicitly[ClassTag[Array[Byte]]]
+def testFailureOnNthDrop(failAfterDroppingNBlocks: Int, 
readLockAfterDrop: Boolean): Unit = {
--- End diff --

I don't think `validBlock` captures the intent here -- I don't see anything valid or invalid about it either way. The part of the behavior which changes is whether or not another thread grabs a reader lock on the block after it gets dropped to disk.

(To go along with that, we drop the block to disk, rather than just evicting it completely, as otherwise there is nothing to grab a lock on. I could always drop the block to disk, instead of having that depend on this; it just seemed like another useful thing to check whether the number of blocks was successfully updated in `blockInfoManager` when the block was dropped completely.)


---




[GitHub] spark pull request #19311: [SPARK-22083][CORE] Release locks in MemoryStore....

2017-09-25 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/19311#discussion_r140813788
  
--- Diff: 
core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala ---
@@ -407,4 +407,119 @@ class MemoryStoreSuite
 })
 assert(memoryStore.getSize(blockId) === 1)
   }
+
+  test("SPARK-22083: Release all locks in evictBlocksToFreeSpace") {
+// Setup a memory store with many blocks cached, and then one request 
which leads to multiple
+// blocks getting evicted.  We'll make the eviction throw an 
exception, and make sure that
+// all locks are released.
+val ct = implicitly[ClassTag[Array[Byte]]]
+def testFailureOnNthDrop(failAfterDroppingNBlocks: Int, 
readLockAfterDrop: Boolean): Unit = {
+  val tc = TaskContext.empty()
+  val memManager = new StaticMemoryManager(conf, Long.MaxValue, 100, 
numCores = 1)
+  val blockInfoManager = new BlockInfoManager
+  blockInfoManager.registerTask(tc.taskAttemptId)
+  var droppedSoFar = 0
+  val blockEvictionHandler = new BlockEvictionHandler {
+var memoryStore: MemoryStore = _
+
+override private[storage] def dropFromMemory[T: ClassTag](
+blockId: BlockId,
+data: () => Either[Array[T], ChunkedByteBuffer]): StorageLevel 
= {
+  if (droppedSoFar < failAfterDroppingNBlocks) {
+droppedSoFar += 1
+memoryStore.remove(blockId)
+if (readLockAfterDrop) {
+  // for testing purposes, we act like another thread gets the 
read lock on the new
+  // block
+  StorageLevel.DISK_ONLY
+} else {
+  StorageLevel.NONE
+}
+  } else {
+throw new RuntimeException(s"Mock error dropping block 
$droppedSoFar")
+  }
+}
+  }
+  val memoryStore = new MemoryStore(conf, blockInfoManager, 
serializerManager, memManager,
+  blockEvictionHandler) {
+override def afterDropAction(blockId: BlockId): Unit = {
+  if (readLockAfterDrop) {
+// pretend that we get a read lock on the block (now on disk) 
in another thread
+TaskContext.setTaskContext(tc)
+blockInfoManager.lockForReading(blockId)
+TaskContext.unset()
+  }
+}
+  }
+
+  blockEvictionHandler.memoryStore = memoryStore
+  memManager.setMemoryStore(memoryStore)
+
+  // Put in some small blocks to fill up the memory store
+  val initialBlocks = (1 to 10).map { id =>
+val blockId = BlockId(s"rdd_1_$id")
+val blockInfo = new BlockInfo(StorageLevel.MEMORY_ONLY, ct, 
tellMaster = false)
+val initialWriteLock = 
blockInfoManager.lockNewBlockForWriting(blockId, blockInfo)
+assert(initialWriteLock)
+val success = memoryStore.putBytes(blockId, 10, 
MemoryMode.ON_HEAP, () => {
+  new ChunkedByteBuffer(ByteBuffer.allocate(10))
+})
+assert(success)
+blockInfoManager.unlock(blockId, None)
+  }
+  assert(blockInfoManager.size === 10)
+
+
+  // Add one big block, which will require evicting everything in the 
memorystore.  However our
+  // mock BlockEvictionHandler will throw an exception -- make sure 
all locks are cleared.
+  val largeBlockId = BlockId(s"rdd_2_1")
+  val largeBlockInfo = new BlockInfo(StorageLevel.MEMORY_ONLY, ct, 
tellMaster = false)
+  val initialWriteLock = 
blockInfoManager.lockNewBlockForWriting(largeBlockId, largeBlockInfo)
+  assert(initialWriteLock)
+  if (failAfterDroppingNBlocks < 10) {
+val exc = intercept[RuntimeException] {
+  memoryStore.putBytes(largeBlockId, 100, MemoryMode.ON_HEAP, () 
=> {
+new ChunkedByteBuffer(ByteBuffer.allocate(100))
+  })
+}
+assert(exc.getMessage().startsWith("Mock error dropping block"), 
exc)
+// BlockManager.doPut takes care of releasing the lock for the 
newly written block -- not
+// testing that here, so do it manually
+blockInfoManager.removeBlock(largeBlockId)
+  } else {
+memoryStore.putBytes(largeBlockId, 100, MemoryMode.ON_HEAP, () => {
+  new ChunkedByteBuffer(ByteBuffer.allocate(100))
+})
+// BlockManager.doPut takes care of releasing the lock for the 
newly written block -- not
+// testing that here, so do it manually
+blockInfoManager.unlock(largeBlockId)
+  }
+
+  val largeBlockInMemory = if (failAfterDrop

[GitHub] spark pull request #18853: [SPARK-21646][SQL] CommonType for binary comparis...

2017-09-25 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/18853#discussion_r140802924
  
--- Diff: docs/sql-programming-guide.md ---
@@ -1460,6 +1460,13 @@ that these options will be deprecated in future 
release as more optimizations ar
   Configures the number of partitions to use when shuffling data for 
joins or aggregations.
 
   
+  
+spark.sql.typeCoercion.mode
+default
+
+Whether compatible with Hive. Available options are 
default and hive.
--- End diff --

This description feels inadequate to me. I think most users will read "hive" as "the old, legacy way of doing things" and "default" as "the new, better, spark way of doing things". But I haven't heard an argument in favor of the "default" behavior, just that we don't want a breaking change of behavior.

So (a) I'd advocate that we rename "default" to "legacy", or something else along those lines. I do think it should be the default value, to avoid changing behavior.
And (b) I think the doc section here should more clearly indicate the difference, e.g. "The 'legacy' typeCoercion mode was used in spark prior to 2.3, and so it continues to be the default to avoid breaking behavior. However, it has logical inconsistencies. The 'hive' mode is preferred for most new applications, though it may require additional manual casting."

I am even wondering if we should have a 3rd option that does not implicitly cast across type categories, e.g. like postgres, as this avoids nasty surprises for the user. While the casts are convenient, when they don't work there is very little indication to the user that anything went wrong -- most likely they'll just keep processing data even though the results don't actually have the semantics they want.


---




[GitHub] spark pull request #18853: [SPARK-21646][SQL] CommonType for binary comparis...

2017-09-25 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/18853#discussion_r140799432
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
 ---
@@ -115,21 +115,46 @@ object TypeCoercion {
* is a String and the other is not. It also handles when one op is a 
Date and the
* other is a Timestamp by making the target type to be String.
*/
-  val findCommonTypeForBinaryComparison: (DataType, DataType) => 
Option[DataType] = {
-// We should cast all relative timestamp/date/string comparison into 
string comparisons
-// This behaves as a user would expect because timestamp strings sort 
lexicographically.
-// i.e. TimeStamp(2013-01-01 00:00 ...) < "2014" = true
-case (StringType, DateType) => Some(StringType)
-case (DateType, StringType) => Some(StringType)
-case (StringType, TimestampType) => Some(StringType)
-case (TimestampType, StringType) => Some(StringType)
-case (TimestampType, DateType) => Some(StringType)
-case (DateType, TimestampType) => Some(StringType)
-case (StringType, NullType) => Some(StringType)
-case (NullType, StringType) => Some(StringType)
-case (l: StringType, r: AtomicType) if r != StringType => Some(r)
-case (l: AtomicType, r: StringType) if (l != StringType) => Some(l)
-case (l, r) => None
+  private def findCommonTypeForBinaryComparison(
+  plan: LogicalPlan,
+  l: DataType,
+  r: DataType): Option[DataType] =
+if (!plan.conf.isHiveTypeCoercionMode) {
+  (l, r) match {
+// We should cast all relative timestamp/date/string comparison 
into string comparisons
+// This behaves as a user would expect because timestamp strings 
sort lexicographically.
+// i.e. TimeStamp(2013-01-01 00:00 ...) < "2014" = true
--- End diff --

I think this comment should be updated given the latest investigation, explaining both the original motivation and how it is flawed. E.g.:

"Originally spark cast all relative timestamp/date/string comparisons into string comparisons. The motivation was that this would lead to natural comparisons on simple string inputs for times, e.g. TimeStamp(2013-01-01 00:00 ...) < "2014" = true. However, this leads to other logical inconsistencies, e.g. TimeStamp(2013-01-01 00:00 ...) < "5" = true. Also, equals is not consistent with the other binary comparisons. Furthermore, comparing to a string that does not look like a time at all will still compare, just with an unexpected result."
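For reference, the inconsistency shows up with checks in the style of the SQLQuerySuite tests in this PR (a sketch; under the legacy string-comparison rule both of these come back true):

```scala
// both comparisons become lexicographic string comparisons under the legacy rule,
// so the second one is also true even though '5' is not a meaningful time
checkAnswer(sql("SELECT cast('2013-01-01 00:00:00' as timestamp) < '2014'"), Row(true))
checkAnswer(sql("SELECT cast('2013-01-01 00:00:00' as timestamp) < '5'"), Row(true))
```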


---




[GitHub] spark pull request #18853: [SPARK-21646][SQL] CommonType for binary comparis...

2017-09-25 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/18853#discussion_r140801036
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala 
---
@@ -2677,4 +2677,142 @@ class SQLQuerySuite extends QueryTest with 
SharedSQLContext {
   checkAnswer(df, Row(1, 1, 1))
 }
   }
+
+  test("SPARK-21646: CommonTypeForBinaryComparison: StringType vs 
NumericType") {
+withTempView("v") {
+  val str1 = Long.MaxValue.toString + "1"
+  val str2 = Int.MaxValue.toString + "1"
+  val str3 = "10"
+  Seq(str1, str2, str3).toDF("c1").createOrReplaceTempView("v")
+  withSQLConf(SQLConf.typeCoercionMode.key -> "hive") {
+checkAnswer(sql("SELECT c1 from v where c1 > 0"),
+  Row(str1) :: Row(str2) :: Row(str3) :: Nil)
+checkAnswer(sql("SELECT c1 from v where c1 > 0L"),
+  Row(str1) :: Row(str2) :: Row(str3) :: Nil)
+  }
+
+  withSQLConf(SQLConf.typeCoercionMode.key -> "default") {
+checkAnswer(sql("SELECT c1 from v where c1 > 0"), Row(str3) :: Nil)
+checkAnswer(sql("SELECT c1 from v where c1 > 0L"), Row(str2) :: 
Row(str3) :: Nil)
+  }
+}
+  }
+
+  test("SPARK-21646: CommonTypeForBinaryComparison: DoubleType vs 
IntegerType") {
+withTempView("v") {
+  Seq(("0", 1), ("-0.4", 2), ("0.6", 3)).toDF("c1", 
"c2").createOrReplaceTempView("v")
+  withSQLConf(SQLConf.typeCoercionMode.key -> "hive") {
+checkAnswer(sql("SELECT c1 FROM v WHERE c1 = 0"), Seq(Row("0")))
+checkAnswer(sql("SELECT c1 FROM v WHERE c1 = 0L"), Seq(Row("0")))
+checkAnswer(sql("SELECT c1 FROM v WHERE c1 = 0.0"), Seq(Row("0")))
+checkAnswer(sql("SELECT c1 FROM v WHERE c1 = -0.4"), 
Seq(Row("-0.4")))
+checkAnswer(sql("SELECT count(*) FROM v WHERE c1 > 0"), Row(1) :: 
Nil)
+  }
+
+  withSQLConf(SQLConf.typeCoercionMode.key -> "default") {
+checkAnswer(sql("SELECT c1 FROM v WHERE c1 = 0"), Seq(Row("0"), 
Row("-0.4"), Row("0.6")))
+checkAnswer(sql("SELECT c1 FROM v WHERE c1 = 0L"), Seq(Row("0"), 
Row("-0.4"), Row("0.6")))
+checkAnswer(sql("SELECT c1 FROM v WHERE c1 = 0.0"), Seq(Row("0")))
+checkAnswer(sql("SELECT c1 FROM v WHERE c1 = -0.4"), 
Seq(Row("-0.4")))
+checkAnswer(sql("SELECT count(*) FROM v WHERE c1 > 0"), Row(0) :: 
Nil)
+  }
+}
+  }
+
+  test("SPARK-21646: CommonTypeForBinaryComparison: StringType vs 
DateType") {
+withTempView("v") {
+  val v1 = Date.valueOf("2017-09-22")
+  val v2 = Date.valueOf("2017-09-09")
+  Seq(v1, v2).toDF("c1").createTempView("v")
+  withSQLConf(SQLConf.typeCoercionMode.key -> "hive") {
+checkAnswer(sql("select c1 from v where c1 > '2017-8-1'"), Row(v1) 
:: Row(v2) :: Nil)
+checkAnswer(sql("select c1 from v where c1 > cast('2017-8-1' as 
date)"),
+  Row(v1) :: Row(v2) :: Nil)
+  }
+
+  withSQLConf(SQLConf.typeCoercionMode.key -> "default") {
+checkAnswer(sql("select c1 from v where c1 > '2017-8-1'"), Nil)
+checkAnswer(sql("select c1 from v where c1 > cast('2017-8-1' as 
date)"),
+  Row(v1) :: Row(v2) :: Nil)
+  }
+}
+  }
+
+  test("SPARK-21646: CommonTypeForBinaryComparison: StringType vs 
TimestampType") {
+withTempView("v") {
+  val v1 = Timestamp.valueOf("2017-07-21 23:42:12.123")
+  val v2 = Timestamp.valueOf("2017-08-21 23:42:12.123")
+  Seq(v1, v2).toDF("c1").createTempView("v")
+  withSQLConf(SQLConf.typeCoercionMode.key -> "hive") {
+checkAnswer(sql("select c1 from v where c1 > '2017-8-1'"), Row(v2) 
:: Nil)
+checkAnswer(sql("select c1 from v where c1 > cast('2017-8-1' as 
timestamp)"),
+  Row(v2) :: Nil)
+  }
+
+  withSQLConf(SQLConf.typeCoercionMode.key -> "default") {
+checkAnswer(sql("select c1 from v where c1 > '2017-8-1'"), Nil)
+checkAnswer(sql("select c1 from v where c1 > cast('2017-8-1' as 
timestamp)"),
+  Row(v2) :: Nil)
--- End diff --

Perhaps there should also be a comparison for a time with more digits of precision? It seems from the original discussion in https://issues.apache.org/jira/browse/SPARK-8420?focusedCommentId=14592654&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14592654 that this was one of the concerns, e.g. comparing `'1969-12-31 16:00:00'` with `'1969-12-31 16:00:00.0'`.


---




[GitHub] spark pull request #18853: [SPARK-21646][SQL] CommonType for binary comparis...

2017-09-25 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/18853#discussion_r140797299
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala 
---
@@ -2677,4 +2677,142 @@ class SQLQuerySuite extends QueryTest with 
SharedSQLContext {
   checkAnswer(df, Row(1, 1, 1))
 }
   }
+
+  test("SPARK-21646: CommonTypeForBinaryComparison: StringType vs 
NumericType") {
+withTempView("v") {
+  val str1 = Long.MaxValue.toString + "1"
+  val str2 = Int.MaxValue.toString + "1"
+  val str3 = "10"
+  Seq(str1, str2, str3).toDF("c1").createOrReplaceTempView("v")
+  withSQLConf(SQLConf.typeCoercionMode.key -> "hive") {
+checkAnswer(sql("SELECT c1 from v where c1 > 0"),
+  Row(str1) :: Row(str2) :: Row(str3) :: Nil)
+checkAnswer(sql("SELECT c1 from v where c1 > 0L"),
+  Row(str1) :: Row(str2) :: Row(str3) :: Nil)
+  }
+
+  withSQLConf(SQLConf.typeCoercionMode.key -> "default") {
+checkAnswer(sql("SELECT c1 from v where c1 > 0"), Row(str3) :: Nil)
+checkAnswer(sql("SELECT c1 from v where c1 > 0L"), Row(str2) :: 
Row(str3) :: Nil)
+  }
+}
+  }
+
+  test("SPARK-21646: CommonTypeForBinaryComparison: DoubleType vs 
IntegerType") {
+withTempView("v") {
+  Seq(("0", 1), ("-0.4", 2), ("0.6", 3)).toDF("c1", 
"c2").createOrReplaceTempView("v")
+  withSQLConf(SQLConf.typeCoercionMode.key -> "hive") {
+checkAnswer(sql("SELECT c1 FROM v WHERE c1 = 0"), Seq(Row("0")))
+checkAnswer(sql("SELECT c1 FROM v WHERE c1 = 0L"), Seq(Row("0")))
+checkAnswer(sql("SELECT c1 FROM v WHERE c1 = 0.0"), Seq(Row("0")))
+checkAnswer(sql("SELECT c1 FROM v WHERE c1 = -0.4"), 
Seq(Row("-0.4")))
+checkAnswer(sql("SELECT count(*) FROM v WHERE c1 > 0"), Row(1) :: 
Nil)
+  }
+
+  withSQLConf(SQLConf.typeCoercionMode.key -> "default") {
+checkAnswer(sql("SELECT c1 FROM v WHERE c1 = 0"), Seq(Row("0"), 
Row("-0.4"), Row("0.6")))
+checkAnswer(sql("SELECT c1 FROM v WHERE c1 = 0L"), Seq(Row("0"), 
Row("-0.4"), Row("0.6")))
+checkAnswer(sql("SELECT c1 FROM v WHERE c1 = 0.0"), Seq(Row("0")))
+checkAnswer(sql("SELECT c1 FROM v WHERE c1 = -0.4"), 
Seq(Row("-0.4")))
+checkAnswer(sql("SELECT count(*) FROM v WHERE c1 > 0"), Row(0) :: 
Nil)
+  }
+}
+  }
+
+  test("SPARK-21646: CommonTypeForBinaryComparison: StringType vs 
DateType") {
+withTempView("v") {
+  val v1 = Date.valueOf("2017-09-22")
+  val v2 = Date.valueOf("2017-09-09")
+  Seq(v1, v2).toDF("c1").createTempView("v")
+  withSQLConf(SQLConf.typeCoercionMode.key -> "hive") {
+checkAnswer(sql("select c1 from v where c1 > '2017-8-1'"), Row(v1) 
:: Row(v2) :: Nil)
+checkAnswer(sql("select c1 from v where c1 > cast('2017-8-1' as 
date)"),
+  Row(v1) :: Row(v2) :: Nil)
+  }
+
+  withSQLConf(SQLConf.typeCoercionMode.key -> "default") {
+checkAnswer(sql("select c1 from v where c1 > '2017-8-1'"), Nil)
+checkAnswer(sql("select c1 from v where c1 > cast('2017-8-1' as 
date)"),
+  Row(v1) :: Row(v2) :: Nil)
+  }
+}
+  }
+
+  test("SPARK-21646: CommonTypeForBinaryComparison: StringType vs 
TimestampType") {
+withTempView("v") {
+  val v1 = Timestamp.valueOf("2017-07-21 23:42:12.123")
+  val v2 = Timestamp.valueOf("2017-08-21 23:42:12.123")
+  Seq(v1, v2).toDF("c1").createTempView("v")
+  withSQLConf(SQLConf.typeCoercionMode.key -> "hive") {
+checkAnswer(sql("select c1 from v where c1 > '2017-8-1'"), Row(v2) 
:: Nil)
+checkAnswer(sql("select c1 from v where c1 > cast('2017-8-1' as 
timestamp)"),
+  Row(v2) :: Nil)
+  }
+
+  withSQLConf(SQLConf.typeCoercionMode.key -> "default") {
+checkAnswer(sql("select c1 from v where c1 > '2017-8-1'"), Nil)
+checkAnswer(sql("select c1 from v where c1 > cast('2017-8-1' as 
timestamp)"),
+  Row(v2) :: Nil)
--- End diff --

I think we should include a comparison which uses only the year, e.g. `> '2014'`, as that was listed as the motivation for the "default" behavior in the code comments.


---




[GitHub] spark pull request #19311: [SPARK-22083][CORE] Release locks in MemoryStore....

2017-09-22 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/19311#discussion_r140559549
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -544,20 +544,39 @@ private[spark] class MemoryStore(
   }
 
   if (freedMemory >= space) {
-logInfo(s"${selectedBlocks.size} blocks selected for dropping " +
-  s"(${Utils.bytesToString(freedMemory)} bytes)")
-for (blockId <- selectedBlocks) {
-  val entry = entries.synchronized { entries.get(blockId) }
-  // This should never be null as only one task should be dropping
-  // blocks and removing entries. However the check is still here 
for
-  // future safety.
-  if (entry != null) {
-dropBlock(blockId, entry)
+var exceptionWasThrown: Boolean = true
+try {
+  logInfo(s"${selectedBlocks.size} blocks selected for dropping " +
+s"(${Utils.bytesToString(freedMemory)} bytes)")
+  for (blockId <- selectedBlocks) {
+val entry = entries.synchronized {
+  entries.get(blockId)
+}
+// This should never be null as only one task should be 
dropping
+// blocks and removing entries. However the check is still 
here for
+// future safety.
+if (entry != null) {
+  dropBlock(blockId, entry)
+}
+  }
+  exceptionWasThrown = false
+  logInfo(s"After dropping ${selectedBlocks.size} blocks, " +
+s"free memory is ${Utils.bytesToString(maxMemory - 
blocksMemoryUsed)}")
+  freedMemory
+} finally {
+  // like BlockManager.doPut, we use a finally rather than a catch 
to avoid having to deal
+  // with InterruptedException
+  if (exceptionWasThrown) {
+selectedBlocks.foreach { id =>
+  // some of the blocks may have already been unlocked, or 
completely removed
+  blockInfoManager.get(id).foreach { info =>
--- End diff --

good point, thanks, I've handled this now


---




[GitHub] spark pull request #19313: [SPARK-21928][CORE] Set classloader on Serializer...

2017-09-21 Thread squito
Github user squito closed the pull request at:

https://github.com/apache/spark/pull/19313


---




[GitHub] spark issue #19313: [SPARK-21928][CORE] Set classloader on SerializerManager...

2017-09-21 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/19313
  
thanks @vanzin 


---




[GitHub] spark pull request #19313: [SPARK-21928][CORE] Set classloader on Serializer...

2017-09-21 Thread squito
GitHub user squito opened a pull request:

https://github.com/apache/spark/pull/19313

[SPARK-21928][CORE] Set classloader on SerializerManager's private kryo

## What changes were proposed in this pull request?

We have to make sure that SerializerManager's private instance of
kryo also uses the right classloader, regardless of the current thread
classloader.  In particular, this fixes serde during remote cache
fetches, as those occur in netty threads.

## How was this patch tested?

Manual tests & existing suite via jenkins. I haven't been able to reproduce this in a unit test, because when a remote RDD partition can't be fetched, there is a warning message and then the partition is just recomputed locally. I manually verified the warning message is no longer present.

(cherry picked from commit b75bd1777496ce0354458bf85603a8087a6a0ff8)

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/squito/spark SPARK-21928_2.1

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19313.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19313






---




[GitHub] spark pull request #19194: [SPARK-20589] Allow limiting task concurrency per...

2017-09-21 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/19194#discussion_r140337893
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -619,6 +625,47 @@ private[spark] class ExecutorAllocationManager(
 // place the executors.
 private val stageIdToExecutorPlacementHints = new mutable.HashMap[Int, 
(Int, Map[String, Int])]
 
+override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
+  jobStart.stageInfos.foreach(stageInfo => 
stageIdToJobId(stageInfo.stageId) = jobStart.jobId)
+
+  var jobGroupId = if (jobStart.properties != null) {
+jobStart.properties.getProperty(SparkContext.SPARK_JOB_GROUP_ID)
+  } else {
+null
+  }
+
+  val maxConTasks = if (jobGroupId != null &&
+conf.contains(s"spark.job.$jobGroupId.maxConcurrentTasks")) {
+conf.get(s"spark.job.$jobGroupId.maxConcurrentTasks").toInt
+  } else {
+Int.MaxValue
+  }
+
+  if (maxConTasks <= 0) {
+throw new IllegalArgumentException(
+  "Maximum Concurrent Tasks should be set greater than 0 for the 
job to progress.")
+  }
+
+  if (jobGroupId == null || 
!conf.contains(s"spark.job.$jobGroupId.maxConcurrentTasks")) {
+jobGroupId = DEFAULT_JOB_GROUP
+  }
+
+  jobIdToJobGroup(jobStart.jobId) = jobGroupId
+  if (!jobGroupToMaxConTasks.contains(jobGroupId)) {
--- End diff --

Sorry, I am doing a really bad job explaining my concerns.

Of course, if you have multiple threads racing to set the value for the same job group, it's unpredictable which value you'll get. Nothing we can do about that; it's not the scenario I'm talking about.

Here's the step-by-step (I hope I understand the proposed change correctly):

(1) T1: Set jobgroup = "foo". Set maxConTasks=10.
(2) T1: Launch job 1.1. Uses maxConTasks=10.
(3) T2: Set jobgroup = "foo".
(4) T2: Launch job 2.1. Uses maxConTasks=10.
(5) T1: Finish job 1.1. Do *not* remove the entry for `jobGroupToMaxConTasks("foo")`, because there is still another job running for this job group in T2 (https://github.com/apache/spark/pull/19194/files#diff-b096353602813e47074ace09a3890d56R664).
(6) T1: Set maxConTasks=20.
(7) T1: Launch job 1.2. Uses maxConTasks=10, because `jobGroupToMaxConTasks.contains("foo")`, so we don't reset the value.
(8) T2: Finish job 2.1. Again, do not remove `jobGroupToMaxConTasks("foo")`, as T1 is still running a job in this job group.
(9) T2: Set maxConTasks=20.
(10) T2: Run job 2.2. Uses maxConTasks=10, because `jobGroupToMaxConTasks.contains("foo")`.

As long as there is a job running in T2 when T1 finishes its job (or vice versa), we never remove the prior value, and so we never update it when starting a new job. We could be stuck with maxConTasks=10 in both threads indefinitely, even though both threads have set maxConTasks=20.

If you remove that `if (!jobGroupToMaxConTasks.contains(jobGroupId))`, then when job 1.2 starts, you'd use the new value of maxConTasks=20. The only weird thing is that job 2.1 suddenly switches mid-flight to using maxConTasks=20 as well. But that seems more reasonable to me.


---




[GitHub] spark pull request #19311: [SPARK-22083][CORE] Release locks in MemoryStore....

2017-09-21 Thread squito
GitHub user squito opened a pull request:

https://github.com/apache/spark/pull/19311

[SPARK-22083][CORE] Release locks in MemoryStore.evictBlocksToFreeSpace

## What changes were proposed in this pull request?

MemoryStore.evictBlocksToFreeSpace acquires write locks for all the
blocks it intends to evict up front.  If there is a failure to evict
blocks (eg., some failure dropping a block to disk), then we have to
release the lock.  Otherwise the lock is never released and an executor
trying to get the lock will wait forever.

## How was this patch tested?

Added unit test.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/squito/spark SPARK-22083

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19311.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19311


commit 1ff270a1fdbd567965c6c721f0a92bc1b77bc240
Author: Imran Rashid 
Date:   2017-09-21T19:12:24Z

[SPARK-22083][CORE] Release locks in MemoryStore.evictBlocksToFreeSpace

MemoryStore.evictBlocksToFreeSpace acquires write locks for all the
blocks it intends to evict up front.  If there is a failure to evict
blocks (eg., some failure dropping a block to disk), then we have to
release the lock.  Otherwise the lock is never released and an executor
trying to get the lock will wait forever.




---




[GitHub] spark pull request #19194: [SPARK-20589] Allow limiting task concurrency per...

2017-09-21 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/19194#discussion_r140296863
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -619,6 +625,47 @@ private[spark] class ExecutorAllocationManager(
 // place the executors.
 private val stageIdToExecutorPlacementHints = new mutable.HashMap[Int, 
(Int, Map[String, Int])]
 
+override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
+  jobStart.stageInfos.foreach(stageInfo => 
stageIdToJobId(stageInfo.stageId) = jobStart.jobId)
+
+  var jobGroupId = if (jobStart.properties != null) {
+jobStart.properties.getProperty(SparkContext.SPARK_JOB_GROUP_ID)
+  } else {
+null
+  }
+
+  val maxConTasks = if (jobGroupId != null &&
+conf.contains(s"spark.job.$jobGroupId.maxConcurrentTasks")) {
+conf.get(s"spark.job.$jobGroupId.maxConcurrentTasks").toInt
+  } else {
+Int.MaxValue
+  }
+
+  if (maxConTasks <= 0) {
+throw new IllegalArgumentException(
+  "Maximum Concurrent Tasks should be set greater than 0 for the 
job to progress.")
+  }
+
+  if (jobGroupId == null || 
!conf.contains(s"spark.job.$jobGroupId.maxConcurrentTasks")) {
+jobGroupId = DEFAULT_JOB_GROUP
+  }
+
+  jobIdToJobGroup(jobStart.jobId) = jobGroupId
+  if (!jobGroupToMaxConTasks.contains(jobGroupId)) {
--- End diff --

You could submit jobs concurrently, from two different threads. I didn't describe it very well -- say both threads are in the same job group. Each thread can only do one job at a time, but maybe between the two threads there is always some active job for the job group the entire time.

There are real use cases like this -- in "job-server" style deployments, there is a long-running spark context (most likely with cached RDDs) accepting requests from multiple users (perhaps sitting behind an http server). Maybe some group of users is always put into one job group (e.g. there is a low-priority group and a high-priority group of users). You might process more than one job for each group at a time, and there is a never-ending stream of jobs.

(Again, it's not the most common scenario, but we might as well have it behave correctly.)


---




[GitHub] spark issue #19280: [SPARK-21928][CORE] Set classloader on SerializerManager...

2017-09-20 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/19280
  
> Looks ok to me, assuming the "default serializer" in SerializerManager is 
configured correctly through other means.

I think that part is fine.  The serializer is created here:

https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SparkEnv.scala#L279

The same instance is assigned to `SparkEnv.serializer`: 
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SparkEnv.scala#L374

Which has its default classloader set in Executor.scala, right by the part 
I'm changing: 
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/executor/Executor.scala#L131


---




[GitHub] spark pull request #19194: [SPARK-20589] Allow limiting task concurrency per...

2017-09-20 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/19194#discussion_r140136101
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -619,6 +625,47 @@ private[spark] class ExecutorAllocationManager(
 // place the executors.
 private val stageIdToExecutorPlacementHints = new mutable.HashMap[Int, 
(Int, Map[String, Int])]
 
+override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
+  jobStart.stageInfos.foreach(stageInfo => 
stageIdToJobId(stageInfo.stageId) = jobStart.jobId)
+
+  var jobGroupId = if (jobStart.properties != null) {
+jobStart.properties.getProperty(SparkContext.SPARK_JOB_GROUP_ID)
+  } else {
+null
+  }
+
+  val maxConTasks = if (jobGroupId != null &&
+conf.contains(s"spark.job.$jobGroupId.maxConcurrentTasks")) {
+conf.get(s"spark.job.$jobGroupId.maxConcurrentTasks").toInt
+  } else {
+Int.MaxValue
+  }
+
+  if (maxConTasks <= 0) {
+throw new IllegalArgumentException(
+  "Maximum Concurrent Tasks should be set greater than 0 for the 
job to progress.")
+  }
+
+  if (jobGroupId == null || 
!conf.contains(s"spark.job.$jobGroupId.maxConcurrentTasks")) {
+jobGroupId = DEFAULT_JOB_GROUP
+  }
+
+  jobIdToJobGroup(jobStart.jobId) = jobGroupId
+  if (!jobGroupToMaxConTasks.contains(jobGroupId)) {
--- End diff --

lemme put my concern another way:

why don't you remove the `if 
(!jobGroupToMaxConTasks.contains(jobGroupId))`, and just unconditionally always 
make the assignment `jobGroupToMaxConTasks(jobGroupId) = maxConTasks`?

That is simpler to reason about, and it has all the properties we want. I agree the scenario I'm describing is pretty weird, but the only difference I see between your version and this one is in that scenario. And it's probably not the behavior we want in that scenario.
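i.e. the tail of `onJobStart` would just be (sketch, everything else unchanged):

```scala
jobIdToJobGroup(jobStart.jobId) = jobGroupId
// unconditionally record the group's current setting, so a newly started job
// (e.g. job 1.2 in the walk-through above) always picks up the latest value
jobGroupToMaxConTasks(jobGroupId) = maxConTasks
```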


---




[GitHub] spark issue #19280: [SPARK-21928][CORE] Set classloader on SerializerManager...

2017-09-20 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/19280
  
reaching out to some potential reviewers: @vanzin @srowen @JoshRosen 
@mridulm @tgravescs 


---




[GitHub] spark issue #19250: [SPARK-12297] Table timezone correction for Timestamps

2017-09-20 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/19250
  
also cc @yhuai @liancheng would appreciate a review since you've looked at 
sql & hive compatibility in the past


---




[GitHub] spark issue #19160: [SPARK-21934][CORE] Expose Shuffle Netty memory usage to...

2017-09-20 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/19160
  
lgtm

@jerryshao I didn't merge yet in case you want to keep discussing the 
naming, but I'm fine with this as is, so feel free to merge.


---




[GitHub] spark pull request #19160: [SPARK-21934][CORE] Expose Shuffle Netty memory u...

2017-09-20 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/19160#discussion_r140051678
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
---
@@ -248,6 +251,16 @@ private[spark] class BlockManager(
 logInfo(s"Initialized BlockManager: $blockManagerId")
   }
 
+  def shuffleMetricsSource: Source = {
+import BlockManager._
+
+if (externalShuffleServiceEnabled) {
+  new ShuffleMetricsSource("ExternalShuffle", 
shuffleClient.shuffleMetrics())
+} else {
+  new ShuffleMetricsSource("NettyBlockTransfer", 
shuffleClient.shuffleMetrics())
--- End diff --

ok, I guess I haven't seen enough setups to know how users subscribe to 
these and whether they'd actually want that distinction.  (Especially since it 
seems like some of the distinction probably shouldn't be there, e.g. the 
openBlockLatency metric.)  But I don't think there is any clear right answer 
here; if you think this is the best naming, that is fine with me.


---




[GitHub] spark pull request #19194: [SPARK-20589] Allow limiting task concurrency per...

2017-09-19 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/19194#discussion_r139819360
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala ---
@@ -1255,6 +1255,97 @@ class TaskSetManagerSuite extends SparkFunSuite with 
LocalSparkContext with Logg
 assert(taskOption4.get.addedJars === addedJarsMidTaskSet)
   }
 
+  test("limit max concurrent running tasks in a job group when configured 
") {
+val conf = new SparkConf().
+  set(config.BLACKLIST_ENABLED, true).
+  set("spark.job.testJobGroup.maxConcurrentTasks", "2") // Limit max 
concurrent tasks to 2
+
+sc = new SparkContext("local", "test", conf)
+sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", 
"host2"))
+val props = new Properties();
+props.setProperty(SparkContext.SPARK_JOB_GROUP_ID, "testJobGroup") // 
set the job group
+
+val tasks = Array.tabulate[Task[_]](10) { i =>
+  new FakeTask(0, i, Nil)
+}
+val tsm = new TaskSetManager(sched, new TaskSet(tasks, 0, 0, 0, 
props), 2)
+
+// make some offers to our taskset
+var taskDescs = Seq(
+  "exec1" -> "host1",
+  "exec2" -> "host1"
+).flatMap { case (exec, host) =>
+  // offer each executor twice (simulating 2 cores per executor)
+  (0 until 2).flatMap{ _ => tsm.resourceOffer(exec, host, 
TaskLocality.ANY)}
+}
+assert(taskDescs.size === 2) // Out of the 4 offers, tsm can accept up 
to maxConcurrentTasks.
+
+// make 4 more offers
+val taskDescs2 = Seq(
+  "exec1" -> "host1",
+  "exec2" -> "host1"
+).flatMap { case (exec, host) =>
+  (0 until 2).flatMap{ _ => tsm.resourceOffer(exec, host, 
TaskLocality.ANY)}
+}
+assert(taskDescs2.size === 0) // tsm doesn't accept any as it is 
already running at max tasks
+
+// inform tsm that one task has completed
+val directTaskResult = createTaskResult(0)
+tsm.handleSuccessfulTask(taskDescs(0).taskId, directTaskResult)
+
+// make 4 more offers after previous task completed
+taskDescs = Seq(
+  "exec1" -> "host1",
+  "exec2" -> "host1"
+).flatMap { case (exec, host) =>
+  (0 until 2).flatMap{ _ => tsm.resourceOffer(exec, host, 
TaskLocality.ANY)}
+}
+assert(taskDescs.size === 1) // tsm accepts one as it can run one more 
task
+  }
+
+  test("do not limit max concurrent running tasks in a job group by 
default") {
+val conf = new SparkConf().
+  set(config.BLACKLIST_ENABLED, true)
+
+sc = new SparkContext("local", "test", conf)
+sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", 
"host2"))
+
+val tasks = Array.tabulate[Task[_]](10) { i =>
+  new FakeTask(0, i, Nil)
+}
+val tsm = new TaskSetManager(sched, new TaskSet(tasks, 0, 0, 0, null), 
2)
+
+// make 5 offers to our taskset
+var taskDescs = Seq(
+  "exec1" -> "host1",
+  "exec2" -> "host2"
+).flatMap { case (exec, host) =>
+  // offer each executor twice (simulating 3 cores per executor)
--- End diff --

update comments
5 offers -> 6 offers
twice -> three times


---




[GitHub] spark pull request #19194: [SPARK-20589] Allow limiting task concurrency per...

2017-09-19 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/19194#discussion_r139819793
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala ---
@@ -1255,6 +1255,97 @@ class TaskSetManagerSuite extends SparkFunSuite with 
LocalSparkContext with Logg
 assert(taskOption4.get.addedJars === addedJarsMidTaskSet)
   }
 
+  test("limit max concurrent running tasks in a job group when configured 
") {
+val conf = new SparkConf().
+  set(config.BLACKLIST_ENABLED, true).
+  set("spark.job.testJobGroup.maxConcurrentTasks", "2") // Limit max 
concurrent tasks to 2
+
+sc = new SparkContext("local", "test", conf)
+sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", 
"host2"))
+val props = new Properties();
+props.setProperty(SparkContext.SPARK_JOB_GROUP_ID, "testJobGroup") // 
set the job group
+
+val tasks = Array.tabulate[Task[_]](10) { i =>
+  new FakeTask(0, i, Nil)
+}
+val tsm = new TaskSetManager(sched, new TaskSet(tasks, 0, 0, 0, 
props), 2)
+
+// make some offers to our taskset
+var taskDescs = Seq(
+  "exec1" -> "host1",
+  "exec2" -> "host1"
+).flatMap { case (exec, host) =>
+  // offer each executor twice (simulating 2 cores per executor)
+  (0 until 2).flatMap{ _ => tsm.resourceOffer(exec, host, 
TaskLocality.ANY)}
+}
+assert(taskDescs.size === 2) // Out of the 4 offers, tsm can accept up 
to maxConcurrentTasks.
+
+// make 4 more offers
+val taskDescs2 = Seq(
+  "exec1" -> "host1",
+  "exec2" -> "host1"
+).flatMap { case (exec, host) =>
+  (0 until 2).flatMap{ _ => tsm.resourceOffer(exec, host, 
TaskLocality.ANY)}
+}
+assert(taskDescs2.size === 0) // tsm doesn't accept any as it is 
already running at max tasks
+
+// inform tsm that one task has completed
+val directTaskResult = createTaskResult(0)
+tsm.handleSuccessfulTask(taskDescs(0).taskId, directTaskResult)
+
+// make 4 more offers after previous task completed
+taskDescs = Seq(
+  "exec1" -> "host1",
+  "exec2" -> "host1"
+).flatMap { case (exec, host) =>
+  (0 until 2).flatMap{ _ => tsm.resourceOffer(exec, host, 
TaskLocality.ANY)}
+}
+assert(taskDescs.size === 1) // tsm accepts one as it can run one more 
task
+  }
+
+  test("do not limit max concurrent running tasks in a job group by 
default") {
--- End diff --

I don't think this test really adds anything beyond other tests in this 
suite.


---




[GitHub] spark pull request #19194: [SPARK-20589] Allow limiting task concurrency per...

2017-09-19 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/19194#discussion_r139821899
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -619,6 +625,47 @@ private[spark] class ExecutorAllocationManager(
 // place the executors.
 private val stageIdToExecutorPlacementHints = new mutable.HashMap[Int, 
(Int, Map[String, Int])]
 
+override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
+  jobStart.stageInfos.foreach(stageInfo => 
stageIdToJobId(stageInfo.stageId) = jobStart.jobId)
+
+  var jobGroupId = if (jobStart.properties != null) {
+jobStart.properties.getProperty(SparkContext.SPARK_JOB_GROUP_ID)
+  } else {
+null
+  }
+
+  val maxConTasks = if (jobGroupId != null &&
+conf.contains(s"spark.job.$jobGroupId.maxConcurrentTasks")) {
+conf.get(s"spark.job.$jobGroupId.maxConcurrentTasks").toInt
+  } else {
+Int.MaxValue
+  }
+
+  if (maxConTasks <= 0) {
+throw new IllegalArgumentException(
+  "Maximum Concurrent Tasks should be set greater than 0 for the 
job to progress.")
+  }
+
+  if (jobGroupId == null || 
!conf.contains(s"spark.job.$jobGroupId.maxConcurrentTasks")) {
+jobGroupId = DEFAULT_JOB_GROUP
+  }
+
+  jobIdToJobGroup(jobStart.jobId) = jobGroupId
+  if (!jobGroupToMaxConTasks.contains(jobGroupId)) {
--- End diff --

this is probably a weird / unusual situation, but is this really the 
behavior you want if there are multiple jobs submitted for the same job group?  
Wouldn't you just take the conf for the job group at the time each job was 
submitted?

Worst case with this approach: say you are *always* submitting multiple 
jobs for each job group; when one finishes, you immediately start another one, 
so that the new one partially overlaps the old one.  Then even if you change 
the conf, all jobs will keep using the old value forever.


---




[GitHub] spark pull request #19194: [SPARK-20589] Allow limiting task concurrency per...

2017-09-19 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/19194#discussion_r139818031
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -758,11 +825,52 @@ private[spark] class ExecutorAllocationManager(
   allocationManager.synchronized {
 stageIdToNumSpeculativeTasks(stageId) =
   stageIdToNumSpeculativeTasks.getOrElse(stageId, 0) + 1
+maxConcurrentTasks = getMaxConTasks
+logDebug(s"Setting max concurrent tasks to $maxConcurrentTasks on 
spec. task submitted.")
 allocationManager.onSchedulerBacklogged()
   }
 }
 
 /**
+ * Calculate the maximum no. of concurrent tasks that can run 
currently.
+ */
+def getMaxConTasks(): Int = {
+  // We can limit the no. of concurrent tasks by a job group. A job 
group can have multiple jobs
+  // with multiple stages. We need to get all the active stages 
belonging to a job group to
+  // calculate the total no. of pending + running tasks to decide the 
maximum no. of executors
+  // we need at that time to serve the outstanding tasks. This is 
capped by the minimum no. of
+  // outstanding tasks and the max concurrent limit specified for the 
job group if any.
+
+  def getIncompleteTasksForStage(stageId: Int, numTasks: Int): Int = {
+totalPendingTasks(stageId) + totalRunningTasks(stageId)
+  }
+
+  def sumIncompleteTasksForStages: (Int, (Int, Int)) => Int = 
(totalTasks, stageToNumTasks) => {
+val activeTasks = getIncompleteTasksForStage(stageToNumTasks._1, 
stageToNumTasks._2)
+sumOrMax(totalTasks, activeTasks)
+  }
+  // Get the total running & pending tasks for all stages in a job 
group.
+  def getIncompleteTasksForJobGroup(stagesItr: mutable.HashMap[Int, 
Int]): Int = {
+stagesItr.foldLeft(0)(sumIncompleteTasksForStages)
+  }
+
+  def sumIncompleteTasksForJobGroup: (Int, (String, 
mutable.HashMap[Int, Int])) => Int = {
+(maxConTasks, x) => {
+  val totalIncompleteTasksForJobGroup = 
getIncompleteTasksForJobGroup(x._2)
+  val maxTasks = Math.min(jobGroupToMaxConTasks(x._1), 
totalIncompleteTasksForJobGroup)
+  sumOrMax(maxConTasks, maxTasks)
+}
+  }
+
+  def sumOrMax(a: Int, b: Int): Int = if (doesSumOverflow(a, b)) 
Int.MaxValue else (a + b)
+
+  def doesSumOverflow(a: Int, b: Int): Boolean = b > (Int.MaxValue - a)
+
+  val stagesByJobGroup = stageIdToNumTasks.groupBy(x => 
jobIdToJobGroup(stageIdToJobId(x._1)))
--- End diff --

you could just store `stageIdToJobGroupId`.  That simplifies this a bit, and 
then you don't need to store `jobIdToJobGroup` at all, I think
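
roughly, something like this (just a sketch, assuming the surrounding fields 
from the diff and a new `stageIdToJobGroupId` map in place of the two existing 
ones):

```scala
// in onJobStart, once jobGroupId has been resolved:
jobStart.stageInfos.foreach { stageInfo =>
  stageIdToJobGroupId(stageInfo.stageId) = jobGroupId
}

// in getMaxConTasks, no stage -> job -> group indirection is needed anymore:
val stagesByJobGroup = stageIdToNumTasks.groupBy { case (stageId, _) =>
  stageIdToJobGroupId(stageId)
}
```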


---




[GitHub] spark issue #19145: [spark-21933][yarn] Spark Streaming request more executo...

2017-09-19 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/19145
  
I'm not sure I totally follow the sequence of events, but I get the feeling 
this should be handled in YARN, not Spark.

Also, I agree with Jerry: it seems like your `completedContainerIdSet` may 
grow continuously.  You'll remove from it *if* you happen to get a duplicate 
message, but I think in most cases you will not get a duplicate message, if I 
understand correctly.


---




[GitHub] spark pull request #19160: [SPARK-21934][CORE] Expose Shuffle Netty memory u...

2017-09-19 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/19160#discussion_r139800662
  
--- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---
@@ -115,6 +115,7 @@ private[spark] class Executor(
   if (!isLocal) {
 env.metricsSystem.registerSource(executorSource)
 env.blockManager.initialize(conf.getAppId)
+env.metricsSystem.registerSource(env.blockManager.shuffleMetricsSource)
--- End diff --

super nit: can you put the `registerSource()` calls next to each other?
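
i.e. roughly (just a sketch of the ordering, reusing the lines from the diff):

```scala
if (!isLocal) {
  env.blockManager.initialize(conf.getAppId)
  // keep the two metrics registrations together (after the block manager is
  // initialized, since the shuffle source is built from it)
  env.metricsSystem.registerSource(executorSource)
  env.metricsSystem.registerSource(env.blockManager.shuffleMetricsSource)
}
```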


---




[GitHub] spark pull request #19160: [SPARK-21934][CORE] Expose Shuffle Netty memory u...

2017-09-19 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/19160#discussion_r139796177
  
--- Diff: 
core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
 ---
@@ -18,11 +18,14 @@
 package org.apache.spark.network.netty
 
 import java.nio.ByteBuffer
+import java.util
--- End diff --

if you are trying to avoid confusion w/ scala's hashmaps, I think our 
convention is to rename w/ "J" prefix

```
import java.util.{HashMap => JHashMap}
```


---




[GitHub] spark pull request #19160: [SPARK-21934][CORE] Expose Shuffle Netty memory u...

2017-09-19 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/19160#discussion_r139795417
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
---
@@ -248,6 +251,16 @@ private[spark] class BlockManager(
 logInfo(s"Initialized BlockManager: $blockManagerId")
   }
 
+  def shuffleMetricsSource: Source = {
+import BlockManager._
+
+if (externalShuffleServiceEnabled) {
+  new ShuffleMetricsSource("ExternalShuffle", 
shuffleClient.shuffleMetrics())
+} else {
+  new ShuffleMetricsSource("NettyBlockTransfer", 
shuffleClient.shuffleMetrics())
--- End diff --

do you think we really need to distinguish these two cases?  Whether or not 
you have the external shuffle service, this memory is still owned by the 
executor JVM (it's really only external on the remote end).

---




[GitHub] spark pull request #19160: [SPARK-21934][CORE] Expose Shuffle Netty memory u...

2017-09-19 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/19160#discussion_r139799940
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/ExternalShuffleServiceSource.scala 
---
@@ -19,19 +19,19 @@ package org.apache.spark.deploy
 
 import javax.annotation.concurrent.ThreadSafe
 
-import com.codahale.metrics.MetricRegistry
+import com.codahale.metrics.{MetricRegistry, MetricSet}
 
 import org.apache.spark.metrics.source.Source
-import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler
 
 /**
  * Provides metrics source for external shuffle service
  */
 @ThreadSafe
-private class ExternalShuffleServiceSource
-(blockHandler: ExternalShuffleBlockHandler) extends Source {
+private class ExternalShuffleServiceSource extends Source {
--- End diff --

just for my own understanding, not directly related to this change --

I hadn't realized that the ExternalShuffleBlockHandler had its own 
ShuffleMetrics already.  Some of those metrics really seem like they should be 
part of the regular shuffle server in the executor, e.g. 
[openBlockRequestLatencyMillis](https://github.com/apache/spark/blob/master/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockHandler.java#L89).
Do you know why it's separate?


---




[GitHub] spark issue #19211: [SPARK-18838][core] Add separate listener queues to Live...

2017-09-19 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/19211
  
lgtm

do you know how much better this makes it?  E.g., if we had an existing case 
where things go haywire in dynamic allocation because of this, we could see 
whether, after this change, the only dropped events are in the eventLog queue, 
hopefully.  Seems like the right change regardless, but it would be good to 
know the effect.


---




[GitHub] spark pull request #19211: [SPARK-18838][core] Add separate listener queues ...

2017-09-19 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/19211#discussion_r139787945
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala ---
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import java.util.concurrent.LinkedBlockingQueue
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
+
+import com.codahale.metrics.{Gauge, Timer}
+
+import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+import org.apache.spark.util.Utils
+
+/**
+ * An asynchronous queue for events. All events posted to this queue will 
be delivered to the child
+ * listeners in a separate thread.
+ *
+ * Delivery will only begin when the `start()` method is called. The 
`stop()` method should be
+ * called when no more events need to be delivered.
+ */
+private class AsyncEventQueue(val name: String, conf: SparkConf, metrics: 
LiveListenerBusMetrics)
+  extends SparkListenerBus
+  with Logging {
+
+  import AsyncEventQueue._
+
+  // Cap the capacity of the queue so we get an explicit error (rather 
than an OOM exception) if
+  // it's perpetually being added to more quickly than it's being drained.
+  private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](
+conf.get(LISTENER_BUS_EVENT_QUEUE_CAPACITY))
+
+  // Keep the event count separately, so that waitUntilEmpty() can be 
implemented properly;
+  // this allows that method to return only when the events in the queue 
have been fully
+  // processed (instead of just dequeued).
+  private val eventCount = new AtomicLong()
+
+  /** A counter for dropped events. It will be reset every time we log it. 
*/
+  private val droppedEventsCounter = new AtomicLong(0L)
+
+  /** When `droppedEventsCounter` was logged last time in milliseconds. */
+  @volatile private var lastReportTimestamp = 0L
+
+  private val logDroppedEvent = new AtomicBoolean(false)
+
+  private var sc: SparkContext = null
+
+  private val started = new AtomicBoolean(false)
+  private val stopped = new AtomicBoolean(false)
+
+  private val droppedEvents = 
metrics.metricRegistry.counter(s"queue.$name.numDroppedEvents")
+  private val processingTime = 
metrics.metricRegistry.timer(s"queue.$name.listenerProcessingTime")
+
+  // Remove the queue size gauge first, in case it was created by a 
previous incarnation of
+  // this queue that was removed from the listener bus.
+  metrics.metricRegistry.remove(s"queue.$name.size")
+  metrics.metricRegistry.register(s"queue.$name.size", new Gauge[Int] {
+override def getValue: Int = eventQueue.size()
+  })
+
+  private val dispatchThread = new Thread(s"spark-listener-group-$name") {
+setDaemon(true)
+override def run(): Unit = Utils.tryOrStopSparkContext(sc) {
+  dispatch()
+}
+  }
+
+  private def dispatch(): Unit = 
LiveListenerBus.withinListenerThread.withValue(true) {
+try {
+  var next: SparkListenerEvent = eventQueue.take()
+  while (next != POISON_PILL) {
+val ctx = processingTime.time()
+try {
+  super.postToAll(next)
+} finally {
+  ctx.stop()
+}
+eventCount.decrementAndGet()
+next = eventQueue.take()
+  }
+  eventCount.decrementAndGet()
+} catch {
+  case ie: InterruptedException =>
+logInfo(s"Stopping listener queue $name.", ie)
+}
+  }
+
+  override protected def getTimer(listener: SparkListenerInterface): 
Option[Timer] = {
+
metrics.getTimerForListenerClass(listener.getClass.asSubclass(classOf[SparkListenerInterface]))
+  }
+

[GitHub] spark pull request #19280: [SPARK-21928][CORE] Set classloader on Serializer...

2017-09-19 Thread squito
GitHub user squito opened a pull request:

https://github.com/apache/spark/pull/19280

[SPARK-21928][CORE] Set classloader on SerializerManager private kryo

## What changes were proposed in this pull request?

We have to make sure that SerializerManager's private instance of
kryo also uses the right classloader, regardless of the current thread
classloader.  In particular, this fixes serde during remote cache
fetches, as those occur in netty threads.
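
A minimal sketch of the idea (not the exact patch; the method names on 
`SerializerManager` / the executor-side wiring below are assumptions for 
illustration only):

```scala
// In SerializerManager: the private Kryo instance used for serde of remote
// cache-block fetches gets an explicit default classloader, instead of relying
// on whatever the (netty) thread's context classloader happens to be.
private[this] val kryoSerializer = new KryoSerializer(conf)

def setDefaultClassLoader(classLoader: ClassLoader): Unit = {
  kryoSerializer.setDefaultClassLoader(classLoader)
}

// In Executor, next to the existing serializer setup (illustrative wiring):
env.serializer.setDefaultClassLoader(replClassLoader)
env.serializerManager.setDefaultClassLoader(replClassLoader)
```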

## How was this patch tested?

Manual tests & the existing suite via Jenkins.  I haven't been able to 
reproduce this in a unit test, because when a remote RDD partition can't be 
fetched, there is a warning message and then the partition is just recomputed 
locally.  I manually verified the warning message is no longer present.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/squito/spark SPARK-21928_ser_classloader

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19280.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19280


commit acbaf8b65629344d760360b768e89f1712af8942
Author: Imran Rashid 
Date:   2017-09-19T15:19:43Z

[SPARK-21928][CORE] Set classloader on SerializerManager private kryo

We have to make sure that SerializerManager's private instance of
kryo also uses the right classloader, regardless of the current thread
classloader.  In particular, this fixes serde during remote cache
fetches, as those occur in netty threads.




---




[GitHub] spark issue #19250: [SPARK-12297] Table timezone correction for Timestamps

2017-09-19 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/19250
  
Hi @ueshin @rxin, could you please review?  thanks!


---




[GitHub] spark pull request #19250: [SPARK-12297] Table timezone correction for Times...

2017-09-15 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/19250#discussion_r139234142
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
 ---
@@ -92,7 +92,7 @@ case class CreateHiveTableAsSelectCommand(
   }
 
   override def argString: String = {
-s"[Database:${tableDesc.database}}, " +
+s"[Database:${tableDesc.database}, " +
--- End diff --

totally unrelated typo fix, but it didn't seem worth an independent PR


---




[GitHub] spark pull request #19250: [SPARK-12297] Table timezone correction for Times...

2017-09-15 Thread squito
GitHub user squito opened a pull request:

https://github.com/apache/spark/pull/19250

[SPARK-12297] Table timezone correction for Timestamps

## What changes were proposed in this pull request?

When reading and writing data, spark will adjust timestamp data based on 
the delta between the current session timezone and the table time zone 
(specified either by a persistent table property, or an option to the 
DataFrameReader / Writer).  This is particularly important for parquet data, so 
that it can be treated equivalently by other SQL engines (eg. Impala and Hive). 
 Furthermore, this is useful if the same data is processed by multiple clusters 
in different time zones, and "timestamp without time zone" semantics are 
desired.
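
A rough usage sketch, to make the behavior concrete.  Note the option key 
below (`timezone`) is only a placeholder for illustration; the real table 
property / reader-writer option name is defined in the patch:

```scala
// placeholder option name: this only shows the shape of the usage
spark.read
  .option("timezone", "America/Los_Angeles")   // declared zone of the data on disk
  .parquet("/warehouse/events")
  .createOrReplaceTempView("events")

// On read, timestamps are shifted by the delta between the declared table time
// zone and the current session time zone; on write the shift is reversed, so a
// cluster running in UTC and one running in PST see the same wall-clock values
// that Impala or Hive would see for the same files.
```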

## How was this patch tested?

Unit tests.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/squito/spark timestamp_all_formats

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19250.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19250


commit 54f87c2c1e0ab0645fa5497553cf031f13e98c3b
Author: Imran Rashid 
Date:   2017-08-28T19:52:15Z

SPARK-12297.  Table timezones.

commit 53b9fbe0c6128ec11afdb46d3239c693129f6952
Author: Imran Rashid 
Date:   2017-09-14T20:18:46Z

All dataformats support timezone correction.  Move rules & tests to a
more appropriate location.  Ensure rule works without hive support.
Extra checks on when table timezones are set.




---




[GitHub] spark issue #18935: [SPARK-9104][CORE] Expose Netty memory metrics in Spark

2017-09-05 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/18935
  
lgtm

any more thoughts @jiangxb1987 @zsxwing  ?


---




[GitHub] spark issue #16992: [SPARK-19662][SCHEDULER][TEST] Add Fair Scheduler Unit T...

2017-08-28 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/16992
  
merged to master

thanks @erenavsarogullari, sorry again for the delays


---



[GitHub] spark pull request #18950: [SPARK-20589][Core][Scheduler] Allow limiting tas...

2017-08-22 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/18950#discussion_r134501736
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -724,6 +777,62 @@ private[spark] class ExecutorAllocationManager(
 }
 
 /**
+ * Calculate the maximum no. of concurrent tasks that can run 
currently.
+ */
+def getMaxConTasks(): Int = {
+  val stagesByJobGroup = stageIdToNumTasks.groupBy(x => 
jobIdToJobGroup(stageIdToJobId(x._1)))
--- End diff --

I think this needs a comment explaining why you need to look at stages at 
all -- it's not obvious why it's necessary.  (At first I was going to suggest 
the number of tasks left in a stage shouldn't matter, but then realized that it 
could in some scenarios.)
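
e.g. something along these lines (the wording is only a suggestion):

```scala
// Why we look at stages at all: the job-group limit is only a cap.  If the
// stages of a group have fewer outstanding (pending + running) tasks than the
// group's maxConcurrentTasks, the executors we need are bounded by the
// outstanding tasks, not by the configured limit -- each group contributes
// min(maxConcurrentTasks, total outstanding tasks across its active stages).
val stagesByJobGroup = stageIdToNumTasks.groupBy(x => jobIdToJobGroup(stageIdToJobId(x._1)))
```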


---



[GitHub] spark pull request #18950: [SPARK-20589][Core][Scheduler] Allow limiting tas...

2017-08-22 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/18950#discussion_r134501323
  
--- Diff: 
core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala ---
@@ -188,10 +192,195 @@ class ExecutorAllocationManagerSuite
 assert(numExecutorsTarget(manager) === 10)
   }
 
+  test("add executors capped by max concurrent tasks for a job group with 
single core executors") {
+val conf = new SparkConf()
+  .setMaster("myDummyLocalExternalClusterManager")
+  .setAppName("test-executor-allocation-manager")
+  .set("spark.dynamicAllocation.enabled", "true")
+  .set("spark.dynamicAllocation.testing", "true")
+  .set("spark.job.group1.maxConcurrentTasks", "2")
+  .set("spark.job.group2.maxConcurrentTasks", "5")
+val sc = new SparkContext(conf)
+contexts += sc
+sc.setJobGroup("group1", "", false)
+
+val manager = sc.executorAllocationManager.get
+val stages = Seq(createStageInfo(0, 10), createStageInfo(1, 10))
+// Submit the job and stage start/submit events
+sc.listenerBus.postToAll(SparkListenerJobStart(0, 0, stages, 
sc.getLocalProperties))
+sc.listenerBus.postToAll(SparkListenerStageSubmitted(stages(0)))
+
+// Verify that we're capped at number of max concurrent tasks in the 
stage
+assert(maxNumExecutorsNeeded(manager) === 2)
+
+// Submit another stage in the same job
+sc.listenerBus.postToAll(SparkListenerStageSubmitted(stages(1)))
+assert(maxNumExecutorsNeeded(manager) === 2)
+
+sc.listenerBus.postToAll(SparkListenerStageCompleted(stages(0)))
+sc.listenerBus.postToAll(SparkListenerStageCompleted(stages(1)))
+sc.listenerBus.postToAll(SparkListenerJobEnd(0, 10, JobSucceeded))
+
+// Submit a new job in the same job group
+val stage2 = createStageInfo(2, 20)
+sc.listenerBus.postToAll(SparkListenerJobStart(1, 0, Seq{stage2}, 
sc.getLocalProperties))
--- End diff --

nit: still a few `Seq{}` you missed
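
i.e. (purely a style fix -- `Seq{stage2}` compiles because the block is treated 
as the single argument, but parentheses are the intended form):

```scala
sc.listenerBus.postToAll(SparkListenerJobStart(1, 0, Seq(stage2), sc.getLocalProperties))
```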


---



[GitHub] spark pull request #18950: [SPARK-20589][Core][Scheduler] Allow limiting tas...

2017-08-22 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/18950#discussion_r134498890
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -724,6 +777,62 @@ private[spark] class ExecutorAllocationManager(
 }
 
 /**
+ * Calculate the maximum no. of concurrent tasks that can run 
currently.
+ */
+def getMaxConTasks(): Int = {
+  val stagesByJobGroup = stageIdToNumTasks.groupBy(x => 
jobIdToJobGroup(stageIdToJobId(x._1)))
+
+  def getMaxConTasks(maxConTasks: Int,
+stagesByJobGroupItr: Iterator[(String, mutable.HashMap[Int, 
Int])]): Int = {
--- End diff --

style nit: with multi-line method definitions, each param goes on its own 
line, double-indented:

```scala
def getMaxConTasks(
maxConTasks: Int,
stagesByJobGroupItr: Iterator[(String, mutable.HashMap[Int, Int])]): 
Int = {
```


---



[GitHub] spark issue #18950: [SPARK-20589][Core][Scheduler] Allow limiting task concu...

2017-08-22 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/18950
  
@dhruve looks like a real test failure


---



[GitHub] spark issue #18935: [SPARK-9104][CORE] Expose Netty memory metrics in Spark

2017-08-22 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/18935
  
thanks for the added info @jerryshao.

(a) ok, makes sense now about the use of this -- it is not exposed now, and 
you plan to expose it in future changes.  That is fine; however, I would like to 
at least see a *plan* for how you want to expose this before we merge it.  I 
think past efforts have gotten tripped up on how we'd expose it in the driver; 
perhaps the simplest thing to do is to expose it to the metrics system?  There 
just isn't any point in this unless we're going to do something with it.

(b) thanks for including the example of the metrics.  But one minor thing I 
noticed -- are you at all surprised that the direct & heap memory are exactly 
the same?  Are things getting mixed up somewhere?  Or maybe those are just the 
values from the initialization netty always does.  I definitely think those are 
more metrics than most users care about, and would want verbose off by default. 
 I am wondering if those metrics are useful at all ... but I guess they are 
pretty cheap, and maybe there is some scenario where a user would care about 
some of them, e.g. `numHugeAllocations`.
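
on (a), to make the "expose it to the metrics system" suggestion a bit more 
concrete, here is a rough sketch (a hypothetical wrapper, not part of this 
patch): since the new class is a `MetricSet`, it could be wrapped in a `Source` 
and registered with the env's MetricsSystem:

```scala
import com.codahale.metrics.{MetricRegistry, MetricSet}
import org.apache.spark.metrics.source.Source

// hypothetical wrapper: surfaces a MetricSet (e.g. NettyMemoryMetrics) through
// the existing MetricsSystem so the configured sinks (JMX, Graphite, ...) see it
private[spark] class NettyMemorySource(metricSet: MetricSet) extends Source {
  override val sourceName: String = "NettyMemory"
  override val metricRegistry: MetricRegistry = new MetricRegistry
  metricRegistry.registerAll(metricSet)
}

// e.g. in the executor / driver setup, however the MetricSet is obtained:
// env.metricsSystem.registerSource(new NettyMemorySource(nettyMemoryMetrics))
```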


---



[GitHub] spark issue #18887: [SPARK-20642][core] Store FsHistoryProvider listing data...

2017-08-21 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/18887
  
I futzed around for a while with trying to keep the old stuff around, and I 
realized it really would be quite a mess.  The biggest problem is that the old 
REST API is just way too tied into the UI, e.g. 
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/status/api/v1/ExecutorListResource.scala#L26
, probably my fault in that implementation of the REST APIs.  You could keep 
the old code around, but it would involve so much moving and refactoring that 
it seems pointless.

I'm still looking at the other changes in the larger project, and I'd 
encourage other reviewers to do the same.


---



[GitHub] spark issue #16992: [SPARK-19662][SCHEDULER][TEST] Add Fair Scheduler Unit T...

2017-08-18 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/16992
  
been a while, so let's run tests again just to check:
Jenkins, test this please


---



[GitHub] spark pull request #16992: [SPARK-19662][SCHEDULER][TEST] Add Fair Scheduler...

2017-08-18 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/16992#discussion_r134056863
  
--- Diff: docs/job-scheduling.md ---
@@ -235,7 +235,7 @@ properties:
   of the cluster. By default, each pool's `minShare` is 0.
 
 The pool properties can be set by creating an XML file, similar to 
`conf/fairscheduler.xml.template`,
-and setting a `spark.scheduler.allocation.file` property in your
+and either setting `fairscheduler.xml` into classpath or a 
`spark.scheduler.allocation.file` property in your
--- End diff --

super nit:

... and either putting a file named `fairscheduler.xml` on the classpath, 
or setting `spark.scheduler.allocation.file` ...


---



[GitHub] spark pull request #18935: [SPARK-9104][CORE] Expose Netty memory metrics in...

2017-08-18 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/18935#discussion_r134052216
  
--- Diff: 
common/network-common/src/main/java/org/apache/spark/network/util/NettyMemoryMetrics.java
 ---
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.util;
+
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
+import java.util.*;
+
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Metric;
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.MetricSet;
+import com.google.common.annotations.VisibleForTesting;
+import io.netty.buffer.PoolArenaMetric;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.buffer.PooledByteBufAllocatorMetric;
+
+/**
+ * A Netty memory metrics class to collect metrics from Netty 
PooledByteBufAllocator.
+ */
+public class NettyMemoryMetrics implements MetricSet {
+
+  private final PooledByteBufAllocator pooledAllocator;
+
+  private final boolean verboseMetricsEnabled;
+
+  private final Map allMetrics;
+
+  private final String metricPrefix;
+
+  @VisibleForTesting
+  final static Set VERBOSE_METRICS = new HashSet<>();
+  static {
+VERBOSE_METRICS.addAll(Arrays.asList(
+  "numAllocations",
+  "numTinyAllocations",
+  "numSmallAllocations",
+  "numNormalAllocations",
+  "numHugeAllocations",
+  "numDeallocations",
+  "numTinyDeallocations",
+  "numSmallDeallocations",
+  "numNormalDeallocations",
+  "numHugeDeallocations",
+  "numActiveAllocations",
+  "numActiveTinyAllocations",
+  "numActiveSmallAllocations",
+  "numActiveNormalAllocations",
+  "numActiveHugeAllocations",
+  "numActiveBytes"));
+  }
+
+  public NettyMemoryMetrics(PooledByteBufAllocator pooledAllocator,
+  String metricPrefix,
+  TransportConf conf) {
+this.pooledAllocator = pooledAllocator;
+this.allMetrics = new HashMap<>();
+this.metricPrefix = metricPrefix;
+this.verboseMetricsEnabled = conf.verboseMetrics();
+
+registerMetrics(this.pooledAllocator);
+  }
+
+  private void registerMetrics(PooledByteBufAllocator allocator) {
+PooledByteBufAllocatorMetric pooledAllocatorMetric = 
allocator.metric();
+
+// Register general metrics.
+allMetrics.put(MetricRegistry.name(metricPrefix, "usedHeapMemory"),
+  (Gauge) () -> pooledAllocatorMetric.usedHeapMemory());
+allMetrics.put(MetricRegistry.name(metricPrefix, "usedDirectMemory"),
+  (Gauge) () -> pooledAllocatorMetric.usedDirectMemory());
+
+if (verboseMetricsEnabled) {
+  int directArenaIndex = 0;
+  for (PoolArenaMetric metric : pooledAllocatorMetric.directArenas()) {
+registerArenaMetric(metric, "directArena" + directArenaIndex);
+directArenaIndex++;
+  }
+
+  int heapArenaIndex = 0;
+  for (PoolArenaMetric metric : pooledAllocatorMetric.heapArenas()) {
+registerArenaMetric(metric, "heapArena" + heapArenaIndex);
+heapArenaIndex++;
+  }
+}
+  }
+
+  private void registerArenaMetric(PoolArenaMetric arenaMetric, String 
arenaName) {
+for (String methodName : VERBOSE_METRICS) {
+  Method m;
+  try {
+m = PoolArenaMetric.class.getMethod(methodName);
+  } catch (Exception e) {
+// Failed to find metric related method, ignore this metric.
+continue;
+  }
+
+  if (!Modifier.isPublic(m.getModifiers())) {
+// Ignore non-public methods.
   

[GitHub] spark pull request #18935: [SPARK-9104][CORE] Expose Netty memory metrics in...

2017-08-18 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/18935#discussion_r134051537
  
--- Diff: 
common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
 ---
@@ -106,6 +105,12 @@ public TransportClientFactory(
 conf.getModuleName() + "-client");
 this.pooledAllocator = NettyUtils.createPooledByteBufAllocator(
   conf.preferDirectBufs(), false /* allowCache */, 
conf.clientThreads());
+this.metrics = new NettyMemoryMetrics(
--- End diff --

it looks to me like this doesn't get exposed at all for the user to query 
-- e.g. I don't see it getting pulled into the MetricsSystem.  Is that 
happening somehow, or is this exposed another way?


---



[GitHub] spark pull request #18935: [SPARK-9104][CORE] Expose Netty memory metrics in...

2017-08-18 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/18935#discussion_r134050644
  
--- Diff: 
common/network-common/src/main/java/org/apache/spark/network/util/NettyMemoryMetrics.java
 ---
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.util;
+
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
+import java.util.*;
+
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Metric;
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.MetricSet;
+import com.google.common.annotations.VisibleForTesting;
+import io.netty.buffer.PoolArenaMetric;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.buffer.PooledByteBufAllocatorMetric;
+
+/**
+ * A Netty memory metrics class to collect metrics from Netty 
PooledByteBufAllocator.
+ */
+public class NettyMemoryMetrics implements MetricSet {
+
+  private final PooledByteBufAllocator pooledAllocator;
+
+  private final boolean verboseMetricsEnabled;
+
+  private final Map allMetrics;
+
+  private final String metricPrefix;
+
+  @VisibleForTesting
+  final static Set VERBOSE_METRICS = new HashSet<>();
+  static {
+VERBOSE_METRICS.addAll(Arrays.asList(
+  "numAllocations",
+  "numTinyAllocations",
+  "numSmallAllocations",
+  "numNormalAllocations",
+  "numHugeAllocations",
+  "numDeallocations",
+  "numTinyDeallocations",
+  "numSmallDeallocations",
+  "numNormalDeallocations",
+  "numHugeDeallocations",
+  "numActiveAllocations",
+  "numActiveTinyAllocations",
+  "numActiveSmallAllocations",
+  "numActiveNormalAllocations",
+  "numActiveHugeAllocations",
+  "numActiveBytes"));
+  }
+
+  public NettyMemoryMetrics(PooledByteBufAllocator pooledAllocator,
+  String metricPrefix,
+  TransportConf conf) {
+this.pooledAllocator = pooledAllocator;
+this.allMetrics = new HashMap<>();
+this.metricPrefix = metricPrefix;
+this.verboseMetricsEnabled = conf.verboseMetrics();
+
+registerMetrics(this.pooledAllocator);
+  }
+
+  private void registerMetrics(PooledByteBufAllocator allocator) {
+PooledByteBufAllocatorMetric pooledAllocatorMetric = 
allocator.metric();
+
+// Register general metrics.
+allMetrics.put(MetricRegistry.name(metricPrefix, "usedHeapMemory"),
+  (Gauge) () -> pooledAllocatorMetric.usedHeapMemory());
+allMetrics.put(MetricRegistry.name(metricPrefix, "usedDirectMemory"),
+  (Gauge) () -> pooledAllocatorMetric.usedDirectMemory());
+
+if (verboseMetricsEnabled) {
+  int directArenaIndex = 0;
+  for (PoolArenaMetric metric : pooledAllocatorMetric.directArenas()) {
+registerArenaMetric(metric, "directArena" + directArenaIndex);
+directArenaIndex++;
+  }
+
+  int heapArenaIndex = 0;
+  for (PoolArenaMetric metric : pooledAllocatorMetric.heapArenas()) {
+registerArenaMetric(metric, "heapArena" + heapArenaIndex);
+heapArenaIndex++;
+  }
+}
+  }
+
+  private void registerArenaMetric(PoolArenaMetric arenaMetric, String 
arenaName) {
+for (String methodName : VERBOSE_METRICS) {
+  Method m;
+  try {
+m = PoolArenaMetric.class.getMethod(methodName);
+  } catch (Exception e) {
+// Failed to find metric related method, ignore this metric.
+continue;
+  }
+
+  if (!Modifier.isPublic(m.getModifiers())) {
+// Ignore non-public methods.

[GitHub] spark issue #18887: [SPARK-20642][core] Store FsHistoryProvider listing data...

2017-08-17 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/18887
  
Just to keep others in the loop, Marcelo and I talked about this some 
offline.  I think this PR itself is fine, but to me this is an important point 
in the larger history server project he's doing, so I'm going to take a look at 
the rest of the changes as well before I feel comfortable merging.  Also he 
explained why it's extremely complicated to keep both the old & new versions 
available, which makes sense to me, though I may poke at a version myself just 
to see how complex it is.


---



[GitHub] spark pull request #18887: [SPARK-20642][core] Store FsHistoryProvider listi...

2017-08-17 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/18887#discussion_r133725104
  
--- Diff: scalastyle-config.xml ---
@@ -86,7 +86,7 @@ This file is divided into 3 sections:
   
 
   
-
+
--- End diff --

ok, makes sense.  I wasn't paying enough attention to the actual change at 
first and thought you were just turning the rule entirely off, but I agree this 
change makes sense.

btw I was curious why `package object config` was OK -- it's just a 
hard-coded special case: 
https://github.com/scalastyle/scalastyle/blob/master/src/main/scala/org/scalastyle/scalariform/ClassNamesChecker.scala#L74


---



[GitHub] spark issue #18887: [SPARK-20642][core] Store FsHistoryProvider listing data...

2017-08-16 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/18887
  
>> I feel like we should have one version where the old code is still 
available, controlled by a feature flag.

>I'm not sure exactly what you're suggesting. The default behavior is 
still, as much as possible, the old one: everything is kept in memory. Keeping 
the exact old code in place would mean forking FsHistoryProvider, which is not 
something I see as desirable.

long-term, definitely not desirable.  My concern is that there is a lot of 
new code taking effect here, on some important functionality -- we don't want 
some bug to prevent adoption of 2.3.  For one version, you could leave the old 
one available, rename it to something else, and put a big disclaimer in the 
code that it's obsolete, and as long as things are smooth for 2.3, delete it 
entirely for 2.4.  The HS isn't as core or tricky as the network module, but 
I'm thinking of this like netty vs. nio.


---



[GitHub] spark pull request #18887: [SPARK-20642][core] Store FsHistoryProvider listi...

2017-08-16 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/18887#discussion_r133512286
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -316,25 +350,22 @@ private[history] class FsHistoryProvider(conf: 
SparkConf, clock: Clock)
 try {
   val newLastScanTime = getNewLastScanTime()
   logDebug(s"Scanning $logDir with lastScanTime==$lastScanTime")
-  val statusList = Option(fs.listStatus(new Path(logDir))).map(_.toSeq)
-.getOrElse(Seq.empty[FileStatus])
+  val statusList = Option(fs.listStatus(new 
Path(logDir))).map(_.toSeq).getOrElse(Nil)
   // scan for modified applications, replay and merge them
-  val logInfos: Seq[FileStatus] = statusList
+  val logInfos = statusList
 .filter { entry =>
-  val fileInfo = fileToAppInfo.get(entry.getPath())
-  val prevFileSize = if (fileInfo != null) fileInfo.fileSize else 
0L
   !entry.isDirectory() &&
 // FsHistoryProvider generates a hidden file which can't be 
read.  Accidentally
 // reading a garbage file is safe, but we would log an error 
which can be scary to
 // the end-user.
 !entry.getPath().getName().startsWith(".") &&
-prevFileSize < entry.getLen() &&
-SparkHadoopUtil.get.checkAccessPermission(entry, FsAction.READ)
+SparkHadoopUtil.get.checkAccessPermission(entry, 
FsAction.READ) &&
+recordedFileSize(entry.getPath()) < entry.getLen()
 }
 .flatMap { entry => Some(entry) }
--- End diff --

I realize this isn't your change, but what is the point of this?  Isn't it a 
no-op?
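
(for reference, it just wraps every element in `Some` and flattens it back out, 
i.e. an identity transform:)

```scala
Seq(1, 2, 3).flatMap(entry => Some(entry))  // == Seq(1, 2, 3), so the step can be dropped
```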


---



[GitHub] spark pull request #18887: [SPARK-20642][core] Store FsHistoryProvider listi...

2017-08-16 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/18887#discussion_r133517697
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -742,53 +696,146 @@ private[history] object FsHistoryProvider {
   private val APPL_END_EVENT_PREFIX = 
"{\"Event\":\"SparkListenerApplicationEnd\""
 
   private val LOG_START_EVENT_PREFIX = 
"{\"Event\":\"SparkListenerLogStart\""
+
+  /** Current version of the data written to the listing database. */
+  private val CURRENT_LISTING_VERSION = 1L
 }
 
 /**
- * Application attempt information.
- *
- * @param logPath path to the log file, or, for a legacy log, its directory
- * @param name application name
- * @param appId application ID
- * @param attemptId optional attempt ID
- * @param startTime start time (from playback)
- * @param endTime end time (from playback). -1 if the application is 
incomplete.
- * @param lastUpdated the modification time of the log file when this 
entry was built by replaying
- *the history.
- * @param sparkUser user running the application
- * @param completed flag to indicate whether or not the application has 
completed.
- * @param fileSize the size of the log file the last time the file was 
scanned for changes
+ * A KVStoreSerializer that provides Scala types serialization too, and 
uses the same options as
+ * the API serializer.
  */
-private class FsApplicationAttemptInfo(
+private class KVStoreScalaSerializer extends KVStoreSerializer {
+
+  mapper.registerModule(DefaultScalaModule)
+  mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL)
+  mapper.setDateFormat(v1.JacksonMessageWriter.makeISODateFormat)
+
+}
+
+private[history] case class KVStoreMetadata(
+  val version: Long,
+  val logDir: String)
+
+private[history] case class LogInfo(
+  @KVIndexParam val logPath: String,
+  val fileSize: Long)
+
+private[history] class AttemptInfoWrapper(
+val info: v1.ApplicationAttemptInfo,
 val logPath: String,
-val name: String,
-val appId: String,
-attemptId: Option[String],
-startTime: Long,
-endTime: Long,
-lastUpdated: Long,
-sparkUser: String,
-completed: Boolean,
-val fileSize: Long,
-appSparkVersion: String)
-  extends ApplicationAttemptInfo(
-  attemptId, startTime, endTime, lastUpdated, sparkUser, completed, appSparkVersion) {
-
-  /** extend the superclass string value with the extra attributes of this class */
-  override def toString: String = {
-s"FsApplicationAttemptInfo($name, $appId," +
-  s" ${super.toString}, source=$logPath, size=$fileSize"
+val fileSize: Long) {
+
+  def toAppAttemptInfo(): ApplicationAttemptInfo = {
+ApplicationAttemptInfo(info.attemptId, info.startTime.getTime(),
+  info.endTime.getTime(), info.lastUpdated.getTime(), info.sparkUser,
+  info.completed, info.appSparkVersion)
   }
+
 }
 
-/**
- * Application history information
- * @param id application ID
- * @param name application name
- * @param attempts list of attempts, most recent first.
- */
-private class FsApplicationHistoryInfo(
-id: String,
-override val name: String,
-override val attempts: List[FsApplicationAttemptInfo])
-  extends ApplicationHistoryInfo(id, name, attempts)
+private[history] class ApplicationInfoWrapper(
+val info: v1.ApplicationInfo,
+val attempts: List[AttemptInfoWrapper]) {
+
+  @JsonIgnore @KVIndexParam
+  def id: String = info.id
+
+  @JsonIgnore @KVIndexParam("endTime")
+  def endTime(): Long = attempts.head.info.endTime.getTime()
+
+  @JsonIgnore @KVIndexParam("oldestAttempt")
+  def oldestAttempt(): Long = attempts.map(_.info.lastUpdated.getTime()).min
+
+  def toAppHistoryInfo(): ApplicationHistoryInfo = {
+ApplicationHistoryInfo(info.id, info.name, attempts.map(_.toAppAttemptInfo()))
+  }
+
+  def toApiInfo(): v1.ApplicationInfo = {
+new v1.ApplicationInfo(info.id, info.name, info.coresGranted, info.maxCores,
+  info.coresPerExecutor, info.memoryPerExecutorMB, attempts.map(_.info))
+  }
+
+}
+
+private[history] class AppListingListener(log: FileStatus, clock: Clock) extends SparkListener {
+
+  private val app = new MutableApplicationInfo()
+  private val attempt = new MutableAttemptInfo(log.getPath().getName(), log.getLen())
+
+  override def onApplicationStart(event: SparkListenerApplicationStart): Unit = {
+app.id = event.

[GitHub] spark pull request #18887: [SPARK-20642][core] Store FsHistoryProvider listi...

2017-08-16 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/18887#discussion_r133510977
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -301,6 +334,7 @@ private[history] class FsHistoryProvider(conf: 
SparkConf, clock: Clock)
   }
 
   override def stop(): Unit = {
+listing.close()
--- End diff --

if this throws an exception, should we still try to clean up `initThread`?
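
A hedged sketch of what that cleanup could look like, with stand-in fields
rather than the real FsHistoryProvider members:

```scala
// Hypothetical sketch: wrap the close in try/finally so the thread cleanup still
// runs even if closing the listing store throws.
class Provider(listing: AutoCloseable, initThread: Thread) {
  def stop(): Unit = {
    try {
      listing.close()
    } finally {
      if (initThread != null && initThread.isAlive) {
        initThread.interrupt()
      }
    }
  }
}
```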


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18887: [SPARK-20642][core] Store FsHistoryProvider listi...

2017-08-16 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/18887#discussion_r133511528
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -117,17 +122,37 @@ private[history] class FsHistoryProvider(conf: 
SparkConf, clock: Clock)
   // used for logging msgs (logs are re-scanned based on file size, rather 
than modtime)
   private val lastScanTime = new java.util.concurrent.atomic.AtomicLong(-1)
 
-  // Mapping of application IDs to their metadata, in descending end time 
order. Apps are inserted
-  // into the map in order, so the LinkedHashMap maintains the correct 
ordering.
-  @volatile private var applications: mutable.LinkedHashMap[String, 
FsApplicationHistoryInfo]
-= new mutable.LinkedHashMap()
+  private val pendingReplayTasksCount = new 
java.util.concurrent.atomic.AtomicInteger(0)
 
-  val fileToAppInfo = new ConcurrentHashMap[Path, 
FsApplicationAttemptInfo]()
+  private val storePath = conf.get(LOCAL_STORE_DIR)
 
-  // List of application logs to be deleted by event log cleaner.
-  private var attemptsToClean = new 
mutable.ListBuffer[FsApplicationAttemptInfo]
+  private val listing = storePath.map { path =>
--- End diff --

given the large initializer, could you add an explicit type annotation to 
`listing` to help the reader?
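
For illustration, a hedged sketch of the kind of annotation meant here (the
types are stand-ins, not the real KVStore):

```scala
val storePath: Option[String] = Some("/tmp/spark-history")

// Spelling out the val's type saves the reader from inferring it out of a
// multi-line initializer.
val listing: Option[java.io.File] = storePath.map { path =>
  // ...large initializer elided in this sketch...
  new java.io.File(path, "listing.ldb")
}
```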


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18950: [SPARK-20589][Core][Scheduler] Allow limiting tas...

2017-08-16 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/18950#discussion_r133505348
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala ---
@@ -1214,6 +1214,101 @@ class TaskSetManagerSuite extends SparkFunSuite 
with LocalSparkContext with Logg
 verify(taskSetManagerSpy, times(1)).addPendingTask(anyInt())
   }
 
+  test("limit max concurrent running tasks in a job group when configured 
") {
+val conf = new SparkConf().
+  set(config.BLACKLIST_ENABLED, true).
+  set("spark.job.testJobGroup.maxConcurrentTasks", "2") // Limit max 
concurrent tasks to 2
+
+sc = new SparkContext("local", "test", conf)
+sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", 
"host2"))
+val props = new Properties();
+props.setProperty(SparkContext.SPARK_JOB_GROUP_ID, "testJobGroup") // 
set the job group
+
+val tasks = Array.tabulate[Task[_]](10) { i =>
+  new FakeTask(0, i, Nil)
+}
+val tsm = new TaskSetManager(sched, new TaskSet(tasks, 0, 0, 0, 
props), 2)
+
+// make some offers to our taskset
+var taskDescs = Seq(
+  "exec1" -> "host1",
+  "exec2" -> "host1"
+).flatMap { case (exec, host) =>
+  // offer each executor twice (simulating 2 cores per executor)
+  (0 until 2).flatMap{ _ => tsm.resourceOffer(exec, host, 
TaskLocality.ANY)}
+}
+assert(taskDescs.size === 2) // Out of the 4 offers, tsm can accept up 
to maxConcurrentTasks.
+
+// make 4 more offers
+val taskDescs2 = Seq(
+  "exec1" -> "host1",
+  "exec2" -> "host1"
+).flatMap { case (exec, host) =>
+  (0 until 2).flatMap{ _ => tsm.resourceOffer(exec, host, 
TaskLocality.ANY)}
+}
+assert(taskDescs2.size === 0) // tsm doesn't accept any as it is 
already running at max tasks
+
+// inform tsm that one task has completed
+val directTaskResult = new DirectTaskResult[String](null, Seq()) {
--- End diff --

you can use `createTaskResult`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18950: [SPARK-20589][Core][Scheduler] Allow limiting tas...

2017-08-16 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/18950#discussion_r133504041
  
--- Diff: 
core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala ---
@@ -188,6 +188,125 @@ class ExecutorAllocationManagerSuite
 assert(numExecutorsTarget(manager) === 10)
   }
 
+  test("add executors capped by max concurrent tasks for a job group with 
single core executors") {
+val conf = new SparkConf()
+  .setMaster("myDummyLocalExternalClusterManager")
+  .setAppName("test-executor-allocation-manager")
+  .set("spark.dynamicAllocation.enabled", "true")
+  .set("spark.dynamicAllocation.testing", "true")
+  .set("spark.job.group1.maxConcurrentTasks", "2")
+  .set("spark.job.group2.maxConcurrentTasks", "5")
+val sc = new SparkContext(conf)
+contexts += sc
+sc.setJobGroup("group1", "", false)
+
+val manager = sc.executorAllocationManager.get
+val stage0 = createStageInfo(0, 10)
+// Submit the job and stage start/submit events
+sc.listenerBus.postToAll(SparkListenerJobStart(0, 0, Seq{stage0}, 
sc.getLocalProperties))
--- End diff --

nit: parens, not braces: `Seq(stage0)`
(throughout this test)
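
Both forms compile -- the braces just pass a block as the single argument --
but parentheses are the conventional style for a plain argument list. A tiny
standalone example:

```scala
val stage0 = "stage-0"
val withBraces = Seq { stage0 }   // legal, but reads like a by-name/block argument
val withParens = Seq(stage0)      // conventional
assert(withBraces == withParens)
```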


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18950: [SPARK-20589][Core][Scheduler] Allow limiting tas...

2017-08-16 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/18950#discussion_r133504438
  
--- Diff: 
core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala ---
@@ -188,6 +188,125 @@ class ExecutorAllocationManagerSuite
 assert(numExecutorsTarget(manager) === 10)
   }
 
+  test("add executors capped by max concurrent tasks for a job group with 
single core executors") {
+val conf = new SparkConf()
+  .setMaster("myDummyLocalExternalClusterManager")
+  .setAppName("test-executor-allocation-manager")
+  .set("spark.dynamicAllocation.enabled", "true")
+  .set("spark.dynamicAllocation.testing", "true")
+  .set("spark.job.group1.maxConcurrentTasks", "2")
+  .set("spark.job.group2.maxConcurrentTasks", "5")
+val sc = new SparkContext(conf)
+contexts += sc
+sc.setJobGroup("group1", "", false)
+
+val manager = sc.executorAllocationManager.get
+val stage0 = createStageInfo(0, 10)
+// Submit the job and stage start/submit events
+sc.listenerBus.postToAll(SparkListenerJobStart(0, 0, Seq{stage0}, 
sc.getLocalProperties))
+sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage0))
+
+// Verify that we're capped at number of max concurrent tasks in the 
stage
+assert(maxNumExecutorsNeeded(manager) === 2)
+
+// Submit another stage in the same job
+val stage1 = createStageInfo(1, 10)
+sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage1))
+assert(maxNumExecutorsNeeded(manager) === 2)
+
+sc.listenerBus.postToAll(SparkListenerStageCompleted(stage0))
+sc.listenerBus.postToAll(SparkListenerStageCompleted(stage1))
+sc.listenerBus.postToAll(SparkListenerJobEnd(0, 10, JobSucceeded))
+
+// Submit a new job in the same job group
+val stage2 = createStageInfo(2, 20)
+sc.listenerBus.postToAll(SparkListenerJobStart(1, 0, Seq{stage2}, 
sc.getLocalProperties))
+sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage2))
+assert(maxNumExecutorsNeeded(manager) === 2)
+
+sc.listenerBus.postToAll(SparkListenerStageCompleted(stage2))
+sc.listenerBus.postToAll(SparkListenerJobEnd(1, 10, JobSucceeded))
+
+// Set another jobGroup
+sc.setJobGroup("group2", "", false)
+
+val stage3 = createStageInfo(3, 20)
+sc.listenerBus.postToAll(SparkListenerJobStart(2, 0, Seq{stage3}, 
sc.getLocalProperties))
+sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage3))
+assert(maxNumExecutorsNeeded(manager) === 5)
+
+sc.listenerBus.postToAll(SparkListenerStageCompleted(stage3))
+sc.listenerBus.postToAll(SparkListenerJobEnd(2, 10, JobSucceeded))
+
+// Clear jobGroup
+sc.clearJobGroup()
+
+val stage4 = createStageInfo(4, 50)
+sc.listenerBus.postToAll(SparkListenerJobStart(2, 0, Seq{stage4}, 
sc.getLocalProperties))
+sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage4))
+assert(maxNumExecutorsNeeded(manager) === 50)
+  }
+
+  test("add executors capped by max concurrent tasks for a job group with 
multi cores executors") {
+val conf = new SparkConf()
+  .setMaster("myDummyLocalExternalClusterManager")
+  .setAppName("test-executor-allocation-manager")
+  .set("spark.dynamicAllocation.enabled", "true")
+  .set("spark.dynamicAllocation.testing", "true")
+  .set("spark.job.group1.maxConcurrentTasks", "2")
+  .set("spark.job.group2.maxConcurrentTasks", "5")
+  .set("spark.executor.cores", "3")
+val sc = new SparkContext(conf)
+contexts += sc
+sc.setJobGroup("group1", "", false)
+
+val manager = sc.executorAllocationManager.get
+val stage0 = createStageInfo(0, 10)
+// Submit the job and stage start/submit events
+sc.listenerBus.postToAll(SparkListenerJobStart(0, 0, Seq{stage0}, 
sc.getLocalProperties))
+sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage0))
+
+// Verify that we're capped at number of max concurrent tasks in the 
stage
+assert(maxNumExecutorsNeeded(manager) === 1)
+
+// Submit another stage in the same job
+val stage1 = createStageInfo(1, 10)
+sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage1))
+assert(maxNumExecutorsNeeded(manager) === 1)
+
+sc.listenerBus.postToAll(SparkListenerStageCompleted(stage0))
+sc

[GitHub] spark pull request #18950: [SPARK-20589][Core][Scheduler] Allow limiting tas...

2017-08-16 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/18950#discussion_r133502797
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -314,7 +316,7 @@ private[spark] class ExecutorAllocationManager(
   // Do not change our target while we are still initializing,
   // Otherwise the first job may have to ramp up unnecessarily
   0
-} else if (maxNeeded < numExecutorsTarget) {
+} else if (maxNeeded <= numExecutorsTarget) {
   // The target number exceeds the number we actually need, so stop 
adding new
--- End diff --

yeah I don't think this change hurts, but unless you have a reason for
changing it, I'd prefer to leave it as-is, just to keep the diff minimal for
when we need to dig through the history later.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18950: [SPARK-20589][Core][Scheduler] Allow limiting tas...

2017-08-16 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/18950#discussion_r133499828
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -602,6 +604,21 @@ private[spark] class ExecutorAllocationManager(
 // place the executors.
 private val stageIdToExecutorPlacementHints = new mutable.HashMap[Int, 
(Int, Map[String, Int])]
 
+override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
+  val jobGroupId = if (jobStart.properties != null) {
+jobStart.properties.getProperty(SparkContext.SPARK_JOB_GROUP_ID)
+  } else {
+""
+  }
+  val maxConcurrentTasks = 
conf.getInt(s"spark.job.$jobGroupId.maxConcurrentTasks",
+Int.MaxValue)
+
+  logInfo(s"Setting maximum concurrent tasks for group: ${jobGroupId} 
to $maxConcurrentTasks")
+  allocationManager.synchronized {
+allocationManager.maxConcurrentTasks = maxConcurrentTasks
--- End diff --

yeah, Mark is right.  After all, that is what separates a job group property
from a global property for the entire Spark context.

I see why this is desirable for the most common case of just running one
job at a time, but to get this to work with multiple concurrent jobs (& job
groups) you need to track a map from jobGroup -> maxConcurrency, and then sum
that up (handling overflow for Int.MaxValue).
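
A hedged sketch of that bookkeeping (not the PR's actual code): keep a
per-job-group cap and sum the active caps, saturating at Int.MaxValue instead
of overflowing.

```scala
import scala.collection.mutable

val maxConcurrencyByGroup = mutable.Map[String, Int]()

// Sum the per-group caps, saturating at Int.MaxValue rather than wrapping around.
def totalMaxConcurrentTasks(): Int = {
  val total = maxConcurrencyByGroup.values.foldLeft(0L)(_ + _.toLong)
  math.min(total, Int.MaxValue.toLong).toInt
}

maxConcurrencyByGroup("group1") = 2
maxConcurrencyByGroup("group2") = Int.MaxValue // effectively "unlimited"
assert(totalMaxConcurrentTasks() == Int.MaxValue)
```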


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18887: [SPARK-20642][core] Store FsHistoryProvider listi...

2017-08-15 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/18887#discussion_r133316362
  
--- Diff: scalastyle-config.xml ---
@@ -86,7 +86,7 @@ This file is divided into 3 sections:
   
 
   
-
+
--- End diff --

Agree it makes more sense to make the whole thing `private[spark]`, but 
couldn't you just turn the rule off around this object, like we do for println?

(Also I'm confused why `package object config` doesn't violate the rule).
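
A hedged sketch of that alternative; `SomeConfigObject` and `some.rule.id` are
placeholders, not the actual object or the rule id from scalastyle-config.xml:

```scala
package org.apache.spark.internal

// scalastyle:off some.rule.id
private[spark] object SomeConfigObject {
  // ...definitions the rule would otherwise flag...
}
// scalastyle:on some.rule.id
```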


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18950: [SPARK-20589][Core][Scheduler] Allow limiting tas...

2017-08-15 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/18950#discussion_r133311545
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -602,6 +604,21 @@ private[spark] class ExecutorAllocationManager(
 // place the executors.
 private val stageIdToExecutorPlacementHints = new mutable.HashMap[Int, 
(Int, Map[String, Int])]
 
+override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
+  val jobGroupId = if (jobStart.properties != null) {
+jobStart.properties.getProperty(SparkContext.SPARK_JOB_GROUP_ID)
+  } else {
+""
+  }
+  val maxConcurrentTasks = 
conf.getInt(s"spark.job.$jobGroupId.maxConcurrentTasks",
+Int.MaxValue)
+
+  logInfo(s"Setting maximum concurrent tasks for group: ${jobGroupId} 
to $maxConcurrentTasks")
+  allocationManager.synchronized {
+allocationManager.maxConcurrentTasks = maxConcurrentTasks
--- End diff --

if you have multiple jobs running concurrently, you are not keeping a 
separate maxConcurrentTasks for each one.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18950: [SPARK-20589][Core][Scheduler] Allow limiting tas...

2017-08-15 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/18950#discussion_r133311882
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -454,64 +477,68 @@ private[spark] class TaskSetManager(
 }
   }
 
-  dequeueTask(execId, host, allowedLocality).map { case ((index, 
taskLocality, speculative)) =>
-// Found a task; do some bookkeeping and return a task description
-val task = tasks(index)
-val taskId = sched.newTaskId()
-// Do various bookkeeping
-copiesRunning(index) += 1
-val attemptNum = taskAttempts(index).size
-val info = new TaskInfo(taskId, index, attemptNum, curTime,
-  execId, host, taskLocality, speculative)
-taskInfos(taskId) = info
-taskAttempts(index) = info :: taskAttempts(index)
-// Update our locality level for delay scheduling
-// NO_PREF will not affect the variables related to delay 
scheduling
-if (maxLocality != TaskLocality.NO_PREF) {
-  currentLocalityIndex = getLocalityIndex(taskLocality)
-  lastLaunchTime = curTime
-}
-// Serialize and return the task
-val serializedTask: ByteBuffer = try {
-  ser.serialize(task)
-} catch {
-  // If the task cannot be serialized, then there's no point to 
re-attempt the task,
-  // as it will always fail. So just abort the whole task-set.
-  case NonFatal(e) =>
-val msg = s"Failed to serialize task $taskId, not attempting 
to retry it."
-logError(msg, e)
-abort(s"$msg Exception during serialization: $e")
-throw new TaskNotSerializableException(e)
-}
-if (serializedTask.limit > TaskSetManager.TASK_SIZE_TO_WARN_KB * 
1024 &&
-  !emittedTaskSizeWarning) {
-  emittedTaskSizeWarning = true
-  logWarning(s"Stage ${task.stageId} contains a task of very large 
size " +
-s"(${serializedTask.limit / 1024} KB). The maximum recommended 
task size is " +
-s"${TaskSetManager.TASK_SIZE_TO_WARN_KB} KB.")
-}
-addRunningTask(taskId)
-
-// We used to log the time it takes to serialize the task, but 
task size is already
-// a good proxy to task serialization time.
-// val timeTaken = clock.getTime() - startTime
-val taskName = s"task ${info.id} in stage ${taskSet.id}"
-logInfo(s"Starting $taskName (TID $taskId, $host, executor 
${info.executorId}, " +
-  s"partition ${task.partitionId}, $taskLocality, 
${serializedTask.limit} bytes)")
-
-sched.dagScheduler.taskStarted(task, info)
-new TaskDescription(
-  taskId,
-  attemptNum,
-  execId,
-  taskName,
-  index,
-  addedFiles,
-  addedJars,
-  task.localProperties,
-  serializedTask)
+  dequeueTask(execId, host, allowedLocality).map {
--- End diff --

unintentional reformat?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18874: [SPARK-21656][CORE] spark dynamic allocation should not ...

2017-08-15 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/18874
  
@srowen you have a good point about a case that becomes worse after this 
change.  Still I think this change is better on balance.

btw, there are even more odd cases with dynamic allocation right now
-- the one that I've seen most often is that if you run really short tasks,
but all in sequence, you probably won't release any executors.  Say you first
run some really large job with 10k tasks, and so you request a bunch of
executors.  After that, you only ever run 100 tasks at a time, so you could
release a bunch of resources.  But 60 seconds is enough time for a bunch of
short stages to execute, and each stage chooses a random set of executors to
run on, so no executor is ever idle for 60 seconds.

Perhaps we could also fix that in some larger change that more tightly
integrates dynamic allocation into the scheduler, which Tom was alluding to.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18874: [SPARK-21656][CORE] spark dynamic allocation should not ...

2017-08-10 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/18874
  
This change makes sense to me.

I think Tom's last comment about resetting that timeout every time one task is
scheduled explains how you get into this situation and why you don't
actually want to change those confs.  A few months back I was looking at that
and chatted with Kay about why that was the case -- I forget the details off
the top of my head, but I at least remember there was some reason why it wasn't
trivial to change.  So I think this is the right immediate fix.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18739: [WIP][SPARK-21539][CORE] Job should not be aborte...

2017-07-27 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/18739#discussion_r129849052
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -665,10 +667,15 @@ private[spark] class TaskSetManager(
 }
   }
   if (blacklistedEverywhere) {
-val partition = tasks(indexInTaskSet).partitionId
-abort(s"Aborting $taskSet because task $indexInTaskSet 
(partition $partition) " +
-  s"cannot run anywhere due to node and executor blacklist.  
Blacklisting behavior " +
-  s"can be configured via spark.blacklist.*.")
+val dynamicAllocationEnabled = 
conf.getBoolean("spark.dynamicAllocation.enabled", false)
+val mayAllocateNewExecutor =
+  conf.getInt("spark.executor.instances", -1) > 
currentExecutorNumber
+if (!dynamicAllocationEnabled && !mayAllocateNewExecutor) {
--- End diff --

the reason we do wait until the task set has finished is that before that,
we have no idea whether the failure is the fault of the user code (or bad input
data, etc.) or actually a fault with the node / executor.  Our only piece
of information on that is when a task fails on one executor and then
succeeds elsewhere; in that case we assume the failure was the fault of the
original executor (though this heuristic also has false positives, from what
I've seen so far it seems tolerable).

I have also thought of having this wait some amount of time rather than
killing the taskset immediately, to see if another executor comes up.
However, there are some complications with that as well.  I think this is all
captured in the discussion on SPARK-15815, which actually discusses one of the
trickiest cases -- just one task remaining with dynamic allocation, and all
other executors have been killed b/c they were idle.  Take a look at that jira.
If it summarizes things, then we can close SPARK-21539 as a duplicate and
continue discussion on SPARK-15815.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18739: [WIP][SPARK-21539][CORE] Job should not be aborte...

2017-07-26 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/18739#discussion_r129749281
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -644,10 +644,12 @@ private[spark] class TaskSetManager(
 }
 
 pendingTask.foreach { indexInTaskSet =>
+  var currentExecutorNumber = 0
--- End diff --

`numBlacklistedExecutors`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18739: [WIP][SPARK-21539][CORE] Job should not be aborte...

2017-07-26 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/18739#discussion_r129749722
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
@@ -665,10 +667,15 @@ private[spark] class TaskSetManager(
 }
   }
   if (blacklistedEverywhere) {
-val partition = tasks(indexInTaskSet).partitionId
-abort(s"Aborting $taskSet because task $indexInTaskSet 
(partition $partition) " +
-  s"cannot run anywhere due to node and executor blacklist.  
Blacklisting behavior " +
-  s"can be configured via spark.blacklist.*.")
+val dynamicAllocationEnabled = 
conf.getBoolean("spark.dynamicAllocation.enabled", false)
+val mayAllocateNewExecutor =
+  conf.getInt("spark.executor.instances", -1) > 
currentExecutorNumber
+if (!dynamicAllocationEnabled && !mayAllocateNewExecutor) {
--- End diff --

but even with dynamic allocation, you might still want this, right?  Are 
you hoping that with dynamic allocation, even if everything is blacklisted, 
eventually the executors will go idle, get torn down, and then new executors 
will get created since you still have tasks left?

On large clusters, this seems desirable.  There are weird cases with small 
clusters though ... suppose the cluster only has two nodes, and you end up 
blacklisting both nodes (with such a small cluster, that can happen just 
because tasks fail from poor user code).  Then with this change, you'll go back 
to having the job sit idle for a long time, just waiting for the blacklist to 
timeout.

I agree the current solution isn't great, but I don't know if this really 
improves things.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #17902: [SPARK-20641][core] Add key-value store abstraction and ...

2017-06-06 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/17902
  
merged to master


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #17902: [SPARK-20641][core] Add key-value store abstraction and ...

2017-06-05 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/17902
  
fair enough on jackson.

add the log for the seed please, other than that, lgtm


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #17113: [SPARK-13669][SPARK-20898][Core] Improve the blacklist m...

2017-06-05 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/17113
  
I'm fine with this, just a couple asks:

1) is there a test for SPARK-20898?  I know at some point in the
development of killing blacklisted executors we tested on yarn, so I'm really
disappointed (in myself) for merging it even though it didn't actually work.  It
would be nice to have a regression test.  Maybe that would be a huge pain to
do; I just wanted to see if you thought about it.

2) Can you update the jira description for SPARK-13669 now?  I think a lot
has changed since that was initially opened.  Also IIUC, we don't have any
particular advice on when users should enable this at the moment; it's just an
escape hatch you'd like to leave in place?  If there is any advice, putting it
in the jira would also make sense.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17902: [SPARK-20641][core] Add key-value store abstracti...

2017-06-02 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/17902#discussion_r119904509
  
--- Diff: 
common/kvstore/src/test/java/org/apache/spark/kvstore/DBIteratorSuite.java ---
@@ -0,0 +1,494 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.kvstore;
+
+import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import org.apache.commons.io.FileUtils;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+public abstract class DBIteratorSuite {
+
+  private static final int MIN_ENTRIES = 42;
+  private static final int MAX_ENTRIES = 1024;
+  private static final Random RND = new Random();
+
+  private static List allEntries;
+  private static List clashingEntries;
+  private static KVStore db;
+
+  private static interface BaseComparator extends Comparator {
+/**
+ * Returns a comparator that falls back to natural order if this 
comparator's ordering
+ * returns equality for two elements. Used to mimic how the index 
sorts things internally.
+ */
+default BaseComparator fallback() {
+  return (t1, t2) -> {
+int result = BaseComparator.this.compare(t1, t2);
+if (result != 0) {
+  return result;
+}
+
+return t1.key.compareTo(t2.key);
+  };
+}
+
+/** Reverses the order of this comparator. */
+default BaseComparator reverse() {
+  return (t1, t2) -> -BaseComparator.this.compare(t1, t2);
+}
+  }
+
+  private static final BaseComparator NATURAL_ORDER = (t1, t2) -> 
t1.key.compareTo(t2.key);
+  private static final BaseComparator REF_INDEX_ORDER = (t1, t2) -> 
t1.id.compareTo(t2.id);
+  private static final BaseComparator COPY_INDEX_ORDER = (t1, t2) -> 
t1.name.compareTo(t2.name);
+  private static final BaseComparator NUMERIC_INDEX_ORDER = (t1, t2) -> 
t1.num - t2.num;
+  private static final BaseComparator CHILD_INDEX_ORDER = (t1, t2) -> 
t1.child.compareTo(t2.child);
+
+  /**
+   * Implementations should override this method; it is called only once, 
before all tests are
+   * run. Any state can be safely stored in static variables and cleaned 
up in a @AfterClass
+   * handler.
+   */
+  protected abstract KVStore createStore() throws Exception;
+
+  @AfterClass
+  public static void cleanupData() throws Exception {
+allEntries = null;
+db = null;
+  }
+
+  @Before
+  public void setup() throws Exception {
+if (db != null) {
--- End diff --

for later debugging, it would be helpful to log the random generator seed 
here.
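
A minimal sketch of the suggestion (the logging call is a stand-in for whatever
the suite uses), in Scala for brevity:

```scala
// Create the Random from an explicit seed and record it, so a failing run can be
// reproduced with the same generated data.
val seed = System.nanoTime()
println(s"DBIteratorSuite random seed: $seed")
val rnd = new java.util.Random(seed)
```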


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17902: [SPARK-20641][core] Add key-value store abstracti...

2017-06-01 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/17902#discussion_r119652958
  
--- Diff: 
common/kvstore/src/test/java/org/apache/spark/kvstore/DBIteratorSuite.java ---
@@ -0,0 +1,500 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.kvstore;
+
+import java.util.Arrays;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import org.apache.commons.io.FileUtils;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+public abstract class DBIteratorSuite {
+
+  private static final int MIN_ENTRIES = 42;
+  private static final int MAX_ENTRIES = 1024;
+  private static final Random RND = new Random();
+
+  private static List allEntries;
+  private static List clashingEntries;
+  private static KVStore db;
+
+  private static interface BaseComparator extends Comparator {
+/**
+ * Returns a comparator that falls back to natural order if this 
comparator's ordering
+ * returns equality for two elements. Used to mimic how the index 
sorts things internally.
+ */
+default BaseComparator fallback() {
+  return (t1, t2) -> {
+int result = BaseComparator.this.compare(t1, t2);
+if (result != 0) {
+  return result;
+}
+
+return t1.key.compareTo(t2.key);
+  };
+}
+
+/** Reverses the order of this comparator. */
+default BaseComparator reverse() {
+  return (t1, t2) -> -BaseComparator.this.compare(t1, t2);
+}
+  }
+
+  private static final BaseComparator NATURAL_ORDER = (t1, t2) -> 
t1.key.compareTo(t2.key);
+  private static final BaseComparator REF_INDEX_ORDER = (t1, t2) -> 
t1.id.compareTo(t2.id);
+  private static final BaseComparator COPY_INDEX_ORDER = (t1, t2) -> 
t1.name.compareTo(t2.name);
+  private static final BaseComparator NUMERIC_INDEX_ORDER = (t1, t2) -> 
t1.num - t2.num;
+  private static final BaseComparator CHILD_INDEX_ORDER = (t1, t2) -> 
t1.child.compareTo(t2.child);
+
+  /**
+   * Implementations should override this method; it is called only once, 
before all tests are
+   * run. Any state can be safely stored in static variables and cleaned 
up in a @AfterClass
+   * handler.
+   */
+  protected abstract KVStore createStore() throws Exception;
+
+  @AfterClass
+  public static void cleanupData() throws Exception {
+allEntries = null;
+db = null;
+  }
+
+  @Before
+  public void setup() throws Exception {
+if (db != null) {
+  return;
+}
+
+db = createStore();
+
+int count = RND.nextInt(MAX_ENTRIES) + MIN_ENTRIES;
+
+// Instead of generating sequential IDs, generate random unique IDs to 
avoid the insertion
+// order matching the natural ordering. Just in case.
+boolean[] usedIDs = new boolean[count];
+
+allEntries = new ArrayList<>(count);
+for (int i = 0; i < count; i++) {
+  CustomType1 t = new CustomType1();
+
+  int id;
+  do {
+id = RND.nextInt(count);
+  } while (usedIDs[id]);
--- End diff --

I know this doesn't really matter, but this is O(n^2), listing the ids and 
then using 
https://docs.oracle.com/javase/8/docs/api/java/util/Collections.html#shuffle-java.util.List-java.util.Random-
 would be O(n)
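
A hedged Scala sketch of the O(n) alternative (the comment above points at
java.util.Collections.shuffle; scala.util.Random.shuffle plays the same role
here):

```scala
import scala.util.Random

val count = 100
// Generate the ids sequentially and shuffle once, instead of rejection-sampling
// random ids until an unused one turns up.
val ids = Random.shuffle((0 until count).toList)
```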


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not h

[GitHub] spark pull request #17902: [SPARK-20641][core] Add key-value store abstracti...

2017-06-01 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/17902#discussion_r119657514
  
--- Diff: 
common/kvstore/src/test/java/org/apache/spark/kvstore/LevelDBTypeInfoSuite.java 
---
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.kvstore;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+public class LevelDBTypeInfoSuite {
+
+  @Test
+  public void testIndexAnnotation() throws Exception {
+KVTypeInfo ti = new KVTypeInfo(CustomType1.class);
+assertEquals(5, ti.indices().count());
+
+CustomType1 t1 = new CustomType1();
+t1.key = "key";
+t1.id = "id";
+t1.name = "name";
+t1.num = 42;
+t1.child = "child";
+
+assertEquals(t1.key, ti.getIndexValue(KVIndex.NATURAL_INDEX_NAME, t1));
+assertEquals(t1.id, ti.getIndexValue("id", t1));
+assertEquals(t1.name, ti.getIndexValue("name", t1));
+assertEquals(t1.num, ti.getIndexValue("int", t1));
+assertEquals(t1.child, ti.getIndexValue("child", t1));
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testNoNaturalIndex() throws Exception {
+newTypeInfo(NoNaturalIndex.class);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testDuplicateIndex() throws Exception {
+newTypeInfo(DuplicateIndex.class);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testEmptyIndexName() throws Exception {
+newTypeInfo(EmptyIndexName.class);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testIllegalIndexName() throws Exception {
+newTypeInfo(IllegalIndexName.class);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testIllegalIndexMethod() throws Exception {
+newTypeInfo(IllegalIndexMethod.class);
+  }
+
+  @Test
+  public void testKeyClashes() throws Exception {
+LevelDBTypeInfo ti = newTypeInfo(CustomType1.class);
+
+CustomType1 t1 = new CustomType1();
+t1.key = "key1";
+t1.name = "a";
+
+CustomType1 t2 = new CustomType1();
+t2.key = "key2";
+t2.name = "aa";
+
+CustomType1 t3 = new CustomType1();
+t3.key = "key3";
+t3.name = "aaa";
+
+// Make sure entries with conflicting names are sorted correctly.
+assertBefore(ti.index("name").entityKey(null, t1), 
ti.index("name").entityKey(null, t2));
+assertBefore(ti.index("name").entityKey(null, t1), 
ti.index("name").entityKey(null, t3));
+assertBefore(ti.index("name").entityKey(null, t2), 
ti.index("name").entityKey(null, t3));
+  }
+
+  @Test
+  public void testNumEncoding() throws Exception {
+LevelDBTypeInfo.Index idx = 
newTypeInfo(CustomType1.class).indices().iterator().next();
+
+assertEquals("+=0001", new String(idx.toKey(1), UTF_8));
+assertEquals("+=0010", new String(idx.toKey(16), UTF_8));
+assertEquals("+=7fff", new String(idx.toKey(Integer.MAX_VALUE), 
UTF_8));
+
+assertBefore(idx.toKey(1), idx.toKey(2));
+assertBefore(idx.toKey(-1), idx.toKey(2));
+assertBefore(idx.toKey(-11), idx.toKey(2));
+assertBefore(idx.toKey(-11), idx.toKey(-1));
+assertBefore(idx.toKey(1), idx.toKey(11));
+assertBefore(idx.toKey(Integer.MIN_VALUE), 
idx.toKey(Integer.MAX_VALUE));
+
+assertBefore(idx.toKey(1L), idx.toKey(2L));
+assertBefore(idx.toKey(-1L), idx.toKey(2L));
+assertBefore(idx.toKey(Long.MIN_VALUE), idx.toKey(Long.MAX_VALUE));
+
+  

[GitHub] spark pull request #17902: [SPARK-20641][core] Add key-value store abstracti...

2017-06-01 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/17902#discussion_r119651685
  
--- Diff: 
common/kvstore/src/main/java/org/apache/spark/kvstore/LevelDBTypeInfo.java ---
@@ -0,0 +1,502 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.kvstore;
+
+import java.lang.reflect.Array;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import org.iq80.leveldb.WriteBatch;
+
+/**
+ * Holds metadata about app-specific types stored in LevelDB. Serves as a 
cache for data collected
+ * via reflection, to make it cheaper to access it multiple times.
+ *
+ * 
+ * The hierarchy of keys stored in LevelDB looks roughly like the 
following. This hierarchy ensures
+ * that iteration over indices is easy, and that updating values in the 
store is not overly
+ * expensive. Of note, indices choose using more disk space (one value per 
key) instead of keeping
+ * lists of pointers, which would be more expensive to update at runtime.
+ * 
+ *
+ * 
+ * Indentation defines when a sub-key lives under a parent key. In 
LevelDB, this means the full
+ * key would be the concatenation of everything up to that point in the 
hierarchy, with each
+ * component separated by a NULL byte.
+ * 
+ *
+ * 
+ * +TYPE_NAME
+ *   NATURAL_INDEX
+ * +NATURAL_KEY
+ * -
+ *   -NATURAL_INDEX
+ *   INDEX_NAME
+ * +INDEX_VALUE
+ *   +NATURAL_KEY
+ * -INDEX_VALUE
+ * .INDEX_VALUE
+ *   CHILD_INDEX_NAME
+ * +CHILD_INDEX_VALUE
+ *   NATURAL_KEY_OR_DATA
+ * -
+ *   -INDEX_NAME
+ * 
+ *
+ * 
+ * Entity data (either the entity's natural key or a copy of the data) is 
stored in all keys
+ * that end with "+". A count of all objects that match a 
particular top-level index
+ * value is kept at the end marker ("-"). A count is also kept 
at the natural index's end
+ * marker, to make it easy to retrieve the number of all elements of a 
particular type.
+ * 
+ *
+ * 
+ * To illustrate, given a type "Foo", with a natural index and a second 
index called "bar", you'd
+ * have these keys and values in the store for two instances, one with 
natural key "key1" and the
+ * other "key2", both with value "yes" for "bar":
+ * 
+ *
+ * 
+ * Foo __main__ +key1   [data for instance 1]
+ * Foo __main__ +key2   [data for instance 2]
+ * Foo __main__ -   [count of all Foo]
+ * Foo bar +yes +key1   [instance 1 key or data, depending on index type]
+ * Foo bar +yes +key2   [instance 2 key or data, depending on index type]
+ * Foo bar +yes -   [count of all Foo with "bar=yes" ]
+ * 
+ *
+ * 
+ * Note that all indexed values are prepended with "+", even if the index 
itself does not have an
+ * explicit end marker. This allows for easily skipping to the end of an 
index by telling LevelDB
+ * to seek to the "phantom" end marker of the index.
+ * 
+ *
+ * 
+ * Child indices are stored after their parent index. In the example 
above, let's assume there is
+ * a child index "child", whose parent is "bar". If both instances have 
value "no" for this field,
+ * the data in the store would look something like the following:
+ * 
+ *
+ * 
+ * ...
+ * Foo bar +yes -
+ * Foo bar .yes .child +no +key1   [instance 1 key or data, depending on 
index type]
+ * Foo bar .yes .ch

[GitHub] spark pull request #17902: [SPARK-20641][core] Add key-value store abstracti...

2017-06-01 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/17902#discussion_r119649224
  
--- Diff: 
common/kvstore/src/main/java/org/apache/spark/kvstore/LevelDBTypeInfo.java ---
@@ -0,0 +1,502 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.kvstore;
+
+import java.lang.reflect.Array;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import org.iq80.leveldb.WriteBatch;
+
+/**
+ * Holds metadata about app-specific types stored in LevelDB. Serves as a 
cache for data collected
+ * via reflection, to make it cheaper to access it multiple times.
+ *
+ * 
+ * The hierarchy of keys stored in LevelDB looks roughly like the 
following. This hierarchy ensures
+ * that iteration over indices is easy, and that updating values in the 
store is not overly
+ * expensive. Of note, indices choose using more disk space (one value per 
key) instead of keeping
+ * lists of pointers, which would be more expensive to update at runtime.
+ * 
+ *
+ * 
+ * Indentation defines when a sub-key lives under a parent key. In 
LevelDB, this means the full
+ * key would be the concatenation of everything up to that point in the 
hierarchy, with each
+ * component separated by a NULL byte.
+ * 
+ *
+ * 
+ * +TYPE_NAME
+ *   NATURAL_INDEX
+ * +NATURAL_KEY
+ * -
+ *   -NATURAL_INDEX
+ *   INDEX_NAME
+ * +INDEX_VALUE
+ *   +NATURAL_KEY
+ * -INDEX_VALUE
+ * .INDEX_VALUE
+ *   CHILD_INDEX_NAME
+ * +CHILD_INDEX_VALUE
+ *   NATURAL_KEY_OR_DATA
+ * -
+ *   -INDEX_NAME
+ * 
+ *
+ * 
+ * Entity data (either the entity's natural key or a copy of the data) is 
stored in all keys
+ * that end with "+". A count of all objects that match a 
particular top-level index
+ * value is kept at the end marker ("-"). A count is also kept 
at the natural index's end
+ * marker, to make it easy to retrieve the number of all elements of a 
particular type.
+ * 
+ *
+ * 
+ * To illustrate, given a type "Foo", with a natural index and a second 
index called "bar", you'd
+ * have these keys and values in the store for two instances, one with 
natural key "key1" and the
+ * other "key2", both with value "yes" for "bar":
+ * 
+ *
+ * 
+ * Foo __main__ +key1   [data for instance 1]
+ * Foo __main__ +key2   [data for instance 2]
--- End diff --

One thing I had some trouble keeping straight as I read through this was
the difference between an index "key" and an index "value".  Normally I think
of "value" as what you are calling "data" here.  It seems like you are calling
the index value just the final component "+key1", while the key refers to the
entire thing "Foo __main__ +key1".

Is that right?

It might also help to add comments on `entityKey()` and `getValue()` as well.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17902: [SPARK-20641][core] Add key-value store abstracti...

2017-05-31 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/17902#discussion_r119408662
  
--- Diff: 
common/kvstore/src/main/java/org/apache/spark/kvstore/KVStoreView.java ---
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.kvstore;
+
+import java.util.Iterator;
+import java.util.Map;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * A configurable view that allows iterating over values in a {@link 
KVStore}.
+ *
+ * 
+ * The different methods can be used to configure the behavior of the 
iterator. Calling the same
+ * method multiple times is allowed; the most recent value will be used.
+ * 
+ *
+ * 
+ * The iterators returns by this view are of type {@link KVStoreIterator}; 
they auto-close
+ * when used in a for loop that exhausts their contents, but when used 
manually, they need
+ * to be closed explicitly unless all elements are read.
+ * 
+ */
+public abstract class KVStoreView implements Iterable {
+
+  final Class type;
+
+  boolean ascending = true;
+  String index = KVIndex.NATURAL_INDEX_NAME;
+  Object first = null;
+  Object last = null;
+  Object parent = null;
+  long skip = 0L;
+  long max = Long.MAX_VALUE;
+
+  public KVStoreView(Class<T> type) {
+this.type = type;
+  }
+
+  /**
+   * Reverses the order of iteration. By default, iterates in ascending 
order.
+   */
+  public KVStoreView<T> reverse() {
+ascending = !ascending;
+return this;
+  }
+
+  /**
+   * Iterates according to the given index.
+   */
+  public KVStoreView<T> index(String name) {
+this.index = Preconditions.checkNotNull(name);
+return this;
+  }
+
+  /**
+   * Defines the value of the parent index when iterating over a child 
index. Only elements that
+   * match the parent index's value will be included in the iteration.
+   *
+   * 
+   * Required for iterating over child indices, will generate an error if 
iterating over a
+   * parent-less index.
+   * 
+   */
+  public KVStoreView<T> parent(Object value) {
+this.parent = value;
+return this;
+  }
+
+  /**
+   * Iterates starting at the given value of the chosen index.
+   */
+  public KVStoreView<T> first(Object value) {
+this.first = value;
+return this;
+  }
+
+  /**
+   * Stops iteration at the given value of the chosen index.
--- End diff --

It would be nice to clarify whether the matching element is included or not.
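
The same ambiguity exists in the JDK collections, which is why `NavigableMap` 
makes the caller spell it out. A quick, runnable illustration of the two 
possible readings (plain JDK code, not this PR's API):

```java
import java.util.TreeMap;

public class LastBoundSketch {
  public static void main(String[] args) {
    TreeMap<String, Integer> m = new TreeMap<>();
    m.put("a", 1);
    m.put("f", 2);
    m.put("m", 3);

    // Inclusive upper bound: the element matching "m" is part of the result.
    System.out.println(m.subMap("a", true, "m", true));   // {a=1, f=2, m=3}
    // Exclusive upper bound: iteration stops just before "m".
    System.out.println(m.subMap("a", true, "m", false));  // {a=1, f=2}
  }
}
```

Whichever behavior this setter has, stating it in the javadoc would avoid the 
guesswork.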


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17902: [SPARK-20641][core] Add key-value store abstracti...

2017-05-31 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/17902#discussion_r119409655
  
--- Diff: 
common/kvstore/src/main/java/org/apache/spark/kvstore/KVTypeInfo.java ---
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.kvstore;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Stream;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Wrapper around types managed in a KVStore, providing easy access to 
their indexed fields.
+ */
+public class KVTypeInfo {
+
+  private final Class<?> type;
+  private final Map<String, KVIndex> indices;
+  private final Map<String, Accessor> accessors;
+
+  public KVTypeInfo(Class<?> type) throws Exception {
+this.type = type;
+this.accessors = new HashMap<>();
+this.indices = new HashMap<>();
+
+for (Field f : type.getFields()) {
+  KVIndex idx = f.getAnnotation(KVIndex.class);
+  if (idx != null) {
+checkIndex(idx, indices);
+indices.put(idx.value(), idx);
+accessors.put(idx.value(), new FieldAccessor(f));
+  }
+}
+
+for (Method m : type.getMethods()) {
+  KVIndex idx = m.getAnnotation(KVIndex.class);
+  if (idx != null) {
+checkIndex(idx, indices);
+Preconditions.checkArgument(m.getParameterTypes().length == 0,
+  "Annotated method %s::%s should not have any parameters.", 
type.getName(), m.getName());
+indices.put(idx.value(), idx);
+accessors.put(idx.value(), new MethodAccessor(m));
+  }
+}
+
+
Preconditions.checkArgument(indices.containsKey(KVIndex.NATURAL_INDEX_NAME),
+"No natural index defined for type %s.", type.getName());
+
Preconditions.checkArgument(indices.get(KVIndex.NATURAL_INDEX_NAME).parent().isEmpty(),
+"Natural index of %s cannot have a parent.", type.getName());
+
+for (KVIndex idx : indices.values()) {
+  if (!idx.parent().isEmpty()) {
+KVIndex parent = indices.get(idx.parent());
+Preconditions.checkArgument(parent != null,
+  "Cannot find parent %s of index %s.", idx.parent(), idx.value());
+Preconditions.checkArgument(parent.parent().isEmpty(),
+  "Parent index %s of index %s cannot be itself a child index.", 
idx.parent(), idx.value());
+  }
+}
+  }
+
+  private void checkIndex(KVIndex idx, Map<String, KVIndex> indices) {
+Preconditions.checkArgument(idx.value() != null && 
!idx.value().isEmpty(),
+  "No name provided for index in type %s.", type.getName());
+Preconditions.checkArgument(
+  !idx.value().startsWith("_") || 
idx.value().equals(KVIndex.NATURAL_INDEX_NAME),
+  "Index name %s (in type %s) is not allowed.", idx.value(), 
type.getName());
+Preconditions.checkArgument(idx.parent().isEmpty() || 
!idx.parent().equals(idx.value()),
+  "Index %s cannot be parent of itself.", idx.value());
+Preconditions.checkArgument(!indices.containsKey(idx.value()),
+  "Duplicate index %s for type %s.", idx.value(), type.getName());
+  }
+
+  public Class<?> getType() {
+return type;
+  }
+
+  public Object getIndexValue(String indexName, Object instance) throws 
Exception {
+return getAccessor(indexName).get(instance);
+  }
+
+  public Stream<KVIndex> indices() {
+return indices.values().stream();
+  }
+
+  Accessor getAccessor(String indexName) {
+Accessor a = accessors.get(indexName);
+Preconditions.checkArgument(a != null, "No index %s.", indexName);
+return a;
+  }
+
+  Accesso

[GitHub] spark pull request #17902: [SPARK-20641][core] Add key-value store abstracti...

2017-05-31 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/17902#discussion_r119411351
  
--- Diff: 
common/kvstore/src/main/java/org/apache/spark/kvstore/KVStore.java ---
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.kvstore;
+
+import java.io.Closeable;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * Abstraction for a local key/value store for storing app data.
+ *
+ * 
+ * There are two main features provided by the implementations of this 
interface:
+ * 
+ *
+ * Serialization
+ *
+ * 
+ * Data will be serialized to and deserialized from the underlying data 
store using a
+ * {@link KVStoreSerializer}, which can be customized by the application. 
The serializer is
+ * based on Jackson, so it supports all the Jackson annotations for 
controlling the serialization
+ * of app-defined types.
+ * 
+ *
+ * 
+ * Data is also automatically compressed to save disk space.
+ * 
+ *
+ * Automatic Key Management
+ *
+ * 
+ * When using the built-in key management, the implementation will 
automatically create unique
+ * keys for each type written to the store. Keys are based on the type 
name, and always start
+ * with the "+" prefix character (so that it's easy to use both manual and 
automatic key
+ * management APIs without conflicts).
+ * 
+ *
+ * 
+ * Another feature of automatic key management is indexing; by annotating 
fields or methods of
+ * objects written to the store with {@link KVIndex}, indices are created 
to sort the data
+ * by the values of those properties. This makes it possible to provide 
sorting without having
+ * to load all instances of those types from the store.
+ * 
+ *
+ * 
+ * KVStore instances are thread-safe for both reads and writes.
+ * 
+ */
+public interface KVStore extends Closeable {
+
+  /**
+   * Returns app-specific metadata from the store, or null if it's not 
currently set.
+   *
+   * 
+   * The metadata type is application-specific. This is a convenience 
method so that applications
+   * don't need to define their own keys for this information.
+   * 
+   */
+  <T> T getMetadata(Class<T> klass) throws Exception;
+
+  /**
+   * Writes the given value in the store metadata key.
+   */
+  void setMetadata(Object value) throws Exception;
+
+  /**
+   * Read a specific instance of an object.
+   */
+  <T> T read(Class<T> klass, Object naturalKey) throws Exception;
--- End diff --

Add that the key cannot be null, and that a NoSuchElementException is thrown if 
the key does not exist (otherwise I might think you'd return null).
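
i.e. something along these lines (wording is just a suggestion):

```java
  /**
   * Read a specific instance of an object.
   *
   * @param naturalKey The object's "natural key", which uniquely identifies it.
   *                   Must not be null.
   * @throws NoSuchElementException If an element with the given key does not exist.
   */
  <T> T read(Class<T> klass, Object naturalKey) throws Exception;
```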


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17902: [SPARK-20641][core] Add key-value store abstracti...

2017-05-31 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/17902#discussion_r119420304
  
--- Diff: 
common/kvstore/src/main/java/org/apache/spark/kvstore/LevelDB.java ---
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.kvstore;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicReference;
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import org.fusesource.leveldbjni.JniDBFactory;
+import org.iq80.leveldb.DB;
+import org.iq80.leveldb.Options;
+import org.iq80.leveldb.WriteBatch;
+
+/**
+ * Implementation of KVStore that uses LevelDB as the underlying data 
store.
+ */
+public class LevelDB implements KVStore {
+
+  @VisibleForTesting
+  static final long STORE_VERSION = 1L;
+
+  @VisibleForTesting
+  static final byte[] STORE_VERSION_KEY = "__version__".getBytes(UTF_8);
+
+  /** DB key where app metadata is stored. */
+  private static final byte[] METADATA_KEY = "__meta__".getBytes(UTF_8);
+
+  /** DB key where type aliases are stored. */
+  private static final byte[] TYPE_ALIASES_KEY = 
"__types__".getBytes(UTF_8);
+
+  final AtomicReference<DB> _db;
+  final KVStoreSerializer serializer;
+
+  private final ConcurrentMap<String, byte[]> typeAliases;
--- End diff --

This `typeAliases` thing is pretty confusing. IIUC, the idea is to replace a 
long fully qualified type name with a shorter numeric id, and this map holds 
that mapping? I'd include a comment about it, and maybe even rename it to 
something like `typeToId`.
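
For my own understanding, this is the scheme I think is going on -- a 
self-contained sketch with my own names, not the PR's code:

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Map each fully qualified class name to a short, stable id, so the long name
// does not have to be embedded in every key written to the store.
public class TypeAliasSketch {

  private final ConcurrentMap<String, byte[]> typeToId = new ConcurrentHashMap<>();

  synchronized byte[] idFor(String className) {
    return typeToId.computeIfAbsent(className,
        name -> String.valueOf(typeToId.size()).getBytes(StandardCharsets.UTF_8));
  }

  public static void main(String[] args) {
    TypeAliasSketch aliases = new TypeAliasSketch();
    // The first type seen gets id "0", the next one "1", and lookups are stable.
    System.out.println(new String(aliases.idFor("com.example.SomeVeryLongTypeName"),
        StandardCharsets.UTF_8));
    System.out.println(new String(aliases.idFor("com.example.AnotherType"),
        StandardCharsets.UTF_8));
    System.out.println(new String(aliases.idFor("com.example.SomeVeryLongTypeName"),
        StandardCharsets.UTF_8));
  }
}
```

If that's roughly right, a one-line comment to that effect on the field would be 
enough.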


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17902: [SPARK-20641][core] Add key-value store abstracti...

2017-05-31 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/17902#discussion_r119407982
  
--- Diff: 
common/kvstore/src/main/java/org/apache/spark/kvstore/KVStoreView.java ---
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.kvstore;
+
+import java.util.Iterator;
+import java.util.Map;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * A configurable view that allows iterating over values in a {@link 
KVStore}.
+ *
+ * 
+ * The different methods can be used to configure the behavior of the 
iterator. Calling the same
+ * method multiple times is allowed; the most recent value will be used.
+ * 
+ *
+ * 
+ * The iterators returns by this view are of type {@link KVStoreIterator}; 
they auto-close
--- End diff --

typo: returned
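
Unrelated to the typo: since this paragraph describes a slightly unusual 
contract, here is the behavior I understand it to promise, as a stand-in 
sketch (this is not the PR's KVStoreIterator, just an illustration):

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// A toy Iterable whose iterator runs a cleanup action the moment it has been
// fully drained -- which is what a for-each loop that reads every element does.
// Stopping early skips the cleanup, so the real iterator presumably needs an
// explicit close() in that case.
public class AutoCloseIteratorSketch implements Iterable<String> {

  private final List<String> data = Arrays.asList("a", "b", "c");

  @Override
  public Iterator<String> iterator() {
    Iterator<String> it = data.iterator();
    return new Iterator<String>() {
      private boolean closed = false;

      @Override
      public boolean hasNext() {
        boolean more = it.hasNext();
        if (!more && !closed) {
          closed = true;
          System.out.println("auto-closed after exhausting the iterator");
        }
        return more;
      }

      @Override
      public String next() {
        return it.next();
      }
    };
  }

  public static void main(String[] args) {
    for (String s : new AutoCloseIteratorSketch()) {
      System.out.println(s);
    }
  }
}
```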


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17902: [SPARK-20641][core] Add key-value store abstracti...

2017-05-31 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/17902#discussion_r119412048
  
--- Diff: 
common/kvstore/src/main/java/org/apache/spark/kvstore/LevelDB.java ---
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.kvstore;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicReference;
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import org.fusesource.leveldbjni.JniDBFactory;
+import org.iq80.leveldb.DB;
+import org.iq80.leveldb.Options;
+import org.iq80.leveldb.WriteBatch;
+
+/**
+ * Implementation of KVStore that uses LevelDB as the underlying data 
store.
+ */
+public class LevelDB implements KVStore {
+
+  @VisibleForTesting
+  static final long STORE_VERSION = 1L;
+
+  @VisibleForTesting
+  static final byte[] STORE_VERSION_KEY = "__version__".getBytes(UTF_8);
+
+  /** DB key where app metadata is stored. */
+  private static final byte[] METADATA_KEY = "__meta__".getBytes(UTF_8);
+
+  /** DB key where type aliases are stored. */
+  private static final byte[] TYPE_ALIASES_KEY = 
"__types__".getBytes(UTF_8);
+
+  final AtomicReference<DB> _db;
+  final KVStoreSerializer serializer;
+
+  private final ConcurrentMap<String, byte[]> typeAliases;
+  private final ConcurrentMap<Class<?>, LevelDBTypeInfo> types;
+
+  public LevelDB(File path) throws Exception {
+this(path, new KVStoreSerializer());
+  }
+
+  public LevelDB(File path, KVStoreSerializer serializer) throws Exception 
{
+this.serializer = serializer;
+this.types = new ConcurrentHashMap<>();
+
+Options options = new Options();
+options.createIfMissing(!path.exists());
+this._db = new AtomicReference<>(JniDBFactory.factory.open(path, 
options));
+
+byte[] versionData = db().get(STORE_VERSION_KEY);
+if (versionData != null) {
+  long version = serializer.deserializeLong(versionData);
+  if (version != STORE_VERSION) {
+throw new UnsupportedStoreVersionException();
+  }
+} else {
+  db().put(STORE_VERSION_KEY, serializer.serialize(STORE_VERSION));
+}
+
+Map<String, byte[]> aliases;
+try {
+  aliases = get(TYPE_ALIASES_KEY, TypeAliases.class).aliases;
+} catch (NoSuchElementException e) {
+  aliases = new HashMap<>();
+}
+typeAliases = new ConcurrentHashMap<>(aliases);
+  }
+
+  @Override
+  public <T> T getMetadata(Class<T> klass) throws Exception {
+try {
+  return get(METADATA_KEY, klass);
+} catch (NoSuchElementException nsee) {
+  return null;
+}
+  }
+
+  @Override
+  public void setMetadata(Object value) throws Exception {
+if (value != null) {
+  put(METADATA_KEY, value);
+} else {
+  db().delete(METADATA_KEY);
+}
+  }
+
+  <T> T get(byte[] key, Class<T> klass) throws Exception {
+byte[] data = db().get(key);
+if (data == null) {
+  throw new NoSuchElementException(new String(key, UTF_8));
+}
+return serializer.deserialize(data, klass);
+  }
+
+  private void put(byte[] key, Object value) throws Exception {
+Preconditions.checkArgument(value != null, "Null values are not 
allowed.");
+db().put(key, serializer.serialize(value));
+  }
+
