[Spark Core] Does Spark support parquet predicate pushdown for big lists?

2021-12-16 Thread Amin Borjian
Hello all,

We use Apache Spark 3.2.0, and our data is stored on Apache Hadoop in Parquet 
format.

One of the advantages of the Parquet format is predicate pushdown, which 
allows only the necessary data to be read. Spark supports this feature well. 
For example, in the following query, Spark detects the predicate and uses 
Parquet metadata to read as little data as possible:

Select * from files where fieldA in (1, 2);
Predicate pushdown: fieldA in (1, 2) # filter parquet column pages with the 
help of metadata of column

This feature also works when we use the `Column.isInCollection()` function, 
and again Spark deduces the predicate pushdown filters correctly. However, the 
larger the list passed to `isInCollection`, the larger the generated query 
text, which in practice makes it impossible to serialize the query and send it 
to the workers for execution. There is an open issue to improve this: 
https://issues.apache.org/jira/browse/SPARK-29048

Unfortunately, we have problems with lists of more than 1 million members, and 
queries run very slowly for list sizes close to that. However, the list is 
small enough to fit comfortably in the memory of every worker (even if it has 
nearly 1 billion members).

One might suggest broadcasting this list as a variable and using it later. 
When we tried this, the list members still appeared in the query text, and the 
query could not be sent to the workers because of its size. (Maybe we made a 
mistake in using this feature; I would be grateful if someone who specializes 
in this area could guide us.)

Another suggestion might be to save this list as a data source and use a join 
when querying. The problem with this method is that we lose the predicate 
pushdown advantage, because Spark reads all of the data and only then performs 
the join. (We need the information in this list to be used while reading the 
Parquet files, not applied after reading. The dynamic partition pruning (DPP) 
feature did not help us either, because it only applies to partitioned data 
and has nothing to do with Parquet metadata.)

As a result, we are looking for a way to keep a large list in the workers' 
memory (we can guarantee that it fits, given our resources) and then, when 
reading the Parquet files, filter their row groups and column pages against 
this list in worker memory (using Parquet metadata). Is there a way to do this 
in Spark? I would be very grateful if you could help me with this.
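
To make the request above concrete, here is a minimal pure-Python sketch of the 
idea. It does not use any Spark or Parquet API. `RowGroupStats`, `may_contain` 
and `select_row_groups` are hypothetical names, and the min/max numbers are 
made up for illustration. It assumes each row group exposes min/max statistics 
for the filtered column and that the wanted values fit in memory as a sorted 
list.

```python
from bisect import bisect_left
from dataclasses import dataclass
from typing import List, Sequence


@dataclass
class RowGroupStats:
    """Hypothetical stand-in for per-row-group min/max column statistics."""
    group_id: int
    min_value: int
    max_value: int


def may_contain(stats: RowGroupStats, sorted_values: Sequence[int]) -> bool:
    """Return True if some wanted value lies in [min_value, max_value]."""
    # Position of the first wanted value that is >= the row group's minimum.
    i = bisect_left(sorted_values, stats.min_value)
    return i < len(sorted_values) and sorted_values[i] <= stats.max_value


def select_row_groups(groups: List[RowGroupStats],
                      wanted: Sequence[int]) -> List[int]:
    """Keep only the row groups whose value range overlaps the wanted set."""
    sorted_values = sorted(set(wanted))
    return [g.group_id for g in groups if may_contain(g, sorted_values)]


# Illustrative statistics for three row groups of column fieldA.
groups = [
    RowGroupStats(0, 0, 99),
    RowGroupStats(1, 100, 199),
    RowGroupStats(2, 200, 299),
]
wanted = [5, 42, 250]
kept = select_row_groups(groups, wanted)
print(kept)
```

Each row group costs one binary search, so the check stays cheap even for a 
very large list. Row groups that pass may still hold no matching rows, so the 
rows that are read would still need to be filtered against the list.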



Re: Unsubscribe

2021-12-16 Thread Piper H
please send an empty email to:
user-unsubscr...@spark.apache.org
to unsubscribe yourself from the list.

On Fri, Dec 17, 2021 at 11:14 AM Ankit Maloo 
wrote:

> Please do unsubscribe me from your mailing list.
>


Unsubscribe

2021-12-16 Thread Ankit Maloo
Please do unsubscribe me from your mailing list.


Re: class instance variable in PySpark used in lambda function

2021-12-16 Thread Mich Talebzadeh
Many thanks Pol.

As it happens I was using a workaround with numRows = 10. In general it
is bad practice to hard-code constants in the code. For the same
reason we ought not to embed URLs in the PySpark program itself.

What I did was to add numRows to the yaml file, which is read at the
beginning. It is basically the number of rows of random numbers to be
generated in PySpark and posted to the database. That was the solution I
adopted.

Cheers,

Mich





*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Wed, 15 Dec 2021 at 12:03, Pol Santamaria  wrote:

> To me it looks like you are accessing "self" on the workers by using
> "self.numRows" inside the map. As a consequence, "self" needs to be
> serialized, and since it has an attribute referencing the "sparkContext",
> Spark tries to serialize the context and fails.
>
> It can be solved in different ways, for instance by avoiding the use of
> "self" in the map, as you did in the last snippet, or by saving the spark
> context / session in a different class than "numRows".
>
> Bests,
>
> Pol Santamaria
>
>
> On Wed, Dec 15, 2021 at 12:24 PM Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>>
>> Hi,
>>
>> I define the class instance variable self.numRows = 10 to be available to
>> all methods of this class, as below
>>
>> class RandomData:
>>     def __init__(self, spark_session, spark_context):
>>         self.spark = spark_session
>>         self.sc = spark_context
>>         self.config = config
>>         self.values = dict()
>>         self.numRows = 10  # instance attribute used later in the lambda
>>
>> In another method of the same class, I use a lambda function to generate
>> random values
>>
>>     def generateRamdomData(self):
>>         rdd = self.sc.parallelize(Range). \
>>             map(lambda x: (x, uf.clustered(x, self.numRows), \
>>
>> This fails with the error below
>>
>> Could not serialize object: Exception: It appears that you are attempting
>> to reference SparkContext from a broadcast variable, action, or
>> transformation. SparkContext can only be used on the driver, not in code
>> that it run on workers. For more information, see SPARK-5063.
>>
>> However, this works if I assign self.numRows to a local variable in
>> that method, as below
>>
>>
>>         numRows = self.numRows
>>         rdd = self.sc.parallelize(Range). \
>>             map(lambda x: (x, uf.clustered(x, numRows), \
>>
>>
>>
>> Is there a better explanation?
>>
>>
>> Thanks
>>
>>
>
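
The behaviour discussed in this thread can be reproduced without Spark. The
sketch below is an illustration under stated assumptions, not the code from
the thread. `FakeContext` is a hypothetical stand-in for SparkContext that
holds a lock and therefore cannot be pickled. `captured_objects` and
`is_picklable` are helper names invented here. The standard `pickle` module is
used only to check whether the captured objects can be serialized. PySpark
itself ships closures with cloudpickle, so this mirrors the cause of the
error, not the exact mechanism.

```python
import pickle
import threading


class FakeContext:
    """Hypothetical stand-in for SparkContext; the lock makes it unpicklable."""
    def __init__(self):
        self._lock = threading.Lock()


class RandomData:
    def __init__(self, context):
        self.sc = context
        self.numRows = 10

    def scale_with_self(self):
        # The lambda refers to self, so the whole instance travels with it.
        return lambda x: x * self.numRows

    def scale_with_local(self):
        # Copy the value first; the lambda then closes over a plain int.
        numRows = self.numRows
        return lambda x: x * numRows


def captured_objects(fn):
    """Return the objects held in a function's closure cells."""
    return [cell.cell_contents for cell in (fn.__closure__ or ())]


def is_picklable(obj):
    """Return True if the standard pickle module can serialize obj."""
    try:
        pickle.dumps(obj)
        return True
    except Exception:
        return False


data = RandomData(FakeContext())
with_self = captured_objects(data.scale_with_self())
with_local = captured_objects(data.scale_with_local())

print([type(o).__name__ for o in with_self])
print([type(o).__name__ for o in with_local])
print(all(is_picklable(o) for o in with_self))
print(all(is_picklable(o) for o in with_local))
```

The first lambda captures the whole `RandomData` instance, including its
context attribute, while the second captures only an integer. This is why
copying `self.numRows` into a local variable avoids the serialization error.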