jenkins-bot has submitted this change and it was merged. ( 
https://gerrit.wikimedia.org/r/386882 )

Change subject: Add core class and job to import EL hive tables to Druid
......................................................................


Add core class and job to import EL hive tables to Druid

Created resources directory to store master-ingestion-spec

Bug: T166414
Change-Id: I30c374c3dcba44bbd0608e72ae0162bcc442cd0f
---
M pom.xml
M refinery-core/pom.xml
A refinery-core/src/main/resources/ingestion_spec_template.json
A 
refinery-core/src/main/scala/org/wikimedia/analytics/refinery/core/DataFrameToDruid.scala
A 
refinery-core/src/test/scala/org/wikimedia/analytics/refinery/core/TestDataFrameToDruid.scala
M refinery-job/pom.xml
A 
refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/EventLoggingToDruid.scala
A 
refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/TestEventLoggingToDruid.scala
8 files changed, 1,083 insertions(+), 14 deletions(-)

Approvals:
  jenkins-bot: Verified
  Nuria: Looks good to me, approved



diff --git a/pom.xml b/pom.xml
index dcaa0fd..68ce8da 100644
--- a/pom.xml
+++ b/pom.xml
@@ -190,7 +190,21 @@
         <scope>test</scope>
       </dependency>
 
-      <dependency>
+
+    <!-- 
+             adding explicit dep for snappy, otherwise
+             spark assumes it is on the java.library.path
+             see: 
https://github.com/rvs/snappy-java/blob/master/src/main/resources/org/xerial/snappy/SnappyNativeLoader.java#L47
+    -->
+    <!-- https://mvnrepository.com/artifact/org.xerial.snappy/snappy-java -->
+    <dependency>
+        <groupId>org.xerial.snappy</groupId>
+        <artifactId>snappy-java</artifactId>
+        <version>1.1.2.5</version>
+        <scope>test</scope>
+    </dependency>
+        
+    <dependency>
         <groupId>com.github.nscala-time</groupId>
         <artifactId>nscala-time_2.10</artifactId>
         <version>2.0.0</version>
diff --git a/refinery-core/pom.xml b/refinery-core/pom.xml
index 8ace48a..5e6d6f7 100644
--- a/refinery-core/pom.xml
+++ b/refinery-core/pom.xml
@@ -76,6 +76,13 @@
         </dependency>
 
         <dependency>
+            <groupId>org.scalamock</groupId>
+            <artifactId>scalamock-scalatest-support_2.10</artifactId>
+            <version>3.2.2</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
             <groupId>org.scalatest</groupId>
             <artifactId>scalatest_2.10</artifactId>
             <scope>test</scope>
@@ -119,7 +126,32 @@
             <version>1.4.7</version>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+            <scope>provided</scope>
+            <exclusions>
+                <exclusion>
+                    <!-- javax.servlet in hadoop-common is older than the one 
in spark -->
+                    <groupId>javax.servlet</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
 
+        <dependency>
+            <groupId>com.holdenkarau</groupId>
+            <artifactId>spark-testing-base_2.10</artifactId>
+            <version>1.6.0_0.4.7</version>
+            <scope>test</scope>
+        </dependency>
+
+        <!-- 
https://mvnrepository.com/artifact/org.apache.httpcomponents/httpclient -->
+        <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpclient</artifactId>
+            <version>4.5.3</version>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/refinery-core/src/main/resources/ingestion_spec_template.json 
b/refinery-core/src/main/resources/ingestion_spec_template.json
new file mode 100644
index 0000000..f814559
--- /dev/null
+++ b/refinery-core/src/main/resources/ingestion_spec_template.json
@@ -0,0 +1,50 @@
+{
+    "type" : "index_hadoop",
+    "spec" : {
+        "ioConfig" : {
+            "type" : "hadoop",
+            "inputSpec" : {
+                "type" : "static",
+                "paths" : "{{INPUT_PATH}}"
+            }
+        },
+        "dataSchema" : {
+            "dataSource" : "{{DATA_SOURCE}}",
+            "granularitySpec" : {
+                "type" : "uniform",
+                "segmentGranularity" : "{{SEGMENT_GRANULARITY}}",
+                "queryGranularity" : "{{QUERY_GRANULARITY}}",
+                "intervals" : {{INTERVALS_ARRAY}}
+            },
+            "parser" : {
+                "type" : "string",
+                "parseSpec" : {
+                    "format" : "json",
+                    "dimensionsSpec" : {
+                        "dimensions" : {{DIMENSIONS}}
+                    },
+                    "timestampSpec" : {
+                        "format" : "{{TIMESTAMP_FORMAT}}",
+                        "column" : "{{TIMESTAMP_COLUMN}}"
+                    }
+                }
+            },
+            "metricsSpec" : {{METRICS}}
+        },
+        "tuningConfig" : {
+            "type" : "hadoop",
+            "overwriteFiles": true,
+            "ignoreInvalidRows" : false,
+            "partitionsSpec" : {
+                "type" : "hashed",
+                "numShards" : {{NUM_SHARDS}}
+            },
+            "jobProperties" : {
+                "mapreduce.reduce.memory.mb" : "{{REDUCE_MEMORY}}",
+                "mapreduce.output.fileoutputformat.compress":
+                    "org.apache.hadoop.io.compress.GzipCodec",
+                "mapreduce.job.queuename": "{{HADOOP_QUEUE}}"
+            }
+        }
+    }
+}
diff --git 
a/refinery-core/src/main/scala/org/wikimedia/analytics/refinery/core/DataFrameToDruid.scala
 
b/refinery-core/src/main/scala/org/wikimedia/analytics/refinery/core/DataFrameToDruid.scala
new file mode 100644
index 0000000..09339cc
--- /dev/null
+++ 
b/refinery-core/src/main/scala/org/wikimedia/analytics/refinery/core/DataFrameToDruid.scala
@@ -0,0 +1,354 @@
+package org.wikimedia.analytics.refinery.core
+
+import java.io.InputStream
+
+import org.apache.commons.io.IOUtils
+import org.apache.hadoop.fs.FileSystem
+import org.apache.hadoop.fs.Path
+import org.apache.http.client.HttpClient
+import org.apache.http.client.methods.HttpGet
+import org.apache.http.client.methods.HttpPost
+import org.apache.http.entity.StringEntity
+import org.apache.http.impl.client.DefaultHttpClient
+import org.apache.http.impl.client.LaxRedirectStrategy
+import org.apache.log4j.LogManager
+import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.functions.lit
+import org.apache.spark.sql.hive.HiveContext
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.types.{IntegerType, LongType, FloatType, 
DoubleType}
+import org.joda.time.DateTime
+import scala.util.parsing.json.JSON
+import scala.util.Random
+import scala.io.Source;
+
+/**
+ * Ingestion status enumeration object
+ *
+ * Values, as used by DataFrameToDruid:
+ *     Initial  The instance has been created, start() has not run yet.
+ *     Loading  The ingestion request has been sent and the task is being polled.
+ *     Done     Druid reported the task status SUCCESS.
+ *     Error    The request was not accepted, or Druid reported FAILED/ERROR.
+ */
+object IngestionStatus extends Enumeration {
+    val Initial, Loading, Done, Error = Value
+}
+
+/**
+ * DataFrameToDruid companion object
+ *
+ * Holds the constants shared by all DataFrameToDruid instances: the Druid
+ * indexer URL paths, the interval date format, the name of the event count
+ * metric and the ingestion spec template loaded from the classpath.
+ */
+object DataFrameToDruid {
+    // Constant: URL path to launch Druid ingestion.
+    val LaunchTaskPath = "/druid/indexer/v1/task"
+
+    // Constant: URL path to check task status.
+    val CheckTaskPath = "/druid/indexer/v1/task/{{DRUID_TASK_ID}}/status"
+
+    // Constant: DateTime format for Druid interval specs.
+    val IntervalDateTimeFormat = "yyyy-MM-dd'T'HH:mm'Z'"
+
+    // Constant: Name of the additional metric field that counts events.
+    val EventCountMetricName = "eventCount"
+
+    // Input stream over the ingestion spec template resource.
+    // It is consumed and closed while IngestionSpecTemplate is initialized.
+    val stream: InputStream = getClass().getResourceAsStream("/ingestion_spec_template.json")
+
+    // Constant: Template to build Druid ingestion specs.
+    // The lines of the resource are concatenated without line separators.
+    val IngestionSpecTemplate: String = {
+        // getResourceAsStream returns null when the resource is missing;
+        // fail with a clear message instead of a NullPointerException.
+        require(stream != null, "Resource /ingestion_spec_template.json not found in the classpath.")
+        val templateSource = Source.fromInputStream(stream)
+        // Closing the Source also closes the underlying stream.
+        try templateSource.getLines.mkString finally templateSource.close()
+    }
+}
+/**
+ * DataFrame to Druid transaction class
+ *
+ * This class serves as a transaction helper to load data into Druid.
+ * The source data should be passed in as a DataFrame instance.
+ * Then an ingestion spec is created from the DataFrame schema.
+ * And finally a request is sent to Druid to trigger data ingestion.
+ * The process can be followed in 3 ways: callback, waiting and polling.
+ *
+ * Constructor example:
+ *     val dftd = new DataFrameToDruid(
+ *         dataSource = "some_dataset",
+ *         inputDf = hiveContext.sql("select..."),
+ *         dimensions = Seq("project", "language", ...),
+ *         metrics = Seq("edits", "views", ...),
+ *         intervals = Seq((new DateTime(2017, 1, 1), new DateTime(2017, 2, 
1))),
+ *         numShards = 4,
+ *         hadoopQueue = "production"
+ *     )
+ *
+ * Callback example:
+ *     dftd.start(Some((status: IngestionStatus.Value) => {
+ *         // DataFrameToDruid will call this function asynchronously when the
+ *         // process has ended, and pass it the resulting status.
+ *     }))
+ *
+ * Waiting example:
+ *     dftd.start().await()
+ *     // The code will only proceed when the process is done.
+ *
+ * Polling example:
+ *     dftd.start()
+ *     while (dftd.status() == IngestionStatus.Loading) Thread.sleep(10000)
+ *     // The process is finished at this point.
+ *
+ * Parameters:
+ *     sc  SparkContext.
+ *     dataSource  Name of the target Druid data set (snake_case).
+ *     inputDf  DataFrame containing the data to be loaded. The data must already
+ *              be sliced to only contain the desired time intervals. It must also
+ *              be flat (no nested fields) and contain only the fields that are to
+ *              be ingested into Druid.
+ *     dimensions  Sequence of field names that Druid should index as 
dimensions.
+ *     metrics  Sequence of field names that Druid should ingest as metrics.
+ *              Those fields have to be of numerical type.
+ *     intervals  Sequence of pairs (startDateTime, endDateTime) delimiting the
+ *                intervals where the input DataFrame contains data.
+ *     timestampColumn  Name of the field containing the timestamp. This field 
is
+ *                      mandatory for Druid ingestion spec.
+ *     timestampFormat  A string indicating the format of the timestamp field
+ *                      (iso|millis|posix|auto|or any Joda time format).
+ *     segmentGranularity  A string indicating the granularity of Druid's 
segments
+ *                         for the data to be loaded 
(quarter|month|week|day|hour).
+ *     queryGranularity  A string indicating the granularity of Druid's queries
+ *                       for the data to be loaded 
(week|day|hour|minute|second).
+ *     numShards  Number of shards for Druid ingestion [optional].
+ *     reduceMemory  Memory to be used for Druid ingestion (string).
+ *     hadoopQueue  Name of Hadoop queue to launch the ingestion.
+ *     druidHost  String with Druid host.
+ *     druidPort  String with Druid port.
+ *     checkInterval  Integer with the number of milliseconds to wait between 
checks.
+ *     tempFilePathOver  Optional string that overrides path to temporary file.
+ *     httpClientOver  Optional HttpClient instance (only for testing 
purposes).
+ */
+class DataFrameToDruid(
+    sc: SparkContext,
+    dataSource: String,
+    inputDf: DataFrame,
+    dimensions: Seq[String],
+    metrics: Seq[String],
+    intervals: Seq[(DateTime, DateTime)],
+    timestampColumn: String,
+    timestampFormat: String,
+    segmentGranularity: String,
+    queryGranularity: String,
+    numShards: Int,
+    reduceMemory: String,
+    hadoopQueue: String,
+    druidHost: String,
+    druidPort: String,
+    checkInterval: Int = 10000,
+    tempFilePathOver: String = null.asInstanceOf[String],
+    httpClientOver: HttpClient = null.asInstanceOf[HttpClient]
+) {
+    private val log = LogManager.getLogger("DataFrameToDruid")
+
+    // Create a temporary file path for Druid data.
+    // Unless overridden, the path contains a timestamp and a random id.
+    private val tempFilePath: String = Option(tempFilePathOver).getOrElse {
+        val randomId = Random.alphanumeric.take(5).mkString("")
+        val timestamp = DateTime.now.toString("yyyyMMddHHmmss")
+        s"/tmp/DataFrameToDruid/${dataSource}/${timestamp}/${randomId}"
+    }
+
+    // Add the event count to the DataFrame.
+    private val inputDfWithCount = inputDf.withColumn(DataFrameToDruid.EventCountMetricName, lit(1L))
+    private val metricsWithCount = metrics :+ DataFrameToDruid.EventCountMetricName
+
+    // Initialize Druid ingestion spec (it is never modified afterwards).
+    log.info(s"Creating ingestion spec for ${dataSource}.")
+    private val ingestionSpec: String = createIngestionSpec()
+    log.info(ingestionSpec)
+
+    // Create the thread that will poll Druid for the task status once started.
+    private val statusUpdater: Thread = getStatusUpdater()
+
+    // Instance variables: need to be modified after constructor.
+    // ingestionStatus is written by the status updater thread and read by the
+    // thread calling status(), so it is volatile to make the updates visible.
+    @volatile private var ingestionStatus: IngestionStatus.Value = IngestionStatus.Initial
+    private var userCallback: Option[(IngestionStatus.Value) => Unit] = None
+    private var druidTaskId: String = ""
+
+    // Initialize httpClient, instruct it to follow redirects.
+    private val httpClient: HttpClient = Option(httpClientOver).getOrElse {
+        val client = new DefaultHttpClient()
+        client.setRedirectStrategy(new LaxRedirectStrategy())
+        client
+    }
+
+    /**
+     * Kicks off the load of the DataFrame into Druid.
+     *
+     * The DataFrame (with the event count column) is written as JSON to the
+     * temporary path, the ingestion request is sent to Druid and the status
+     * updater thread is started. Only a call made while the status is Initial
+     * has any effect; otherwise a warning is logged.
+     *
+     * Params:
+     *     callback  Optional function to run once the process has finished.
+     *               It receives the final IngestionStatus.Value.
+     * Returns:
+     *     This DataFrameToDruid instance
+     *     (so that calls can be chained, e.g. `dftd.start().await()`).
+     */
+    def start(
+        callback: Option[(IngestionStatus.Value) => Unit] = None
+    ): DataFrameToDruid = {
+        ingestionStatus match {
+            case IngestionStatus.Initial =>
+                userCallback = callback
+
+                log.info(s"Writing temporary file for ${dataSource}.")
+                inputDfWithCount.write.json(tempFilePath)
+
+                log.info(s"Launching indexation task for ${dataSource}.")
+                druidTaskId = sendIngestionRequest()
+                val launchedMessage =
+                    s"Indexation task for ${dataSource} launched successfully. " +
+                    s"Task ID: ${druidTaskId}"
+                log.info(launchedMessage)
+                ingestionStatus = IngestionStatus.Loading
+                statusUpdater.start()
+
+            case _ =>
+                log.warn("Can not call start more than once. Ignoring.")
+        }
+        this
+    }
+
+    /**
+     * Waits for the status updater thread to finish.
+     *
+     * If start has not been called yet (status is still Initial), nothing is
+     * waited for and a warning is logged instead.
+     *
+     * The method is called 'await' because every scala class already has a
+     * 'wait' method, which can not be overridden.
+     *
+     * Returns:
+     *     This DataFrameToDruid instance
+     *     (so that calls can be chained, e.g. `dftd.start().await().status()`).
+     */
+    def await(): DataFrameToDruid = {
+        if (ingestionStatus != IngestionStatus.Initial) statusUpdater.join()
+        else log.warn("Can not call await before calling start. Ignoring.")
+        this
+    }
+
+    /**
+     * Gives the current status of the loading process.
+     *
+     * Returns:
+     *     IngestionStatus.Value (Initial|Loading|Done|Error).
+     */
+    def status(): IngestionStatus.Value = ingestionStatus
+
+    // Builds the ingestion spec string: every placeholder of the ingestion spec
+    // template is replaced, in order, with the corresponding parameter value.
+    private def createIngestionSpec(): String = {
+        val template = DataFrameToDruid.IngestionSpecTemplate
+        val substitutions: Seq[(String, String)] = Seq(
+            "{{INPUT_PATH}}" -> tempFilePath,
+            "{{DATA_SOURCE}}" -> dataSource,
+            "{{SEGMENT_GRANULARITY}}" -> segmentGranularity,
+            "{{QUERY_GRANULARITY}}" -> queryGranularity,
+            "{{INTERVALS_ARRAY}}" -> formatIntervals(),
+            "{{DIMENSIONS}}" -> formatDimensions(),
+            "{{TIMESTAMP_FORMAT}}" -> timestampFormat,
+            "{{TIMESTAMP_COLUMN}}" -> timestampColumn,
+            "{{METRICS}}" -> formatMetrics(),
+            "{{NUM_SHARDS}}" -> numShards.toString(),
+            "{{REDUCE_MEMORY}}" -> reduceMemory,
+            "{{HADOOP_QUEUE}}" -> hadoopQueue
+        )
+        substitutions.foldLeft(template) {
+            case (spec, (placeholder, value)) => spec.replace(placeholder, value)
+        }
+    }
+
+    // Renders the intervals as a JSON array of quoted "<start>/<end>" strings,
+    // formatting both ends with DataFrameToDruid.IntervalDateTimeFormat.
+    // NOTE(review): the format appends a literal 'Z' and no time zone conversion
+    // is done here, so the DateTimes are presumably expected in UTC - confirm.
+    private def formatIntervals(): String = {
+        val pattern = DataFrameToDruid.IntervalDateTimeFormat
+        intervals
+            .map { case (start, end) =>
+                Seq(start.toString(pattern), end.toString(pattern)).mkString("\"", "/", "\"")
+            }
+            .mkString("[", ", ", "]")
+    }
+
+    // Renders the dimension field names as a JSON array of quoted strings.
+    private def formatDimensions(): String = {
+        dimensions.map("\"" + _ + "\"").mkString("[", ", ", "]")
+    }
+
+    // Renders the metric fields (including the event count) as a JSON array of
+    // Druid metric specs. Integer and Long fields become longSum metrics, Float
+    // and Double fields become doubleSum metrics; a field of any other type is
+    // not matched and raises an error.
+    private def formatMetrics(): String = {
+        def aggregatorFor(field: String): String =
+            inputDfWithCount.schema.apply(field).dataType match {
+                case LongType | IntegerType => "longSum"
+                case DoubleType | FloatType => "doubleSum"
+            }
+        metricsWithCount
+            .map { field =>
+                s"""{"name": "${field}", "fieldName": "${field}", "type": "${aggregatorFor(field)}"}"""
+            }
+            .mkString("[", ", ", "]")
+    }
+
+    // Returns a thread that keeps polling Druid to check the status of the
+    // indexation task and updates the ingestionStatus var accordingly.
+    // When the task is finished, executes finalizations.
+    //
+    // The thread always ends in status Done or Error before finalizing: a task
+    // status that is not recognized, or any failure while polling, is logged
+    // and treated as Error. Otherwise the thread would die silently, leaving
+    // the status in Loading, the temporary file in place and the user callback
+    // uncalled.
+    private def getStatusUpdater(): Thread = {
+        new Thread(
+            new Runnable {
+                def run(): Unit = {
+                    try {
+                        if (druidTaskId == "ERROR") {
+                            ingestionStatus = IngestionStatus.Error
+                        } else {
+                            while (ingestionStatus == IngestionStatus.Loading) {
+                                Thread.sleep(checkInterval)
+                                log.info(s"Checking status of task ${druidTaskId} for ${dataSource}.")
+                                ingestionStatus = getDruidTaskStatus() match {
+                                    case "RUNNING" => IngestionStatus.Loading
+                                    case "SUCCESS" => IngestionStatus.Done
+                                    case "FAILED" | "ERROR" => IngestionStatus.Error
+                                    case other =>
+                                        log.error(s"Unexpected status '${other}' of task " +
+                                                  s"${druidTaskId} for ${dataSource}.")
+                                        IngestionStatus.Error
+                                }
+                            }
+                        }
+                    } catch {
+                        case e: InterruptedException =>
+                            log.error(s"Interrupted while checking task ${druidTaskId} " +
+                                      s"for ${dataSource}.", e)
+                            ingestionStatus = IngestionStatus.Error
+                            // Preserve the interruption flag for whoever owns the thread.
+                            Thread.currentThread().interrupt()
+                        case scala.util.control.NonFatal(e) =>
+                            log.error(s"Could not check status of task ${druidTaskId} " +
+                                      s"for ${dataSource}.", e)
+                            ingestionStatus = IngestionStatus.Error
+                    }
+                    conclude()
+                }
+            }
+        )
+    }
+
+    // Sends an http post request to Druid to trigger ingestion.
+    // Returns the Druid task id, or the string "ERROR" if Druid did not answer
+    // with status code 200 (the status code is logged in that case).
+    private def sendIngestionRequest(): String = {
+        val url = s"http://${druidHost}:${druidPort}${DataFrameToDruid.LaunchTaskPath}"
+        val post = new HttpPost(url)
+        post.addHeader("Content-type", "application/json")
+        // Encode the spec explicitly as UTF-8: StringEntity(String) would use
+        // ISO-8859-1 and corrupt any non-ASCII character of the spec.
+        post.setEntity(new StringEntity(ingestionSpec, "UTF-8"))
+        val response = httpClient.execute(post)
+        val statusCode = response.getStatusLine().getStatusCode()
+        if (statusCode == 200) {
+            val contentStream = response.getEntity().getContent()
+            val responseStr = try IOUtils.toString(contentStream) finally contentStream.close()
+            val responseObj = JSON.parseFull(responseStr).get.asInstanceOf[Map[String, Any]]
+            responseObj("task").asInstanceOf[String]
+        } else {
+            log.error(s"Ingestion request for ${dataSource} failed with HTTP status ${statusCode}.")
+            // Discard the response body so that the connection is released.
+            org.apache.http.util.EntityUtils.consumeQuietly(response.getEntity())
+            "ERROR"
+        }
+    }
+
+    // Sends an http get request to Druid to check the ingestion task.
+    // Returns the resulting task status, or the string "ERROR" if Druid did not
+    // answer with status code 200 (the status code is logged in that case).
+    private def getDruidTaskStatus(): String = {
+        val path = DataFrameToDruid.CheckTaskPath.replace("{{DRUID_TASK_ID}}", druidTaskId)
+        val url = s"http://${druidHost}:${druidPort}${path}"
+        val get = new HttpGet(url)
+        val response = httpClient.execute(get)
+        val statusCode = response.getStatusLine().getStatusCode()
+        if (statusCode == 200) {
+            val contentStream = response.getEntity().getContent()
+            val responseStr = try IOUtils.toString(contentStream) finally contentStream.close()
+            val responseObj = JSON.parseFull(responseStr).get.asInstanceOf[Map[String, Any]]
+            // The task status is nested: {"status": {"status": "<STATUS>"}}.
+            val statusObj = responseObj("status").asInstanceOf[Map[String, Any]]
+            statusObj("status").asInstanceOf[String]
+        } else {
+            log.error(s"Status check of task ${druidTaskId} for ${dataSource} " +
+                      s"failed with HTTP status ${statusCode}.")
+            // Discard the response body so that the connection is released.
+            org.apache.http.util.EntityUtils.consumeQuietly(response.getEntity())
+            "ERROR"
+        }
+    }
+
+    // Logs the outcome, deletes the temporary file and calls user callback.
+    // The user callback is called even if the temporary file can not be
+    // deleted; the deletion failure is logged in that case.
+    // Note that scala classes have dibs on the method name 'finalize', see: await().
+    private def conclude(): Unit = {
+        ingestionStatus match {
+            case IngestionStatus.Done => log.info(
+                s"Druid ingestion task ${druidTaskId} for ${dataSource} succeeded.")
+            case IngestionStatus.Error => log.error(
+                s"Druid ingestion task ${druidTaskId} for ${dataSource} failed.")
+            case other => log.warn(
+                s"Druid ingestion task ${druidTaskId} for ${dataSource} concluded " +
+                s"in unexpected status ${other}.")
+        }
+        try {
+            val path = new Path(tempFilePath)
+            // Resolve the file system from the path itself, so that a path with
+            // an explicit scheme works even if it is not the default file system.
+            val fs = path.getFileSystem(sc.hadoopConfiguration)
+            if (fs.exists(path)) fs.delete(path, true)
+        } catch {
+            case scala.util.control.NonFatal(e) =>
+                log.error(s"Could not delete temporary file ${tempFilePath}.", e)
+        }
+        userCallback.foreach(callback => callback(ingestionStatus))
+    }
+}
diff --git 
a/refinery-core/src/test/scala/org/wikimedia/analytics/refinery/core/TestDataFrameToDruid.scala
 
b/refinery-core/src/test/scala/org/wikimedia/analytics/refinery/core/TestDataFrameToDruid.scala
new file mode 100644
index 0000000..77cd491
--- /dev/null
+++ 
b/refinery-core/src/test/scala/org/wikimedia/analytics/refinery/core/TestDataFrameToDruid.scala
@@ -0,0 +1,242 @@
+package org.wikimedia.analytics.refinery.core
+
+import com.holdenkarau.spark.testing.SharedSparkContext
+import org.apache.commons.io.IOUtils
+import org.apache.hadoop.fs.FileSystem
+import org.apache.hadoop.fs.Path
+import org.apache.http.client.HttpClient
+import org.apache.http.client.methods.{HttpGet, HttpPost, HttpUriRequest}
+import org.apache.http.entity.StringEntity
+import org.apache.http.message.BasicHttpResponse
+import org.apache.http.ProtocolVersion
+import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.functions.lit
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.types.{StructType, StringType, IntegerType, 
LongType, DoubleType}
+import org.joda.time.DateTime
+import org.scalamock.scalatest.MockFactory
+import org.scalatest.{FlatSpec, Matchers, BeforeAndAfterEach}
+import scala.util.parsing.json.JSON
+
+
+class TestDataFrameToDruid extends FlatSpec
+    with Matchers with BeforeAndAfterEach with SharedSparkContext with 
MockFactory {
+
+    val testTempFile = "file:///tmp/TestDataFrameToDruid"
+    var httpClientMock: HttpClient = null.asInstanceOf[HttpClient]
+    var testDf: DataFrame = null.asInstanceOf[DataFrame]
+
+    // Prevents JSON.parseFull to parse integers as doubles.
+    JSON.globalNumberParser = (input: String) => {
+        if (input.contains(".")) input.toDouble else input.toLong
+    }
+
+    // Asserts that the value found at the given json path of the given json
+    // string equals the expected value.
+    // This function circumvents the type cast limitations of JSON.parseFull.
+    // Path elements are separated by dots; a dot that is part of a field name
+    // has to be escaped with a backslash ("\\." in a scala string literal).
+    def assertJson[T](jsonString: String, jsonPath: String, expectedValue: T): Unit = {
+        val jsonObject = JSON.parseFull(jsonString).get.asInstanceOf[Map[String, Any]]
+        // Split on unescaped dots only, then unescape the remaining dots.
+        // (Swapping escaped dots for a placeholder character would corrupt path
+        // elements that already contain that character, e.g. underscores.)
+        val pathElements = jsonPath.split("(?<!\\\\)\\.").map(_.replace("\\.", "."))
+        val pathValue = pathElements.foldLeft(jsonObject.asInstanceOf[Any])((j, p) => {
+            j.asInstanceOf[Map[String, Any]](p).asInstanceOf[Any]
+        })
+        assert(pathValue.asInstanceOf[T] == expectedValue)
+    }
+
+    // Creates a DataFrameToDruid instance with testing defaults.
+    // The temporary file path and the http client are overridden, so that the
+    // instance writes to testTempFile and sends its requests to httpClientMock.
+    // checkInterval is set to 100 milliseconds to keep the status polling short.
+    def createDftd(): DataFrameToDruid = {
+        new DataFrameToDruid(
+            sc = sc,
+            dataSource = "test",
+            inputDf = testDf,
+            dimensions = Seq("event_category", "event_action", "wiki"),
+            metrics = Seq("event_seconds"),
+            intervals = Seq((new DateTime(1970, 1, 1, 0, 0), new 
DateTime(1970, 1, 2, 0, 0))),
+            timestampColumn = "timestamp",
+            timestampFormat = "posix",
+            segmentGranularity = "hour",
+            queryGranularity = "minute",
+            numShards = 2,
+            reduceMemory = "8192",
+            hadoopQueue = "default",
+            druidHost = "test.druid.host",
+            druidPort = "8090",
+            checkInterval = 100,
+            tempFilePathOver = testTempFile,
+            httpClientOver = httpClientMock
+        )
+    }
+
+    // Builds an HTTP/1.1 BasicHttpResponse with the given status code, whose
+    // entity contains the given response data.
+    def createHttpResponse(statusCode: Int, data: String): BasicHttpResponse = {
+        val protocol = new ProtocolVersion("HTTP", 1, 1)
+        val httpResponse = new BasicHttpResponse(protocol, statusCode, "TestDataFrameToDruid")
+        httpResponse.setEntity(new StringEntity(data))
+        httpResponse
+    }
+
+    override def beforeEach(): Unit = {
+        // Create schema and data to be used in tests.
+        val sqlContext = new SQLContext(sc)
+        val testSchema: StructType = (new StructType)
+            .add("event_category", StringType)
+            .add("event_action", StringType)
+            .add("event_seconds", DoubleType)
+            .add("timestamp", LongType)
+            .add("wiki", StringType)
+        val testRDD = sc.parallelize(Seq(
+            Row("cat1", "read", 10.0, 1L, "enwiki"),
+            Row("cat2", "edit", 20.0, 2L, "enwiki"),
+            Row("cat1", "read", 30.0, 3L, "enwiki"),
+            Row("cat3", "read", 40.0, 4L, "enwiki")
+        ))
+        testDf = sqlContext.createDataFrame(testRDD, testSchema)
+
+        // Mock HttpClient to be injected into DataFrameToDruid.
+        // Its behavior will be defined in each test.
+        httpClientMock = mock[HttpClient]
+    }
+
+    override def afterEach(): Unit = {
+        // Delete temp file in case DataFrameToDruid can not accomplish it
+        // because of execution errors or assert failures.
+        val path = new Path(testTempFile)
+        val fs = FileSystem.get(sc.hadoopConfiguration)
+        if (fs.exists(path)) fs.delete(path, true)
+    }
+
+    it should "request for ingestion with the correct url" in {
+        inSequence {
+            // Should receive an ingestion request; checks method and url, returns ingestion task id.
+            (httpClientMock.execute(_: HttpUriRequest)).expects(*).onCall { request: HttpUriRequest =>
+                val uri = request.getURI()
+                assert(uri.getHost() == "test.druid.host")
+                assert(uri.getPort() == 8090)
+                assert(uri.getPath() == "/druid/indexer/v1/task")
+                assert(request.getMethod() == "POST")
+                createHttpResponse(200, """{"task": "test-task-1"}""")
+            }
+            // Should receive a status request; returns success.
+            // The status has to be one that DataFrameToDruid handles (SUCCESS),
+            // otherwise the status updater thread dies with an exception.
+            (httpClientMock.execute(_: HttpUriRequest)).expects(*).returning(
+                createHttpResponse(200, """{"status": {"status": "SUCCESS"}}""")
+            )
+        }
+        createDftd().start().await()
+    }
+
+    it should "request for ingestion with the correct spec" in {
+        inSequence {
+            // Should receive an ingestion request; checks spec, returns ingestion task id.
+            (httpClientMock.execute(_: HttpUriRequest)).expects(*).onCall { request: HttpUriRequest =>
+                val contentStream = request.asInstanceOf[HttpPost].getEntity().getContent()
+                val spec = IOUtils.toString(contentStream)
+
+                assertJson(spec, "spec.ioConfig.inputSpec.paths", testTempFile)
+                assertJson(spec, "spec.dataSchema.dataSource", "test")
+                assertJson(spec, "spec.dataSchema.granularitySpec.segmentGranularity", "hour")
+                assertJson(spec, "spec.dataSchema.granularitySpec.queryGranularity", "minute")
+                assertJson(spec, "spec.dataSchema.granularitySpec.intervals",
+                    Seq("1970-01-01T00:00Z/1970-01-02T00:00Z"))
+                assertJson(spec, "spec.dataSchema.parser.parseSpec.dimensionsSpec.dimensions",
+                    Seq("event_category", "event_action", "wiki"))
+                assertJson(spec, "spec.dataSchema.parser.parseSpec.timestampSpec.format", "posix")
+                assertJson(spec, "spec.dataSchema.parser.parseSpec.timestampSpec.column", "timestamp")
+                assertJson(spec, "spec.dataSchema.metricsSpec", Seq(
+                    Map("name" -> "event_seconds", "fieldName" -> "event_seconds", "type" -> "doubleSum"),
+                    Map("name" -> "eventCount", "fieldName" -> "eventCount", "type" -> "longSum")
+                ))
+                assertJson(spec, "spec.tuningConfig.partitionsSpec.numShards", 2L)
+                assertJson(spec, "spec.tuningConfig.jobProperties.mapreduce\\.reduce\\.memory\\.mb", "8192")
+                assertJson(spec, "spec.tuningConfig.jobProperties.mapreduce\\.job\\.queuename", "default")
+
+                createHttpResponse(200, """{"task": "test-task-1"}""")
+            }
+            // Should receive a status request; returns success.
+            // The status has to be one that DataFrameToDruid handles (SUCCESS),
+            // otherwise the status updater thread dies with an exception.
+            (httpClientMock.execute(_: HttpUriRequest)).expects(*).returning(
+                createHttpResponse(200, """{"status": {"status": "SUCCESS"}}""")
+            )
+        }
+        createDftd().start().await()
+    }
+
+    it should "request for ingestion with the correct data" in {
+        inSequence {
+            // Should receive an ingestion request; checks data, returns ingestion task id.
+            (httpClientMock.execute(_: HttpUriRequest)).expects(*).onCall { request: HttpUriRequest =>
+                val sqlContext = new SQLContext(sc)
+                val inputDf = sqlContext.read.json(testTempFile)
+                val expectedDf = testDf.withColumn("eventCount", lit(1L))
+                // The columns read back from the json file do not necessarily come
+                // in the order of the expected DataFrame, so align them by name
+                // before comparing the rows.
+                val actualDf = inputDf.select(expectedDf.columns.map(inputDf(_)): _*)
+                // Both DataFrames must contain exactly the same rows.
+                assert(actualDf.except(expectedDf).count() == 0)
+                assert(expectedDf.except(actualDf).count() == 0)
+                createHttpResponse(200, """{"task": "test-task-1"}""")
+            }
+            // Should receive a status request; returns success.
+            // The status has to be one that DataFrameToDruid handles (SUCCESS),
+            // otherwise the status updater thread dies with an exception.
+            (httpClientMock.execute(_: HttpUriRequest)).expects(*).returning(
+                createHttpResponse(200, """{"status": {"status": "SUCCESS"}}""")
+            )
+        }
+        createDftd().start().await()
+    }
+
+    it should "request for status check with correct method and url" in {
+        inSequence {
+            // Should receive an ingestion request; returns ingestion task id.
+            (httpClientMock.execute(_: HttpUriRequest)).expects(*).returning(
+                createHttpResponse(200, """{"task": "test-task-1"}""")
+            )
+            // Should receive a status request; checks method and url, returns success.
+            // The status has to be one that DataFrameToDruid handles (SUCCESS),
+            // otherwise the status updater thread dies with an exception.
+            (httpClientMock.execute(_: HttpUriRequest)).expects(*).onCall { request: HttpUriRequest =>
+                val uri = request.getURI()
+                assert(uri.getHost() == "test.druid.host")
+                assert(uri.getPort() == 8090)
+                assert(uri.getPath() == "/druid/indexer/v1/task/test-task-1/status")
+                assert(request.getMethod() == "GET")
+                createHttpResponse(200, """{"status": {"status": "SUCCESS"}}""")
+            }
+        }
+        createDftd().start().await()
+    }
+
+    it should "delete the temporary file" in {
+        inSequence {
+            // Should receive an ingestion request; returns ingestion task id.
+            (httpClientMock.execute(_: HttpUriRequest)).expects(*).returning(
+                createHttpResponse(200, """{"task": "test-task-1"}""")
+            )
+            // Should receive a status request; returns succeeded.
+            (httpClientMock.execute(_: HttpUriRequest)).expects(*).returning(
+                createHttpResponse(200, """{"status": {"status": 
"SUCCESS"}}""")
+            )
+        }
+        createDftd().start().await()
+
+        // Check that the temp file has been deleted.
+        val path = new Path(testTempFile)
+        val fs = FileSystem.get(sc.hadoopConfiguration)
+        assert(!fs.exists(path))
+    }
+
+    it should "call the user callback once finished" in {
+        inSequence {
+            // Should receive an ingestion request; returns ingestion task id.
+            (httpClientMock.execute(_: HttpUriRequest)).expects(*).returning(
+                createHttpResponse(200, """{"task": "test-task-1"}""")
+            )
+            // Should receive a status request; returns succeeded.
+            (httpClientMock.execute(_: HttpUriRequest)).expects(*).returning(
+                createHttpResponse(200, """{"status": {"status": 
"SUCCESS"}}""")
+            )
+        }
+
+        // Check that callback is actually executed.
+        var callbackExecuted = false
+        createDftd().start(Some((status: IngestionStatus.Value) => {
+            assert(status == IngestionStatus.Done)
+            callbackExecuted = true
+        })).await()
+        assert(callbackExecuted)
+    }
+}
diff --git a/refinery-job/pom.xml b/refinery-job/pom.xml
index 21d4f6e..fd9a438 100644
--- a/refinery-job/pom.xml
+++ b/refinery-job/pom.xml
@@ -19,19 +19,6 @@
             <artifactId>refinery-core</artifactId>
         </dependency>
 
-        <!-- 
-             adding explicit dep for snappy, otherwise
-             spark assumes is on the java.library.path
-             see: 
https://github.com/rvs/snappy-java/blob/master/src/main/resources/org/xerial/snappy/SnappyNativeLoader.java#L47
-        -->
-        <!-- https://mvnrepository.com/artifact/org.xerial.snappy/snappy-java 
-->
-        <dependency>
-            <groupId>org.xerial.snappy</groupId>
-            <artifactId>snappy-java</artifactId>
-            <version>1.1.2.5</version>
-             <scope>test</scope>
-        </dependency>
-
         <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-common</artifactId>
diff --git 
a/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/EventLoggingToDruid.scala
 
b/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/EventLoggingToDruid.scala
new file mode 100644
index 0000000..e990927
--- /dev/null
+++ 
b/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/EventLoggingToDruid.scala
@@ -0,0 +1,286 @@
+package org.wikimedia.analytics.refinery.job
+
+import org.apache.log4j.LogManager
+import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.sql.Column
+import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.hive.HiveContext
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.types.{IntegerType, LongType, FloatType, 
DoubleType, StructField, StructType}
+import org.joda.time.DateTime
+import org.joda.time.format.DateTimeFormat
+import org.wikimedia.analytics.refinery.core.{DataFrameToDruid, 
IngestionStatus}
+import scopt.OptionParser
+
+
+object EventLoggingToDruid {
+
+    // Logger used by every step of this job.
+    val log = LogManager.getLogger("EventLoggingToDruid")
+    // Hour-resolution pattern (e.g. 2017-09-29T03) used to parse date-time CLI options.
+    val DateFormatter = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH")
+
+    /**
+      * Job configuration, normally populated by argsParser from the command line.
+      *
+      * @param table              Hive input table.
+      * @param startDate          Start of the interval to load (inclusive).
+      * @param endDate            End of the interval to load (exclusive).
+      * @param database           Hive input database.
+      * @param metrics            Predicate selecting the fields to be loaded as metrics;
+      *                           the default selects none.
+      * @param blacklist          Names of the columns that must not be loaded.
+      * @param segmentGranularity Granularity for Druid segments.
+      * @param queryGranularity   Granularity for Druid queries.
+      * @param numShards          Number of shards for Druid ingestion.
+      * @param reduceMemory       Memory to be used by Hadoop for reduce operations.
+      * @param hadoopQueue        Hadoop queue where to execute the loading.
+      * @param druidHost          Druid host to load the data to.
+      * @param druidPort          Druid port to load the data to.
+      * @param dryRun             When true, nothing is loaded; parameters are only checked.
+      */
+    case class Params(
+        table: String = "",
+        startDate: DateTime = new DateTime(0),
+        endDate: DateTime = new DateTime(0),
+        database: String = "event",
+        metrics: StructField => Boolean = _ => false,
+        blacklist: Seq[String] = Seq.empty,
+        segmentGranularity: String = "hour",
+        queryGranularity: String = "minute",
+        numShards: Int = 2,
+        reduceMemory: String = "8192",
+        hadoopQueue: String = "default",
+        druidHost: String = "druid1001.eqiad.wmnet",
+        druidPort: String = "8090",
+        dryRun: Boolean = false
+    )
+
+    // Predicates over schema fields, used to choose which fields are treated as metrics.
+    object FieldFilter {
+        /** True when the field's data type is Integer, Long, Float or Double. */
+        def isNumber(field: StructField): Boolean = {
+            val fieldType = field.dataType
+            fieldType == IntegerType || fieldType == LongType ||
+                fieldType == FloatType || fieldType == DoubleType
+        }
+
+        /** True when the whole field name matches ".*metric.*", ignoring case. */
+        def hasMetricAffix(field: StructField): Boolean =
+            field.name.matches("(?i).*metric.*")
+    }
+
+    // Lets scopt read DateTime options straight from the command line.
+    // A value made only of digits is taken as a number of hours before now;
+    // any other value is parsed with DateFormatter (yyyy-MM-dd'T'HH).
+    implicit val scoptDateTimeRead: scopt.Read[DateTime] =
+        scopt.Read.reads { value =>
+            val isHoursAgo = value.forall(c => Character.isDigit(c))
+            if (isHoursAgo) DateTime.now.minusHours(value.toInt)
+            else DateTime.parse(value, DateFormatter)
+        }
+
+    /**
+      * Command line parser for this job; every option fills one field of Params.
+      *
+      * Notes for maintainers:
+      *  - Each short option letter must be unique. --reduce-memory used to share
+      *    'x' with --num-shards and now uses 'r'.
+      *  - --metrics only accepts the names registered in metricFilters; any other
+      *    value is rejected by validation with an explicit message.
+      */
+    val argsParser = new OptionParser[Params](
+        "spark-submit --class org.wikimedia.analytics.refinery.job.EventLoggingToDruid refinery-job.jar"
+    ) {
+        // Metric filters selectable with --metrics, keyed by their CLI name.
+        private val metricFilters: Map[String, StructField => Boolean] = Map(
+            "number" -> (FieldFilter.isNumber _),
+            "affix" -> (FieldFilter.hasMetricAffix _)
+        )
+
+        head("""
+            |Hive tables -> Druid data sets
+            |
+            |Example:
+            |  spark-submit --class org.wikimedia.analytics.refinery.job.EventLoggingToDruid refinery-job.jar \
+            |   --database         event \
+            |   --table            NavigationTiming \
+            |   --start-date       2017-09-29T03 \
+            |   --end-date         2017-12-15T21 \
+            |   --metrics          number \
+            |   --blacklist        pageId,namespaceId,revId \
+            |   --dry-run
+            |
+            |""".stripMargin, "")
+
+        // A real line break is used at the end: escape sequences such as \n are not
+        // interpreted inside triple-quoted strings and would be printed literally.
+        note("""NOTE: You may pass all of the described CLI options to this job in a single
+               |      string with --options '<options>' flag.
+               |""".stripMargin)
+
+        help("help") text "Prints this usage text and exit."
+
+        opt[String]('T', "table").required().valueName("<table>").action { (x, p) =>
+            p.copy(table = x)
+        }.text("Hive input table.")
+
+        opt[DateTime]('S', "start-date").required().valueName("<YYYY-MM-DDTHH>").action { (x, p) =>
+            p.copy(startDate = x)
+        }.text("Start date of the interval to load (inclusive).")
+
+        opt[DateTime]('E', "end-date").required().valueName("<YYYY-MM-DDTHH>").action { (x, p) =>
+            p.copy(endDate = x)
+        }.text("End date of the interval to load (exclusive).")
+
+        opt[String]('D', "database").optional().valueName("<database>").action { (x, p) =>
+            p.copy(database = x)
+        }.text("Hive input database. Default: event.")
+
+        opt[String]('m', "metrics").optional().valueName("<filter>").validate { x =>
+            if (metricFilters.contains(x)) success
+            else failure(s"Unknown metrics filter '$x', expected one of: number, affix.")
+        }.action { (x, p) =>
+            // Safe lookup: validation above only lets registered names through.
+            p.copy(metrics = metricFilters(x))
+        }.text("Filter that will select metric columns (number|affix): " +
+               "'number' selects all columns that have a numeric type. " +
+               "'affix' selects all columns with the word 'metric' in them. " +
+               "eventCount will always be a metric in the loaded data set.")
+
+        opt[Seq[String]]('b', "blacklist").optional().valueName("<column1>,<column2>...").action { (x, p) =>
+            p.copy(blacklist = x)
+        }.text("List of columns that are not to be loaded. For struct columns, " +
+               "passing the column name will blacklist all data, whereas " +
+               "column_subField will only blacklist this sub-field.")
+
+        opt[String]('g', "segment-granularity").optional().valueName("<granularity>").action { (x, p) =>
+            p.copy(segmentGranularity = x)
+        }.text("Granularity for Druid segments (quarter|month|week|day|hour). Default: hour.")
+
+        opt[String]('q', "query-granularity").optional().valueName("<granularity>").action { (x, p) =>
+            p.copy(queryGranularity = x)
+        }.text("Granularity for Druid queries (week|day|hour|minute|second). Default: minute.")
+
+        opt[Int]('x', "num-shards").optional().valueName("<N>").action { (x, p) =>
+            p.copy(numShards = x)
+        }.text("Number of shards for Druid ingestion. Default: 2.")
+
+        // Read as Int so that non-numeric values are rejected; Params stores it as String.
+        opt[Int]('r', "reduce-memory").optional().valueName("<N>").action { (x, p) =>
+            p.copy(reduceMemory = x.toString())
+        }.text("Memory to be used by Hadoop for reduce operations. Default: 8192.")
+
+        opt[String]('h', "hadoop-queue").optional().valueName("<queue>").action { (x, p) =>
+            p.copy(hadoopQueue = x)
+        }.text("Hadoop queue where to execute the loading. Default: default.")
+
+        opt[String]('d', "druid-host").optional().valueName("<host>").action { (x, p) =>
+            p.copy(druidHost = x)
+        }.text("Druid host to load the data to. Default: druid1001.eqiad.wmnet.")
+
+        // Read as Int so that non-numeric values are rejected; Params stores it as String.
+        opt[Int]('p', "druid-port").optional().valueName("<port>").action { (x, p) =>
+            p.copy(druidPort = x.toString())
+        }.text("Druid port to load the data to. Default: 8090.")
+
+        opt[Unit]('n', "dry-run").optional().action { (x, p) =>
+            p.copy(dryRun = true)
+        }.text("Do not execute any loading, only check and print parameters.")
+    }
+
+    // Hive time columns; always removed from the loaded data.
+    val blacklistedHiveFields: Set[String] = Set("year", "month", "day", "hour")
+
+    // Capsule fields that are always removed from the loaded data.
+    val blacklistedCapsuleFields: Set[String] = Set(
+        "schema",
+        "seqId",
+        "uuid",
+        "userAgent",
+        "clientValidated",
+        "isTruncated",
+        "clientIp"
+    )
+
+    // Capsule fields that are kept; they are never selected as metrics.
+    val legitCapsuleFields: Set[String] = Set(
+        "wiki",
+        "webHost",
+        "revision",
+        "topic",
+        "recvFrom"
+    )
+
+    /**
+      * Entry point. Parses the command line, runs the job and exits the JVM
+      * with status 0 when apply returns true, 1 otherwise (including when
+      * the arguments cannot be parsed).
+      *
+      * The options may be passed either as regular arguments or packed in a
+      * single whitespace-separated string: --options '<options>'.
+      */
+    def main(args: Array[String]): Unit = {
+        val jobArgs: Seq[String] = args match {
+            case Array("--options", options, _*) =>
+                // Options given as a single string: split them before parsing.
+                // Trimming and dropping empty tokens avoids feeding "" to the parser.
+                options.trim.split("\\s+").filter(_.nonEmpty).toSeq
+            case _ =>
+                // Also covers a lone "--options" without a value, which the parser
+                // rejects with a usage error instead of an index-out-of-bounds crash.
+                args.toSeq
+        }
+        val params = argsParser.parse(jobArgs, Params()).getOrElse(sys.exit(1))
+
+        if (apply(params)) sys.exit(0)
+        else sys.exit(1)
+    }
+
+    /**
+      * Runs the job for already parsed and checked parameters.
+      *
+      * Queries the Hive table for the requested interval, flattens nested fields,
+      * removes blacklisted columns, splits the remaining fields into dimensions
+      * and metrics and, unless this is a dry run, hands the data to DataFrameToDruid.
+      *
+      * @param params the job configuration
+      * @return true for a dry run; otherwise whether the ingestion status is Done
+      */
+    def apply(params: Params): Boolean = {
+        val dataSource = s"${params.database}_${params.table}"
+        val intervals = Seq((params.startDate, params.endDate))
+
+        log.info(s"Starting process for $dataSource.")
+        log.info(s"Querying Hive for intervals: $intervals")
+
+        // Initialize sqlContext (a HiveContext, seen through its SQLContext interface).
+        val sc = new SparkContext(new SparkConf())
+        val sqlContext: SQLContext = new HiveContext(sc)
+
+        // Get data already filtered by time range: year/month/day/hour are
+        // concatenated into a yyyyMMddHH value that is compared with both bounds.
+        val hourFormat = "yyyyMMddHH"
+        val fromHour = params.startDate.toString(hourFormat)
+        val untilHour = params.endDate.toString(hourFormat)
+        val rowHour = "CONCAT(year, LPAD(month, 2, '0'), LPAD(day, 2, '0'), LPAD(hour, 2, '0'))"
+        val df = sqlContext.sql(s"""
+            SELECT *
+            FROM ${params.database}.${params.table}
+            WHERE ${rowHour} >= ${fromHour}
+            AND ${rowHour} < ${untilHour}
+        """)
+
+        log.info("Preparing dimensions and metrics.")
+
+        // Flatten nested fields, then remove blacklisted ones.
+        val flatDf = df.select(getFlatColumns(df.schema):_*)
+        val finalDf = flatDf.select(getCleanColumns(flatDf.schema, params.blacklist):_*)
+
+        // Get dimensions and metrics.
+        val (dimensionFields, metricFields) =
+            getDimensionsAndMetrics(finalDf.schema, params.metrics)
+
+        log.info("Dimensions: " + dimensionFields.mkString(", "))
+        log.info("Metrics: " + metricFields.mkString(", "))
+
+        if (!params.dryRun) {
+            // Execute loading process and wait for it to finish.
+            log.info("Launching DataFrameToDruid process.")
+            val dftd = new DataFrameToDruid(
+                sc = sc,
+                dataSource = dataSource,
+                inputDf = finalDf,
+                dimensions = dimensionFields,
+                metrics = metricFields,
+                intervals = intervals,
+                timestampColumn = "dt",
+                timestampFormat = "auto",
+                segmentGranularity = params.segmentGranularity,
+                queryGranularity = params.queryGranularity,
+                numShards = params.numShards,
+                reduceMemory = params.reduceMemory,
+                hadoopQueue = params.hadoopQueue,
+                druidHost = params.druidHost,
+                druidPort = params.druidPort
+            ).start().await()
+            log.info("Done.")
+
+            // Return whether the process was successful.
+            dftd.status() == IngestionStatus.Done
+        } else {
+            log.info("Dry run finished: no data was loaded.")
+            true
+        }
+    }
+
+    /**
+      * Builds the columns that flatten a (possibly nested) schema.
+      *
+      * Every non-struct field becomes one column, selected by its dotted path and
+      * aliased to that path with "_" in place of "."; struct fields are expanded
+      * recursively.
+      *
+      * @param schema the schema (or nested struct) to flatten
+      * @param prefix dotted path of the enclosing struct; null at the top level
+      * @return one aliased column per non-struct field
+      */
+    def getFlatColumns(schema: StructType, prefix: String = null): Seq[Column] = {
+        // HACK: Hive kills camelCase, so map each lower-cased capsule field name
+        // back to its camelCase spelling.
+        val capsuleCaseMap = (legitCapsuleFields ++ blacklistedCapsuleFields)
+            .map(name => name.toLowerCase() -> name)
+            .toMap
+
+        // Dotted path of a field of this schema, honouring the enclosing prefix.
+        def pathOf(fieldName: String): String =
+            Option(prefix).map(p => p + "." + fieldName).getOrElse(fieldName)
+
+        schema.fields.flatMap { field =>
+            val path = pathOf(field.name)
+            field.dataType match {
+                case nested: StructType =>
+                    getFlatColumns(nested, path)
+                case _ =>
+                    val alias = path
+                        .split("\\.")
+                        .map(part => capsuleCaseMap.getOrElse(part, part))
+                        .mkString("_")
+                    Seq(col(path).as(alias))
+            }
+        }
+    }
+
+    /**
+      * Selects the columns of an already flattened schema that are not blacklisted.
+      *
+      * A field is dropped when its name, or the part of its name before the first
+      * "_", appears in the given blacklist, in blacklistedCapsuleFields or in
+      * blacklistedHiveFields.
+      *
+      * @param schema    flattened schema
+      * @param blacklist additional names to drop
+      * @return the columns to keep, in schema order
+      */
+    def getCleanColumns(schema: StructType, blacklist: Seq[String]): Seq[Column] = {
+        val blacklistNames = blacklist.toSet ++ blacklistedCapsuleFields ++ blacklistedHiveFields
+        val withPrefix = "([^_]*)_.*".r
+
+        def isBlacklisted(name: String): Boolean =
+            blacklistNames.contains(name) || (name match {
+                case withPrefix(prefix) => blacklistNames.contains(prefix)
+                case _ => false
+            })
+
+        schema.fields
+            .map(_.name)
+            .filterNot(isBlacklisted)
+            .map(name => col(name))
+    }
+
+    /**
+      * Splits the fields of a schema into dimension names and metric names.
+      *
+      * The "dt" field is left out of both. A field is a metric when it is not one
+      * of legitCapsuleFields and the given predicate accepts it; every other field
+      * is a dimension.
+      *
+      * @param schema  schema of the data to load
+      * @param metrics predicate selecting metric fields
+      * @return (dimension names, metric names), each in schema order
+      */
+    def getDimensionsAndMetrics(
+        schema: StructType,
+        metrics: (StructField) => Boolean
+    ): (Seq[String], Seq[String]) = {
+        val (metricFields, dimensionFields) = schema.fields
+            .filter(_.name != "dt")
+            .partition(f => !legitCapsuleFields.contains(f.name) && metrics(f))
+        (dimensionFields.map(_.name), metricFields.map(_.name))
+    }
+}
diff --git 
a/refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/TestEventLoggingToDruid.scala
 
b/refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/TestEventLoggingToDruid.scala
new file mode 100644
index 0000000..d9bed98
--- /dev/null
+++ 
b/refinery-job/src/test/scala/org/wikimedia/analytics/refinery/job/TestEventLoggingToDruid.scala
@@ -0,0 +1,104 @@
+package org.wikimedia.analytics.refinery.job
+
+import com.holdenkarau.spark.testing.SharedSparkContext
+import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.types.{StructType, StringType, IntegerType, 
LongType}
+import org.scalatest.{FlatSpec, Matchers, BeforeAndAfterEach}
+
+
+/**
+  * Tests for the schema helpers of EventLoggingToDruid (flattening, blacklisting
+  * and dimension/metric selection), run against a small in-memory DataFrame.
+  */
+class TestEventLoggingToDruid extends FlatSpec
+    with Matchers with BeforeAndAfterEach with SharedSparkContext {
+
+    // Rebuilt before every test by beforeEach.
+    var testDf: DataFrame = _
+
+    override def beforeEach(): Unit = {
+        val eventSchema = (new StructType)
+            .add("pageId", StringType)
+            .add("action", StringType)
+            .add("seconds", IntegerType)
+        val testSchema: StructType = (new StructType)
+            .add("event", eventSchema)
+            .add("dt", StringType)
+            .add("webhost", StringType)
+            .add("useragent", StringType)
+            .add("year", IntegerType)
+
+        // (pageId, action, seconds, dt, webhost, useragent); year is always 2017.
+        val rows = Seq(
+            ("page1", "edit", 10, "2017-01-01T00:00:00", "en.wikimedia.org", "UA1"),
+            ("page1", "read", 20, "2017-01-01T00:00:01", "en.wikimedia.org", "UA2"),
+            ("page2", "edit", 30, "2017-01-01T00:00:02", "es.wikimedia.org", "UA3"),
+            ("page3", "read", 40, "2017-01-01T00:00:03", "es.wikimedia.org", "UA4"),
+            ("page3", "read", 50, "2017-01-01T00:00:04", "es.wikimedia.org", "UA5")
+        ).map { case (pageId, action, seconds, dt, webhost, useragent) =>
+            Row(Row(pageId, action, seconds), dt, webhost, useragent, 2017)
+        }
+
+        val sqlContext = new SQLContext(sc)
+        testDf = sqlContext.createDataFrame(sc.parallelize(rows), testSchema)
+    }
+
+    // Applies getFlatColumns to the given data frame.
+    private def flatten(df: DataFrame): DataFrame =
+        df.select(EventLoggingToDruid.getFlatColumns(df.schema):_*)
+
+    // Applies getCleanColumns with the given blacklist to a flattened data frame.
+    private def clean(flatDf: DataFrame, blacklist: Seq[String]): DataFrame =
+        flatDf.select(EventLoggingToDruid.getCleanColumns(flatDf.schema, blacklist):_*)
+
+    // Field names of a data frame, in schema order.
+    private def fieldNames(df: DataFrame): Seq[String] = df.schema.map(_.name)
+
+
+    it should "flatten the event part of the schema" in {
+        val flatFields = fieldNames(flatten(testDf))
+
+        Seq("event_pageId", "event_action", "event_seconds").foreach { name =>
+            assert(flatFields.contains(name))
+        }
+    }
+
+    it should "correct case for the capsule fields" in {
+        val flatFields = fieldNames(flatten(testDf))
+
+        Seq("webHost", "userAgent").foreach { name =>
+            assert(flatFields.contains(name))
+        }
+    }
+
+    it should "blacklist whole columns properly" in {
+        val cleanFields = fieldNames(clean(flatten(testDf), Seq("event", "webHost")))
+
+        assert(cleanFields.length == 1)
+        assert(cleanFields.contains("dt"))
+    }
+
+    it should "blacklist struct column subfields properly" in {
+        val cleanFields = fieldNames(clean(flatten(testDf), Seq("event_pageId", "event_seconds")))
+
+        assert(cleanFields.length == 3)
+        Seq("event_action", "dt", "webHost").foreach { name =>
+            assert(cleanFields.contains(name))
+        }
+    }
+
+    it should "select dimensions and metrics properly" in {
+        val cleanDf = clean(flatten(testDf), Seq("event_pageId"))
+
+        // Only integer fields are taken as metrics.
+        val (dimensionFields, metricFields) = EventLoggingToDruid.getDimensionsAndMetrics(
+            cleanDf.schema,
+            f => f.dataType == IntegerType
+        )
+
+        assert(dimensionFields.length == 2)
+        Seq("event_action", "webHost").foreach { name =>
+            assert(dimensionFields.contains(name))
+        }
+
+        assert(metricFields.length == 1)
+        assert(metricFields.contains("event_seconds"))
+    }
+}

-- 
To view, visit https://gerrit.wikimedia.org/r/386882
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: I30c374c3dcba44bbd0608e72ae0162bcc442cd0f
Gerrit-PatchSet: 25
Gerrit-Project: analytics/refinery/source
Gerrit-Branch: master
Gerrit-Owner: Mforns <mfo...@wikimedia.org>
Gerrit-Reviewer: Joal <j...@wikimedia.org>
Gerrit-Reviewer: Mforns <mfo...@wikimedia.org>
Gerrit-Reviewer: Milimetric <dandree...@wikimedia.org>
Gerrit-Reviewer: Nuria <nu...@wikimedia.org>
Gerrit-Reviewer: Ottomata <ao...@wikimedia.org>
Gerrit-Reviewer: jenkins-bot <>

_______________________________________________
MediaWiki-commits mailing list
MediaWiki-commits@lists.wikimedia.org
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to