[GitHub] spark pull request: SPARK-1537 publisher-side code and tests

2015-10-26 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/8744#issuecomment-151264212
  
Merged build finished. Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: SPARK-1537 publisher-side code and tests

2015-10-26 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/8744#issuecomment-151264217
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/44369/





[GitHub] spark pull request: SPARK-1537 publisher-side code and tests

2015-10-26 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/8744#issuecomment-151263991
  
**[Test build #44369 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/44369/consoleFull)** for PR 8744 at commit [`bb01453`](https://github.com/apache/spark/commit/bb014535c4987f73398f48973efe380f94ab4ece).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark pull request: SPARK-1537 publisher-side code and tests

2015-10-26 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/8744#issuecomment-151220033
  
Merged build started.





[GitHub] spark pull request: SPARK-1537 publisher-side code and tests

2015-10-26 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/8744#issuecomment-151220009
  
Merged build triggered.





[GitHub] spark pull request: SPARK-1537 publisher-side code and tests

2015-10-26 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/8744#issuecomment-151220647
  
**[Test build #44369 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/44369/consoleFull)** for PR 8744 at commit [`bb01453`](https://github.com/apache/spark/commit/bb014535c4987f73398f48973efe380f94ab4ece).





[GitHub] spark pull request: SPARK-1537 publisher-side code and tests

2015-10-22 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/8744#discussion_r42784907
  
--- Diff: yarn/history/src/main/scala/org/apache/spark/deploy/history/yarn/YarnTimelineUtils.scala ---
@@ -0,0 +1,720 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.history.yarn
+
+import java.io.IOException
+import java.net.{URI, URL}
+import java.text.DateFormat
+import java.util.concurrent.atomic.AtomicLong
+import java.util.{ArrayList => JArrayList, Collection => JCollection, Date, HashMap => JHashMap, Map => JMap}
+import java.{lang, util}
+
+import scala.collection.JavaConversions._
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.service.Service
+import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId}
+import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError
+import org.apache.hadoop.yarn.api.records.timeline.{TimelineEntity, TimelineEvent, TimelinePutResponse}
+import org.apache.hadoop.yarn.client.api.TimelineClient
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.json4s.JsonAST.JObject
+import org.json4s._
+import org.json4s.jackson.JsonMethods._
+
+import org.apache.spark.{SparkContext, Logging}
+import org.apache.spark.deploy.history.yarn.YarnHistoryService._
+import org.apache.spark.scheduler.{SparkListenerStageCompleted, SparkListenerStageSubmitted, SparkListenerExecutorAdded, SparkListenerExecutorRemoved, SparkListenerJobEnd, SparkListenerJobStart, SparkListenerApplicationEnd, SparkListenerApplicationStart, SparkListenerEvent}
+import org.apache.spark.util.{JsonProtocol, Utils}
+
+/**
+ * Utility methods for timeline classes.
+ */
+private[spark] object YarnTimelineUtils extends Logging {
+
+  /**
+   * What attempt ID to use as the attempt ID field (not the entity ID) when
+   * there is no attempt info
+   */
+  val SINGLE_ATTEMPT = "1"
+
+
+  /**
+   * Exception text when there is no event info data to unmarshall
+   */
+  val E_NO_EVENTINFO = "No 'eventinfo' entry"
+
+  /**
+   * Exception text when there is event info entry in the timeline event, but it is empty
+   */
+
+  val E_EMPTY_EVENTINFO = "Empty 'eventinfo' entry"
+
+  /**
+   * counter incremented on every spark event to timeline event creation,
+   * so guaranteeing uniqueness of event IDs across a single application attempt
+   * (which is implicitly, one per JVM)
+   */
+  val uid = new AtomicLong(System.currentTimeMillis())
+
+  /**
+   * Converts a Java object to its equivalent json4s representation.
+   */
+  def toJValue(obj: Object): JValue = {
+    obj match {
+      case str: String => JString(str)
+      case dbl: java.lang.Double => JDouble(dbl)
+      case dec: java.math.BigDecimal => JDecimal(dec)
+      case int: java.lang.Integer => JInt(BigInt(int))
+      case long: java.lang.Long => JInt(BigInt(long))
+      case bool: java.lang.Boolean => JBool(bool)
+      case map: JMap[_, _] =>
+        val jmap = map.asInstanceOf[JMap[String, Object]]
+        JObject(jmap.entrySet().map { e => (e.getKey() -> toJValue(e.getValue())) }.toList)
+      case array: JCollection[_] =>
+        JArray(array.asInstanceOf[JCollection[Object]].map(o => toJValue(o)).toList)
+      case null => JNothing
+    }
+  }
+
+  /**
+   * Converts a JValue into its Java equivalent.
+   */
+  def toJavaObject(v: JValue): Object = {
+    v match {
+      case JNothing => null
+      case JNull => null
+      case JString(s) => s
+      case JDouble(num) => java.lang.Double.valueOf(num)
+      case JDecimal(num) => num.bigDecimal
+      case JInt(num) => java.lang.Long.valueOf(num.longValue)
+      case JBool(value) =>
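
The `uid` field in this revision is an `AtomicLong` seeded with `System.currentTimeMillis()` and incremented once per Spark-to-timeline event conversion; that is what keeps event IDs unique within a single application attempt (one per JVM). A minimal stdlib-only sketch of the pattern follows. The `EventIds` object, the `nextEventId` helper and the `<eventType>-<counter>` ID format are hypothetical illustrations, not code from the patch.

```scala
import java.util.concurrent.atomic.AtomicLong

// Hypothetical helper illustrating the `uid` counter pattern; not part of the patch.
object EventIds {
  // Seeded with wall-clock time, as in YarnTimelineUtils, so that IDs minted
  // by a restarted JVM are unlikely (but not guaranteed) to repeat earlier ones.
  val uid = new AtomicLong(System.currentTimeMillis())

  /** Returns an ID of the form "<eventType>-<counter>"; every call gets a fresh counter value. */
  def nextEventId(eventType: String): String =
    s"$eventType-${uid.incrementAndGet()}"
}
```

Because `incrementAndGet` is atomic, concurrent listener threads still receive distinct values. Seeding from the clock only makes collisions across JVM restarts unlikely, which is consistent with the "one per JVM" caveat in the doc comment.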

[GitHub] spark pull request: SPARK-1537 publisher-side code and tests

2015-10-21 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/8744#discussion_r42667047
  
--- Diff: yarn/history/src/main/scala/org/apache/spark/deploy/history/yarn/YarnHistoryService.scala ---
@@ -0,0 +1,1048 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history.yarn
+
+import java.net.{ConnectException, URI}
+import java.util.concurrent.atomic.{AtomicLong, AtomicBoolean, AtomicInteger}
+import java.util.concurrent.{TimeUnit, LinkedBlockingDeque}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.yarn.api.records.timeline.{TimelineDomain, TimelineEntity, TimelineEvent, TimelinePutResponse}
+import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId}
+import org.apache.hadoop.yarn.client.api.TimelineClient
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+
+import org.apache.spark.deploy.history.yarn.YarnTimelineUtils._
+import org.apache.spark.scheduler._
+import org.apache.spark.scheduler.cluster.{YarnExtensionService, YarnExtensionServiceBinding}
+import org.apache.spark.util.{SystemClock, Utils}
+import org.apache.spark.{Logging, SparkContext}
+
+/**
+ * A Yarn Extension Service to post lifecycle events to a registered
+ * YARN Timeline Server.
+ */
+private[spark] class YarnHistoryService extends YarnExtensionService with Logging {
+
+  import org.apache.spark.deploy.history.yarn.YarnHistoryService._
+
+  /** Simple state model implemented in an atomic integer */
+  private val _serviceState = new AtomicInteger(CreatedState)
+
+  def serviceState: Int = {
+    _serviceState.get()
+  }
+  def enterState(state: Int): Int = {
+    logDebug(s"Entering state $state from $serviceState")
+    _serviceState.getAndSet(state)
+  }
+
+  /**
+   * Spark context; valid once started
+   */
+  private var sparkContext: SparkContext = _
+
+  /** YARN configuration from the spark context */
+  private var config: YarnConfiguration = _
+
+  /** application ID. */
+  private var _applicationId: ApplicationId = _
+
+  /** attempt ID; this will be None if the service is started in yarn-client mode */
+  private var _attemptId: Option[ApplicationAttemptId] = None
+
+  /** YARN timeline client */
+  private var _timelineClient: Option[TimelineClient] = None
+
+  /** registered event listener */
+  private var listener: Option[YarnEventListener] = None
+
+  /** Application name  from the spark start event */
+  private var applicationName: String = _
+
+  /** Application ID*/
+  private var sparkApplicationId: Option[String] = None
+
+  /** Optional Attempt ID from the spark start event */
+  private var sparkApplicationAttemptId: Option[String] = None
+
+  /** user name as derived from `SPARK_USER` env var or `UGI` */
+  private var userName = Utils.getCurrentUserName()
+
+  /** Clock for recording time */
+  private val clock = new SystemClock()
+
+  /**
+   * Start time of the application, as received in the start event.
+   */
+  private var startTime: Long = _
+
+  /**
+   * End time of the application, as received in the end event.
+   */
+  private var endTime: Long = _
+
+  /** number of events to batch up before posting */
+  private var _batchSize = DEFAULT_BATCH_SIZE
+
+  /** queue of entities to asynchronously post, plus the number of events in each entry */
+  private var _entityQueue = new LinkedBlockingDeque[TimelineEntity]()
+
+  /** limit on the total number of events permitted */
+  private var _postQueueLimit = DEFAULT_POST_QUEUE_LIMIT
+
+  /**
+   * List of events which will be pulled into a timeline
+   * entity when created
+   */
+  private var 
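
`YarnHistoryService` keeps its lifecycle in an `AtomicInteger`, and `enterState` uses `getAndSet` so that the caller learns which state it replaced. The sketch below shows how that previous-state return value can make `stop()` idempotent, with `compareAndSet` guarding `start()`. It assumes a hypothetical `StatefulService` class and hypothetical state values: only `CreatedState` appears in this excerpt, while the `StartedState`/`StoppedState` names and all numeric values are assumptions (the real constants live in the `YarnHistoryService` companion object, which is not shown).

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical state values; the patch defines its own constants in the
// YarnHistoryService companion object, which this excerpt does not show.
object ServiceStates {
  val CreatedState = 0
  val StartedState = 1
  val StoppedState = 2
}

// Hypothetical service illustrating the AtomicInteger state model.
class StatefulService {
  import ServiceStates._

  private val _serviceState = new AtomicInteger(CreatedState)

  def serviceState: Int = _serviceState.get()

  /** Enters `state` unconditionally and returns the state it replaced, as in the patch. */
  def enterState(state: Int): Int = _serviceState.getAndSet(state)

  /** Moves Created -> Started; returns false if the service was already started or stopped. */
  def start(): Boolean = _serviceState.compareAndSet(CreatedState, StartedState)

  /**
   * Moves to Stopped. Only the call that replaced StartedState returns true,
   * so shutdown work guarded by it runs at most once, even under concurrent calls.
   */
  def stop(): Boolean = enterState(StoppedState) == StartedState
}
```

Using the value returned by `getAndSet` avoids a separate read-then-write, which could let two threads both observe "started" and both run the shutdown path.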

[GitHub] spark pull request: SPARK-1537 publisher-side code and tests

2015-10-21 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/8744#discussion_r42668737
  
--- Diff: yarn/history/src/main/scala/org/apache/spark/deploy/history/yarn/YarnTimelineUtils.scala ---

[GitHub] spark pull request: SPARK-1537 publisher-side code and tests

2015-10-21 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/8744#discussion_r42669119
  
--- Diff: yarn/history/src/main/scala/org/apache/spark/deploy/history/yarn/YarnTimelineUtils.scala ---
@@ -0,0 +1,765 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history.yarn
+
+import java.io.IOException
+import java.net.{InetSocketAddress, NoRouteToHostException, URI, URL}
+import java.text.DateFormat
+import java.util.concurrent.atomic.AtomicLong
+import java.util.{ArrayList => JArrayList, Collection => JCollection, Date, HashMap => JHashMap, Map => JMap}
+import java.{lang, util}
+
+import scala.collection.JavaConverters._
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError
+import org.apache.hadoop.yarn.api.records.timeline.{TimelineEntity, TimelineEvent, TimelinePutResponse}
+import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId}
+import org.apache.hadoop.yarn.client.api.TimelineClient
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.json4s.JsonAST.JObject
+import org.json4s._
+import org.json4s.jackson.JsonMethods._
+
+import org.apache.spark.deploy.history.yarn.YarnHistoryService._
+import org.apache.spark.scheduler.{SparkListenerApplicationEnd, SparkListenerApplicationStart, SparkListenerEvent, SparkListenerExecutorAdded, SparkListenerExecutorRemoved, SparkListenerJobEnd, SparkListenerJobStart, SparkListenerStageCompleted, SparkListenerStageSubmitted}
+import org.apache.spark.util.{JsonProtocol, Utils}
+import org.apache.spark.{Logging, SparkContext}
+
+/**
+ * Utility methods for timeline classes.
+ */
+private[spark] object YarnTimelineUtils extends Logging {
+
+  /**
+   * What attempt ID to use as the attempt ID field (not the entity ID) when
+   * there is no attempt info.
+   */
+  val SINGLE_ATTEMPT = "1"
+
+  /**
+   * Exception text when there is no event info data to unmarshall.
+   */
+  val E_NO_EVENTINFO = "No 'eventinfo' entry"
+
+  /**
+   * Exception text when there is event info entry in the timeline event, but it is empty.
+   */
+
+  val E_EMPTY_EVENTINFO = "Empty 'eventinfo' entry"
+
+  /**
+   * counter incremented on every spark event to timeline event creation,
+   * so guaranteeing uniqueness of event IDs across a single application attempt
+   * (which is implicitly, one per JVM).
+   */
+  val uid = new AtomicLong(System.currentTimeMillis())
+
+  /**
+   * Converts a Java object to its equivalent json4s representation.
+   */
+  def toJValue(obj: Object): JValue = {
+    obj match {
+      case str: String => JString(str)
+      case dbl: java.lang.Double => JDouble(dbl)
+      case dec: java.math.BigDecimal => JDecimal(dec)
+      case int: java.lang.Integer => JInt(BigInt(int))
+      case long: java.lang.Long => JInt(BigInt(long))
+      case bool: java.lang.Boolean => JBool(bool)
+      case map: JMap[_, _] =>
+        val jmap = map.asInstanceOf[JMap[String, Object]]
+        JObject(jmap.entrySet().asScala.map { e => e.getKey -> toJValue(e.getValue) }.toList)
+      case array: JCollection[_] =>
+        JArray(array.asInstanceOf[JCollection[Object]].asScala.map(o => toJValue(o)).toList)
+      case null => JNothing
+    }
+  }
+
+  /**
+   * Converts a JValue into its Java equivalent.
+   */
+  def toJavaObject(v: JValue): Object = {
+    v match {
+      case JNothing => null
+      case JNull => null
+      case JString(s) => s
+      case JDouble(num) => java.lang.Double.valueOf(num)
+      case JDecimal(num) => num.bigDecimal
+      case JInt(num) => java.lang.Long.valueOf(num.longValue())
+      case JBool(value) =>
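
This revision replaces the implicit `scala.collection.JavaConversions` import with explicit `.asScala` calls from `scala.collection.JavaConverters`, so every Java-to-Scala wrapper is visible at the call site. The sketch below applies the same style to a stdlib-only analogue of the map and collection branches of `toJValue`. It builds plain Scala `Map`/`List` values instead of json4s `JObject`/`JArray`, assumes string keys as the patch does, and the `JavaToScala` name is hypothetical.

```scala
import java.util.{Collection => JCollection, Map => JMap}
import scala.collection.JavaConverters._

// Hypothetical stdlib-only analogue of toJValue's map/collection branches:
// nested Java maps and collections become immutable Scala Maps and Lists.
object JavaToScala {
  def convert(obj: Any): Any = obj match {
    case map: JMap[_, _] =>
      // Keys are assumed to be strings, as in the patch. asScala wraps the
      // Java map without copying; toMap then builds an immutable copy.
      map.asInstanceOf[JMap[String, Any]].asScala.map { case (k, v) => k -> convert(v) }.toMap
    case coll: JCollection[_] =>
      // Lists, sets and other collections become Lists in iteration order
      coll.asInstanceOf[JCollection[Any]].asScala.map(convert).toList
    case other =>
      // Strings, boxed numbers, booleans and null pass through unchanged
      other
  }
}
```

On Scala 2.13 and later, `JavaConverters` is deprecated in favour of `scala.jdk.CollectionConverters`, which offers the same `asScala` syntax.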

[GitHub] spark pull request: SPARK-1537 publisher-side code and tests

2015-10-21 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/8744#discussion_r42674559
  
--- Diff: yarn/history/src/main/scala/org/apache/spark/deploy/history/yarn/YarnTimelineUtils.scala ---

[GitHub] spark pull request: SPARK-1537 publisher-side code and tests

2015-10-20 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/8744#issuecomment-149643277
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/43986/





[GitHub] spark pull request: SPARK-1537 publisher-side code and tests

2015-10-20 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/8744#discussion_r42537310
  
--- Diff: yarn/history/src/main/scala/org/apache/spark/deploy/history/yarn/YarnHistoryService.scala ---
@@ -0,0 +1,1048 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history.yarn
+
+import java.net.{ConnectException, URI}
+import java.util.concurrent.atomic.{AtomicLong, AtomicBoolean, 
AtomicInteger}
+import java.util.concurrent.{TimeUnit, LinkedBlockingDeque}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.yarn.api.records.timeline.{TimelineDomain, 
TimelineEntity, TimelineEvent, TimelinePutResponse}
+import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, 
ApplicationId}
+import org.apache.hadoop.yarn.client.api.TimelineClient
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+
+import org.apache.spark.deploy.history.yarn.YarnTimelineUtils._
+import org.apache.spark.scheduler._
+import org.apache.spark.scheduler.cluster.{YarnExtensionService, 
YarnExtensionServiceBinding}
+import org.apache.spark.util.{SystemClock, Utils}
+import org.apache.spark.{Logging, SparkContext}
+
+/**
+ * A Yarn Extension Service to post lifecycle events to a registered
+ * YARN Timeline Server.
+ */
+private[spark] class YarnHistoryService extends YarnExtensionService with 
Logging {
+
+  import org.apache.spark.deploy.history.yarn.YarnHistoryService._
+
+  /** Simple state model implemented in an atomic integer */
+  private val _serviceState = new AtomicInteger(CreatedState)
+
+  def serviceState: Int = {
+_serviceState.get()
+  }
+  def enterState(state: Int): Int = {
+logDebug(s"Entering state $state from $serviceState")
+_serviceState.getAndSet(state)
+  }
+
+  /**
+   * Spark context; valid once started
+   */
+  private var sparkContext: SparkContext = _
+
+  /** YARN configuration from the spark context */
+  private var config: YarnConfiguration = _
+
+  /** application ID. */
+  private var _applicationId: ApplicationId = _
+
+  /** attempt ID this will be null if the service is started in 
yarn-client mode */
+  private var _attemptId: Option[ApplicationAttemptId] = None
+
+  /** YARN timeline client */
+  private var _timelineClient: Option[TimelineClient] = None
+
+  /** registered event listener */
+  private var listener: Option[YarnEventListener] = None
+
+  /** Application name  from the spark start event */
+  private var applicationName: String = _
+
+  /** Application ID*/
+  private var sparkApplicationId: Option[String] = None
+
+  /** Optional Attempt ID from the spark start event */
+  private var sparkApplicationAttemptId: Option[String] = None
+
+  /** user name as derived from `SPARK_USER` env var or `UGI` */
+  private var userName = Utils.getCurrentUserName()
+
+  /** Clock for recording time */
+  private val clock = new SystemClock()
+
+  /**
+   * Start time of the application, as received in the start event.
+   */
+  private var startTime: Long = _
+
+  /**
+   * End time of the application, as received in the end event.
+   */
+  private var endTime: Long = _
+
+  /** number of events to batch up before posting */
+  private var _batchSize = DEFAULT_BATCH_SIZE
+
+  /** queue of entities to asynchronously post, plus the number of events in each entry */
+  private var _entityQueue = new LinkedBlockingDeque[TimelineEntity]()
+
+  /** limit on the total number of events permitted */
+  private var _postQueueLimit = DEFAULT_POST_QUEUE_LIMIT
+
+  /**
+   * List of events which will be pulled into a timeline
+   * entity when created
+   */
+  private var 

[GitHub] spark pull request: SPARK-1537 publisher-side code and tests

2015-10-20 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/8744#issuecomment-149643275
  
Merged build finished. Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
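The `YarnHistoryService` diff quoted above keeps the service lifecycle in a single `AtomicInteger`, and `enterState` uses `getAndSet`, so it returns the state being left. A minimal sketch of that pattern follows, under stated assumptions: only `CreatedState` appears in the quoted diff, so `StartedState`, `StoppedState` and all numeric values are hypothetical, and the `compareAndSet`-based `transition` method is an addition for illustration, not something the diff shows.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical state constants: the quoted diff only shows `CreatedState`;
// the other names and all numeric values here are assumptions.
object ServiceStates {
  val CreatedState = 0
  val StartedState = 1
  val StoppedState = 2
}

/** Sketch of a lifecycle held in a single AtomicInteger. */
class StateModel {
  import ServiceStates._

  private val state = new AtomicInteger(CreatedState)

  /** Current state. */
  def current: Int = state.get()

  /** Unconditional move to `next`; returns the previous state, like `getAndSet`. */
  def enter(next: Int): Int = state.getAndSet(next)

  /**
   * Conditional move: succeeds only if the state is still `expected`,
   * so exactly one of several racing callers performs the transition.
   */
  def transition(expected: Int, next: Int): Boolean =
    state.compareAndSet(expected, next)
}
```

With the conditional form, a `stop()` that calls `transition(StartedState, StoppedState)` and only releases resources when it returns `true` can safely be invoked more than once.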



[GitHub] spark pull request: SPARK-1537 publisher-side code and tests

2015-10-20 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/8744#discussion_r42538300
  
--- Diff: 
yarn/history/src/main/scala/org/apache/spark/deploy/history/yarn/YarnHistoryService.scala
 ---
@@ -0,0 +1,1048 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history.yarn
+
+import java.net.{ConnectException, URI}
+import java.util.concurrent.atomic.{AtomicLong, AtomicBoolean, AtomicInteger}
+import java.util.concurrent.{TimeUnit, LinkedBlockingDeque}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.yarn.api.records.timeline.{TimelineDomain, TimelineEntity, TimelineEvent, TimelinePutResponse}
+import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId}
+import org.apache.hadoop.yarn.client.api.TimelineClient
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+
+import org.apache.spark.deploy.history.yarn.YarnTimelineUtils._
+import org.apache.spark.scheduler._
+import org.apache.spark.scheduler.cluster.{YarnExtensionService, YarnExtensionServiceBinding}
+import org.apache.spark.util.{SystemClock, Utils}
+import org.apache.spark.{Logging, SparkContext}
+
+/**
+ * A Yarn Extension Service to post lifecycle events to a registered
+ * YARN Timeline Server.
+ */
+private[spark] class YarnHistoryService extends YarnExtensionService with Logging {
+
+  import org.apache.spark.deploy.history.yarn.YarnHistoryService._
+
+  /** Simple state model implemented in an atomic integer */
+  private val _serviceState = new AtomicInteger(CreatedState)
+
+  def serviceState: Int = {
+    _serviceState.get()
+  }
+
+  def enterState(state: Int): Int = {
+    logDebug(s"Entering state $state from $serviceState")
+    _serviceState.getAndSet(state)
+  }
+
+  /**
+   * Spark context; valid once started
+   */
+  private var sparkContext: SparkContext = _
+
+  /** YARN configuration from the spark context */
+  private var config: YarnConfiguration = _
+
+  /** application ID. */
+  private var _applicationId: ApplicationId = _
+
+  /** Attempt ID; this will be `None` if the service is started in yarn-client mode */
+  private var _attemptId: Option[ApplicationAttemptId] = None
+
+  /** YARN timeline client */
+  private var _timelineClient: Option[TimelineClient] = None
+
+  /** registered event listener */
+  private var listener: Option[YarnEventListener] = None
+
+  /** Application name from the spark start event */
+  private var applicationName: String = _
+
+  /** Application ID */
+  private var sparkApplicationId: Option[String] = None
+
+  /** Optional Attempt ID from the spark start event */
+  private var sparkApplicationAttemptId: Option[String] = None
+
+  /** user name as derived from `SPARK_USER` env var or `UGI` */
+  private var userName = Utils.getCurrentUserName()
+
+  /** Clock for recording time */
+  private val clock = new SystemClock()
+
+  /**
+   * Start time of the application, as received in the start event.
+   */
+  private var startTime: Long = _
+
+  /**
+   * End time of the application, as received in the end event.
+   */
+  private var endTime: Long = _
+
+  /** number of events to batch up before posting */
+  private var _batchSize = DEFAULT_BATCH_SIZE
+
+  /** queue of entities to asynchronously post, plus the number of events in each entry */
+  private var _entityQueue = new LinkedBlockingDeque[TimelineEntity]()
+
+  /** limit on the total number of events permitted */
+  private var _postQueueLimit = DEFAULT_POST_QUEUE_LIMIT
+
+  /**
+   * List of events which will be pulled into a timeline
+   * entity when created
+   */
+  private var 

[GitHub] spark pull request: SPARK-1537 publisher-side code and tests

2015-10-20 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/8744#discussion_r42539302
  
--- Diff: 
yarn/history/src/main/scala/org/apache/spark/deploy/history/yarn/YarnHistoryService.scala
 ---

[GitHub] spark pull request: SPARK-1537 publisher-side code and tests

2015-10-20 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/8744#issuecomment-149642585
  
**[Test build #43986 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/43986/consoleFull)**
 for PR 8744 at commit 
[`0dc3f13`](https://github.com/apache/spark/commit/0dc3f13339c2223dead09e4bb59d538593438c0e).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
   * `case class RegisterWorkerFailed(message: String) extends DeployMessage with RegisterWorkerResponse`
   * `trait SchedulerExtensionService`
   * `case class SchedulerExtensionServiceBinding(`





[GitHub] spark pull request: SPARK-1537 publisher-side code and tests

2015-10-20 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/8744#discussion_r42531200
  
--- Diff: 
yarn/history/src/main/scala/org/apache/spark/deploy/history/yarn/YarnHistoryService.scala
 ---

[GitHub] spark pull request: SPARK-1537 publisher-side code and tests

2015-10-20 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/8744#discussion_r42540050
  
--- Diff: 
yarn/history/src/main/scala/org/apache/spark/deploy/history/yarn/YarnHistoryService.scala
 ---

[GitHub] spark pull request: SPARK-1537 publisher-side code and tests

2015-10-20 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/8744#discussion_r42539027
  
--- Diff: 
yarn/history/src/main/scala/org/apache/spark/deploy/history/yarn/YarnHistoryService.scala
 ---

[GitHub] spark pull request: SPARK-1537 publisher-side code and tests

2015-10-20 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/8744#issuecomment-149608844
  
**[Test build #43975 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/43975/consoleFull)**
 for PR 8744 at commit 
[`20216b4`](https://github.com/apache/spark/commit/20216b435b25a4757775cacec1b01882b6eacb31).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark pull request: SPARK-1537 publisher-side code and tests

2015-10-20 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/8744#discussion_r42525776
  
--- Diff: 
yarn/history/src/main/scala/org/apache/spark/deploy/history/yarn/YarnHistoryService.scala
 ---
@@ -0,0 +1,1048 @@
+  /** Clock for recording time */
+  private val clock = new SystemClock()
+
+  /**
--- End diff --

will do; will go to upper case word & review all javadocs to verify
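The fields `_batchSize`, `_entityQueue` and `_postQueueLimit` in the quoted `YarnHistoryService` diff suggest that events are grouped into timeline entities and queued for asynchronous posting, subject to a limit (`DEFAULT_BATCH_SIZE`, `DEFAULT_POST_QUEUE_LIMIT`). The sketch below illustrates that idea under stated assumptions: `Entity` is a hypothetical stand-in for `TimelineEntity`, `EventBatcher` is a hypothetical name, and dropping an entity when the limit would be exceeded is an assumed policy, since the quoted hunk is truncated before any posting logic.

```scala
import java.util.concurrent.LinkedBlockingDeque
import scala.collection.mutable

// Hypothetical stand-in for a YARN TimelineEntity: only the batched event names.
final case class Entity(events: List[String])

/**
 * Sketch: buffer events until `batchSize` are pending, wrap them in one Entity,
 * and queue it for asynchronous posting. An entity that would push the number of
 * queued events past `postQueueLimit` is dropped (an assumed policy).
 */
class EventBatcher(batchSize: Int, postQueueLimit: Int) {
  private val pending = mutable.ListBuffer[String]()
  private val queue = new LinkedBlockingDeque[Entity]()
  private var queuedEvents = 0
  private var droppedEvents = 0

  /** Number of events discarded because the post queue was full. */
  def dropped: Int = synchronized { droppedEvents }

  /** Add one event; flush once a full batch is pending. */
  def post(event: String): Unit = synchronized {
    pending += event
    if (pending.size >= batchSize) flush()
  }

  /** Wrap any pending events into an Entity and try to enqueue it. */
  def flush(): Unit = synchronized {
    if (pending.nonEmpty) {
      val entity = Entity(pending.toList)
      pending.clear()
      if (queuedEvents + entity.events.size <= postQueueLimit) {
        queue.add(entity)
        queuedEvents += entity.events.size
      } else {
        droppedEvents += entity.events.size
      }
    }
  }

  /** Take the next entity for posting, releasing its share of the limit. */
  def poll(): Option[Entity] = synchronized {
    val next = Option(queue.poll())
    next.foreach(e => queuedEvents -= e.events.size)
    next
  }
}
```

Counting queued events rather than queued entities matches the "limit on the total number of events permitted" comment in the diff; in a real service a separate posting thread would drain the queue instead of the caller using `poll()`.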





[GitHub] spark pull request: SPARK-1537 publisher-side code and tests

2015-10-20 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/8744#issuecomment-149610234
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/43975/
Test FAILed.





[GitHub] spark pull request: SPARK-1537 publisher-side code and tests

2015-10-20 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/8744#issuecomment-149610233
  
Merged build finished. Test FAILed.





[GitHub] spark pull request: SPARK-1537 publisher-side code and tests

2015-10-20 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/8744#issuecomment-149597210
  
**[Test build #43986 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/43986/consoleFull)**
 for PR 8744 at commit 
[`0dc3f13`](https://github.com/apache/spark/commit/0dc3f13339c2223dead09e4bb59d538593438c0e).





[GitHub] spark pull request: SPARK-1537 publisher-side code and tests

2015-10-20 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/8744#issuecomment-149593229
  
Merged build finished. Test FAILed.





[GitHub] spark pull request: SPARK-1537 publisher-side code and tests

2015-10-20 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/8744#issuecomment-149593131
  
**[Test build #43978 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/43978/consoleFull)**
 for PR 8744 at commit 
[`3b7d31b`](https://github.com/apache/spark/commit/3b7d31b2a0f913140ced29285a8f26c46c24928f).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
  * `trait SchedulerExtensionService `
  * `case class SchedulerExtensionServiceBinding(`





[GitHub] spark pull request: SPARK-1537 publisher-side code and tests

2015-10-20 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/8744#issuecomment-149593230
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/43978/
Test FAILed.





[GitHub] spark pull request: SPARK-1537 publisher-side code and tests

2015-10-20 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/8744#issuecomment-149595578
  
Merged build started.





[GitHub] spark pull request: SPARK-1537 publisher-side code and tests

2015-10-20 Thread steveloughran
Github user steveloughran commented on the pull request:

https://github.com/apache/spark/pull/8744#issuecomment-149595658
  
The test failed because a class, `StubApplicationReport`, had been deleted
locally but the deletion had not been pushed up. The YARN `ApplicationReport`
class has grown with every Hadoop release, so you can't have a stub/mock
implementation that builds across Hadoop 2.x versions. The latest
history-server tests have cut the class and instead construct
`ApplicationReportPBImpl` instances directly. I've also filed
[YARN-4279](https://issues.apache.org/jira/browse/YARN-4279) to at least
annotate the constructors of the simple, stable application ID and attempt ID
classes as public. If MapReduce uses them, any YARN app should be able to.





[GitHub] spark pull request: SPARK-1537 publisher-side code and tests

2015-10-20 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/8744#issuecomment-149595521
  
 Merged build triggered.





[GitHub] spark pull request: SPARK-1537 publisher-side code and tests

2015-10-20 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/8744#issuecomment-149563274
  
Merged build started.





[GitHub] spark pull request: SPARK-1537 publisher-side code and tests

2015-10-20 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/8744#issuecomment-149563254
  
 Merged build triggered.





[GitHub] spark pull request: SPARK-1537 publisher-side code and tests

2015-10-20 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/8744#issuecomment-149564509
  
 Merged build triggered.





[GitHub] spark pull request: SPARK-1537 publisher-side code and tests

2015-10-20 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/8744#issuecomment-149564531
  
Merged build started.





[GitHub] spark pull request: SPARK-1537 publisher-side code and tests

2015-10-20 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/8744#issuecomment-149567517
  
**[Test build #43978 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/43978/consoleFull)**
 for PR 8744 at commit 
[`3b7d31b`](https://github.com/apache/spark/commit/3b7d31b2a0f913140ced29285a8f26c46c24928f).





[GitHub] spark pull request: SPARK-1537 publisher-side code and tests

2015-10-20 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/8744#issuecomment-149567694
  
**[Test build #43975 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/43975/consoleFull)**
 for PR 8744 at commit 
[`20216b4`](https://github.com/apache/spark/commit/20216b435b25a4757775cacec1b01882b6eacb31).





[GitHub] spark pull request: SPARK-1537 publisher-side code and tests

2015-10-13 Thread pwendell
Github user pwendell commented on a diff in the pull request:

https://github.com/apache/spark/pull/8744#discussion_r41951480
  
--- Diff: yarn/pom.xml ---
@@ -164,6 +164,92 @@
  
   
 
+  
+
--- End diff --

Can this say "The YARN application server..."? There is already a different
component in Spark called the history server.





[GitHub] spark pull request: SPARK-1537 publisher-side code and tests

2015-10-13 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/8744#issuecomment-147746836
  
 Merged build triggered.





[GitHub] spark pull request: SPARK-1537 publisher-side code and tests

2015-10-13 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/8744#issuecomment-147746916
  
Merged build started.





[GitHub] spark pull request: SPARK-1537 publisher-side code and tests

2015-10-13 Thread steveloughran
Github user steveloughran commented on the pull request:

https://github.com/apache/spark/pull/8744#issuecomment-147749231
  
I've pushed out a rebase of this against master.

1. Asynchronous publishing of events, with retries and eventually dropped
events, appears to be stable, at least during unit and integration tests.
1. scalastyle is set up to also scan the yarn/history source trees (the code
is still there), and it passes.
1. I've renamed the extension service to `SchedulerExtensionService`, as I
proposed earlier. That helps distinguish it from YARN services, which are
entirely independent.
1. I've also moved the test of extension service instantiation into the
`yarn/src/test` source tree as
`org.apache.spark.scheduler.cluster.ExtensionServiceIntegrationSuite`. With
that isolated test, I can submit the extension as a self-contained patch for
independent review. Note how the extension service takes a case class,
`SchedulerExtensionServiceBinding`, as its binding, rather than a series of
parameters. That way, if new binding attributes are added in future
(currently: the context, the YARN application ID and the YARN attempt ID),
the interface remains backwards compatible with existing scheduler
extensions.
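The first item above mentions asynchronous publishing with retries and, eventually, dropped events. A minimal sketch of that pattern follows, assuming a capacity-bounded `LinkedBlockingDeque` (the quoted diff instead uses an unbounded `_entityQueue` plus a separate `_postQueueLimit`). `AsyncPublisher`, `post`, `maxRetries` and `retryDelayMillis` are hypothetical names; this is not the PR's implementation, and `post` merely stands in for a timeline-client call.

```scala
import java.util.concurrent.{LinkedBlockingDeque, TimeUnit}
import java.util.concurrent.atomic.AtomicLong

/**
 * Hypothetical sketch, not the PR's code: a background thread drains a
 * bounded queue, retrying each failed post a few times before giving up.
 * `post` stands in for the timeline-client call and may throw.
 */
class AsyncPublisher[T <: AnyRef](
    queueLimit: Int,
    maxRetries: Int,
    retryDelayMillis: Long,
    post: T => Unit) {

  private val queue = new LinkedBlockingDeque[T](queueLimit)
  @volatile private var running = true

  val posted = new AtomicLong(0)   // items successfully posted
  val failed = new AtomicLong(0)   // items discarded after exhausting retries
  val dropped = new AtomicLong(0)  // items rejected: queue full or stopped

  /** Never blocks the caller; a full queue means the item is dropped. */
  def enqueue(item: T): Boolean = {
    // Checked under the same lock as stop(), so nothing is added after stop.
    val accepted = synchronized { running && queue.offerLast(item) }
    if (!accepted) dropped.incrementAndGet()
    accepted
  }

  private def postWithRetries(item: T): Unit = {
    var attempt = 0
    var done = false
    while (!done) {
      try {
        post(item)
        posted.incrementAndGet()
        done = true
      } catch {
        case _: Exception if attempt < maxRetries =>
          attempt += 1
          Thread.sleep(retryDelayMillis)
        case _: Exception =>
          failed.incrementAndGet()
          done = true
      }
    }
  }

  private val worker = new Thread(new Runnable {
    override def run(): Unit = {
      // After stop() is called, keep going until the queue is drained.
      while (running || !queue.isEmpty) {
        val item = queue.pollFirst(100, TimeUnit.MILLISECONDS)
        if (item != null) postWithRetries(item)
      }
    }
  }, "async-publisher")
  worker.setDaemon(true)
  worker.start()

  /** Stops accepting items, then waits for queued items to be posted. */
  def stop(): Unit = {
    synchronized { running = false }
    worker.join()
  }
}
```

The caller (in the real service, the Spark listener) never blocks: when the timeline server is slow or down, events are retried a bounded number of times and, once the queue is full, dropped and counted rather than stalling the application.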
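The case-class binding described in the last item can be illustrated with a small sketch. `ExtensionBinding`, `ExtensionService`, `RecordingService` and the `webUiUrl` field are hypothetical stand-ins; the real `SchedulerExtensionServiceBinding` holds the Spark context and the YARN application and attempt IDs, which are modelled here as plain strings so the code compiles on its own.

```scala
/**
 * Hypothetical stand-in for SchedulerExtensionServiceBinding. The real class
 * carries the SparkContext and the YARN application and attempt IDs; plain
 * strings are used here so the sketch compiles on its own.
 */
case class ExtensionBinding(
    contextName: String,
    applicationId: String,
    attemptId: Option[String],
    // Imagine this field arriving in a later release. The default value keeps
    // existing source that constructs the binding compiling.
    webUiUrl: String = "")

/** Hypothetical service contract: start() takes the whole binding. */
trait ExtensionService {
  def start(binding: ExtensionBinding): Unit
  def stop(): Unit
}

/**
 * An extension written before `webUiUrl` existed. It reads fields by name,
 * so adding new fields to the binding does not affect it.
 */
class RecordingService extends ExtensionService {
  @volatile var bound: Option[ExtensionBinding] = None
  override def start(binding: ExtensionBinding): Unit = { bound = Some(binding) }
  override def stop(): Unit = { bound = None }
}
```

Because `start()` takes a single binding object, its signature stays the same when fields are added. Extensions should read fields by name rather than pattern-match on the case class, since the extractor's arity changes with each new field.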






[GitHub] spark pull request: SPARK-1537 publisher-side code and tests

2015-10-13 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/8744#issuecomment-147749964
  
  [Test build #43644 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/43644/consoleFull)
 for PR 8744 at commit 
[`aac3a1d`](https://github.com/apache/spark/commit/aac3a1d5ab4557e02261155452192ac37b6a4c0a).





[GitHub] spark pull request: SPARK-1537 publisher-side code and tests

2015-10-13 Thread vanzin
Github user vanzin commented on the pull request:

https://github.com/apache/spark/pull/8744#issuecomment-147783828
  
@steveloughran anything that can break these patches into smaller PRs for 
review is welcome.





[GitHub] spark pull request: SPARK-1537 publisher-side code and tests

2015-10-13 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/8744#issuecomment-147784582
  
  [Test build #43644 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/43644/console)
 for PR 8744 at commit 
[`aac3a1d`](https://github.com/apache/spark/commit/aac3a1d5ab4557e02261155452192ac37b6a4c0a).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
  * `trait SchedulerExtensionService `
  * `case class SchedulerExtensionServiceBinding(`






[GitHub] spark pull request: SPARK-1537 publisher-side code and tests

2015-10-13 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/8744#issuecomment-147784673
  
Merged build finished. Test FAILed.





[GitHub] spark pull request: SPARK-1537 publisher-side code and tests

2015-10-13 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/8744#issuecomment-147784706
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/43644/
Test FAILed.





[GitHub] spark pull request: SPARK-1537 publisher-side code and tests

2015-10-06 Thread steveloughran
Github user steveloughran commented on the pull request:

https://github.com/apache/spark/pull/8744#issuecomment-145818164
  
@tgravescs: ATS v1. What we got from the ATS team was a commitment not to
break that API, served at a ws/v1/ endpoint they'll continue to offer.

However, v2 is designed to address other things (like availability) as
well as scale, and 1.x clients won't get those benefits.

This implementation is designed to keep the load on ATS down (you can tell
it not to do any refresh in the background), but an 8 GB event history,
while leveldb should handle it, is a pretty serious JSON
serialization/deserialization load at both ends. I'd like to move to
background playback, where getAppUI() returns the UI immediately and the UI
is built up asynchronously, as you'd see with a running app. The filesystem
history provider could do the same thing.
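The background playback idea above can be sketched roughly as follows: the caller gets a UI object straight away, while events are replayed into it on another thread. Only the name `getAppUI()` comes from the comment; `ReplayedUI`, `BackgroundPlayback`, the `String` job events and the `(ui, future)` return shape are hypothetical, and a real provider would build a Spark UI rather than a list of job names.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.{ExecutionContext, Future}

/** Hypothetical UI model that is populated as events are replayed. */
final class ReplayedUI {
  private val jobs = ArrayBuffer.empty[String]
  @volatile var complete = false
  def addJob(name: String): Unit = synchronized { jobs += name }
  def jobNames: List[String] = synchronized { jobs.toList }
}

object BackgroundPlayback {
  /**
   * Returns the UI immediately; events are replayed on another thread, so
   * callers may see a partially built UI, as with a running application.
   * The Future completes once replay has finished.
   */
  def getAppUI(events: Iterator[String])(
      implicit ec: ExecutionContext): (ReplayedUI, Future[Unit]) = {
    val ui = new ReplayedUI
    val replay = Future {
      events.foreach(ui.addJob)
      ui.complete = true
    }
    (ui, replay)
  }
}
```

Returning immediately trades completeness for latency: the page can be served at once and refreshed as replay proceeds, instead of blocking on a full deserialization of a multi-gigabyte history.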







[GitHub] spark pull request: SPARK-1537 publisher-side code and tests

2015-09-29 Thread tgravescs
Github user tgravescs commented on the pull request:

https://github.com/apache/spark/pull/8744#issuecomment-144080509
  
Sorry, I haven't had time to follow all of these changes. Which version of
ATS are you testing this on, and at what scale? I'm assuming ATS v1, since v2
is still in development.

We have been using ATS with Tez, and it can't handle nearly the load we need
it to. With Spark I see cases where that just gets worse for longer-running
iterative jobs; I have seen Spark history files of around 8 GB before.

I thought ATS v2 was going to try to keep API compatibility, but I don't
think it's guaranteed. Obviously it doesn't hurt to get the framework in
place, and I assume it will work for smaller clusters or numbers of jobs.
Have you looked at ATS v2 at all, to know whether the changes needed to move
to it would be fairly minimal?

Mostly I'm just curious where this stands, as I'm seeing a bunch of small
issues with the Spark history server.





[GitHub] spark pull request: SPARK-1537 publisher-side code and tests

2015-09-28 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/8744#discussion_r40601188
  
--- Diff: yarn/history/src/main/scala/org/apache/spark/deploy/history/yarn/YarnHistoryService.scala ---
@@ -0,0 +1,1048 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history.yarn
+
+import java.net.{ConnectException, URI}
+import java.util.concurrent.atomic.{AtomicLong, AtomicBoolean, AtomicInteger}
+import java.util.concurrent.{TimeUnit, LinkedBlockingDeque}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.yarn.api.records.timeline.{TimelineDomain, TimelineEntity, TimelineEvent, TimelinePutResponse}
+import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId}
+import org.apache.hadoop.yarn.client.api.TimelineClient
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+
+import org.apache.spark.deploy.history.yarn.YarnTimelineUtils._
+import org.apache.spark.scheduler._
+import org.apache.spark.scheduler.cluster.{YarnExtensionService, YarnExtensionServiceBinding}
+import org.apache.spark.util.{SystemClock, Utils}
+import org.apache.spark.{Logging, SparkContext}
+
+/**
+ * A Yarn Extension Service to post lifecycle events to a registered
+ * YARN Timeline Server.
+ */
+private[spark] class YarnHistoryService extends YarnExtensionService with Logging {
+
+  import org.apache.spark.deploy.history.yarn.YarnHistoryService._
+
+  /** Simple state model implemented in an atomic integer */
+  private val _serviceState = new AtomicInteger(CreatedState)
+
+  def serviceState: Int = {
+    _serviceState.get()
+  }
+  def enterState(state: Int): Int = {
+    logDebug(s"Entering state $state from $serviceState")
+    _serviceState.getAndSet(state)
+  }
+
+  /**
+   * Spark context; valid once started
+   */
+  private var sparkContext: SparkContext = _
+
+  /** YARN configuration from the spark context */
+  private var config: YarnConfiguration = _
+
+  /** application ID. */
+  private var _applicationId: ApplicationId = _
+
+  /** attempt ID this will be null if the service is started in yarn-client mode */
+  private var _attemptId: Option[ApplicationAttemptId] = None
+
+  /** YARN timeline client */
+  private var _timelineClient: Option[TimelineClient] = None
+
+  /** registered event listener */
+  private var listener: Option[YarnEventListener] = None
+
+  /** Application name  from the spark start event */
+  private var applicationName: String = _
+
+  /** Application ID*/
+  private var sparkApplicationId: Option[String] = None
+
+  /** Optional Attempt ID from the spark start event */
+  private var sparkApplicationAttemptId: Option[String] = None
+
+  /** user name as derived from `SPARK_USER` env var or `UGI` */
+  private var userName = Utils.getCurrentUserName()
+
+  /** Clock for recording time */
+  private val clock = new SystemClock()
+
+  /**
+   * Start time of the application, as received in the start event.
+   */
+  private var startTime: Long = _
+
+  /**
+   * End time of the application, as received in the end event.
+   */
+  private var endTime: Long = _
+
+  /** number of events to batch up before posting*/
+  private var _batchSize = DEFAULT_BATCH_SIZE
+
+  /** queue of entities to asynchronously post, plus the number of events in each entry */
+  private var _entityQueue = new LinkedBlockingDeque[TimelineEntity]()
+
+  /** limit on the total number of events permitted */
+  private var _postQueueLimit = DEFAULT_POST_QUEUE_LIMIT
+
+  /**
+   * List of events which will be pulled into a timeline
+   * entity when created
+   */
+  private var pendingEvents = new 

[GitHub] spark pull request: SPARK-1537 publisher-side code and tests

2015-09-28 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/8744#discussion_r40605842
  
--- Diff: 
yarn/history/src/main/scala/org/apache/spark/deploy/history/yarn/YarnHistoryService.scala
 ---
@@ -0,0 +1,1048 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history.yarn
+
+import java.net.{ConnectException, URI}
+import java.util.concurrent.atomic.{AtomicLong, AtomicBoolean, 
AtomicInteger}
+import java.util.concurrent.{TimeUnit, LinkedBlockingDeque}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.yarn.api.records.timeline.{TimelineDomain, 
TimelineEntity, TimelineEvent, TimelinePutResponse}
+import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, 
ApplicationId}
+import org.apache.hadoop.yarn.client.api.TimelineClient
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+
+import org.apache.spark.deploy.history.yarn.YarnTimelineUtils._
+import org.apache.spark.scheduler._
+import org.apache.spark.scheduler.cluster.{YarnExtensionService, 
YarnExtensionServiceBinding}
+import org.apache.spark.util.{SystemClock, Utils}
+import org.apache.spark.{Logging, SparkContext}
+
+/**
+ * A Yarn Extension Service to post lifecycle events to a registered
+ * YARN Timeline Server.
+ */
+private[spark] class YarnHistoryService extends YarnExtensionService with 
Logging {
+
+  import org.apache.spark.deploy.history.yarn.YarnHistoryService._
+
+  /** Simple state model implemented in an atomic integer */
+  private val _serviceState = new AtomicInteger(CreatedState)
+
+  def serviceState: Int = {
+_serviceState.get()
+  }
+  def enterState(state: Int): Int = {
+logDebug(s"Entering state $state from $serviceState")
+_serviceState.getAndSet(state)
+  }
+
+  /**
+   * Spark context; valid once started
+   */
+  private var sparkContext: SparkContext = _
+
+  /** YARN configuration from the spark context */
+  private var config: YarnConfiguration = _
+
+  /** application ID. */
+  private var _applicationId: ApplicationId = _
+
+  /** attempt ID this will be null if the service is started in 
yarn-client mode */
+  private var _attemptId: Option[ApplicationAttemptId] = None
+
+  /** YARN timeline client */
+  private var _timelineClient: Option[TimelineClient] = None
+
+  /** registered event listener */
+  private var listener: Option[YarnEventListener] = None
+
+  /** Application name  from the spark start event */
+  private var applicationName: String = _
+
+  /** Application ID*/
+  private var sparkApplicationId: Option[String] = None
+
+  /** Optional Attempt ID from the spark start event */
+  private var sparkApplicationAttemptId: Option[String] = None
+
+  /** user name as derived from `SPARK_USER` env var or `UGI` */
+  private var userName = Utils.getCurrentUserName()
+
+  /** Clock for recording time */
+  private val clock = new SystemClock()
+
+  /**
+   * Start time of the application, as received in the start event.
+   */
+  private var startTime: Long = _
+
+  /**
+   * Start time of the application, as received in the end event.
+   */
+  private var endTime: Long = _
+
+  /** Number of events to batch up before posting */
+  private var _batchSize = DEFAULT_BATCH_SIZE
+
+  /** queue of entities to asynchronously post, plus the number of events in each entry */
+  private var _entityQueue = new LinkedBlockingDeque[TimelineEntity]()
+
+  /** limit on the total number of events permitted */
+  private var _postQueueLimit = DEFAULT_POST_QUEUE_LIMIT
+
+  /**
+   * List of events which will be pulled into a timeline
+   * entity when created
+   */
+  private var pendingEvents = new 

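The quoted diff describes `_serviceState` as a "simple state model implemented in an atomic integer". Below is a minimal, self-contained sketch of that pattern. The state constants and their values are hypothetical, because the companion object that defines `CreatedState` and its siblings is not quoted. The `transition` helper is also an illustrative addition, not part of the patch.

```scala
import java.util.concurrent.atomic.AtomicInteger

/** Hypothetical state constants; the patch's real values are not in the quoted diff. */
object ServiceStates {
  val CreatedState = 0
  val StartedState = 1
  val StoppedState = 2
}

/** Minimal holder modelled on the quoted `_serviceState` / `enterState` pair. */
class StateModel {
  private val _serviceState = new AtomicInteger(ServiceStates.CreatedState)

  /** Current state. */
  def serviceState: Int = _serviceState.get()

  /**
   * Enter `state` unconditionally and return the previous state, as the quoted
   * `enterState` does with `getAndSet` (its debug logging is omitted here).
   */
  def enterState(state: Int): Int = _serviceState.getAndSet(state)

  /**
   * Hypothetical helper: move from `expected` to `next` only if the service is
   * currently in `expected`. Returns true only for the caller that made the
   * transition, so one of several racing callers (e.g. two concurrent stops) wins.
   */
  def transition(expected: Int, next: Int): Boolean =
    _serviceState.compareAndSet(expected, next)
}
```

`getAndSet` always succeeds. `compareAndSet` lets a second, concurrent stop see `false` and return early instead of repeating the shutdown work.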
[GitHub] spark pull request: SPARK-1537 publisher-side code and tests

2015-09-28 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/8744#discussion_r40606009
  
--- Diff: yarn/history/src/main/scala/org/apache/spark/deploy/history/yarn/YarnHistoryService.scala ---
@@ -0,0 +1,1048 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history.yarn
+
+import java.net.{ConnectException, URI}
+import java.util.concurrent.atomic.{AtomicLong, AtomicBoolean, AtomicInteger}
+import java.util.concurrent.{TimeUnit, LinkedBlockingDeque}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.yarn.api.records.timeline.{TimelineDomain, TimelineEntity, TimelineEvent, TimelinePutResponse}
+import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId}
+import org.apache.hadoop.yarn.client.api.TimelineClient
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+
+import org.apache.spark.deploy.history.yarn.YarnTimelineUtils._
+import org.apache.spark.scheduler._
+import org.apache.spark.scheduler.cluster.{YarnExtensionService, YarnExtensionServiceBinding}
+import org.apache.spark.util.{SystemClock, Utils}
+import org.apache.spark.{Logging, SparkContext}
+
+/**
+ * A Yarn Extension Service to post lifecycle events to a registered
+ * YARN Timeline Server.
+ */
+private[spark] class YarnHistoryService extends YarnExtensionService with Logging {
+
+  import org.apache.spark.deploy.history.yarn.YarnHistoryService._
+
+  /** Simple state model implemented in an atomic integer */
+  private val _serviceState = new AtomicInteger(CreatedState)
+
+  def serviceState: Int = {
+    _serviceState.get()
+  }
+  def enterState(state: Int): Int = {
+    logDebug(s"Entering state $state from $serviceState")
+    _serviceState.getAndSet(state)
+  }
+
+  /**
+   * Spark context; valid once started
+   */
+  private var sparkContext: SparkContext = _
+
+  /** YARN configuration from the spark context */
+  private var config: YarnConfiguration = _
+
+  /** application ID. */
+  private var _applicationId: ApplicationId = _
+
+  /** Attempt ID; this will be `None` if the service is started in yarn-client mode */
+  private var _attemptId: Option[ApplicationAttemptId] = None
+
+  /** YARN timeline client */
+  private var _timelineClient: Option[TimelineClient] = None
+
+  /** registered event listener */
+  private var listener: Option[YarnEventListener] = None
+
+  /** Application name from the Spark start event */
+  private var applicationName: String = _
+
+  /** Application ID */
+  private var sparkApplicationId: Option[String] = None
+
+  /** Optional Attempt ID from the spark start event */
+  private var sparkApplicationAttemptId: Option[String] = None
+
+  /** user name as derived from `SPARK_USER` env var or `UGI` */
+  private var userName = Utils.getCurrentUserName()
+
+  /** Clock for recording time */
+  private val clock = new SystemClock()
+
+  /**
+   * Start time of the application, as received in the start event.
+   */
+  private var startTime: Long = _
+
+  /**
+   * End time of the application, as received in the end event.
+   */
+  private var endTime: Long = _
+
+  /** Number of events to batch up before posting */
+  private var _batchSize = DEFAULT_BATCH_SIZE
+
+  /** queue of entities to asynchronously post, plus the number of events in each entry */
+  private var _entityQueue = new LinkedBlockingDeque[TimelineEntity]()
+
+  /** limit on the total number of events permitted */
+  private var _postQueueLimit = DEFAULT_POST_QUEUE_LIMIT
+
+  /**
+   * List of events which will be pulled into a timeline
+   * entity when created
+   */
+  private var pendingEvents = new 

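The quoted `userName` field is documented as derived from the `SPARK_USER` environment variable or from `UGI` (Hadoop's `UserGroupInformation`). Below is a self-contained sketch of that lookup order. It avoids a Hadoop dependency, so the JVM's `user.name` system property stands in for the UGI fallback. `UserNames` is hypothetical and is not Spark's `Utils.getCurrentUserName`.

```scala
/**
 * Hypothetical helper mirroring the documented lookup order for `userName`:
 * the SPARK_USER environment variable wins; otherwise use a fallback.
 */
object UserNames {
  /** Pure version: look SPARK_USER up in `env`, else evaluate `fallback`. */
  def resolve(env: Map[String, String], fallback: => String): String =
    env.getOrElse("SPARK_USER", fallback)

  /**
   * Process-level version. The JVM "user.name" property stands in for Hadoop's
   * UserGroupInformation short user name, which this sketch does not depend on.
   */
  def current(): String = resolve(sys.env, System.getProperty("user.name"))
}
```

Passing the environment in as a parameter keeps the lookup testable without mutating the real process environment.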
[GitHub] spark pull request: SPARK-1537 publisher-side code and tests

2015-09-28 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/8744#discussion_r40609120
  
--- Diff: yarn/history/src/main/scala/org/apache/spark/deploy/history/yarn/YarnHistoryService.scala ---

[GitHub] spark pull request: SPARK-1537 publisher-side code and tests

2015-09-28 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/8744#discussion_r40609137
  
--- Diff: yarn/history/src/main/scala/org/apache/spark/deploy/history/yarn/YarnHistoryService.scala ---

[GitHub] spark pull request: SPARK-1537 publisher-side code and tests

2015-09-28 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/8744#discussion_r40600846
  
--- Diff: yarn/history/src/main/scala/org/apache/spark/deploy/history/yarn/YarnEventListener.scala ---
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history.yarn
+
+import org.apache.spark.scheduler._
+import org.apache.spark.util.{Clock, SystemClock}
+import org.apache.spark.{Logging, SparkContext, SparkFirehoseListener}
+
+/**
+ * Spark listener which queues up all received events to the [[YarnHistoryService]] passed
+ * as a constructor argument. There's no attempt to filter event types at this point.
+ *
+ * @param sc context
+ * @param service service to forward events to
+ */
+private[spark] class YarnEventListener(sc: SparkContext, service: YarnHistoryService)
+  extends SparkFirehoseListener with Logging {
+
+  private val clock: Clock = new SystemClock()
--- End diff --

Not used?
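For context on the question above: the quoted `YarnEventListener` extends Spark's `SparkFirehoseListener`, which routes every listener callback into a single `onEvent` method, and forwards each event to the service. Below is a self-contained sketch of such a forwarder without the `clock` field. `Event`, `AppStart`, `AppEnd`, `EventSink` and `FirehoseLike` are hypothetical stand-ins so the sketch runs without Spark on the classpath. `enqueue` is not claimed to be the service's real method name.

```scala
/** Hypothetical stand-ins for Spark's SparkListenerEvent hierarchy. */
sealed trait Event
case class AppStart(name: String, time: Long) extends Event
case class AppEnd(time: Long) extends Event

/** Hypothetical stand-in for the history service's event-accepting side. */
trait EventSink {
  def enqueue(event: Event): Unit
}

/** Mirrors the shape of SparkFirehoseListener: every callback funnels into onEvent. */
abstract class FirehoseLike {
  def onEvent(event: Event): Unit
  def onApplicationStart(e: AppStart): Unit = onEvent(e)
  def onApplicationEnd(e: AppEnd): Unit = onEvent(e)
}

/** Forwards every event unfiltered; no clock, since timestamps travel inside the events. */
class ForwardingListener(sink: EventSink) extends FirehoseLike {
  override def onEvent(event: Event): Unit = sink.enqueue(event)
}
```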





[GitHub] spark pull request: SPARK-1537 publisher-side code and tests

2015-09-28 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/8744#discussion_r40600885
  
--- Diff: yarn/history/src/main/scala/org/apache/spark/deploy/history/yarn/YarnHistoryService.scala ---
@@ -0,0 +1,1048 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history.yarn
+
+import java.net.{ConnectException, URI}
+import java.util.concurrent.atomic.{AtomicLong, AtomicBoolean, AtomicInteger}
--- End diff --

nit: ordering
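This sketch assumes the nit refers to the import layout used elsewhere in Spark's code base: a package comes before its subpackages, and names inside braces are in alphabetical order. Under that assumption, the two `java.util.concurrent` imports quoted in this file would read as below. The `ImportOrderDemo` object is hypothetical. It exists only so the snippet compiles and uses each imported name.

```scala
// Package before its subpackage; names inside each pair of braces sorted alphabetically.
import java.util.concurrent.{LinkedBlockingDeque, TimeUnit}
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger, AtomicLong}

/** Illustrative only: touches each imported name so the snippet compiles cleanly. */
object ImportOrderDemo {
  val started = new AtomicBoolean(false)
  val state = new AtomicInteger(0)
  val eventCount = new AtomicLong(0L)
  val queue = new LinkedBlockingDeque[String]()
  val pollTimeoutMillis: Long = TimeUnit.SECONDS.toMillis(1)
}
```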





[GitHub] spark pull request: SPARK-1537 publisher-side code and tests

2015-09-28 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/8744#discussion_r40606368
  
--- Diff: yarn/history/src/main/scala/org/apache/spark/deploy/history/yarn/YarnHistoryService.scala ---

[GitHub] spark pull request: SPARK-1537 publisher-side code and tests

2015-09-28 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/8744#discussion_r40608807
  
--- Diff: yarn/history/src/main/scala/org/apache/spark/deploy/history/yarn/YarnHistoryService.scala ---

[GitHub] spark pull request: SPARK-1537 publisher-side code and tests

2015-09-28 Thread vanzin
Github user vanzin commented on the pull request:

https://github.com/apache/spark/pull/8744#issuecomment-143882558
  
A lot of the logic in `YarnHistoryService` changed since the last review, 
so I might have missed some stuff. I didn't have the time to go through the 
rest yet.
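The quoted fields show the shape of the reworked queuing logic. Events accumulate in a pending list (`pendingEvents`) and are packaged into one entity per `_batchSize` events. Each entity is pushed onto a `LinkedBlockingDeque` (`_entityQueue`) for an asynchronous poster, with a cap on the total queued events (`_postQueueLimit`). Below is a self-contained sketch of how those pieces could fit together. `Entity` is a hypothetical stand-in for YARN's `TimelineEntity`. Dropping a batch on overflow is an assumption; the patch may handle overflow differently.

```scala
import java.util.concurrent.{LinkedBlockingDeque, TimeUnit}
import java.util.concurrent.atomic.AtomicLong

import scala.collection.mutable

/** Hypothetical stand-in for YARN's TimelineEntity: one batch of events. */
case class Entity(events: List[String])

/**
 * Sketch of batched, bounded, asynchronous posting. Events are grouped into
 * entities of up to `batchSize` events; a batch is dropped if queuing it would
 * push the number of queued events past `postQueueLimit`.
 */
class BatchingQueue(batchSize: Int, postQueueLimit: Long) {
  private val pendingEvents = mutable.ListBuffer[String]()
  private val entityQueue = new LinkedBlockingDeque[Entity]()
  private val queuedEventCount = new AtomicLong(0L)
  private val droppedEventCount = new AtomicLong(0L)

  /** Add one event; once `batchSize` events are pending, flush them as one entity. */
  def enqueue(event: String): Unit = synchronized {
    pendingEvents += event
    if (pendingEvents.size >= batchSize) {
      flush()
    }
  }

  /** Package all pending events into one entity and queue it, or drop it if over the limit. */
  def flush(): Unit = synchronized {
    if (pendingEvents.nonEmpty) {
      val size = pendingEvents.size
      if (queuedEventCount.get() + size > postQueueLimit) {
        droppedEventCount.addAndGet(size)
      } else {
        entityQueue.put(Entity(pendingEvents.toList))
        queuedEventCount.addAndGet(size)
      }
      pendingEvents.clear()
    }
  }

  /** Posting-thread side: wait up to `timeoutMillis` for the next entity. */
  def poll(timeoutMillis: Long): Option[Entity] = {
    val entity = Option(entityQueue.poll(timeoutMillis, TimeUnit.MILLISECONDS))
    entity.foreach(e => queuedEventCount.addAndGet(-e.events.size))
    entity
  }

  def queuedEvents: Long = queuedEventCount.get()
  def droppedEvents: Long = droppedEventCount.get()
}
```

The limit is documented as a number of events, not entities. Counting queued events separately from the deque's size lets the sketch enforce it in those units.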





[GitHub] spark pull request: SPARK-1537 publisher-side code and tests

2015-09-28 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/8744#discussion_r40601372
  
--- Diff: yarn/history/src/main/scala/org/apache/spark/deploy/history/yarn/YarnHistoryService.scala ---
@@ -0,0 +1,1048 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history.yarn
+
+import java.net.{ConnectException, URI}
+import java.util.concurrent.atomic.{AtomicLong, AtomicBoolean, 
AtomicInteger}
+import java.util.concurrent.{TimeUnit, LinkedBlockingDeque}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.yarn.api.records.timeline.{TimelineDomain, 
TimelineEntity, TimelineEvent, TimelinePutResponse}
+import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, 
ApplicationId}
+import org.apache.hadoop.yarn.client.api.TimelineClient
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+
+import org.apache.spark.deploy.history.yarn.YarnTimelineUtils._
+import org.apache.spark.scheduler._
+import org.apache.spark.scheduler.cluster.{YarnExtensionService, 
YarnExtensionServiceBinding}
+import org.apache.spark.util.{SystemClock, Utils}
+import org.apache.spark.{Logging, SparkContext}
+
+/**
+ * A Yarn Extension Service to post lifecycle events to a registered
+ * YARN Timeline Server.
+ */
+private[spark] class YarnHistoryService extends YarnExtensionService with 
Logging {
+
+  import org.apache.spark.deploy.history.yarn.YarnHistoryService._
+
+  /** Simple state model implemented in an atomic integer */
+  private val _serviceState = new AtomicInteger(CreatedState)
+
+  def serviceState: Int = {
+_serviceState.get()
+  }
+  def enterState(state: Int): Int = {
+logDebug(s"Entering state $state from $serviceState")
+_serviceState.getAndSet(state)
+  }
+
+  /**
+   * Spark context; valid once started
+   */
+  private var sparkContext: SparkContext = _
+
+  /** YARN configuration from the spark context */
+  private var config: YarnConfiguration = _
+
+  /** application ID. */
+  private var _applicationId: ApplicationId = _
+
+  /** attempt ID this will be null if the service is started in 
yarn-client mode */
+  private var _attemptId: Option[ApplicationAttemptId] = None
+
+  /** YARN timeline client */
+  private var _timelineClient: Option[TimelineClient] = None
+
+  /** registered event listener */
+  private var listener: Option[YarnEventListener] = None
+
+  /** Application name  from the spark start event */
+  private var applicationName: String = _
+
+  /** Application ID*/
+  private var sparkApplicationId: Option[String] = None
+
+  /** Optional Attempt ID from the spark start event */
+  private var sparkApplicationAttemptId: Option[String] = None
+
+  /** user name as derived from `SPARK_USER` env var or `UGI` */
+  private var userName = Utils.getCurrentUserName()
+
+  /** Clock for recording time */
+  private val clock = new SystemClock()
+
+  /**
+   * Start time of the application, as received in the start event.
+   */
+  private var startTime: Long = _
+
+  /**
+   * End time of the application, as received in the end event.
+   */
+  private var endTime: Long = _
+
+  /** number of events to batch up before posting */
+  private var _batchSize = DEFAULT_BATCH_SIZE
+
+  /** queue of entities to asynchronously post, plus the number of events in each entry */
+  private var _entityQueue = new LinkedBlockingDeque[TimelineEntity]()
+
+  /** limit on the total number of events permitted */
+  private var _postQueueLimit = DEFAULT_POST_QUEUE_LIMIT
+
+  /**
+   * List of events which will be pulled into a timeline
+   * entity when created
+   */
+  private var pendingEvents = new 

[GitHub] spark pull request: SPARK-1537 publisher-side code and tests

2015-09-28 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/8744#discussion_r40608080
  
--- Diff: 
yarn/history/src/main/scala/org/apache/spark/deploy/history/yarn/YarnHistoryService.scala
 ---
@@ -0,0 +1,1048 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history.yarn
+
+import java.net.{ConnectException, URI}
+import java.util.concurrent.atomic.{AtomicLong, AtomicBoolean, AtomicInteger}
+import java.util.concurrent.{TimeUnit, LinkedBlockingDeque}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.yarn.api.records.timeline.{TimelineDomain, TimelineEntity, TimelineEvent, TimelinePutResponse}
+import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId}
+import org.apache.hadoop.yarn.client.api.TimelineClient
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+
+import org.apache.spark.deploy.history.yarn.YarnTimelineUtils._
+import org.apache.spark.scheduler._
+import org.apache.spark.scheduler.cluster.{YarnExtensionService, YarnExtensionServiceBinding}
+import org.apache.spark.util.{SystemClock, Utils}
+import org.apache.spark.{Logging, SparkContext}
+
+/**
+ * A Yarn Extension Service to post lifecycle events to a registered
+ * YARN Timeline Server.
+ */
+private[spark] class YarnHistoryService extends YarnExtensionService with Logging {
+
+  import org.apache.spark.deploy.history.yarn.YarnHistoryService._
+
+  /** Simple state model implemented in an atomic integer */
+  private val _serviceState = new AtomicInteger(CreatedState)
+
+  def serviceState: Int = {
+    _serviceState.get()
+  }
+  def enterState(state: Int): Int = {
+    logDebug(s"Entering state $state from $serviceState")
+    _serviceState.getAndSet(state)
+  }
+
+  /**
+   * Spark context; valid once started
+   */
+  private var sparkContext: SparkContext = _
+
+  /** YARN configuration from the spark context */
+  private var config: YarnConfiguration = _
+
+  /** application ID. */
+  private var _applicationId: ApplicationId = _
+
+  /** attempt ID; this will be `None` if the service is started in yarn-client mode */
+  private var _attemptId: Option[ApplicationAttemptId] = None
+
+  /** YARN timeline client */
+  private var _timelineClient: Option[TimelineClient] = None
+
+  /** registered event listener */
+  private var listener: Option[YarnEventListener] = None
+
+  /** Application name from the spark start event */
+  private var applicationName: String = _
+
+  /** Application ID */
+  private var sparkApplicationId: Option[String] = None
+
+  /** Optional Attempt ID from the spark start event */
+  private var sparkApplicationAttemptId: Option[String] = None
+
+  /** user name as derived from `SPARK_USER` env var or `UGI` */
+  private var userName = Utils.getCurrentUserName()
+
+  /** Clock for recording time */
+  private val clock = new SystemClock()
+
+  /**
+   * Start time of the application, as received in the start event.
+   */
+  private var startTime: Long = _
+
+  /**
+   * End time of the application, as received in the end event.
+   */
+  private var endTime: Long = _
+
+  /** number of events to batch up before posting */
+  private var _batchSize = DEFAULT_BATCH_SIZE
+
+  /** queue of entities to asynchronously post, plus the number of events in each entry */
+  private var _entityQueue = new LinkedBlockingDeque[TimelineEntity]()
+
+  /** limit on the total number of events permitted */
+  private var _postQueueLimit = DEFAULT_POST_QUEUE_LIMIT
+
+  /**
+   * List of events which will be pulled into a timeline
+   * entity when created
+   */
+  private var pendingEvents = new 

[GitHub] spark pull request: SPARK-1537 publisher-side code and tests

2015-09-28 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/8744#discussion_r40608423
  
--- Diff: 
yarn/history/src/main/scala/org/apache/spark/deploy/history/yarn/YarnHistoryService.scala
 ---
@@ -0,0 +1,1048 @@

[GitHub] spark pull request: SPARK-1537 publisher-side code and tests

2015-09-28 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/8744#discussion_r40600962
  
--- Diff: 
yarn/history/src/main/scala/org/apache/spark/deploy/history/yarn/YarnHistoryService.scala
 ---
@@ -0,0 +1,1048 @@
+  /** Simple state model implemented in an atomic integer */
+  private val _serviceState = new AtomicInteger(CreatedState)
+
+  def serviceState: Int = {
+    _serviceState.get()
+  }
+  def enterState(state: Int): Int = {
--- End diff --

nit: add empty line
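For illustration only, a minimal standalone sketch of the layout this nit asks for: a blank line between the `serviceState` accessor and `enterState`. It is not code from the PR. `ServiceStates` and its integer values are hypothetical stand-ins for the state constants in the `YarnHistoryService` companion object (only `CreatedState` appears in the quoted diff), and the `logDebug` call is dropped so the snippet compiles without Spark.

```scala
import java.util.concurrent.atomic.AtomicInteger

/** Hypothetical state constants; the real ones live in the YarnHistoryService companion. */
object ServiceStates {
  val CreatedState = 0
  val StartedState = 1
  val StoppedState = 2
}

/** Simple state model implemented in an atomic integer. */
class StateModel {
  import ServiceStates._

  private val _serviceState = new AtomicInteger(CreatedState)

  /** Current state. */
  def serviceState: Int = _serviceState.get()

  /** Enter `state`, returning the state that was left. */
  def enterState(state: Int): Int = {
    _serviceState.getAndSet(state)
  }
}
```

Because `getAndSet` returns the previous value, a caller can spot a redundant transition (for example, stopping twice) by comparing the result with the state it just entered.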





[GitHub] spark pull request: SPARK-1537 publisher-side code and tests

2015-09-28 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/8744#discussion_r40606942
  
--- Diff: 
yarn/history/src/main/scala/org/apache/spark/deploy/history/yarn/YarnHistoryService.scala
 ---
@@ -0,0 +1,1048 @@

[GitHub] spark pull request: SPARK-1537 publisher-side code and tests

2015-09-28 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/8744#discussion_r40607349
  
--- Diff: 
yarn/history/src/main/scala/org/apache/spark/deploy/history/yarn/YarnHistoryService.scala
 ---
@@ -0,0 +1,1048 @@

[GitHub] spark pull request: SPARK-1537 publisher-side code and tests

2015-09-28 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/8744#discussion_r40609349
  
--- Diff: 
yarn/history/src/main/scala/org/apache/spark/deploy/history/yarn/YarnHistoryService.scala
 ---
@@ -0,0 +1,1048 @@

[GitHub] spark pull request: SPARK-1537 publisher-side code and tests

2015-09-28 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/8744#discussion_r40601106
  
--- Diff: 
yarn/history/src/main/scala/org/apache/spark/deploy/history/yarn/YarnHistoryService.scala
 ---
@@ -0,0 +1,1048 @@
+/**
+ * A Yarn Extension Service to post lifecycle events to a registered
+ * YARN Timeline Server.
+ */
+private[spark] class YarnHistoryService extends YarnExtensionService with Logging {
+
+  import org.apache.spark.deploy.history.yarn.YarnHistoryService._
+
+  /** Simple state model implemented in an atomic integer */
+  private val _serviceState = new AtomicInteger(CreatedState)
+
+  def serviceState: Int = {
+    _serviceState.get()
+  }
+  def enterState(state: Int): Int = {
+    logDebug(s"Entering state $state from $serviceState")
+    _serviceState.getAndSet(state)
+  }
+
+  /**
+   * Spark context; valid once started
+   */
+  private var sparkContext: SparkContext = _
+
+  /** YARN configuration from the spark context */
+  private var config: YarnConfiguration = _
+
+  /** application ID. */
+  private var _applicationId: ApplicationId = _
+
+  /** attempt ID; this will be `None` if the service is started in yarn-client mode */
+  private var _attemptId: Option[ApplicationAttemptId] = None
+
+  /** YARN timeline client */
+  private var _timelineClient: Option[TimelineClient] = None
+
+  /** registered event listener */
+  private var listener: Option[YarnEventListener] = None
+
+  /** Application name from the spark start event */
+  private var applicationName: String = _
+
+  /** Application ID */
+  private var sparkApplicationId: Option[String] = None
+
+  /** Optional Attempt ID from the spark start event */
+  private var sparkApplicationAttemptId: Option[String] = None
+
+  /** user name as derived from `SPARK_USER` env var or `UGI` */
+  private var userName = Utils.getCurrentUserName()
+
+  /** Clock for recording time */
+  private val clock = new SystemClock()
+
+  /**
--- End diff --

nit: please be consistent re: single line vs. multi-line docs, and starting 
with capital or lower case letters.
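A minimal sketch of one consistent convention (single-line Scaladoc, sentence case, a space before the closing `*/`), applied to a hypothetical class rather than the PR's own fields. The field names mirror the diff, but `DocStyleExample`, the batch size of 100, and the use of `String` in place of `TimelineEntity` (to avoid a Hadoop dependency) are assumptions for illustration.

```scala
import java.util.concurrent.LinkedBlockingDeque

/** Hypothetical class showing one consistent doc style for fields. */
class DocStyleExample {

  /** Application name from the Spark start event. */
  var applicationName: String = ""

  /** Start time of the application, as received in the start event. */
  var startTime: Long = 0L

  /** End time of the application, as received in the end event. */
  var endTime: Long = 0L

  /** Number of events to batch up before posting (hypothetical default). */
  var batchSize: Int = 100

  /** Queue of entities to post asynchronously (String stands in for TimelineEntity). */
  val entityQueue = new LinkedBlockingDeque[String]()
}
```

Multi-line `/** ... */` blocks can then be reserved for members that genuinely need more than one sentence of explanation.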





[GitHub] spark pull request: SPARK-1537 publisher-side code and tests

2015-09-28 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/8744#discussion_r40601587
  
--- Diff: 
yarn/history/src/main/scala/org/apache/spark/deploy/history/yarn/YarnHistoryService.scala
 ---
@@ -0,0 +1,1048 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history.yarn
+
+import java.net.{ConnectException, URI}
+import java.util.concurrent.atomic.{AtomicLong, AtomicBoolean, AtomicInteger}
+import java.util.concurrent.{TimeUnit, LinkedBlockingDeque}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.yarn.api.records.timeline.{TimelineDomain, TimelineEntity, TimelineEvent, TimelinePutResponse}
+import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId}
+import org.apache.hadoop.yarn.client.api.TimelineClient
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+
+import org.apache.spark.deploy.history.yarn.YarnTimelineUtils._
+import org.apache.spark.scheduler._
+import org.apache.spark.scheduler.cluster.{YarnExtensionService, YarnExtensionServiceBinding}
+import org.apache.spark.util.{SystemClock, Utils}
+import org.apache.spark.{Logging, SparkContext}
+
+/**
+ * A Yarn Extension Service to post lifecycle events to a registered
+ * YARN Timeline Server.
+ */
+private[spark] class YarnHistoryService extends YarnExtensionService with Logging {
+
+  import org.apache.spark.deploy.history.yarn.YarnHistoryService._
+
+  /** Simple state model implemented in an atomic integer */
+  private val _serviceState = new AtomicInteger(CreatedState)
+
+  def serviceState: Int = {
+    _serviceState.get()
+  }
+  def enterState(state: Int): Int = {
+    logDebug(s"Entering state $state from $serviceState")
+    _serviceState.getAndSet(state)
+  }
+
+  /**
+   * Spark context; valid once started
+   */
+  private var sparkContext: SparkContext = _
+
+  /** YARN configuration from the spark context */
+  private var config: YarnConfiguration = _
+
+  /** application ID. */
+  private var _applicationId: ApplicationId = _
+
+  /** attempt ID this will be null if the service is started in yarn-client mode */
+  private var _attemptId: Option[ApplicationAttemptId] = None
+
+  /** YARN timeline client */
+  private var _timelineClient: Option[TimelineClient] = None
+
+  /** registered event listener */
+  private var listener: Option[YarnEventListener] = None
+
+  /** Application name  from the spark start event */
+  private var applicationName: String = _
+
+  /** Application ID*/
+  private var sparkApplicationId: Option[String] = None
+
+  /** Optional Attempt ID from the spark start event */
+  private var sparkApplicationAttemptId: Option[String] = None
+
+  /** user name as derived from `SPARK_USER` env var or `UGI` */
+  private var userName = Utils.getCurrentUserName()
+
+  /** Clock for recording time */
+  private val clock = new SystemClock()
+
+  /**
+   * Start time of the application, as received in the start event.
+   */
+  private var startTime: Long = _
+
+  /**
+   * End time of the application, as received in the end event.
+   */
+  private var endTime: Long = _
+
+  /** number of events to batch up before posting*/
+  private var _batchSize = DEFAULT_BATCH_SIZE
+
+  /** queue of entities to asynchronously post, plus the number of events in each entry */
+  private var _entityQueue = new LinkedBlockingDeque[TimelineEntity]()
+
+  /** limit on the total number of events permitted */
+  private var _postQueueLimit = DEFAULT_POST_QUEUE_LIMIT
+
+  /**
+   * List of events which will be pulled into a timeline
+   * entity when created
+   */
+  private var pendingEvents = new 

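The "simple state model implemented in an atomic integer" in the diff above (`_serviceState`, `serviceState`, `enterState`) can be illustrated with a small standalone sketch. The state constants and the `start`/`stop`/`process` methods are hypothetical, not taken from the patch. The sketch also assumes the behaviour described in another revision's class comment in this thread: events received once the service is stopped are discarded. It contrasts the patch's unconditional `getAndSet` with `compareAndSet`, which can reject an illegal transition such as a second start.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical state constants; the patch imports its own from the
// YarnHistoryService companion object, which this excerpt does not show.
object ServiceStates {
  val CreatedState = 0
  val StartedState = 1
  val StoppedState = 2
}

// Sketch of a publisher whose lifecycle state lives in one AtomicInteger.
class StatefulPublisher {
  import ServiceStates._

  private val state = new AtomicInteger(CreatedState)
  private val received = new ConcurrentLinkedQueue[String]()

  def serviceState: Int = state.get()

  // Unconditional transition, like the patch's enterState: returns the old state.
  def enterState(newState: Int): Int = state.getAndSet(newState)

  // Conditional transition: succeeds only from CreatedState, so a second
  // start() (or a start() after stop()) is rejected.
  def start(): Boolean = state.compareAndSet(CreatedState, StartedState)

  // Returns true only for the call that actually performed the stop.
  def stop(): Boolean = enterState(StoppedState) != StoppedState

  // Keeps the event unless the service has stopped; late events are dropped.
  def process(event: String): Boolean =
    if (serviceState == StoppedState) false
    else {
      received.add(event)
      true
    }

  def eventCount: Int = received.size()
}
```

Because `stop()` is built on `getAndSet`, only one caller ever sees the transition into the stopped state, which is a convenient place to hang one-off cleanup.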
[GitHub] spark pull request: SPARK-1537 publisher-side code and tests

2015-09-28 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/8744#discussion_r40606120
  
--- Diff: yarn/history/src/main/scala/org/apache/spark/deploy/history/yarn/YarnHistoryService.scala ---
@@ -0,0 +1,1048 @@

[GitHub] spark pull request: SPARK-1537 publisher-side code and tests

2015-09-28 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/8744#discussion_r40606029
  
--- Diff: yarn/history/src/main/scala/org/apache/spark/deploy/history/yarn/YarnHistoryService.scala ---
@@ -0,0 +1,1048 @@

[GitHub] spark pull request: SPARK-1537 publisher-side code and tests

2015-09-28 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/8744#discussion_r40606779
  
--- Diff: yarn/history/src/main/scala/org/apache/spark/deploy/history/yarn/YarnHistoryService.scala ---
@@ -0,0 +1,1048 @@

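The `_batchSize`, `_entityQueue` and `_postQueueLimit` fields in the `YarnHistoryService` diff quoted earlier in this thread describe a batch-then-post design: events are grouped before posting, and a limit caps how many events may wait in the post queue. What the real service does when that limit is hit is not visible in this excerpt. The sketch below assumes overflowing batches are simply dropped and counted. `BatchingQueue` and its methods are hypothetical names. It queues plain `Seq[E]` batches rather than `TimelineEntity` objects, and uses a non-blocking `pollFirst` so that the lock is never held while waiting.

```scala
import java.util.concurrent.LinkedBlockingDeque
import scala.collection.mutable.ArrayBuffer

// Hypothetical sketch: buffer events until `batchSize` is reached, then move
// them as one batch onto a deque for an asynchronous poster. Batches that
// would push the queued event count past `postQueueLimit` are dropped.
class BatchingQueue[E](batchSize: Int, postQueueLimit: Int) {
  require(batchSize > 0 && postQueueLimit > 0)

  private val pending = ArrayBuffer.empty[E]
  private val postQueue = new LinkedBlockingDeque[Seq[E]]()
  private var queuedEvents = 0 // events currently waiting in postQueue
  private var dropped = 0      // events discarded because of the limit

  // Add one event; flushes automatically once the batch is full.
  def enqueue(event: E): Unit = synchronized {
    pending += event
    if (pending.size >= batchSize) flush()
  }

  // Move any pending events onto the post queue as a single batch.
  def flush(): Unit = synchronized {
    if (pending.nonEmpty) {
      val batch = pending.toList
      pending.clear()
      if (queuedEvents + batch.size <= postQueueLimit) {
        postQueue.addLast(batch)
        queuedEvents += batch.size
      } else {
        dropped += batch.size
      }
    }
  }

  // Called by the poster thread; None when nothing is queued.
  def takeBatch(): Option[Seq[E]] = synchronized {
    val batch = Option(postQueue.pollFirst())
    batch.foreach(b => queuedEvents -= b.size)
    batch
  }

  def droppedEvents: Int = synchronized { dropped }
  def queuedEventCount: Int = synchronized { queuedEvents }
}
```

Counting events, not batches, against the limit matches the patch's comment that the limit is "on the total number of events permitted". Dropping rather than blocking keeps the listener thread from stalling if the timeline server is slow.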
[GitHub] spark pull request: SPARK-1537 publisher-side code and tests

2015-09-24 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/8744#issuecomment-143024215
  
  [Test build #42976 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/42976/consoleFull) for PR 8744 at commit [`231301a`](https://github.com/apache/spark/commit/231301a943f449db1ccf46573f2e98a47034c797).





[GitHub] spark pull request: SPARK-1537 publisher-side code and tests

2015-09-24 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/8744#issuecomment-143021446
  
Merged build started.





[GitHub] spark pull request: SPARK-1537 publisher-side code and tests

2015-09-24 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/8744#issuecomment-143021415
  
 Merged build triggered.





[GitHub] spark pull request: SPARK-1537 publisher-side code and tests

2015-09-24 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/8744#issuecomment-143057982
  
Merged build finished. Test FAILed.





[GitHub] spark pull request: SPARK-1537 publisher-side code and tests

2015-09-24 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/8744#issuecomment-143057931
  
  [Test build #42976 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/42976/console) for PR 8744 at commit [`231301a`](https://github.com/apache/spark/commit/231301a943f449db1ccf46573f2e98a47034c797).
 * This patch **fails PySpark unit tests**.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
  * `trait YarnExtensionService `
  * `case class YarnExtensionServiceBinding(`






[GitHub] spark pull request: SPARK-1537 publisher-side code and tests

2015-09-24 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/8744#issuecomment-143057985
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/42976/
Test FAILed.





[GitHub] spark pull request: SPARK-1537 publisher-side code and tests

2015-09-23 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/8744#issuecomment-142672107
  
  [Test build #42912 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/42912/consoleFull) for PR 8744 at commit [`103047c`](https://github.com/apache/spark/commit/103047cb989669ae7b666b4ba52d728fed5b1b88).





[GitHub] spark pull request: SPARK-1537 publisher-side code and tests

2015-09-23 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/8744#issuecomment-142670534
  
Merged build started.





[GitHub] spark pull request: SPARK-1537 publisher-side code and tests

2015-09-23 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/8744#issuecomment-142670505
  
 Merged build triggered.





[GitHub] spark pull request: SPARK-1537 publisher-side code and tests

2015-09-23 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/8744#issuecomment-142711558
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/42912/
Test PASSed.





[GitHub] spark pull request: SPARK-1537 publisher-side code and tests

2015-09-23 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/8744#issuecomment-142711556
  
Merged build finished. Test PASSed.





[GitHub] spark pull request: SPARK-1537 publisher-side code and tests

2015-09-23 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/8744#issuecomment-142711417
  
  [Test build #42912 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/42912/console) for PR 8744 at commit [`103047c`](https://github.com/apache/spark/commit/103047cb989669ae7b666b4ba52d728fed5b1b88).
 * This patch **passes all tests**.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
  * `trait YarnExtensionService `
  * `case class YarnExtensionServiceBinding(`






[GitHub] spark pull request: SPARK-1537 publisher-side code and tests

2015-09-16 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/8744#discussion_r39633153
  
--- Diff: yarn/history/src/main/scala/org/apache/spark/deploy/history/yarn/YarnEventListener.scala ---
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history.yarn
+
+import org.apache.spark.scheduler._
+import org.apache.spark.util.{SystemClock, Clock}
+import org.apache.spark.{Logging, SparkContext}
+
+/**
+ * Spark listener which queues up all received events to the [[YarnHistoryService]] passed
+ * as a constructor. There's no attempt to filter event types at this point.
+ * @param sc context
+ * @param service service to forward events to
+ */
+private[spark] class YarnEventListener(sc: SparkContext, service: YarnHistoryService)
+  extends SparkListener with Logging {
--- End diff --

Why yes, it would. Thank you for pointing that class out.

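The `YarnEventListener` doc comment above describes a listener that forwards every event to the service without filtering by type. A single-callback "firehose" shape is one way to do that without overriding a method per event type. The sketch below illustrates that pattern. It uses hypothetical stand-in event types (`ListenerEvent`, `AppStart`, `JobStart`, `AppEnd`) and a hypothetical `EventSink` rather than Spark's real listener classes. It does not claim to be the class referred to in the reply, which this excerpt does not name.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-ins for Spark's listener event hierarchy.
sealed trait ListenerEvent
final case class AppStart(name: String, time: Long) extends ListenerEvent
final case class JobStart(jobId: Int) extends ListenerEvent
final case class AppEnd(time: Long) extends ListenerEvent

// Whatever receives the events, e.g. a history-publishing service.
trait EventSink {
  def enqueue(event: ListenerEvent): Unit
}

// Firehose-style listener: one callback sees every event and hands it on
// unchanged, so adding a new event type needs no new override here.
class ForwardingListener(sink: EventSink) {
  def onEvent(event: ListenerEvent): Unit = sink.enqueue(event)
}

// Test double that records what it was given, in order.
class RecordingSink extends EventSink {
  val events = ArrayBuffer.empty[ListenerEvent]
  override def enqueue(event: ListenerEvent): Unit = events += event
}
```

Any filtering or batching then lives in the sink, which matches the comment's statement that no filtering happens in the listener itself.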




[GitHub] spark pull request: SPARK-1537 publisher-side code and tests

2015-09-16 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/8744#discussion_r39635426
  
--- Diff: yarn/history/src/main/scala/org/apache/spark/deploy/history/yarn/YarnHistoryService.scala ---
@@ -0,0 +1,981 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history.yarn
+
+import java.net.{ConnectException, URI}
+import java.util.concurrent.LinkedBlockingQueue
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
+
+import scala.collection.mutable.LinkedList
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.service.{AbstractService, Service}
+import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId}
+import org.apache.hadoop.yarn.api.records.timeline.{TimelineDomain, TimelineEntity, TimelinePutResponse}
+import org.apache.hadoop.yarn.client.api.TimelineClient
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+
+import org.apache.spark.deploy.history.yarn.YarnTimelineUtils._
+import org.apache.spark.scheduler._
+import org.apache.spark.scheduler.cluster.YarnExtensionService
+import org.apache.spark.util.{SystemClock, Clock, Utils}
+import org.apache.spark.{Logging, SparkContext}
+
+/**
+ * Implements a Hadoop service with the init/start logic replaced by that
+ * of the YarnService.
+ * 
+ * As [[AbstractService]] implements `close()`, routing
+ * to its `stop` method, calling `close()` is sufficient
+ * to stop the service instance.
+ * 
+ * However, when registered to receive spark events, the service will continue to
+ * receive them until the spark context is stopped. Events received when this service
+ * is in a `STOPPED` state will be discarded.
+ */
+private[spark] class YarnHistoryService  extends AbstractService("History Service")
+  with YarnExtensionService with Logging {
+
+  import org.apache.spark.deploy.history.yarn.YarnHistoryService._
+  /**
+   * Spark context; valid once started
+   */
+  private var sparkContext: SparkContext = _
+
+  /** application ID. */
+  private var _applicationId: ApplicationId = _
+
+  /** attempt ID; this will be `None` if the service is started in yarn-client mode */
+  private var _attemptId: Option[ApplicationAttemptId] = None
+
+  /** YARN timeline client */
+  private var timelineClient: Option[TimelineClient] = None
+
+  /** registered event listener */
+  private var listener: Option[YarnEventListener] = None
+
+  /** Application name */
+  private var appName: String = _
+
+  /** Application ID from the spark start event */
+  private var sparkApplicationId: Option[String] = None
+
+  /** Optional Attempt ID from the spark start event */
+  private var sparkApplicationAttemptId: Option[String] = None
+
+  /** user name as derived from `SPARK_USER` env var or `UGI` */
+  private var userName: String = Utils.getCurrentUserName
+
+  /**
+   * Clock for recording time
+   */
+  private val clock: Clock = new SystemClock()
+
+  /**
+   * Start time of the application, as received in the start event.
+   */
+  private var startTime: Long = _
+
+  /** number of events to batch up before posting */
+  private var batchSize: Int = DEFAULT_BATCH_SIZE
+
+  /** queue of actions */
+  private val actionQueue = new LinkedBlockingQueue[QueuedAction]
+
+  /** cache layer to handle timeline client failure. */
+  private var entityList = new LinkedList[TimelineEntity]
+
+  /** current entity; will be created on demand. */
+  private var curEntity: Option[TimelineEntity] = None
+
+  /** Has a start event been processed? */
+  private val appStartEventProcessed = new AtomicBoolean(false)
+
+  /* has the application event event been processed 
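The class comment above makes two behavioural claims: `close()` is enough to stop the service because `AbstractService` routes it to `stop()`, and events arriving while the service is `STOPPED` are discarded. A minimal, dependency-free Scala sketch of that contract follows. `DiscardingEventService`, `onEvent` and the string events are hypothetical stand-ins (for the patch's service and for `SparkListenerEvent` instances), not code from the patch or from Hadoop.

```scala
import java.io.Closeable
import java.util.concurrent.atomic.AtomicBoolean

import scala.collection.mutable.ArrayBuffer

/**
 * Hypothetical stand-in for the history service's event handling: close() delegates
 * to stop(), as Hadoop's AbstractService does, and events that arrive after stop()
 * are counted and dropped rather than queued.
 */
class DiscardingEventService extends Closeable {
  private val stopped = new AtomicBoolean(false)
  private val accepted = ArrayBuffer[String]()
  private var discarded = 0

  def stop(): Unit = stopped.set(true)

  /** Closing is just another route to stop(). */
  override def close(): Unit = stop()

  /** Called by the listener bus; returns false when the event is discarded. */
  def onEvent(event: String): Boolean = synchronized {
    if (stopped.get()) {
      discarded += 1
      false
    } else {
      accepted += event
      true
    }
  }

  def acceptedEvents: List[String] = synchronized { accepted.toList }

  def discardedCount: Int = synchronized { discarded }
}
```

Discarding, rather than failing, matters because, per the same comment, the listener keeps receiving events until the spark context itself is stopped.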

[GitHub] spark pull request: SPARK-1537 publisher-side code and tests

2015-09-16 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/8744#discussion_r39635411
  

[GitHub] spark pull request: SPARK-1537 publisher-side code and tests

2015-09-16 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/8744#discussion_r39638971
  

[GitHub] spark pull request: SPARK-1537 publisher-side code and tests

2015-09-16 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/8744#discussion_r39634546
  
--- Diff: 
yarn/history/src/main/scala/org/apache/spark/deploy/history/yarn/YarnHistoryService.scala
 ---
@@ -0,0 +1,981 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history.yarn
+
+import java.net.{ConnectException, URI}
+import java.util.concurrent.LinkedBlockingQueue
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
+
+import scala.collection.mutable.LinkedList
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.service.{AbstractService, Service}
+import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId}
+import org.apache.hadoop.yarn.api.records.timeline.{TimelineDomain, TimelineEntity, TimelinePutResponse}
+import org.apache.hadoop.yarn.client.api.TimelineClient
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+
+import org.apache.spark.deploy.history.yarn.YarnTimelineUtils._
+import org.apache.spark.scheduler._
+import org.apache.spark.scheduler.cluster.YarnExtensionService
+import org.apache.spark.util.{SystemClock, Clock, Utils}
+import org.apache.spark.{Logging, SparkContext}
+
+/**
+ * Implements a Hadoop service with the init/start logic replaced by that
+ * of the YarnService.
+ * 
+ * As [[AbstractService]] implements `close()`, routing
+ * to its `stop` method, calling `close()` is sufficient
+ * to stop the service instance.
+ * 
+ * However, when registered to receive spark events, the service will continue to
+ * receive them until the spark context is stopped. Events received when this service
+ * is in a `STOPPED` state will be discarded.
+ */
+private[spark] class YarnHistoryService  extends AbstractService("History Service")
--- End diff --

1. It makes it easier to aggregate services, and it has a robust model for
thread-safe subclassing through a lifecycle. Most of YARN is a composition of
these services; in fact, the Spark YARN client and AM could adopt it for better
management of `AMRMClient`, `YarnClient`, `YarnExtensionServices` and the like.

2. The hive-thriftserver code *almost* uses it. More specifically, it uses a
cut-and-paste of the Hadoop 2.0.4-alpha-era code and then abuses Java
reflection to bypass bits of the direct parent. Once Hive moves to Hadoop 2.2+
only, the code can be moved back onto the YARN model. In the meantime, once
Hive 1.2.x is modified to provide override points, the spark-hivethriftserver
module won't need the `ReflectedCompositeService` hack any more.
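The lifecycle and composition model described in point 1 can be illustrated with a small, dependency-free Scala sketch. It mimics, in simplified form, the behaviour of Hadoop's `AbstractService` and `CompositeService`: public `init`/`start`/`stop` methods enforce the state machine, subclasses override only the protected hooks, and a composite initialises and starts its children in registration order and stops them in reverse. `MiniService`, `MiniComposite` and `LifecycleDemo` are hypothetical names; the real classes also take a `Configuration`, record failure causes, and throw `ServiceStateException` on illegal transitions where this sketch uses `require`.

```scala
import scala.collection.mutable.ArrayBuffer

object ServiceState extends Enumeration {
  val NotInited, Inited, Started, Stopped = Value
}

/** Simplified model of a Hadoop-style service: the public transitions are final. */
abstract class MiniService(val name: String) {
  import ServiceState._

  @volatile private var state: ServiceState.Value = NotInited
  private val stateLock = new Object

  def currentState: ServiceState.Value = state

  final def init(): Unit = stateLock.synchronized {
    require(state == NotInited, s"$name: cannot init from $state")
    serviceInit()
    state = Inited
  }

  final def start(): Unit = stateLock.synchronized {
    require(state == Inited, s"$name: cannot start from $state")
    serviceStart()
    state = Started
  }

  /** Idempotent: a second stop() is a no-op. */
  final def stop(): Unit = stateLock.synchronized {
    if (state != Stopped) {
      state = Stopped
      serviceStop()
    }
  }

  // Hooks for subclasses; the defaults do nothing.
  protected def serviceInit(): Unit = ()
  protected def serviceStart(): Unit = ()
  protected def serviceStop(): Unit = ()
}

/** Children are inited and started in registration order, stopped in reverse. */
class MiniComposite(name: String) extends MiniService(name) {
  private val children = ArrayBuffer[MiniService]()

  protected def addService(service: MiniService): Unit = children += service

  override protected def serviceInit(): Unit = children.foreach(_.init())
  override protected def serviceStart(): Unit = children.foreach(_.start())
  override protected def serviceStop(): Unit = children.reverse.foreach(_.stop())
}

object LifecycleDemo {
  /** Runs a two-child composite through its lifecycle and returns the hook order. */
  def run(): List[String] = {
    val log = ArrayBuffer[String]()
    class Child(id: String) extends MiniService(id) {
      override protected def serviceStart(): Unit = log += s"start $id"
      override protected def serviceStop(): Unit = log += s"stop $id"
    }
    val parent = new MiniComposite("parent") {
      addService(new Child("timeline-client"))
      addService(new Child("event-listener"))
    }
    parent.init()
    parent.start()
    parent.stop()
    parent.stop() // no-op: already stopped
    log.toList
  }
}
```

Stopping in reverse order lets a later-registered service that depends on an earlier one (for example, a listener that posts through a client) shut down while its dependency is still running.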





[GitHub] spark pull request: SPARK-1537 publisher-side code and tests

2015-09-16 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/8744#discussion_r39637541
  

[GitHub] spark pull request: SPARK-1537 publisher-side code and tests

2015-09-16 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/8744#discussion_r39635159
  
--- Diff: 
yarn/history/src/main/scala/org/apache/spark/deploy/history/yarn/YarnHistoryService.scala
 ---
@@ -0,0 +1,981 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history.yarn
+
+import java.net.{ConnectException, URI}
+import java.util.concurrent.LinkedBlockingQueue
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
+
+import scala.collection.mutable.LinkedList
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.service.{AbstractService, Service}
+import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId}
+import org.apache.hadoop.yarn.api.records.timeline.{TimelineDomain, TimelineEntity, TimelinePutResponse}
+import org.apache.hadoop.yarn.client.api.TimelineClient
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+
+import org.apache.spark.deploy.history.yarn.YarnTimelineUtils._
+import org.apache.spark.scheduler._
+import org.apache.spark.scheduler.cluster.YarnExtensionService
+import org.apache.spark.util.{SystemClock, Clock, Utils}
--- End diff --

Sorry, I'd thought IDEA was getting it right. I'll review all of them and
see if that third-party organiser helps. Keeping the imports organised is
critical, given how brittle they are across patches.
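The Spark (Databricks) Scala style guide groups imports as java/javax, then scala, then third-party, then Spark, sorting alphabetically within each group. Below is a minimal, hypothetical sketch of such an organiser in Scala; it is not the third-party tool mentioned above, nor scalastyle's import-order checker. As an assumption chosen to match the layout of existing Spark sources, a package-level selector such as `org.apache.spark.{Logging, SparkContext}` or a wildcard sorts ahead of that package's subpackages, and names inside braces are sorted too. Renaming selectors and multi-line imports are not handled.

```scala
object ImportOrganiser {
  /** Group rank: java/javax, then scala, then third-party, then Spark. */
  private def group(path: String): Int =
    if (path.startsWith("java.") || path.startsWith("javax.")) 0
    else if (path.startsWith("scala.")) 1
    else if (path.startsWith("org.apache.spark.")) 3
    else 2

  /** Sorts the names inside a trailing `{A, B}` selector. */
  private def sortSelector(path: String): String = {
    val open = path.indexOf('{')
    if (open < 0) path
    else {
      val names = path.substring(open + 1, path.lastIndexOf('}')).split(',').map(_.trim).sorted
      path.substring(0, open) + names.mkString("{", ", ", "}")
    }
  }

  /**
   * Sort key: package segments joined by spaces. A trailing selector or wildcard becomes
   * an empty segment, so `a.b.{X}` and `a.b._` sort before `a.b.c.Y`.
   */
  private def sortKey(path: String): String = {
    val segments = path.split('.').toList
    val last = segments.last
    val normalised = if (last.startsWith("{") || last == "_") segments.init :+ "" else segments
    normalised.mkString(" ")
  }

  /** Returns the organised import lines, with a blank line between groups. */
  def organise(lines: Seq[String]): Seq[String] = {
    val paths = lines.map(_.trim.stripPrefix("import ").trim).map(sortSelector)
    val groups = paths.groupBy(p => group(p)).toSeq.sortBy(_._1)
    groups.flatMap { case (_, ps) =>
      ps.sortBy(p => sortKey(p)).map("import " + _) :+ ""
    }.dropRight(1)
  }
}
```

Applied to the Spark imports quoted in this thread, this would rewrite `org.apache.spark.util.{SystemClock, Clock, Utils}` as `{Clock, SystemClock, Utils}` and move `org.apache.spark.{Logging, SparkContext}` to the head of the Spark group.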





[GitHub] spark pull request: SPARK-1537 publisher-side code and tests

2015-09-16 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/8744#discussion_r39638560
  

[GitHub] spark pull request: SPARK-1537 publisher-side code and tests

2015-09-16 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/8744#discussion_r39638651
  
--- Diff: 
yarn/history/src/main/scala/org/apache/spark/deploy/history/yarn/YarnHistoryService.scala
 ---
@@ -0,0 +1,981 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history.yarn
+
+import java.net.{ConnectException, URI}
+import java.util.concurrent.LinkedBlockingQueue
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
+
+import scala.collection.mutable.LinkedList
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.service.{AbstractService, Service}
+import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, 
ApplicationId}
+import org.apache.hadoop.yarn.api.records.timeline.{TimelineDomain, 
TimelineEntity, TimelinePutResponse}
+import org.apache.hadoop.yarn.client.api.TimelineClient
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+
+import org.apache.spark.deploy.history.yarn.YarnTimelineUtils._
+import org.apache.spark.scheduler._
+import org.apache.spark.scheduler.cluster.YarnExtensionService
+import org.apache.spark.util.{SystemClock, Clock, Utils}
+import org.apache.spark.{Logging, SparkContext}
+
+/**
+ * Implements a Hadoop service with the init/start logic replaced by that
+ * of the YarnService.
+ * 
+ * As [[AbstractService]] implements `close()`, routing
+ * to its `stop` method, calling `close()` is sufficient
+ * to stop the service instance.
+ * 
+ * However, when registered to receive spark events, the service will 
continue to
+ * receive them until the spark context is stopped. Events received when 
this service
+ * is in a `STOPPED` state will be discarded.
+ */
+private[spark] class YarnHistoryService  extends AbstractService("History 
Service")
+  with YarnExtensionService with Logging {
+
+  import org.apache.spark.deploy.history.yarn.YarnHistoryService._
+  /**
+   * Spark context; valid once started
+   */
+  private var sparkContext: SparkContext = _
+
+  /** application ID. */
+  private var _applicationId: ApplicationId = _
+
+  /** attempt ID this will be null if the service is started in 
yarn-client mode */
+  private var _attemptId: Option[ApplicationAttemptId] = None
+
+  /** YARN timeline client */
+  private var timelineClient: Option[TimelineClient] = None
+
+  /** registered event listener */
+  private var listener: Option[YarnEventListener] = None
+
+  /** Applicaton name */
+  private var appName: String = _
+
+  /** Application ID from the spark start event */
+  private var sparkApplicationId: Option[String] = None
+
+  /** Optional Attempt ID from the spark start event */
+  private var sparkApplicationAttemptId: Option[String] = None
+
+  /** user name as derived from `SPARK_USER` env var or `UGI` */
+  private var userName: String = Utils.getCurrentUserName
+
+  /**
+   * Clock for recording time
+   */
+  private val clock: Clock = new SystemClock()
+
+  /**
+   * Start time of the application, as received in the start event.
+   */
+  private var startTime: Long = _
+
+  /** Number of events to batch up before posting. */
+  private var batchSize: Int = DEFAULT_BATCH_SIZE
+
+  /** Queue of actions. */
+  private val actionQueue = new LinkedBlockingQueue[QueuedAction]
+
+  /** Cache layer to handle timeline client failure. */
+  private var entityList = new LinkedList[TimelineEntity]
+
+  /** Current entity; will be created on demand. */
+  private var curEntity: Option[TimelineEntity] = None
+
+  /** Has a start event been processed? */
+  private val appStartEventProcessed = new AtomicBoolean(false)
+
+  /* has the application event event been processed 
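A minimal sketch of the lifecycle contract described in the class comment above: `close()` routes to `stop()`, and events that arrive once the service is `STOPPED` are discarded rather than queued. `SketchHistoryService` and `ServiceState` are hypothetical stand-ins written for illustration only; they do not use Hadoop's `AbstractService` API and are not the implementation in this PR.

```scala
import java.io.Closeable
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for the lifecycle states of a Hadoop service.
object ServiceState extends Enumeration {
  val NotInited, Inited, Started, Stopped = Value
}

// Hypothetical sketch: close() delegates to stop(), and events that
// arrive after the service has stopped are counted and dropped.
class SketchHistoryService extends Closeable {
  @volatile private var state = ServiceState.NotInited
  private val processed = new AtomicInteger(0)
  private val discarded = new AtomicInteger(0)

  def init(): Unit = { state = ServiceState.Inited }
  def start(): Unit = { state = ServiceState.Started }
  def stop(): Unit = { state = ServiceState.Stopped }

  // Closeable entry point: closing is just stopping.
  override def close(): Unit = stop()

  def currentState: ServiceState.Value = state

  /** Called by the listener; returns true if the event was accepted. */
  def onEvent(event: String): Boolean = {
    if (state == ServiceState.Stopped) {
      discarded.incrementAndGet()
      false
    } else {
      processed.incrementAndGet()
      true
    }
  }

  def processedCount: Int = processed.get()
  def discardedCount: Int = discarded.get()
}
```

Dropping events by a state check means the listener can stay registered; as the comment notes, the listener keeps receiving events until the Spark context itself is stopped.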

[GitHub] spark pull request: SPARK-1537 publisher-side code and tests

2015-09-16 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/8744#discussion_r39638893
  
--- Diff: 
yarn/history/src/main/scala/org/apache/spark/deploy/history/yarn/YarnHistoryService.scala
 ---
@@ -0,0 +1,981 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history.yarn
+
+import java.net.{ConnectException, URI}
+import java.util.concurrent.LinkedBlockingQueue
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
+
+import scala.collection.mutable.LinkedList
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.service.{AbstractService, Service}
+import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId}
+import org.apache.hadoop.yarn.api.records.timeline.{TimelineDomain, TimelineEntity, TimelinePutResponse}
+import org.apache.hadoop.yarn.client.api.TimelineClient
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+
+import org.apache.spark.deploy.history.yarn.YarnTimelineUtils._
+import org.apache.spark.scheduler._
+import org.apache.spark.scheduler.cluster.YarnExtensionService
+import org.apache.spark.util.{SystemClock, Clock, Utils}
+import org.apache.spark.{Logging, SparkContext}
+
+/**
+ * Implements a Hadoop service with the init/start logic replaced by that
+ * of the YarnService.
+ * 
+ * As [[AbstractService]] implements `close()`, routing
+ * to its `stop` method, calling `close()` is sufficient
+ * to stop the service instance.
+ * 
+ * However, when registered to receive Spark events, the service will continue to
+ * receive them until the Spark context is stopped. Events received when this service
+ * is in a `STOPPED` state will be discarded.
+ */
+private[spark] class YarnHistoryService extends AbstractService("History Service")
+  with YarnExtensionService with Logging {
+
+  import org.apache.spark.deploy.history.yarn.YarnHistoryService._
+  /**
+   * Spark context; valid once started
+   */
+  private var sparkContext: SparkContext = _
+
+  /** application ID. */
+  private var _applicationId: ApplicationId = _
+
+  /** Attempt ID; this will be `None` if the service is started in yarn-client mode */
+  private var _attemptId: Option[ApplicationAttemptId] = None
+
+  /** YARN timeline client */
+  private var timelineClient: Option[TimelineClient] = None
+
+  /** registered event listener */
+  private var listener: Option[YarnEventListener] = None
+
+  /** Application name */
+  private var appName: String = _
+
+  /** Application ID from the spark start event */
+  private var sparkApplicationId: Option[String] = None
+
+  /** Optional Attempt ID from the spark start event */
+  private var sparkApplicationAttemptId: Option[String] = None
+
+  /** user name as derived from `SPARK_USER` env var or `UGI` */
+  private var userName: String = Utils.getCurrentUserName
+
+  /**
+   * Clock for recording time
+   */
+  private val clock: Clock = new SystemClock()
+
+  /**
+   * Start time of the application, as received in the start event.
+   */
+  private var startTime: Long = _
+
+  /** Number of events to batch up before posting. */
+  private var batchSize: Int = DEFAULT_BATCH_SIZE
+
+  /** Queue of actions. */
+  private val actionQueue = new LinkedBlockingQueue[QueuedAction]
+
+  /** Cache layer to handle timeline client failure. */
+  private var entityList = new LinkedList[TimelineEntity]
+
+  /** Current entity; will be created on demand. */
+  private var curEntity: Option[TimelineEntity] = None
+
+  /** Has a start event been processed? */
+  private val appStartEventProcessed = new AtomicBoolean(false)
+
+  /* has the application event event been processed 
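The `batchSize`, `actionQueue` and `entityList` fields above point to a batch-then-post pattern with a cache that survives timeline client failures. The sketch below illustrates that general pattern under stated assumptions: `EventBatcher` and its `post` callback are hypothetical, with `post` standing in for a timeline client put that returns `false` on failure. It is not the posting logic of this PR.

```scala
import scala.collection.mutable.ListBuffer

/**
 * Hypothetical sketch of batch-then-post with a retry cache.
 * `post` stands in for a timeline client put; it returns false on failure.
 */
class EventBatcher[E](batchSize: Int, post: Seq[E] => Boolean) {
  require(batchSize > 0, "batchSize must be positive")

  // Events accumulated for the batch currently being built.
  private val current = ListBuffer.empty[E]
  // Completed batches not yet posted successfully (the retry cache).
  private val pending = ListBuffer.empty[Seq[E]]

  def add(event: E): Unit = {
    current += event
    if (current.size >= batchSize) {
      pending += current.toList
      current.clear()
      flush()
    }
  }

  /** Post pending batches in order, stopping at the first failure. */
  def flush(): Unit = {
    var ok = true
    while (ok && pending.nonEmpty) {
      ok = post(pending.head)
      if (ok) pending.remove(0)
    }
  }

  def pendingBatches: Int = pending.size
  def bufferedEvents: Int = current.size
}
```

Keeping failed batches in order, and retrying them before anything newer, preserves event ordering at the cost of memory while the timeline server is unreachable.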

[GitHub] spark pull request: SPARK-1537 publisher-side code and tests

2015-09-16 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/8744#discussion_r39637695
  
--- Diff: 
yarn/history/src/main/scala/org/apache/spark/deploy/history/yarn/YarnHistoryService.scala
 ---

[GitHub] spark pull request: SPARK-1537 publisher-side code and tests

2015-09-16 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/8744#discussion_r39638905
  
--- Diff: 
yarn/history/src/main/scala/org/apache/spark/deploy/history/yarn/YarnHistoryService.scala
 ---

[GitHub] spark pull request: SPARK-1537 publisher-side code and tests

2015-09-16 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/8744#discussion_r39643031
  
--- Diff: 
yarn/history/src/main/scala/org/apache/spark/deploy/history/yarn/YarnHistoryService.scala
 ---

[GitHub] spark pull request: SPARK-1537 publisher-side code and tests

2015-09-16 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/8744#discussion_r39643240
  
--- Diff: 
yarn/history/src/main/scala/org/apache/spark/deploy/history/yarn/YarnHistoryService.scala
 ---

[GitHub] spark pull request: SPARK-1537 publisher-side code and tests

2015-09-16 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/8744#discussion_r39643548
  
--- Diff: 
yarn/history/src/main/scala/org/apache/spark/deploy/history/yarn/YarnHistoryService.scala
 ---

[GitHub] spark pull request: SPARK-1537 publisher-side code and tests

2015-09-16 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/8744#discussion_r39644786
  
--- Diff: 
yarn/history/src/main/scala/org/apache/spark/deploy/history/yarn/YarnHistoryService.scala
 ---
@@ -0,0 +1,981 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history.yarn
+
+import java.net.{ConnectException, URI}
+import java.util.concurrent.LinkedBlockingQueue
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
+
+import scala.collection.mutable.LinkedList
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.service.{AbstractService, Service}
+import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, 
ApplicationId}
+import org.apache.hadoop.yarn.api.records.timeline.{TimelineDomain, 
TimelineEntity, TimelinePutResponse}
+import org.apache.hadoop.yarn.client.api.TimelineClient
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+
+import org.apache.spark.deploy.history.yarn.YarnTimelineUtils._
+import org.apache.spark.scheduler._
+import org.apache.spark.scheduler.cluster.YarnExtensionService
+import org.apache.spark.util.{SystemClock, Clock, Utils}
+import org.apache.spark.{Logging, SparkContext}
+
+/**
+ * Implements a Hadoop service with the init/start logic replaced by that
+ * of the YarnService.
+ * 
+ * As [[AbstractService]] implements `close()`, routing
+ * to its `stop` method, calling `close()` is sufficient
+ * to stop the service instance.
+ * 
+ * However, when registered to receive spark events, the service will 
continue to
+ * receive them until the spark context is stopped. Events received when 
this service
+ * is in a `STOPPED` state will be discarded.
+ */
+private[spark] class YarnHistoryService  extends AbstractService("History 
Service")
+  with YarnExtensionService with Logging {
+
+  import org.apache.spark.deploy.history.yarn.YarnHistoryService._
+  /**
+   * Spark context; valid once started
+   */
+  private var sparkContext: SparkContext = _
+
+  /** application ID. */
+  private var _applicationId: ApplicationId = _
+
+  /** attempt ID; this will be `None` if the service is started in yarn-client mode */
+  private var _attemptId: Option[ApplicationAttemptId] = None
+
+  /** YARN timeline client */
+  private var timelineClient: Option[TimelineClient] = None
+
+  /** registered event listener */
+  private var listener: Option[YarnEventListener] = None
+
+  /** Application name */
+  private var appName: String = _
+
+  /** Application ID from the spark start event */
+  private var sparkApplicationId: Option[String] = None
+
+  /** Optional Attempt ID from the spark start event */
+  private var sparkApplicationAttemptId: Option[String] = None
+
+  /** user name as derived from `SPARK_USER` env var or `UGI` */
+  private var userName: String = Utils.getCurrentUserName
+
+  /**
+   * Clock for recording time
+   */
+  private val clock: Clock = new SystemClock()
+
+  /**
+   * Start time of the application, as received in the start event.
+   */
+  private var startTime: Long = _
+
+  /** number of events to batch up before posting */
+  private var batchSize: Int = DEFAULT_BATCH_SIZE
+
+  /** queue of actions */
+  private val actionQueue = new LinkedBlockingQueue[QueuedAction]
+
+  /** cache layer to handle timeline client failure. */
+  private var entityList = new LinkedList[TimelineEntity]
+
+  /** current entity; will be created on demand. */
+  private var curEntity: Option[TimelineEntity] = None
+
+  /** Has a start event been processed? */
+  private val appStartEventProcessed = new AtomicBoolean(false)
+
+  /* has the application event event been processed 
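The class comment above describes two lifecycle rules: `close()` is enough to stop the service because it routes to `stop()`, and events that keep arriving after the service is `STOPPED` are discarded. The `appStartEventProcessed` flag adds a third: the start event should be handled once. The following is a minimal pure-Scala sketch of those rules under stated assumptions. It does not use Hadoop's `AbstractService`, and `LifecycleSketch`, `handle` and `onStart` are hypothetical names, not part of the patch.

```scala
import java.io.Closeable
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}

/**
 * Hypothetical sketch (not the patch's code): close() delegates to stop(),
 * events delivered after stop() are counted and dropped, and an
 * AtomicBoolean guards one-time processing of the start event.
 */
class LifecycleSketch extends Closeable {
  private val stopped = new AtomicBoolean(false)
  // Mirrors the role of appStartEventProcessed in the quoted diff.
  private val startProcessed = new AtomicBoolean(false)
  val processed = new AtomicInteger(0)
  val discarded = new AtomicInteger(0)

  def stop(): Unit = stopped.set(true)

  // As with AbstractService, close() simply routes to stop().
  override def close(): Unit = stop()

  /** Returns true if the event was processed, false if it was discarded. */
  def handle(event: String): Boolean = {
    if (stopped.get()) {
      discarded.incrementAndGet()
      false
    } else {
      processed.incrementAndGet()
      true
    }
  }

  /** True only for the first caller; compareAndSet makes the check-and-set atomic. */
  def onStart(): Boolean = startProcessed.compareAndSet(false, true)
}
```

Using `compareAndSet(false, true)` rather than a separate `get()` followed by `set(true)` means two threads delivering a start event concurrently cannot both believe they were first.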

[GitHub] spark pull request: SPARK-1537 publisher-side code and tests

2015-09-16 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/8744#discussion_r39645121
  
--- Diff: 
yarn/history/src/main/scala/org/apache/spark/deploy/history/yarn/YarnHistoryService.scala
 ---
@@ -0,0 +1,981 @@

[GitHub] spark pull request: SPARK-1537 publisher-side code and tests

2015-09-16 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/8744#discussion_r39645278
  
--- Diff: 
yarn/history/src/main/scala/org/apache/spark/deploy/history/yarn/YarnHistoryService.scala
 ---
@@ -0,0 +1,981 @@

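The `batchSize`, `actionQueue` and `entityList` fields quoted in the diff suggest that events are queued, posted to the timeline server in batches, and held back when the timeline client fails. The quoted context does not show that logic, so the following is only a sketch under those assumptions. `Event`, `BatchingPublisher` and the `post` callback (standing in for a timeline `putEntities` call) are hypothetical, and keeping failed batches for a later retry is an assumed policy.

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.collection.mutable.ListBuffer

// Hypothetical stand-in for a queued action; the patch queues QueuedAction instances.
final case class Event(name: String)

/**
 * Sketch: drain the queue in groups of at most `batchSize` and hand each group
 * to `post` (hypothetical; returns true on success). Batches whose post fails
 * are kept in `failed`, a stand-in for the entityList cache layer.
 */
class BatchingPublisher(batchSize: Int, post: Seq[Event] => Boolean) {
  require(batchSize > 0, "batchSize must be positive")

  private val queue = new LinkedBlockingQueue[Event]()
  private val failed = ListBuffer[Seq[Event]]()

  def enqueue(event: Event): Unit = queue.put(event)

  /** Posts everything currently queued; returns the number of batches posted successfully. */
  def flush(): Int = {
    var posted = 0
    var drained = true
    while (drained) {
      val buffer = new java.util.ArrayList[Event](batchSize)
      // drainTo moves up to batchSize elements, in FIFO order, without blocking.
      queue.drainTo(buffer, batchSize)
      drained = !buffer.isEmpty()
      if (drained) {
        val batch = List.tabulate(buffer.size())(i => buffer.get(i))
        if (post(batch)) posted += 1 else failed += batch
      }
    }
    posted
  }

  def failedBatches: List[Seq[Event]] = failed.toList
}
```

With `batchSize = 2` and five queued events, `flush()` posts batches of sizes 2, 2 and 1. `drainTo` is used so that flushing never blocks on an empty queue. A real publisher would also need a background thread and a retry/backoff policy, which this sketch leaves out.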