[Spark SQL] Catalyst ScalaReflection/ExpressionEncoder fail with relocated (shaded) classes

2018-09-14 Thread johkelly
Hello,

I'm trying to compile Google's timestamp.proto protobuf to a Scala case
class and use it as a field in another proto-derived case class, as part of a
larger dataset schema.
(Although the SQL date type might be preferred in a schema, I encountered
this problem when I attempted to use Timestamp for compatibility with some
existing code.)



To avoid the usual "spark/hadoop provide protobuf packages which conflict
with user code" problem, I relocated the com.google.protobuf.timestamp
package in my uberjar with the gradle-shadow plugin.

Unfortunately, this leads to a somewhat cryptic error message referencing
the original package name at runtime:


( https://gist.github.com/johkelly/0c99c7bf717adc610fc906296be02850 )

Relocating the Timestamp class' package appears to have broken the encoder
generation code. I don't understand the libraries involved well enough to
know where to file a bug report, however.

I put together a small Gradle project that seems to demonstrate the problem
locally (albeit with a different error message):

https://gist.github.com/johkelly/ff78f6c80bcbe38e1e73c598b364395b

An explanation of which component (Scala, Spark, Shadow, something else?) is at
fault here, so I know where to direct a bug report (and possibly create a
workaround), would be appreciated.

Thanks,
Jack Kelly








[SparkSQL] Count Distinct issue

2018-09-14 Thread Daniele Foroni
Hi all,

I am having some trouble doing a count distinct over multiple columns.
This is an example of my data:
++++---+
|a   |b   |c   |d  |
++++---+
|null|null|null|1  |
|null|null|null|2  |
|null|null|null|3  |
|null|null|null|4  |
|null|null|null|5  |
|null|null|null|6  |
|null|null|null|7  |
++++---+
And my code:
import org.apache.spark.sql.{Column, Dataset, Row}
import org.apache.spark.sql.functions.{col, countDistinct}

val df: Dataset[Row] = …
val cols: List[Column] = df.columns.map(col).toList
df.agg(countDistinct(cols.head, cols.tail: _*))

So, in the example above, if I count the distinct “rows” I obtain 7 as expected
(since the “d” column changes for every row).
However, with more columns (16) in EXACTLY the same situation (one incremental
column and 15 columns filled with nulls) the result is 0.

I don’t understand why I am experiencing this problem.
Any solution?
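For cross-checking, here is a minimal, self-contained sketch (the column names and
values are illustrative, mirroring the 15-nulls-plus-one-incremental-column shape)
that counts distinct rows with distinct() instead of countDistinct; distinct()
compares whole rows and treats nulls as equal to each other, so it sidesteps
whatever countDistinct does with the null columns here:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.lit

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// 7 rows: 15 all-null columns plus one incremental column "d", mirroring the failing case.
val nullCols = (1 to 15).map(i => lit(null).cast("string").as(s"c$i"))
val wide = (1 to 7).toDF("d").select(nullCols :+ $"d": _*)

println(wide.distinct().count())   // whole-row distinct; nulls compare equal, so this prints 7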

Thanks,
---
Daniele





Re: StackOverflow Error when run ALS with 100 iterations

2018-09-14 Thread LeoB
Just wanted to add a comment to the Jira ticket but I don't think I have
permission to do so, so answering here instead. I am encountering the same
issue with a StackOverflowError.
I would like to point out that there is a localCheckpoint method which does
not require HDFS to be installed. We could use this instead of checkpoint to
cut down the lineage.
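A minimal sketch of the idea (not the ALS internals; note that localCheckpoint stores
the blocks on the executors, so it trades fault tolerance for not needing a checkpoint
directory on HDFS):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
var rdd = spark.sparkContext.parallelize(1 to 1000)

for (i <- 1 to 100) {
  rdd = rdd.map(_ + 1)
  if (i % 10 == 0) {
    rdd = rdd.localCheckpoint()  // mark for local checkpointing (no checkpoint dir needed)
    rdd.count()                  // force an action so the lineage is actually truncated
  }
}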






Re: Python Dependencies Issue on EMR

2018-09-14 Thread Patrick McCarthy
You didn't say how you're zipping the dependencies, but I'm guessing you
either include .egg files or zipped up a virtualenv. In either case, the
extra C stuff that scipy and pandas rely upon doesn't get included.

An approach like this solved the last problem I had that looked like yours:
https://community.hortonworks.com/articles/58418/running-pyspark-with-conda-env.html

On Thu, Sep 13, 2018 at 10:08 PM, Jonas Shomorony 
wrote:

> Hey everyone,
>
>
> I am currently trying to run a Python Spark job (using YARN client mode)
> that uses multiple libraries, on a Spark cluster on Amazon EMR. To do that,
> I create a dependencies.zip file that contains all of the
> dependencies/libraries (installed through pip) for the job to run
> successfully, such as pandas, scipy, tqdm, psycopg2, etc. The
> dependencies.zip file is contained within an outside directory (let’s call
> it “project”) that contains all the code to run my Spark job. I then zip up
> everything within project (including dependencies.zip) into project.zip.
> Then, I call spark-submit on the master node in my EMR cluster as follows:
>
>
> `spark-submit --packages … --py-files project.zip --jars ...
> run_command.py`
>
>
> Within “run_command.py” I add dependencies.zip as follows:
>
> `self.spark.sparkContext.addPyFile("dependencies.zip”)`
>
>
> The run_command.py then uses other files within project.zip to complete
> the spark job, and within those files, I import various libraries (found in
> dependencies.zip).
>
>
> I am running into a strange issue where all of the libraries are imported
> correctly (with no problems) with the exception of scipy and pandas.
>
>
> For scipy I get the following error:
>
>
> `File "/mnt/tmp/pip-install-79wp6w/scipy/scipy/__init__.py", line 119, in <module>
>
>   File "/mnt/tmp/pip-install-79wp6w/scipy/scipy/_lib/_ccallback.py", line
> 1, in <module>
>
> ImportError: cannot import name _ccallback_c`
>
>
> And for pandas I get this error message:
>
>
> `File "/mnt/tmp/pip-install-79wp6w/pandas/pandas/__init__.py", line 35,
> in <module>
>
> ImportError: C extension: No module named tslib not built. If you want to
> import pandas from the source directory, you may need to run 'python
> setup.py build_ext --inplace --force' to build the C extensions first.`
>
>
> When I comment out the imports for these two libraries (and their use from
> within the code) everything works fine.
>
>
> Surprisingly, when I run the application locally (on master node) without
> passing in dependencies.zip, it picks and resolves the libraries from
> site-packages correctly and successfully runs to completion.
> dependencies.zip is created by zipping the contents of site-packages.
>
>
> Does anyone have any ideas as to what may be happening here? I would
> really appreciate it.
>
>
> pip version: 18.0
>
> spark version: 2.3.1
>
> python version: 2.7
>
>
> Thank you,
>
>
> Jonas
>
>


What is the best way for Spark to read HDF5@scale?

2018-09-14 Thread kathleen li
Hi,
Is there any Spark connector for HDF5?

The following link does not work anymore:

https://www.hdfgroup.org/downloads/spark-connector/

Thanks,

Kathleen




Spark2 DynamicAllocation doesn't release executors that used cache

2018-09-14 Thread Sergejs Andrejevs
Hi,

We're starting to use Spark 2 with use cases for dynamic allocation.
However, we noticed that it doesn't work as expected when a dataset is
cached (persisted).
The cluster runs with:
CDH 5.15.0
Spark 2.3.0
Oracle Java 8.131

The following configs are passed to Spark (and are also set up on the cluster):
# Dynamic Allocation
spark.shuffle.service.enabled                               true
spark.dynamicAllocation.enabled                             true

spark.dynamicAllocation.schedulerBacklogTimeout             1
spark.dynamicAllocation.sustainedSchedulerBacklogTimeout    1
spark.dynamicAllocation.executorIdleTimeout                 90

spark.dynamicAllocation.initialExecutors                    1
spark.dynamicAllocation.minExecutors                        1
spark.dynamicAllocation.maxExecutors                        30

The cluster also has these configs enabled, the spark_shuffle auxiliary service is
set up, and the YARN application classpath is populated. The executors' storage is
freed when the application finishes (based on:
https://spark.apache.org/docs/latest/running-on-yarn.html#configuring-the-external-shuffle-service).
Below is the simplified code that reproduced the issue in our cluster (HA YARN).

When the following code is executed with cache=false, the executors are created,
used, and killed by the idle timeout.
When cache=true, the executors are created and used, but they are not killed and
remain hanging.

In both cases the storage was cleaned up.

void run() {
    List<O1> objList = new ArrayList<>();
    for (long i = 0; i < 1000; i++) {
        objList.add(new O1(i, "test"));
    }

    Dataset<O1> ds = sparkSession.createDataset(objList, Encoders.bean(O1.class));
    ds = ds.repartition(4);

    if (cache) {
        ds.persist(StorageLevel.MEMORY_AND_DISK());
        try {
            ds.show(100, false);
        } finally {
            ds.unpersist();
        }
    } else {
        ds.show(100, false);
    }
}



// O1 POJO class:
public class O1 {
    private Long transactionDate;
    private String name;

    public O1() {
    }

    public O1(Long transactionDate, String name) {
        this.transactionDate = transactionDate;
        this.name = name;
    }

    public Long getTransactionDate() {
        return transactionDate;
    }

    public void setTransactionDate(Long transactionDate) {
        this.transactionDate = transactionDate;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }
}

Moreover, when spark.dynamicAllocation.cachedExecutorIdleTimeout is set to a
particular value, the containers are killed successfully, even if they have used the
cache (this check was inspired by:
https://spark.apache.org/docs/latest/job-scheduling.html#graceful-decommission-of-executors).
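For reference, a minimal sketch of setting this per application (the values are
illustrative, not a recommendation; the sketch is in Scala while the reproducer above
is Java, but the config keys are the same):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("dynamic-allocation-cache-test")
  .config("spark.dynamicAllocation.enabled", "true")
  .config("spark.shuffle.service.enabled", "true")
  .config("spark.dynamicAllocation.executorIdleTimeout", "90s")
  // Executors holding cached blocks are reclaimed only after this timeout; its
  // documented default is infinity, which matches the hanging executors seen above.
  .config("spark.dynamicAllocation.cachedExecutorIdleTimeout", "300s")
  .getOrCreate()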

Unfortunately, in the future we will have containers that keep their cache and might
live for a long time, as well as containers that free their cache (unpersist) and are
expected to be killed (along with idle executors).

Is this a bug, or is some configuration missing?

Best regards,
Sergejs Andrejevs


DAGScheduler in SparkStreaming

2018-09-14 Thread Guillermo Ortiz
A question: if you use Spark Streaming, is the DAG calculated for each
microbatch? Is it possible to calculate it only the first time?


Is there any open source framework that converts Cypher to SparkSQL?

2018-09-14 Thread kant kodali
Hi All,

Is there any open source framework that converts Cypher to SparkSQL?

Thanks!


Re: Local vs Cluster

2018-09-14 Thread Apostolos N. Papadopoulos

Hi Aakash,

in the cluster you need to consider the total number of executors you
are using. Please take a look at the following link for an introduction:

https://spoddutur.github.io/spark-notes/distribution_of_executors_cores_and_memory_for_spark_application.html


regards,

Apostolos




On 14/09/2018 11:21 AM, Aakash Basu wrote:

Hi,

What is the Spark cluster equivalent of standalone's local[N]? I mean, which 
parameter in cluster mode takes the value N that we set for local[N]?


Thanks,
Aakash.


--
Apostolos N. Papadopoulos, Associate Professor
Department of Informatics
Aristotle University of Thessaloniki
Thessaloniki, GREECE
tel: ++0030312310991918
email: papad...@csd.auth.gr
twitter: @papadopoulos_ap
web: http://datalab.csd.auth.gr/~apostol





Re: Local vs Cluster

2018-09-14 Thread Mich Talebzadeh
Local means only one JVM; it runs on the host where you submitted the job:

${SPARK_HOME}/bin/spark-submit \
  --master local[N] \

Standalone means using Spark's own scheduler:

${SPARK_HOME}/bin/spark-submit \
  --master spark://IP_ADDRESS:7077 \

where IP_ADDRESS is the host on which your Spark master started (7077 is the
default standalone port).

In standalone mode you can have a master and multiple workers running on the
master host plus others in the cluster. You start the master from
$SPARK_HOME/sbin:

start-master.sh

and the workers from:

start-slaves.sh


For example, in $SPARK_HOME/conf you have a slaves file containing the
following:

# A Spark Worker will be started on each of the machines listed below.
rhes75
rhes75
rhes75
rhes75
rhes564

where start-slaves.sh will pick up the list of workers to start.
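As for the local[N] parallelism itself, the closest knobs on a standalone cluster are
the cores per executor and the total-cores cap. A minimal sketch (the numbers are
illustrative, IP_ADDRESS as above):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("spark://IP_ADDRESS:7077")      // standalone master, as above
  .config("spark.executor.cores", "4")    // cores per executor
  .config("spark.cores.max", "16")        // total cores the application may use across the cluster
  .config("spark.executor.memory", "4g")
  .getOrCreate()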

HTH

Dr Mich Talebzadeh



LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com


Disclaimer: Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Fri, 14 Sep 2018 at 09:21, Aakash Basu 
wrote:

> Hi,
>
> What is the Spark cluster equivalent of standalone's local[N]? I mean, which
> parameter in cluster mode takes the value N that we set for local[N]?
>
> Thanks,
> Aakash.
>


Local vs Cluster

2018-09-14 Thread Aakash Basu
Hi,

What is the Spark cluster equivalent of standalone's local[N]? I mean, which
parameter in cluster mode takes the value N that we set for local[N]?

Thanks,
Aakash.