Uncaught Exception Handler in master

2019-03-28 Thread Alessandro Liparoti
Hi everyone,

I have a Spark library where I would like to perform some action when an
uncaught exception happens (log it, increment an error metric, ...). I have
tried multiple times to use setUncaughtExceptionHandler on the current
Thread, but this doesn't work. If I spawn another thread it works fine. Any
idea of what I can do?
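For context, this is roughly the hook I am after; a minimal sketch, where
logger and errorCounter are placeholders from my own library, and chaining
the JVM-wide default handler assumes the exception actually reaches the
uncaught-exception machinery (Spark may already install its own default
handler, which I delegate to):

val previous = Thread.getDefaultUncaughtExceptionHandler
Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler {
  override def uncaughtException(t: Thread, e: Throwable): Unit = {
    // log and count first, then hand the exception to whatever handler was there before
    logger.error(s"uncaught exception in thread ${t.getName}", e)
    errorCounter.increment()
    if (previous != null) previous.uncaughtException(t, e)
  }
})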

*Alessandro Liparoti*


Logging exception of custom spark app

2019-01-04 Thread Alessandro Liparoti
Hi everyone,

I have an application that uses Spark to perform some computation. It can
be used both in a spark-shell and via spark-submit. I want to log all
exceptions thrown by my code to a file, in order to have some detailed
information when users hit an error.

I tried with this

Thread.currentThread().setUncaughtExceptionHandler(
  new Thread.UncaughtExceptionHandler() {
    override def uncaughtException(t: Thread, e: Throwable): Unit = {
      logger.error("exception logged")
      println("exception logged")
    }
  })

but it is not working. I saw that Spark already sets an
uncaughtExceptionHandler, so this code is probably not effective.
The other option would be to try-catch every public method of my API, log
the exception when it happens and then rethrow it, but I think this is not
optimal.
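For reference, this is the shape of the try/catch alternative I have in
mind; just a sketch, where computeSomething is a made-up public method and
logger is my own logger, not part of Spark:

import org.apache.spark.sql.DataFrame

// One helper so every public API method logs and rethrows without repeating the try/catch.
def logged[T](method: String)(body: => T): T =
  try body
  catch {
    case e: Throwable =>
      logger.error(s"exception in $method", e)
      throw e
  }

// A public method wrapped with it (the body is a placeholder).
def computeSomething(df: DataFrame): DataFrame =
  logged("computeSomething") {
    df.filter("value IS NOT NULL")
  }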

Do you have any suggestion?
*Alessandro Liparoti*


pySpark and py4j: NoClassDefFoundError when upgrading a jar

2018-12-31 Thread Alessandro Liparoti
We developed a Scala library called FV that runs on Spark. We also built
Python wrappers for its public API using py4j, as Spark itself does. For
example, the main object is instantiated like this

self._java_obj = self._new_java_obj("com.example.FV", self.uid)

and the methods on the object are called in this way

def add(self, r):
    self._java_obj.add(r)

We are experiencing an annoying issue when running pyspark with this
external library. We usually run the pyspark shell like this

pyspark --repositories  --packages


When we release a new version with some change in the Scala API, things
start to randomly break for some users. For example, in version 0.44 we had
a class *DateUtils* (used by class *Utils*, which is in turn used by class
*FV* in method *add*) that was dropped in version 0.45. When version 0.45
was released and a user called the method *add* through the Python API, we got

java.lang.NoClassDefFoundError: Could not initialize class DateUtils

Basically, the Python API runs the method *add*, which contains a reference
to class *DateUtils* (v0.44), but when it actually goes to load the needed
class it doesn't find it, because the loaded jar is v0.45 (as the Ivy log
shows when the shell starts up).

Do you have any idea of what the problem might be? Does py4j perhaps cache
something, so that we get this error when upgrading the classes?
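For what it is worth, this is the kind of diagnostic I plan to run from a
spark-shell on the same cluster (or to expose as a method on FV so it can be
called through the py4j wrapper); a sketch, and the package names for Utils
and DateUtils are guesses based on com.example.FV:

// Report which jar each class is actually loaded from on the driver JVM.
def jarOf(name: String): String =
  try Class.forName(name).getProtectionDomain.getCodeSource.getLocation.toString
  catch { case t: Throwable => s"<not loadable: ${t.getClass.getSimpleName}>" }

Seq("com.example.FV", "com.example.Utils", "com.example.DateUtils")
  .foreach(c => println(s"$c -> ${jarOf(c)}"))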
*Alessandro Liparoti*


Package option gets outdated jar when running with "latest"

2018-12-28 Thread Alessandro Liparoti
Hi everyone,

I am encountering an annoying issue when running Spark with an external jar
dependency downloaded from Maven. This is how we run it

spark-shell --repositories  --packages


When we release a new version with some big change in the API, things start
to randomly break for some users. For example, in version 0.44 we had a
class DateUtils (used by class Utils) that was dropped in version 0.45.
After version 0.45 was released (Spark shows at startup that it is correctly
downloaded from Maven), some users using the class Utils got

NoClassDefFoundError for class DateUtils

To me this looks like a caching problem. Probably some node's (master or
executor) ClassLoader is still pointing to v0.44, and when loading Utils it
tries to find the DateUtils class, which has disappeared in the newer jar.
Not sure how this can happen; this is only an intuition.

Does anyone have any idea of how to solve this? It is also very hard to
debug, since I couldn't find a pattern to reproduce it. It happens on every
release that changes a class name, but not for everyone running the job
(that's why caching looked like a good hint to me).
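To check that intuition, this is the probe I would like to run from the
spark-shell; only a sketch, and the fully qualified name of DateUtils is a
guess:

// Which jar does the driver JVM load DateUtils from?
def jarOf(name: String): String =
  try Class.forName(name).getProtectionDomain.getCodeSource.getLocation.toString
  catch { case t: Throwable => s"<not loadable: ${t.getClass.getSimpleName}>" }

println(s"driver -> ${jarOf("com.example.DateUtils")}")

// Ask many tasks the same question so that every executor is likely touched.
sc.parallelize(1 to 1000, 100)
  .map(_ => jarOf("com.example.DateUtils"))
  .distinct()
  .collect()
  .foreach(loc => println(s"executor -> $loc"))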

Thanks,
*Alessandro Liparoti*


Checkpointing clarifications

2018-09-20 Thread Alessandro Liparoti
Good morning,

I have a large-scale job that breaks for certain input sizes, so I am
trying to play with checkpointing to split the DAG and pin down the
problematic point. I have some questions about checkpointing:

   1. What is the utility of non-eager checkpointing?
   2. How is checkpointing different from manually writing a dataframe (or
   RDD) to HDFS (see the sketch after this list)? Also, a manual write lets
   me re-read the stored dataframe, while with checkpointing I don't see a
   simple way of re-reading it in a future job.
   3. I read that checkpointing is different from persisting because the
   lineage is not stored, but I don't understand why persisting keeps the
   lineage. The point of persisting is that the next computation will start
   from the persisted data (either memory or memory+disk), so what is the
   advantage of having the lineage available? Am I missing some basic
   understanding of these two apparently different operations?
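To make question 2 concrete, this is how I picture the three options; a
minimal sketch with placeholder paths:

// Checkpointing needs a directory configured up front.
spark.sparkContext.setCheckpointDir("hdfs:///tmp/checkpoints")

val df = spark.read.parquet("hdfs:///data/input")

// Checkpoint: writes df to the checkpoint dir and returns a dataframe whose lineage is
// truncated. With eager = false it is only marked; the write happens at the first action.
val cp     = df.checkpoint()
val lazyCp = df.checkpoint(eager = false)

// Manual write + re-read: similar truncation once re-read, but the files sit at a path
// I control and a future job can read them back.
df.write.mode("overwrite").parquet("hdfs:///tmp/stage1")
val reread = spark.read.parquet("hdfs:///tmp/stage1")

// Persist: keeps the computed partitions around for reuse, but the lineage is kept too,
// so lost partitions can be recomputed from the source.
val cached = df.persist()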

Thanks,
*Alessandro Liparoti*


Spark sql syntax checker

2018-08-03 Thread Alessandro Liparoti
Hi everyone,

I am building a framework on top of Spark in which users specify SQL
queries and we analyze them in order to extract some metadata. Moreover,
SQL queries can be composed, meaning that if a user writes a query X to
build a dataset, another user can use X in their own query to refer to it,
and we will take care of substituting X with the right data at the right
time.
I am using sqlContext's parsePlan to extract WHERE conditions and source
tables from a query. However, I saw that this component doesn't complain if
I write a syntactically wrong query; "select * from" is parsed with no
exception raised. If I want to get syntax errors, it seems the only way to
do so is the sqlContext.sql method. However, that method also checks for
table and column existence, which is something I don't want, since my
framework logic might substitute these values at runtime.
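For concreteness, this is roughly the parse-only check I have today; just a
sketch that uses Spark's internal Catalyst parser, which is not a stable
public API:

import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}

// Parse the query into an unresolved logical plan without touching the catalog, so
// missing tables or columns are not reported here, only (some) syntax problems.
def parseOnly(query: String): Either[String, Unit] =
  try {
    CatalystSqlParser.parsePlan(query)
    Right(())
  } catch {
    case e: ParseException => Left(e.getMessage)
  }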

Which approach do you suggest I use? Is there a purely syntactic check in
the Spark codebase that I can use for my use case?

*Alessandro Liparoti*


Spark FAIR Scheduler vs FIFO Scheduler

2018-06-18 Thread Alessandro Liparoti
Good morning,

I have a conceptual question. In an application I am working on, when I
write some results to HDFS (*action 1*), I use ~30 executors out of 200. I
would like to improve resource utilization in this case.
I am aware that repartitioning the df to 200 before action 1 would produce
200 tasks and full executor utilization, but for several reasons this is not
what I want to do.
What I would like to do is use the other ~170 executors to work on the
actions (jobs) coming after action 1. The normal case would be that *action
2* starts after action 1 (FIFO), but here I want them to start at the same
time, using the idle executors.

My question is: is this achievable with the FAIR scheduler approach, and if
yes, how?

As I read it, the FAIR scheduler needs a pool of jobs and then schedules
their tasks in a round-robin fashion. If I submit action 1 and action 2 at
the same time (from different threads; see the sketch below) to a FAIR
pool, which of the following things happens?

   1. at every moment, all (or almost all) executors are used in parallel
   (30 for action 1, the rest for action 2)
   2. for a certain small amount of time X, 30 executors are used for
   action 1, then for another interval X the other executors are used for
   action 2, then again X units of time for action 1, and so on...

Of the two, only 1 would actually improve cluster utilization, while 2
would merely have both jobs advancing at the same time. Can someone with
knowledge of the FAIR scheduler help me understand how it works?
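For reference, this is roughly what I mean by submitting the two actions at
the same time; only a sketch, where resultDf, otherDf, the output path and
the pool names are placeholders and the pools would need to be defined in
fairscheduler.xml (with spark.scheduler.mode=FAIR set):

import java.util.concurrent.Executors
import scala.concurrent.{ExecutionContext, Future}

implicit val ec: ExecutionContext =
  ExecutionContext.fromExecutor(Executors.newFixedThreadPool(2))

// Action 1: the HDFS write, submitted from its own driver thread and tagged with a pool.
val job1 = Future {
  spark.sparkContext.setLocalProperty("spark.scheduler.pool", "writes")
  resultDf.write.mode("overwrite").parquet("hdfs:///out/result")
}

// Action 2: submitted concurrently from another driver thread, in a different pool.
val job2 = Future {
  spark.sparkContext.setLocalProperty("spark.scheduler.pool", "compute")
  otherDf.count()
}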

Thanks,
*Alessandro Liparoti*


Writing to HDFS and cluster utilization

2018-06-15 Thread Alessandro Liparoti
Hi,

I would like to briefly present my use case and gather useful suggestions
from the community. I am developing a Spark job which reads from and writes
to Hive heavily. Usually, I use 200 executors with 12g of memory each and a
parallelism level of 600. The main run of the application consists of these
phases: read from HDFS, persist, small and simple aggregations, write to
HDFS. These steps are repeated a certain number of times.
When I write to Hive, I aim for partitions of approximately 50-70 MB,
therefore before writing the output I repartition into approximately 15
parts (according to the data size). The writing phase takes around 1.5
minutes; this means that for 1.5 minutes only 15 out of 600 possible active
tasks are running in parallel. This looks like a big waste of resources.
How would you solve the problem?
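For clarity, the write step currently looks roughly like this; a sketch
with made-up sizes and a hypothetical output table name:

// Pick the number of output files from a rough size estimate, aiming at ~60 MB each.
val targetBytesPerFile = 60L * 1024 * 1024
val estimatedBytes     = 900L * 1024 * 1024        // placeholder estimate for this batch
val numParts           = math.max(1, (estimatedBytes / targetBytesPerFile).toInt)

resultDf
  .repartition(numParts)                           // ~15 tasks during the write
  .write
  .mode("overwrite")
  .insertInto("db.output_table")                   // hypothetical Hive table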

I am experimenting with the FAIR scheduler and job pools, but it does not
seem to improve things much; for some reason, I cannot get more than 4
parallel jobs running. I am still investigating this option and may provide
more details about it afterwards.

I would like to know if this use case is normal, what you would do, and
whether in your opinion I am doing something wrong.

Thanks,
*Alessandro Liparoti*


Spark issue 20236 - overwrite a partitioned data source

2018-06-14 Thread Alessandro Liparoti
Good morning,

I am trying to see how this bug affects writes in Spark 2.2.0, but I cannot
reproduce it. Is it then OK to use the code
df.write.mode(SaveMode.Overwrite).insertInto("table_name")
?
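For reference, this is the kind of check I ran while trying to reproduce
it; a sketch with made-up table and column names:

import org.apache.spark.sql.SaveMode
import spark.implicits._

// A partitioned data source table, as in the issue.
spark.sql(
  "CREATE TABLE test_part (value STRING, day STRING) USING PARQUET PARTITIONED BY (day)")

// insertInto matches columns by position, so the partition column goes last.
Seq(("a", "2018-06-01"), ("b", "2018-06-02")).toDF("value", "day")
  .write.mode(SaveMode.Overwrite).insertInto("test_part")

// Overwrite again with data for only one day, then check whether the other day's
// partition survived or was wiped out.
Seq(("c", "2018-06-02")).toDF("value", "day")
  .write.mode(SaveMode.Overwrite).insertInto("test_part")

spark.sql("SELECT * FROM test_part ORDER BY day").show()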

Thank you,
*Alessandro Liparoti*