Ottomata has uploaded a new change for review. ( https://gerrit.wikimedia.org/r/405800 )
Change subject: [WIP] Add configurable transform function to JSONRefine
......................................................................

[WIP] Add configurable transform function to JSONRefine

Bug: T185237
Change-Id: If1272f7d354e94a0a140f71a9135389131c8a1eb
---
A refinery-core/src/main/scala/org/wikimedia/analytics/refinery/core/ReflectUtils.scala
M refinery-core/src/main/scala/org/wikimedia/analytics/refinery/core/SparkJsonToHive.scala
A refinery-core/src/test/scala/org/wikimedia/analytics/refinery/core/TestReflectUtils.scala
M refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/JsonRefine.scala
4 files changed, 145 insertions(+), 36 deletions(-)


git pull ssh://gerrit.wikimedia.org:29418/analytics/refinery/source refs/changes/00/405800/1

diff --git a/refinery-core/src/main/scala/org/wikimedia/analytics/refinery/core/ReflectUtils.scala b/refinery-core/src/main/scala/org/wikimedia/analytics/refinery/core/ReflectUtils.scala
new file mode 100644
index 0000000..7cfe209
--- /dev/null
+++ b/refinery-core/src/main/scala/org/wikimedia/analytics/refinery/core/ReflectUtils.scala
@@ -0,0 +1,32 @@
+package org.wikimedia.analytics.refinery.core
+
+import scala.reflect.runtime.universe
+
+object ReflectUtils {
+
+    /**
+      * Given a fully qualified String package.ObjectName and String method name, this
+      * function will return a scala.reflect.runtime.universe.MethodMirror that can be
+      * used for calling the method on the object. Note that MethodMirror is not a direct
+      * reference to the actual method, and as such does not have compile time type
+      * and signature checking. You must ensure that you call the method with exactly the
+      * same arguments and types that the method expects, or you will get a runtime exception.
+      *
+      * @param moduleName Fully qualified name for an object, e.g. org.wikimedia.analytics.refinery.core.DeduplicateEventLogging
+      * @param methodName Name of method in the object. Default: "apply".
+      * @return a MethodMirror for moduleName.methodName
+      */
+    def getStaticMethodMirror(moduleName: String, methodName: String = "apply"): universe.MethodMirror = {
+        val mirror = universe.runtimeMirror(getClass.getClassLoader)
+        val module = mirror.staticModule(moduleName)
+        val method = module.typeSignature.member(universe.newTermName(methodName)).asMethod
+        val methodMirror = mirror.reflect(mirror.reflectModule(module).instance).reflectMethod(method)
+        if (!methodMirror.symbol.isMethod || !methodMirror.symbol.isStatic) {
+            throw new RuntimeException(
+                s"Cannot get static method for $moduleName.$methodName, it is not a static method"
+            )
+        }
+        methodMirror
+    }
+
+}
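ReflectUtils.getStaticMethodMirror does no compile-time checking, so a caller only learns about a bad method name or argument list when the mirror is invoked. A minimal sketch of the working and failing cases, assuming a hypothetical com.example.Redact object that is not part of this change:

    package com.example

    import scala.util.Try
    import org.wikimedia.analytics.refinery.core.ReflectUtils

    // Hypothetical object with a static-style apply method.
    object Redact {
        def apply(s: String): String = s.replaceAll("[0-9]", "x")
    }

    object ReflectUtilsExample {
        def main(args: Array[String]): Unit = {
            // Resolve com.example.Redact.apply via runtime reflection.
            val mirror = ReflectUtils.getStaticMethodMirror("com.example.Redact")

            // Matching arity and types: the call succeeds.
            println(mirror("user 1234"))    // prints: user xxxx

            // Wrong argument type: this still compiles, but fails when invoked.
            // Try makes the runtime failure visible as a Failure value.
            println(Try(mirror(42)))
        }
    }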
diff --git a/refinery-core/src/main/scala/org/wikimedia/analytics/refinery/core/SparkJsonToHive.scala b/refinery-core/src/main/scala/org/wikimedia/analytics/refinery/core/SparkJsonToHive.scala
index 8369104..dfb415c 100644
--- a/refinery-core/src/main/scala/org/wikimedia/analytics/refinery/core/SparkJsonToHive.scala
+++ b/refinery-core/src/main/scala/org/wikimedia/analytics/refinery/core/SparkJsonToHive.scala
@@ -4,9 +4,6 @@
 import scala.util.control.Exception.{allCatch, ignoring}
 
-import org.apache.hadoop.fs.Path
-
-
 import org.apache.hadoop.hive.metastore.api.AlreadyExistsException
 
 import org.apache.spark.sql.SQLContext
@@ -19,7 +16,6 @@
 // This allows us use these types with an extendend API
 // that includes schema merging and Hive DDL statement generation.
 import SparkSQLHiveExtensions._
-
 
 /**
  * Converts arbitrary JSON to Hive Parquet by 'evolving' the Hive table to
@@ -66,29 +62,37 @@
      * Reads inputPath as JSON data, creates or alters tableName in Hive to match the inferred
      * schema of the input JSON data, and then inserts the data into the table.
     *
-     * @param hiveContext    Spark HiveContext
+     * @param hiveContext       Spark HiveContext
      *
-     * @param inputPath      Path to JSON data
+     * @param inputPath         Path to JSON data
      *
      *
-     * @param partition      HivePartition. This helper class contains
-     *                       database and table name, as well as external location
-     *                       and partition keys and values.
+     * @param partition         HivePartition. This helper class contains
+     *                          database and table name, as well as external location
+     *                          and partition keys and values.
      *
-     * @param isSequenceFile If true, inputPath is expected to contain JSON in
-     *                       Hadoop Sequence Files, else JSON in text files.
+     * @param isSequenceFile    If true, inputPath is expected to contain JSON in
+     *                          Hadoop Sequence Files, else JSON in text files.
      *
-     * @param doneCallback   Function to call after a successful run
+     * @param doneCallback      Function to call after a successful run
      *
-     * @return The number of records refined
+     * @param transformFunction Function to do any last minute DataFrame transformations.
+     *                          DO NOT return a DataFrame with a different schema.
+     *                          You should use this function to do things like
+     *                          redacting or de-duplicating records in your DataFrame,
+     *                          but you should not change the schema.
+     *
+     * @return The number of records inserted into the HivePartition.
      */
     def apply(
         hiveContext: HiveContext,
         inputPath: String,
         partition: HivePartition,
         isSequenceFile: Boolean,
-        doneCallback: () => Unit
+        doneCallback: () => Unit,
+        transformFunction: Option[(DataFrame, HivePartition) => DataFrame] = None
     ): Long = {
+        // Set this so we can partition by fields in the DataFrame.
         hiveContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
@@ -164,18 +168,25 @@
         // but, that's ok. We're only inserting into Hive now, and Spark HiveContext will do the
         // lower casing for us.
 
+        // If defined, apply the transformFunction to do any final modifications to the
+        // output DataFrame (de-duplication, anonymizing, etc.)
+        val outputDf: DataFrame = transformFunction match {
+            case None     => mergedSchemaDf
+            case Some(fn) => fn(mergedSchemaDf, partition)
+        }
 
         log.info(
             s"""Inserting into `${partition.tableName}` DataFrame with schema:\n
-               |${mergedSchemaDf.schema.treeString} for partition $partition""".stripMargin
+               |${outputDf.schema.treeString} for partition $partition""".stripMargin
         )
 
+        // Insert data into Hive table.
         // TODO parameterize "overwrite" to allow "append"?
-        mergedSchemaDf.write.mode("overwrite")
+        outputDf.write.mode("overwrite")
             .partitionBy(partitionNames:_*)
             .insertInto(partition.tableName)
 
-        // Return the String path of the external partition we just inserted into.
         val partitionPath = hivePartitionPath(hiveContext, partition)
 
         log.info(
             s"Finished inserting into `${partition.tableName}` DataFrame, " +
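The new transformFunction parameter is easiest to read alongside a concrete transform. A rough sketch of a conforming transform and call site, assuming a hypothetical com.example.DeduplicateTransform object; per the scaladoc above, the function may drop or rewrite rows but must return a DataFrame with an unchanged schema:

    package com.example

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.hive.HiveContext
    import org.wikimedia.analytics.refinery.core.{HivePartition, SparkJsonToHive}

    // Hypothetical transform: drops fully duplicated records. The returned
    // DataFrame has exactly the input schema, as the scaladoc requires.
    object DeduplicateTransform {
        def apply(df: DataFrame, partition: HivePartition): DataFrame = {
            df.dropDuplicates()
        }
    }

    object SparkJsonToHiveExample {
        // Hypothetical call site showing the new optional argument.
        def refine(hiveContext: HiveContext, inputPath: String, partition: HivePartition): Long = {
            SparkJsonToHive(
                hiveContext,
                inputPath,
                partition,
                isSequenceFile    = false,
                doneCallback      = () => (),
                transformFunction = Some(DeduplicateTransform.apply _)
            )
        }
    }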
diff --git a/refinery-core/src/test/scala/org/wikimedia/analytics/refinery/core/TestReflectUtils.scala b/refinery-core/src/test/scala/org/wikimedia/analytics/refinery/core/TestReflectUtils.scala
new file mode 100644
index 0000000..f34bd9d
--- /dev/null
+++ b/refinery-core/src/test/scala/org/wikimedia/analytics/refinery/core/TestReflectUtils.scala
@@ -0,0 +1,37 @@
+package org.wikimedia.analytics.refinery.core
+
+import org.scalatest.{FlatSpec, Matchers}
+
+object TestObject {
+    def apply(s: String): String = {
+        "apply " + s
+    }
+
+    def otherMethod(i: Int): Int = {
+        i + 10
+    }
+}
+
+class TestReflectUtils extends FlatSpec with Matchers {
+
+    it should "Lookup object by name with apply method name" in {
+        val methodMirror = ReflectUtils.getStaticMethodMirror(
+            "org.wikimedia.analytics.refinery.core.TestObject"
+        )
+        methodMirror.symbol.fullName should equal("org.wikimedia.analytics.refinery.core.TestObject.apply")
+    }
+
+    it should "Lookup object by name with any method name" in {
+        val methodMirror = ReflectUtils.getStaticMethodMirror(
+            "org.wikimedia.analytics.refinery.core.TestObject", "otherMethod"
+        )
+        methodMirror.symbol.fullName should equal("org.wikimedia.analytics.refinery.core.TestObject.otherMethod")
+    }
+
+    it should "MethodMirror returned should be callable" in {
+        val methodMirror = ReflectUtils.getStaticMethodMirror(
+            "org.wikimedia.analytics.refinery.core.TestObject"
+        )
+        methodMirror("TEST") should equal ("apply TEST")
+    }
+}
\ No newline at end of file
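The tests above cover the happy path. Because staticModule resolution also happens at runtime, pointing the helper at a name that is not a reachable object fails with an exception at lookup time rather than at compile time, e.g. (REPL-style sketch, name hypothetical):

    import scala.util.Try
    import org.wikimedia.analytics.refinery.core.ReflectUtils

    // staticModule can only find objects; a missing or non-object name
    // surfaces as a runtime Failure, never as a compile error.
    println(Try(ReflectUtils.getStaticMethodMirror("com.example.NoSuchObject")))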
diff --git a/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/JsonRefine.scala b/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/JsonRefine.scala
index 32d7957..f82bfdf 100644
--- a/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/JsonRefine.scala
+++ b/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/JsonRefine.scala
@@ -7,30 +7,30 @@
 import scala.util.matching.Regex
 import scala.collection.parallel.ForkJoinTaskSupport
 import scala.concurrent.forkjoin.ForkJoinPool
-
 import org.joda.time.Hours
 import org.joda.time.format.DateTimeFormatter
 import com.github.nscala_time.time.Imports._
-
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.sql.hive.HiveContext
+import org.apache.spark.sql.DataFrame
+import org.wikimedia.analytics.refinery.core.{HivePartition, ReflectUtils, SparkJsonToHive, Utilities}
 
-import org.wikimedia.analytics.refinery.core.{HivePartition, SparkJsonToHive, Utilities}
 
 // TODO: ERROR Hive: Table otto not found: default.otto table not found ???
 // TODO: support append vs overwrite?
 // TODO: Hive Table Locking?
 
+
 /**
  * Looks for hourly input partition directories with JSON data that need refinement,
  * and refines them into Hive Parquet tables using SparkJsonToHive.
  */
 object JsonRefine {
-
     private val log = LogManager.getLogger("JsonRefine")
     private val iso8601DateFormatter = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss")
 
+
     /**
      * Config class for CLI argument parser using scopt
      */
@@ -45,8 +45,9 @@
         inputPathDateTimeFormat: DateTimeFormatter = DateTimeFormat.forPattern("'hourly'/yyyy/MM/dd/HH"),
         tableWhitelistRegex: Option[Regex] = None,
         tableBlacklistRegex: Option[Regex] = None,
+        transformFunction: String = "",
         doneFlag: String = "_REFINED",
-        failureFlag: String = "_REFINE_FAILED",
+//        failureFlag: String = "_REFINE_FAILED",
         shouldIgnoreFailureFlag: Boolean = false,
         parallelism: Option[Int] = None,
         compressionCodec: String = "snappy",
@@ -83,7 +84,6 @@
             DateTime.parse(s, iso8601DateFormatter)
         }
     }
-
 
     /**
@@ -190,6 +190,10 @@
             p.copy(tableBlacklistRegex = x)
         } text
            "Blacklist regex of table names to skip.\n".stripMargin
 
+        opt[String]('c', "transform-function") optional() valueName "<fn>" action { (x, p) =>
+            p.copy(transformFunction = x)
+        } text
+            "Fully qualified object name whose apply method takes a DataFrame and HivePartition and returns a transformed DataFrame.\n".stripMargin
 
         opt[String]('D', "done-flag") optional() valueName "<filename>" action { (x, p) =>
             p.copy(doneFlag = x)
         } text
@@ -199,15 +203,15 @@
             |data has changed meaning the partition needs to be re-refined.
             |Default: _REFINED""".stripMargin.replace("\n", "\n\t") + "\n"
 
-        opt[String]('X', "failure-flag") optional() valueName "<filename>" action { (x, p) =>
-            p.copy(failureFlag = x)
-        } text
-            """When a partition fails refinement, this file will be created in the
-            |output partition path with the binary timestamp of the input source partition's
-            |modification timestamp. Any partition with this flag will be excluded
-            |from refinement if the input data's modtime hasn't changed. If the
-            |modtime has changed, this will re-attempt refinement anyway.
-            |Default: _REFINE_FAILED""".stripMargin.replace("\n", "\n\t") + "\n"
+//        opt[String]('X', "failure-flag") optional() valueName "<filename>" action { (x, p) =>
+//            p.copy(failureFlag = x)
+//        } text
+//            """When a partition fails refinement, this file will be created in the
+//            |output partition path with the binary timestamp of the input source partition's
+//            |modification timestamp. Any partition with this flag will be excluded
+//            |from refinement if the input data's modtime hasn't changed. If the
+//            |modtime has changed, this will re-attempt refinement anyway.
+//            |Default: _REFINE_FAILED""".stripMargin.replace("\n", "\n\t") + "\n"
 
         opt[Unit]('I', "ignore-failure-flag") optional() action { (_, p) =>
             p.copy(shouldIgnoreFailureFlag = true)
@@ -289,6 +293,7 @@
     }
 
 
+
     /**
      * Given params, refine all discovered JsonTargets.
      *
@@ -321,6 +326,28 @@
             params.inputPathPatternCaptureGroups: _*
         )
 
+        log.setLevel(Level.DEBUG)
+
+        // If we are given a transformFunction name, it should be a fully
+        // qualified package.objectName to an object with an apply method that
+        // takes a DataFrame and HivePartition, and returns a DataFrame.
+        val transformFunction = params.transformFunction match {
+            case "" => None
+            case objectName => {
+                val transformMirror = ReflectUtils.getStaticMethodMirror(objectName)
+                // Look up the object's apply method as a reflect MethodMirror, and wrap
+                // it in an anonymous function that has the signature expected by
+                // SparkJsonToHive's transformFunction parameter.
+                val wrapperFn: (DataFrame, HivePartition) => DataFrame = {
+                    case (df, hp) => {
+                        log.info(s"Applying ${transformMirror.receiver} to $hp")
+                        transformMirror(df, hp).asInstanceOf[DataFrame]
+                    }
+                }
+                // SparkJsonToHive takes transformFunction as an Option.
+                Some(wrapperFn)
+            }
+        }
 
         log.info(
             s"Looking for JSON targets to refine in ${params.inputBasePath} between " +
@@ -335,7 +362,7 @@
             new Path(params.outputBasePath),
             params.databaseName,
             params.doneFlag,
-            params.failureFlag,
+            "_REFINE_FAILED", // params.failureFlag,
             params.inputPathDateTimeFormat,
             inputPathRegex,
             params.sinceDateTime
@@ -395,7 +422,7 @@
                 // next one to use the created table, or ALTER it if necessary. We don't
                 // want multiple CREATEs for the same table to happen in parallel.
                 if (!params.dryRun)
-                    table -> refineJsonTargets(hiveContext, tableTargets.seq)
+                    table -> refineJsonTargets(hiveContext, tableTargets.seq, transformFunction)
                 // If --dry-run was given, don't refine, just map to Successes.
                 else
                     table -> tableTargets.seq.map(Success(_))
@@ -524,7 +551,8 @@
      */
     def refineJsonTargets(
         hiveContext: HiveContext,
-        targets: Seq[JsonTarget]
+        targets: Seq[JsonTarget],
+        transformFunction: Option[(DataFrame, HivePartition) => DataFrame]
     ): Seq[Try[JsonTarget]] = {
         targets.map(target => {
             log.info(s"Beginning refinement of $target...")
@@ -535,7 +563,8 @@
                 target.inputPath.toString,
                 target.partition,
                 target.inputIsSequenceFile,
-                () => target.writeDoneFlag()
+                () => target.writeDoneFlag(),
+                transformFunction
             )
 
             log.info(
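End to end, the new --transform-function CLI option just names an object like the hypothetical com.example.DeduplicateTransform above; JsonRefine resolves its apply method reflectively and adapts it to the function type SparkJsonToHive expects. A standalone sketch of that adaptation (REPL-style, object name hypothetical):

    import org.apache.spark.sql.DataFrame
    import org.wikimedia.analytics.refinery.core.{HivePartition, ReflectUtils}

    // Roughly what JsonRefine does for:
    //   --transform-function com.example.DeduplicateTransform
    val transformMirror = ReflectUtils.getStaticMethodMirror("com.example.DeduplicateTransform")

    // The MethodMirror itself is untyped, so the wrapper restores the expected
    // (DataFrame, HivePartition) => DataFrame signature with a runtime cast.
    val transformFn: (DataFrame, HivePartition) => DataFrame =
        (df, hp) => transformMirror(df, hp).asInstanceOf[DataFrame]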
-- 
To view, visit https://gerrit.wikimedia.org/r/405800
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: If1272f7d354e94a0a140f71a9135389131c8a1eb
Gerrit-PatchSet: 1
Gerrit-Project: analytics/refinery/source
Gerrit-Branch: master
Gerrit-Owner: Ottomata <ao...@wikimedia.org>

_______________________________________________
MediaWiki-commits mailing list
MediaWiki-commits@lists.wikimedia.org
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits