Ottomata has uploaded a new change for review. ( 
https://gerrit.wikimedia.org/r/405800 )

Change subject: [WIP] Add configurable transform function to JSONRefine
......................................................................

[WIP] Add configurable transform function to JSONRefine

Bug: T185237
Change-Id: If1272f7d354e94a0a140f71a9135389131c8a1eb
---
A 
refinery-core/src/main/scala/org/wikimedia/analytics/refinery/core/ReflectUtils.scala
M 
refinery-core/src/main/scala/org/wikimedia/analytics/refinery/core/SparkJsonToHive.scala
A 
refinery-core/src/test/scala/org/wikimedia/analytics/refinery/core/TestReflectUtils.scala
M 
refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/JsonRefine.scala
4 files changed, 145 insertions(+), 36 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/analytics/refinery/source 
refs/changes/00/405800/1

diff --git 
a/refinery-core/src/main/scala/org/wikimedia/analytics/refinery/core/ReflectUtils.scala
 
b/refinery-core/src/main/scala/org/wikimedia/analytics/refinery/core/ReflectUtils.scala
new file mode 100644
index 0000000..7cfe209
--- /dev/null
+++ 
b/refinery-core/src/main/scala/org/wikimedia/analytics/refinery/core/ReflectUtils.scala
@@ -0,0 +1,32 @@
+package org.wikimedia.analytics.refinery.core
+
+import scala.reflect.runtime.universe
+
+object ReflectUtils {
+
+    /**
+      * Given a fully qualified String package.ObjectName and String method 
name, this
+      * Function will return a scala.reflect.runtime.universe.MethodMirror 
that can be
+      * used for calling the method on the object.  Note that MethodMirror is 
not a direct
+      * reference to the actual method, and as such does not have compile time 
type
+      * and signature checking.  You must ensure that you call the method with 
exactly the
+      * same arguments and types that the method expects, or you will get a 
runtime exception.
+      *
+      * @param moduleName Fully qualified name for an object, e.g. 
org.wikimedia.analytics.refinery.core.DeduplicateEventLogging
+      * @param methodName Name of method in the object.  Default "apply".
+      * @return MethodMirror bound to the object's singleton instance
+      */
+    def getStaticMethodMirror(moduleName: String, methodName: String = 
"apply"): universe.MethodMirror = {
+        val mirror = universe.runtimeMirror(getClass.getClassLoader)
+        val module = mirror.staticModule(moduleName)
+        val method = 
module.typeSignature.member(universe.newTermName(methodName)).asMethod
+        val methodMirror = 
mirror.reflect(mirror.reflectModule(module).instance).reflectMethod(method)
+        if (!methodMirror.symbol.isMethod || !methodMirror.symbol.isStatic) {
+            throw new RuntimeException(
+                s"Cannot get static method for $moduleName.$methodName, it is 
not a static method"
+            )
+        }
+        methodMirror
+    }
+
+}
diff --git 
a/refinery-core/src/main/scala/org/wikimedia/analytics/refinery/core/SparkJsonToHive.scala
 
b/refinery-core/src/main/scala/org/wikimedia/analytics/refinery/core/SparkJsonToHive.scala
index 8369104..dfb415c 100644
--- 
a/refinery-core/src/main/scala/org/wikimedia/analytics/refinery/core/SparkJsonToHive.scala
+++ 
b/refinery-core/src/main/scala/org/wikimedia/analytics/refinery/core/SparkJsonToHive.scala
@@ -4,9 +4,6 @@
 
 import scala.util.control.Exception.{allCatch, ignoring}
 
-import org.apache.hadoop.fs.Path
-
-
 import org.apache.hadoop.hive.metastore.api.AlreadyExistsException
 
 import org.apache.spark.sql.SQLContext
@@ -19,7 +16,6 @@
 // This allows us use these types with an extendend API
 // that includes schema merging and Hive DDL statement generation.
 import SparkSQLHiveExtensions._
-
 
 /**
   * Converts arbitrary JSON to Hive Parquet by 'evolving' the Hive table to
@@ -66,29 +62,37 @@
       * Reads inputPath as JSON data, creates or alters tableName in Hive to 
match the inferred
       * schema of the input JSON data, and then inserts the data into the 
table.
       *
-      * @param hiveContext      Spark HiveContext
+      * @param hiveContext       Spark HiveContext
       *
-      * @param inputPath        Path to JSON data
+      * @param inputPath         Path to JSON data
       *
       *
-      * @param partition        HivePartition.  This helper class contains
-      *                         database and table name, as well as external 
location
-      *                         and partition keys and values.
+      * @param partition         HivePartition.  This helper class contains
+      *                          database and table name, as well as external 
location
+      *                          and partition keys and values.
       *
-      * @param isSequenceFile   If true, inputPath is expected to contain JSON 
in
-      *                         Hadoop Sequence Files, else JSON in text files.
+      * @param isSequenceFile    If true, inputPath is expected to contain 
JSON in
+      *                          Hadoop Sequence Files, else JSON in text 
files.
       *
-      * @param doneCallback     Function to call after a successful run
+      * @param doneCallback      Function to call after a successful run
       *
-      * @return                 The number of records refined
+      * @param transformFunction Function to do any last minute DataFrame 
transformations.
+      *                          DO NOT return a DataFrame with a different 
schema.
+      *                          You should use this function to do things like
+      *                          redacting or de-duplicating records in your 
dataframe
+      *                          but you should not change the schema.
+      *
+      * @return                  The number of records inserted into the 
HivePartition.
       */
     def apply(
         hiveContext: HiveContext,
         inputPath: String,
         partition: HivePartition,
         isSequenceFile: Boolean,
-        doneCallback: () => Unit
+        doneCallback: () => Unit,
+        transformFunction: Option[(DataFrame, HivePartition) => DataFrame] = 
None
     ): Long = {
+
         // Set this so we can partition by fields in the DataFrame.
         hiveContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
 
@@ -164,18 +168,25 @@
         // but, that's ok.  We're only inserting into Hive now, and Spark 
HiveContext will do the
         // lower casing for us.
 
+        // If defined, apply the transformFunction to do any final 
modifications to the
+        // output DataFrame (de-duplication, anonymizing, etc.)
+        val outputDf: DataFrame = transformFunction match {
+            case None =>  mergedSchemaDf
+            case Some(fn) => fn(mergedSchemaDf, partition)
+            case _ => throw new RuntimeException(s"Cannot call 
transformFunction ${transformFunction}")
+        }
         log.info(
             s"""Inserting into `${partition.tableName}` DataFrame with 
schema:\n
-               |${mergedSchemaDf.schema.treeString} for partition 
$partition""".stripMargin
+               |${outputDf.schema.treeString} for partition 
$partition""".stripMargin
         )
+
         // Insert data into Hive table.
         // TODO parameterize "overwrite" to allow "append"?
-        mergedSchemaDf.write.mode("overwrite")
+        outputDf.write.mode("overwrite")
             .partitionBy(partitionNames:_*)
             .insertInto(partition.tableName)
 
 
-        // Return the String path of the external partition we just inserted 
into.
         val partitionPath = hivePartitionPath(hiveContext, partition)
         log.info(
             s"Finished inserting into `${partition.tableName}` DataFrame, " +
diff --git 
a/refinery-core/src/test/scala/org/wikimedia/analytics/refinery/core/TestReflectUtils.scala
 
b/refinery-core/src/test/scala/org/wikimedia/analytics/refinery/core/TestReflectUtils.scala
new file mode 100644
index 0000000..f34bd9d
--- /dev/null
+++ 
b/refinery-core/src/test/scala/org/wikimedia/analytics/refinery/core/TestReflectUtils.scala
@@ -0,0 +1,37 @@
+package org.wikimedia.analytics.refinery.core
+
+import org.scalatest.{FlatSpec, Matchers}
+
+object TestObject {
+    def apply(s: String): String = {
+        "apply " + s
+    }
+
+    def otherMethod(i: Int): Int = {
+        i + 10
+    }
+}
+
+class TestReflectUtils extends FlatSpec with Matchers {
+
+    it should "Lookup object by name with apply method name" in {
+        val methodMirror = ReflectUtils.getStaticMethodMirror(
+            "org.wikimedia.analytics.refinery.core.TestObject"
+        )
+        methodMirror.symbol.fullName should 
equal("org.wikimedia.analytics.refinery.core.TestObject.apply")
+    }
+
+    it should "Lookup object by name with any method name" in {
+        val methodMirror = ReflectUtils.getStaticMethodMirror(
+            "org.wikimedia.analytics.refinery.core.TestObject", "otherMethod"
+        )
+        methodMirror.symbol.fullName should 
equal("org.wikimedia.analytics.refinery.core.TestObject.otherMethod")
+    }
+
+    it should "MethodMirror returned should be callable" in {
+        val methodMirror = ReflectUtils.getStaticMethodMirror(
+            "org.wikimedia.analytics.refinery.core.TestObject"
+        )
+        methodMirror("TEST") should equal ("apply TEST")
+    }
+}
\ No newline at end of file
diff --git 
a/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/JsonRefine.scala
 
b/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/JsonRefine.scala
index 32d7957..f82bfdf 100644
--- 
a/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/JsonRefine.scala
+++ 
b/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/JsonRefine.scala
@@ -7,30 +7,30 @@
 import scala.util.matching.Regex
 import scala.collection.parallel.ForkJoinTaskSupport
 import scala.concurrent.forkjoin.ForkJoinPool
-
 import org.joda.time.Hours
 import org.joda.time.format.DateTimeFormatter
 import com.github.nscala_time.time.Imports._
-
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.sql.hive.HiveContext
+import org.apache.spark.sql.DataFrame
+import org.wikimedia.analytics.refinery.core.{HivePartition, ReflectUtils, 
SparkJsonToHive, Utilities}
 
-import org.wikimedia.analytics.refinery.core.{HivePartition, SparkJsonToHive, 
Utilities}
 
 
 // TODO: ERROR Hive: Table otto not found: default.otto table not found ???
 // TODO: support append vs overwrite?
 // TODO: Hive Table Locking?
 
+
 /**
   * Looks for hourly input partition directories with JSON data that need 
refinement,
   * and refines them into Hive Parquet tables using SparkJsonToHive.
   */
 object JsonRefine {
-
     private val log = LogManager.getLogger("JsonRefine")
     private val iso8601DateFormatter = 
DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss")
+
     /**
       * Config class for CLI argument parser using scopt
       */
@@ -45,8 +45,9 @@
         inputPathDateTimeFormat: DateTimeFormatter = 
DateTimeFormat.forPattern("'hourly'/yyyy/MM/dd/HH"),
         tableWhitelistRegex: Option[Regex]         = None,
         tableBlacklistRegex: Option[Regex]         = None,
+        transformFunction: String                  = "",
         doneFlag: String                           = "_REFINED",
-        failureFlag: String                        = "_REFINE_FAILED",
+//        failureFlag: String                        = "_REFINE_FAILED",
         shouldIgnoreFailureFlag: Boolean           = false,
         parallelism: Option[Int]                   = None,
         compressionCodec: String                   = "snappy",
@@ -83,7 +84,6 @@
                 DateTime.parse(s, iso8601DateFormatter)
         }
     }
-
 
 
     /**
@@ -190,6 +190,10 @@
             p.copy(tableBlacklistRegex = x)
         } text "Blacklist regex of table names to skip.\n".stripMargin
 
+        opt[String]('c', "transform-function") optional() valueName "<fn>" 
action { (x, p) =>
+            p.copy(transformFunction = x)
+        } text "Blacklist regex of table names to skip.\n".stripMargin
+
         opt[String]('D', "done-flag") optional() valueName "<filename>" action 
{ (x, p) =>
             p.copy(doneFlag = x)
         } text
@@ -199,15 +203,15 @@
               |data has changed meaning the partition needs to be re-refined.
               |Default: _REFINED""".stripMargin.replace("\n", "\n\t") + "\n"
 
-        opt[String]('X', "failure-flag") optional() valueName "<filename>" 
action { (x, p) =>
-            p.copy(failureFlag = x)
-        } text
-            """When a partition fails refinement, this file will be created in 
the
-              |output partition path with the binary timestamp of the input 
source partition's
-              |modification timestamp.  Any partition with this flag will be 
excluded
-              |from refinement if the input data's modtime hasn't changed.  If 
the
-              |modtime has changed, this will re-attempt refinement anyway.
-              |Default: _REFINE_FAILED""".stripMargin.replace("\n", "\n\t") + 
"\n"
+//        opt[String]('X', "failure-flag") optional() valueName "<filename>" 
action { (x, p) =>
+//            p.copy(failureFlag = x)
+//        } text
+//            """When a partition fails refinement, this file will be created 
in the
+//              |output partition path with the binary timestamp of the input 
source partition's
+//              |modification timestamp.  Any partition with this flag will be 
excluded
+//              |from refinement if the input data's modtime hasn't changed.  
If the
+//              |modtime has changed, this will re-attempt refinement anyway.
+//              |Default: _REFINE_FAILED""".stripMargin.replace("\n", "\n\t") 
+ "\n"
 
         opt[Unit]('I', "ignore-failure-flag") optional() action { (_, p) =>
             p.copy(shouldIgnoreFailureFlag = true)
@@ -289,6 +293,7 @@
     }
 
 
+
     /**
       * Given params, refine all discovered JsonTargets.
       *
@@ -321,6 +326,28 @@
             params.inputPathPatternCaptureGroups: _*
         )
 
+        log.setLevel(Level.DEBUG)
+
+        // If we are given a transformFunction name, it should be a fully
+        // qualified package.objectName to an object with an apply method that
+        // takes a DataFrame and HivePartition, and returns a DataFrame
+        val transformFunction = params.transformFunction match {
+            case "" => None
+            case objectName => {
+                val transformMirror = 
ReflectUtils.getStaticMethodMirror(objectName)
+                // Lookup the object's apply method as a reflect MethodMirror, 
and wrap
+                // it in an anonymous function that has the signature expected 
by
+                // SparkJsonToHive's transformFunction parameter.
+                val wrapperFn: (DataFrame, HivePartition) => DataFrame = {
+                    case(df, hp) => {
+                        log.info(s"Applying ${transformMirror.receiver} to 
$hp")
+                        transformMirror(df, hp).asInstanceOf[DataFrame]
+                    }
+                }
+                // SparkJsonToHive takes transformFunction as an option.
+                Some(wrapperFn)
+            }
+        }
 
         log.info(
             s"Looking for JSON targets to refine in ${params.inputBasePath} 
between " +
@@ -335,7 +362,7 @@
             new Path(params.outputBasePath),
             params.databaseName,
             params.doneFlag,
-            params.failureFlag,
+            "_REFINED_FAILED", //params.failureFlag,
             params.inputPathDateTimeFormat,
             inputPathRegex,
             params.sinceDateTime
@@ -395,7 +422,7 @@
             // next one to use the created table, or ALTER it if necessary.  
We don't
             // want multiple CREATEs for the same table to happen in parallel.
             if (!params.dryRun)
-                table -> refineJsonTargets(hiveContext, tableTargets.seq)
+                table -> refineJsonTargets(hiveContext, tableTargets.seq, 
transformFunction)
             // If --dry-run was given, don't refine, just map to Successes.
             else
                 table -> tableTargets.seq.map(Success(_))
@@ -524,7 +551,8 @@
       */
     def refineJsonTargets(
         hiveContext: HiveContext,
-        targets: Seq[JsonTarget]
+        targets: Seq[JsonTarget],
+        transformFunction: Option[(DataFrame, HivePartition) => DataFrame]
     ): Seq[Try[JsonTarget]] = {
         targets.map(target => {
             log.info(s"Beginning refinement of $target...")
@@ -535,7 +563,8 @@
                     target.inputPath.toString,
                     target.partition,
                     target.inputIsSequenceFile,
-                    () => target.writeDoneFlag()
+                    () => target.writeDoneFlag(),
+                    transformFunction
                 )
 
                 log.info(

-- 
To view, visit https://gerrit.wikimedia.org/r/405800
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: If1272f7d354e94a0a140f71a9135389131c8a1eb
Gerrit-PatchSet: 1
Gerrit-Project: analytics/refinery/source
Gerrit-Branch: master
Gerrit-Owner: Ottomata <ao...@wikimedia.org>

_______________________________________________
MediaWiki-commits mailing list
MediaWiki-commits@lists.wikimedia.org
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to