Re: PySpark - Expand rows into dataframes via function
Flatmap works too. The explode function is the SQL/DataFrame way of doing a one-to-many operation. Both should work.

Thanks
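A hedged sketch of the explode route in the DataFrame API, using a toy frame with made-up column names (not Patrick's actual schema):

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

# Toy frame: one row holding a hash key and an array of /24 prefixes.
df = spark.createDataFrame(
    [('ff26920a408f15613096aa7fe0ddaa57', ['23.239.160', '23.239.161'])],
    ['hashkey', 'ips'])

# explode() turns the array column into one output row per element:
one_per_row = df.select(F.explode('ips').alias('ip'), 'hashkey')
one_per_row.show()

Each array element becomes its own row, paired with the hashkey; the same one-to-many shape that flatMap gives on the RDD side.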
Re: PySpark - Expand rows into dataframes via function
Thanks Sathish.

Before you responded, I came up with this solution:

# A function to take in one row and return the expanded ranges:
def processRow(x):
    ...
    return zip(list_of_ip_ranges, list_of_registry_ids)

# and then in spark,

processed_rdds = spark_df_of_input_data.rdd.flatMap(lambda x: processRow(x))

processed_df = (processed_rdds.toDF()
                .withColumnRenamed('_1', 'ip')
                .withColumnRenamed('_2', 'registryid'))

And then after that I split and subset the IP column into what I wanted.
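For the archives, a minimal sketch of what a processRow like that might look like, assuming the row carries the dotted-quad starting IP, the address count, and the hash key (field names here are illustrative, not the real ARIN column names):

def processRow(x):
    # Convert the starting IP to a 32-bit integer.
    o1, o2, o3, o4 = (int(p) for p in x['start_ip'].split('.'))
    base = (o1 << 24) | (o2 << 16) | (o3 << 8) | o4

    # Each /24 covers 256 addresses, so 8192 addresses -> 32 subnets.
    n_subnets = int(x['num_ips']) // 256

    list_of_ip_ranges = []
    list_of_registry_ids = []
    for i in range(n_subnets):
        addr = base + i * 256
        prefix = '%d.%d.%d' % (addr >> 24, (addr >> 16) & 255, (addr >> 8) & 255)
        list_of_ip_ranges.append(prefix)
        list_of_registry_ids.append(x['hashkey'])

    return zip(list_of_ip_ranges, list_of_registry_ids)

With (prefix, hashkey) tuples coming back, the flatMap and toDF lines above yield one row per /24, e.g. 23.239.160 through 23.239.191 for the 8192-address example.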
Re: PySpark - Expand rows into dataframes via function
It's possible with the array function combined with the struct construct. Below is a SQL example:

select Array(struct(ip1, hashkey), struct(ip2, hashkey))
from (select substr(col1,1,2) as ip1, substr(col1,3,3) as ip2, etc,
      hashkey from object) a

If you want dynamic IP ranges, you need to dynamically construct the structs based on the range values. Hope this helps.

Thanks

Sathish
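One hedged way to do that dynamic construction from the Python side, assuming a frame df with col1 and hashkey as in the SQL above; the substring positions are invented for illustration:

from pyspark.sql import functions as F

# Invented (start, length) substring positions; adjust for the real layout.
split_points = [(1, 2), (3, 3)]

structs = [F.struct(F.substring('col1', start, length).alias('ip'), 'hashkey')
           for start, length in split_points]

result = (df.select(F.explode(F.array(*structs)).alias('s'))
            .select('s.ip', 's.hashkey'))

explode then flattens the array back out to one row per struct, matching what the flatMap approach does on the RDD side.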
PySpark - Expand rows into dataframes via function
Hello,

I'm trying to map ARIN registry files into more explicit IP ranges. They provide a number of IPs in the range (here it's 8192) and a starting IP, and I'm trying to map it into all the included /24 subnets. For example,

Input:

array(['arin', 'US', 'ipv4', '23.239.160.0', 8192, 20131104.0, 'allocated',
       'ff26920a408f15613096aa7fe0ddaa57'], dtype=object)

Output:

array([['23', '239', '160', 'ff26920a408f15613096aa7fe0ddaa57'],
       ['23', '239', '161', 'ff26920a408f15613096aa7fe0ddaa57'],
       ['23', '239', '162', 'ff26920a408f15613096aa7fe0ddaa57'],
       ...

I have the input lookup table in a PySpark DF, and a Python function to do the conversion into the mapped output. I think to produce the full mapping I need a UDTF, but this concept doesn't seem to exist in PySpark. What's the best approach to do this mapping and recombine into a new DataFrame?

Thanks,

Patrick