Joal has uploaded a new change for review.

  https://gerrit.wikimedia.org/r/272977

Change subject: Update CamusPartitionChecker (errors and history)
......................................................................

Update CamusPartitionChecker (errors and history)

The checker now keeps checking and flagging partitions
even if one partition is in error.
The checker can now be applied to the last N camus runs
instead of just the most recent one.

Bug: T127909
Change-Id: Ia317557fed310cb04050924660547fdd34b5119a
---
M 
refinery-camus/src/main/scala/org/wikimedia/analytics/refinery/camus/CamusStatusReader.scala
M 
refinery-camus/src/test/scala/org/wikimedia/analytics/refinery/camus/TestCamusStatusReader.scala
M 
refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/CamusPartitionChecker.scala
3 files changed, 71 insertions(+), 36 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/analytics/refinery/source 
refs/changes/77/272977/1

diff --git 
a/refinery-camus/src/main/scala/org/wikimedia/analytics/refinery/camus/CamusStatusReader.scala
 
b/refinery-camus/src/main/scala/org/wikimedia/analytics/refinery/camus/CamusStatusReader.scala
index 333d46d..589ab40 100644
--- 
a/refinery-camus/src/main/scala/org/wikimedia/analytics/refinery/camus/CamusStatusReader.scala
+++ 
b/refinery-camus/src/main/scala/org/wikimedia/analytics/refinery/camus/CamusStatusReader.scala
@@ -66,18 +66,25 @@
   }
 
   /**
-   * Finds the most recent run in a camus-history folder
-   * @return the most recent run path
-   */
-  def mostRecentRun(path: Path): Path = {
-      // Filter folders with format YYYY-MM-DD-HH-MM-SS
-      // Sort folders by name DESC
-      val executions = fs.listStatus(path, new 
RegexpPathFilter("[0-9]{4}(-[0-9]{2}){5}")).map(_.getPath)
-                         .sortWith((f1, f2) => 
f1.getName().compareTo(f2.getName()) < 0)
-      if (executions.length > 0)
-        executions(executions.length - 1)
-      else
-        throw new IllegalArgumentException("Given Path is doesn't contain 
camus-run folders.")
+    * Finds the most recent runs in a camus-history folder,
+    * up to a certain number of runs.
+    * @return the sequence of most recent run paths in increasing time order
+    */
+  def mostRecentRuns(path: Path, upTo: Int): Seq[Path] = {
+    // Filter folders with format YYYY-MM-DD-HH-MM-SS
+    // Sort folders by name DESC
+    val executions = fs.listStatus(path, new 
RegexpPathFilter("[0-9]{4}(-[0-9]{2}){5}")).map(_.getPath)
+      .sortWith((f1, f2) => f1.getName().compareTo(f2.getName()) > 0)
+    if (executions.length > 0)
+      executions.take(upTo).reverse
+    else
+      throw new IllegalArgumentException("Given Path doesn't contain camus-run 
folders.")
   }
 
+  /**
+    * Finds the most recent run in a camus-history folder
+    * @return the most recent run path
+    */
+  def mostRecentRun(path: Path): Path = mostRecentRuns(path, 1).head
+
 }
diff --git 
a/refinery-camus/src/test/scala/org/wikimedia/analytics/refinery/camus/TestCamusStatusReader.scala
 
b/refinery-camus/src/test/scala/org/wikimedia/analytics/refinery/camus/TestCamusStatusReader.scala
index dde5dce..ed4b209 100644
--- 
a/refinery-camus/src/test/scala/org/wikimedia/analytics/refinery/camus/TestCamusStatusReader.scala
+++ 
b/refinery-camus/src/test/scala/org/wikimedia/analytics/refinery/camus/TestCamusStatusReader.scala
@@ -11,6 +11,7 @@
   val camusHistoryTestFolder = "src/test/resources/camus-test-data"
   val runFolder = "2015-08-15-17-52-01"
   val mostRecentRunFolder = "2015-10-02-08-00-07"
+  val twoMostRecentRunFolders = Seq("2015-09-29-15-20-08", 
"2015-10-02-08-00-07")
   val wrongFolder = "wrong-folder"
   val fs = FileSystem.get(new Configuration)
   val cr = new CamusStatusReader(fs)
@@ -86,6 +87,19 @@
     previousOffsetsFilesNames should be (Seq.empty)
   }
 
+  it should "return the most recent camus runs in a camus-history folder" in {
+    val folder: String = camusHistoryTestFolder
+    val path: Path = new Path(folder)
+
+    val mostRecentRunsPath = cr.mostRecentRuns(path, 2)
+
+    (twoMostRecentRunFolders zip mostRecentRunsPath).foreach( {
+      case (expected_path: String, path: Path) => {
+        path.getName should equal (expected_path)
+      }
+    })
+  }
+
   it should "return the most recent camus run in a camus-history folder" in {
     val folder: String = camusHistoryTestFolder
     val path: Path = new Path(folder)
diff --git 
a/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/CamusPartitionChecker.scala
 
b/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/CamusPartitionChecker.scala
index 7c371b1..f63cacd 100644
--- 
a/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/CamusPartitionChecker.scala
+++ 
b/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/CamusPartitionChecker.scala
@@ -3,6 +3,7 @@
 import java.io.FileInputStream
 import java.util.Properties
 
+import com.github.nscala_time.time.Imports._
 import com.linkedin.camus.etl.kafka.CamusJob
 import com.linkedin.camus.etl.kafka.common.EtlKey
 import com.linkedin.camus.etl.kafka.mapred.{EtlInputFormat, 
EtlMultiOutputFormat}
@@ -10,10 +11,9 @@
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.log4j.{LogManager, Logger}
-import org.joda.time.{Hours, DateTimeZone, DateTime}
+import org.joda.time.{DateTime, Hours}
 import org.wikimedia.analytics.refinery.camus.CamusStatusReader
 import scopt.OptionParser
-import com.github.nscala_time.time.Imports._
 
 /**
  * Class marking checking camus runs based on a camus.properties file.
@@ -121,6 +121,7 @@
 
   case class Params(camusPropertiesFilePath: String = "",
                     datetimeToCheck: Option[String] = None,
+                    mostRecentRunsToCheck: Int = 1,
                     hadoopCoreSitePath: String = 
"/etc/hadoop/conf/core-site.xml",
                     hadoopHdfsSitePath: String = 
"/etc/hadoop/conf/hdfs-site.xml",
                     flag: String = "_IMPORTED",
@@ -129,7 +130,8 @@
   val argsParser = new OptionParser[Params]("Camus Checker") {
     head("Camus partition checker", "")
     note(
-      "This job checked for most recent camus run correctness and flag hour 
partitions when fully imported.")
+      "This job checked camus runs correctness and flag hour partitions when 
fully imported.\n" +
+        "\tWhen dateTimeToCheck parameter is set, is overrides 
mostRecentRunsToCheck parameter.")
     help("help") text ("Prints this usage text")
 
     opt[String]('c', "camus-properties-file") required() valueName ("<path>") 
action { (x, p) =>
@@ -138,7 +140,12 @@
 
     opt[String]('d', "datetimeToCheck") optional() valueName 
("yyyy-mm-dd-HH-MM-SS") action { (x, p) =>
       p.copy(datetimeToCheck = Some(x))
-    } text ("Datetime camus run to check (must be present in history folder) - 
Default to most recent run.")
+    } text ("Datetime camus run to check (must be present in history folder)")
+
+    opt[Int]('n', "mostRecentRunsToCheck") optional() valueName ("<num>") 
action { (x, p) =>
+      p.copy(mostRecentRunsToCheck = x)
+    } validate { x => if (x > 0) success else failure("mostRecentRunsToCheck 
must be greater than 0")
+    } text ("Number of most recent camus runs to check (default to 1, 
overwritten by datetimeToCheck if set).")
 
     opt[String]("hadoop-core-site-file") optional() valueName ("<path>") 
action { (x, p) =>
       p.copy(hadoopCoreSitePath = x)
@@ -186,35 +193,42 @@
           log.info("Loading camus properties file.")
           props.load(new FileInputStream(params.camusPropertiesFilePath))
 
-          val camusPathToCheck: Path = {
-            val history_folder = 
props.getProperty(CamusJob.ETL_EXECUTION_HISTORY_PATH)
-            if (params.datetimeToCheck.isEmpty) {
-              log.info("Getting camus most recent run from history folder.")
-              camusReader.mostRecentRun(new Path(history_folder))
-            } else {
+          val camusPathsToCheck: Seq[Path] = {
+
+            if (params.datetimeToCheck.isDefined) {
               val p = new 
Path(props.getProperty(CamusJob.ETL_EXECUTION_HISTORY_PATH) + "/" + 
params.datetimeToCheck.get)
               if (fs.isDirectory(p)) {
                 log.info("Set job to given datetime to check.")
-                p
+                Seq(p)
               } else {
-                log.error("The given datetime to check is not a folder in 
camus history.")
-                null
+                throw new IllegalArgumentException("The given datetime to 
check is not a folder in camus history.")
               }
+            } else {
+              log.info(s"Getting ${params.mostRecentRunsToCheck} camus most 
recent runs from history folder.")
+              val history_folder = 
props.getProperty(CamusJob.ETL_EXECUTION_HISTORY_PATH)
+              camusReader.mostRecentRuns(new Path(history_folder), 
params.mostRecentRunsToCheck)
             }
           }
-          if (null == camusPathToCheck)
-            System.exit(1)
-
-          log.info("Checking job correctness and computing partitions to flag 
as imported.")
-          val topicsAndHours = getTopicsAndHoursToFlag(camusPathToCheck)
-
-          log.info("Job is correct, flag imported partitions.")
-          flagFullyImportedPartitions(params.flag, params.dryRun, 
topicsAndHours)
-
-          log.info("Done.")
+          log.info(s"Working ${camusPathsToCheck.size} camus history folders.")
+          val (sucesses, errors) = camusPathsToCheck.foldLeft((0, 
0))((successes_errors: (Int, Int), p: Path) => {
+            log.info(s"Checking ${p.toString}")
+            try {
+              val topicsAndHours = getTopicsAndHoursToFlag(p)
+              log.info(s"Flagging imported partitions for ${p.toString}")
+              flagFullyImportedPartitions(params.flag, params.dryRun, 
topicsAndHours)
+              log.info(s"Done ${p.toString}.")
+              (successes_errors._1 + 1, successes_errors._2)
+            } catch {
+              case e: IllegalStateException => {
+                log.error(s"An error occured while processing ${p}.", e)
+                (successes_errors._1, successes_errors._2 + 1)
+              }
+            }
+          })
+          log.info(s"Done  - ${sucesses} correct folders and ${errors} folders 
generating an error.")
         } catch {
           case e: Exception => {
-            log.error("An error occured during execution.", e)
+            log.error("A fatal error occurred during execution.", e)
             sys.exit(1)
           }
         }

-- 
To view, visit https://gerrit.wikimedia.org/r/272977
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Ia317557fed310cb04050924660547fdd34b5119a
Gerrit-PatchSet: 1
Gerrit-Project: analytics/refinery/source
Gerrit-Branch: master
Gerrit-Owner: Joal <j...@wikimedia.org>

_______________________________________________
MediaWiki-commits mailing list
MediaWiki-commits@lists.wikimedia.org
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to