jenkins-bot has submitted this change and it was merged. ( 
https://gerrit.wikimedia.org/r/405285 )

Change subject: Clean refinery-job from BannerImpressionStream job
......................................................................


Clean refinery-job from BannerImpressionStream job

The BannerImpressionStream job now leaves in the temporary module
refinery-job-spark-2.1 as it depends on spark-2.

Change-Id: Ic859e84b43e6c5dba66987432af048ac38c81655
---
M refinery-job/pom.xml
D 
refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/BannerImpressionsStream.scala
2 files changed, 1 insertion(+), 205 deletions(-)

Approvals:
  Ottomata: Looks good to me, but someone else must approve
  Joal: Looks good to me, approved
  jenkins-bot: Verified



diff --git a/refinery-job/pom.xml b/refinery-job/pom.xml
index fd9a438..4091d23 100644
--- a/refinery-job/pom.xml
+++ b/refinery-job/pom.xml
@@ -64,25 +64,6 @@
         </dependency>
 
         <dependency>
-            <groupId>org.apache.spark</groupId>
-            <artifactId>spark-streaming-kafka_2.10</artifactId>
-            <version>${spark.version}</version>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.spark</groupId>
-            <artifactId>spark-streaming_2.10</artifactId>
-            <version>${spark.version}</version>
-            <scope>provided</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>com.netaporter</groupId>
-            <artifactId>scala-uri_2.10</artifactId>
-            <version>0.4.16</version>
-        </dependency>
-
-        <dependency>
             <groupId>com.twitter</groupId>
             <artifactId>algebird-core_2.10</artifactId>
             <version>0.10.2</version>
@@ -141,8 +122,7 @@
             <artifactId>graphframes</artifactId>
             <version>0.3.0-spark1.6-s_2.10</version>
         </dependency>
-
-
+        
         <dependency>
             <groupId>com.holdenkarau</groupId>
             <artifactId>spark-testing-base_2.10</artifactId>
diff --git 
a/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/BannerImpressionsStream.scala
 
b/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/BannerImpressionsStream.scala
deleted file mode 100644
index 6086d3c..0000000
--- 
a/refinery-job/src/main/scala/org/wikimedia/analytics/refinery/job/BannerImpressionsStream.scala
+++ /dev/null
@@ -1,184 +0,0 @@
-package org.wikimedia.analytics.refinery.job
-
-import com.netaporter.uri.Uri
-import kafka.serializer.StringDecoder
-import org.apache.kafka.clients.producer.{ProducerRecord, KafkaProducer, 
ProducerConfig}
-import org.apache.spark.streaming.kafka.KafkaUtils
-import org.apache.spark.{SparkContext, SparkConf}
-import org.apache.spark.streaming.{Seconds, StreamingContext}
-import org.wikimedia.analytics.refinery.core.Webrequest
-import scopt.OptionParser
-import org.json4s._
-import org.json4s.jackson.JsonMethods._
-import org.json4s.JsonDSL._
-import scala.collection.JavaConverters._
-
-
-object BannerImpressionsStream {
-
-  def run(
-           @transient sc: SparkContext,
-           kafkaBrokers:String,
-           kafkaInputTopics: String,
-           kafkaOutputTopic: String,
-           batchDurationSeconds: Int,
-           checkpointDirectory: String,
-           noCheckpoint: Boolean
-         ): Unit = {
-
-    def newStreamingContext() = {
-      val ssc = new StreamingContext(sc, Seconds(batchDurationSeconds.toLong))
-      ssc.checkpoint(checkpointDirectory)
-
-      val kafkaInputTopicsSet = kafkaInputTopics.split(",").toSet
-      val KafkaInputParameters = Map[String, String]("metadata.broker.list" -> 
kafkaBrokers)
-
-      // Get kafka batches from input topics
-      val messageStream = KafkaUtils.createDirectStream[String, String, 
StringDecoder, StringDecoder](
-        ssc,
-        KafkaInputParameters,
-        kafkaInputTopicsSet
-      )
-
-      // Compute banner oriented filtering / conversion / aggregation
-      val bannerStream = messageStream.
-        // Extract the JSON message from the Kafka (Key, Value) message.
-        map { case (_, str) => parse(str) }.
-        filter(json => {
-          (json \ "uri_path").values.asInstanceOf[String] == 
"/beacon/impression" &&
-            (json \ 
"uri_query").values.asInstanceOf[String].contains("debug=false") &&
-            !Webrequest.getInstance().isSpider((json \ 
"user_agent").values.asInstanceOf[String])
-        }).
-        map(json => {
-          val uri_query = (json \\ "uri_query").values.asInstanceOf[String]
-          val uri: Uri = Uri.parse("http://bla.org/woo/"; + uri_query)
-          val minuteTs = (json \ 
"dt").values.asInstanceOf[String].replaceAll(":\\d\\d$", ":00")
-          val params: Map[String, Seq[String]] = uri.query.paramMap
-
-          ("dt" -> minuteTs) ~
-            ("campaign" -> params.getOrElse("campaign", 
List.empty[String]).headOption) ~
-            ("banner" -> params.getOrElse("banner", 
List.empty[String]).headOption) ~
-            ("project" -> params.getOrElse("project", 
List.empty[String]).headOption) ~
-            ("uselang" -> params.getOrElse("uselang", 
List.empty[String]).headOption) ~
-            ("bucket" -> params.getOrElse("bucket", 
List.empty[String]).headOption) ~
-            ("anonymous" -> (params.getOrElse("anonymous", 
List.empty[String]).headOption == Some("true"))) ~
-            ("status_code" -> params.getOrElse("statusCode", 
List.empty[String]).headOption) ~
-            ("country" -> params.getOrElse("country", 
List.empty[String]).headOption) ~
-            ("device" -> params.getOrElse("device", 
List.empty[String]).headOption) ~
-            ("sample_rate" -> params.getOrElse("recordImpressionSampleRate", 
List.empty[String]).headOption.map(_.toDouble))
-        }).
-        countByValue().
-        map { case (json, count) =>
-          val jsonSampleRate = json \ "sample_rate"
-          json merge (
-            ("request_count" -> count) ~
-              ("normalized_request_count" -> {
-                if (jsonSampleRate != JNothing) Some(count / 
jsonSampleRate.values.asInstanceOf[Double])
-                else None
-              })
-            )
-        }.
-        map(j => compact(render(j)))
-
-      // Output banners data back to kafka
-      bannerStream.foreachRDD(rdd => {
-        System.out.println("# events = " + rdd.count())
-
-        rdd.foreachPartition(partition => {
-          // Print statements in this section are shown in the executor's 
stdout logs
-          val props = Map[String, String](
-            ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> kafkaBrokers,
-            ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> 
"org.apache.kafka.common.serialization.StringSerializer",
-            ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG -> 
"org.apache.kafka.common.serialization.StringSerializer"
-          ).asInstanceOf[Map[String, Object]]
-
-          val producer = new KafkaProducer[String, String](props.asJava)
-          partition.foreach(record => {
-            val data = record.toString
-            val message = new ProducerRecord[String, String](kafkaOutputTopic, 
null, data)
-            producer.send(message)
-          })
-          producer.close()
-        })
-      })
-      ssc
-    }
-
-    val context = {
-      if (noCheckpoint) newStreamingContext()
-      else StreamingContext.getOrCreate(checkpointDirectory, 
newStreamingContext)
-    }
-
-    // Start the context
-    context.start()
-    context.awaitTermination()
-  }
-
-  /**
-    * Config class for CLI argument parser using scopt
-    */
-  case class Params(
-                     kafkaBrokers: String = Seq("12", "13", "14", "18", "20", 
"22").map("kafka10" + _ + ".eqiad.wmnet:9092").mkString(","),
-                     kafkaInputTopics: String = "webrequest_text",
-                     kafkaOutputTopic: String = "test_banner_impressions_joal",
-                     batchDurationSecs: Int = 10,
-                     checkpointDirectory: String = 
"hdfs://analytics-hadoop/tmp/spark/banner_impressions_stream_checkpoint",
-                     noCheckpoint: Boolean = false
-                   )
-
-  /**
-    * Define the command line options parser
-    */
-  val argsParser = new OptionParser[Params]("Banner Impressions Stream") {
-    head("Banner Impressions Stream", "")
-    note( "Extract banner impressions data from kafka webrequest stream and 
write it back to kafka")
-    help("help") text "Prints this usage text"
-
-    opt[String]('k', "kafka-brokers") optional() valueName 
"<broker1,...,brokerN>" action {
-      (x, p) => p.copy(kafkaBrokers = x)
-    } text "Kafka brokers to consume from. Defaults to 
kafka10[12|14|18|20|22].eqiad.wmnet:9092"
-
-    opt[String]('i', "kafka-input-topics") optional() valueName 
"<topic1,...,topicK>" action {
-      (x, p) => p.copy(kafkaInputTopics = x)
-    } text "Input topics to consume. Defaults to webrequest_text"
-
-    opt[String]('o', "kafka-output-topic") optional() valueName "<topic>" 
action {
-      (x, p) => p.copy(kafkaOutputTopic = x)
-    } text "Output topic to write to. Defaults to test_banner_impressions_joal"
-
-    opt[Int]("batch-duration-seconds") optional() action {
-      (x, p) => p.copy(batchDurationSecs = x)
-    } text "Batch duration in seconds. Defaults to 10."
-
-    opt[String]("checkpoint-dir") optional() valueName "<path>" action {
-      (x, p) => p.copy(checkpointDirectory = if (x.endsWith("/")) x else x + 
"/")
-    } text ("Temporary directory for check-pointing streaming job.\n\t" +
-      "Defaults to 
hdfs://analytics-hadoop/tmp/spark/banner_impressions_stream_checkpoint")
-
-    opt[Unit]("no-checkpoint") optional() action {
-      (_, p) => p.copy(noCheckpoint = true)
-    } text "Force NOT using checkpoint if exists (wipes existing checkpoint 
directory if any)."
-
-  }
-
-  def main(args: Array[String]) {
-    argsParser.parse(args, Params()) match {
-
-      case Some(params) =>
-        // Initial setup - Spark, SQLContext
-        val conf = new SparkConf().setAppName("BannerImpressionsStream")
-        val sc = new SparkContext(conf)
-        run(
-          sc,
-          params.kafkaBrokers,
-          params.kafkaInputTopics,
-          params.kafkaOutputTopic,
-          params.batchDurationSecs,
-          params.checkpointDirectory,
-          params.noCheckpoint
-        )
-
-      case None => sys.exit(1)
-    }
-  }
-}

-- 
To view, visit https://gerrit.wikimedia.org/r/405285
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: Ic859e84b43e6c5dba66987432af048ac38c81655
Gerrit-PatchSet: 1
Gerrit-Project: analytics/refinery/source
Gerrit-Branch: master
Gerrit-Owner: Joal <j...@wikimedia.org>
Gerrit-Reviewer: Elukey <ltosc...@wikimedia.org>
Gerrit-Reviewer: Joal <j...@wikimedia.org>
Gerrit-Reviewer: Nuria <nu...@wikimedia.org>
Gerrit-Reviewer: Ottomata <ao...@wikimedia.org>
Gerrit-Reviewer: jenkins-bot <>

_______________________________________________
MediaWiki-commits mailing list
MediaWiki-commits@lists.wikimedia.org
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to