Cristian Consonni created SPARK-24324:
-----------------------------------------

             Summary: UserDefinedFunction mixes column labels
                 Key: SPARK-24324
                 URL: https://issues.apache.org/jira/browse/SPARK-24324
             Project: Spark
          Issue Type: Bug
          Components: PySpark
    Affects Versions: 2.3.0
         Environment: Python (using virtualenv):
{noformat}
$ python --version
Python 3.6.5
{noformat}
Modules installed:
{noformat}
arrow==0.12.1
backcall==0.1.0
bleach==2.1.3
chardet==3.0.4
decorator==4.3.0
entrypoints==0.2.3
findspark==1.2.0
html5lib==1.0.1
ipdb==0.11
ipykernel==4.8.2
ipython==6.3.1
ipython-genutils==0.2.0
ipywidgets==7.2.1
jedi==0.12.0
Jinja2==2.10
jsonschema==2.6.0
jupyter==1.0.0
jupyter-client==5.2.3
jupyter-console==5.2.0
jupyter-core==4.4.0
MarkupSafe==1.0
mistune==0.8.3
nbconvert==5.3.1
nbformat==4.4.0
notebook==5.5.0
numpy==1.14.3
pandas==0.22.0
pandocfilters==1.4.2
parso==0.2.0
pbr==3.1.1
pexpect==4.5.0
pickleshare==0.7.4
progressbar2==3.37.1
prompt-toolkit==1.0.15
ptyprocess==0.5.2
pyarrow==0.9.0
Pygments==2.2.0
python-dateutil==2.7.2
python-utils==2.3.0
pytz==2018.4
pyzmq==17.0.0
qtconsole==4.3.1
Send2Trash==1.5.0
simplegeneric==0.8.1
six==1.11.0
SQLAlchemy==1.2.7
stevedore==1.28.0
terminado==0.8.1
testpath==0.3.1
tornado==5.0.2
traitlets==4.3.2
virtualenv==15.1.0
virtualenv-clone==0.2.6
virtualenvwrapper==4.7.2
wcwidth==0.1.7
webencodings==0.5.1
widgetsnbextension==3.2.1
{noformat}

Java:
{noformat}
$ java -version
java version "1.8.0_171"
Java(TM) SE Runtime Environment (build 1.8.0_171-b11)
Java HotSpot(TM) 64-Bit Server VM (build 25.171-b11, mixed mode)
{noformat}
System:
{noformat}
$ lsb_release -a
No LSB modules are available.
Distributor ID: Ubuntu
Description:    Ubuntu 16.04.4 LTS
Release:        16.04
Codename:       xenial
{noformat}
            Reporter: Cristian Consonni


I am working on Wikipedia page views (see [task T188041 on Wikimedia's 
Phabricator|https://phabricator.wikimedia.org/T188041]). For simplicity, let's 
say the data look like this:
{noformat}
<lang> <page> <timestamp> <views>
{noformat}

For each combination of (lang, page, day) I need to encode the views for each 
hour, mapping each hour to a letter:
{noformat}
00:00 -> A
01:00 -> B
...
{noformat}
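
For reference, this is the encoding I am after, expressed in plain Python (a 
minimal sketch, independent of Spark; {{encode_day}} is just an illustrative 
name):

{code:python}
import string

# Hours 0-23 map to the letters A-X.
hour_to_letter = list(string.ascii_uppercase[:24])

def encode_day(hours, views):
    """Encode one (lang, page, day) group: each hour's letter is
    followed by that hour's view count, sorted by hour."""
    letters = [hour_to_letter[h] for h in hours]
    return ''.join(l + str(v) for l, v in sorted(zip(letters, views)))

assert encode_day([0, 1, 2], [150, 148, 197]) == 'A150B148C197'
{code}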

I have written a grouped-map pandas UDF called {{concat_hours}}.

However, the result mixes up the column labels: the values end up under the 
wrong columns, and I am not sure what is going on.

{code:python}
#!/usr/bin/env python3
# coding: utf-8

input_data = b"""en Albert_Camus 20071210-000000 150
en Albert_Camus 20071210-010000 148
en Albert_Camus 20071210-020000 197
en Albert_Camus 20071211-200000 145
en Albert_Camus 20071211-210000 131
en Albert_Camus 20071211-220000 154
en Albert_Camus 20071211-230001 142
en Albert_Caquot 20071210-020000 1
en Albert_Caquot 20071210-020000 1
en Albert_Caquot 20071210-040001 1
en Albert_Caquot 20071211-060000 1
en Albert_Caquot 20071211-080000 1
en Albert_Caquot 20071211-150000 3
en Albert_Caquot 20071211-210000 1"""

import tempfile

fp = tempfile.NamedTemporaryFile()
fp.write(input_data)

fp.seek(0)

import findspark
findspark.init()

import pyspark
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import StringType, IntegerType, TimestampType
from pyspark.sql import functions

sc = pyspark.SparkContext(appName="udf_example")
sqlctx = pyspark.SQLContext(sc)

schema = StructType([StructField("lang", StringType(), False),
                     StructField("page", StringType(), False),
                     StructField("timestamp", TimestampType(), False),
                     StructField("views", IntegerType(), False)])

df = sqlctx.read.csv(fp.name,
                     header=False,
                     schema=schema,
                     timestampFormat="yyyyMMdd-HHmmss",
                     sep=' ')

df.count()
df.dtypes
df.show()

new_schema = StructType([StructField("lang", StringType(), False),
                         StructField("page", StringType(), False),
                         StructField("day", StringType(), False),
                         StructField("enc", StringType(), False)])

from pyspark.sql.functions import pandas_udf, PandasUDFType
import pandas as pd

hour_to_letter = ['A','B','C','D','E','F','G','H','I','J','K','L','M','N','O',
                  'P','Q','R','S','T','U','V','W','X']

@pandas_udf(new_schema, PandasUDFType.GROUPED_MAP)
def concat_hours(x):
    view_hours = x['hour'].tolist()
    view_views = x['views'].tolist()

    view_hours_letters = [hour_to_letter[h] for h in view_hours]

    encoded_views = [l + str(h)
                     for l, h in sorted(zip(view_hours_letters,view_views))]
    encoded_views_string = ''.join(encoded_views)

    # Oddly, returning the columns deliberately mislabeled as below appears to
    # produce the *expected* output:
    # return pd.DataFrame({'enc': x.page, 'day': x.lang, 'lang': x.day,
    #                      'page': encoded_views_string}, index=[x.index[0]])

    return pd.DataFrame({'page': x.page, 'lang': x.lang, 'day': x.day,
                         'enc': encoded_views_string}, index=[x.index[0]])


grouped_df = (df.select(['lang',
                         'page',
                         functions.date_format('timestamp','yyyy-MM-dd')\
                                  .alias('day'), 
                         functions.hour('timestamp').alias('hour'), 
                         'views'
                         ])
                .groupby(['lang','page','day'])
                )

grouped_df = (grouped_df.apply(concat_hours)
                        .dropDuplicates()
                        )

grouped_df.show()
{code}
 
This is what I am getting:
{noformat}
$ ./udf_example.py
2018-05-20 05:13:23 WARN  Utils:66 - Your hostname, inara resolves to a 
loopback address: 127.0.1.1; using 10.109.49.111 instead (on interface wlp2s0)
2018-05-20 05:13:23 WARN  Utils:66 - Set SPARK_LOCAL_IP if you need to bind to 
another address
2018-05-20 05:13:23 WARN  NativeCodeLoader:62 - Unable to load native-hadoop 
library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use 
setLogLevel(newLevel).
2018-05-20 05:13:24 WARN  Utils:66 - Service 'SparkUI' could not bind on port 
4040. Attempting port 4041.
+----+-------------+-------------------+-----+
|lang|         page|          timestamp|views|
+----+-------------+-------------------+-----+
|  en| Albert_Camus|2007-12-10 00:00:00|  150|
|  en| Albert_Camus|2007-12-10 01:00:00|  148|
|  en| Albert_Camus|2007-12-10 02:00:00|  197|
|  en| Albert_Camus|2007-12-11 20:00:00|  145|
|  en| Albert_Camus|2007-12-11 21:00:00|  131|
|  en| Albert_Camus|2007-12-11 22:00:00|  154|
|  en| Albert_Camus|2007-12-11 23:00:01|  142|
|  en|Albert_Caquot|2007-12-10 02:00:00|    1|
|  en|Albert_Caquot|2007-12-10 02:00:00|    1|
|  en|Albert_Caquot|2007-12-10 04:00:01|    1|
|  en|Albert_Caquot|2007-12-11 06:00:00|    1|
|  en|Albert_Caquot|2007-12-11 08:00:00|    1|
|  en|Albert_Caquot|2007-12-11 15:00:00|    3|
|  en|Albert_Caquot|2007-12-11 21:00:00|    1|
+----+-------------+-------------------+-----+

+----------+----------------+---+-------------+                                 
|      lang|            page|day|          enc|
+----------+----------------+---+-------------+
|2007-12-10|    A150B148C197| en| Albert_Camus|
|2007-12-11|        G1I1P3V1| en|Albert_Caquot|
|2007-12-10|          C1C1E1| en|Albert_Caquot|
|2007-12-11|U145V131W154X142| en| Albert_Camus|
+----------+----------------+---+-------------+
{noformat}

The values are shifted with respect to the column labels (e.g. the {{lang}} 
column contains dates and the {{enc}} column contains page titles). What I am 
expecting is, of course:
{noformat}
+----+-------------+----------+----------------+                                
|lang|         page|       day|             enc|
+----+-------------+----------+----------------+
|  en|Albert_Caquot|2007-12-11|        G1I1P3V1|
|  en|Albert_Caquot|2007-12-10|          C1C1E1|
|  en| Albert_Camus|2007-12-10|    A150B148C197|
|  en| Albert_Camus|2007-12-11|U145V131W154X142|
+----+-------------+----------+----------------+
{noformat}
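
As a side note, if I deliberately mislabel the columns in the returned pandas 
DataFrame (see the commented-out {{return}} in the UDF above), the values 
appear to land in the expected output columns. This suggests the result 
columns are matched to {{new_schema}} by position rather than by name. A 
possible workaround (a sketch, assuming positional matching) is to force the 
returned columns into the schema's order:

{code:python}
# Hypothetical fix inside concat_hours: select the columns explicitly in the
# same order as new_schema (lang, page, day, enc), so that positional
# matching cannot scramble them.
result = pd.DataFrame({'lang': x.lang, 'page': x.page, 'day': x.day,
                       'enc': encoded_views_string}, index=[x.index[0]])
return result[['lang', 'page', 'day', 'enc']]
{code}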



