Joal has uploaded a new change for review. https://gerrit.wikimedia.org/r/272977
Change subject: Update CamusPartitionChecker (errors and history) ...................................................................... Update CamusPartitionChecker (errors and history) The checker now continues to check and flag partitions even if one of the checked camus runs is in error. The checker can now be applied to the last N camus runs instead of just the most recent one. Bug: T127909 Change-Id: Ia317557fed310cb04050924660547fdd34b5119a --- M refinery-camus/src/main/scala/org/wikimedia/analytics/refinery/camus/CamusStatusReader.scala M refinery-camus/src/test/scala/org/wikimedia/analytics/refinery/camus/TestCamusStatusReader.scala M refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/CamusPartitionChecker.scala 3 files changed, 71 insertions(+), 36 deletions(-) git pull ssh://gerrit.wikimedia.org:29418/analytics/refinery/source refs/changes/77/272977/1 diff --git a/refinery-camus/src/main/scala/org/wikimedia/analytics/refinery/camus/CamusStatusReader.scala b/refinery-camus/src/main/scala/org/wikimedia/analytics/refinery/camus/CamusStatusReader.scala index 333d46d..589ab40 100644 --- a/refinery-camus/src/main/scala/org/wikimedia/analytics/refinery/camus/CamusStatusReader.scala +++ b/refinery-camus/src/main/scala/org/wikimedia/analytics/refinery/camus/CamusStatusReader.scala @@ -66,18 +66,25 @@ } /** - * Finds the most recent run in a camus-history folder - * @return the most recent run path - */ - def mostRecentRun(path: Path): Path = { - // Filter folders with format YYYY-MM-DD-HH-MM-SS - // Sort folders by name DESC - val executions = fs.listStatus(path, new RegexpPathFilter("[0-9]{4}(-[0-9]{2}){5}")).map(_.getPath) - .sortWith((f1, f2) => f1.getName().compareTo(f2.getName()) < 0) - if (executions.length > 0) - executions(executions.length - 1) - else - throw new IllegalArgumentException("Given Path is doesn't contain camus-run folders.") + * Finds the most recent runs in a camus-history folder, + * up to a certain number of runs. 
+ * @return the sequence of most recent run paths in increasing time order + */ + def mostRecentRuns(path: Path, upTo: Int): Seq[Path] = { + // Filter folders with format YYYY-MM-DD-HH-MM-SS + // Sort folders by name DESC + val executions = fs.listStatus(path, new RegexpPathFilter("[0-9]{4}(-[0-9]{2}){5}")).map(_.getPath) + .sortWith((f1, f2) => f1.getName().compareTo(f2.getName()) > 0) + if (executions.length > 0) + executions.take(upTo).reverse + else + throw new IllegalArgumentException("Given Path doesn't contain camus-run folders.") } + /** + * Finds the most recent run in a camus-history folder + * @return the most recent run path + */ + def mostRecentRun(path: Path): Path = mostRecentRuns(path, 1).head + } diff --git a/refinery-camus/src/test/scala/org/wikimedia/analytics/refinery/camus/TestCamusStatusReader.scala b/refinery-camus/src/test/scala/org/wikimedia/analytics/refinery/camus/TestCamusStatusReader.scala index dde5dce..ed4b209 100644 --- a/refinery-camus/src/test/scala/org/wikimedia/analytics/refinery/camus/TestCamusStatusReader.scala +++ b/refinery-camus/src/test/scala/org/wikimedia/analytics/refinery/camus/TestCamusStatusReader.scala @@ -11,6 +11,7 @@ val camusHistoryTestFolder = "src/test/resources/camus-test-data" val runFolder = "2015-08-15-17-52-01" val mostRecentRunFolder = "2015-10-02-08-00-07" + val twoMostRecentRunFolders = Seq("2015-09-29-15-20-08", "2015-10-02-08-00-07") val wrongFolder = "wrong-folder" val fs = FileSystem.get(new Configuration) val cr = new CamusStatusReader(fs) @@ -86,6 +87,19 @@ previousOffsetsFilesNames should be (Seq.empty) } + it should "return the most recent camus runs in a camus-history folder" in { + val folder: String = camusHistoryTestFolder + val path: Path = new Path(folder) + + val mostRecentRunsPath = cr.mostRecentRuns(path, 2) + + (twoMostRecentRunFolders zip mostRecentRunsPath).foreach( { + case (expected_path: String, path: Path) => { + path.getName should equal (expected_path) + } + }) + } + it should 
"return the most recent camus run in a camus-history folder" in { val folder: String = camusHistoryTestFolder val path: Path = new Path(folder) diff --git a/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/CamusPartitionChecker.scala b/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/CamusPartitionChecker.scala index 7c371b1..f63cacd 100644 --- a/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/CamusPartitionChecker.scala +++ b/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/CamusPartitionChecker.scala @@ -3,6 +3,7 @@ import java.io.FileInputStream import java.util.Properties +import com.github.nscala_time.time.Imports._ import com.linkedin.camus.etl.kafka.CamusJob import com.linkedin.camus.etl.kafka.common.EtlKey import com.linkedin.camus.etl.kafka.mapred.{EtlInputFormat, EtlMultiOutputFormat} @@ -10,10 +11,9 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.log4j.{LogManager, Logger} -import org.joda.time.{Hours, DateTimeZone, DateTime} +import org.joda.time.{DateTime, Hours} import org.wikimedia.analytics.refinery.camus.CamusStatusReader import scopt.OptionParser -import com.github.nscala_time.time.Imports._ /** * Class marking checking camus runs based on a camus.properties file. 
@@ -121,6 +121,7 @@ case class Params(camusPropertiesFilePath: String = "", datetimeToCheck: Option[String] = None, + mostRecentRunsToCheck: Int = 1, hadoopCoreSitePath: String = "/etc/hadoop/conf/core-site.xml", hadoopHdfsSitePath: String = "/etc/hadoop/conf/hdfs-site.xml", flag: String = "_IMPORTED", @@ -129,7 +130,8 @@ val argsParser = new OptionParser[Params]("Camus Checker") { head("Camus partition checker", "") note( - "This job checked for most recent camus run correctness and flag hour partitions when fully imported.") + "This job checked camus runs correctness and flag hour partitions when fully imported.\n" + + "\tWhen dateTimeToCheck parameter is set, is overrides mostRecentRunsToCheck parameter.") help("help") text ("Prints this usage text") opt[String]('c', "camus-properties-file") required() valueName ("<path>") action { (x, p) => @@ -138,7 +140,12 @@ opt[String]('d', "datetimeToCheck") optional() valueName ("yyyy-mm-dd-HH-MM-SS") action { (x, p) => p.copy(datetimeToCheck = Some(x)) - } text ("Datetime camus run to check (must be present in history folder) - Default to most recent run.") + } text ("Datetime camus run to check (must be present in history folder)") + + opt[Int]('n', "mostRecentRunsToCheck") optional() valueName ("<num>") action { (x, p) => + p.copy(mostRecentRunsToCheck = x) + } validate { x => if (x > 0) success else failure("mostRecentRunsToCheck must be greater than 0") + } text ("Number of most recent camus runs to check (default to 1, overwritten by datetimeToCheck if set).") opt[String]("hadoop-core-site-file") optional() valueName ("<path>") action { (x, p) => p.copy(hadoopCoreSitePath = x) @@ -186,35 +193,42 @@ log.info("Loading camus properties file.") props.load(new FileInputStream(params.camusPropertiesFilePath)) - val camusPathToCheck: Path = { - val history_folder = props.getProperty(CamusJob.ETL_EXECUTION_HISTORY_PATH) - if (params.datetimeToCheck.isEmpty) { - log.info("Getting camus most recent run from history folder.") 
- camusReader.mostRecentRun(new Path(history_folder)) - } else { + val camusPathsToCheck: Seq[Path] = { + + if (params.datetimeToCheck.isDefined) { val p = new Path(props.getProperty(CamusJob.ETL_EXECUTION_HISTORY_PATH) + "/" + params.datetimeToCheck.get) if (fs.isDirectory(p)) { log.info("Set job to given datetime to check.") - p + Seq(p) } else { - log.error("The given datetime to check is not a folder in camus history.") - null + throw new IllegalArgumentException("The given datetime to check is not a folder in camus history.") } + } else { + log.info(s"Getting ${params.mostRecentRunsToCheck} camus most recent runs from history folder.") + val history_folder = props.getProperty(CamusJob.ETL_EXECUTION_HISTORY_PATH) + camusReader.mostRecentRuns(new Path(history_folder), params.mostRecentRunsToCheck) } } - if (null == camusPathToCheck) - System.exit(1) - - log.info("Checking job correctness and computing partitions to flag as imported.") - val topicsAndHours = getTopicsAndHoursToFlag(camusPathToCheck) - - log.info("Job is correct, flag imported partitions.") - flagFullyImportedPartitions(params.flag, params.dryRun, topicsAndHours) - - log.info("Done.") + log.info(s"Working ${camusPathsToCheck.size} camus history folders.") + val (sucesses, errors) = camusPathsToCheck.foldLeft((0, 0))((successes_errors: (Int, Int), p: Path) => { + log.info(s"Checking ${p.toString}") + try { + val topicsAndHours = getTopicsAndHoursToFlag(p) + log.info(s"Flagging imported partitions for ${p.toString}") + flagFullyImportedPartitions(params.flag, params.dryRun, topicsAndHours) + log.info(s"Done ${p.toString}.") + (successes_errors._1 + 1, successes_errors._2) + } catch { + case e: IllegalStateException => { + log.error(s"An error occured while processing ${p}.", e) + (successes_errors._1, successes_errors._2 + 1) + } + } + }) + log.info(s"Done - ${sucesses} correct folders and ${errors} folders generating an error.") } catch { case e: Exception => { - log.error("An error occured during 
execution.", e) + log.error("A fatal error occurred during execution.", e) sys.exit(1) } } -- To view, visit https://gerrit.wikimedia.org/r/272977 To unsubscribe, visit https://gerrit.wikimedia.org/r/settings Gerrit-MessageType: newchange Gerrit-Change-Id: Ia317557fed310cb04050924660547fdd34b5119a Gerrit-PatchSet: 1 Gerrit-Project: analytics/refinery/source Gerrit-Branch: master Gerrit-Owner: Joal <j...@wikimedia.org> _______________________________________________ MediaWiki-commits mailing list MediaWiki-commits@lists.wikimedia.org https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits