[ 
https://issues.apache.org/jira/browse/SPARK-24324?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bryan Cutler updated SPARK-24324:
---------------------------------
    Summary: Pandas Grouped Map UserDefinedFunction mixes column labels  (was: 
UserDefinedFunction mixes column labels)

> Pandas Grouped Map UserDefinedFunction mixes column labels
> ----------------------------------------------------------
>
>                 Key: SPARK-24324
>                 URL: https://issues.apache.org/jira/browse/SPARK-24324
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 2.3.0
>         Environment: Python (using virtualenv):
> {noformat}
> $ python --version                 
> Python 3.6.5
> {noformat}
> Modules installed:
> {noformat}
> arrow==0.12.1
> backcall==0.1.0
> bleach==2.1.3
> chardet==3.0.4
> decorator==4.3.0
> entrypoints==0.2.3
> findspark==1.2.0
> html5lib==1.0.1
> ipdb==0.11
> ipykernel==4.8.2
> ipython==6.3.1
> ipython-genutils==0.2.0
> ipywidgets==7.2.1
> jedi==0.12.0
> Jinja2==2.10
> jsonschema==2.6.0
> jupyter==1.0.0
> jupyter-client==5.2.3
> jupyter-console==5.2.0
> jupyter-core==4.4.0
> MarkupSafe==1.0
> mistune==0.8.3
> nbconvert==5.3.1
> nbformat==4.4.0
> notebook==5.5.0
> numpy==1.14.3
> pandas==0.22.0
> pandocfilters==1.4.2
> parso==0.2.0
> pbr==3.1.1
> pexpect==4.5.0
> pickleshare==0.7.4
> progressbar2==3.37.1
> prompt-toolkit==1.0.15
> ptyprocess==0.5.2
> pyarrow==0.9.0
> Pygments==2.2.0
> python-dateutil==2.7.2
> python-utils==2.3.0
> pytz==2018.4
> pyzmq==17.0.0
> qtconsole==4.3.1
> Send2Trash==1.5.0
> simplegeneric==0.8.1
> six==1.11.0
> SQLAlchemy==1.2.7
> stevedore==1.28.0
> terminado==0.8.1
> testpath==0.3.1
> tornado==5.0.2
> traitlets==4.3.2
> virtualenv==15.1.0
> virtualenv-clone==0.2.6
> virtualenvwrapper==4.7.2
> wcwidth==0.1.7
> webencodings==0.5.1
> widgetsnbextension==3.2.1
> {noformat}
>  
> Java:
> {noformat}
> $ java -version
> java version "1.8.0_171"
> Java(TM) SE Runtime Environment (build 1.8.0_171-b11)
> Java HotSpot(TM) 64-Bit Server VM (build 25.171-b11, mixed mode)
> {noformat}
> System:
> {noformat}
> $ lsb_release -a
> No LSB modules are available.
> Distributor ID:       Ubuntu
> Description:  Ubuntu 16.04.4 LTS
> Release:      16.04
> Codename:     xenial
> {noformat}
>            Reporter: Cristian Consonni
>            Priority: Major
>
> I am working on Wikipedia page views (see [task T188041 on Wikimedia's
> Phabricator|https://phabricator.wikimedia.org/T188041]). For simplicity,
> let's say the data look like this:
> {noformat}
> <lang> <page> <timestamp> <views>
> {noformat}
> For each combination of (lang, page, day(timestamp)) I need to encode each
> hour with a letter:
> {noformat}
> 00:00 -> A
> 01:00 -> B
> ...
> {noformat}
> and concatenate the letter with the number of views for that hour. So, if a
> page got 5 views at 00:00 and 7 views at 01:00, it would become:
> {noformat}
> A5B7
> {noformat}
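>
> In plain Python, the intended encoding for one (lang, page, day) group is
> roughly the following sketch ({{encode_day}} and its argument are
> hypothetical names introduced here for illustration):
> {code:python}
> import string
>
> # Hours 0..23 map to the letters 'A'..'X'.
> hour_to_letter = list(string.ascii_uppercase[:24])
>
> def encode_day(hour_view_pairs):
>     # hour_view_pairs: list of (hour, views) tuples for one (lang, page, day)
>     return ''.join(hour_to_letter[h] + str(v)
>                    for h, v in sorted(hour_view_pairs))
>
> assert encode_day([(0, 5), (1, 7)]) == 'A5B7'
> {code}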
>  
> I have written a UDF called {{concat_hours}}. However, the function mixes
> up the column labels and I am not sure what is going on. Below is a minimal
> complete example that reproduces the issue on my system (the details of my
> environment are above).
> {code:python}
> #!/usr/bin/env python3
> # coding: utf-8
> input_data = b"""en Albert_Camus 20071210-000000 150
> en Albert_Camus 20071210-010000 148
> en Albert_Camus 20071210-020000 197
> en Albert_Camus 20071211-200000 145
> en Albert_Camus 20071211-210000 131
> en Albert_Camus 20071211-220000 154
> en Albert_Camus 20071211-230001 142
> en Albert_Caquot 20071210-020000 1
> en Albert_Caquot 20071210-020000 1
> en Albert_Caquot 20071210-040001 1
> en Albert_Caquot 20071211-060000 1
> en Albert_Caquot 20071211-080000 1
> en Albert_Caquot 20071211-150000 3
> en Albert_Caquot 20071211-210000 1"""
> import tempfile
> fp = tempfile.NamedTemporaryFile()
> fp.write(input_data)
> fp.seek(0)
> import findspark
> findspark.init()
> import pyspark
> from pyspark.sql.types import StructType, StructField
> from pyspark.sql.types import StringType, IntegerType, TimestampType
> from pyspark.sql import functions
> sc = pyspark.SparkContext(appName="udf_example")
> sqlctx = pyspark.SQLContext(sc)
> schema = StructType([StructField("lang", StringType(), False),
>                      StructField("page", StringType(), False),
>                      StructField("timestamp", TimestampType(), False),
>                      StructField("views", IntegerType(), False)])
> df = sqlctx.read.csv(fp.name,
>                      header=False,
>                      schema=schema,
>                      timestampFormat="yyyyMMdd-HHmmss",
>                      sep=' ')
> df.count()
> df.dtypes
> df.show()
> new_schema = StructType([StructField("lang", StringType(), False),
>                          StructField("page", StringType(), False),
>                          StructField("day", StringType(), False),
>                          StructField("enc", StringType(), False)])
> from pyspark.sql.functions import pandas_udf, PandasUDFType
> import pandas as pd
> hour_to_letter = ['A','B','C','D','E','F','G','H','I','J','K','L','M','N','O',
>                   'P','Q','R','S','T','U','V','W','X']
> @pandas_udf(new_schema, PandasUDFType.GROUPED_MAP)
> def concat_hours(x):
>     view_hours = x['hour'].tolist()
>     view_views = x['views'].tolist()
>     view_hours_letters = [hour_to_letter[h] for h in view_hours]
>     encoded_views = [l + str(h)
>                      for l, h in sorted(zip(view_hours_letters,view_views))]
>     encoded_views_string = ''.join(encoded_views)
>     # return pd.DataFrame({'enc': x.page, 'day': x.lang, 'lang': x.day,
>     #                      'page': encoded_views_string}, index=[x.index[0]])
>     return pd.DataFrame({'page': x.page, 'lang': x.lang,'day': x.day,
>                          'enc': encoded_views_string}, index=[x.index[0]])
> grouped_df = (df.select(['lang',
>                          'page',
>                          functions.date_format('timestamp','yyyy-MM-dd')\
>                                   .alias('day'), 
>                          functions.hour('timestamp').alias('hour'), 
>                          'views'
>                          ])
>                 .groupby(['lang','page','day'])
>                 )
> grouped_df = (grouped_df.apply(concat_hours)
>                         .dropDuplicates()
>                         )
> grouped_df.show()
> {code}
>  
> This is what I am getting (note that the values end up under the wrong
> column labels):
> {noformat}
> $ ./udf_example.py
> 2018-05-20 05:13:23 WARN  Utils:66 - Your hostname, inara resolves to a loopback address: 127.0.1.1; using 10.109.49.111 instead (on interface wlp2s0)
> 2018-05-20 05:13:23 WARN  Utils:66 - Set SPARK_LOCAL_IP if you need to bind to another address
> 2018-05-20 05:13:23 WARN  NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
> Setting default log level to "WARN".
> To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
> 2018-05-20 05:13:24 WARN  Utils:66 - Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
> +----+-------------+-------------------+-----+
> |lang|         page|          timestamp|views|
> +----+-------------+-------------------+-----+
> |  en| Albert_Camus|2007-12-10 00:00:00|  150|
> |  en| Albert_Camus|2007-12-10 01:00:00|  148|
> |  en| Albert_Camus|2007-12-10 02:00:00|  197|
> |  en| Albert_Camus|2007-12-11 20:00:00|  145|
> |  en| Albert_Camus|2007-12-11 21:00:00|  131|
> |  en| Albert_Camus|2007-12-11 22:00:00|  154|
> |  en| Albert_Camus|2007-12-11 23:00:01|  142|
> |  en|Albert_Caquot|2007-12-10 02:00:00|    1|
> |  en|Albert_Caquot|2007-12-10 02:00:00|    1|
> |  en|Albert_Caquot|2007-12-10 04:00:01|    1|
> |  en|Albert_Caquot|2007-12-11 06:00:00|    1|
> |  en|Albert_Caquot|2007-12-11 08:00:00|    1|
> |  en|Albert_Caquot|2007-12-11 15:00:00|    3|
> |  en|Albert_Caquot|2007-12-11 21:00:00|    1|
> +----+-------------+-------------------+-----+
> +----------+----------------+---+-------------+
> |      lang|            page|day|          enc|
> +----------+----------------+---+-------------+
> |2007-12-10|    A150B148C197| en| Albert_Camus|
> |2007-12-11|        G1I1P3V1| en|Albert_Caquot|
> |2007-12-10|          C1C1E1| en|Albert_Caquot|
> |2007-12-11|U145V131W154X142| en| Albert_Camus|
> +----------+----------------+---+-------------+
> {noformat}
> What I am expecting, of course, is:
> {noformat}
> +----+-------------+----------+----------------+
> |lang|         page|       day|             enc|
> +----+-------------+----------+----------------+
> |  en|Albert_Caquot|2007-12-11|        G1I1P3V1|
> |  en|Albert_Caquot|2007-12-10|          C1C1E1|
> |  en| Albert_Camus|2007-12-10|    A150B148C197|
> |  en| Albert_Camus|2007-12-11|U145V131W154X142|
> +----+-------------+----------+----------------+
> {noformat}
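>
> For reference, the commented-out {{return}} above hints that the output
> columns are being matched to {{new_schema}} by position rather than by
> name. Assuming that is the case, a workaround sketch that restores the
> expected output on 2.3.0 is to reorder the returned DataFrame's columns to
> the schema's field order ({{concat_hours_ordered}} is a hypothetical name):
> {code:python}
> @pandas_udf(new_schema, PandasUDFType.GROUPED_MAP)
> def concat_hours_ordered(x):
>     view_hours_letters = [hour_to_letter[h] for h in x['hour'].tolist()]
>     encoded_views_string = ''.join(
>         l + str(v) for l, v in sorted(zip(view_hours_letters,
>                                           x['views'].tolist())))
>     result = pd.DataFrame({'lang': x.lang, 'page': x.page, 'day': x.day,
>                            'enc': encoded_views_string}, index=[x.index[0]])
>     # Force the column order to match new_schema exactly, so that
>     # positional matching assigns each value to the right field.
>     return result[[field.name for field in new_schema.fields]]
> {code}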


