Re: Reading parquet files in parallel on the cluster

2021-05-25 Thread Silvio Fiorito
Why not just read from Spark as normal? Do these files have different or 
incompatible schemas?

val df = spark.read.option("mergeSchema", "true").load(listOfPaths)
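A minimal Scala sketch of that, assuming `listOfPaths` is a `Seq[String]` holding the 1000+ directories; a single read call lets Spark parallelize the scan across the cluster, so there is no need to call spark.read inside an executor-side closure:

```scala
// Read every directory in one job; mergeSchema is only needed if the schemas differ.
val df = spark.read
  .option("mergeSchema", "true")
  .parquet(listOfPaths: _*)   // expand the Seq into varargs
```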

From: Eric Beabes 
Date: Tuesday, May 25, 2021 at 1:24 PM
To: spark-user 
Subject: Reading parquet files in parallel on the cluster

I've a use case in which I need to read Parquet files in parallel from over 
1000+ directories. I am doing something like this:


    val df = list.toList.toDF()

    df.foreach(c => {
      val config = getConfigs()
      doSomething(spark, config)
    })



In the doSomething method, when I try to do this:

val df1 = spark.read.parquet(pathToRead).collect()



I get the NullPointerException shown below. It seems 'spark.read' only works on the driver, not on the executors in the cluster. How can I do what I want to do? Please let me know. Thank you.



21/05/25 17:03:50 WARN TaskSetManager: Lost task 2.0 in stage 8.0 (TID 9,
ip-10-0-5-3.us-west-2.compute.internal, executor 11): java.lang.NullPointerException
        at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:144)
        at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:142)
        at org.apache.spark.sql.DataFrameReader.<init>(DataFrameReader.scala:789)
        at org.apache.spark.sql.SparkSession.read(SparkSession.scala:656)




Re: Poor performance caused by coalesce to 1

2021-02-03 Thread Silvio Fiorito
As I suggested, you need to use repartition(1) in place of coalesce(1)

That will give you a single file output without losing parallelization for the 
rest of the job.
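A minimal sketch of the two write patterns, assuming a DataFrame `df` and an illustrative output path:

```scala
// coalesce(1) is folded into the last stage, so the upstream work in that stage
// also runs as a single task:
df.coalesce(1).write.parquet("/tmp/out-coalesce")

// repartition(1) adds a shuffle (a stage boundary), so the heavy processing keeps
// its parallelism and only the final write runs as one task:
df.repartition(1).write.parquet("/tmp/out-repartition")
```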

From: James Yu 
Date: Wednesday, February 3, 2021 at 2:19 PM
To: Silvio Fiorito , user 
Subject: Re: Poor performance caused by coalesce to 1

Hi Silvio,

The result file is less than 50 MB in size so I think it is small and 
acceptable enough for one task to write.

Your suggestion sounds interesting. Could you guide us further on how to easily 
"add a stage boundary"?

Thanks
____
From: Silvio Fiorito 
Sent: Wednesday, February 3, 2021 11:05 AM
To: James Yu ; user 
Subject: Re: Poor performance caused by coalesce to 1


Coalesce is reducing the parallelization of your last stage, in your case to 1 
task. So, it’s natural it will give poor performance especially with large 
data. If you absolutely need a single file output, you can instead add a stage 
boundary and use repartition(1). This will give your query full parallelization 
during processing while at the end giving you a single task that writes data 
out. Note that if the file is large (e.g. 1GB or more) you’ll probably still 
notice slowness while writing. You may want to reconsider the 1-file 
requirement for larger datasets.



From: James Yu 
Date: Wednesday, February 3, 2021 at 1:54 PM
To: user 
Subject: Poor performance caused by coalesce to 1



Hi Team,



We are running into this poor performance issue and seeking your suggestion on 
how to improve it:



We have a particular dataset which we aggregate from other datasets and would like to 
write out to one single file (because it is small enough).  We found that after 
a series of transformations (GROUP BYs, FLATMAPs), we coalesced the final RDD 
to 1 partition before writing it out, and this coalesce degrades the 
performance: not because the coalesce operation itself takes additional 
runtime, but because it somehow dictates the number of partitions used in the 
upstream transformations.



We hope there is a simple and useful way to solve this kind of issue which we 
believe is quite common for many people.





Thanks



James


Re: Poor performance caused by coalesce to 1

2021-02-03 Thread Silvio Fiorito
Coalesce is reducing the parallelization of your last stage, in your case to 1 
task. So, it’s natural it will give poor performance especially with large 
data. If you absolutely need a single file output, you can instead add a stage 
boundary and use repartition(1). This will give your query full parallelization 
during processing while at the end giving you a single task that writes data 
out. Note that if the file is large (e.g. 1GB or more) you’ll probably still 
notice slowness while writing. You may want to reconsider the 1-file 
requirement for larger datasets.

From: James Yu 
Date: Wednesday, February 3, 2021 at 1:54 PM
To: user 
Subject: Poor performance caused by coalesce to 1

Hi Team,

We are running into this poor performance issue and seeking your suggestion on 
how to improve it:

We have a particular dataset which we aggregate from other datasets and would like to 
write out to one single file (because it is small enough).  We found that after 
a series of transformations (GROUP BYs, FLATMAPs), we coalesced the final RDD 
to 1 partition before writing it out, and this coalesce degrades the 
performance: not because the coalesce operation itself takes additional 
runtime, but because it somehow dictates the number of partitions used in the 
upstream transformations.

We hope there is a simple and useful way to solve this kind of issue which we 
believe is quite common for many people.


Thanks

James


Re: pandas_udf is very slow

2020-04-05 Thread Silvio Fiorito
Your 2 examples are doing different things.

The Pandas UDF is doing a grouped map, whereas your Python UDF is doing an 
aggregate.

I think you want your Pandas UDF to be PandasUDFType.GROUPED_AGG? Is your 
result the same?

From: Lian Jiang 
Date: Sunday, April 5, 2020 at 3:28 AM
To: user 
Subject: pandas_udf is very slow

Hi,

I am using pandas udf in pyspark 2.4.3 on EMR 5.21.0. pandas udf is favored 
over non pandas udf per 
https://www.twosigma.com/wp-content/uploads/Jin_-_Improving_Python__Spark_Performance_-_Spark_Summit_West.pdf.

My data has about 250M records and the pandas udf code is like:

def pd_udf_func(data):
    return pd.DataFrame(["id"])

pd_udf = pandas_udf(
    pd_udf_func,
    returnType=("id int"),
    functionType=PandasUDFType.GROUPED_MAP
)
df3 = df.groupBy("id").apply(pd_udf)
df3.explain()
"""
== Physical Plan ==
FlatMapGroupsInPandas [id#9L], pd_udf_func(id#9L, txt#10), [id#30]
+- *(2) Sort [id#9L ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(id#9L, 200)
      +- *(1) Project [id#9L, id#9L, txt#10]
         +- Scan ExistingRDD[id#9L,txt#10]
"""

As you can see, this pandas udf does nothing but return a pandas dataframe with 
None values. In reality, the pandas udf has complicated logic (e.g. it iterates 
through the pandas dataframe rows and does some calculation). This simplification 
is to reduce noise caused by application-specific logic. This pandas udf takes 
hours to run using 10 executors (14 cores and 64G mem each). On the other hand, 
the non-pandas udf below can finish in minutes:

def udf_func(data_list):
    return "hello"

udf = udf(udf_func, StringType())
df2 = df.groupBy("id").agg(F.collect_list('txt').alias('txt1')).withColumn('udfadd', udf('txt1'))
df2.explain()
"""
== Physical Plan ==
*(1) Project [id#9L, txt1#16, pythonUDF0#24 AS udfadd#20]
+- BatchEvalPython [udf_func(txt1#16)], [id#9L, txt1#16, pythonUDF0#24]
   +- ObjectHashAggregate(keys=[id#9L], functions=[collect_list(txt#10, 0, 0)])
      +- Exchange hashpartitioning(id#9L, 200)
         +- ObjectHashAggregate(keys=[id#9L], functions=[partial_collect_list(txt#10, 0, 0)])
            +- Scan ExistingRDD[id#9L,txt#10]
"""

The physical plans show pandas udf uses sortAggregate (slower) while non-pandas 
udf uses objectHashAggregate (faster).

Below is what I have tried to improve the performance of pandas udf but none of 
them worked:
1. repartition before groupby. For example, df.repartition(140, 
"id").groupBy("id").apply(pd_udf). 140 is the same as 
spark.sql.shuffle.partitions. I hope groupby can benefit from the repartition 
but according to the execution plan the repartition seems to be ignored since 
groupby will do partitioning itself.


2. although this slowness is more likely caused by pandas udf instead of 
groupby, I still played with shuffle settings such as 
spark.shuffle.compress=True, spark.shuffle.spill.compress=True.


3. I played with serDe settings such as 
spark.serializer=org.apache.spark.serializer.KryoSerializer. Also I tried 
pyarrow settings such as spark.sql.execution.arrow.enabled=True and 
spark.sql.execution.arrow.maxRecordsPerBatch=10


4. I tried to replace the solution of "groupby + pandas udf " with 
combineByKey, reduceByKey, repartition + mapPartition. But it is not easy since 
the pandas udf has complicated logic.


My questions:

1. Why is the pandas udf so slow?
2. Is there a way to improve the performance of pandas_udf?
3. In case it is a known issue of pandas udf, what other remedy can I use? I 
guess I need to think harder about combineByKey, reduceByKey, repartition + 
mapPartition. But I want to know if I missed anything obvious.

Any clue is highly appreciated.

Thanks
Leon






Re: Parquet 'bucketBy' creates a ton of files

2019-07-10 Thread Silvio Fiorito
It really depends on the use case. Bucketing is storing the data already 
hash-partitioned. So, if you frequently perform aggregations or joins on the 
bucketing column(s) then it can save you a shuffle. You need to keep in mind 
that for joins to completely avoid a shuffle both tables would need to have the 
same bucketing.

Sorting the data may help with filtering assuming you’re using a file format 
like Parquet (e.g. if you frequently filter by account id). If you look at 
slide 11 in this talk I gave at Summit you can see a simple example: 
https://www.slideshare.net/databricks/lessons-from-the-field-episode-ii-applying-best-practices-to-your-apache-spark-applications-with-silvio-fiorito



From: Gourav Sengupta 
Date: Wednesday, July 10, 2019 at 3:14 AM
To: Silvio Fiorito 
Cc: Arwin Tio , "user@spark.apache.org" 

Subject: Re: Parquet 'bucketBy' creates a ton of files

yeah makes sense, also is there any massive performance improvement using 
bucketBy in comparison to sorting?

Regards,
Gourav

On Thu, Jul 4, 2019 at 1:34 PM Silvio Fiorito 
mailto:silvio.fior...@granturing.com>> wrote:
You need to first repartition (at a minimum by bucketColumn1) since each task 
will write out the buckets/files. If the bucket keys are distributed randomly 
across the RDD partitions, then you will get multiple files per bucket.
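A rough Scala sketch of that pattern, using the column names from the snippet below, so each task only holds the rows for the buckets it writes:

```scala
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.col

dataframe
  .repartition(500, col("bucketColumn1"))   // align RDD partitions with the bucket keys
  .write
  .format("parquet")
  .bucketBy(500, "bucketColumn1", "bucketColumn2")
  .mode(SaveMode.Overwrite)
  .option("path", "s3://my-bucket")
  .saveAsTable("my_table")
```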

From: Arwin Tio mailto:arwin@hotmail.com>>
Date: Thursday, July 4, 2019 at 3:22 AM
To: "user@spark.apache.org<mailto:user@spark.apache.org>" 
mailto:user@spark.apache.org>>
Subject: Parquet 'bucketBy' creates a ton of files

I am trying to use Spark's **bucketBy** feature on a pretty large dataset.

```java
dataframe.write()
    .format("parquet")
    .bucketBy(500, bucketColumn1, bucketColumn2)
    .mode(SaveMode.Overwrite)
    .option("path", "s3://my-bucket")
    .saveAsTable("my_table");
```

The problem is that my Spark cluster has about 500 partitions/tasks/executors 
(not sure the terminology), so I end up with files that look like:

```
part-00001-{UUID}_00001.c000.snappy.parquet
part-00001-{UUID}_00002.c000.snappy.parquet
...
part-00001-{UUID}_00500.c000.snappy.parquet

part-00002-{UUID}_00001.c000.snappy.parquet
part-00002-{UUID}_00002.c000.snappy.parquet
...
part-00002-{UUID}_00500.c000.snappy.parquet

part-00500-{UUID}_00001.c000.snappy.parquet
part-00500-{UUID}_00002.c000.snappy.parquet
...
part-00500-{UUID}_00500.c000.snappy.parquet
```

That's 500x500 = 250,000 bucketed parquet files! It takes forever for the 
`FileOutputCommitter` to commit that to S3.

Is there a way to generate **one file per bucket**, like in Hive? Or is there a 
better way to deal with this problem? As of now it seems like I have to choose 
between lowering the parallelism of my cluster (reduce number of writers) or 
reducing the parallelism of my parquet files (reduce number of buckets), which 
will lower the parallelism of my downstream jobs.

Thanks


Re: Parquet 'bucketBy' creates a ton of files

2019-07-04 Thread Silvio Fiorito
You need to first repartition (at a minimum by bucketColumn1) since each task 
will write out the buckets/files. If the bucket keys are distributed randomly 
across the RDD partitions, then you will get multiple files per bucket.

From: Arwin Tio 
Date: Thursday, July 4, 2019 at 3:22 AM
To: "user@spark.apache.org" 
Subject: Parquet 'bucketBy' creates a ton of files

I am trying to use Spark's **bucketBy** feature on a pretty large dataset.

```java
dataframe.write()
    .format("parquet")
    .bucketBy(500, bucketColumn1, bucketColumn2)
    .mode(SaveMode.Overwrite)
    .option("path", "s3://my-bucket")
    .saveAsTable("my_table");
```

The problem is that my Spark cluster has about 500 partitions/tasks/executors 
(not sure the terminology), so I end up with files that look like:

```
part-00001-{UUID}_00001.c000.snappy.parquet
part-00001-{UUID}_00002.c000.snappy.parquet
...
part-00001-{UUID}_00500.c000.snappy.parquet

part-00002-{UUID}_00001.c000.snappy.parquet
part-00002-{UUID}_00002.c000.snappy.parquet
...
part-00002-{UUID}_00500.c000.snappy.parquet

part-00500-{UUID}_00001.c000.snappy.parquet
part-00500-{UUID}_00002.c000.snappy.parquet
...
part-00500-{UUID}_00500.c000.snappy.parquet
```

That's 500x500 = 250,000 bucketed parquet files! It takes forever for the 
`FileOutputCommitter` to commit that to S3.

Is there a way to generate **one file per bucket**, like in Hive? Or is there a 
better way to deal with this problem? As of now it seems like I have to choose 
between lowering the parallelism of my cluster (reduce number of writers) or 
reducing the parallelism of my parquet files (reduce number of buckets), which 
will lower the parallelism of my downstream jobs.

Thanks


Re: [pyspark 2.3+] Bucketing with sort - incremental data load?

2019-05-31 Thread Silvio Fiorito
Spark does allow appending new files to bucketed tables. When the data is read 
in, Spark will combine the multiple files belonging to the same buckets into 
the same partitions.

Having said that, you need to be very careful with bucketing especially as 
you’re appending to avoid generating lots of small files. So, you may need to 
consider periodically running a compaction job.

If you’re simply appending daily snapshots, then you could just consider using 
date partitions, instead?
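A minimal sketch of the date-partition alternative, assuming a DataFrame `daily` with an `event_date` column (names here are hypothetical):

```scala
daily.write
  .mode("append")               // each daily load appends a new partition directory
  .partitionBy("event_date")
  .parquet("/data/events")
```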

From: Rishi Shah 
Date: Thursday, May 30, 2019 at 10:43 PM
To: "user @spark" 
Subject: [pyspark 2.3+] Bucketing with sort - incremental data load?

Hi All,

Can we use bucketing with sorting functionality to save data incrementally (say 
daily) ? I understand bucketing is supported in Spark only with saveAsTable, 
however can this be used with mode "append" instead of "overwrite"?

My understanding of bucketing was that you need to rewrite the entire table every 
time; can someone please advise?

--
Regards,

Rishi Shah


Re: [Spark SQL] Does Spark group small files

2018-11-13 Thread Silvio Fiorito
Yes, it does bin-packing for small files which is a good thing so you avoid 
having many small partitions especially if you’re writing this data back out 
(e.g. it’s compacting as you read). The default partition size is 128MB with a 
4MB “cost” for opening files. You can configure this using the settings defined 
here: 
http://spark.apache.org/docs/latest/sql-performance-tuning.html#other-configuration-options
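A minimal sketch of the two settings mentioned above (the values shown are the defaults):

```scala
// Target size of each read partition, and the "cost" charged for opening a file
// when bin-packing small files together.
spark.conf.set("spark.sql.files.maxPartitionBytes", 128L * 1024 * 1024)  // 128 MB
spark.conf.set("spark.sql.files.openCostInBytes", 4L * 1024 * 1024)      // 4 MB
```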

From: Yann Moisan 
Date: Tuesday, November 13, 2018 at 3:28 PM
To: "user@spark.apache.org" 
Subject: [Spark SQL] Does Spark group small files

Hello,

I'm using Spark 2.3.1.

I have a job that reads 5,000 small parquet files from S3.

When I do a mapPartitions followed by a collect, only 278 tasks are used (I 
would have expected 5,000). Does Spark group small files? If yes, what is the 
threshold for grouping? Is it configurable? Any link to the corresponding source 
code?

Rgds,

Yann.


Re: performance of IN clause

2018-10-17 Thread Silvio Fiorito
Have you run explain for each query? If you look at the physical query plan 
it’s most likely the same. If the inner-query/join-table is small enough it 
should end up as a broadcast join.
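A minimal sketch of that check, assuming tables A and B are registered; compare the two physical plans:

```scala
spark.sql("SELECT * FROM A WHERE join_key IN (SELECT join_key FROM B)").explain()
spark.sql("SELECT * FROM A INNER JOIN B ON A.join_key = B.join_key").explain()
// If B fits under spark.sql.autoBroadcastJoinThreshold, both should show a broadcast join.
```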

From: Jayesh Lalwani 
Date: Wednesday, October 17, 2018 at 5:03 PM
To: "user@spark.apache.org" 
Subject: performance of IN clause

Is there  a significant differrence in how a IN clause performs when compared 
to a JOIN?

Let's say I have 2 tables, A and B. B has 50 million rows and A has 1 million.

Will this query:
Select * from A where join_key in (Select join_key from B)
perform much worse than:
Select * from A INNER JOIN B on A.join_key = B.join_key

Will the first query always trigger a broadcast of B?



The information contained in this e-mail is confidential and/or proprietary to 
Capital One and/or its affiliates and may only be used solely in performance of 
work or services for Capital One. The information transmitted herewith is 
intended only for use by the individual or entity to which it is addressed. If 
the reader of this message is not the intended recipient, you are hereby 
notified that any review, retransmission, dissemination, distribution, copying 
or other use of, or taking of any action in reliance upon this information is 
strictly prohibited. If you have received this communication in error, please 
contact the sender and delete the material from your computer.


Re: [Structured Streaming] Avoiding multiple streaming queries

2018-07-24 Thread Silvio Fiorito
Any way you go you’ll increase something... ☺

Even with foreachBatch you would have to cache the DataFrame before submitting 
each batch to each topic to avoid recomputing it (see 
https://docs.databricks.com/spark/latest/structured-streaming/foreach.html#reuse-existing-batch-data-sources-with-foreachbatch)

Nothing’s free! 

Since you’re just pushing all messages to kafka, might be easier on you to just 
explode the rows and let Spark do the rest for you.
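A rough sketch of that explode approach, assuming a streaming DataFrame `events` with key/value columns and a hypothetical list of target topics; the built-in Kafka sink then routes each row by its topic column:

```scala
import org.apache.spark.sql.functions.{array, explode, lit}

val topics = Seq("topic_a", "topic_b")  // hypothetical topic names

val fanout = events
  .withColumn("topic", explode(array(topics.map(lit): _*)))  // one row per (event, topic)
  .selectExpr("topic", "CAST(key AS STRING) AS key", "CAST(value AS STRING) AS value")

fanout.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("checkpointLocation", "/tmp/checkpoints/fanout")
  .start()
```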

From: kant kodali 
Date: Tuesday, July 24, 2018 at 1:04 PM
To: Silvio Fiorito 
Cc: Arun Mahadevan , chandan prakash 
, Tathagata Das , 
"ymaha...@snappydata.io" , "priy...@asperasoft.com" 
, "user @spark" 
Subject: Re: [Structured Streaming] Avoiding multiple streaming queries

@Silvio I thought about duplicating rows but dropped the idea because of the increased 
memory. foreachBatch sounds interesting!

On Mon, Jul 23, 2018 at 6:51 AM, Silvio Fiorito 
mailto:silvio.fior...@granturing.com>> wrote:
Using the current Kafka sink that supports routing based on topic column, you 
could just duplicate the rows (e.g. explode rows with different topic, key 
values). That way you’re only reading and processing the source once and not 
having to resort to custom sinks, foreachWriter, or multiple queries.

In Spark 2.4 there will be a foreachBatch method that will give you a DataFrame 
and let you write to the sink as you wish.

From: kant kodali mailto:kanth...@gmail.com>>
Date: Monday, July 23, 2018 at 4:43 AM
To: Arun Mahadevan mailto:ar...@apache.org>>
Cc: chandan prakash 
mailto:chandanbaran...@gmail.com>>, Tathagata Das 
mailto:tathagata.das1...@gmail.com>>, 
"ymaha...@snappydata.io<mailto:ymaha...@snappydata.io>" 
mailto:ymaha...@snappydata.io>>, 
"priy...@asperasoft.com<mailto:priy...@asperasoft.com>" 
mailto:priy...@asperasoft.com>>, "user @spark" 
mailto:user@spark.apache.org>>

Subject: Re: [Structured Streaming] Avoiding multiple streaming queries

understand each row has a topic column but can we write one row to multiple 
topics?

On Thu, Jul 12, 2018 at 11:00 AM, Arun Mahadevan 
mailto:ar...@apache.org>> wrote:
What I meant was the number of partitions cannot be varied with ForeachWriter 
v/s if you were to write to each sink using independent queries. Maybe this is 
obvious.

I am not sure about the difference you highlight about the performance part. 
The commit happens once per micro-batch and "close(null)" is invoked. You can 
batch your writes in the process and/or in the close. I guess the writes can 
still be atomic, decided by whether “close” returns successfully or throws an 
exception.

Thanks,
Arun

From: chandan prakash 
mailto:chandanbaran...@gmail.com>>
Date: Thursday, July 12, 2018 at 10:37 AM
To: Arun Iyer mailto:ar...@apache.org>>
Cc: Tathagata Das 
mailto:tathagata.das1...@gmail.com>>, 
"ymaha...@snappydata.io<mailto:ymaha...@snappydata.io>" 
mailto:ymaha...@snappydata.io>>, 
"priy...@asperasoft.com<mailto:priy...@asperasoft.com>" 
mailto:priy...@asperasoft.com>>, "user @spark" 
mailto:user@spark.apache.org>>

Subject: Re: [Structured Streaming] Avoiding multiple streaming queries

Thanks a lot Arun for your response.
I got your point that existing sink plugins like kafka, etc can not be used.
However I could not get the part : " you cannot scale the partitions for the 
sinks independently "
Can you please rephrase the above part ?

Also,
I guess :
using foreachwriter for multiple sinks will affect the performance because 
write will happen to a sink per record basis (after deciding a record belongs 
to which particular sink), where as in the current implementation all data 
under a RDD partition gets committed to the sink atomically in one go. Please 
correct me if I am wrong here.



Regards,
Chandan

On Thu, Jul 12, 2018 at 10:53 PM Arun Mahadevan 
mailto:ar...@apache.org>> wrote:
Yes ForeachWriter [1] could be an option If you want to write to different 
sinks. You can put your custom logic to split the data into different sinks.

The drawback here is that you cannot plugin existing sinks like Kafka and you 
need to write the custom logic yourself and you cannot scale the partitions for 
the sinks independently.

[1] 
https://spark.apache.org/docs/2.1.2/api/java/org/apache/spark/sql/ForeachWriter.html

From: chandan prakash 
mailto:chandanbaran...@gmail.com>>
Date: Thursday, July 12, 2018 at 2:38 AM
To: Tathagata Das 
mailto:tathagata.das1...@gmail.com>>, 
"ymaha...@snappydata.io<mailto:ymaha...@snappydata.io>" 
mailto:ymaha...@snappydata.io>>, 
"priy...@asperasoft.com<mailto:priy...@asperasoft.com>" 
mailto:priy...@asperasoft.com>>, "user @spark" 
mailto:user@spark.apache.org>>
Subject: Re: [Structured Streaming] Avoiding multiple streaming queries

Re: [Structured Streaming] Avoiding multiple streaming queries

2018-07-23 Thread Silvio Fiorito
Using the current Kafka sink that supports routing based on topic column, you 
could just duplicate the rows (e.g. explode rows with different topic, key 
values). That way you’re only reading and processing the source once and not 
having to resort to custom sinks, foreachWriter, or multiple queries.

In Spark 2.4 there will be a foreachBatch method that will give you a DataFrame 
and let you write to the sink as you wish.

From: kant kodali 
Date: Monday, July 23, 2018 at 4:43 AM
To: Arun Mahadevan 
Cc: chandan prakash , Tathagata Das 
, "ymaha...@snappydata.io" 
, "priy...@asperasoft.com" , 
"user @spark" 
Subject: Re: [Structured Streaming] Avoiding multiple streaming queries

understand each row has a topic column but can we write one row to multiple 
topics?

On Thu, Jul 12, 2018 at 11:00 AM, Arun Mahadevan 
mailto:ar...@apache.org>> wrote:
What I meant was the number of partitions cannot be varied with ForeachWriter 
v/s if you were to write to each sink using independent queries. Maybe this is 
obvious.

I am not sure about the difference you highlight about the performance part. 
The commit happens once per micro-batch and "close(null)" is invoked. You can 
batch your writes in the process and/or in the close. I guess the writes can 
still be atomic, decided by whether “close” returns successfully or throws an 
exception.

Thanks,
Arun

From: chandan prakash 
mailto:chandanbaran...@gmail.com>>
Date: Thursday, July 12, 2018 at 10:37 AM
To: Arun Iyer mailto:ar...@apache.org>>
Cc: Tathagata Das 
mailto:tathagata.das1...@gmail.com>>, 
"ymaha...@snappydata.io" 
mailto:ymaha...@snappydata.io>>, 
"priy...@asperasoft.com" 
mailto:priy...@asperasoft.com>>, "user @spark" 
mailto:user@spark.apache.org>>

Subject: Re: [Structured Streaming] Avoiding multiple streaming queries

Thanks a lot Arun for your response.
I got your point that existing sink plugins like kafka, etc can not be used.
However I could not get the part : " you cannot scale the partitions for the 
sinks independently "
Can you please rephrase the above part ?

Also,
I guess :
using foreachwriter for multiple sinks will affect the performance because 
write will happen to a sink per record basis (after deciding a record belongs 
to which particular sink), where as in the current implementation all data 
under a RDD partition gets committed to the sink atomically in one go. Please 
correct me if I am wrong here.



Regards,
Chandan

On Thu, Jul 12, 2018 at 10:53 PM Arun Mahadevan 
mailto:ar...@apache.org>> wrote:
Yes ForeachWriter [1] could be an option If you want to write to different 
sinks. You can put your custom logic to split the data into different sinks.

The drawback here is that you cannot plugin existing sinks like Kafka and you 
need to write the custom logic yourself and you cannot scale the partitions for 
the sinks independently.

[1] 
https://spark.apache.org/docs/2.1.2/api/java/org/apache/spark/sql/ForeachWriter.html

From: chandan prakash 
mailto:chandanbaran...@gmail.com>>
Date: Thursday, July 12, 2018 at 2:38 AM
To: Tathagata Das 
mailto:tathagata.das1...@gmail.com>>, 
"ymaha...@snappydata.io" 
mailto:ymaha...@snappydata.io>>, 
"priy...@asperasoft.com" 
mailto:priy...@asperasoft.com>>, "user @spark" 
mailto:user@spark.apache.org>>
Subject: Re: [Structured Streaming] Avoiding multiple streaming queries

Hi,
Did any of you think about writing a custom foreach sink writer which can 
decide which record should go to which sink (based on some marker in the record, 
which we can possibly annotate during transformation) and then write to the 
specific sink accordingly.
This will mean that:
1. every custom sink writer will have connections to as many sinks as many 
there are types of sink where records can go.
2.  every record will be read once in the single query but can be written to 
multiple sinks

Do you guys see any drawback in this approach ?
One drawback of course is that the sink is supposed to write the records as 
they are, but we are inducing some intelligence here in the sink.
Apart from that, do you see any other issues with this approach?

Regards,
Chandan


On Thu, Feb 15, 2018 at 7:41 AM Tathagata Das 
mailto:tathagata.das1...@gmail.com>> wrote:
Of course, you can write to multiple Kafka topics from a single query. If your 
dataframe that you want to write has a column named "topic" (along with "key", 
and "value" columns), it will write the contents of a row to the topic in that 
row. This automatically works. So the only thing you need to figure out is how 
to generate the value of that column.

This is documented - 
https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#writing-data-to-kafka

Or am i misunderstanding the problem?

TD




On Tue, Feb 13, 2018 at 10:45 AM, Yogesh Mahajan 
mailto:ymaha...@snappydata.io>> wrote:
I had a similar issue and i think 

Re: Bulk / Fast Read and Write with MSSQL Server and Spark

2018-05-23 Thread Silvio Fiorito
Try this 
https://docs.microsoft.com/en-us/azure/sql-database/sql-database-spark-connector


From: Chetan Khatri 
Date: Wednesday, May 23, 2018 at 7:47 AM
To: user 
Subject: Bulk / Fast Read and Write with MSSQL Server and Spark

All,

I am looking for approach to do bulk read / write with MSSQL Server and Apache 
Spark 2.2 , please let me know if any library / driver for the same.

Thank you.
Chetan


Re: Custom metrics sink

2018-03-16 Thread Silvio Fiorito
Just set your custom sink in the org.apache.spark.metrics.sink namespace and 
configure metrics.properties. Use ConsoleSink as an example. Obviously since 
it’s private the API may change, but in the meantime that should work…

https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/metrics/sink/ConsoleSink.scala#L28
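A rough sketch only, since the Sink trait is private and may change between versions; it assumes the same constructor shape as ConsoleSink (Properties, MetricRegistry, SecurityManager) and would be registered in metrics.properties with something like `*.sink.chime.class=org.apache.spark.metrics.sink.ChimeSink`:

```scala
package org.apache.spark.metrics.sink

import java.util.Properties
import com.codahale.metrics.MetricRegistry
import org.apache.spark.SecurityManager

class ChimeSink(
    val property: Properties,
    val registry: MetricRegistry,
    securityMgr: SecurityManager) extends Sink {

  override def start(): Unit = { /* schedule periodic reporting, e.g. a timer that calls report() */ }
  override def stop(): Unit = { /* tear down the timer / sockets */ }
  override def report(): Unit = {
    // Read the gauges/counters you care about from `registry` and emit them,
    // e.g. send a UDP packet per active worker.
  }
}
```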


From: Christopher Piggott 
Date: Friday, March 16, 2018 at 4:09 PM
To: "user@spark.apache.org" 
Subject: Custom metrics sink

Just for fun, i want to make a stupid program that makes different frequency 
chimes as each worker becomes active.  That way you can 'hear' what the cluster 
is doing and how it's distributing work.

I thought to do this I would make a custom Sink, but the Sink and everything 
else in org.apache.spark.metrics.sink is private to spark.  What I was hoping 
to do was to just pick up the # of active workers in semi real time (once a 
second?) and have them send a UDP message somewhere... then each worker would 
be assigned to a different frequency chime.  It's just a toy, for fun.

How do you add a custom Sink when these classes don't seem to be exposed?

--C



Re: Writing a DataFrame is taking too long and huge space

2018-03-09 Thread Silvio Fiorito
Given that you start with ~250MB but end up with 58GB, it seems like you’re generating 
quite a bit of data.

Whether you use coalesce or repartition, still writing out 58GB with one core 
is going to take a while.

Using Spark to do pre-processing but output a single file is not going to be 
very efficient since you’re asking Spark to limit its parallelization even if 
just the final stage to write data out.

What are you using downstream to read this file and why does it need to be a 
single 58GB file? Could you simply keep it in Spark to keep the pipeline 
optimized and avoid the data persistence step? For example, if you’re using R 
or Python to do some downstream processing you could just make that part of 
your pipeline vs writing it out and then reading it back in from another system.


From: Vadim Semenov 
Date: Friday, March 9, 2018 at 9:42 AM
To: "Md. Rezaul Karim" 
Cc: spark users 
Subject: Re: Writing a DataFrame is taking too long and huge space

because `coalesce` gets propagated further up in the DAG in the last stage, so 
your last stage only has one task.

You need to break your DAG so your expensive operations would be in a previous 
stage before the stage with `.coalesce(1)`

On Fri, Mar 9, 2018 at 5:23 AM, Md. Rezaul Karim 
> wrote:
Dear All,
I have a tiny CSV file, which is around 250MB. There are only 30 columns in the 
DataFrame. Now I'm trying to save the pre-processed DataFrame as another CSV 
file on disk for later usage.
However, I'm getting pissed off as writing the resultant DataFrame is taking 
too long, about 4 to 5 hours. Nevertheless, the size of the file 
written to disk is about 58GB!

Here's the sample code that I tried:
# Using repartition()
myDF.repartition(1).write.format("com.databricks.spark.csv").save("data/file.csv")

# Using coalesce()
myDF.coalesce(1).write.format("com.databricks.spark.csv").save("data/file.csv")

Any better suggestion?





Md. Rezaul Karim, BSc, MSc
Research Scientist, Fraunhofer FIT, Germany

Ph.D. Researcher, Information Systems, RWTH Aachen University, Germany

eMail: rezaul.ka...@fit.fraunhofer.de
Tel: +49 241 80-21527



--
Sent from my iPhone


Re: How to properly execute `foreachPartition` in Spark 2.2

2017-12-18 Thread Silvio Fiorito
Couldn’t you readStream from Kafka, do your transformations, map your rows from 
the transformed input into what you need to send to Kafka, then 
writeStream to Kafka?
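A minimal sketch of that pipeline, assuming Spark 2.2's built-in Kafka source and sink; topic names and the transformation are hypothetical:

```scala
val input = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "test")
  .load()
  .selectExpr("CAST(key AS STRING) AS key", "CAST(value AS STRING) AS value")

// ...apply transformations (a filter() here covers the "conditionally send" part),
// then shape rows back into the key/value columns the Kafka sink expects...
val output = input.selectExpr("key", "upper(value) AS value")

output.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "output-topic")
  .option("checkpointLocation", "/tmp/checkpoints/kafka-out")
  .start()
```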


From: Liana Napalkova <liana.napalk...@eurecat.org>
Date: Monday, December 18, 2017 at 10:07 AM
To: Silvio Fiorito <silvio.fior...@granturing.com>, "user@spark.apache.org" 
<user@spark.apache.org>
Subject: Re: How to properly execute `foreachPartition` in Spark 2.2


I need to firstly read from Kafka queue into a DataFrame. Then I should perform 
some transformations with the data. Finally, for each row in the DataFrame I 
should conditionally apply KafkaProducer in order to send some data to Kafka.

So, I am both consuming and producing the data from/to Kafka.



____________
From: Silvio Fiorito <silvio.fior...@granturing.com>
Sent: 18 December 2017 16:00:39
To: Liana Napalkova; user@spark.apache.org
Subject: Re: How to properly execute `foreachPartition` in Spark 2.2


Why don’t you just use the Kafka sink for Spark 2.2?



https://spark.apache.org/docs/2.2.0/structured-streaming-kafka-integration.html#creating-a-kafka-sink-for-streaming-queries







From: Liana Napalkova <liana.napalk...@eurecat.org>
Date: Monday, December 18, 2017 at 9:45 AM
To: "user@spark.apache.org" <user@spark.apache.org>
Subject: How to properly execute `foreachPartition` in Spark 2.2



Hi,



I wonder how to properly execute `foreachPartition` in Spark 2.2. Below I 
explain the problem in detail. I appreciate any help.



In Spark 1.6 I was doing something similar to this:



DstreamFromKafka.foreachRDD(session => {
  session.foreachPartition { partitionOfRecords =>
    println("Setting the producer.")
    val producer = Utils.createProducer(mySet.value("metadataBrokerList"),
                                        mySet.value("batchSize"),
                                        mySet.value("lingerMS"))
    partitionOfRecords.foreach(s => {
      //...



However, I cannot find the proper way to do the similar thing in Spark 2.2. I 
tried to write my own class by extending `ForeachWriter`, but I get Task 
Serialization error when passing `KafkaProducer`.

class MyTestClass(
// val inputparams: String)
  extends Serializable
{

  val spark = SparkSession
.builder()
.appName("TEST")
//.config("spark.sql.warehouse.dir", kafkaData)
.enableHiveSupport()
.getOrCreate()

import spark.implicits._

val df: Dataset[String] = spark.readStream
 .format("kafka")
 .option("kafka.bootstrap.servers", "localhost:9092")
 .option("subscribe", "test")
 .option("startingOffsets", "latest")
 .option("failOnDataLoss", "true")
 .load()
 .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").as[(String, 
String)] // Kafka sends bytes
 .map(_._2)

val producer = // create KafkaProducer

val writer = new MyForeachWriter(producer: KafkaProducer[String,String])

val query = df
  .writeStream
  .foreach(writer)
  .start

query.awaitTermination()

spark.stop()


class MyForeachWriter extends ForeachWriter[String] with Serializable {

  var producer: KafkaProducer[String,String] = _

  def this(producer: KafkaProducer[String,String])
  {
this()
this.producer = producer
  }

  override def process(row: String): Unit =
  {
// ...
  }

  override def close(errorOrNull: Throwable): Unit = {}

  override def open(partitionId: Long, version: Long): Boolean = {
true
  }

}





Liana Napalkova, PhD

Big Data Analytics Unit













T  +34 93 238 14 00 (ext. 1248)
M +34 633 426 677

liana.napalk...@eurecat.org





Carrer Camí Antic de València 54-56, Edifici A - 08005 - Barcelona
www.eurecat.org




Ascamm, BDigital, Barcelona Media i Cetemmsa ara som Eurecat





DISCLAIMER: Aquest missatge pot contenir informació confidencial. Si vostè no 
n'és el destinatari, si us plau, esborri'l i faci'ns-ho saber immediatament a 
la següent adreça: le...@eurecat.org Si el destinatari d'aquest missatge no 
consent la utilització del correu electrònic via Internet i la gravació de 
missatges, li preguem que ens ho comuniqui immediatament.

DISCLAIMER: Este mensaje puede contener información confidencial. Si usted no 
es el destinatario del mensaje, por favor bórrelo y notifíquenoslo 
inmediatamente a la siguiente dirección: le...@eurecat.org Si el destinatario 
de este mensaje no consintiera la utilización del correo electrónico vía 
Internet y

Re: How to properly execute `foreachPartition` in Spark 2.2

2017-12-18 Thread Silvio Fiorito
Why don’t you just use the Kafka sink for Spark 2.2?

https://spark.apache.org/docs/2.2.0/structured-streaming-kafka-integration.html#creating-a-kafka-sink-for-streaming-queries



From: Liana Napalkova 
Date: Monday, December 18, 2017 at 9:45 AM
To: "user@spark.apache.org" 
Subject: How to properly execute `foreachPartition` in Spark 2.2


Hi,



I wonder how to properly execute `foreachPartition` in Spark 2.2. Below I 
explain the problem in detail. I appreciate any help.



In Spark 1.6 I was doing something similar to this:



DstreamFromKafka.foreachRDD(session => {
  session.foreachPartition { partitionOfRecords =>
    println("Setting the producer.")
    val producer = Utils.createProducer(mySet.value("metadataBrokerList"),
                                        mySet.value("batchSize"),
                                        mySet.value("lingerMS"))
    partitionOfRecords.foreach(s => {
      //...



However, I cannot find the proper way to do the similar thing in Spark 2.2. I 
tried to write my own class by extending `ForeachWriter`, but I get Task 
Serialization error when passing `KafkaProducer`.

class MyTestClass(
// val inputparams: String)
  extends Serializable
{

  val spark = SparkSession
.builder()
.appName("TEST")
//.config("spark.sql.warehouse.dir", kafkaData)
.enableHiveSupport()
.getOrCreate()

import spark.implicits._

val df: Dataset[String] = spark.readStream
 .format("kafka")
 .option("kafka.bootstrap.servers", "localhost:9092")
 .option("subscribe", "test")
 .option("startingOffsets", "latest")
 .option("failOnDataLoss", "true")
 .load()
 .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").as[(String, 
String)] // Kafka sends bytes
 .map(_._2)

val producer = // create KafkaProducer

val writer = new MyForeachWriter(producer: KafkaProducer[String,String])

val query = df
  .writeStream
  .foreach(writer)
  .start

query.awaitTermination()

spark.stop()


class MyForeachWriter extends ForeachWriter[String] with Serializable {

  var producer: KafkaProducer[String,String] = _

  def this(producer: KafkaProducer[String,String])
  {
this()
this.producer = producer
  }

  override def process(row: String): Unit =
  {
// ...
  }

  override def close(errorOrNull: Throwable): Unit = {}

  override def open(partitionId: Long, version: Long): Boolean = {
true
  }

}




Liana Napalkova, PhD

Big Data Analytics Unit








T  +34 93 238 14 00 (ext. 1248)
M +34 633 426 677

liana.napalk...@eurecat.org




Carrer Camí Antic de València 54-56, Edifici A - 08005 - Barcelona
www.eurecat.org



Ascamm, BDigital, Barcelona Media i Cetemmsa ara som Eurecat



DISCLAIMER: Aquest missatge pot contenir informació confidencial. Si vostè no 
n'és el destinatari, si us plau, esborri'l i faci'ns-ho saber immediatament a 
la següent adreça: le...@eurecat.org Si el destinatari d'aquest missatge no 
consent la utilització del correu electrònic via Internet i la gravació de 
missatges, li preguem que ens ho comuniqui immediatament.

DISCLAIMER: Este mensaje puede contener información confidencial. Si usted no 
es el destinatario del mensaje, por favor bórrelo y notifíquenoslo 
inmediatamente a la siguiente dirección: le...@eurecat.org Si el destinatario 
de este mensaje no consintiera la utilización del correo electrónico vía 
Internet y la grabación de los mensajes, rogamos lo ponga en nuestro 
conocimiento de forma inmediata.

DISCLAIMER: Privileged/Confidential Information may be contained in this 
message. If you are not the addressee indicated in this message you should 
destroy this message, and notify us immediately to the following address: 
le...@eurecat.org. If the addressee of this message does not consent to the use 
of Internet e-mail and message recording, please notify us immediately.





Re: How to...UNION ALL of two SELECTs over different data sources in parallel?

2017-12-16 Thread Silvio Fiorito
Hi Jacek,

Just replied to the SO thread as well, but…

Yes, your first statement is correct. The DFs in the union are read in the same 
stage, so in your example where each DF has 8 partitions then you have a stage 
with 16 tasks to read the 2 DFs. There's no need to define the DF in a separate 
thread. You can verify this also in the Stage UI and looking at the Event 
Timeline. You should see the tasks across the DFs executing in parallel as 
expected.

Here’s the UI for the following example, in which case each DF only has 1 
partition (so we get a stage with 2 tasks):

spark.range(1, 100, 1, 1).write.save("/tmp/df1")
spark.range(101, 200, 1, 1).write.save("/tmp/df2")

spark.read.load("/tmp/df1").union(spark.read.load("/tmp/df2")).foreach { _ => }

[inline image: Spark UI screenshot showing both reads running as tasks in a single stage]

From: Jacek Laskowski 
Date: Saturday, December 16, 2017 at 6:40 AM
To: "user @spark" 
Subject: How to...UNION ALL of two SELECTs over different data sources in 
parallel?

Hi,

I've been trying to find out the answer to the question about UNION ALL and 
SELECTs @ https://stackoverflow.com/q/47837955/1305344

> If I have Spark SQL statement of the form SELECT [...] UNION ALL SELECT 
> [...], will the two SELECT statements be executed in parallel? In my specific 
> use case the two SELECTs are querying two different database tables. In 
> contrast to what I would have expected, the Spark UI seems to suggest that 
> the two SELECT statements are performed sequentially.

How to know if the two separate SELECTs are executed in parallel or not? What 
are the tools to know it?

My answer was to use explain operator that would show...well...physical plan, 
but am not sure how to read it to know whether a query plan is going to be 
executed in parallel or not.

I then used the underlying RDD lineage (using rdd.toDebugString) hoping that 
gives me the answer, but...I'm not so sure.

For a query like the following:

val q = spark.range(1).union(spark.range(2))

I thought that since both SELECTs are codegen'ed they could be executed in 
parallel, but when switched to the RDD lineage I lost my confidence given 
there's just one single stage (!)

scala> q.rdd.toDebugString
res4: String =
(16) MapPartitionsRDD[17] at rdd at <console>:26 []
 |   MapPartitionsRDD[16] at rdd at <console>:26 []
 |   UnionRDD[15] at rdd at <console>:26 []
 |   MapPartitionsRDD[11] at rdd at <console>:26 []
 |   MapPartitionsRDD[10] at rdd at <console>:26 []
 |   ParallelCollectionRDD[9] at rdd at <console>:26 []
 |   MapPartitionsRDD[14] at rdd at <console>:26 []
 |   MapPartitionsRDD[13] at rdd at <console>:26 []
 |   ParallelCollectionRDD[12] at rdd at <console>:26 []

What am I missing and how to be certain whether and what parts of a query are 
going to be executed in parallel?

Please help...

Pozdrawiam,
Jacek Laskowski

https://about.me/JacekLaskowski
Spark Structured Streaming https://bit.ly/spark-structured-streaming
Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski


Re: Generating StructType from dataframe.printSchema

2017-10-16 Thread Silvio Fiorito
If you’re confident the schema of all files is consistent, then just infer the 
schema from a single file and reuse it when loading the whole data set:

val schema = spark.read.json(“/path/to/single/file.json”).schema

val wholeDataSet = spark.read.schema(schema).json(“/path/to/whole/datasets”)


Thanks,
Silvio

On 10/16/17, 10:20 AM, "Jeroen Miller"  wrote:

Hello Spark users,

Does anyone know if there is a way to generate the Scala code for a complex 
structure just from the output of dataframe.printSchema?

I have to analyse a significant volume of data and want to explicitly set 
the schema(s) to avoid having to read my (compressed) JSON files multiple 
times. What I am doing so far is to read a few files, print the schema, and 
manually write the code to define the corresponding StructType: tedious and 
error-prone.

I'm sure there is a much better way, but can't find anything about it.

Pointers anyone?

Jeroen


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org





Re: best spark spatial lib?

2017-10-10 Thread Silvio Fiorito
There’s a number of packages for geospatial analysis, depends on the features 
you need. Here are a few I know of and/or have used:

Magellan: https://github.com/harsha2010/magellan
MrGeo: https://github.com/ngageoint/mrgeo
GeoMesa: http://www.geomesa.org/documentation/tutorials/spark.html
GeoSpark (just heard of this): http://geospark.datasyslab.org/

Thanks,
Silvio

From: Imran Rajjad 
Date: Tuesday, October 10, 2017 at 5:22 AM
To: "user @spark" 
Subject: best spark spatial lib?

I need to have a location column inside my Dataframe so that I can do spatial 
queries and geometry operations. Are there any third-party packages that 
perform this kind of operation? I have seen a few like GeoSpark and Magellan, 
but they don't support operations where spatial and logical operators can be 
combined.

regards,
Imran

--
I.R


Re: [StructuredStreaming] multiple queries of the socket source: only one query works.

2017-08-13 Thread Silvio Fiorito
Hi Gerard,

Each query has its own distinct query plan and tracks offsets independently 
from other queries. Also, each query will generate a dynamic group id to ensure 
it gets all events and appears as a new consumer from Kafka’s perspective, 
that’s done internally to the Kafka source. That’s why there’s no option for 
group id as there was with DStreams.

https://github.com/apache/spark/blob/v2.2.0/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala#L75

Thanks,
Silvio

From: Gerard Maas 
Date: Sunday, August 13, 2017 at 9:50 AM
To: "Shixiong(Ryan) Zhu" 
Cc: Rick Moritz , user 
Subject: Re: [StructuredStreaming] multiple queries of the socket source: only 
one query works.

Hi Shixiong,

Thanks for the explanation.

In my view, this is different from the intuitive understanding of the 
Structured Streaming model [1], where incoming data is appended to an 
'unbounded table' and queries are run on that. I had expected that all queries 
would run on that 'unbounded table view', while I understand from your 
explanation that every query maintains its own 'unbounded table' view of the 
data stream. Is that correct?

How is that working in the case of Kafka? We have only one declared consumer, 
so we should observe a similar behavior. Yet, the Kafka source is able to 
deliver multiple output queries.
What is the difference?
Where could I learn more about the internal structured streaming model?

kind regards, Gerard.



[1] 
https://spark.apache.org/docs/2.2.0/structured-streaming-programming-guide.html#basic-concepts

On Sun, Aug 13, 2017 at 1:22 AM, Shixiong(Ryan) Zhu 
> wrote:
Spark creates one connection for each query. The behavior you observed is 
because of how "nc -lk" works. If you use `netstat` to check the tcp connections, 
you will see there are two connections when starting two queries. However, "nc" 
forwards the input to only one connection.

On Fri, Aug 11, 2017 at 10:59 PM, Rick Moritz 
> wrote:
Hi Gerard, hi List,

I think what this would entail is for Source.commit to change its 
functionality. You would need to track all streams' offsets there. Especially 
in the socket source, you already have a cache (haven't looked at Kafka's 
implementation to closely yet), so that shouldn't be the issue, if at 
start-time all streams subscribed to a source are known.
What I worry about is, that this may need an API-change, to pass a stream-ID 
into commit. Since different streams can use different Triggers, you can have 
any number of unforeseeable results, when multiple threads call commit.

I'll look into that, since I am in the progress of building a TwitterSource 
based on the socket source's general functionality, and due to the API 
restrictions there, it's even more important for multiple streams using one 
source.

What I did observe was that every query did initialize a separate source. This 
won't work so well with socket, since the socket is in use, once you try to set 
up a second one. It also won't work so well with Twitter, since usually an API 
key is limited in how often it can be used simultaneously (likely at 2).

An alternative to the socket source issue would be to open a new free socket, 
but then the user has to figure out where the source is listening.

I second Gerard's request for additional information, and confirmation of my 
theories!

Thanks,

Rick

On Fri, Aug 11, 2017 at 2:53 PM, Gerard Maas 
> wrote:
Hi,

I've been investigating this SO question: 
https://stackoverflow.com/questions/45618489/executing-separate-streaming-queries-in-spark-structured-streaming

TL;DR: when using the Socket source, trying to create multiple queries does not 
work properly; only the first query in the start order will receive data.

This minimal example reproduces the issue:

val lines = spark
.readStream
.format("socket")
.option("host", "localhost")
.option("port", "")
.option("includeTimestamp", true)
.load()

val q1 = lines.writeStream
  .outputMode("append")
  .format("console")
  .start()

val q2 = lines.withColumn("foo", lit("foo")).writeStream
  .outputMode("append")
  .format("console")
  .start()

Sample output (spark shell):

-------------------------------------------
Batch: 0
-------------------------------------------
+-----+-------------------+
|value|          timestamp|
+-----+-------------------+
|  aaa|2017-08-11 23:37:59|
+-----+-------------------+

-------------------------------------------
Batch: 1
-------------------------------------------
+-----+-------------------+
|value|          timestamp|
+-----+-------------------+
|  aaa|2017-08-11 23:38:00|
+-----+-------------------+

q1.stop

scala> -------------------------------------------
Batch: 0
-------------------------------------------

Re: Does spark 2.1.0 structured streaming support jdbc sink?

2017-04-09 Thread Silvio Fiorito
JDBC sink is not in 2.1. You can see here for an example implementation using 
the ForEachWriter sink instead: 
https://databricks.com/blog/2017/04/04/real-time-end-to-end-integration-with-apache-kafka-in-apache-sparks-structured-streaming.html
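A rough sketch of that ForeachWriter route, with hypothetical connection details, table and columns; a production version would batch writes and handle idempotence on replays:

```scala
import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.spark.sql.{ForeachWriter, Row}

class JdbcSinkWriter(url: String, user: String, pwd: String) extends ForeachWriter[Row] {
  var conn: Connection = _
  var stmt: PreparedStatement = _

  override def open(partitionId: Long, version: Long): Boolean = {
    conn = DriverManager.getConnection(url, user, pwd)
    stmt = conn.prepareStatement("INSERT INTO agg_results (window_start, cnt) VALUES (?, ?)")
    true  // returning false skips this partition for this epoch
  }

  override def process(row: Row): Unit = {
    stmt.setTimestamp(1, row.getAs[java.sql.Timestamp]("window_start"))
    stmt.setLong(2, row.getAs[Long]("cnt"))
    stmt.executeUpdate()
  }

  override def close(errorOrNull: Throwable): Unit = {
    if (stmt != null) stmt.close()
    if (conn != null) conn.close()
  }
}

// streamingDF.writeStream.outputMode("complete").foreach(new JdbcSinkWriter(jdbcUrl, user, pwd)).start()
```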


From: Hemanth Gudela 
Date: Sunday, April 9, 2017 at 4:30 PM
To: "user@spark.apache.org" 
Subject: Does spark 2.1.0 structured streaming support jdbc sink?

Hello Everyone,
I am new to Spark, especially spark streaming.

I am trying to read an input stream from Kafka, perform windowed aggregations 
in spark using structured streaming, and finally write aggregates to a sink.

-  MySQL as an output sink doesn’t seem to be an option, because this 
block of code throws an error

streamingDF.writeStream.format("jdbc").start("jdbc:mysql…”)

java.lang.UnsupportedOperationException: Data source jdbc does not support streamed writing

This is strange because, 
this 
document shows that jdbc is supported as an output sink!



-  Parquet doesn’t seem to be an option, because it doesn’t support 
“complete” output mode, but “append” only. As I’m performing windowed 
aggregations in spark streaming, the output mode has to be complete, and cannot 
be “append”


-  Memory and console sinks are good for debugging, but are not 
suitable for production jobs.

So, please correct me if I’m missing something in my code to enable jdbc output 
sink.
If jdbc output sink is not option, please suggest me an alternative output sink 
that suits my needs better.

Or since structured streaming is still ‘alpha’, should I resort to spark 
dstreams to achieve my use case described above.
Please suggest.

Thanks in advance,
Hemanth


Re: TDD in Spark

2017-01-15 Thread Silvio Fiorito
You should check out Holden’s excellent spark-testing-base package: 
https://github.com/holdenk/spark-testing-base
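A small sketch of what a test can look like with it, assuming the DataFrameSuiteBase trait and ScalaTest (trait and assertion names may differ slightly by version):

```scala
import com.holdenkarau.spark.testing.DataFrameSuiteBase
import org.scalatest.FunSuite

class FilterSpec extends FunSuite with DataFrameSuiteBase {
  test("keep values greater than one") {
    import spark.implicits._
    val input    = Seq(1, 2, 3).toDF("n")
    val expected = Seq(2, 3).toDF("n")
    // Compares schemas and contents of the two DataFrames
    assertDataFrameEquals(expected, input.filter($"n" > 1))
  }
}
```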


From: A Shaikh 
Date: Sunday, January 15, 2017 at 1:14 PM
To: User 
Subject: TDD in Spark

What's the most popular testing approach for a Spark app? I am looking for something 
along the lines of TDD.


Re: How to connect Tableau to databricks spark?

2017-01-09 Thread Silvio Fiorito
Also, meant to add the link to the docs: 
https://docs.databricks.com/user-guide/faq/tableau.html


From: Silvio Fiorito <silvio.fior...@granturing.com>
Date: Monday, January 9, 2017 at 2:59 PM
To: Raymond Xie <xie3208...@gmail.com>, user <user@spark.apache.org>
Subject: Re: How to connect Tableau to databricks spark?

Hi Raymond,

Are you using a Spark 2.0 or 1.6 cluster? With Spark 2.0 it’s just a matter of 
entering the hostname of your Databricks environment, the HTTP path from the 
cluster page, and your Databricks credentials.

Thanks,
Silvio

From: Raymond Xie <xie3208...@gmail.com>
Date: Sunday, January 8, 2017 at 10:30 PM
To: user <user@spark.apache.org>
Subject: How to connect Tableau to databricks spark?

I want to do some data analytics work by leveraging Databricks spark platform 
and connect my Tableau desktop to it for data visualization.

Has anyone ever made it work? I've been trying to follow the instructions below but was not 
successful.

https://docs.cloud.databricks.com/docs/latest/databricks_guide/01%20Databricks%20Overview/14%20Third%20Party%20Integrations/01%20Setup%20JDBC%20or%20ODBC.html


I got an error message in Tableau's attempt to connect:

Unable to connect to the server 
"ec2-35-160-128-113.us-west-2.compute.amazonaws.com". 
Check that the server is running and that you have access privileges to the 
requested database.

"ec2-35-160-128-113.us-west-2.compute.amazonaws.com" 
is the hostname of an EC2 instance I just created on AWS; I may have something 
missing there though, as I am new to AWS.

I am not sure that is related to account issue, I was using my Databricks 
account in Tableau to connect it.

Thank you very much. Any clue is appreciated.


Sincerely yours,


Raymond


Re: How to connect Tableau to databricks spark?

2017-01-09 Thread Silvio Fiorito
Hi Raymond,

Are you using a Spark 2.0 or 1.6 cluster? With Spark 2.0 it’s just a matter of 
entering the hostname of your Databricks environment, the HTTP path from the 
cluster page, and your Databricks credentials.

Thanks,
Silvio

From: Raymond Xie 
Date: Sunday, January 8, 2017 at 10:30 PM
To: user 
Subject: How to connect Tableau to databricks spark?

I want to do some data analytics work by leveraging Databricks spark platform 
and connect my Tableau desktop to it for data visualization.

Has anyone ever made it work? I've been trying to follow the instructions below but was not 
successful.

https://docs.cloud.databricks.com/docs/latest/databricks_guide/01%20Databricks%20Overview/14%20Third%20Party%20Integrations/01%20Setup%20JDBC%20or%20ODBC.html


I got an error message in Tableau's attempt to connect:

Unable to connect to the server 
"ec2-35-160-128-113.us-west-2.compute.amazonaws.com".
 Check that the server is running and that you have access privileges to the 
requested database.

"ec2-35-160-128-113.us-west-2.compute.amazonaws.com"
 is the hostname of a EC2 instance I just created on AWS, I may have some 
missing there though as I am new to AWS.

I am not sure that is related to account issue, I was using my Databricks 
account in Tableau to connect it.

Thank you very much. Any clue is appreciated.


Sincerely yours,


Raymond


Re: Cluster deploy mode driver location

2016-11-22 Thread Silvio Fiorito
Hi Saif!


Unfortunately, I don't think this is possible for YARN driver-cluster mode. 
Regarding the JARs you're referring to, can you place them on HDFS so you can 
then have them in a central location and refer to them that way for 
dependencies?


http://spark.apache.org/docs/latest/submitting-applications.html#advanced-dependency-management


Thanks,

Silvio


From: saif.a.ell...@wellsfargo.com 
Sent: Monday, November 21, 2016 2:04:06 PM
To: user@spark.apache.org
Subject: Cluster deploy mode driver location

Hello there,

I have a Spark program on 1.6.1; however, when I submit it to the cluster, it 
randomly picks the driver.

I know there is a driver specification option, but along with it, it is 
mandatory to define many other options I am not familiar with. The trouble is, 
the .jars I am launching need to be available on the driver host, and I would 
like to have these jars on just one specific host, which I would like to be the 
driver.

Any help?

Thanks!
Saif



Re: Joining to a large, pre-sorted file

2016-11-13 Thread Silvio Fiorito
Hi Stuart,

Yes that's the query plan but if you take a look at my screenshot it skips the 
first stage since the datasets are co-partitioned.

Thanks,
Silvio


From: Stuart White <stuart.whi...@gmail.com>
Sent: Saturday, November 12, 2016 11:20:28 AM
To: Silvio Fiorito
Cc: user@spark.apache.org
Subject: Re: Joining to a large, pre-sorted file

Hi Silvio,

Thanks very much for the response!

I'm pretty new at reading explain plans, so maybe I'm misunderstanding what I'm 
seeing.

Remember my goal is to sort master, write it out, later read it back in and 
have Spark "remember" that it's sorted, so I can do joins and Spark will not 
sort it again.

Looking at the explain plan for the example job you provided, it looks to me 
like Spark is re-sorting master after reading it back in.  See the attachment 
for the Sort step I'm referring to.

Am I misunderstanding the explain plan?

Thanks again!

On Sat, Nov 12, 2016 at 9:34 AM, Silvio Fiorito 
<silvio.fior...@granturing.com<mailto:silvio.fior...@granturing.com>> wrote:

Hi Stuart,



You don’t need the sortBy or sortWithinPartitions.



https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/594325853373464/879901972425732/6861830365114179/latest.html





This is what the job should look like:



[inline image: Spark UI screenshot of the expected job]

On 11/12/16, 8:40 AM, "Stuart White" 
<stuart.whi...@gmail.com<mailto:stuart.whi...@gmail.com>> wrote:



Thanks for the reply.

I understand that I need to use bucketBy() to write my master file, but I still 
can't seem to make it work as expected.  Here's a code example for how I'm 
writing my master file:

Range(0, 100)
  .map(i => (i, s"master_$i"))
  .toDF("key", "value")
  .write
  .format("json")
  .bucketBy(3, "key")
  .sortBy("key")
  .saveAsTable("master")

And here's how I'm reading it later and attempting to join to a transaction dataset:

val master = spark
  .read
  .format("json")
  .json("spark-warehouse/master")
  .cache

val transaction = Range(0, 100)
  .map(i => (i, s"transaction_$i"))
  .toDF("key", "value")
  .repartition(3, 'key)
  .sortWithinPartitions('key)
  .cache

val results = master.join(transaction, "key")

When I call results.explain(), I see that it is sorting both datasets before 
sending them through SortMergeJoin.

== Physical Plan ==
*Project [key#0L, value#1, value#53]
+- *SortMergeJoin [key#0L], [cast(key#52 as bigint)], Inner
   :- *Sort [key#0L ASC], false, 0
   :  +- Exchange hashpartitioning(key#0L, 200)
   :     +- *Filter isnotnull(key#0L)
   :        +- InMemoryTableScan [key#0L, value#1], [isnotnull(key#0L)]
   :              +- InMemoryRelation [key#0L, value#1], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
   :                    +- *Scan json [key#0L,value#1] Format: JSON, InputPaths: file:/work/spark-warehouse/master, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<key:bigint,value:string>
   +- *Sort [cast(key#52 as bigint) ASC], false, 0
      +- Exchange hashpartitioning(cast(key#52 as bigint), 200)
         +- InMemoryTableScan [key#52, value#53]
               +- InMemoryRelation [key#52, value#53], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
                     +- *Sort [key#52 ASC], false, 0
                        +- Exchange hashpartitioning(key#52, 3)
                           +- LocalTableScan [key#52, value#53]

Here are my thoughts:

1. I think I'm probably reading the master file back into memory incorrectly.  
I think maybe I should be reading it as a Hive table rather than just a plain 
json file, but I can't seem to figure out how to do that.

2. I don't understand exactly when partition counts/bucket counts are 
important.  For example, in this example, at the time it's written, master has 
1 partition and is written into 3 buckets, resulting in 3 files being written 
out.  Later when I generated my transaction dataset, I repartitioned it into 3 
partitions.  Was that the correct thing to do (3 transaction partitions == 3 
master buckets)?  Or should I have repartitioned master into 3 partitions 
before writing (resulting in 9 files if I still create 3 buckets)?  Basically, 
I don't understand how partitions and buckets should be handled.

So, I feel like I'm close, but there are a few ways in which I don't understand 
how these pieces are supposed to fit together.  If this is explained somewhere, 
with a simple example, that would be great.





Re: Joining to a large, pre-sorted file

2016-11-12 Thread Silvio Fiorito
Hi Stuart,



You don’t need the sortBy or sortWithinPartitions.



https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/594325853373464/879901972425732/6861830365114179/latest.html





This is what the job should look like:



[inline image: screenshot of the Spark UI job DAG for the co-partitioned join; not preserved in the archive]

On 11/12/16, 8:40 AM, "Stuart White"  wrote:



Thanks for the reply.



I understand that I need to use bucketBy() to write my master file,

but I still can't seem to make it work as expected.  Here's a code

example for how I'm writing my master file:



Range(0, 100)

  .map(i => (i, s"master_$i"))

  .toDF("key", "value")

  .write

  .format("json")

  .bucketBy(3, "key")

  .sortBy("key")

  .saveAsTable("master")



And here's how I'm reading it later and attempting to join to a

transaction dataset:



val master = spark

  .read

  .format("json")

  .json("spark-warehouse/master")

  .cache



val transaction = Range(0, 100)

  .map(i => (i, s"transaction_$i"))

  .toDF("key", "value")

  .repartition(3, 'key)

  .sortWithinPartitions('key)

  .cache



val results = master.join(transaction, "key")



When I call results.explain(), I see that it is sorting both datasets

before sending them through SortMergeJoin.



== Physical Plan ==

*Project [key#0L, value#1, value#53]

+- *SortMergeJoin [key#0L], [cast(key#52 as bigint)], Inner

  :- *Sort [key#0L ASC], false, 0

   :  +- Exchange hashpartitioning(key#0L, 200)

   : +- *Filter isnotnull(key#0L)

   :+- InMemoryTableScan [key#0L, value#1], [isnotnull(key#0L)]

   :   :  +- InMemoryRelation [key#0L, value#1], true, 1,

StorageLevel(disk, memory, deserialized, 1 replicas)

   :   : :  +- *Scan json [key#0L,value#1] Format: JSON,

InputPaths: file:/work/spark-warehouse/master, PartitionFilters: [],

PushedFilters: [], ReadSchema: struct<key:bigint,value:string>

   +- *Sort [cast(key#52 as bigint) ASC], false, 0

  +- Exchange hashpartitioning(cast(key#52 as bigint), 200)

 +- InMemoryTableScan [key#52, value#53]

:  +- InMemoryRelation [key#52, value#53], true, 1,

StorageLevel(disk, memory, deserialized, 1 replicas)

: :  +- *Sort [key#52 ASC], false, 0

: : +- Exchange hashpartitioning(key#52, 3)

: :+- LocalTableScan [key#52, value#53]



Here are my thoughts:

1. I think I'm probably reading the master file back into memory

incorrectly.  I think maybe I should be reading it as a Hive table

rather than just a plain json file, but I can't seem to figure out how

to do that.

2. I don't understand exactly when partition counts/bucket counts are

important.  For example, in this example, at the time it's written,

master has 1 partition and is written into 3 buckets, resulting in 3

files being written out.  Later when I generated my transaction

dataset, I repartitioned it into 3 partitions.  Was that the correct

thing to do (3 transaction partitions == 3 master buckets)?  Or should

I have repartitioned master into 3 partitions before writing

(resulting in 9 files if I still create 3 buckets)?  Basically, I

don't understand how partitions and buckets should be handled.



So, I feel like I'm close, but there are a few ways in which I don't

understand how these pieces are supposed to fit together.  If this is

explained somewhere, with a simple example, that would be great.




Re: Joining to a large, pre-sorted file

2016-11-10 Thread Silvio Fiorito
You want to look at the bucketBy option when you save the master file out. That 
way it will be pre-partitioned by the join column, eliminating the shuffle on 
the larger file.
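
A minimal sketch of that flow (Spark 2.x; masterDf and transactionDf below stand in 
for your actual DataFrames, and bucketBy currently requires saveAsTable, i.e. a catalog):

// Write the master bucketed (and sorted) by the join key.
masterDf.write
  .format("parquet")
  .bucketBy(200, "key")
  .sortBy("key")
  .saveAsTable("master_bucketed")

// In later jobs, read it back through the catalog -- not spark.read.parquet/json --
// so the planner picks up the bucketing metadata.
val master = spark.table("master_bucketed")
val joined = master.join(transactionDf, "key")
joined.explain()   // the master side should no longer need an Exchange

Whether the per-partition Sort is also skipped varies by version, so check the plan.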



From: Stuart White 
Date: Thursday, November 10, 2016 at 8:39 PM
To: Jörn Franke 
Cc: "user@spark.apache.org" 
Subject: Re: Joining to a large, pre-sorted file

Yes.  In my original question, when I said I wanted to pre-sort the master 
file, I should have said "pre-sort and pre-partition the file".
Years ago, I did this with Hadoop MapReduce.  I pre-sorted/partitioned the 
master file into N partitions.  Then, when a transaction file would arrive, I 
would sort/partition the transaction file on the join key into N partitions.  
Then I could perform what was called a mapside join.
Basically, I want to do the same thing in Spark.  And it looks like all the 
pieces to accomplish this exist, but I can't figure out how to connect all the 
dots.  It seems like this functionality is pretty new so there aren't a lot of 
examples available.

On Thu, Nov 10, 2016 at 7:33 PM, Jörn Franke 
> wrote:
Can you split the files beforehand in several files (e.g. By the column you do 
the join on?) ?

On 10 Nov 2016, at 23:45, Stuart White 
> wrote:
I have a large "master" file (~700m records) that I frequently join smaller 
"transaction" files to.  (The transaction files have 10's of millions of 
records, so too large for a broadcast join).
I would like to pre-sort the master file, write it to disk, and then, in 
subsequent jobs, read the file off disk and join to it without having to 
re-sort it.  I'm using Spark SQL, and my understanding is that the Spark 
Catalyst Optimizer will choose an optimal join algorithm if it is aware that 
the datasets are sorted.  So, the trick is to make the optimizer aware that the 
master file is already sorted.
I think SPARK-12394 provides 
this functionality, but I can't seem to put the pieces together for how to use 
it.
Could someone possibly provide a simple example of how to:

  1.  Sort a master file by a key column and write it to disk in such a way 
that its "sorted-ness" is preserved.
  2.  In a later job, read a transaction file, sort/partition it as necessary.  
Read the master file, preserving its sorted-ness.  Join the two DataFrames in 
such a way that the master rows are not sorted again.
Thanks!



Re: Physical plan for windows and joins - how to know which is faster?

2016-11-09 Thread Silvio Fiorito
Hi Jacek,


I haven't played with 2.1.0 yet, so not sure how much more optimized Window 
functions are compared to 1.6 and 2.0.


However, one thing I do see in the self-join is a broadcast. So there's going 
to be a need broadcast the results of the groupBy out to the executors before 
it can do the join. In both cases it's shuffling the data (for the groupBy or 
the Window).


Have you tried running both queries to see? Would be interesting to test it on 
varying data volumes as well (e.g. what if there's no broadcast).
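
If it helps, a quick-and-dirty way to compare them, reusing the mydf, byId3 and imports 
from your session (assuming count() is enough to force each plan end to end):

def time[T](label: String)(f: => T): T = {
  val start = System.nanoTime()
  val result = f
  println(f"$label took ${(System.nanoTime() - start) / 1e9}%.2f s")
  result
}

time("window variant") { mydf.withColumn("sum(id)", sum('id) over byId3).count() }
time("join variant")   { mydf.join(mydf.groupBy("ID % 3").sum("id"), "ID % 3").count() }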


Thanks,

Silvio


From: Jacek Laskowski 
Sent: Wednesday, November 9, 2016 7:36:47 AM
To: user
Subject: Physical plan for windows and joins - how to know which is faster?

Hi,

While playing around with Spark 2.1.0-SNAPSHOT (built today) and
explain'ing two queries with WindowSpec and inner join I found the
following plans and am wondering if you could help me to judge which
query could be faster.

What else would you ask for to be able to answer the question of one
being more efficient than the other?

Just by looking at Spark's "stack traces" of the queries, one could
say that the windowed variant (the first one) is going to be faster (as there are
fewer physical operators), yet the top-level Window operator is not
codegened, so it might be misleading.

I'd appreciate your help to get me better at reading such trees. Thanks!

scala> mydf.withColumn("sum(id)", sum('id) over byId3).explain
== Physical Plan ==
Window [sum(cast(id#15 as bigint)) windowspecdefinition(ID % 3#60,
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS
sum(id)#665L], [ID % 3#60]
+- *Sort [ID % 3#60 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(ID % 3#60, 200)
  +- LocalTableScan [id#15, multiplied#16, ID % 3#60]

scala> mydf.join(mydf.groupBy("ID % 3").sum("id"), "ID % 3").explain
== Physical Plan ==
*Project [ID % 3#60, id#15, multiplied#16, sum(id)#677L]
+- *BroadcastHashJoin [ID % 3#60], [ID % 3#681], Inner, BuildRight
   :- *Project [_1#12 AS id#15, _2#13 AS multiplied#16, (_1#12 % 3) AS
ID % 3#60]
   :  +- *Filter isnotnull((_1#12 % 3))
   : +- LocalTableScan [_1#12, _2#13]
   +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0,
int, true] as bigint)))
  +- *HashAggregate(keys=[ID % 3#681], functions=[sum(cast(id#15
as bigint))])
 +- Exchange hashpartitioning(ID % 3#681, 200)
+- *HashAggregate(keys=[ID % 3#681],
functions=[partial_sum(cast(id#15 as bigint))])
   +- *Project [_1#12 AS id#15, (_1#12 % 3) AS ID % 3#681]
  +- *Filter isnotnull((_1#12 % 3))
 +- LocalTableScan [_1#12, _2#13]

Pozdrawiam,
Jacek Laskowski

https://medium.com/@jaceklaskowski/
Mastering Apache Spark 2.0 https://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: How do I convert a data frame to broadcast variable?

2016-11-03 Thread Silvio Fiorito
Hi Nishit,


Yes the JDBC connector supports predicate pushdown and column pruning. So any 
selection you make on the dataframe will get materialized in the query sent via 
JDBC.


You should be able to verify this by looking at the physical query plan:


val df = sqlContext.jdbc().select($"col1", $"col2")

df.explain(true)


Or if you can easily log queries submitted to your database then you can view 
the specific query.
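
For example, a slightly fuller sketch (the URL, table and column names are just 
placeholders) to confirm the pruning and pushdown in the plan:

import java.util.Properties
import sqlContext.implicits._

val props = new Properties()
props.setProperty("user", "...")
props.setProperty("password", "...")

val lookup = sqlContext.read
  .jdbc("jdbc:sap://hana-host:30015", "LOOKUP_TABLE", props)
  .select($"col1", $"col2")
  .filter($"col1" > 100)

// The JDBCRelation scan should list only col1/col2 and show the filter under PushedFilters.
lookup.explain(true)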


Thanks,

Silvio


From: Jain, Nishit 
Sent: Thursday, November 3, 2016 12:32:48 PM
To: Denny Lee; user@spark.apache.org
Subject: Re: How do I convert a data frame to broadcast variable?

Thanks Denny! That does help. I will give that a shot.

Question: If I am going this route, I am wondering how can I only read few 
columns of a table (not whole table) from JDBC as data frame.
This function from data frame reader does not give an option to read only 
certain columns:
def jdbc(url: String, table: String, predicates: Array[String], 
connectionProperties: Properties): DataFrame

On the other hand if I want to create a JDBCRdd I can specify a select query 
(instead of full table):
new JdbcRDD(sc: SparkContext, getConnection: () => Connection, sql: String, 
lowerBound: Long, upperBound: Long, numPartitions: Int, mapRow: (ResultSet) => T 
= JdbcRDD.resultSetToObjectArray)(implicit arg0: ClassTag[T])

Maybe if I do df.select(col1, col2) on a data frame created via a table, will 
Spark be smart enough to fetch only the two columns and not the entire table?
Any way to test this?


From: Denny Lee >
Date: Thursday, November 3, 2016 at 10:59 AM
To: "Jain, Nishit" >, 
"user@spark.apache.org" 
>
Subject: Re: How do I convert a data frame to broadcast variable?

If you're able to read the data in as a DataFrame, perhaps you can use a 
BroadcastHashJoin, so that way you can join to that table, presuming it's small 
enough to distribute?  Here's a handy guide on a BroadcastHashJoin: 
https://docs.cloud.databricks.com/docs/latest/databricks_guide/index.html#04%20SQL,%20DataFrames%20%26%20Datasets/05%20BroadcastHashJoin%20-%20scala.html
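
A minimal sketch of that, with lookupDf standing in for the small HANA table read 
via JDBC and factsDf for the larger dataset you join it to:

import org.apache.spark.sql.functions.broadcast

// Ask Spark to replicate the small lookup to every executor instead of shuffling both sides.
val enriched = factsDf.join(broadcast(lookupDf), Seq("key"))
enriched.explain()   // should show a BroadcastHashJoin on the lookup side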

HTH!


On Thu, Nov 3, 2016 at 8:53 AM Jain, Nishit 
> wrote:
I have a lookup table in a HANA database. I want to create a Spark broadcast 
variable for it.
What would be the suggested approach? Should I read it as a data frame and 
convert the data frame into a broadcast variable?

Thanks,
Nishit


Re: DataFrame defined within conditional IF ELSE statement

2016-09-18 Thread Silvio Fiorito
Oh, sorry it was supposed to be sys.error, not sys.err



From: Mich Talebzadeh <mich.talebza...@gmail.com>
Date: Sunday, September 18, 2016 at 5:23 PM
To: Silvio Fiorito <silvio.fior...@granturing.com>
Cc: "user @spark" <user@spark.apache.org>
Subject: Re: DataFrame defined within conditional IF ELSE statement

Thanks Silvio.

This is what I ended up with

val df = option match {
  case 1 => {
println("option = 1")
val df = spark.read.option("header", 
false).csv("hdfs://rhes564:9000/data/prices/prices.*")
val df2 = df.map(p => columns(p(0).toString.toInt,p(1).toString, 
p(2).toString,p(3).toString))
df2
  }
  case 2 => {
println("option = 2")
val df2 = 
spark.table("test.marketData").select('TIMECREATED,'SECURITY,'PRICE)
df2
  }
  case 3 => {
println("option = 3")
val df2 = spark.table("test.marketDataParquet").select('TIMECREATED,'SECURITY,'PRICE)
df2
  }
  case _ => {
println ("No valid option provide")
sys.exit
  }
}

For one reason or another, the following

case _ => sys.err("no valid option provided")

threw an error!




Dr Mich Talebzadeh



LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com



Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction of data or any other property which may arise from 
relying on this email's technical content is explicitly disclaimed. The author 
will in no case be liable for any monetary damages arising from such loss, 
damage or destruction.



On 18 September 2016 at 21:04, Silvio Fiorito 
<silvio.fior...@granturing.com<mailto:silvio.fior...@granturing.com>> wrote:
Hi Mich,

That’s because df2 is only within scope in the if statements.

Try this:

val df = option match {
  case 1 => {
println("option = 1")
val df = spark.read.option("header", 
false).csv("hdfs://rhes564:9000/data/prices/prices.*")
val df2 = df.map(p => columns(p(0).toString.toInt,p(1).toString, 
p(2).toString,p(3).toString))
df2
  }
  case 2 => spark.table("test.marketData").select('TIMECREATED,'SECURITY,'PRICE)
  case 3 => 
spark.table("test.marketDataParquet").select('TIMECREATED,'SECURITY,'PRICE)
  case _ => sys.err(“no valid option provided”)
}

df.printSchema()


Thanks,
Silvio

From: Mich Talebzadeh 
<mich.talebza...@gmail.com<mailto:mich.talebza...@gmail.com>>
Date: Saturday, September 17, 2016 at 4:18 PM
To: "user @spark" <user@spark.apache.org<mailto:user@spark.apache.org>>
Subject: DataFrame defined within conditional IF ELSE statement

In Spark 2 this gives me an error in a conditional  IF ELSE statement

I recall seeing the same in standard SQL

I am doing a test for different sources (text file, ORC or Parquet) to be read 
in depending on the value of var option

I wrote this

import org.apache.spark.sql.functions._
import java.util.Calendar
import org.joda.time._
var option = 1
val today = new DateTime()
val minutes = -15
val  minutesago =  today.plusMinutes(minutes).toString.toString.substring(11,19)
val date = java.time.LocalDate.now.toString
val hour = java.time.LocalTime.now.toString
case class columns(INDEX: Int, TIMECREATED: String, SECURITY: String, PRICE: 
String)

if(option == 1 ) {
   println("option = 1")
   val df = spark.read.option("header", 
false).csv("hdfs://rhes564:9000/data/prices/prices.*")
   val df2 = df.map(p => columns(p(0).toString.toInt,p(1).toString, 
p(2).toString,p(3).toString))
   df2.printSchema
} else if (option == 2) {
val df2 = 
spark.table("test.marketData").select('TIMECREATED,'SECURITY,'PRICE)
} else if (option == 3) {
val df2 = 
spark.table("test.marketDataParquet").select('TIMECREATED,'SECURITY,'PRICE)
} else {
println("no valid option provided")
sys.exit(0)
}

With option 1 selected it goes through and shows this

option = 1
root
 |-- INDEX: integer (nullable = true)
 |-- TIMECREATED: string (nullable = true)
 |-- SECURITY: string (nullable = true)
 |-- PRICE: string (nullable = true)

But when I try to do df2.printSchema OUTSIDE of the IF ELSE block, it comes back with an 
error

scala> df2.printSchema
:31: error: not found: value df2
   df2.printSchema
   ^
I can define a stub df2 before the IF ELSE statement. Is that the best way of 
dealing with it?

Thanks


Dr Mich Talebzadeh



LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com



Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction of data or any other property which may arise from 
relying on this email's technical content is explicitly disclaimed. The author 
will in no case be liable for any monetary damages arising from such loss, 
damage or destruction.





Re: DataFrame defined within conditional IF ELSE statement

2016-09-18 Thread Silvio Fiorito
Hi Mich,

That’s because df2 is only within scope in the if statements.

Try this:

val df = option match {
  case 1 => {
println("option = 1")
val df = spark.read.option("header", 
false).csv("hdfs://rhes564:9000/data/prices/prices.*")
val df2 = df.map(p => columns(p(0).toString.toInt,p(1).toString, 
p(2).toString,p(3).toString))
df2
  }
  case 2 => spark.table("test.marketData").select('TIMECREATED,'SECURITY,'PRICE)
  case 3 => 
spark.table("test.marketDataParquet").select('TIMECREATED,'SECURITY,'PRICE)
  case _ => sys.err(“no valid option provided”)
}

df.printSchema()


Thanks,
Silvio

From: Mich Talebzadeh 
Date: Saturday, September 17, 2016 at 4:18 PM
To: "user @spark" 
Subject: DataFrame defined within conditional IF ELSE statement

In Spark 2 this gives me an error in a conditional  IF ELSE statement

I recall seeing the same in standard SQL

I am doing a test for different sources (text file, ORC or Parquet) to be read 
in depending on the value of var option

I wrote this

import org.apache.spark.sql.functions._
import java.util.Calendar
import org.joda.time._
var option = 1
val today = new DateTime()
val minutes = -15
val  minutesago =  today.plusMinutes(minutes).toString.toString.substring(11,19)
val date = java.time.LocalDate.now.toString
val hour = java.time.LocalTime.now.toString
case class columns(INDEX: Int, TIMECREATED: String, SECURITY: String, PRICE: 
String)

if(option == 1 ) {
   println("option = 1")
   val df = spark.read.option("header", 
false).csv("hdfs://rhes564:9000/data/prices/prices.*")
   val df2 = df.map(p => columns(p(0).toString.toInt,p(1).toString, 
p(2).toString,p(3).toString))
   df2.printSchema
} else if (option == 2) {
val df2 = 
spark.table("test.marketData").select('TIMECREATED,'SECURITY,'PRICE)
} else if (option == 3) {
val df2 = 
spark.table("test.marketDataParquet").select('TIMECREATED,'SECURITY,'PRICE)
} else {
println("no valid option provided")
sys.exit(0)
}

With option 1 selected it goes through and shows this

option = 1
root
 |-- INDEX: integer (nullable = true)
 |-- TIMECREATED: string (nullable = true)
 |-- SECURITY: string (nullable = true)
 |-- PRICE: string (nullable = true)

But when I try to do df2.printSchema OUTSIDE of the IF ELSE block, it comes back with an 
error

scala> df2.printSchema
:31: error: not found: value df2
   df2.printSchema
   ^
I can define a stub df2 before the IF ELSE statement. Is that the best way of 
dealing with it?

Thanks


Dr Mich Talebzadeh



LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com



Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction of data or any other property which may arise from 
relying on this email's technical content is explicitly disclaimed. The author 
will in no case be liable for any monetary damages arising from such loss, 
damage or destruction.




Re: what contributes to Task Deserialization Time

2016-07-22 Thread Silvio Fiorito
Are you referencing member variables or other objects of your driver in your 
transformations? Those would have to be serialized and shipped to each executor 
when that job kicks off.
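
As an illustration of that trap (class and field names are invented), referencing a 
field pulls the whole enclosing object into every task closure:

import org.apache.spark.rdd.RDD

class Pipeline extends Serializable {
  // Driver-side state: a large lookup plus other heavy members.
  val lookup: Map[Int, String] = (1 to 1000000).map(i => i -> s"value_$i").toMap
  val otherBigState: Array[Byte] = new Array[Byte](50 * 1024 * 1024)

  // Referencing `lookup` directly captures `this`, so lookup AND otherBigState are
  // serialized into every task closure -> long task deserialization times.
  def slow(rdd: RDD[Int]): RDD[String] =
    rdd.map(i => lookup.getOrElse(i, "?"))

  // Copy what you need to a local val (or, better, broadcast it) so only that value
  // travels with the closure.
  def faster(rdd: RDD[Int]): RDD[String] = {
    val local = lookup
    rdd.map(i => local.getOrElse(i, "?"))
  }
}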

On 7/22/16, 8:54 AM, "Jacek Laskowski"  wrote:

Hi,

I can't specifically answer your question, but my understanding of
Task Deserialization Time is that it's time to deserialize a
serialized task from the driver before it gets run. See
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/executor/Executor.scala#L236
and on.

Pozdrawiam,
Jacek Laskowski

https://medium.com/@jaceklaskowski/
Mastering Apache Spark http://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski


On Thu, Jul 21, 2016 at 11:35 AM, patcharee  wrote:
> Hi,
>
> I'm running a simple job (reading sequential file and collect data at the
> driver) with yarn-client mode. When looking at the history server UI, Task
> Deserialization Time of tasks are quite different (5 ms to 5 s). What
> contribute to this Task Deserialization Time?
>
> Thank you in advance!
>
> Patcharee
>
>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org





Re: Deploying ML Pipeline Model

2016-07-01 Thread Silvio Fiorito
Hi Rishabh,

My colleague, Richard Garris from Databricks, actually just gave a talk last 
night at the Bay Area Spark Meetup on ML model deployment. The slides and 
recording should be up soon, you should be able to find a link here: 
http://www.meetup.com/spark-users/events/231574440/

Thanks,
Silvio

From: Rishabh Bhardwaj 
Date: Friday, July 1, 2016 at 7:54 AM
To: user 
Cc: "d...@spark.apache.org" 
Subject: Deploying ML Pipeline Model

Hi All,

I am looking for ways to deploy a ML Pipeline model in production .
Spark has already proved to be a one of the best framework for model training 
and creation, but once the ml pipeline model is ready how can I deploy it 
outside spark context ?
MLlib model has toPMML method but today Pipeline model can not be saved to 
PMML. There are some frameworks like MLeap which are trying to abstract 
Pipeline Model and provide ML Pipeline Model deployment outside spark 
context,but currently they don't have most of the ml transformers and 
estimators.
I am looking for related work going on this area.
Any pointers will be helpful.

Thanks,
Rishabh.


Re: Strategies for properly load-balanced partitioning

2016-06-03 Thread Silvio Fiorito
Hi Saif!

When you say this happens with spark-csv, are the files gzipped by any chance? 
GZip is non-splittable so if you’re seeing skew simply from loading data it 
could be you have some extremely large gzip files. So for a single stage job 
you will have those tasks lagging compared to the smaller gzips. As you already 
said, the option there would be to repartition at the expense of shuffling. If 
you’re seeing this with parquet files, what do the individual part-* files look 
like (size, compression type, etc.)?

Thanks,
Silvio

From: "saif.a.ell...@wellsfargo.com" 
Date: Friday, June 3, 2016 at 8:31 AM
To: "user@spark.apache.org" 
Subject: Strategies for properly load-balanced partitioning

Hello everyone!

I was noticing that, when reading parquet files or actually any kind of source 
data frame data (spark-csv, etc.), the default partitioning is not fair.
Action tasks usually run very fast on some partitions and very slow on some 
others, and frequently even fast on all but the last partition (which looks like 
it reads 50%+ of the data input size).

I notice that most tasks load some portion of the data, say 1024 MB chunks, 
while some tasks load 20+ GB of data.

Applying repartition strategies solves this issue properly and general 
performance increases considerably, but for very large dataframes, 
repartitioning is a costly process.

In short, what are the available strategies or configurations that help reading 
from disk or HDFS with a proper executor data distribution?

If this needs to be more specific, I am strictly focused on PARQUET files from 
HDFS. I know there are some MIN

Really appreciate,
Saif



Re: HiveContext standalone => without a Hive metastore

2016-05-26 Thread Silvio Fiorito
Hi Gerard,

I’ve never had an issue using the HiveContext without a hive-site.xml 
configured. However, one issue you may have is if multiple users are starting 
the HiveContext from the same path, they’ll all be trying to store the default 
Derby metastore in the same location. Also, if you want them to be able to 
persist permanent table metadata for SparkSQL then you’ll want to set up a true 
metastore.
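
For what it's worth, a bare-bones sketch of Gerard's use case with no hive-site.xml at 
all (each user just needs to launch from their own working directory so the 
./metastore_db Derby files don't collide):

import org.apache.spark.sql.hive.HiveContext

val hiveContext = new HiveContext(sc)   // spins up a local Derby metastore on first use
import hiveContext.implicits._

Seq(("IBM", 10.0), ("IBM", 12.0), ("MSFT", 20.0)).toDF("security", "price")
  .registerTempTable("prices")

// Window functions work without any Hive service or metastore server running.
hiveContext.sql(
  "SELECT security, price, max(price) OVER (PARTITION BY security) AS max_price FROM prices").show()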

The other thing it could be is Hive dependency collisions from the classpath, 
but that shouldn’t be an issue since you said it’s standalone (not a Hadoop 
distro right?).

Thanks,
Silvio

From: Gerard Maas 
Date: Thursday, May 26, 2016 at 5:28 AM
To: spark users 
Subject: HiveContext standalone => without a Hive metastore

Hi,

I'm helping some folks setting up an analytics cluster with  Spark.
They want to use the HiveContext to enable the Window functions on 
DataFrames(*) but they don't have any Hive installation, nor they need one at 
the moment (if not necessary for this feature)

When we try to create a Hive context, we get the following error:

> val sqlContext = new org.apache.spark.sql.hive.HiveContext(sparkContext)
java.lang.RuntimeException: java.lang.RuntimeException: Unable to instantiate 
org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient
   at 
org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:522)

Is my HiveContext failing b/c it wants to connect to an unconfigured  Hive 
Metastore?

Is there  a way to instantiate a HiveContext for the sake of Window support 
without an underlying Hive deployment?

The docs are explicit in saying that that is should be the case: [1]

"To use a HiveContext, you do not need to have an existing Hive setup, and all 
of the data sources available to aSQLContext are still available. HiveContext 
is only packaged separately to avoid including all of Hive’s dependencies in 
the default Spark build."

So what is the right way to address this issue? How to instantiate a 
HiveContext with spark running on a HDFS cluster without Hive deployed?


Thanks a lot!

-Gerard.

(*) The need for a HiveContext to use Window functions is pretty obscure. The 
only documentation of this seems to be a runtime exception: 
"org.apache.spark.sql.AnalysisException: Could not resolve window function 
'max'. Note that, using window functions currently requires a HiveContext;"

[1] 
http://spark.apache.org/docs/latest/sql-programming-guide.html#getting-started


Re: Using HiveContext.set in multiple threads

2016-05-24 Thread Silvio Fiorito
If you’re using DataFrame API you can achieve that by simply using (or not) the 
“partitionBy” method on the DataFrameWriter:

val originalDf = ….

val df1 = originalDf….
val df2 = originalDf…

df1.write.partitionBy("col1").save(…)

df2.write.save(…)

From: Amir Gershman 
Date: Tuesday, May 24, 2016 at 7:01 AM
To: "user@spark.apache.org" 
Subject: Using HiveContext.set in multiple threads

Hi,

I have a DataFrame I compute from a long chain of transformations.
I cache it, and then perform two additional transformations on it.
I use two Futures - each Future will insert the content of one of the resulting 
DataFrames into a different Hive table.
One Future must SET hive.exec.dynamic.partition=true and the other must set it 
to false.



How can I run both INSERT commands in parallel, but guarantee each runs with 
its own settings?



If I don't use the same HiveContext then the initial long chain of 
transformations which I cache is not reusable between HiveContexts. If I use 
the same HiveContext, race conditions between threads may cause one INSERT to 
execute with the wrong config.



Re: "collecting" DStream data

2016-05-15 Thread Silvio Fiorito
Hi Daniel,

Given your example, “arr” is defined on the driver, but the “foreachRDD” 
function is run on the executors. If you want to collect the results of the 
RDD/DStream down to the driver you need to call RDD.collect. You have to be 
careful though that you have enough memory on the driver JVM to hold the 
results, otherwise you’ll have an OOM exception. Also, you can’t update the 
value of a broadcast variable, since it’s immutable.
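
A sketch of the pattern (names are invented, and it assumes each batch's result is 
small enough for the driver):

var versions = Map.empty[String, String]            // driver-side snapshot

mappedVersionsToBroadcast.foreachRDD { rdd =>
  versions ++= rdd.collect()                        // pull this batch down to the driver
  // Broadcasts are immutable, so publish a fresh one per batch if executors need it.
  val versionsBc = rdd.sparkContext.broadcast(versions)
  // ... use versionsBc.value inside subsequent RDD transformations ...
}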

Thanks,
Silvio

From: Daniel Haviv 
>
Date: Sunday, May 15, 2016 at 6:23 AM
To: user >
Subject: "collecting" DStream data

Hi,
I have a DStream whose values I'd like to collect and broadcast.
To do so I've created a mutable HashMap which I'm filling with foreachRDD, but 
when I check it, it remains empty. If I use an ArrayBuffer it works as 
expected.

This is my code:

val arr = scala.collection.mutable.HashMap.empty[String,String]
MappedVersionsToBrodcast.foreachRDD(r=> { r.foreach(r=> { arr+=r})   } )


What am I missing here?

Thank you,
Daniel



RE: Apache Flink

2016-04-17 Thread Silvio Fiorito
Actually there were multiple responses to it on the GitHub project, including a 
PR to improve the Spark code, but they weren’t acknowledged.


From: Ovidiu-Cristian MARCU
Sent: Sunday, April 17, 2016 7:48 AM
To: andy petrella
Cc: Mich Talebzadeh; Ascot 
Moss; Ted Yu; user 
@spark
Subject: Re: Apache Flink

You probably read this benchmark at Yahoo, any comments from Spark?
https://yahooeng.tumblr.com/post/135321837876/benchmarking-streaming-computation-engines-at


On 17 Apr 2016, at 12:41, andy petrella 
> wrote:

Just adding one thing to the mix: `that the latency for streaming data is 
eliminated` is insane :-D

On Sun, Apr 17, 2016 at 12:19 PM Mich Talebzadeh 
> wrote:
 It seems that Flink argues that the latency for streaming data is eliminated 
whereas with Spark RDD there is this latency.

I noticed that Flink does not support an interactive shell much like the Spark shell, 
where you can add JARs to it to do Kafka testing. The advice was to add the 
streaming Kafka JAR file to the CLASSPATH, but that does not work.

Most Flink documentation is also rather sparse, with the usual word count 
example, which is not exactly what you want.

Anyway I will have a look at it further. I have a Spark Scala streaming Kafka 
program that works fine in Spark and I want to recode it using Scala for Flink 
with Kafka but have difficulty importing and testing libraries.

Cheers

Dr Mich Talebzadeh



LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com



On 17 April 2016 at 02:41, Ascot Moss 
> wrote:
I compared both last month, seems to me that Flink's MLLib is not yet ready.

On Sun, Apr 17, 2016 at 12:23 AM, Mich Talebzadeh 
> wrote:
Thanks Ted. I was wondering if someone is using both :)

Dr Mich Talebzadeh



LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com



On 16 April 2016 at 17:08, Ted Yu 
> wrote:
Looks like this question is more relevant on flink mailing list :-)

On Sat, Apr 16, 2016 at 8:52 AM, Mich Talebzadeh 
> wrote:
Hi,

Has anyone used Apache Flink instead of Spark by any chance

I am interested in its set of libraries for Complex Event Processing.

Frankly I don't know if it offers far more than Spark offers.

Thanks

Dr Mich Talebzadeh



LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com






--
andy



RE: Spark Metrics : Why is the Sink class declared private[spark] ?

2016-04-02 Thread Silvio Fiorito
In the meantime you can simply define your custom metrics sink in the 
org.apache.spark package (the same trick works for a custom source).
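
A sketch of that workaround, modelled on Spark's built-in ConsoleSink (the 
three-argument constructor is the shape MetricsSystem instantiates reflectively, so 
double-check it against your Spark version before relying on it):

package org.apache.spark.metrics.sink

import java.util.Properties
import java.util.concurrent.TimeUnit

import com.codahale.metrics.{ConsoleReporter, MetricRegistry}
import org.apache.spark.SecurityManager

// Hypothetical sink that just dumps the registry to stdout every N seconds.
class StdoutSink(val property: Properties,
                 val registry: MetricRegistry,
                 securityMgr: SecurityManager) extends Sink {

  private val pollPeriod =
    Option(property.getProperty("period")).map(_.toInt).getOrElse(10)

  private val reporter = ConsoleReporter.forRegistry(registry)
    .convertRatesTo(TimeUnit.SECONDS)
    .convertDurationsTo(TimeUnit.MILLISECONDS)
    .build()

  override def start(): Unit = reporter.start(pollPeriod, TimeUnit.SECONDS)
  override def stop(): Unit = reporter.stop()
  override def report(): Unit = reporter.report()
}

// metrics.properties:
//   *.sink.stdout.class=org.apache.spark.metrics.sink.StdoutSink
//   *.sink.stdout.period=10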


From: Walid Lezzar
Sent: Saturday, April 2, 2016 4:23 AM
To: Saisai Shao
Cc: spark users
Subject: Re: Spark Metrics : Why is the Sink class declared private[spark] ?

This is great ! Hope this jira will be resolved for the next version of spark

Thanks.

On 2 Apr 2016, at 01:07, Saisai Shao 
> wrote:

There's a JIRA (https://issues.apache.org/jira/browse/SPARK-14151) about it, 
please take a look.

Thanks
Saisai

On Sat, Apr 2, 2016 at 6:48 AM, Walid Lezzar 
> wrote:
Hi,

I looked into the Spark code to see how Spark reports metrics using the 
MetricsSystem class. I've seen that the Spark MetricsSystem class, when 
instantiated, parses the metrics.properties file, tries to find the sink class 
names and loads them dynamically. It would be great to implement my own sink by 
inheriting from the org.apache.spark.metrics.sink.Sink class but, 
unfortunately, this class has been declared private[spark]! So it is not 
possible to inherit from it! Why is that? Is this going to change in future 
Spark versions?
-
To unsubscribe, e-mail: 
user-unsubscr...@spark.apache.org
For additional commands, e-mail: 
user-h...@spark.apache.org




Re: Spark Metrics Framework?

2016-03-25 Thread Silvio Fiorito
Hi Mike,

Sorry got swamped with work and didn’t get a chance to reply.

I misunderstood what you were trying to do. I thought you were just looking to 
create custom metrics vs looking for the existing Hadoop Output Format counters.

I’m not familiar enough with the Hadoop APIs but I think it would require a 
change to the 
SparkHadoopWriter<https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala>
 class since it generates the JobContext which is required to read the 
counters. Then it could publish the counters to the Spark metrics system.

I would suggest going ahead and submitting a JIRA request if there isn’t one 
already.

Thanks,
Silvio

From: Mike Sukmanowsky 
<mike.sukmanow...@gmail.com<mailto:mike.sukmanow...@gmail.com>>
Date: Friday, March 25, 2016 at 10:48 AM
To: Silvio Fiorito 
<silvio.fior...@granturing.com<mailto:silvio.fior...@granturing.com>>, 
"user@spark.apache.org<mailto:user@spark.apache.org>" 
<user@spark.apache.org<mailto:user@spark.apache.org>>
Subject: Re: Spark Metrics Framework?

Pinging again - any thoughts?

On Wed, 23 Mar 2016 at 09:17 Mike Sukmanowsky 
<mike.sukmanow...@gmail.com<mailto:mike.sukmanow...@gmail.com>> wrote:
Thanks Ted and Silvio. I think I'll need a bit more hand holding here, sorry. 
The way we use ES Hadoop is in pyspark via 
org.elasticsearch.hadoop.mr.EsOutputFormat in a saveAsNewAPIHadoopFile call. 
Given the Hadoop interop, I wouldn't assume that the EsOutputFormat 
class<https://github.com/elastic/elasticsearch-hadoop/blob/master/mr/src/main/java/org/elasticsearch/hadoop/mr/EsOutputFormat.java>
 could be modified to define a new Source and register it via 
MetricsSystem.createMetricsSystem. This feels like a good feature request for 
Spark actually: "Support Hadoop Counters in Input/OutputFormats as Spark 
metrics" but I wanted some feedback first to see if that makes sense.

That said, some of the custom RDD 
classes<https://github.com/elastic/elasticsearch-hadoop/tree/master/spark/core/main/scala/org/elasticsearch/spark/rdd>
 could probably be modified to register a new Source when they perform 
reading/writing from/to Elasticsearch.

On Tue, 22 Mar 2016 at 15:17 Silvio Fiorito 
<silvio.fior...@granturing.com<mailto:silvio.fior...@granturing.com>> wrote:
Hi Mike,

It’s been a while since I worked on a custom Source but I think all you need to 
do is make your Source in the org.apache.spark package.

Thanks,
Silvio

From: Mike Sukmanowsky 
<mike.sukmanow...@gmail.com<mailto:mike.sukmanow...@gmail.com>>
Date: Tuesday, March 22, 2016 at 3:13 PM
To: Silvio Fiorito 
<silvio.fior...@granturing.com<mailto:silvio.fior...@granturing.com>>, 
"user@spark.apache.org<mailto:user@spark.apache.org>" 
<user@spark.apache.org<mailto:user@spark.apache.org>>
Subject: Re: Spark Metrics Framework?

The Source class is 
private<https://github.com/apache/spark/blob/v1.4.1/core/src/main/scala/org/apache/spark/metrics/source/Source.scala#L22-L25>
 to the spark package and any new Sources added to the metrics registry must be 
of type 
Source<https://github.com/apache/spark/blob/v1.4.1/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala#L144-L152>.
 So unless I'm mistaken, we can't define a custom source. I linked to 1.4.1 
code, but the same is true in 1.6.1.

On Mon, 21 Mar 2016 at 12:05 Silvio Fiorito 
<silvio.fior...@granturing.com<mailto:silvio.fior...@granturing.com>> wrote:
You could use the metric sources and sinks described here: 
http://spark.apache.org/docs/latest/monitoring.html#metrics

If you want to push the metrics to another system you can define a custom sink. 
You can also extend the metrics by defining a custom source.

From: Mike Sukmanowsky 
<mike.sukmanow...@gmail.com<mailto:mike.sukmanow...@gmail.com>>
Date: Monday, March 21, 2016 at 11:54 AM
To: "user@spark.apache.org<mailto:user@spark.apache.org>" 
<user@spark.apache.org<mailto:user@spark.apache.org>>
Subject: Spark Metrics Framework?

We make extensive use of the elasticsearch-hadoop library for Hadoop/Spark. In 
trying to troubleshoot our Spark applications, it'd be very handy to have 
access to some of the many 
metrics<https://www.elastic.co/guide/en/elasticsearch/hadoop/current/metrics.html>
 that the library makes available when running in map reduce mode. The 
library's author 
noted<https://discuss.elastic.co/t/access-es-hadoop-stats-from-spark/44913> 
that Spark doesn't offer any kind of a similar metrics API whereby these 
metrics could be reported on or aggregated.

Are there any plans to bring a metrics framework similar to Hadoop's Counter 
system to Spark or is there an alternative means for us to grab metrics exposed 
when using Hadoop APIs to load/save RDDs?

Thanks,
Mike


Re: Spark Metrics Framework?

2016-03-22 Thread Silvio Fiorito
Hi Mike,

It’s been a while since I worked on a custom Source but I think all you need to 
do is make your Source in the org.apache.spark package.
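
A rough sketch of that (names are invented; registration goes through SparkEnv, whose 
metricsSystem is also private[spark], which is another reason the code has to sit 
under org.apache.spark):

package org.apache.spark.metrics.source

import com.codahale.metrics.{Counter, MetricRegistry}
import org.apache.spark.SparkEnv

class EsHadoopSource extends Source {
  override val sourceName: String = "esHadoop"
  override val metricRegistry: MetricRegistry = new MetricRegistry

  val docsWritten: Counter  = metricRegistry.counter(MetricRegistry.name("docsWritten"))
  val bytesWritten: Counter = metricRegistry.counter(MetricRegistry.name("bytesWritten"))
}

object EsHadoopSource {
  // Call once per JVM after the SparkContext is up, then bump the counters from your code.
  def register(): EsHadoopSource = {
    val source = new EsHadoopSource
    SparkEnv.get.metricsSystem.registerSource(source)
    source
  }
}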

Thanks,
Silvio

From: Mike Sukmanowsky 
<mike.sukmanow...@gmail.com<mailto:mike.sukmanow...@gmail.com>>
Date: Tuesday, March 22, 2016 at 3:13 PM
To: Silvio Fiorito 
<silvio.fior...@granturing.com<mailto:silvio.fior...@granturing.com>>, 
"user@spark.apache.org<mailto:user@spark.apache.org>" 
<user@spark.apache.org<mailto:user@spark.apache.org>>
Subject: Re: Spark Metrics Framework?

The Source class is 
private<https://github.com/apache/spark/blob/v1.4.1/core/src/main/scala/org/apache/spark/metrics/source/Source.scala#L22-L25>
 to the spark package and any new Sources added to the metrics registry must be 
of type 
Source<https://github.com/apache/spark/blob/v1.4.1/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala#L144-L152>.
 So unless I'm mistaken, we can't define a custom source. I linked to 1.4.1 
code, but the same is true in 1.6.1.

On Mon, 21 Mar 2016 at 12:05 Silvio Fiorito 
<silvio.fior...@granturing.com<mailto:silvio.fior...@granturing.com>> wrote:
You could use the metric sources and sinks described here: 
http://spark.apache.org/docs/latest/monitoring.html#metrics

If you want to push the metrics to another system you can define a custom sink. 
You can also extend the metrics by defining a custom source.

From: Mike Sukmanowsky 
<mike.sukmanow...@gmail.com<mailto:mike.sukmanow...@gmail.com>>
Date: Monday, March 21, 2016 at 11:54 AM
To: "user@spark.apache.org<mailto:user@spark.apache.org>" 
<user@spark.apache.org<mailto:user@spark.apache.org>>
Subject: Spark Metrics Framework?

We make extensive use of the elasticsearch-hadoop library for Hadoop/Spark. In 
trying to troubleshoot our Spark applications, it'd be very handy to have 
access to some of the many 
metrics<https://www.elastic.co/guide/en/elasticsearch/hadoop/current/metrics.html>
 that the library makes available when running in map reduce mode. The 
library's author 
noted<https://discuss.elastic.co/t/access-es-hadoop-stats-from-spark/44913> 
that Spark doesn't offer any kind of a similar metrics API whereby these 
metrics could be reported on or aggregated.

Are there any plans to bring a metrics framework similar to Hadoop's Counter 
system to Spark or is there an alternative means for us to grab metrics exposed 
when using Hadoop APIs to load/save RDDs?

Thanks,
Mike


Re: Work out date column in CSV more than 6 months old (datediff or something)

2016-03-21 Thread Silvio Fiorito
There’s a months_between function you could use, as well:

df.filter(months_between(current_date(), $"Payment date") > 6).show

From: Mich Talebzadeh 
>
Date: Monday, March 21, 2016 at 5:53 PM
To: "user @spark" >
Subject: Work out date column in CSV more than 6 months old (datediff or 
something)

Hi,

For test purposes I am reading in a simple csv file as follows:

val df = 
sqlContext.read.format("com.databricks.spark.csv").option("inferSchema", 
"true").option("header", "true").load("/data/stg/table2")
df: org.apache.spark.sql.DataFrame = [Invoice Number: string, Payment date: 
string, Net: string, VAT: string, Total: string]

For this work I am interested in column "Payment Date" > 6 months old from today

Data is stored in the following format for that column

scala> df.select("Payment date").take(2)
res40: Array[org.apache.spark.sql.Row] = Array([10/02/2014], [17/02/2014])

stored as 'dd/MM/yyyy'

The current time I get as

scala> val today = sqlContext.sql("SELECT FROM_unixtime(unix_timestamp(), 
'dd/MM/yyyy') ").collect.apply(0).getString(0)
today: String = 21/03/2016


So I want to filter the csv file

scala>  df.filter(col("Payment date") < lit(today)).show(2)
+--++-+-+-+
|Invoice Number|Payment date|  Net|  VAT|Total|
+--++-+-+-+
|   360|  10/02/2014|£2,500.00|£0.00|£2,500.00|
|   361|  17/02/2014|£2,500.00|£0.00|£2,500.00|
+--++-+-+-+


However, I want to use the datediff() function here, not just < today!


Obviously one can store the file as a table and use SQL on it. However, I want 
to see if there are other ways using fp.

Thanks

Dr Mich Talebzadeh



LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com




Re: Spark Metrics Framework?

2016-03-21 Thread Silvio Fiorito
You could use the metric sources and sinks described here: 
http://spark.apache.org/docs/latest/monitoring.html#metrics

If you want to push the metrics to another system you can define a custom sink. 
You can also extend the metrics by defining a custom source.

From: Mike Sukmanowsky 
>
Date: Monday, March 21, 2016 at 11:54 AM
To: "user@spark.apache.org" 
>
Subject: Spark Metrics Framework?

We make extensive use of the elasticsearch-hadoop library for Hadoop/Spark. In 
trying to troubleshoot our Spark applications, it'd be very handy to have 
access to some of the many 
metrics
 that the library makes available when running in map reduce mode. The 
library's author 
noted 
that Spark doesn't offer any kind of a similar metrics API whereby these 
metrics could be reported on or aggregated.

Are there any plans to bring a metrics framework similar to Hadoop's Counter 
system to Spark or is there an alternative means for us to grab metrics exposed 
when using Hadoop APIs to load/save RDDs?

Thanks,
Mike


Re: Get the number of days dynamically in with Column

2016-03-20 Thread Silvio Fiorito
I’m not entirely sure if this is what you’re asking, but you could just use the 
datediff function:

val df2 = df.withColumn("ID”, datediff($"end", $"start”))

If you want it formatted as {n}D then:

val df2 = df.withColumn("ID", concat(datediff($"end", $"start"),lit("D")))

Thanks,
Silvio

From: Divya Gehlot >
Date: Sunday, March 20, 2016 at 11:42 PM
To: "user @spark" >
Subject: Get the number of days dynamically in with Column

I have a time stamping table which has data like
No of Days    ID
1             1D
2             2D

and so on till 30 days

I have another dataframe with a start date and an end date.
I need to get the difference between these two dates, get the ID from the time 
stamping table, and add it with withColumn.

The tedious solution is


val dfTimeStamping = df.withColumn("ID",when(Diff between Start date and 
Enddate ,"1D").when(Diff between Start date and Enddate ,"2D")).. have to do 
till 30 days .

How can I do it dynamically ?


Thanks,
Divya





RE: Extra libs in executor classpath

2016-03-19 Thread Silvio Fiorito
Could you publish it as a library (to an internal repo)? Then you can simply use 
the "--packages" option. That will also help with versioning as you make changes; 
that way you're not having to manually ship JARs around to your machines and 
users.



From: Леонид Поляков
Sent: Wednesday, March 16, 2016 7:22 AM
To: user@spark.apache.org
Subject: Extra libs in executor classpath

Hello, guys!

I’ve been developing a kind of framework on top of spark, and my idea is to 
bundle the framework jars and some extra configs with the spark and pass it to 
other developers for their needs. So that devs can use this bundle and run 
usual spark stuff but with extra flavor that framework will add.

I’m trying to figure out how to properly set up the driver/executor classpath, 
so that framework classes are always loaded when you use the bundle.
I put framework libs in /lib folder right now, but will switch to something 
more specific later. I’m putting next spark-defaults.conf into my bundle:

spark.executor.extraClassPath /home/user/Apps/spark-bundled/lib/*
spark.driver.extraClassPath lib/*

And this seems to work, but I want to get rid of the absolute path in 
spark.executor.extraClassPath and use something relative, or the Spark home 
somehow, since the libs are right there under /lib.
I’ve tried these settings for executor, and they do not work:
spark.executor.extraClassPath $SPARK_HOME/lib/*
spark.executor.extraClassPath lib/*

I’ve found out that work directory for started workers is like 
$SPARK_HOME/work/app-20160316070310-0002/0, so this works:
spark.executor.extraClassPath ../../../lib/*

But looks cheaty and not stable.

Could you help me with this issue? Maybe there are some placeholders that I can 
use in configs?
Let me know if you need any worker/master/driver logs

P.S. The driver does not work if I am not in $SPARK_HOME when I execute 
spark-submit, e.g. if I do
cd bin
./spark-submit …
then the driver classpath is relative to /bin and lib/* or ./lib/* in the classpath 
no longer works, so I need $SPARK_HOME for the driver as well.

Thanks, Leonid


RE: Spark on YARN memory consumption

2016-03-11 Thread Silvio Fiorito
Hi Jan,



Yes what you’re seeing is due to YARN container memory overhead. Also, 
typically the memory increments for YARN containers is 1GB.



This gives a good overview: 
http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-2/
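
Roughly, for your 512M setting (assuming Spark 1.5 defaults; the overhead knob below 
is the one to tune if the default is not enough):

import org.apache.spark.SparkConf

// Container request = spark.executor.memory + max(384 MB, 0.10 * spark.executor.memory)
//                   = 512 MB + 384 MB = 896 MB,
// which YARN then rounds up to its minimum allocation increment
// (yarn.scheduler.minimum-allocation-mb, commonly 1 GB) -- hence the 1 GB containers.
val conf = new SparkConf()
  .set("spark.executor.memory", "512m")
  .set("spark.yarn.executor.memoryOverhead", "384")   // in MB; pin it explicitly if needed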



Thanks,

Silvio







From: Jan Štěrba
Sent: Friday, March 11, 2016 8:27 AM
To: User
Subject: Spark on YARN memory consumption



Hello,

I am experimenting with tuning an on-demand Spark cluster on top of our
Cloudera Hadoop. I am running Cloudera 5.5.2 with Spark 1.5 right now
and I am running Spark in yarn-client mode.

Right now my main experimentation is about the spark.executor.memory
property and I have noticed some strange behaviour.

When I set spark.executor.memory=512M, several things happen:
- for each executor, a container with 1GB of memory is requested and
assigned from YARN
- in the Spark UI I can see that each executor has 256M of memory

So what I am seeing is that Spark requests 2x the memory, but the
executor has only 1/4 of what has been requested. Why is that?

Thanks.

--
Jan Sterba
https://twitter.com/honzasterba | http://flickr.com/honzasterba |
http://500px.com/honzasterba

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: Achieving 700 Spark SQL Queries Per Second

2016-03-10 Thread Silvio Fiorito
Very cool stuff Evan. Thanks for your work on this and sharing!







From: Evan Chan
Sent: Thursday, March 10, 2016 1:38 PM
To: user@spark.apache.org
Subject: Achieving 700 Spark SQL Queries Per Second



Hey folks,

I just saw a recent thread on here (but can't find it anymore) on
using Spark as a web-speed query engine.   I want to let you guys know
that this is definitely possible!   Most folks don't realize how
low-latency Spark can actually be.  Please check out my blog post
below on achieving 700 queries per second in Spark:

http://velvia.github.io/Spark-Concurrent-Fast-Queries/

Would love your feedback.

thanks,
Evan

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: Using dynamic allocation and shuffle service in Standalone Mode

2016-03-08 Thread Silvio Fiorito
There’s a script to start it up under sbin, start-shuffle-service.sh. Run that 
on each of your worker nodes.



From: Yuval Itzchakov<mailto:yuva...@gmail.com>
Sent: Tuesday, March 8, 2016 2:17 PM
To: Silvio Fiorito<mailto:silvio.fior...@granturing.com>; 
user@spark.apache.org<mailto:user@spark.apache.org>
Subject: Re: Using dynamic allocation and shuffle service in Standalone Mode

Actually, I assumed that setting the flag in the spark job would turn on the 
shuffle service in the workers. I now understand that assumption was wrong.

Is there any way to set the flag via the driver? Or must I manually set it via 
spark-env.sh on each worker?


On Tue, Mar 8, 2016, 20:14 Silvio Fiorito 
<silvio.fior...@granturing.com<mailto:silvio.fior...@granturing.com>> wrote:

You’ve started the external shuffle service on all worker nodes, correct? Can 
you confirm they’re still running and haven’t exited?







From: Yuval.Itzchakov<mailto:yuva...@gmail.com>
Sent: Tuesday, March 8, 2016 12:41 PM
To: user@spark.apache.org<mailto:user@spark.apache.org>
Subject: Using dynamic allocation and shuffle service in Standalone Mode



Hi,
I'm using Spark 1.6.0, and according to the documentation, dynamic
allocation and spark shuffle service should be enabled.

When I submit a spark job via the following:

spark-submit \
--master  \
--deploy-mode cluster \
--executor-cores 3 \
--conf "spark.streaming.backpressure.enabled=true" \
--conf "spark.dynamicAllocation.enabled=true" \
--conf "spark.dynamicAllocation.minExecutors=2" \
--conf "spark.dynamicAllocation.maxExecutors=24" \
--conf "spark.shuffle.service.enabled=true" \
--conf "spark.executor.memory=8g" \
--conf "spark.driver.memory=10g" \
--class SparkJobRunner
/opt/clicktale/entityCreator/com.clicktale.ai.entity-creator-assembly-0.0.2.jar

I'm seeing error logs from the workers being unable to connect to the
shuffle service:

16/03/08 17:33:15 ERROR storage.BlockManager: Failed to connect to external
shuffle server, will retry 2 more times after waiting 5 seconds...
java.io.IOException: Failed to connect to 
at
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:216)
at
org.apache.spark.network.client.TransportClientFactory.createUnmanagedClient(TransportClientFactory.java:181)
at
org.apache.spark.network.shuffle.ExternalShuffleClient.registerWithShuffleServer(ExternalShuffleClient.java:141)
at
org.apache.spark.storage.BlockManager$$anonfun$registerWithExternalShuffleServer$1.apply$mcVI$sp(BlockManager.scala:211)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
at
org.apache.spark.storage.BlockManager.registerWithExternalShuffleServer(BlockManager.scala:208)
at 
org.apache.spark.storage.BlockManager.initialize(BlockManager.scala:194)
at org.apache.spark.executor.Executor.(Executor.scala:85)
at
org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$receive$1.applyOrElse(CoarseGrainedExecutorBackend.scala:83)
at
org.apache.spark.rpc.netty.Inbox$$anonfun$process$1.apply$mcV$sp(Inbox.scala:116)
at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:204)
at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100)
at
org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:215)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

I verified all relevant ports are open. Has anyone else experienced such a
failure?

Yuval.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Using-dynamic-allocation-and-shuffle-service-in-Standalone-Mode-tp26430.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: 
user-unsubscr...@spark.apache.org<mailto:user-unsubscr...@spark.apache.org>
For additional commands, e-mail: 
user-h...@spark.apache.org<mailto:user-h...@spark.apache.org>



RE: Using dynamic allocation and shuffle service in Standalone Mode

2016-03-08 Thread Silvio Fiorito
You’ve started the external shuffle service on all worker nodes, correct? Can 
you confirm they’re still running and haven’t exited?







From: Yuval.Itzchakov
Sent: Tuesday, March 8, 2016 12:41 PM
To: user@spark.apache.org
Subject: Using dynamic allocation and shuffle service in Standalone Mode



Hi,
I'm using Spark 1.6.0, and according to the documentation, dynamic
allocation and spark shuffle service should be enabled.

When I submit a spark job via the following:

spark-submit \
--master  \
--deploy-mode cluster \
--executor-cores 3 \
--conf "spark.streaming.backpressure.enabled=true" \
--conf "spark.dynamicAllocation.enabled=true" \
--conf "spark.dynamicAllocation.minExecutors=2" \
--conf "spark.dynamicAllocation.maxExecutors=24" \
--conf "spark.shuffle.service.enabled=true" \
--conf "spark.executor.memory=8g" \
--conf "spark.driver.memory=10g" \
--class SparkJobRunner
/opt/clicktale/entityCreator/com.clicktale.ai.entity-creator-assembly-0.0.2.jar

I'm seeing error logs from the workers being unable to connect to the
shuffle service:

16/03/08 17:33:15 ERROR storage.BlockManager: Failed to connect to external
shuffle server, will retry 2 more times after waiting 5 seconds...
java.io.IOException: Failed to connect to 
at
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:216)
at
org.apache.spark.network.client.TransportClientFactory.createUnmanagedClient(TransportClientFactory.java:181)
at
org.apache.spark.network.shuffle.ExternalShuffleClient.registerWithShuffleServer(ExternalShuffleClient.java:141)
at
org.apache.spark.storage.BlockManager$$anonfun$registerWithExternalShuffleServer$1.apply$mcVI$sp(BlockManager.scala:211)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
at
org.apache.spark.storage.BlockManager.registerWithExternalShuffleServer(BlockManager.scala:208)
at 
org.apache.spark.storage.BlockManager.initialize(BlockManager.scala:194)
at org.apache.spark.executor.Executor.(Executor.scala:85)
at
org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$receive$1.applyOrElse(CoarseGrainedExecutorBackend.scala:83)
at
org.apache.spark.rpc.netty.Inbox$$anonfun$process$1.apply$mcV$sp(Inbox.scala:116)
at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:204)
at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100)
at
org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:215)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

I verified all relevant ports are open. Has anyone else experienced such a
failure?

Yuval.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Using-dynamic-allocation-and-shuffle-service-in-Standalone-Mode-tp26430.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Unit testing framework for Spark Jobs?

2016-03-02 Thread Silvio Fiorito
Please check out the following for some good resources:

https://github.com/holdenk/spark-testing-base


https://spark-summit.org/east-2016/events/beyond-collect-and-parallelize-for-tests/





On 3/2/16, 12:54 PM, "SRK"  wrote:

>Hi,
>
>What is a good unit testing framework for Spark batch/streaming jobs? I have
>core spark, spark sql with dataframes and streaming api getting used. Any
>good framework to cover unit tests for these APIs?
>
>Thanks!
>
>
>
>--
>View this message in context: 
>http://apache-spark-user-list.1001560.n3.nabble.com/Unit-testing-framework-for-Spark-Jobs-tp26380.html
>Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
>-
>To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>For additional commands, e-mail: user-h...@spark.apache.org
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Save DataFrame to Hive Table

2016-03-01 Thread Silvio Fiorito
Just do:

val df = sqlContext.read.load(“/path/to/parquets/*”)

If you do df.explain it’ll show the multiple input paths.

From: "andres.fernan...@wellsfargo.com" 
>
Date: Tuesday, March 1, 2016 at 12:00 PM
To: "user@spark.apache.org" 
>
Subject: RE: Save DataFrame to Hive Table

Good day colleagues. Quick question on Parquet and Dataframes. Right now I have 
the 4 parquet files stored in HDFS under the same path:
/path/to/parquets/parquet1, /path/to/parquets/parquet2, 
/path/to/parquets/parquet3, /path/to/parquets/parquet4…
I want to perform a union on all this parquet files. Is there any other way of 
doing this different to DataFrame’s unionAll?

Thank you very much in advance.

Andres Fernandez

From: Mich Talebzadeh [mailto:mich.talebza...@gmail.com]
Sent: Tuesday, March 01, 2016 1:50 PM
To: Jeff Zhang
Cc: Yogesh Vyas; user@spark.apache.org
Subject: Re: Save DataFrame to Hive Table

Hi

It seems that your code is not specifying which database is your table created

Try this

scala> val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
scala> // Choose a database
scala> HiveContext.sql("show databases").show

scala> HiveContext.sql("use test")  // I chose test database
scala> HiveContext.sql("CREATE TABLE IF NOT EXISTS TableName (key INT, value 
STRING)")
scala> HiveContext.sql("desc TableName").show
++-+---+
|col_name|data_type|comment|
++-+---+
| key|  int|   null|
|   value|   string|   null|
++-+---+

// create a simple DF

val a = Seq((1, "Mich"), (2, "James"))
val b = a.toDF

//Let me keep it simple. Create a temporary table and do a simple 
insert/select. No need to convolute it

b.registerTempTable("tmp")

// Remember this temporary table is created in the SQL context, NOT the HiveContext, so 
the HiveContext will NOT see that table
//
HiveContext.sql("INSERT INTO TableName SELECT * FROM tmp")
org.apache.spark.sql.AnalysisException: no such table tmp; line 1 pos 36

// This will work

sql("INSERT INTO TableName SELECT * FROM tmp")

sql("select count(1) from TableName").show
+---+
|_c0|
+---+
|  2|
+---+

HTH



Dr Mich Talebzadeh



LinkedIn  
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com



On 1 March 2016 at 06:33, Jeff Zhang 
> wrote:

The following line does not execute the sql so the table is not created.  Add 
.show() at the end to execute the sql.

hiveContext.sql("CREATE TABLE IF NOT EXISTS TableName (key INT, value STRING)")

On Tue, Mar 1, 2016 at 2:22 PM, Yogesh Vyas 
> wrote:
Hi,

I have created a DataFrame in Spark, now I want to save it directly
into the hive table. How to do it.?

I have created the hive table using following hiveContext:

HiveContext hiveContext = new 
org.apache.spark.sql.hive.HiveContext(sc.sc());
hiveContext.sql("CREATE TABLE IF NOT EXISTS TableName (key
INT, value STRING)");

I am using the following to save it into hive:
DataFrame.write().mode(SaveMode.Append).insertInto("TableName");

But it gives the error:
Exception in thread "main" java.lang.RuntimeException: Table Not
Found: TableName
at scala.sys.package$.error(package.scala:27)
at 
org.apache.spark.sql.catalyst.analysis.SimpleCatalog.lookupRelation(Catalog.scala:139)
at 
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.getTable(Analyzer.scala:257)
at 
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$7.applyOrElse(Analyzer.scala:266)
at 
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$7.applyOrElse(Analyzer.scala:264)
at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:57)
at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:57)
at 
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:51)
at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:56)
at 
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:264)
at 
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:254)
at 
org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:83)
at 
org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:80)
at 

Re: Union Parquet, DataFrame

2016-03-01 Thread Silvio Fiorito
Just replied to your other email, but here’s the same thing:

Just do:

val df = sqlContext.read.load(“/path/to/parquets/*”)

If you do df.explain it’ll show the multiple input paths.

From: "andres.fernan...@wellsfargo.com" 
>
Date: Tuesday, March 1, 2016 at 12:01 PM
To: "user@spark.apache.org" 
>
Subject: Union Parquet, DataFrame

Good day colleagues. Quick question on Parquet and Dataframes. Right now I have 
the 4 parquet files stored in HDFS under the same path:
/path/to/parquets/parquet1, /path/to/parquets/parquet2, 
/path/to/parquets/parquet3, /path/to/parquets/parquet4…
I want to perform a union on all this parquet files. Is there any other way of 
doing this different to DataFrame’s unionAll?

Thank you very much in advance.

Andres Fernandez



RE: Use maxmind geoip lib to process ip on Spark/Spark Streaming

2016-02-29 Thread Silvio Fiorito

I’ve used the code below with SparkSQL. I was using this with Spark 1.4 but 
should still be good with 1.6. In this case I have a UDF to do the lookup, but 
for Streaming you’d just have a lambda to apply in a map function, so no UDF 
wrapper.

import org.apache.spark.sql.functions._
import java.io.File
import java.net.InetAddress
import com.maxmind.geoip2._

object GeoIPLookup {
@transient lazy val reader = {
val db = new File("/data/meetup/GeoLite2-City.mmdb")

val reader = new DatabaseReader.Builder(db).build()

reader
}
}

case class Location(latitude: Double, longitude: Double)
case class Geo(city: String, country: String, loc: Location)

val iplookup = udf { (s: String) => {
   val ip = InetAddress.getByName(s)

   val record = GeoIPLookup.reader.city(ip)

   val city = record.getCity
   val country = record.getCountry
   val location = record.getLocation

   Geo(city.getName, country.getName, Location(location.getLatitude, 
location.getLongitude))
} }

val withGeo = df.withColumn("geo", iplookup(column("ip")))


From: Zhun Shen
Sent: Monday, February 29, 2016 11:17 PM
To: romain sagean
Cc: user
Subject: Re: Use maxmind geoip lib to process ip on Spark/Spark Streaming

Hi,

I check the dependencies and fix the bug. It work well on Spark but not on 
Spark Streaming. So I think I still need find another way to do it.


On Feb 26, 2016, at 2:47 PM, Zhun Shen 
> wrote:

Hi,

thanks for you advice. I tried your method, I use Gradle to manage my scala 
code. 'com.snowplowanalytics:scala-maxmind-iplookups:0.2.0’ was imported in 
Gradle.

spark version: 1.6.0
scala: 2.10.4
scala-maxmind-iplookups: 0.2.0

I run my test, got the error as below:
java.lang.NoClassDefFoundError: scala/collection/JavaConversions$JMapWrapperLike
at com.snowplowanalytics.maxmind.iplookups.IpLookups$.apply(IpLookups.scala:53)




On Feb 24, 2016, at 1:10 AM, romain sagean 
> wrote:

I realize I forgot the sbt part

resolvers += "SnowPlow Repo" at 
"http://maven.snplow.com/releases/;

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.3.0",
  "com.snowplowanalytics"  %% "scala-maxmind-iplookups"  % "0.2.0"
)

otherwise, to process streaming log I use logstash with kafka as input. You can 
set kafka as output if you need to do some extra calculation with spark.

On 23/02/2016 15:07, Romain Sagean wrote:
Hi,
I use maxmind geoip with spark (no streaming). To make it work you should use 
mapPartition. I don't know if something similar exist for spark streaming.

my code for reference:

  def parseIP(ip:String, ipLookups: IpLookups): List[String] = {
val lookupResult = ipLookups.performLookups(ip)
val countryName = (lookupResult._1).map(_.countryName).getOrElse("")
val city = (lookupResult._1).map(_.city).getOrElse(None).getOrElse("")
val latitude = (lookupResult._1).map(_.latitude).getOrElse(None).toString
val longitude = (lookupResult._1).map(_.longitude).getOrElse(None).toString
return List(countryName, city, latitude, longitude)
  }
sc.addFile("/home/your_user/GeoLiteCity.dat")

//load your data in my_data rdd

my_data.mapPartitions { rows =>
val ipLookups = IpLookups(geoFile = 
Some(SparkFiles.get("GeoLiteCity.dat")))
rows.map { row => row ::: parseIP(row(3),ipLookups) }
}

On 23/02/2016 14:28, Zhun Shen wrote:
Hi all,

Currently, I sent nginx log to Kafka then I want to use Spark Streaming to 
parse the log and enrich the IP info with geoip libs from Maxmind.

I found this one  
https://github.com/Sanoma-CDA/maxmind-geoip2-scala.git, but spark streaming 
throw error and told that the lib was not Serializable.

Does anyone there way to process the IP info in Spark Streaming? Many thanks.







Re: map operation clears custom partitioner

2016-02-22 Thread Silvio Fiorito
You can use mapValues to ensure partitioning is not lost.
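
For reference, a minimal sketch of the difference (the HashPartitioner and sample data 
here are hypothetical, not taken from the gist):

import org.apache.spark.HashPartitioner

val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
  .partitionBy(new HashPartitioner(4))

// map rebuilds the whole tuple, so Spark cannot assume the keys are unchanged
// and drops the partitioner.
val mapped = pairs.map { case (k, v) => (k, v * 10) }
println(mapped.partitioner)        // None

// mapValues only touches the values, so the partitioner is carried through.
val mappedValues = pairs.mapValues(_ * 10)
println(mappedValues.partitioner)  // Some(...), partitioner preserved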

From: Brian London >
Date: Monday, February 22, 2016 at 1:21 PM
To: user >
Subject: map operation clears custom partitioner

It appears that when a custom partitioner is applied in a groupBy operation, it 
is not propagated through subsequent non-shuffle operations.  Is this 
intentional? Is there any way to carry custom partitioning through maps?

I've uploaded a gist that exhibits the behavior. 
https://gist.github.com/BrianLondon/c3c3355d1971971f3ec6


RE: coalesce and executor memory

2016-02-14 Thread Silvio Fiorito
Actually, rereading your email I see you're caching. But ‘cache’ uses 
MEMORY_ONLY. Do you see errors about losing partitions as your job is running?

Are you sure you need to cache if you're just saving to disk? Can you try the 
coalesce without cache?


From: Christopher Brady<mailto:christopher.br...@oracle.com>
Sent: Friday, February 12, 2016 8:34 PM
To: Koert Kuipers<mailto:ko...@tresata.com>; Silvio 
Fiorito<mailto:silvio.fior...@granturing.com>
Cc: user<mailto:user@spark.apache.org>
Subject: Re: coalesce and executor memory

Thank you for the responses. The map function just changes the format of the 
record slightly, so I don't think that would be the cause of the memory problem.

So if I have 3 cores per executor, I need to be able to fit 3 partitions per 
executor within whatever I specify for the executor memory? Is there a way I 
can programmatically find a number of partitions I can coalesce down to without 
running out of memory? Is there some documentation where this is explained?


On 02/12/2016 05:10 PM, Koert Kuipers wrote:
in spark, every partition needs to fit in the memory available to the core 
processing it.

as you coalesce you reduce number of partitions, increasing partition size. at 
some point the partition no longer fits in memory.

On Fri, Feb 12, 2016 at 4:50 PM, Silvio Fiorito 
<silvio.fior...@granturing.com<mailto:silvio.fior...@granturing.com>> wrote:
Coalesce essentially reduces parallelism, so fewer cores are getting more 
records. Be aware that it could also lead to loss of data locality, depending 
on how far you reduce. Depending on what you’re doing in the map operation, it 
could lead to OOM errors. Can you give more details as to what the code for the 
map looks like?




On 2/12/16, 1:13 PM, "Christopher Brady" 
<<mailto:christopher.br...@oracle.com>christopher.br...@oracle.com<mailto:christopher.br...@oracle.com>>
 wrote:

>Can anyone help me understand why using coalesce causes my executors to
>crash with out of memory? What happens during coalesce that increases
>memory usage so much?
>
>If I do:
>hadoopFile -> sample -> cache -> map -> saveAsNewAPIHadoopFile
>
>everything works fine, but if I do:
>hadoopFile -> sample -> coalesce -> cache -> map -> saveAsNewAPIHadoopFile
>
>my executors crash with out of memory exceptions.
>
>Is there any documentation that explains what causes the increased
>memory requirements with coalesce? It seems to be less of a problem if I
>coalesce into a larger number of partitions, but I'm not sure why this
>is. How would I estimate how much additional memory the coalesce requires?
>
>Thanks.
>
>-
>To unsubscribe, e-mail: 
>user-unsubscr...@spark.apache.org<mailto:user-unsubscr...@spark.apache.org>
>For additional commands, e-mail: <mailto:user-h...@spark.apache.org> 
>user-h...@spark.apache.org<mailto:user-h...@spark.apache.org>
>




Re: Allowing parallelism in spark local mode

2016-02-12 Thread Silvio Fiorito
You’ll want to setup the FAIR scheduler as described here: 
https://spark.apache.org/docs/latest/job-scheduling.html#scheduling-within-an-application
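
As a rough sketch of what that looks like in code (pool names and job bodies are just 
placeholders):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("http-backed-jobs")
  .setMaster("local[8]")                 // enough cores to actually run jobs side by side
  .set("spark.scheduler.mode", "FAIR")   // schedule concurrent jobs fairly instead of FIFO
val sc = new SparkContext(conf)

// Each HTTP request handler thread submits its own job; with FAIR scheduling
// the jobs share the available cores instead of queueing strictly one at a time.
val workers = (1 to 2).map { i =>
  new Thread(new Runnable {
    def run(): Unit = {
      sc.setLocalProperty("spark.scheduler.pool", "request-" + i)  // optional per-request pool
      val result = sc.parallelize(1 to 1000000, 8).map(_ * 2).sum()
      println(s"request $i => $result")
    }
  })
}
workers.foreach(_.start())
workers.foreach(_.join())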

From: yael aharon >
Date: Friday, February 12, 2016 at 2:00 PM
To: "user@spark.apache.org" 
>
Subject: Allowing parallelism in spark local mode

Hello,
I have an application that receives requests over HTTP and uses spark in local 
mode to process the requests. Each request is running in its own thread.
It seems that spark is queueing the jobs, processing them one at a time. When 2 
requests arrive simultaneously, the processing time for each of them is almost 
doubled.
I tried setting spark.default.parallelism, spark.executor.cores, 
spark.driver.cores but that did not change the time in a meaningful way.

Am I missing something obvious?
thanks, Yael



Re: coalesce and executor memory

2016-02-12 Thread Silvio Fiorito
Coalesce essentially reduces parallelism, so fewer cores are getting more 
records. Be aware that it could also lead to loss of data locality, depending 
on how far you reduce. Depending on what you’re doing in the map operation, it 
could lead to OOM errors. Can you give more details as to what the code for the 
map looks like?




On 2/12/16, 1:13 PM, "Christopher Brady"  wrote:

>Can anyone help me understand why using coalesce causes my executors to 
>crash with out of memory? What happens during coalesce that increases 
>memory usage so much?
>
>If I do:
>hadoopFile -> sample -> cache -> map -> saveAsNewAPIHadoopFile
>
>everything works fine, but if I do:
>hadoopFile -> sample -> coalesce -> cache -> map -> saveAsNewAPIHadoopFile
>
>my executors crash with out of memory exceptions.
>
>Is there any documentation that explains what causes the increased 
>memory requirements with coalesce? It seems to be less of a problem if I 
>coalesce into a larger number of partitions, but I'm not sure why this 
>is. How would I estimate how much additional memory the coalesce requires?
>
>Thanks.
>
>-
>To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>For additional commands, e-mail: user-h...@spark.apache.org
>


Re: Spark with .NET

2016-02-09 Thread Silvio Fiorito
That’s just a .NET assembly (not related to Spark DataSets) but doesn’t look 
like they’re actually using it. It’s typically a default reference pulled in by 
the project templates.

The code though is available from Mono here: 
https://github.com/mono/mono/tree/master/mcs/class/System.Data.DataSetExtensions

From: Ted Yu >
Date: Tuesday, February 9, 2016 at 3:56 PM
To: Bryan Jeffrey >
Cc: Arko Provo Mukherjee 
>, user 
>
Subject: Re: Spark with .NET

Looks like they have some system support whose source is not in the repo:


FYI

On Tue, Feb 9, 2016 at 12:17 PM, Bryan Jeffrey 
> wrote:
Arko,

Check this out: https://github.com/Microsoft/SparkCLR

This is a Microsoft authored C# language binding for Spark.

Regards,

Bryan Jeffrey

On Tue, Feb 9, 2016 at 3:13 PM, Arko Provo Mukherjee 
> wrote:
Doesn't seem to be supported, but thanks! I will probably write some .NET 
wrapper in my front end and use the java api in the backend.
Warm regards
Arko


On Tue, Feb 9, 2016 at 12:05 PM, Ted Yu 
> wrote:
This thread is related:
http://search-hadoop.com/m/q3RTtwp4nR1lugin1=+NET+on+Apache+Spark+

On Tue, Feb 9, 2016 at 11:43 AM, Arko Provo Mukherjee 
> wrote:
Hello,

I want to use Spark (preferable Spark SQL) using C#. Anyone has any pointers to 
that?

Thanks & regards
Arko







Re: Unit test with sqlContext

2016-02-04 Thread Silvio Fiorito
Hi Steve,

Have you looked at the spark-testing-base package by Holden? It’s really useful 
for unit testing Spark apps as it handles all the bootstrapping for you.

https://github.com/holdenk/spark-testing-base

DataFrame examples are here: 
https://github.com/holdenk/spark-testing-base/blob/master/src/test/1.3/scala/com/holdenkarau/spark/testing/SampleDataFrameTest.scala
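
For example, a minimal sketch that reuses the SparkContext provided by SharedSparkContext 
instead of constructing a second one (the commented-out ingest call and test path are just 
placeholders for the function from your mail):

import org.apache.spark.sql.SQLContext
import org.scalatest.FunSuite
import com.holdenkarau.spark.testing.SharedSparkContext

class IngestSpec extends FunSuite with SharedSparkContext {

  // Build the SQLContext from the shared sc instead of creating another
  // SparkContext in beforeAll, which is what triggers the
  // "Another SparkContext is being constructed" error.
  lazy val sqlContext: SQLContext = new SQLContext(sc)

  test("ingest returns a non-empty map") {
    // val result = ingest("src/test/resources/sample.json", sc, sqlContext)
    // assert(result.nonEmpty)
    assert(sqlContext != null)
  }
}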

Thanks,
Silvio

From: Steve Annessa >
Date: Thursday, February 4, 2016 at 8:36 PM
To: "user@spark.apache.org" 
>
Subject: Unit test with sqlContext

I'm trying to unit test a function that reads in a JSON file, manipulates the 
DF and then returns a Scala Map.

The function has signature:
def ingest(dataLocation: String, sc: SparkContext, sqlContext: SQLContext)

I've created a bootstrap spec for spark jobs that instantiates the Spark 
Context and SQLContext like so:

@transient var sc: SparkContext = _
@transient var sqlContext: SQLContext = _

override def beforeAll = {
  System.clearProperty("spark.driver.port")
  System.clearProperty("spark.hostPort")

  val conf = new SparkConf()
.setMaster(master)
.setAppName(appName)

  sc = new SparkContext(conf)
  sqlContext = new SQLContext(sc)
}

When I do not include sqlContext, my tests run. Once I add the sqlContext I get 
the following errors:

16/02/04 17:31:58 WARN SparkContext: Another SparkContext is being constructed 
(or threw an exception in its constructor).  This may indicate an error, since 
only one SparkContext may be running in this JVM (see SPARK-2243). The other 
SparkContext was created at:
org.apache.spark.SparkContext.(SparkContext.scala:81)

16/02/04 17:31:59 ERROR SparkContext: Error initializing SparkContext.
akka.actor.InvalidActorNameException: actor name [ExecutorEndpoint] is not 
unique!

and finally:

[info] IngestSpec:
[info] Exception encountered when attempting to run a suite with class name: 
com.company.package.IngestSpec *** ABORTED ***
[info]   akka.actor.InvalidActorNameException: actor name [ExecutorEndpoint] is 
not unique!


What do I need to do to get a sqlContext through my tests?

Thanks,

-- Steve


Re: General Question (Spark Hive integration )

2016-01-21 Thread Silvio Fiorito
Also, just to clarify it doesn’t read the whole table into memory unless you 
specifically cache it.

From: Silvio Fiorito 
<silvio.fior...@granturing.com<mailto:silvio.fior...@granturing.com>>
Date: Thursday, January 21, 2016 at 10:02 PM
To: "Balaraju.Kagidala Kagidala" 
<balaraju.kagid...@gmail.com<mailto:balaraju.kagid...@gmail.com>>, 
"user@spark.apache.org<mailto:user@spark.apache.org>" 
<user@spark.apache.org<mailto:user@spark.apache.org>>
Subject: Re: General Question (Spark Hive integration )

Hi Bala,

It depends on how your Hive table is configured. If you used partitioning and 
you are filtering on a partition column then it will only load the relevant 
partitions. If, however, you’re filtering on a non-partitioned column then it 
will have to read all the data and then filter as part of the Spark job.

Thanks,
Silvio

From: "Balaraju.Kagidala Kagidala" 
<balaraju.kagid...@gmail.com<mailto:balaraju.kagid...@gmail.com>>
Date: Thursday, January 21, 2016 at 9:37 PM
To: "user@spark.apache.org<mailto:user@spark.apache.org>" 
<user@spark.apache.org<mailto:user@spark.apache.org>>
Subject: General Question (Spark Hive integration )

Hi ,


  I have simple question regarding Spark Hive integration with DataFrames.

When we query  for a table, does spark loads whole table into memory and 
applies the filter on top of it or it only loads the data with filter applied.

for example if the my query 'select * from employee where deptno=10' does my 
rdd loads whole employee data into memory and applies fileter or will it load 
only dept number 10 data.


Thanks
Bala







Re: General Question (Spark Hive integration )

2016-01-21 Thread Silvio Fiorito
Hi Bala,

It depends on how your Hive table is configured. If you used partitioning and 
you are filtering on a partition column then it will only load the relevant 
partitions. If, however, you’re filtering on a non-partitioned column then it 
will have to read all the data and then filter as part of the Spark job.
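
As a rough illustration (hypothetical table and column names, assuming a Hive table 
partitioned by deptno):

val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)

hiveContext.sql(
  """CREATE TABLE IF NOT EXISTS employee (empno INT, name STRING)
    |PARTITIONED BY (deptno INT)
    |STORED AS PARQUET""".stripMargin)

// Filter on the partition column: only the deptno=10 partition directories are read.
val dept10 = hiveContext.sql("SELECT * FROM employee WHERE deptno = 10")

// Filter on a non-partition column: every partition is scanned and the filter
// is applied as part of the Spark job.
val byName = hiveContext.sql("SELECT * FROM employee WHERE name = 'Bala'")

// explain(true) shows which partitions each plan actually touches.
dept10.explain(true)
byName.explain(true)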

Thanks,
Silvio

From: "Balaraju.Kagidala Kagidala" 
>
Date: Thursday, January 21, 2016 at 9:37 PM
To: "user@spark.apache.org" 
>
Subject: General Question (Spark Hive integration )

Hi ,


  I have simple question regarding Spark Hive integration with DataFrames.

When we query  for a table, does spark loads whole table into memory and 
applies the filter on top of it or it only loads the data with filter applied.

for example if the my query 'select * from employee where deptno=10' does my 
rdd loads whole employee data into memory and applies fileter or will it load 
only dept number 10 data.


Thanks
Bala







Re: visualize data from spark streaming

2016-01-20 Thread Silvio Fiorito
You’ve got a few options:

  *   Use a notebook tool such as Zeppelin, Jupyter, or Spark Notebook to write 
up some visualizations which update in time with your streaming batches
  *   Use Spark Streaming to push your batch results to another 3rd-party 
system with a BI tool that supports realtime updates such as ZoomData or Power 
BI
  *   Write your own using many of the tools that support a reactive model to 
push updates to a d3.js web front end

I’ve had the opportunity to work on all 3 of the options above and it just 
depends on your timeframe, requirements, and budget. For instance, the 
notebooks are good for engineers or data scientists but not something you’d 
necessarily put in front of a non-technical end-user. The BI tools on the other 
hand may be less customizable but more approachable by business users.

Thanks,
Silvio

On 1/20/16, 2:54 PM, "patcharee" 
> wrote:

Hi,

How to visualize realtime data (in graph/chart) from spark streaming?
Any tools?

Best,
Patcharee

-
To unsubscribe, e-mail: 
user-unsubscr...@spark.apache.org
For additional commands, e-mail: 
user-h...@spark.apache.org




Re: Spark Streaming: BatchDuration and Processing time

2016-01-17 Thread Silvio Fiorito
It will just queue up the subsequent batches, however if this delay is constant 
you may start losing batches. It can handle spikes in processing time, but if 
you know you're consistently running over your batch duration you either need 
to increase the duration or look at enabling back pressure support. See: 
http://spark.apache.org/docs/latest/configuration.html#spark-streaming (1.5+).
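
A minimal sketch of turning that on (standard conf keys from the linked configuration 
page; the maxRate cap is optional):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("backpressure-sketch")
  // Spark 1.5+: adapt the ingestion rate to the observed processing rate.
  .set("spark.streaming.backpressure.enabled", "true")
  // Optional hard cap per receiver while the backpressure controller ramps up.
  .set("spark.streaming.receiver.maxRate", "10000")

val ssc = new StreamingContext(conf, Seconds(1))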


From: pyspark2555 
Sent: Sunday, January 17, 2016 11:32 AM
To: user@spark.apache.org
Subject: Spark Streaming: BatchDuration and Processing time

Hi,

If BatchDuration is set to 1 second in StreamingContext and the actual
processing time is longer than one second, then how does Spark handle that?

For example, I am receiving a continuous Input stream. Every 1 second (batch
duration), the RDDs will be processed. What if this processing time is
longer than 1 second? What happens in the next batch duration?

Thanks.
Amit



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-BatchDuration-and-Processing-time-tp25986.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: why one of Stage is into Skipped section instead of Completed

2015-12-26 Thread Silvio Fiorito
Skipped stages result from existing shuffle output of a stage when re-running a 
transformation. The executors will have the output of the stage in their local 
dirs and Spark recognizes that, so rather than re-computing, it will start from 
the following stage. So, this is a good thing in that you’re not re-computing a 
stage. In your case, it looks like there’s already the output of the userreqs 
RDD (reduceByKey) so it doesn’t re-compute it.

From: Prem Spark >
Date: Friday, December 25, 2015 at 11:41 PM
To: "user@spark.apache.org" 
>
Subject: why one of Stage is into Skipped section instead of Completed


What does the Skipped Stage below mean? Can anyone help clarify?
I was expecting 3 stages to succeed, but only 2 of them completed while one was skipped.
Status: SUCCEEDED
Completed Stages: 2
Skipped Stages: 1

Scala REPL Code Used:

accounts is a basic RDD contains weblog text data.

var accountsByID = accounts.

map(line => line.split(',')).

map(values => (values(0),values(4)+','+values(3)));

var userreqs = sc.

textFile("/loudacre/weblogs/*6").

map(line => line.split(' ')).

map(words => (words(2),1)).

reduceByKey((v1,v2) => v1 + v2);

var accounthits =

accountsByID.join(userreqs).map(pair => pair._2)

accounthits.

saveAsTextFile("/loudacre/userreqs")

scala> accounthits.toDebugString
res15: String =
(32) MapPartitionsRDD[24] at map at <console>:28 []
 |   MapPartitionsRDD[23] at join at <console>:28 []
 |   MapPartitionsRDD[22] at join at <console>:28 []
 |   CoGroupedRDD[21] at join at <console>:28 []
 +-(15) MapPartitionsRDD[15] at map at <console>:25 []
 |  |   MapPartitionsRDD[14] at map at <console>:24 []
 |  |   /loudacre/accounts/* MapPartitionsRDD[13] at textFile at <console>:21 []
 |  |   /loudacre/accounts/* HadoopRDD[12] at textFile at <console>:21 []
 |   ShuffledRDD[20] at reduceByKey at <console>:25 []
 +-(32) MapPartitionsRDD[19] at map at <console>:24 []
|   MapPartitionsRDD[18] at map at <console>:23 []
|   /loudacre/weblogs/*6 MapPartitionsRDD[17] at textFile at <console>:22 []
|   /loudacre/weblogs/*6 HadoopRDD[16] at textFile at 

Re: Spark data frame

2015-12-22 Thread Silvio Fiorito
Michael,

collect will bring down the results to the driver JVM, whereas the RDD or 
DataFrame would be cached on the executors (if it is cached). So, as Dean said, 
the driver JVM needs to have enough memory to store the results of collect.
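
If you only need to inspect the results on the driver, a couple of lighter-weight 
alternatives (sketch only; df stands for the cached DataFrame in question):

// collect() materializes every row in the driver JVM.
val everything = df.collect()

// take(n) only brings back the first n rows.
val sample = df.take(100)

// toLocalIterator streams one partition at a time to the driver, so only a
// single partition has to fit in driver memory at once.
df.rdd.toLocalIterator.foreach(row => println(row))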

Thanks,
Silvio

From: Michael Segel 
>
Date: Tuesday, December 22, 2015 at 4:26 PM
To: Dean Wampler >
Cc: Gaurav Agarwal >, 
"user@spark.apache.org" 
>
Subject: Re: Spark data frame

Dean,

RDD in memory and then the collect() resulting in a collection, where both are 
alive at the same time.
(Again not sure how Tungsten plays in to this… )

So his collection can’t be larger than 1/2 of the memory allocated to the heap.

(Unless you have allocated swap…, right?)

On Dec 22, 2015, at 12:11 PM, Dean Wampler 
> wrote:

You can call the collect() method to return a collection, but be careful. If 
your data is too big to fit in the driver's memory, it will crash.

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd 
Edition (O'Reilly)
Typesafe
@deanwampler
http://polyglotprogramming.com

On Tue, Dec 22, 2015 at 1:09 PM, Gaurav Agarwal 
> wrote:

We are able to retrieve data frame by filtering the rdd object . I need to 
convert that data frame into java pojo. Any idea how to do that




Re: Testing with spark testing base

2015-12-05 Thread Silvio Fiorito
Yes, with IntelliJ you can set up a scalatest run configuration. You can also 
run directly from the sbt CLI by running “sbt test”


From: Masf >
Date: Saturday, December 5, 2015 at 12:51 PM
To: "user@spark.apache.org" 
>
Subject: Testing with spark testing base

Hi.

I'm testing "spark testing base". For example:

import org.apache.spark.rdd.RDD
import org.scalatest.FunSuite
import com.holdenkarau.spark.testing.SharedSparkContext

class MyFirstTest extends FunSuite with SharedSparkContext {
  // Split each line on whitespace
  def tokenize(f: RDD[String]) = {
    f.map(_.split(" ").toList)
  }

  test("really simple transformation") {
    val input = List("hi", "hi miguel", "bye")
    val expected = List(List("hi"), List("hi", "miguel"), List("bye"))
    assert(tokenize(sc.parallelize(input)).collect().toList === expected)
  }
}


But...How can I launch this test??
Spark-submit or IntelliJ?

Thanks.

--
Regards
Miguel


RE: key not found: sportingpulse.com in Spark SQL 1.5.0

2015-10-30 Thread Silvio Fiorito
It's something due to the columnar compression. I've seen similar intermittent 
issues when caching DataFrames. "sportingpulse.com" is a value in one of the 
columns of the DF.

From: Ted Yu
Sent: ‎10/‎30/‎2015 6:33 PM
To: Zhang, Jingyu
Cc: user
Subject: Re: key not found: sportingpulse.com in Spark SQL 1.5.0

I searched for sportingpulse in *.scala and *.java files under 1.5 branch.
There was no hit.

mvn dependency doesn't show sportingpulse either.

Is it possible this is specific to EMR ?

Cheers

On Fri, Oct 30, 2015 at 2:57 PM, Zhang, Jingyu 
> wrote:

There is no problem in Spark SQL 1.5.1, but the error "key not found: 
sportingpulse.com" shows up when I use 1.5.0.

I have to use version 1.5.0 because that's the one AWS EMR supports. Can 
anyone tell me why Spark uses "sportingpulse.com" 
and how to fix it?

Thanks.

Caused by: java.util.NoSuchElementException: key not found: 
sportingpulse.com

at scala.collection.MapLike$class.default(MapLike.scala:228)

at scala.collection.AbstractMap.default(Map.scala:58)

at scala.collection.mutable.HashMap.apply(HashMap.scala:64)

at 
org.apache.spark.sql.columnar.compression.DictionaryEncoding$Encoder.compress(compressionSchemes.scala:258)

at 
org.apache.spark.sql.columnar.compression.CompressibleColumnBuilder$class.build(CompressibleColumnBuilder.scala:110)

at 
org.apache.spark.sql.columnar.NativeColumnBuilder.build(ColumnBuilder.scala:87)

at 
org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1$$anonfun$next$2.apply(InMemoryColumnarTableScan.scala:152)

at 
org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1$$anonfun$next$2.apply(InMemoryColumnarTableScan.scala:152)

at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)

at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)

at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)

at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)

at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)

at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)

at 
org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1.next(InMemoryColumnarTableScan.scala:152)

at 
org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1.next(InMemoryColumnarTableScan.scala:120)

at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:278)

at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171)

at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)

at org.apache.spark.rdd.RDD.iterator(RDD.scala:262)

at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)

at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)

at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)

at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)

at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)

at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)

at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)

at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)

at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)

at 
org.apache.spark.rdd.MapPartitionsWithPreparationRDD.compute(MapPartitionsWithPreparationRDD.scala:63)

at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)

at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)

at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)

at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)

at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)

at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)

at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)

at org.apache.spark.scheduler.Task.run(Task.scala:88)

at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)

at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)

at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)

This message and its attachments may contain legally privileged or confidential 
information. It is intended solely for the named addressee. If you are not the 
addressee indicated in this message or responsible for delivery of the message 
to the addressee, you may not copy or deliver this message or its attachments 
to anyone. Rather, you should permanently delete this message and its 
attachments and kindly notify the sender by reply e-mail. Any content of this 
message and its attachments which does not relate to the official business of 
the sending company must be taken not to have been 

Re: key not found: sportingpulse.com in Spark SQL 1.5.0

2015-10-30 Thread Silvio Fiorito
I don’t believe I have it on 1.5.1. Are you able to test the data locally to 
confirm, or is it too large?

From: "Zhang, Jingyu" 
<jingyu.zh...@news.com.au<mailto:jingyu.zh...@news.com.au>>
Date: Friday, October 30, 2015 at 7:31 PM
To: Silvio Fiorito 
<silvio.fior...@granturing.com<mailto:silvio.fior...@granturing.com>>
Cc: Ted Yu <yuzhih...@gmail.com<mailto:yuzhih...@gmail.com>>, user 
<user@spark.apache.org<mailto:user@spark.apache.org>>
Subject: Re: key not found: sportingpulse.com in Spark SQL 1.5.0

Thanks Silvio and Ted,

Can you please let me know how to fix this intermittent issue? Should I wait 
for EMR to upgrade to Spark 1.5.1, or change my code from DataFrame to 
normal Spark map-reduce?

Regards,

Jingyu

On 31 October 2015 at 09:40, Silvio Fiorito 
<silvio.fior...@granturing.com<mailto:silvio.fior...@granturing.com>> wrote:
It's something due to the columnar compression. I've seen similar intermittent 
issues when caching DataFrames. "sportingpulse.com<http://sportingpulse.com>" 
is a value in one of the columns of the DF.

From: Ted Yu<mailto:yuzhih...@gmail.com>
Sent: ‎10/‎30/‎2015 6:33 PM
To: Zhang, Jingyu<mailto:jingyu.zh...@news.com.au>
Cc: user<mailto:user@spark.apache.org>
Subject: Re: key not found: sportingpulse.com<http://sportingpulse.com> in 
Spark SQL 1.5.0

I searched for sportingpulse in *.scala and *.java files under 1.5 branch.
There was no hit.

mvn dependency doesn't show sportingpulse either.

Is it possible this is specific to EMR ?

Cheers

On Fri, Oct 30, 2015 at 2:57 PM, Zhang, Jingyu 
<jingyu.zh...@news.com.au<mailto:jingyu.zh...@news.com.au>> wrote:

There is no problem in Spark SQL 1.5.1, but the error "key not found: 
sportingpulse.com<http://sportingpulse.com/>" shows up when I use 1.5.0.

I have to use version 1.5.0 because that's the one AWS EMR supports. Can 
anyone tell me why Spark uses "sportingpulse.com<http://sportingpulse.com/>" 
and how to fix it?

Thanks.

Caused by: java.util.NoSuchElementException: key not found: 
sportingpulse.com<http://sportingpulse.com>

at scala.collection.MapLike$class.default(MapLike.scala:228)

at scala.collection.AbstractMap.default(Map.scala:58)

at scala.collection.mutable.HashMap.apply(HashMap.scala:64)

at 
org.apache.spark.sql.columnar.compression.DictionaryEncoding$Encoder.compress(compressionSchemes.scala:258)

at 
org.apache.spark.sql.columnar.compression.CompressibleColumnBuilder$class.build(CompressibleColumnBuilder.scala:110)

at 
org.apache.spark.sql.columnar.NativeColumnBuilder.build(ColumnBuilder.scala:87)

at 
org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1$$anonfun$next$2.apply(InMemoryColumnarTableScan.scala:152)

at 
org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1$$anonfun$next$2.apply(InMemoryColumnarTableScan.scala:152)

at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)

at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)

at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)

at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)

at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)

at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)

at 
org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1.next(InMemoryColumnarTableScan.scala:152)

at 
org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1.next(InMemoryColumnarTableScan.scala:120)

at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:278)

at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171)

at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)

at org.apache.spark.rdd.RDD.iterator(RDD.scala:262)

at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)

at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)

at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)

at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)

at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)

at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)

at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)

at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)

at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)

at 
org.apache.spark.rdd.MapPartitionsWithPreparationRDD.compute(MapPartitionsWithPreparationRDD.scala:63)

at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)

at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)

at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)

at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)

at org.apache.spark.rdd.

Re: Maintaining overall cumulative data in Spark Streaming

2015-10-30 Thread Silvio Fiorito
In the update function you can return None for a key and it will remove it. If 
you’re restarting your app you can delete your checkpoint directory to start 
from scratch, rather than continuing from the previous state.
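
A minimal sketch of an update function that drops keys (the reset flag is a hypothetical 
placeholder for however you decide it is time to clear the state):

// Hypothetical flag flipped by your own logic, e.g. at 00:00.
@volatile var resetRequested = false

def updateCount(newValues: Seq[Int], state: Option[Int]): Option[Int] = {
  if (resetRequested) {
    None  // returning None removes the key from the state entirely
  } else {
    Some(state.getOrElse(0) + newValues.sum)
  }
}

// wordCounts: DStream[(String, Int)] built from the input stream
// val running = wordCounts.updateStateByKey(updateCount _)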

From: Sandeep Giri >
Date: Friday, October 30, 2015 at 9:29 AM
To: skaarthik oss >
Cc: dev >, user 
>
Subject: Re: Maintaining overall cumulative data in Spark Streaming

How to we reset the aggregated statistics to null?

Regards,
Sandeep Giri,
+1 347 781 4573 (US)
+91-953-899-8962 (IN)

www.KnowBigData.com.
Phone: +1-253-397-1945 (Office)



On Fri, Oct 30, 2015 at 9:49 AM, Sandeep Giri 
> wrote:

Yes, update state by key worked.

Though there are some more complications.

On Oct 30, 2015 8:27 AM, "skaarthik oss" 
> wrote:
Did you consider UpdateStateByKey operation?

From: Sandeep Giri 
[mailto:sand...@knowbigdata.com]
Sent: Thursday, October 29, 2015 3:09 PM
To: user >; dev 
>
Subject: Maintaining overall cumulative data in Spark Streaming

Dear All,

If a continuous stream of text is coming in and you have to keep publishing the 
overall word count so far since 0:00 today, what would you do?

Publishing the results for a window is easy but if we have to keep aggregating 
the results, how to go about it?

I have tried to keep an StreamRDD with aggregated count and keep doing a 
fullouterjoin but didn't work. Seems like the StreamRDD gets reset.

Kindly help.

Regards,
Sandeep Giri




Re: how to merge two dataframes

2015-10-30 Thread Silvio Fiorito
Are you able to upgrade to Spark 1.5.1 and Cassandra connector to latest 
version? It no longer requires a separate CassandraSQLContext.

From: Yana Kadiyska >
Reply-To: "yana.kadiy...@gmail.com" 
>
Date: Friday, October 30, 2015 at 3:57 PM
To: Ted Yu >
Cc: "user@spark.apache.org" 
>
Subject: Re: how to merge two dataframes

Not a bad idea I suspect but doesn't help me. I dumbed down the repro to ask 
for help. In reality one of my dataframes is a cassandra DF. So 
cassDF.registerTempTable("df1") registers the temp table in a different SQL 
Context (new CassandraSQLContext(sc)).


scala> sql("select customer_id, uri, browser, epoch from df union all select 
customer_id, uri, browser, epoch from df1").show()
org.apache.spark.sql.AnalysisException: no such table df1; line 1 pos 103
at 
org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
at 
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.getTable(Analyzer.scala:225)
at 
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$7.applyOrElse(Analyzer.scala:233)
at 
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$7.applyOrElse(Analyzer.scala:229)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:222)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:222)
at 
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:51)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:221)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:242)


On Fri, Oct 30, 2015 at 3:34 PM, Ted Yu 
> wrote:
How about the following ?

scala> df.registerTempTable("df")
scala> df1.registerTempTable("df1")
scala> sql("select customer_id, uri, browser, epoch from df union select 
customer_id, uri, browser, epoch from df1").show()
+---+-+---+-+
|customer_id|  uri|browser|epoch|
+---+-+---+-+
|999|http://foobar|firefox| 1234|
|888|http://foobar| ie|12343|
+---+-+---+-+

Cheers

On Fri, Oct 30, 2015 at 12:11 PM, Yana Kadiyska 
> wrote:
Hi folks,

I have a need to "append" two dataframes -- I was hoping to use UnionAll but it 
seems that this operation treats the underlying dataframes as sequence of 
columns, rather than a map.

In particular, my problem is that the columns in the two DFs are not in the 
same order --notice that my customer_id somehow comes out a string:

This is Spark 1.4.1

case class Test(epoch: Long,browser:String,customer_id:Int,uri:String)
val test = Test(1234l,"firefox",999,"http://foobar")

case class Test1( customer_id :Int,uri:String,browser:String,   epoch 
:Long)
val test1 = Test1(888,"http://foobar","ie",12343)
val df=sc.parallelize(Seq(test)).toDF
val df1=sc.parallelize(Seq(test1)).toDF
df.unionAll(df1)

//res2: org.apache.spark.sql.DataFrame = [epoch: bigint, browser: string, 
customer_id: string, uri: string]


​

Is unionAll the wrong operation? Any special incantations? Or advice on how to 
otherwise get this to succeeed?




RE: Maintaining overall cumulative data in Spark Streaming

2015-10-29 Thread Silvio Fiorito
You could use updateStateByKey. There's a stateful word count example on Github.

https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala

From: Sandeep Giri
Sent: ‎10/‎29/‎2015 6:08 PM
To: user; dev
Subject: Maintaining overall cumulative data in Spark Streaming

Dear All,

If a continuous stream of text is coming in and you have to keep publishing the 
overall word count so far since 0:00 today, what would you do?

Publishing the results for a window is easy but if we have to keep aggregating 
the results, how to go about it?

I have tried to keep an StreamRDD with aggregated count and keep doing a 
fullouterjoin but didn't work. Seems like the StreamRDD gets reset.

Kindly help.

Regards,
Sandeep Giri



RE: Dynamic Resource Allocation with Spark Streaming (Standalone Cluster, Spark 1.5.1)

2015-10-26 Thread Silvio Fiorito
Hi Matthias,

Unless there was a change in 1.5, I'm afraid dynamic resource allocation is not 
yet supported in streaming apps.

Thanks,
Silvio

Sent from my Lumia 930

From: Matthias Niehoff
Sent: ‎10/‎26/‎2015 4:00 PM
To: user@spark.apache.org
Subject: Dynamic Resource Allocation with Spark Streaming (Standalone Cluster, 
Spark 1.5.1)

Hello everybody,

I have a few (~15) Spark Streaming jobs which have load peaks as well as long 
times with a low load. So I thought the new Dynamic Resource Allocation for 
Standalone Clusters might be helpful (SPARK-4751).

I have a test "cluster" with 1 worker consisting of 4 executors with 2 cores 
each, so 8 cores in total.

I started a simple streaming application without limiting the max cores for 
this app. As expected the app occupied every core of the cluster. Then I 
started a second app, also without limiting the maximum cores. As the first app 
did not get any input through the stream, my naive expectation was that the 
second app would get at least 2 cores (1 receiver, 1 processing), but that's 
not what happened. The cores are still assigned to the first app.
When I look at the application UI of the first app every executor is still 
running. That explains why no executor is used for the second app.

I end up with two questions:
- When does an executor become idle in a Spark Streaming application (so that it 
could be reassigned to another app)?
- Is there another way to cope with uncertain load when using Spark 
Streaming applications? I already combined multiple jobs into one Spark application 
using different threads, but this approach hits a limit for me, because 
the Spark applications get too big to manage.

Thank You!




RE: Concurrent execution of actions within a driver

2015-10-26 Thread Silvio Fiorito
There is a collectAsync action if you want to run them in parallel, but keep in 
mind the two jobs will need to share resources and you should use the FAIR 
scheduler.
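
A minimal sketch of the async variant (paths are placeholders; collectAsync returns a 
FutureAction you can await or register callbacks on, and spark.scheduler.mode=FAIR should 
be set on the SparkConf so the two jobs share resources):

import scala.concurrent.Await
import scala.concurrent.duration._

val rdd1 = sc.textFile("/path/one")
val rdd2 = sc.textFile("/path/two")

// Both jobs are submitted immediately; neither blocks the driver thread.
val f1 = rdd1.collectAsync()
val f2 = rdd2.collectAsync()

val result1 = Await.result(f1, 10.minutes)
val result2 = Await.result(f2, 10.minutes)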

From: praveen S
Sent: ‎10/‎26/‎2015 4:27 AM
To: user@spark.apache.org
Subject: Concurrent execution of actions within a driver


Does spark run different actions of an rdd within a driver in parallel also?

Let's say
class Driver{

val rdd1= sc. textFile("... ")
val rdd2=sc.textFile("")
rdd1. collect //Action 1
rdd2. collect //Action 2

}

Does Spark run Action 1 & 2 in parallel? (some kind of a pass through the 
driver code and then start the execution)?

If not, then is using threads safe for independent actions/RDDs?


Re: Java REST custom receiver

2015-10-01 Thread Silvio Fiorito
When you say “receive messages” you mean acting as a REST endpoint, right? If 
so, it might be better to use JMS (or Kafka) option for a few reasons:

The receiver will be deployed to any of the available executors, so your REST 
clients will need to be made aware of the IP where the receiver is running (or 
you have some other proxy doing that). In the event the app restarts or the 
receiver dies and is restarted the executor the receiver runs on may change. 
Now there’s an option to specify a preference for what hosts to run the 
receiver on by overriding “preferredLocation” if you still decide to go that 
route.

There’s a potential loss of messages when you are deploying or if your receiver 
or streaming app dies. You would now have to worry about managing those 
messages on the client’s side during a deployment or outage.

There’s also a security issue (may not be relevant in your case) in that you 
would be exposing your executors and cluster in order to receive these 
messages. Worst case would be if the clients are outside your enterprise 
network.

My preference would be to use JMS or Kafka or some other messaging systems as a 
buffer between the two systems.

Thanks,
Silvio

From: Pavol Loffay
Date: Thursday, October 1, 2015 at 3:58 PM
To: "user@spark.apache.org"
Subject: Java REST custom receiver

Hello,

is it possible to implement a custom receiver [1] which will receive messages 
from REST calls?

Since REST classes in Java (JAX-RS) are defined declaratively and instantiated by the 
application server, I'm not sure whether it is possible.

I have tried to implement a custom receiver which is injected into the REST class via CDI 
and then passed to the JavaStreamingContext. But there is a problem: the receiver 
instance in the REST class is not the same as the one in the SparkContext (the supervisor is null).


Could anyone help me with this? I'm also using JMS in my app, so data from REST 
can be sent to JMS and then received by the Spark JMS receiver. But I think there 
should be a more straightforward solution.




[1]: https://spark.apache.org/docs/latest/api/java/


Re: Receiver and Parallelization

2015-09-25 Thread Silvio Fiorito
One thing you should look at is your batch duration and 
spark.streaming.blockInterval

Those 2 things control how many partitions are generated for each RDD (batch) 
of the DStream when using a receiver (vs direct approach).

So if you have a 2 second batch duration and the default blockInterval of 200ms 
this will create 10 partitions. This means you can have a max of 10 parallel 
tasks (as long as you have the cores available) running at a time for a 
map-like operation.
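
A minimal sketch making that math explicit (numbers mirror the example above):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("receiver-parallelism")
  // partitions per batch from one receiver = batchDuration / blockInterval
  // 2000ms / 200ms = 10 partitions, i.e. at most 10 parallel map tasks per batch.
  .set("spark.streaming.blockInterval", "200ms")

val ssc = new StreamingContext(conf, Seconds(2))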




On 9/25/15, 9:08 AM, "nib...@free.fr"  wrote:

>Hello,
>I used a custom receiver in order to receive JMS messages from MQ Servers.
>I want to benefit from the YARN cluster; my questions are:
>
>- Is it possible to have only one node receiving JMS messages and parallelize 
>the RDD over all the cluster nodes?
>- Is it possible to also parallelize the message receiver over cluster nodes?
>
>If you have any code example for both items it would be fine, because the 
>parallelization mechanism in the code is not crystal clear for me ...
>
>Tks
>Nicolas
>
>-
>To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>For additional commands, e-mail: user-h...@spark.apache.org
>


Re: DataFrame repartition not repartitioning

2015-09-16 Thread Silvio Fiorito
You just need to assign it to a new variable:

val avroFile = sqlContext.read.format("com.databricks.spark.avro").load(inFile)
val repart = avroFile.repartition(10)
repart.save(outFile, "parquet")

From: Steve Annessa
Date: Wednesday, September 16, 2015 at 2:08 PM
To: "user@spark.apache.org"
Subject: DataFrame repartition not repartitioning

Hello,

I'm trying to load in an Avro file and write it out as Parquet. I would like to 
have enough partitions to properly parallelize on. When I do the simple load 
and save I get 1 partition out. I thought I would be able to use repartition 
like the following:

val avroFile = sqlContext.read.format("com.databricks.spark.avro").load(inFile)
avroFile.repartition(10)
avroFile.save(outFile, "parquet")

However, the saved file is still a single partition in the directory.

What am I missing?

Thanks,

-- Steve


Re: Where can I learn how to write udf?

2015-09-14 Thread Silvio Fiorito
Hi Saif,

There are 2 types of UDFs. Those used by SQL and those used by the Scala DSL.

For SQL, you just register a function like so (this example is from the docs):

sqlContext.udf.register("strLen", (s: String) => s.length)
sqlContext.sql("select name, strLen(name) from people").show


The other method, for Scala DSL, instead:

import org.apache.spark.sql.functions._

def strLen = udf { (s: String) => s.length }

people.select(people("name"), strLen(people("name"))).show


Thanks,
Silvio

From: "saif.a.ell...@wellsfargo.com"
Date: Monday, September 14, 2015 at 12:39 PM
To: "user@spark.apache.org"
Subject: Where can I learn how to write udf?

Hi all,

I am failing to find a proper guide or tutorial onto how to write proper udf 
functions in scala.

Appreciate the effort saving,
Saif



Re: Realtime Data Visualization Tool for Spark

2015-09-11 Thread Silvio Fiorito
So if you want to build your own from the ground up, then yes you could go the 
d3js route. Like Feynman also responded you could use something like Spark 
Notebook or Zeppelin to create some charts as well. It really depends on your 
intended audience and ultimate goal. If you just want some counters and graphs 
without any interactivity it shouldn't be too difficult.

Another option, if you’re willing to use a hosted service, would be something 
like MS Power BI. I’ve used this to publish data and have realtime dashboards 
and reports fed by Spark.

From: Shashi Vishwakarma
Date: Friday, September 11, 2015 at 11:56 AM
To: "user@spark.apache.org"
Subject: Realtime Data Visualization Tool for Spark

Hi

I have got streaming data which needs to be processed and send for 
visualization.  I am planning to use spark streaming for this but little bit 
confused in choosing visualization tool. I read somewhere that D3.js can be 
used but i wanted know which is best tool for visualization while dealing with 
streaming application.(something that can be easily integrated)

If someone has any link which can tell about D3.js(or any other visualization 
tool) and Spark streaming application integration  then please share . That 
would be great help.


Thanks and Regards
Shashi



Re: Can Spark Provide Multiple Context Support?

2015-09-08 Thread Silvio Fiorito
Is the data from HDFS static or is it unique for each event in the stream? If 
it’s static, you can just create the SparkContext, load the files from HDFS, 
then start a StreamingContext with the existing SparkContext and go from there.
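
Sketched in Scala (the HDFS path, Kafka parameters and join key below are placeholders): 
load the reference data once, then reuse the same SparkContext for the StreamingContext.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val sc = new SparkContext(new SparkConf().setAppName("stream-plus-static"))

// Load the static reference data once and keep it cached on the executors.
val reference = sc.textFile("hdfs:///path/to/reference/*").map(line => (line, line)).cache()
reference.count()  // force the cache

// Reuse the same SparkContext for streaming.
val ssc = new StreamingContext(sc, Seconds(10))
val stream = KafkaUtils.createStream(ssc, "zkhost:2181", "consumer-group", Map("events" -> 1))

stream.map(_._2).foreachRDD { rdd =>
  // Combine each micro-batch with the preloaded reference RDD, e.g. via a join.
  val joined = rdd.map(line => (line, 1)).join(reference)
  joined.count()
}

ssc.start()
ssc.awaitTermination()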

From: Rachana Srivastava
Date: Tuesday, September 8, 2015 at 1:12 PM
To: "user@spark.apache.org"
Subject: Can Spark Provide Multiple Context Support?

Question: How does Spark support multiple context?

Background:  I have a stream of data coming to Spark from Kafka.   For each 
data in the stream I want to download some files from HDFS and process the file 
data.  I have written code to process the file from HDFS and I have code 
written to process stream data from Kafka using SparkStreaming API.  I have not 
been able to link both.

Can you please let me know if it is feasible to create JavaRDD from file inside 
SparkStreamingRDD job processing step?

Thanks,

Rachana


Re: Unbale to run Group BY on Large File

2015-09-02 Thread Silvio Fiorito
Unfortunately, groupBy is not the most efficient operation. What is it you’re 
trying to do? It may be possible with one of the other *byKey transformations.
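
For example, if the GROUP BY is just computing a per-key aggregate, the same result can 
usually be expressed with reduceByKey or aggregateByKey, which combine values map-side 
before the shuffle (the column positions and delimiter below are hypothetical):

val pairs = sc.textFile("/path/to/large/file")
  .map(_.split('|'))
  .map(cols => (cols(0), cols(1).toDouble))

// groupByKey shuffles every record for a key to a single task and holds the
// whole group in memory, which is a common source of GC overhead / OOM errors.
val grouped = pairs.groupByKey().mapValues(_.sum)

// reduceByKey combines values on the map side first, so far less data is
// shuffled and no single group has to fit in memory at once.
val reduced = pairs.reduceByKey(_ + _)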

From: "SAHA, DEBOBROTA"
Date: Wednesday, September 2, 2015 at 7:46 PM
To: "'user@spark.apache.org'"
Subject: Unbale to run Group BY on Large File

Hi ,

I am getting below error while I am trying to select data using SPARK SQL from 
a RDD table.

java.lang.OutOfMemoryError: GC overhead limit exceeded
"Spark Context Cleaner" java.lang.InterruptedException


The file or table size is around 113 GB and I am running SPARK 1.4 on a 
standalone cluster. Tried to extend the heap size but extending to 64GB also 
didn’t help.

I would really appreciate any help on this.

Thanks,
Debobrota


Re: Help! Stuck using withColumn

2015-08-26 Thread Silvio Fiorito
Hi Saif,

In both cases you’re referencing columns that don’t exist in the current 
DataFrame.

In the first email you did a select and then a withColumn referencing 'month_date_curr' on 
the resulting DF, but that column does not exist there, because you selected 
only 'month_balance'.

In the second email you're using 2 different DFs and trying to reference a column 
from one in a withColumn on the other; that just won't work. The column you pass to 
withColumn has to exist in the DataFrame you call it on, and here it doesn't.

Did you intend to do a join instead?
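
If a join is what you're after, a minimal sketch (the id column here is hypothetical, 
added with zipWithIndex just to give the two DataFrames something to join on):

import org.apache.spark.sql.functions.col
// (in spark-shell; otherwise import sqlContext.implicits._ for toDF)

val gf = sc.parallelize(Array(3, 6, 4, 7, 3, 4, 5, 5, 31, 4, 5, 2))
  .zipWithIndex().map { case (v, i) => (i, v) }.toDF("id", "ASD")

val ff = sc.parallelize(Array(4, 6, 2, 3, 5, 1, 4, 6, 23, 6, 4, 7))
  .zipWithIndex().map { case (v, i) => (i, v) }.toDF("id", "GFD")

// withColumn can only reference columns of the DataFrame it is called on;
// columns from another DataFrame have to be brought in through a join first.
val combined = gf.join(ff, "id").withColumn("DSA", col("GFD"))
combined.show()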

Thanks,
Silvio

From: saif.a.ell...@wellsfargo.commailto:saif.a.ell...@wellsfargo.com
Date: Wednesday, August 26, 2015 at 6:06 PM
To: saif.a.ell...@wellsfargo.commailto:saif.a.ell...@wellsfargo.com, 
user@spark.apache.orgmailto:user@spark.apache.org
Subject: RE: Help! Stuck using withColumn

I can reproduce this even simpler with the following:

val gf = sc.parallelize(Array(3,6,4,7,3,4,5,5,31,4,5,2)).toDF(ASD)
val ff = sc.parallelize(Array(4,6,2,3,5,1,4,6,23,6,4,7)).toDF(GFD)

gf.withColumn(DSA, ff.col(GFD))

org.apache.spark.sql.AnalysisException: resolved attribute(s) GFD#421 missing 
from ASD#419 in operator !Project [ASD#419,GFD#421 AS DSA#422];
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:38)
at 
org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:42)
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:121)
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:50)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:98)
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:50)
at 
org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:42)
at 
org.apache.spark.sql.SQLContext$QueryExecution.assertAnalyzed(SQLContext.scala:931)
at org.apache.spark.sql.DataFrame.init(DataFrame.scala:131)
at 
org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$logicalPlanToDataFrame(DataFrame.scala:154)
at org.apache.spark.sql.DataFrame.select(DataFrame.scala:595)
at org.apache.spark.sql.DataFrame.withColumn(DataFrame.scala:1039)


From: saif.a.ell...@wellsfargo.com [mailto:saif.a.ell...@wellsfargo.com]
Sent: Wednesday, August 26, 2015 6:47 PM
To: user@spark.apache.org
Subject: Help! Stuck using withColumn

This simple comand call:

val final_df = data.select("month_balance").withColumn("month_date", data.col("month_date_curr"))

Is throwing:

org.apache.spark.sql.AnalysisException: resolved attribute(s) 
month_date_curr#324 missing from month_balance#234 in operator !Project 
[month_balance#234, month_date_curr#324 AS month_date_curr#408];
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:38)
at 
org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:42)
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:121)
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:50)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:98)
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:50)
at 
org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:42)
at 
org.apache.spark.sql.SQLContext$QueryExecution.assertAnalyzed(SQLContext.scala:931)
at org.apache.spark.sql.DataFrame.init(DataFrame.scala:131)
at 
org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$logicalPlanToDataFrame(DataFrame.scala:154)
at org.apache.spark.sql.DataFrame.select(DataFrame.scala:595)
at org.apache.spark.sql.DataFrame.withColumn(DataFrame.scala:1039)





Re: Left outer joining big data set with small lookups

2015-08-17 Thread Silvio Fiorito
Try doing a count on both lookups to force the caching to occur before the join.
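
Roughly like this (table and column names are placeholders, and whether the broadcast actually kicks in also depends on spark.sql.autoBroadcastJoinThreshold):

val lkup = sqlContext.table("table2").cache()
lkup.count()                                // materializes the cache before the join
lkup.registerTempTable("table2_cached")

val joined = sqlContext.sql(
  "SELECT b.*, l.someCol FROM table1 b LEFT OUTER JOIN table2_cached l ON b.lkupid = l.lkupid")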




On 8/17/15, 12:39 PM, VIJAYAKUMAR JAWAHARLAL sparkh...@data2o.io wrote:

Thanks for your help

I tried to cache the lookup tables and left out join with the big table (DF). 
Join does not seem to be using broadcast join-still it goes with hash 
partition join and shuffling big table. Here is the scenario


…
table1 as big_df
left outer join
table2 as lkup
on big_df.lkupid = lkup.lkupid

table1 above is well distributed across all 40 partitions because of 
sqlContext.sql("SET spark.sql.shuffle.partitions=40"). table2 is small, using 
just 2 partitions. After the join stage, the Spark UI showed me that all 
activity ended up in just 2 executors. When I tried to dump the data to 
HDFS after the join stage, all data ended up in 2 partition files and the 
remaining 38 files are 0-sized files.

Since above one did not work, I tried to broadcast DF and registered as table 
before join. 

val table2_df = sqlContext.sql("select * from table2")
val broadcast_table2 = sc.broadcast(table2_df)
broadcast_table2.value.registerTempTable("table2")

Broadcast also has the same issue as explained above. All data is processed by 
just a few executors due to the lookup skew.

Any more idea to tackle this issue in Spark Dataframe?

Thanks
Vijay


 On Aug 14, 2015, at 10:27 AM, Silvio Fiorito silvio.fior...@granturing.com 
 wrote:
 
 You could cache the lookup DataFrames, it’ll then do a broadcast join.
 
 
 
 
 On 8/14/15, 9:39 AM, VIJAYAKUMAR JAWAHARLAL sparkh...@data2o.io wrote:
 
 Hi
 
 I am facing huge performance problem when I am trying to left outer join 
 very big data set (~140GB) with bunch of small lookups [Start schema type]. 
 I am using data frame  in spark sql. It looks like data is shuffled and 
 skewed when that join happens. Is there any way to improve performance of 
 such type of join in spark? 
 
 How can I hint optimizer to go with replicated join etc., to avoid shuffle? 
 Would it help to create broadcast variables on small lookups?  If I create 
 broadcast variables, how can I convert them into data frame and use them in 
 sparksql type of join?
 
 Thanks
 Vijay
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org
 


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Left outer joining big data set with small lookups

2015-08-14 Thread Silvio Fiorito
You could cache the lookup DataFrames, it’ll then do a broadcast join.




On 8/14/15, 9:39 AM, VIJAYAKUMAR JAWAHARLAL sparkh...@data2o.io wrote:

Hi

I am facing huge performance problem when I am trying to left outer join very 
big data set (~140GB) with bunch of small lookups [Start schema type]. I am 
using data frame  in spark sql. It looks like data is shuffled and skewed when 
that join happens. Is there any way to improve performance of such type of 
join in spark? 

How can I hint optimizer to go with replicated join etc., to avoid shuffle? 
Would it help to create broadcast variables on small lookups?  If I create 
broadcast variables, how can I convert them into data frame and use them in 
sparksql type of join?

Thanks
Vijay
-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark DataFrames uses too many partition

2015-08-11 Thread Silvio Fiorito
You need to configure the spark.sql.shuffle.partitions parameter to a different 
value. It defaults to 200.
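
For example (8 is an arbitrary value here, and the toy DataFrames just illustrate the effect, assuming the spark-shell implicits for toDF):

sqlContext.setConf("spark.sql.shuffle.partitions", "8")   // default is 200

val dfA = sc.parallelize(1 to 100).map(i => (i, i * 2)).toDF("id", "a")
val dfB = sc.parallelize(1 to 100).map(i => (i, i * 3)).toDF("id", "b")

val joined = dfA.join(dfB, "id")
println(joined.rdd.partitions.size)   // now 8 after the shuffle instead of 200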




On 8/11/15, 11:31 AM, Al M alasdair.mcbr...@gmail.com wrote:

I am using DataFrames with Spark 1.4.1.  I really like DataFrames but the
partitioning makes no sense to me.

I am loading lots of very small files and joining them together.  Every file
is loaded by Spark with just one partition.  Each time I join two small
files the partition count increases to 200.  This makes my application take
10x as long as if I coalesce everything to 1 partition after each join.

With normal RDDs it would not expand out the partitions to 200 after joining
two files with one partition each.  It would either keep it at one or expand
it to two.

Why do DataFrames expand out the partitions so much?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-DataFrames-uses-too-many-partition-tp24214.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: How to increase parallelism of a Spark cluster?

2015-08-02 Thread Silvio Fiorito
Can you share the transformations up to the foreachPartition?

From: Sujit Pal
Sent: 8/2/2015 4:42 PM
To: Igor Berman
Cc: user
Subject: Re: How to increase parallelism of a Spark cluster?

Hi Igor,

The cluster is a Databricks Spark cluster. It consists of 1 master + 4 workers, 
each worker has 60GB RAM and 4 CPUs. The original mail has some more details 
(also the reference to the HttpSolrClient in there should be HttpSolrServer, 
sorry about that, mistake while writing the email).

There is no additional configuration on the external Solr host from my code, I 
am using the default HttpClient provided by HttpSolrServer. According to the 
Javadocs, you can pass in a HttpClient object as well. Is there some specific 
configuration you would suggest to get past any limits?

On another project, I faced a similar problem but I had more leeway (was using 
a Spark cluster from EC2) and less time, my workaround was to use python 
multiprocessing to create a program that started up 30 python JSON/HTTP clients 
and wrote output into 30 output files, which were then processed by Spark. 
Reason I mention this is that I was using default configurations there as well, 
just needed to increase the number of connections against Solr to a higher 
number.

This time round, I would like to do this through Spark because it makes the 
pipeline less complex.

-sujit


On Sun, Aug 2, 2015 at 10:52 AM, Igor Berman 
igor.ber...@gmail.commailto:igor.ber...@gmail.com wrote:

What kind of cluster? How many cores on each worker? Is there config for http 
solr client? I remember standard httpclient has limit per route/host.

On Aug 2, 2015 8:17 PM, Sujit Pal 
sujitatgt...@gmail.commailto:sujitatgt...@gmail.com wrote:
No one has any ideas?

Is there some more information I should provide?

I am looking for ways to increase the parallelism among workers. Currently I 
just see number of simultaneous connections to Solr equal to the number of 
workers. My number of partitions is (2.5x) larger than number of workers, and 
the workers seem to be large enough to handle more than one task at a time.

I am creating a single client per partition in my mapPartition call. Not sure 
if that is creating the gating situation? Perhaps I should use a Pool of 
clients instead?

Would really appreciate some pointers.

Thanks in advance for any help you can provide.

-sujit


On Fri, Jul 31, 2015 at 1:03 PM, Sujit Pal 
sujitatgt...@gmail.commailto:sujitatgt...@gmail.com wrote:
Hello,

I am trying to run a Spark job that hits an external webservice to get back 
some information. The cluster is 1 master + 4 workers, each worker has 60GB RAM 
and 4 CPUs. The external webservice is a standalone Solr server, and is 
accessed using code similar to that shown below.

def getResults(keyValues: Iterator[(String, Array[String])]): Iterator[(String, String)] = {
  val solr = new HttpSolrClient()
  initializeSolrParameters(solr)
  keyValues.map(keyValue => (keyValue._1, process(solr, keyValue)))
}

myRDD.repartition(10)
  .mapPartitions(keyValues => getResults(keyValues))

The mapPartitions does some initialization to the SolrJ client per partition 
and then hits it for each record in the partition via the getResults() call.

I repartitioned in the hope that this will result in 10 clients hitting Solr 
simultaneously (I would like to go upto maybe 30-40 simultaneous clients if I 
can). However, I counted the number of open connections using netstat -anp | 
grep :8983.*ESTABLISHED in a loop on the Solr box and observed that Solr has 
a constant 4 clients (ie, equal to the number of workers) over the lifetime of 
the run.

My observation leads me to believe that each worker processes a single stream 
of work sequentially. However, from what I understand about how Spark works, 
each worker should be able to process number of tasks parallelly, and that 
repartition() is a hint for it to do so.

Is there some SparkConf environment variable I should set to increase 
parallelism in these workers, or should I just configure a cluster with 
multiple workers per machine? Or is there something I am doing wrong?

Thank you in advance for any pointers you can provide.

-sujit





Re: Local Repartition

2015-07-20 Thread Silvio Fiorito
Hi Daniel,

Coalesce, by default will not cause a shuffle. The second parameter when set to 
true will cause a full shuffle. This is actually what repartition does (calls 
coalesce with shuffle=true).

It will attempt to keep colocated partitions together (as you describe) on the 
same executor. What may happen is you lose data locality if you reduce the 
partitions to fewer than the number of executors. You obviously also reduce 
parallelism so you need to be aware of that as you decide when to call coalesce.
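
A quick sketch of the two forms (the path and partition count are placeholders):

val rdd = sc.textFile("hdfs:///data/many-small-files")

// No shuffle: co-located partitions are merged, keeping data on the same executors
val narrowed = rdd.coalesce(100)

// Full shuffle, equivalent to repartition(100): evens out the data but moves it around
val reshuffled = rdd.coalesce(100, shuffle = true)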

Thanks,
Silvio

From: Daniel Haviv
Date: Monday, July 20, 2015 at 4:59 PM
To: Doug Balog
Cc: user
Subject: Re: Local Repartition

Thanks Doug,
coalesce might invoke a shuffle as well.
I don't think what I'm suggesting is a feature but it definitely should be.

Daniel

On Mon, Jul 20, 2015 at 4:15 PM, Doug Balog 
d...@balog.netmailto:d...@balog.net wrote:
Hi Daniel,
Take a look at .coalesce()
I’ve seen good results by coalescing to num executors * 10, but I’m still 
trying to figure out the
optimal number of partitions per executor.
To get the number of executors, sc.getConf.getInt(“spark.executor.instances”,-1)


Cheers,

Doug

 On Jul 20, 2015, at 5:04 AM, Daniel Haviv 
 daniel.ha...@veracity-group.commailto:daniel.ha...@veracity-group.com 
 wrote:

 Hi,
 My data is constructed from a lot of small files which results in a lot of 
 partitions per RDD.
 Is there some way to locally repartition the RDD without shuffling so that 
 all of the partitions that reside on a specific node will become X partitions 
 on the same node ?

 Thank you.
 Daniel




Re: Sessionization using updateStateByKey

2015-07-15 Thread Silvio Fiorito
Hi Cody,

I’ve had success using updateStateByKey for real-time sessionization by aging 
off timed-out sessions (returning None in the update function). This was on a 
large commercial website with millions of hits per day. This was over a year 
ago so I don’t have access to the stats any longer for length of sessions 
unfortunately, but I seem to remember they were around 10-30 minutes long. Even 
with peaks in volume, Spark managed to keep up very well.
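
A rough sketch of that aging-off pattern (the Session class, timeout value and the commented-out DStream wiring are made up for illustration; checkpointing must be enabled for updateStateByKey):

case class Session(lastSeen: Long, hits: Int)
val sessionTimeoutMs = 30 * 60 * 1000L

def updateSession(events: Seq[Long], state: Option[Session]): Option[Session] = {
  val now = System.currentTimeMillis()
  val current = state.getOrElse(Session(now, 0))
  if (events.nonEmpty) {
    Some(Session(events.max, current.hits + events.size))
  } else if (now - current.lastSeen > sessionTimeoutMs) {
    None   // returning None drops the key, i.e. the session is aged off
  } else {
    Some(current)
  }
}

// events: DStream[(String, Long)] of (userId, eventTimestamp) pairs from your source
// val sessions = events.updateStateByKey(updateSession _)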

Thanks,
Silvio

From: Cody Koeninger
Date: Wednesday, July 15, 2015 at 5:38 PM
To: algermissen1971
Cc: Tathagata Das, swetha, user
Subject: Re: Sessionization using updateStateByKey

An in-memory hash key data structure of some kind so that you're close to 
linear on the number of items in a batch, not the number of outstanding keys.  
That's more complex, because you have to deal with expiration for keys that 
never get hit, and for unusually long sessions you have to either drop them or 
hit durable storage.

Maybe someone has a better idea, I'd like to hear it.

On Wed, Jul 15, 2015 at 8:54 AM, algermissen1971 
algermissen1...@icloud.commailto:algermissen1...@icloud.com wrote:
Hi Cody,

oh ... I though that was one of *the* use cases for it. Do you have a 
suggestion / best practice how to achieve the same thing with better scaling 
characteristics?

Jan

On 15 Jul 2015, at 15:33, Cody Koeninger 
c...@koeninger.orgmailto:c...@koeninger.org wrote:

 I personally would try to avoid updateStateByKey for sessionization when you 
 have long sessions / a lot of keys, because it's linear on the number of keys.

 On Tue, Jul 14, 2015 at 6:25 PM, Tathagata Das 
 t...@databricks.commailto:t...@databricks.com wrote:
 [Apologies for repost, for those who have seen this response already in the 
 dev mailing list]

 1. When you set ssc.checkpoint(checkpointDir), the spark streaming 
 periodically saves the state RDD (which is a snapshot of all the state data) 
 to HDFS using RDD checkpointing. In fact, a streaming app with 
 updateStateByKey will not start until you set checkpoint directory.

 2. The updateStateByKey performance is sort of independent of what source is being 
 used - receiver based or direct Kafka. The absolute performance obviously depends on 
 a LOT of variables: size of the cluster, parallelization, etc. The key thing is that 
 you must ensure sufficient parallelization at every stage - receiving, shuffles 
 (updateStateByKey included), and output.

 Some more discussion in my talk - https://www.youtube.com/watch?v=d5UJonrruHk



 On Tue, Jul 14, 2015 at 4:13 PM, swetha 
 swethakasire...@gmail.commailto:swethakasire...@gmail.com wrote:

 Hi,

 I have a question regarding sessionization using updateStateByKey. If near
 real time state needs to be maintained in a Streaming application, what
 happens when the number of RDDs to maintain the state becomes very large?
 Does it automatically get saved to HDFS and reload when needed or do I have
 to use any code like ssc.checkpoint(checkpointDir)?  Also, how is the
 performance if I use both DStream Checkpointing for maintaining the state
 and use Kafka Direct approach for exactly once semantics?


 Thanks,
 Swetha



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Sessionization-using-updateStateByKey-tp23838.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: 
 user-unsubscr...@spark.apache.orgmailto:user-unsubscr...@spark.apache.org
 For additional commands, e-mail: 
 user-h...@spark.apache.orgmailto:user-h...@spark.apache.org







Re: .NET on Apache Spark?

2015-07-05 Thread Silvio Fiorito
Joe Duffy, director of engineering on Microsoft's compiler team made a comment 
about investigating  F# type providers for Spark. 
https://twitter.com/xjoeduffyx/status/614076012372955136


From: Ashic Mahtab
Sent: Sunday, July 5, 2015 1:29 PM
To: Ruslan Dautkhanov, pedro
Cc: user@spark.apache.org

Unfortunately, afaik that project is long dead.

It'd be an interesting project to create an intermediary protocol, perhaps 
using something that nearly everything these days understand (unfortunately [!] 
that might be JavaScript). For example, instead of pickling language 
constructs, it might be interesting to translate rdd operations to some json 
structure, and have a single thing server side processing the instructions.

There's also mbrace (http://www.m-brace.net/)... mbrace-spark integration would 
be quite interesting indeed. Though the difference in approach might be quite a 
challenge.

Another approach could be using IKVM to host the JVM, much like how pyspark 
executes.

Microsoft research published some very early work in OneNet: 
http://research.microsoft.com/en-us/um/people/jinl/redesign/research/onenet_executive_summary.pdf
 - their careers page seems to be recruiting for the project.

Again, these are all future things, most of which would need to be community 
driven. If you need something right now, then there really isn't good 
integration between spark and .NET. However, given your requirements, mbrace 
might be something that you might find useful.

-Ashic.


Date: Sun, 5 Jul 2015 11:05:30 -0600
Subject: Re: .NET on Apache Spark?
From: dautkha...@gmail.com
To: ski.rodrig...@gmail.com
CC: user@spark.apache.org

Scala used to run on .NET
http://www.scala-lang.org/old/node/10299


--
Ruslan Dautkhanov

On Thu, Jul 2, 2015 at 1:26 PM, pedro 
ski.rodrig...@gmail.commailto:ski.rodrig...@gmail.com wrote:
You might try using .pipe() and installing your .NET program as a binary
across the cluster (or using addFile). Its not ideal to pipe things in/out
along with the overhead, but it would work.

I don't know much about IronPython, but perhaps changing the default python
by changing your path might work?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/NET-on-Apache-Spark-tp23578p23594.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: 
user-unsubscr...@spark.apache.orgmailto:user-unsubscr...@spark.apache.org
For additional commands, e-mail: 
user-h...@spark.apache.orgmailto:user-h...@spark.apache.org




Re: Spark performance issue

2015-07-03 Thread Silvio Fiorito
It’ll help to see the code or at least understand what transformations you’re 
using.

Also, you have 15 nodes but not using all of them, so that means you may be 
losing data locality. You can see this in the job UI for Spark if any jobs do 
not have node or process local.

From: diplomatic Guru
Date: Friday, July 3, 2015 at 8:58 AM
To: user@spark.apache.org
Subject: Spark performance issue

Hello guys,

I'm after some advice on Spark performance.

I've a MapReduce job that reads inputs, carries out a simple calculation and writes 
the results into HDFS. I've implemented the same logic in a Spark job.

When I tried both jobs on same datasets, I'm getting different execution time, 
which is expected.

BUT
..
In my example, MapReduce job is performing much better than Spark.

The difference is that I'm not changing much with the MR job configuration, 
e.g., memory, cores, etc...But this is not the case with Spark as it's very 
flexible. So I'm sure my configuration isn't correct which is why MR is 
outperforming Spark but need your advice.

For example:

Test 1:
4.5GB data -  MR job took ~55 seconds to compute, but Spark took ~3 minutes and 
20 seconds.

Test 2:
25GB data -MR took 2 minutes and 15 seconds, whereas Spark job is still 
running, and it's already been 15 minutes.


I have a cluster of 15 nodes. The maximum memory that I could allocate to each 
executor is 6GB. Therefore, for Test 1, this is the config I used:

--executor-memory 6G --num-executors 4 --driver-memory 6G  --executor-cores 2 
(also I set spark.storage.memoryFraction to 0.3)


For Test 2:
--executor-memory 6G --num-executors 10 --driver-memory 6G  --executor-cores 2 
(also I set spark.storage.memoryFraction to 0.3)

I tried all possible combination but couldn't get better performance. Any 
suggestions will be much appreciated.








Re: Spark Streaming broadcast to all keys

2015-07-03 Thread Silvio Fiorito
updateStateByKey will run for all keys, whether they have new data in a batch 
or not so you should be able to still use it.
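
For the word-count case, a hypothetical sketch of how the per-batch decay could live inside the update function (checkpointing required; the DStream wiring is left as a comment):

def decayingCount(newCounts: Seq[Int], state: Option[Int]): Option[Int] = {
  // called every batch for every tracked key, with an empty Seq when there is no new data
  val updated = state.getOrElse(0) + newCounts.sum - 1   // decrement all keys by 1 per batch
  if (updated <= 0) None else Some(updated)              // drop keys that reach zero
}

// counts: DStream[(String, Int)] of (word, 1) pairs
// val running = counts.updateStateByKey(decayingCount _)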



On 7/3/15, 7:34 AM, micvog mich...@micvog.com wrote:

UpdateStateByKey is useful but what if I want to perform an operation to all
existing keys (not only the ones in this RDD).

Word count for example - is there a way to decrease *all* words seen so far
by 1?

I was thinking of keeping a static class per node with the count information
and issuing a broadcast command to take a certain action, but could not find
a broadcast-to-all-nodes functionality or a better way.

Thanks,
Michael



-
Michael Vogiatzis
@mvogiatzis 
--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-broadcast-to-all-keys-tp23609.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: .NET on Apache Spark?

2015-07-02 Thread Silvio Fiorito
Since Spark runs on the JVM, no there isn't support for .Net.

You should take a look at Dryad and Naiad instead.

https://github.com/MicrosoftResearch/

From: Zwits
Sent: 7/2/2015 4:33 AM
To: user@spark.apache.org
Subject: .NET on Apache Spark?

I'm currently looking into a way to run a program/code (DAG) written in .NET
on a cluster using Spark. However I ran into problems concerning the coding
language, Spark has no .NET API.
I tried looking into IronPython because Spark does have a Python API, but i
couldn't find a way to use this.

Is there a way to implement a DAG of jobs on a cluster using .NET
programming language?




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/NET-on-Apache-Spark-tp23578.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: map vs foreach for sending data to external system

2015-07-02 Thread Silvio Fiorito
foreach absolutely runs on the executors. For sending data to an external 
system you should likely use foreachPartition in order to batch the output. 
Also if you want to limit the parallelism of the output action then you can use 
coalesce.
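
Something along these lines (myRdd, createClient and sendBatch are placeholders for your RDD and whatever client the external system uses):

myRdd.coalesce(10).foreachPartition { records =>
  val client = createClient()          // one connection per partition, opened on the executor
  try {
    records.grouped(500).foreach(batch => sendBatch(client, batch))   // batched writes
  } finally {
    client.close()
  }
}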

What makes you think foreach is running on the driver?

From: Alexandre Rodrigues
Date: Thursday, July 2, 2015 at 12:32 PM
To: user@spark.apache.org
Subject: Fwd: map vs foreach for sending data to external system

Hi Spark devs,

I'm coding a spark job and at a certain point in execution I need to send some 
data present in an RDD to an external system.

val myRdd = 

myRdd.foreach { record =>
  sendToWhtv(record)
}

The thing is that foreach forces materialization of the RDD and it seems to be 
executed on the driver program, which is not very beneficial in my case. So I 
changed the logic to a map (mapPartitions, but it's the same).

val newRdd = myRdd.map { record =>
  sendToWhtv(record)
}
newRdd.count()

My understanding is that map is a transformation operation and then I have to 
force materialization by invoking some action (such as count). Is this the 
correct way to do this kind of distributed foreach or is there any other 
function to achieve this that doesn't necessarily imply a data transformation 
or a returned RDD ?


Thanks,
Alex



Re: custom RDD in java

2015-07-01 Thread Silvio Fiorito
If all you’re doing is just dumping tables from SQLServer to HDFS, have you 
looked at Sqoop?

Otherwise, if you need to run this in Spark could you just use the existing 
JdbcRDD?
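
A rough JdbcRDD sketch (connection string, table, bounds and column names are all placeholders; the SQL must contain two '?' markers that JdbcRDD fills in with the per-partition bounds):

import java.sql.DriverManager
import org.apache.spark.rdd.JdbcRDD

val rows = new JdbcRDD(
  sc,
  () => DriverManager.getConnection("jdbc:sqlserver://host;databaseName=db", "user", "pass"),
  "SELECT id, name FROM dbo.sometable WHERE id >= ? AND id <= ?",
  1L, 1000000L,   // lower / upper bound of the partitioning column
  10,             // number of partitions, i.e. concurrent queries
  rs => rs.getLong(1) + "\t" + rs.getString(2))

rows.saveAsTextFile("hdfs:///dumps/sometable")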


From: Shushant Arora
Date: Wednesday, July 1, 2015 at 10:19 AM
To: user
Subject: custom RDD in java

Hi

Is it possible to write custom RDD in java?

Requirement is - I have a list of SQL Server tables that need to be dumped into HDFS.

So I have a
List<String> tables = Arrays.asList("dbname.tablename", "dbname.tablename2", ...);

then

JavaRDD<String> rdd = javasparkcontext.parallelize(tables);

JavaRDD<String> tablecontent = rdd.map(new Function<String, Iterable<String>>() { /* fetch table and return a populated iterable */ });

tablecontent.saveAsTextFile("hdfs path");


In rdd.map(new Function<String, ...>) I cannot keep the complete table content in 
memory, so I want to create my own RDD to handle it.

Thanks
Shushant








Re: custom RDD in java

2015-07-01 Thread Silvio Fiorito
Sure, you can create custom RDDs. Haven’t done so in Java, but in Scala 
absolutely.
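
A bare-bones Scala sketch of the idea (TablePartition, the table list and the row-streaming comment are made up; the point is that compute() returns an iterator so a table never has to sit fully in memory):

import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

class TablePartition(val index: Int, val tableName: String) extends Partition

class TableDumpRDD(sc: SparkContext, tables: Seq[String]) extends RDD[String](sc, Nil) {

  // one partition per table, so each table is fetched by its own task
  override def getPartitions: Array[Partition] =
    tables.zipWithIndex.map { case (t, i) => new TablePartition(i, t) }.toArray

  override def compute(split: Partition, context: TaskContext): Iterator[String] = {
    val table = split.asInstanceOf[TablePartition].tableName
    // open a JDBC cursor for `table` here and stream its rows lazily
    Iterator("rows of " + table + " would be streamed here")
  }
}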

From: Shushant Arora
Date: Wednesday, July 1, 2015 at 1:44 PM
To: Silvio Fiorito
Cc: user
Subject: Re: custom RDD in java

ok..will evaluate these options but is it possible to create RDD in java?


On Wed, Jul 1, 2015 at 8:29 PM, Silvio Fiorito 
silvio.fior...@granturing.commailto:silvio.fior...@granturing.com wrote:
If all you’re doing is just dumping tables from SQLServer to HDFS, have you 
looked at Sqoop?

Otherwise, if you need to run this in Spark could you just use the existing 
JdbcRDD?


From: Shushant Arora
Date: Wednesday, July 1, 2015 at 10:19 AM
To: user
Subject: custom RDD in java

Hi

Is it possible to write custom RDD in java?

Requirement is - I have a list of SQL Server tables that need to be dumped into HDFS.

So I have a
List<String> tables = Arrays.asList("dbname.tablename", "dbname.tablename2", ...);

then

JavaRDD<String> rdd = javasparkcontext.parallelize(tables);

JavaRDD<String> tablecontent = rdd.map(new Function<String, Iterable<String>>() { /* fetch table and return a populated iterable */ });

tablecontent.saveAsTextFile("hdfs path");


In rdd.map(new Function<String, ...>) I cannot keep the complete table content in 
memory, so I want to create my own RDD to handle it.

Thanks
Shushant









Re: Shuffle files lifecycle

2015-06-29 Thread Silvio Fiorito
Regarding 1 and 2, yes shuffle output is stored on the worker local disks and 
will be reused across jobs as long as they’re available. You can identify when 
they’re used by seeing skipped stages in the job UI. They are periodically 
cleaned up based on available space of the configured spark.local.dirs paths.

From: Thomas Gerber
Date: Monday, June 29, 2015 at 10:12 PM
To: user
Subject: Shuffle files lifecycle

Hello,

It is my understanding that shuffles are written to disk and that they act as 
checkpoints.

I wonder if this is true only within a job, or across jobs. Please note that I 
use the words job and stage carefully here.

1. can a shuffle created during JobN be used to skip many stages from JobN+1? 
Or is the lifecycle of the shuffle files bound to the job that created them?

2. when are shuffle files actually deleted? Is it TTL based or is it cleaned 
when the job is over?

3. we have a very long batch application, and as it goes on, the number of 
total tasks for each job gets larger and larger. It is not really a problem, 
because most of those tasks will be skipped since we cache RDDs. We noticed 
however that there is a delay in the actual start of a job of 1 min for every 
2M tasks in your job. Are there suggested workarounds to avoid that delay? 
Maybe saving the RDD and re-loading it?

Thanks
Thomas



Re:

2015-06-26 Thread Silvio Fiorito
OK, here’s how I did it, using just the built-in Avro libraries with Spark 1.3:

import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.avro.mapred.AvroKey
import org.apache.avro.mapreduce.AvroKeyInputFormat
import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat

val hadoopConf = new 
org.apache.hadoop.conf.Configuration(sc.hadoopConfiguration)
hadoopConf.setLong(FileInputFormat.SPLIT_MAXSIZE, 100)

val input = sc.newAPIHadoopFile(
  "examples/src/main/resources/users.avro",
  classOf[AvroKeyInputFormat[GenericRecord]],
  classOf[AvroKey[GenericRecord]],
  classOf[NullWritable],
  hadoopConf).map(_._1.datum.get("name"))

println(input.partitions.size)




From: ÐΞ€ρ@Ҝ (๏̯͡๏)
Date: Friday, June 26, 2015 at 11:04 AM
To: Silvio Fiorito
Cc: user
Subject: Re:


<dependency>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro</artifactId>
  <version>1.7.7</version>
  <scope>provided</scope>
</dependency>

<dependency>
  <groupId>com.databricks</groupId>
  <artifactId>spark-avro_2.10</artifactId>
  <version>1.0.0</version>
</dependency>

<dependency>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro-mapred</artifactId>
  <version>1.7.7</version>
  <classifier>hadoop2</classifier>
  <scope>provided</scope>
</dependency>

On Fri, Jun 26, 2015 at 8:02 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) 
deepuj...@gmail.commailto:deepuj...@gmail.com wrote:
Same code of yours works for me as well

On Fri, Jun 26, 2015 at 8:02 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) 
deepuj...@gmail.commailto:deepuj...@gmail.com wrote:
Is that its not supported with Avro. Unlikely.

On Fri, Jun 26, 2015 at 8:01 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) 
deepuj...@gmail.commailto:deepuj...@gmail.com wrote:
My imports:


import org.apache.avro.generic.GenericData

import org.apache.avro.generic.GenericRecord

import org.apache.avro.mapred.AvroKey

import org.apache.avro.Schema

import org.apache.hadoop.io.NullWritable

import org.apache.avro.mapreduce.AvroKeyInputFormat

import org.apache.hadoop.conf.Configuration

import org.apache.hadoop.fs.FileSystem

import org.apache.hadoop.fs.Path

import org.apache.hadoop.io.Text


  def readGenericRecords(sc: SparkContext, inputDir: String, startDate: Date, 
endDate: Date) = {

val path = getInputPaths(inputDir, startDate, endDate)

val hadoopConfiguration = new Configuration(sc.hadoopConfiguration)

hadoopConfiguration.set("mapreduce.input.fileinputformat.split.maxsize", "67108864")

sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable, AvroKeyInputFormat[GenericRecord]](path + "/*.avro")

  }

I need to read Avro datasets and am using strings instead of constant from 
InputFormat class.


When i click on any hadoop dependency from eclipse, i see they point to hadoop 
2.2.x jars.


On Fri, Jun 26, 2015 at 7:44 AM, Silvio Fiorito 
silvio.fior...@granturing.commailto:silvio.fior...@granturing.com wrote:
Make sure you’re importing the right namespace for Hadoop v2.0. This is what I 
tried:

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, TextInputFormat}

val hadoopConf = new org.apache.hadoop.conf.Configuration()
hadoopConf.setLong(FileInputFormat.SPLIT_MAXSIZE, 2048)

val input = sc.newAPIHadoopFile(
  "README.md",
  classOf[TextInputFormat],
  classOf[LongWritable],
  classOf[Text],
  hadoopConf).map(_._2.toString())

println(input.partitions.size)

input.
  flatMap(_.split(" ")).
  filter(_.length > 0).
  map((_, 1)).
  reduceByKey(_ + _).
  coalesce(1).
  sortBy(_._2, false).
  take(10).
  foreach(println)


From: ÐΞ€ρ@Ҝ (๏̯͡๏)
Date: Friday, June 26, 2015 at 10:18 AM
To: Silvio Fiorito
Cc: user
Subject: Re:


All these throw compilation error at newAPIHadoopFile

1)

val hadoopConfiguration = new Configuration()

hadoopConfiguration.set("mapreduce.input.fileinputformat.split.maxsize", "67108864")

sc.newAPIHadoopFile[AvroKey, NullWritable, AvroKeyInputFormat](path + "/*.avro", classOf[AvroKey], classOf[NullWritable], classOf[AvroKeyInputFormat], hadoopConfiguration)

2)

val hadoopConfiguration = new Configuration()

hadoopConfiguration.set("mapreduce.input.fileinputformat.split.maxsize", "67108864")

sc.newAPIHadoopFile[AvroKey, NullWritable, AvroKeyInputFormat](path + "/*.avro", classOf[AvroKey[GenericRecord]], classOf[NullWritable], classOf[AvroKeyInputFormat[GenericRecord]], hadoopConfiguration)

3)

val hadoopConfiguration = new Configuration()

hadoopConfiguration.set("mapreduce.input.fileinputformat.split.maxsize", "67108864")

sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable, AvroKeyInputFormat[GenericRecord]](path + "/*.avro", classOf[AvroKey[GenericRecord]], classOf[NullWritable], classOf[AvroKeyInputFormat[GenericRecord]], hadoopConfiguration)

Error:

[ERROR] 
/Users/dvasthimal/ebay/projects/ep/ep-spark/src/main/scala/com/ebay/ep/poc/spark/reporting/process/util/DataUtil.scala:37:
 error: overloaded method value newAPIHadoopFile with alternatives:

[INFO]   (path: String,fClass: 
Class[org.apache.avro.mapreduce.AvroKeyInputFormat

Re:

2015-06-26 Thread Silvio Fiorito
Make sure you’re importing the right namespace for Hadoop v2.0. This is what I 
tried:

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, TextInputFormat}

val hadoopConf = new org.apache.hadoop.conf.Configuration()
hadoopConf.setLong(FileInputFormat.SPLIT_MAXSIZE, 2048)

val input = sc.newAPIHadoopFile(
  "README.md",
  classOf[TextInputFormat],
  classOf[LongWritable],
  classOf[Text],
  hadoopConf).map(_._2.toString())

println(input.partitions.size)

input.
  flatMap(_.split(" ")).
  filter(_.length > 0).
  map((_, 1)).
  reduceByKey(_ + _).
  coalesce(1).
  sortBy(_._2, false).
  take(10).
  foreach(println)


From: ÐΞ€ρ@Ҝ (๏̯͡๏)
Date: Friday, June 26, 2015 at 10:18 AM
To: Silvio Fiorito
Cc: user
Subject: Re:


All these throw compilation error at newAPIHadoopFile

1)

val hadoopConfiguration = new Configuration()

hadoopConfiguration.set("mapreduce.input.fileinputformat.split.maxsize", "67108864")

sc.newAPIHadoopFile[AvroKey, NullWritable, AvroKeyInputFormat](path + "/*.avro", classOf[AvroKey], classOf[NullWritable], classOf[AvroKeyInputFormat], hadoopConfiguration)

2)

val hadoopConfiguration = new Configuration()

hadoopConfiguration.set("mapreduce.input.fileinputformat.split.maxsize", "67108864")

sc.newAPIHadoopFile[AvroKey, NullWritable, AvroKeyInputFormat](path + "/*.avro", classOf[AvroKey[GenericRecord]], classOf[NullWritable], classOf[AvroKeyInputFormat[GenericRecord]], hadoopConfiguration)

3)

val hadoopConfiguration = new Configuration()

hadoopConfiguration.set("mapreduce.input.fileinputformat.split.maxsize", "67108864")

sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable, AvroKeyInputFormat[GenericRecord]](path + "/*.avro", classOf[AvroKey[GenericRecord]], classOf[NullWritable], classOf[AvroKeyInputFormat[GenericRecord]], hadoopConfiguration)

Error:

[ERROR] 
/Users/dvasthimal/ebay/projects/ep/ep-spark/src/main/scala/com/ebay/ep/poc/spark/reporting/process/util/DataUtil.scala:37:
 error: overloaded method value newAPIHadoopFile with alternatives:

[INFO]   (path: String,fClass: 
Class[org.apache.avro.mapreduce.AvroKeyInputFormat[org.apache.avro.generic.GenericRecord]],kClass:
 
Class[org.apache.avro.mapred.AvroKey[org.apache.avro.generic.GenericRecord]],vClass:
 Class[org.apache.hadoop.io.NullWritable],conf: 
org.apache.hadoop.conf.Configuration)org.apache.spark.rdd.RDD[(org.apache.avro.mapred.AvroKey[org.apache.avro.generic.GenericRecord],
 org.apache.hadoop.io.NullWritable)] and

[INFO]   (path: String)(implicit km: 
scala.reflect.ClassTag[org.apache.avro.mapred.AvroKey[org.apache.avro.generic.GenericRecord]],
 implicit vm: scala.reflect.ClassTag[org.apache.hadoop.io.NullWritable], 
implicit fm: 
scala.reflect.ClassTag[org.apache.avro.mapreduce.AvroKeyInputFormat[org.apache.avro.generic.GenericRecord]])org.apache.spark.rdd.RDD[(org.apache.avro.mapred.AvroKey[org.apache.avro.generic.GenericRecord],
 org.apache.hadoop.io.NullWritable)]

[INFO]  cannot be applied to (String, 
Class[org.apache.avro.mapred.AvroKey[org.apache.avro.generic.GenericRecord]], 
Class[org.apache.hadoop.io.NullWritable], 
Class[org.apache.avro.mapreduce.AvroKeyInputFormat[org.apache.avro.generic.GenericRecord]],
 org.apache.hadoop.conf.Configuration)

[INFO] sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable, 
AvroKeyInputFormat[GenericRecord]](path + /*.avro, 
classOf[AvroKey[GenericRecord]], classOf[NullWritable], 
classOf[AvroKeyInputFormat[GenericRecord]], hadoopConfiguration)


On Thu, Jun 25, 2015 at 4:14 PM, Silvio Fiorito 
silvio.fior...@granturing.commailto:silvio.fior...@granturing.com wrote:
Ok, in that case I think you can set the max split size in the Hadoop config 
object, using the FileInputFormat.SPLIT_MAXSIZE config parameter.

Again, I haven’t done this myself, but looking through the Spark codebase here: 
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SparkContext.scala#L1053

And the HDFS FileInputFormat implementation, that seems like a good option to 
try.

You should be able to call conf.setLong(FileInputFormat.SPLIT_MAXSIZE, max).

I hope that helps!

From: ÐΞ€ρ@Ҝ (๏̯͡๏)
Date: Thursday, June 25, 2015 at 5:49 PM
To: Silvio Fiorito
Cc: user
Subject: Re:

I use

sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable, 
AvroKeyInputFormat[GenericRecord]](path + /*.avro)


https://spark.apache.org/docs/1.3.1/api/java/org/apache/spark/SparkContext.html#newAPIHadoopFile(java.lang.String,
 java.lang.Class, java.lang.Class, java.lang.Class, 
org.apache.hadoop.conf.Configuration)

Does not seem to have that partition option.

On Thu, Jun 25, 2015 at 12:24 PM, Silvio Fiorito 
silvio.fior...@granturing.commailto:silvio.fior...@granturing.com wrote:
Hi Deepak,

Have you tried specifying the minimum partitions when you load the file? I 
haven’t tried that myself against HDFS before, so I’m not sure if it will 
affect data locality. Ideally not, it should still maintain data

Re:

2015-06-26 Thread Silvio Fiorito
No worries, glad to help! It also helped me as I had not worked directly with 
the Hadoop APIs for controlling splits.

From: ÐΞ€ρ@Ҝ (๏̯͡๏)
Date: Friday, June 26, 2015 at 1:31 PM
To: Silvio Fiorito
Cc: user
Subject: Re:

Silvio,
Thanks for your responses and patience. It worked after i reshuffled the 
arguments and removed avro dependencies.

On Fri, Jun 26, 2015 at 9:55 AM, Silvio Fiorito 
silvio.fior...@granturing.commailto:silvio.fior...@granturing.com wrote:
OK, here’s how I did it, using just the built-in Avro libraries with Spark 1.3:

import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.avro.mapred.AvroKey
import org.apache.avro.mapreduce.AvroKeyInputFormat
import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat

val hadoopConf = new 
org.apache.hadoop.conf.Configuration(sc.hadoopConfiguration)
hadoopConf.setLong(FileInputFormat.SPLIT_MAXSIZE, 100)

val input = sc.newAPIHadoopFile(
  "examples/src/main/resources/users.avro",
  classOf[AvroKeyInputFormat[GenericRecord]],
  classOf[AvroKey[GenericRecord]],
  classOf[NullWritable],
  hadoopConf).map(_._1.datum.get("name"))

println(input.partitions.size)




From: ÐΞ€ρ@Ҝ (๏̯͡๏)
Date: Friday, June 26, 2015 at 11:04 AM
To: Silvio Fiorito
Cc: user
Subject: Re:


<dependency>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro</artifactId>
  <version>1.7.7</version>
  <scope>provided</scope>
</dependency>

<dependency>
  <groupId>com.databricks</groupId>
  <artifactId>spark-avro_2.10</artifactId>
  <version>1.0.0</version>
</dependency>

<dependency>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro-mapred</artifactId>
  <version>1.7.7</version>
  <classifier>hadoop2</classifier>
  <scope>provided</scope>
</dependency>

On Fri, Jun 26, 2015 at 8:02 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) 
deepuj...@gmail.commailto:deepuj...@gmail.com wrote:
Same code of yours works for me as well

On Fri, Jun 26, 2015 at 8:02 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) 
deepuj...@gmail.commailto:deepuj...@gmail.com wrote:
Is that its not supported with Avro. Unlikely.

On Fri, Jun 26, 2015 at 8:01 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) 
deepuj...@gmail.commailto:deepuj...@gmail.com wrote:
My imports:


import org.apache.avro.generic.GenericData

import org.apache.avro.generic.GenericRecord

import org.apache.avro.mapred.AvroKey

import org.apache.avro.Schema

import org.apache.hadoop.io.NullWritable

import org.apache.avro.mapreduce.AvroKeyInputFormat

import org.apache.hadoop.conf.Configuration

import org.apache.hadoop.fs.FileSystem

import org.apache.hadoop.fs.Path

import org.apache.hadoop.io.Text


  def readGenericRecords(sc: SparkContext, inputDir: String, startDate: Date, 
endDate: Date) = {

val path = getInputPaths(inputDir, startDate, endDate)

val hadoopConfiguration = new Configuration(sc.hadoopConfiguration)

hadoopConfiguration.set("mapreduce.input.fileinputformat.split.maxsize", "67108864")

sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable, AvroKeyInputFormat[GenericRecord]](path + "/*.avro")

  }

I need to read Avro datasets and am using strings instead of constant from 
InputFormat class.


When i click on any hadoop dependency from eclipse, i see they point to hadoop 
2.2.x jars.


On Fri, Jun 26, 2015 at 7:44 AM, Silvio Fiorito 
silvio.fior...@granturing.commailto:silvio.fior...@granturing.com wrote:
Make sure you’re importing the right namespace for Hadoop v2.0. This is what I 
tried:

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, TextInputFormat}

val hadoopConf = new org.apache.hadoop.conf.Configuration()
hadoopConf.setLong(FileInputFormat.SPLIT_MAXSIZE, 2048)

val input = sc.newAPIHadoopFile(
  "README.md",
  classOf[TextInputFormat],
  classOf[LongWritable],
  classOf[Text],
  hadoopConf).map(_._2.toString())

println(input.partitions.size)

input.
  flatMap(_.split(" ")).
  filter(_.length > 0).
  map((_, 1)).
  reduceByKey(_ + _).
  coalesce(1).
  sortBy(_._2, false).
  take(10).
  foreach(println)


From: ÐΞ€ρ@Ҝ (๏̯͡๏)
Date: Friday, June 26, 2015 at 10:18 AM
To: Silvio Fiorito
Cc: user
Subject: Re:


All these throw compilation error at newAPIHadoopFile

1)

val hadoopConfiguration = new Configuration()

hadoopConfiguration.set("mapreduce.input.fileinputformat.split.maxsize", "67108864")

sc.newAPIHadoopFile[AvroKey, NullWritable, AvroKeyInputFormat](path + "/*.avro", classOf[AvroKey], classOf[NullWritable], classOf[AvroKeyInputFormat], hadoopConfiguration)

2)

val hadoopConfiguration = new Configuration()

hadoopConfiguration.set("mapreduce.input.fileinputformat.split.maxsize", "67108864")

sc.newAPIHadoopFile[AvroKey, NullWritable, AvroKeyInputFormat](path + "/*.avro", classOf[AvroKey[GenericRecord]], classOf[NullWritable], classOf[AvroKeyInputFormat[GenericRecord]], hadoopConfiguration)

3)

val hadoopConfiguration = new Configuration()

hadoopConfiguration.set("mapreduce.input.fileinputformat.split.maxsize", "67108864")

sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable, 
AvroKeyInputFormat

Re:

2015-06-25 Thread Silvio Fiorito
Hi Deepak,

Have you tried specifying the minimum partitions when you load the file? I 
haven’t tried that myself against HDFS before, so I’m not sure if it will 
affect data locality. Ideally not, it should still maintain data locality but 
just more partitions. Once your job runs, you can check in the Spark tasks web 
UI to ensure they’re all Node local.

val details = sc.textFile(“hdfs://….”, 500)

If you’re using something other than text file you can also specify minimum 
partitions when using sc.hadoopFile.

Thanks,
Silvio

From: ÐΞ€ρ@Ҝ (๏̯͡๏)
Date: Thursday, June 25, 2015 at 3:10 PM
To: Akhil Das
Cc: user
Subject: Re:

How can i increase the number of tasks from 174 to 500 without running 
repartition.

The input size is 512.0 MB (hadoop) / 4159106. Can this be reduced to 64 MB so 
as to increase the number of tasks. Similar to split size that increases the 
number of mappers in Hadoop M/R.

On Thu, Jun 25, 2015 at 12:06 AM, Akhil Das 
ak...@sigmoidanalytics.commailto:ak...@sigmoidanalytics.com wrote:
Look in the tuning sectionhttps://spark.apache.org/docs/latest/tuning.html, 
also you need to figure out whats taking time and where's your bottleneck etc. 
If everything is tuned properly, then you will need to throw more cores :)

Thanks
Best Regards

On Thu, Jun 25, 2015 at 12:19 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) 
deepuj...@gmail.commailto:deepuj...@gmail.com wrote:
Its taking an hour and on Hadoop it takes 1h 30m, is there a way to make it run 
faster ?

On Wed, Jun 24, 2015 at 11:39 AM, Akhil Das 
ak...@sigmoidanalytics.commailto:ak...@sigmoidanalytics.com wrote:

Cool. :)

On 24 Jun 2015 23:44, ÐΞ€ρ@Ҝ (๏̯͡๏) 
deepuj...@gmail.commailto:deepuj...@gmail.com wrote:
Its running now.

On Wed, Jun 24, 2015 at 10:45 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) 
deepuj...@gmail.commailto:deepuj...@gmail.com wrote:
Now running with

--num-executors 9973 --driver-memory 14g --driver-java-options 
-XX:MaxPermSize=512M -Xmx4096M -Xms4096M --executor-memory 14g 
--executor-cores 1


On Wed, Jun 24, 2015 at 10:34 AM, ÐΞ€ρ@Ҝ (๏̯͡๏) 
deepuj...@gmail.commailto:deepuj...@gmail.com wrote:
There are multiple of these

1)
15/06/24 09:53:37 ERROR executor.Executor: Exception in task 443.0 in stage 3.0 
(TID 1767)
java.lang.OutOfMemoryError: GC overhead limit exceeded
at 
sun.reflect.GeneratedSerializationConstructorAccessor1327.newInstance(Unknown 
Source)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at 
org.objenesis.instantiator.sun.SunReflectionFactoryInstantiator.newInstance(SunReflectionFactoryInstantiator.java:56)
at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1065)
at 
com.esotericsoftware.kryo.serializers.FieldSerializer.create(FieldSerializer.java:228)
at 
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:217)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
at 
com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:134)
at 
com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:17)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
at 
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
at 
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
at 
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
at 
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
at 
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
at 
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:42)
at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:33)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
at 
org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:138)
at 
org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
at 
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at 
org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:125)
15/06/24 09:53:37 ERROR actor.ActorSystemImpl: exception on LARS? timer thread

2)
15/06/24 09:53:37 ERROR actor.ActorSystemImpl: exception on LARS? timer 
