Bryan Cutler updated SPARK-24324:
---------------------------------
    Summary: Pandas Grouped Map UserDefinedFunction mixes column labels  (was: UserDefinedFunction mixes column labels)

> Pandas Grouped Map UserDefinedFunction mixes column labels
> -----------------------------------------------------------
>
>                 Key: SPARK-24324
>                 URL: https://issues.apache.org/jira/browse/SPARK-24324
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 2.3.0
>         Environment: Python (using virtualenv):
> {noformat}
> $ python --version
> Python 3.6.5
> {noformat}
> Modules installed:
> {noformat}
> arrow==0.12.1
> backcall==0.1.0
> bleach==2.1.3
> chardet==3.0.4
> decorator==4.3.0
> entrypoints==0.2.3
> findspark==1.2.0
> html5lib==1.0.1
> ipdb==0.11
> ipykernel==4.8.2
> ipython==6.3.1
> ipython-genutils==0.2.0
> ipywidgets==7.2.1
> jedi==0.12.0
> Jinja2==2.10
> jsonschema==2.6.0
> jupyter==1.0.0
> jupyter-client==5.2.3
> jupyter-console==5.2.0
> jupyter-core==4.4.0
> MarkupSafe==1.0
> mistune==0.8.3
> nbconvert==5.3.1
> nbformat==4.4.0
> notebook==5.5.0
> numpy==1.14.3
> pandas==0.22.0
> pandocfilters==1.4.2
> parso==0.2.0
> pbr==3.1.1
> pexpect==4.5.0
> pickleshare==0.7.4
> progressbar2==3.37.1
> prompt-toolkit==1.0.15
> ptyprocess==0.5.2
> pyarrow==0.9.0
> Pygments==2.2.0
> python-dateutil==2.7.2
> python-utils==2.3.0
> pytz==2018.4
> pyzmq==17.0.0
> qtconsole==4.3.1
> Send2Trash==1.5.0
> simplegeneric==0.8.1
> six==1.11.0
> SQLAlchemy==1.2.7
> stevedore==1.28.0
> terminado==0.8.1
> testpath==0.3.1
> tornado==5.0.2
> traitlets==4.3.2
> virtualenv==15.1.0
> virtualenv-clone==0.2.6
> virtualenvwrapper==4.7.2
> wcwidth==0.1.7
> webencodings==0.5.1
> widgetsnbextension==3.2.1
> {noformat}
> Java:
> {noformat}
> $ java -version
> java version "1.8.0_171"
> Java(TM) SE Runtime Environment (build 1.8.0_171-b11)
> Java HotSpot(TM) 64-Bit Server VM (build 25.171-b11, mixed mode)
> {noformat}
> System:
> {noformat}
> $ lsb_release -a
> No LSB modules are available.
> Distributor ID: Ubuntu
> Description:    Ubuntu 16.04.4 LTS
> Release:        16.04
> Codename:       xenial
> {noformat}
>            Reporter: Cristian Consonni
>            Priority: Major
>
> I am working on Wikipedia page views (see [task T188041 on Wikimedia's Phabricator|https://phabricator.wikimedia.org/T188041]). For simplicity, let's say that these are the data:
> {noformat}
> <lang> <page> <timestamp> <views>
> {noformat}
> For each combination of (lang, page, day(timestamp)) I need to map each hour to a letter:
> {noformat}
> 00:00 -> A
> 01:00 -> B
> ...
> {noformat}
> and concatenate the letter with the number of views for that hour. So, if a page got 5 views at 00:00 and 7 views at 01:00, it becomes:
> {noformat}
> A5B7
> {noformat}
> I have written a grouped map Pandas UDF called {{concat_hours}} to do this. However, the function is mixing up the columns and I am not sure what is going on. Below is a minimal, complete example that reproduces the issue on my system (the details of my environment are above).
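> Before the full Spark reproduction, this is the intended encoding on its own, as a rough Pandas-only sketch for a single (lang, page, day) group; the column names {{hour}} and {{views}} and the {{hour_to_letter}} list are the same ones used in the script below:
> {code:python}
> import pandas as pd
>
> # one letter per hour of the day: 00:00 -> 'A', 01:00 -> 'B', ..., 23:00 -> 'X'
> hour_to_letter = ['A','B','C','D','E','F','G','H','I','J','K','L',
>                   'M','N','O','P','Q','R','S','T','U','V','W','X']
>
> # a single group: 5 views at 00:00 and 7 views at 01:00
> group = pd.DataFrame({'hour': [0, 1], 'views': [5, 7]})
>
> # sort by hour, turn each (hour, views) pair into "<letter><views>" and join
> encoded = ''.join(hour_to_letter[h] + str(v)
>                   for h, v in sorted(zip(group['hour'], group['views'])))
> print(encoded)  # A5B7
> {code}
> The script below applies the same logic per group through a grouped map Pandas UDF.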
> {code:python}
> #!/usr/bin/env python3
> # coding: utf-8
> input_data = b"""en Albert_Camus 20071210-000000 150
> en Albert_Camus 20071210-010000 148
> en Albert_Camus 20071210-020000 197
> en Albert_Camus 20071211-200000 145
> en Albert_Camus 20071211-210000 131
> en Albert_Camus 20071211-220000 154
> en Albert_Camus 20071211-230001 142
> en Albert_Caquot 20071210-020000 1
> en Albert_Caquot 20071210-020000 1
> en Albert_Caquot 20071210-040001 1
> en Albert_Caquot 20071211-060000 1
> en Albert_Caquot 20071211-080000 1
> en Albert_Caquot 20071211-150000 3
> en Albert_Caquot 20071211-210000 1"""
>
> import tempfile
>
> fp = tempfile.NamedTemporaryFile()
> fp.write(input_data)
> fp.seek(0)
>
> import findspark
> findspark.init()
>
> import pyspark
> from pyspark.sql.types import StructType, StructField
> from pyspark.sql.types import StringType, IntegerType, TimestampType
> from pyspark.sql import functions
>
> sc = pyspark.SparkContext(appName="udf_example")
> sqlctx = pyspark.SQLContext(sc)
>
> schema = StructType([StructField("lang", StringType(), False),
>                      StructField("page", StringType(), False),
>                      StructField("timestamp", TimestampType(), False),
>                      StructField("views", IntegerType(), False)])
>
> df = sqlctx.read.csv(fp.name,
>                      header=False,
>                      schema=schema,
>                      timestampFormat="yyyyMMdd-HHmmss",
>                      sep=' ')
> df.count()
> df.dtypes
> df.show()
>
> new_schema = StructType([StructField("lang", StringType(), False),
>                          StructField("page", StringType(), False),
>                          StructField("day", StringType(), False),
>                          StructField("enc", StringType(), False)])
>
> from pyspark.sql.functions import pandas_udf, PandasUDFType
> import pandas as pd
>
> hour_to_letter = ['A','B','C','D','E','F','G','H','I','J','K','L','M','N','O',
>                   'P','Q','R','S','T','U','V','W','X']
>
> @pandas_udf(new_schema, PandasUDFType.GROUPED_MAP)
> def concat_hours(x):
>     view_hours = x['hour'].tolist()
>     view_views = x['views'].tolist()
>     view_hours_letters = [hour_to_letter[h] for h in view_hours]
>     encoded_views = [l + str(h)
>                      for l, h in sorted(zip(view_hours_letters, view_views))]
>     encoded_views_string = ''.join(encoded_views)
>     # return pd.DataFrame({'enc': x.page, 'day': x.lang, 'lang': x.day,
>     #                      'page': encoded_views_string}, index=[x.index[0]])
>     return pd.DataFrame({'page': x.page, 'lang': x.lang, 'day': x.day,
>                          'enc': encoded_views_string}, index=[x.index[0]])
>
> from pyspark.sql import functions
>
> grouped_df = (df.select(['lang',
>                          'page',
>                          functions.date_format('timestamp', 'yyyy-MM-dd')
>                                   .alias('day'),
>                          functions.hour('timestamp').alias('hour'),
>                          'views'
>                          ])
>                 .groupby(['lang', 'page', 'day'])
>               )
> grouped_df = (grouped_df.apply(concat_hours)
>                         .dropDuplicates()
>               )
> grouped_df.show()
> {code}
>
> This is what I am getting:
> {noformat}
> $ ./udf_example.py
> 2018-05-20 05:13:23 WARN Utils:66 - Your hostname, inara resolves to a loopback address: 127.0.1.1; using 10.109.49.111 instead (on interface wlp2s0)
> 2018-05-20 05:13:23 WARN Utils:66 - Set SPARK_LOCAL_IP if you need to bind to another address
> 2018-05-20 05:13:23 WARN NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
> Setting default log level to "WARN".
> To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
> 2018-05-20 05:13:24 WARN Utils:66 - Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
> +----+-------------+-------------------+-----+
> |lang|         page|          timestamp|views|
> +----+-------------+-------------------+-----+
> |  en| Albert_Camus|2007-12-10 00:00:00|  150|
> |  en| Albert_Camus|2007-12-10 01:00:00|  148|
> |  en| Albert_Camus|2007-12-10 02:00:00|  197|
> |  en| Albert_Camus|2007-12-11 20:00:00|  145|
> |  en| Albert_Camus|2007-12-11 21:00:00|  131|
> |  en| Albert_Camus|2007-12-11 22:00:00|  154|
> |  en| Albert_Camus|2007-12-11 23:00:01|  142|
> |  en|Albert_Caquot|2007-12-10 02:00:00|    1|
> |  en|Albert_Caquot|2007-12-10 02:00:00|    1|
> |  en|Albert_Caquot|2007-12-10 04:00:01|    1|
> |  en|Albert_Caquot|2007-12-11 06:00:00|    1|
> |  en|Albert_Caquot|2007-12-11 08:00:00|    1|
> |  en|Albert_Caquot|2007-12-11 15:00:00|    3|
> |  en|Albert_Caquot|2007-12-11 21:00:00|    1|
> +----+-------------+-------------------+-----+
>
> +----------+----------------+---+-------------+
> |      lang|            page|day|          enc|
> +----------+----------------+---+-------------+
> |2007-12-10|    A150B148C197| en| Albert_Camus|
> |2007-12-11|        G1I1P3V1| en|Albert_Caquot|
> |2007-12-10|          C1C1E1| en|Albert_Caquot|
> |2007-12-11|U145V131W154X142| en| Albert_Camus|
> +----------+----------------+---+-------------+
> {noformat}
>
> Of course, what I am expecting is:
> {noformat}
> +----+-------------+----------+----------------+
> |lang|         page|       day|             enc|
> +----+-------------+----------+----------------+
> |  en|Albert_Caquot|2007-12-11|        G1I1P3V1|
> |  en|Albert_Caquot|2007-12-10|          C1C1E1|
> |  en| Albert_Camus|2007-12-10|    A150B148C197|
> |  en| Albert_Camus|2007-12-11|U145V131W154X142|
> +----+-------------+----------+----------------+
> {noformat}
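> It looks like the result columns are matched to the output schema by position rather than by name: with Pandas 0.22 a DataFrame built from a dict appears to come back with its columns sorted alphabetically ({{day}}, {{enc}}, {{lang}}, {{page}}), and those values then land under {{lang}}, {{page}}, {{day}}, {{enc}} in the Spark result. A possible workaround, sketched below with the same definitions as the script above (not verified beyond this example), is to force the returned columns into the same order as {{new_schema}}:
> {code:python}
> @pandas_udf(new_schema, PandasUDFType.GROUPED_MAP)
> def concat_hours(x):
>     view_hours_letters = [hour_to_letter[h] for h in x['hour']]
>     encoded_views = [l + str(v)
>                      for l, v in sorted(zip(view_hours_letters, x['views']))]
>     result = pd.DataFrame({'lang': x.lang, 'page': x.page, 'day': x.day,
>                            'enc': ''.join(encoded_views)}, index=[x.index[0]])
>     # reorder the columns to match new_schema, since the values appear to be
>     # assigned to the schema fields by position
>     return result[['lang', 'page', 'day', 'enc']]
> {code}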