[
https://issues.apache.org/jira/browse/SPARK-33110?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17211769#comment-17211769
]
Andrea Viano commented on SPARK-33110:
--
import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession

# One SparkSession for the whole script.
spark = SparkSession.builder.appName('Apriori').getOrCreate()

# Widen the notebook cells so wide DataFrames stay readable.
# https://stackoverflow.com/questions/21971449/how-do-i-increase-the-cell-width-of-the-jupyter-ipython-notebook-in-my-browser
from IPython.core.display import display, HTML
# NOTE(review): the paste had a stray backslash before "{"; also, for the CSS
# rule to take effect it normally needs to be wrapped in <style> tags —
# confirm this actually widens the container.
display(HTML(".container { width:105% !important; }"))

from pyspark.sql.types import (
    ArrayType,
    BooleanType,
    IntegerType,
    StringType,
    StructField,
    StructType,
)
from pyspark.sql.functions import array_contains, col, collect_list, concat
from pyspark.sql.functions import udf
import pandas as pd

# https://community.cloudera.com/t5/Support-Questions/Pyspark-can-t-show-a-CSV-with-an-array/td-p/229618
string = "Coals"
def str_to_arr(my_list):
    """Split a comma-separated string into a sorted list of its items.

    The original copied the split result element-by-element into a second
    list before sorting in place; sorted() over the split does the same
    work in one step and returns a fresh list.
    """
    return sorted(my_list.split(","))
def transaction_size(my_list):
    """Return the number of items in a transaction list."""
    return len(my_list)
def transaction_check(array, item="Coals"):
    """Return True if any entry of *array* contains *item* as a substring.

    *item* defaults to "Coals" so existing one-argument callers (including
    the UDF built from this function below) keep their behaviour.  Note the
    match is a substring test ("Coals" in word), not strict equality,
    exactly as in the original.

    A dead, commented-out earlier variant that looked for "Beer" was
    removed here (it was a discarded string literal, never executed).
    """
    return any(item in word for word in array)
# https://stackoverflow.com/questions/37284077/combine-pyspark-dataframe-arraytype-fields-into-single-arraytype-field
def concat(type):
    """Build a UDF that concatenates several ArrayType(*type*) columns.

    NOTE(review): this deliberately shadows pyspark.sql.functions.concat
    imported above — any later bare call to concat() gets this UDF factory,
    not the Spark built-in.
    """
    # Was missing from the original entirely — calling the UDF raised
    # NameError on `chain`.
    from itertools import chain

    def concat_(*args):
        # Treat null/empty columns as empty arrays, then flatten.
        return list(chain.from_iterable(arg if arg else [] for arg in args))

    return udf(concat_, ArrayType(type))

concat_string_arrays = concat(StringType())
##def make_list(my_list):

# One string column per CSV line; the raw transaction text lands whole in
# TRANSACTIONS and is split into an array further down.
schema = StructType([StructField('TRANSACTIONS', StringType(), True)])
transaction_set = spark.read.option("delimiter", ";").csv('testApriori2.csv', schema=schema)
transaction_set.show()

# Add a 1-based row id.
# https://stackoverflow.com/questions/43406887/spark-dataframe-how-to-add-a-index-column-aka-distributed-data-index
from pyspark.sql.functions import monotonically_increasing_id
transaction_set = transaction_set.select("*").withColumn("id", monotonically_increasing_id() + 1)
transaction_set.show()

# Turn the raw comma-separated string into a sorted array of items.
str_to_arr_udf = udf(str_to_arr, ArrayType(StringType()))
transaction_set = transaction_set.withColumn('TRANSACTIONS_LIST', str_to_arr_udf(transaction_set["TRANSACTIONS"]))
transaction_set = transaction_set.drop("TRANSACTIONS")
transaction_set.show()
transaction_set.printSchema()

# Count duplicate transactions.
rep_trans_count = transaction_set.groupBy("TRANSACTIONS_LIST").count()
rep_trans_count.show()
rep_trans_count.printSchema()

from pyspark.sql.functions import size
# https://stackoverflow.com/questions/44541605/how-to-get-the-lists-length-in-one-column-in-dataframe-spark
count_for_transact = rep_trans_count.select('*', size("TRANSACTIONS_LIST").alias("TRANSACTIONS_LIST_SIZE"))
count_for_transact.show()
#transaction_size.show()

# Flag transactions containing "Coals".
transaction_check_udf = udf(transaction_check, BooleanType())
object_transaction = count_for_transact.withColumn('TRANSACTIONS_LIST_BOOL', transaction_check_udf(count_for_transact["TRANSACTIONS_LIST"]))
object_transaction.show(80, False)

object_transaction_sel = object_transaction.filter(col("TRANSACTIONS_LIST_BOOL") == True)
object_transaction_sel.show(80, False)

# BUG FIX: the original assigned the result of .show() — which returns
# None — to element_to_compare and then called .select() on it (guaranteed
# AttributeError).  Keep the DataFrame and call .show() separately.
element_to_compare = object_transaction_sel.groupby("TRANSACTIONS_LIST_BOOL").agg(collect_list("TRANSACTIONS_LIST"))
element_to_compare.show(80, False)

# NOTE(review): after the agg the column is named
# "collect_list(TRANSACTIONS_LIST)"; the original selected
# concat(col("TRANSACTIONS_LIST")) — a column that no longer exists, through
# the locally redefined concat (a UDF *factory*, not a UDF).  The
# commented-out line that followed suggests the prebuilt
# concat_string_arrays on the real column was intended — confirm.
flattened = element_to_compare.select(concat_string_arrays(col("collect_list(TRANSACTIONS_LIST)")))
flattened.show(80, False)

# All (deduplicated) transactions whose array contains the exact item "Beer".
beer = rep_trans_count.select('*').where(array_contains(rep_trans_count["TRANSACTIONS_LIST"], "Beer"))
beer.show(80, False)
> array_contains doesn't pick element in the array but just at the end
>
>
> Key: SPARK-33110
> URL: https://issues.apache.org/jira/browse/SPARK-33110
> Project: Spark
> Issue Type: Bug
> Components: SQL
>Affects Versions: 2.4.7, 3.0.1
> Environment: Windows 10
> Python 3.7.4
> Java JDK 8
> hadoop 3.0.1
>Reporter: Andrea Viano
>Priority: Major
> Attachments: beer0.PNG, beer1.PNG, beer2.PNG, testApriori3.csv
>
>
> Hello,
> given a column of arrays of strings, I used array_contains to find all the
> arrays containing the string "Beer". array_contains recognises the string
> "Beer" only when it is at the end of the array, but not when it is in the
> middle of the array.
> beer=rep_trans_count.withColumn("keep",
> array_contains(rep_trans_count.TRANSACTIONSLIST,"Beer")).filter(col("keep")==True)
>
>
--
This