DCausse has submitted this change and it was merged. ( https://gerrit.wikimedia.org/r/384850 )

Change subject: Drop proof of concept code
......................................................................


Drop proof of concept code

This POC code was initially kept as a guide for building the first
portions of mjolnir. That work is now complete, so this code is just
unnecessary clutter. Git history will remember it forever, so there is
no point in keeping it in the live tree.

Change-Id: I21a504b34f3f22a4cc907041b838d584585602d2
---
D poc/README
D poc/config.py
D poc/data_prepare.py
D poc/data_process_features.py
D poc/gen_splits.py
D poc/merge_vector_data.py
D poc/parse_feature_logs.py
D poc/utils/__init__.py
D poc/utils/os_utils.py
D poc/utils/spark_utils.py
10 files changed, 0 insertions(+), 723 deletions(-)

Approvals:
  DCausse: Verified; Looks good to me, approved



diff --git a/poc/README b/poc/README
deleted file mode 100644
index 2fe181d..0000000
--- a/poc/README
+++ /dev/null
@@ -1,32 +0,0 @@
-== Proof of Concept
-
-This directory contains the proof of concept that was built out prior to starting this
-library. This code is *not* intended to live on, but instead is here to help inform
-the work of building the library. As we build out the related pieces of the library,
-code should disappear from this part of the repository, until the poc/ directory
-is completely gone.
-
-== How to run
-
-# Collect the raw data to work with
-spark-submit --master yarn --deploy-mode client --jars /mnt/hdfs/wmf/refinery/current/artifacts/refinery-hive.jar data_prepare.py
-
-# Train DBN for relevance estimates
-spark-submit --master yarn --deploy-mode client --py-files /a/ebernhardson/feature_log/lib/python2.7/site-packages/clickmodels-1.0.2-py2.7.egg data_process_dbn.py
-# Collect feature vectors for queries (can run in parallel with DBN)
-spark-submit --master yarn --deploy-mode client --py-files /home/ebernhardson/kafka_python-1.3.4.dev-py2.7.egg data_process_features.py
-
-# Copy es queries to somewhere that can send them into relforge
-tar -cvf es_queries.tar -C /mnt/hdfs/user/ebernhardson/ltr/en.wikipedia_10S_150000Q es_queries
-# run them, collect feature logs, ship them back here, copy to hdfs
-zcat part-00*.gz | pv -l | parallel -j 32 curl -s http://localhost:9200/enwikibm25perfield_content/page/_search -d {} > /dev/null
-...
-
-# Parse logs
-pyspark --master yarn parse_feature_logs.py
-
-# Join the various bits of data into a final data frame
-pyspark --master yarn merge_vector_data.py
-
-# (TODO) Convert the vector data df to appropriate formats for ranklib, xgboost, lightgbm
-
diff --git a/poc/config.py b/poc/config.py
deleted file mode 100644
index 8b18fa7..0000000
--- a/poc/config.py
+++ /dev/null
@@ -1,76 +0,0 @@
-import os
-
-from utils import os_utils
-
-# Config for sourcing clicks
-WIKI_PROJECT = 'en.wikipedia'
-MIN_NUM_SEARCHES = 10
-MAX_QUERIES = 150000
-
-# Config for training DBN
-DBN_CONFIG = {
-    'MAX_ITERATIONS': 40,
-    'DEBUG': False,
-    'PRETTY_LOG': True,
-    'MIN_DOCS_PER_QUERY': 10,
-    'MAX_DOCS_PER_QUERY': 20,
-    'SERP_SIZE': 20,
-    'QUERY_INDEPENDENT_PAGER': False,
-    'DEFAULT_REL': 0.5,
-}
-
-# Hadoop Directories
-HADOOP_PREFIX = 'hdfs://analytics-hadoop/user/ebernhardson/ltr/%s_%dS_%dQ' % (
-    WIKI_PROJECT, MIN_NUM_SEARCHES, MAX_QUERIES)
-HADOOP_PREFIX_LOCAL = '/mnt/hdfs/user/ebernhardson/ltr/%s_%dS_%dQ' % (
-    WIKI_PROJECT, MIN_NUM_SEARCHES, MAX_QUERIES)
-
-# Initial result from sql queries sourcing click data
-CLICK_DATA = '%s/click_data' % (HADOOP_PREFIX)
-
-# feature logs as dataframe
-FEATURE_LOGS = '%s/feature_logs' % (HADOOP_PREFIX)
-
-# Data to feed into DBN
-DBN_INPUT = '%s/dbn_input' % (HADOOP_PREFIX)
-DBN_INPUT_LOCAL = '%s/dbn_input' % (HADOOP_PREFIX_LOCAL)
-# Result data from DBN
-DBN_RELEVANCE = '%s/dbn_output' % (HADOOP_PREFIX)
-
-# Feature vectors sourced from elasticsearch ltr plugin about
-# all the (query, hit_page_id) pairs in CLICK_DATA, merged
-# with relevance scores from RELEVANCE_DATA
-VECTOR_DATA = "%s/vector_data" % (HADOOP_PREFIX)
-
-# Local Directories
-ROOT_DIR = "../data/%s_%dS_%dQ" % (WIKI_PROJECT, MIN_NUM_SEARCHES, MAX_QUERIES)
-DATA_DIR = os.path.join(ROOT_DIR, 'data')
-TMP_DIR = os.path.join(ROOT_DIR, 'tmp')
-
-# Data files
-
-# Train/test/validation splits in svmrank format
-SVMRANK_TRAIN_DATA = os.path.join(DATA_DIR, 'svmrank_train.txt')
-SVMRANK_TEST_DATA = os.path.join(DATA_DIR, 'svmrank_test.txt')
-SVMRANK_VALI_DATA = os.path.join(DATA_DIR, 'svmrank_vali.txt')
-
-# Train/test/validation splits in lightgbm format
-LIGHTGBM_TRAIN_DATA = os.path.join(HADOOP_PREFIX, 'lightgbm/train')
-LIGHTGBM_TEST_DATA = os.path.join(HADOOP_PREFIX, 'lightgbm/test')
-LIGHTGBM_VALI_DATA = os.path.join(HADOOP_PREFIX, 'lightgbm/vali')
-
-# Train/test/validation splits for xgboost on hdfs
-XGBOOST_TRAIN_DATA = os.path.join(HADOOP_PREFIX, 'xgboost/train')
-XGBOOST_TEST_DATA = os.path.join(HADOOP_PREFIX, 'xgboost/test')
-XGBOOST_VALI_DATA = os.path.join(HADOOP_PREFIX, 'xgboost/vali')
-
-# Train/test/validation splits in a pickled dict
-VECTOR_DATA_SPLIT = os.path.join(DATA_DIR, 'vector_splits.pkl')
-
-# Map from feature id to feature name
-SVMRANK_LABELS = os.path.join(DATA_DIR, 'svmrank_labels.txt')
-
-os_utils._create_dirs([
-    DATA_DIR,
-    TMP_DIR
-])
diff --git a/poc/data_prepare.py b/poc/data_prepare.py
deleted file mode 100644
index 98cfcb3..0000000
--- a/poc/data_prepare.py
+++ /dev/null
@@ -1,109 +0,0 @@
-import config
-from utils import spark_utils
-
-def prepare_data(hive):
-    hive.sql("CREATE TEMPORARY FUNCTION stemmer AS 
'org.wikimedia.analytics.refinery.hive.StemmerUDF'")
-
-    # Choose a random selection of queries that have more than N results
-    rand_selection = hive.sql("""
-       SELECT
-           x.project, x.norm_query
-       FROM (
-           SELECT
-               project,
-               STEMMER(query, SUBSTR(wikiid, 1, 2)) as norm_query,
-               count(distinct year, month, day, session_id) as num_searchs
-           FROM
-               discovery.query_clicks_daily
-           WHERE
-               year >= 2016
-               AND project = '%s'
-           GROUP BY
-               project,
-               STEMMER(query, SUBSTR(wikiid, 1, 2))
-           ) x
-       WHERE
-           x.num_searchs >= %d
-       DISTRIBUTE BY
-           rand()
-       SORT BY
-           rand()
-       LIMIT
-           %d
-    """ % (config.WIKI_PROJECT, config.MIN_NUM_SEARCHES, config.MAX_QUERIES))
-
-    hive.registerDataFrameAsTable(rand_selection, "rand_selection")
-
-    # Find all the data for our random set of queries
-    rand_set = hive.sql("""
-        SELECT
-            TRIM(query_clicks_daily.query) as query,
-            STEMMER(query_clicks_daily.query, SUBSTR(query_clicks_daily.wikiid, 1, 2)) AS norm_query,
-            query_clicks_daily.timestamp as search_timestamp,
-            query_clicks_daily.wikiid,
-            query_clicks_daily.hits,
-            query_clicks_daily.clicks,
-            CONCAT_WS('_', query_clicks_daily.session_id, CAST(year AS string), CAST(month AS string), CAST(day AS string)) AS session_id
-        FROM
-            discovery.query_clicks_daily
-        JOIN
-            rand_selection
-        ON
-            rand_selection.norm_query = STEMMER(query_clicks_daily.query, SUBSTR(query_clicks_daily.wikiid, 1, 2))
-            AND rand_selection.project = query_clicks_daily.project
-        WHERE
-            year >= 2016
-    """)
-    hive.registerDataFrameAsTable(rand_set, "rand_set")
-
-    # Break down the data into per-hit data. This is needed because a single session
-    # can see the same hit for a query multiple times, as we re-run the search when a
-    # user does: click -> read, unsatisfied -> go back -> click different result
-    per_hit = hive.sql("""
-        SELECT
-            rand_set.query,
-            rand_set.norm_query,
-            rand_set.search_timestamp,
-            rand_set.wikiid,
-            rand_set.session_id,
-            click.pageid as click_page_id,
-            click.timestamp as click_timestamp,
-            hit.title as hit_title,
-            hit.pageid as hit_page_id,
-            hit.score as hit_score,
-            hit_position
-        FROM
-            rand_set
-        LATERAL VIEW
-            EXPLODE(rand_set.clicks) c as click
-        LATERAL VIEW
-            POSEXPLODE(rand_set.hits) h as hit_position, hit
-    """)
-    hive.registerDataFrameAsTable(per_hit, "per_hit")
-
-    # re-group the per-hit data into sessions, this time with a single data point per-hit
-    return hive.sql("""
-        SELECT
-            query,
-            norm_query,
-            session_id,
-            hit_page_id,
-            hit_title,
-            AVG(hit_score) AS hit_score,
-            AVG(hit_position) AS hit_position,
-            ARRAY_CONTAINS(COLLECT_LIST(click_page_id), hit_page_id) AS clicked
-        FROM
-            per_hit
-        GROUP BY
-            query,
-            norm_query,
-            session_id,
-            hit_page_id,
-            hit_title
-    """)
-
-if __name__ == "__main__":
-    sc, hive = spark_utils._init("LTR: prepare")
-
-    # Write out a raw copy of data we will work with
-    prepare_data(hive).write.parquet(config.CLICK_DATA)
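
The final regrouping step above is the key part of data_prepare.py: repeated (session, query, hit) rows are collapsed into one row per hit, with a clicked flag derived from the session's click list. A rough DataFrame-API sketch of that same aggregation (function and variable names here are illustrative, not from this change):

    import pyspark.sql.functions as F

    def regroup_per_hit(per_hit):
        # One row per (session, query, hit): average the position/score across
        # repeated sightings and flag whether this hit was ever clicked.
        return (per_hit
            .groupBy('query', 'norm_query', 'session_id', 'hit_page_id', 'hit_title')
            .agg(
                F.avg('hit_score').alias('hit_score'),
                F.avg('hit_position').alias('hit_position'),
                F.expr('array_contains(collect_list(click_page_id), hit_page_id)').alias('clicked')))
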
diff --git a/poc/data_process_features.py b/poc/data_process_features.py
deleted file mode 100644
index c7d0beb..0000000
--- a/poc/data_process_features.py
+++ /dev/null
@@ -1,273 +0,0 @@
-import json
-import os
-import subprocess
-import base64
-
-from pyspark.streaming.kafka import KafkaUtils, OffsetRange
-import kafka
-import kafka.common
-
-import config
-from utils import spark_utils
-
-KAFKA_BOOTSTRAP = 'kafka1012.eqiad.wmnet:9092'
-KAFKA_WORK_LOG = 'relforge_queries'
-KAFKA_RESULT_LOG = 'relforge_results'
-
-RELFORGE_INDEX='{"index": "crosswiki_enwiki_content"}'
-
-def wrap_with_page_ids(hit_page_ids, should):
-    if type(should) is not list:
-        should = [should]
-    if len(hit_page_ids) >= 9999:
-        raise ValueError("Too many page ids: %d" % (len(hit_page_ids)))
-    return json.dumps({
-        "_source": False,
-        "from": 0,
-        "size": 9999,
-        "query": {
-            "bool": {
-                "filter": {
-                    'ids': {
-                        'values': map(str, set(hit_page_ids)),
-                    }
-                },
-                "should": should,
-                "disable_coord": True,
-            }
-        }
-    })
-
-class ScriptFeature(object):
-    def __init__(self, name, script, lang='expression'):
-        self.name = name
-        self.script = script
-        self.lang = lang
-
-    def make_query(self, query):
-        return {
-            "function_score": {
-                "score_mode": "sum",
-                "boost_mode": "sum",
-                "functions": [
-                    {
-                        "script_score": {
-                            "script": {
-                                "inline": self.script,
-                                "lang": self.lang,
-                            }
-                        }
-                    }
-                ]
-            }
-        }
-
-
-class MultiMatchFeature(object):
-    def __init__(self, name, fields, minimum_should_match=1, match_type="most_fields"):
-        self.name = name
-        assert len(fields) > 0
-        self.fields = fields
-        self.minimum_should_match = minimum_should_match
-        self.match_type = match_type
-
-    def make_query(self, query):
-        return {
-            "multi_match": {
-                "query": query,
-                "minimum_should_match": self.minimum_should_match,
-                "type": self.match_type,
-                "fields": self.fields,
-            }
-        }
-
-class DisMaxFeature(object):
-    def __init__(self, name, features):
-        self.name = name
-        assert len(features) > 0
-        self.features = features
-
-    def make_query(self, query):
-        return {
-            "dis_max": {
-                "queries": [f.make_query(query) for f in self.features]
-            }
-        }
-
-def gen_produce_partition(features, run_id):
-    def f(rows):
-        producer = kafka.KafkaProducer(bootstrap_servers=KAFKA_BOOTSTRAP)
-        i = 0
-        for row in rows:
-            bulk_query = []
-            try:
-                for feature in features:
-                    bulk_query.append(RELFORGE_INDEX)
-                    bulk_query.append(wrap_with_page_ids(row.hit_page_ids, feature.make_query(row.query)))
-            except ValueError:
-                # TODO How to send error logs back?
-                continue
-            producer.send(KAFKA_WORK_LOG, json.dumps({
-                'run_id': run_id,
-                'req_id': row.query,
-                'request': "\n".join(bulk_query)
-            }))
-            i += 1
-            if i % 100 == 0:
-                producer.flush()
-    return f
-
-
-def produce_queries(hive, features, run_id):
-    """
-    Dump out the click data as es queries, to be sent to relforge
-    and the feature logs collected
-    """
-
-    query = """
-        SELECT query, COLLECT_SET(hit_page_id) as hit_page_ids
-        FROM click_data
-        GROUP BY query
-    """
-    hive.read.parquet(config.CLICK_DATA).registerTempTable('click_data')
-    hive.sql(query).foreachPartition(gen_produce_partition(features, run_id))
-    # Send a sigil value to indicate this run is complete. The consumer will copy this
-    # into KAFKA_RESULT_LOG so we know it's done.
-    # TODO: Certainly only works with a single partition and consumer. Do we also have
-    # potential timing issues?
-    producer = kafka.KafkaProducer(bootstrap_servers=KAFKA_BOOTSTRAP)
-    future = producer.send(KAFKA_WORK_LOG, json.dumps({
-        'run_id': run_id,
-        'complete': True,
-    }))
-    record = future.get()
-    print("Sendt end sigil at offset %d" % (record.offset))
-
-
-
-def get_offset_start():
-    # boldly assume single partition, for now
-    kafkaClient = kafka.SimpleClient([KAFKA_BOOTSTRAP])
-    # -1 means 'the offset of the next coming message'
-    reqs = [kafka.common.OffsetRequestPayload(KAFKA_RESULT_LOG, 0, -1, 1)]
-    resps = kafkaClient.send_offset_request(reqs)
-    offsets = {}
-    for resp in resps:
-        return resp.offsets[0]
-    return None
-
-
-def get_offset_end(offset_start, run_id):
-    # boldly assume single partition, for now
-    # Start up a consumer to wait for our completion sigil
-    consumer = kafka.KafkaConsumer(bootstrap_servers=[KAFKA_BOOTSTRAP],
-                                   auto_offset_reset='latest',
-                                   value_deserializer=json.loads)
-    tp = kafka.TopicPartition(KAFKA_RESULT_LOG, 0)
-    consumer.assign([tp])
-    consumer.seek(tp, offset_start)
-    for message in consumer:
-        if 'run_id' not in message.value:
-            pass
-        if message.value['run_id'] == run_id and 'complete' in message.value:
-            return message.offset
-    return None
-
-
-def gen_receive(features, run_id):
-    expected_count = len(features)
-    # Receives messages sent back from relforge
-    def f(parsed):
-        if not parsed['run_id'] == run_id:
-            # Some other process working at same time as us
-            return []
-        if not 'status_code' in parsed:
-            # Some other kind of message, perhaps end sigil?
-            return []
-        if not parsed['status_code'] == 200:
-            # TODO: error handling
-            return []
-        if not 'responses' in parsed:
-            # TODO: error handling
-            return []
-        query = parsed['req_id']
-        # Parse result of elasticsearch into a feature vector
-        features = {}
-        for response in parsed['responses']:
-            # TODO: Check response['status'] ? if not 200 then what?
-            for hit in response['hits']['hits']:
-                page_id = hit['_id']
-                if page_id not in features:
-                    features[page_id] = []
-                features[page_id].append(hit['_score'])
-        for page_id in features.keys():
-            assert len(features[page_id]) == expected_count
-        return [[int(page_id), query] + v for (page_id, v) in features.iteritems()]
-    return f
-
-
-def collect_results(sc, offset_start, offset_end, features, run_id):
-    offset_ranges = [OffsetRange(KAFKA_RESULT_LOG, 0, offset_start, offset_end)]
-    kafka_params = {"metadata.broker.list": KAFKA_BOOTSTRAP}
-    col_names = ['hit_page_id', 'query'] + ['feature_%s' % (f.name) for f in features]
-    return (KafkaUtils.createRDD(sc, kafka_params, offset_ranges)
-        .map(lambda (k, v): json.loads(v))
-        .flatMap(gen_receive(features, run_id))
-        .toDF(col_names)
-    )
-
-
-def main():
-    sc, hive = spark_utils._init("LTR: process")
-
-    # Build the set of features we will collect vectors for
-    features = [
-        MultiMatchFeature('title', ["title.plain^1", "title^3"]),
-        MultiMatchFeature('category', ["category.plain^1", "category^3"]),
-        MultiMatchFeature('heading', ["heading.plain^1", "heading^3"]),
-        MultiMatchFeature('auxiliary_text', ["auxiliary_text.plain^1", "auxiliary_text^3"]),
-        MultiMatchFeature('file_text', ["file_text.plain^1", "file_text^3"]),
-        DisMaxFeature('redirect_or_suggest_dismax', [
-            MultiMatchFeature(None, ["redirect.title.plain^1", 
"redirect.title^3"]),
-            MultiMatchFeature(None, ["suggest"]),
-        ]),
-        DisMaxFeature('text_or_opening_text_dismax', [
-            MultiMatchFeature(None, ["text.plain^1", "text^3"]),
-            MultiMatchFeature(None, ["opening_text.plain^1", 
"opening_text^3"]),
-        ]),
-        MultiMatchFeature('all_near_match', ["all_near_match^2"]),
-        ScriptFeature("popularity_score", "pow(doc['popularity_score'].value , 
0.8) / ( pow(doc['popularity_score'].value, 0.8) + pow(8.0E-6,0.8))"),
-        ScriptFeature("incoming_links", "pow(doc['incoming_links'].value , 
0.7) / ( pow(doc['incoming_links'].value, 0.7) + pow(30,0.7))"),
-    ]
-
-    # Generate a random ID for this run
-    run_id = base64.b64encode(os.urandom(16))
-    print("Starting up feature collection with run_id: %s" % (run_id))
-
-    offset_start = get_offset_start()
-    if offset_start is None:
-        print("Failed to find starting offset ?!?!?")
-        return
-    print("Start offset for results: %d" % (offset_start))
-
-    # start up the producer to send all our records to the kafka work log
-    print("Producing queries to kafka")
-    produce_queries(hive, features, run_id)
-    print("Finished producing queries, waiting for completion")
-
-    # start up a consumer to detect the end offset
-    offset_end = get_offset_end(offset_start, run_id)
-    if offset_end is None:
-        print("Failed to find ending offset ?!?!?")
-        return
-    print("End offset for results: %d" % (offset_end))
-    print("Collecting results")
-    df = collect_results(sc, offset_start, offset_end, features, run_id)
-    print("Writing out collected features to hdfs")
-    df.write.parquet(config.FEATURE_LOGS)
-
-
-
-if __name__ == "__main__":
-    main()
-
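
The Kafka plumbing in data_process_features.py boils down to a completion-sigil protocol: record the result topic's current offset, produce all the work, append one final message with complete=True, then scan the result topic from the recorded offset until that sigil shows up. A minimal kafka-python sketch of that pattern, with illustrative topic names and run_id handling (not taken from this change):

    import json
    import kafka

    def send_end_sigil(producer, topic, run_id):
        # The last message of a run; readers stop scanning once they see it.
        future = producer.send(topic, json.dumps({'run_id': run_id, 'complete': True}))
        return future.get().offset

    def wait_for_sigil(bootstrap_servers, topic, start_offset, run_id):
        # Scan the (single) partition from start_offset until the sigil for
        # this run_id appears, and return its offset as the end of the range.
        consumer = kafka.KafkaConsumer(bootstrap_servers=[bootstrap_servers],
                                       value_deserializer=json.loads)
        tp = kafka.TopicPartition(topic, 0)
        consumer.assign([tp])
        consumer.seek(tp, start_offset)
        for message in consumer:
            value = message.value
            if value.get('run_id') == run_id and value.get('complete'):
                return message.offset
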
diff --git a/poc/gen_splits.py b/poc/gen_splits.py
deleted file mode 100644
index f5723f7..0000000
--- a/poc/gen_splits.py
+++ /dev/null
@@ -1,144 +0,0 @@
-from pyspark.sql.types import IntegerType
-from pyspark.sql.functions import UserDefinedFunction
-
-import config
-from utils import spark_utils
-
-MAX_EXECUTORS = 400
-NUM_PARTITIONS = 800
-
-def rel_to_label(relevance):
-    if relevance > 0.9:
-        return 4
-    elif relevance > 0.75:
-        return 3
-    elif relevance > 0.5:
-        return 2
-    elif relevance > 0.3:
-        return 1
-    else:
-        return 0
-
-def split_partition(iterator):
-    train_count = 0
-    test_count = 0
-    vali_count = 0
-    splits = []
-    # starting at 1 prevents div by zero
-    processed = 1
-    for row in iterator:
-        # count is an unusable name directly because there is also a count
-        # function on tuple, which Row extends.
-        count = row.asDict()['count']
-        if float(train_count) / processed < .6:
-            splits.append((row.norm_query, 0))
-            train_count += count
-        elif float(test_count) / processed < .2:
-            splits.append((row.norm_query, 1))
-            test_count += count
-        elif float(vali_count) / processed < .2:
-            splits.append((row.norm_query, 2))
-            vali_count += count
-        else:
-            splits.append((row.norm_query, 0))
-            train_count += count
-        processed += count
-    return splits
-
-def write_split(split, fname):
-    codec = 'org.apache.hadoop.io.compress.GzipCodec'
-    def write_one(col):
-        (split
-            .map(lambda row: row.__getattr__(col))
-            .saveAsTextFile("%s/%s" % (fname, col), codec)
-        )
-
-    write_one('data')
-    write_one('weight')
-    write_one('qid')
-
-def main():
-    sc, hive = spark_utils._init("LTR: gen splits", {
-        "spark.sql.shuffle.partitions": NUM_PARTITIONS,
-        "spark.dynamicAllocation.maxExecutors": MAX_EXECUTORS,
-    })
-
-    vector_data = (hive.read.parquet(config.VECTOR_DATA)
-        .repartition(NUM_PARTITIONS)
-        # This persist saves a couple shuffles and disk reads.
-        # minor win but seems worthwhile
-        .persist()
-    )
-
-    # We don't want randomized splits, we want to make sure that each
-    # norm_query is only within a single split, which makes this a bit
-    # annoying. Additionally we would like our 60/20/20 split to have an
-    # equal number of rows, rather than an equal number of norm_query
-    # values.
-    splits = (vector_data
-        .groupBy('norm_query')
-        .count()
-        # Sorting ensures consistent results for future runs
-        .sort('norm_query')
-        # partitions need to be a minimum size for splits to be balanced
-        .coalesce(100)
-        .mapPartitions(split_partition)
-        .toDF(['norm_query', 'split'])
-    )
-
-    query_ids = (vector_data
-        .select('query')
-        .drop_duplicates()
-        .rdd.map(lambda r: r.query)
-        .zipWithUniqueId()
-        .toDF(['query', 'query_id'])
-    )
-
-    # sort feature cols so we have a simple way to figure out the
-    # feature name -> feature id map later
-    feature_cols = sorted(vector_data.columns)
-    feature_cols.remove('query')
-    feature_cols.remove('norm_query')
-    feature_cols.remove('hit_page_id')
-    feature_cols.remove('relevance')
-    feature_cols.remove('weight')
-
-    pre_split = (vector_data
-        .join(query_ids, 'query', how='inner')
-        .join(splits, 'norm_query', how='inner')
-        .map(lambda row: ((
-            "%d %s" % (
-                rel_to_label(row.relevance),
-                " ".join(['%d:%f' % (1+i, row.__getattr__(feat))
-                          for i, feat in enumerate(feature_cols)])),
-            row.weight,
-            row.query_id,
-            row.split
-        )))
-        .toDF(['data', 'weight', 'qid', 'split'])
-        # rows with same qid need to be output together, sorting seems
-        # a reasonable way to make that happen.
-        .sort('qid')
-        # The data size isn't actually that big by this point, cut down to
-        # 20 partitions to write out to disk.
-        # @TODO All packages we feed this into only support a single input
-        # file, perhaps instead we should use .toLocalIterator() and write
-        # them all out locally?
-        .coalesce(20)
-        # we need to write out 9 separate things from this one result, persist
-        # so we don't re-do the above work which is pretty expensive
-        .persist()
-    )
-
-    train = pre_split.filter(pre_split.split == 0)
-    test = pre_split.filter(pre_split.split == 1)
-    vali = pre_split.filter(pre_split.split == 2)
-
-    write_split(train, config.LIGHTGBM_TRAIN_DATA)
-    write_split(test, config.LIGHTGBM_TEST_DATA)
-    write_split(vali, config.LIGHTGBM_VALI_DATA)
-
-
-if __name__ == "__main__":
-    main()
-
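
The split logic in gen_splits.py is a greedy assignment: walk the per-norm_query row counts and hand each whole group to whichever split is still under its 60/20/20 share, so a query never straddles train/test/vali while the splits stay balanced by row count rather than query count. A standalone sketch of that assignment (names are illustrative):

    def assign_splits(query_counts, fractions=(0.6, 0.2, 0.2)):
        # query_counts: iterable of (norm_query, row_count), pre-sorted for
        # reproducibility. Returns a list of (norm_query, split_index).
        totals = [0] * len(fractions)
        processed = 1  # start at 1 to avoid division by zero
        assignments = []
        for norm_query, count in query_counts:
            # First split still below its target share gets the whole group;
            # fall back to the training split otherwise.
            for i, frac in enumerate(fractions):
                if float(totals[i]) / processed < frac:
                    split = i
                    break
            else:
                split = 0
            assignments.append((norm_query, split))
            totals[split] += count
            processed += count
        return assignments
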
diff --git a/poc/merge_vector_data.py b/poc/merge_vector_data.py
deleted file mode 100644
index 008c41b..0000000
--- a/poc/merge_vector_data.py
+++ /dev/null
@@ -1,36 +0,0 @@
-import pyspark.sql.functions as F
-
-import config
-from utils import spark_utils
-
-def main():
-    sc, hive = spark_utils._init("LTR: Merge")
-
-    dfSource = hive.read.parquet(config.CLICK_DATA)
-    # Convert duplicates into weights
-    dfWeight = (dfSource
-        .groupBy('query', 'hit_page_id')
-        .agg(
-            dfSource.query, 
-            dfSource.hit_page_id,
-            F.count('query').alias('weight')
-        )
-    )
-
-    dfFeature = hive.read.parquet(config.FEATURE_LOGS)
-    dfRelevance = hive.read.parquet(config.DBN_RELEVANCE)
-
-    (dfSource
-        .select('query', 'norm_query', 'hit_page_id')
-        .dropDuplicates()
-        .join(dfRelevance, on=['norm_query', 'hit_page_id'], how='inner')
-        .join(dfWeight, on=['query', 'hit_page_id'], how='inner')
-        .join(dfFeature, on=['query', 'hit_page_id'], how='inner')
-        .write.parquet(config.VECTOR_DATA)
-    )
-
-
-
-
-if __name__ == "__main__":
-    main()
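
merge_vector_data.py turns duplicate (query, hit_page_id) rows into a weight column before joining everything together. A minimal sketch of that duplicates-to-weights step (the helper name is illustrative):

    import pyspark.sql.functions as F

    def duplicates_to_weights(df_source):
        # One row per (query, hit_page_id) with weight = number of duplicate
        # source rows; joined back onto the de-duplicated click data later.
        return (df_source
            .groupBy('query', 'hit_page_id')
            .agg(F.count('*').alias('weight')))
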
diff --git a/poc/parse_feature_logs.py b/poc/parse_feature_logs.py
deleted file mode 100644
index a5426f3..0000000
--- a/poc/parse_feature_logs.py
+++ /dev/null
@@ -1,29 +0,0 @@
-import json
-
-import config
-from utils import spark_utils
-
-
-PREFIX_LEN = len("[2017-01-11 19:15:15,020][INFO 
][org.wikimedia.search.ltr.FeatureLogger] ")
-
-def main():
-    sc, hive = spark_utils._init("LTR: Parse Logs")
-
-    rawFeatureLogs = (sc.textFile(config.RAW_FEATURE_LOGS)
-        # Strip the prefix and parse the json
-        .map(lambda line: json.loads(line[PREFIX_LEN:]))
-    )
-
-    featureList = rawFeatureLogs.take(1)[0]['vec'].keys()
-
-    (rawFeatureLogs.map(lambda log: [
-            int(log['_id']),
-            log['_marker'],
-        ] + [log['vec'][featName] for featName in featureList])
-        .toDF(['hit_page_id', 'query'] + featureList)
-        .write.parquet(config.FEATURE_LOGS)
-    )
-
-
-if __name__ == "__main__":
-    main()
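
parse_feature_logs.py simply strips a fixed-width Elasticsearch log prefix and parses the remaining JSON payload. A slightly more defensive sketch of the same idea, locating the logger marker instead of hard-coding the prefix length (purely illustrative):

    import json

    def parse_feature_log_line(line, marker='[org.wikimedia.search.ltr.FeatureLogger] '):
        # Everything after the logger marker is the JSON feature-vector payload.
        start = line.index(marker) + len(marker)
        return json.loads(line[start:])
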
diff --git a/poc/utils/__init__.py b/poc/utils/__init__.py
deleted file mode 100644
index e69de29..0000000
--- a/poc/utils/__init__.py
+++ /dev/null
diff --git a/poc/utils/os_utils.py b/poc/utils/os_utils.py
deleted file mode 100644
index 6968873..0000000
--- a/poc/utils/os_utils.py
+++ /dev/null
@@ -1,9 +0,0 @@
-import os
-import time
-import shutil
-
-def _create_dirs(dirs):
-    for dir in dirs:
-        if not os.path.exists(dir):
-            os.makedirs(dir)
-
diff --git a/poc/utils/spark_utils.py b/poc/utils/spark_utils.py
deleted file mode 100644
index 99a78e0..0000000
--- a/poc/utils/spark_utils.py
+++ /dev/null
@@ -1,15 +0,0 @@
-from pyspark import SparkContext, SparkConf
-from pyspark.sql import HiveContext
-
-def _init(appName, conf=None):
-    if type(conf) == dict:
-        properties = conf
-        conf = SparkConf()
-        for k, v in properties.iteritems():
-            conf.set(k, v)
-    sc = SparkContext(appName=appName, conf=conf)
-    sc.setLogLevel("WARN")
-    sc.addFile('hdfs://analytics-hadoop/user/hive/hive-site.xml')
-    hive = HiveContext(sc)
-
-    return (sc, hive)

-- 
To view, visit https://gerrit.wikimedia.org/r/384850
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: I21a504b34f3f22a4cc907041b838d584585602d2
Gerrit-PatchSet: 1
Gerrit-Project: search/MjoLniR
Gerrit-Branch: master
Gerrit-Owner: EBernhardson <[email protected]>
Gerrit-Reviewer: DCausse <[email protected]>

_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits
