Re: Spark hangs at "Removed broadcast_*"

2016-07-12 Thread Anton Sviridov
Hi.

Here's the last few lines before it starts removing broadcasts:

16/07/11 14:02:11 INFO FileOutputCommitter: Saved output of task
'attempt_20160723_0009_m_003209_20886' to
file:/mnt/rendang/cache-main/RunWikistatsSFCounts727fc9d635f25d0922984e59a0d18fdd/stats/sf_counts/_temporary/0/task_20160723_0009_m_003209
16/07/11 14:02:11 INFO SparkHadoopMapRedUtil:
attempt_20160723_0009_m_003209_20886: Committed
16/07/11 14:02:11 INFO TaskSetManager: Finished task 3211.0 in stage 9.0
(TID 20888) in 95 ms on localhost (3209/3214)
16/07/11 14:02:11 INFO Executor: Finished task 3209.0 in stage 9.0 (TID
20886). 1721 bytes result sent to driver
16/07/11 14:02:11 INFO TaskSetManager: Finished task 3209.0 in stage 9.0
(TID 20886) in 103 ms on localhost (3210/3214)
16/07/11 14:02:11 INFO FileOutputCommitter: Saved output of task
'attempt_20160723_0009_m_003208_20885' to
file:/mnt/rendang/cache-main/RunWikistatsSFCounts727fc9d635f25d0922984e59a0d18fdd/stats/sf_counts/_temporary/0/task_20160723_0009_m_003208
16/07/11 14:02:11 INFO SparkHadoopMapRedUtil:
attempt_20160723_0009_m_003208_20885: Committed
16/07/11 14:02:11 INFO Executor: Finished task 3208.0 in stage 9.0 (TID
20885). 1721 bytes result sent to driver
16/07/11 14:02:11 INFO TaskSetManager: Finished task 3208.0 in stage 9.0
(TID 20885) in 109 ms on localhost (3211/3214)
16/07/11 14:02:11 INFO FileOutputCommitter: Saved output of task
'attempt_20160723_0009_m_003212_20889' to
file:/mnt/rendang/cache-main/RunWikistatsSFCounts727fc9d635f25d0922984e59a0d18fdd/stats/sf_counts/_temporary/0/task_20160723_0009_m_003212
16/07/11 14:02:11 INFO SparkHadoopMapRedUtil:
attempt_20160723_0009_m_003212_20889: Committed
16/07/11 14:02:11 INFO Executor: Finished task 3212.0 in stage 9.0 (TID
20889). 1721 bytes result sent to driver
16/07/11 14:02:11 INFO TaskSetManager: Finished task 3212.0 in stage 9.0
(TID 20889) in 84 ms on localhost (3212/3214)
16/07/11 14:02:11 INFO FileOutputCommitter: Saved output of task
'attempt_20160723_0009_m_003210_20887' to
file:/mnt/rendang/cache-main/RunWikistatsSFCounts727fc9d635f25d0922984e59a0d18fdd/stats/sf_counts/_temporary/0/task_20160723_0009_m_003210
16/07/11 14:02:11 INFO SparkHadoopMapRedUtil:
attempt_20160723_0009_m_003210_20887: Committed
16/07/11 14:02:11 INFO Executor: Finished task 3210.0 in stage 9.0 (TID
20887). 1721 bytes result sent to driver
16/07/11 14:02:11 INFO TaskSetManager: Finished task 3210.0 in stage 9.0
(TID 20887) in 100 ms on localhost (3213/3214)
16/07/11 14:02:11 INFO FileOutputCommitter: File Output Committer Algorithm
version is 1
16/07/11 14:02:11 INFO FileOutputCommitter: Saved output of task
'attempt_20160723_0009_m_003213_20890' to
file:/mnt/rendang/cache-main/RunWikistatsSFCounts727fc9d635f25d0922984e59a0d18fdd/stats/sf_counts/_temporary/0/task_20160723_0009_m_003213
16/07/11 14:02:11 INFO SparkHadoopMapRedUtil:
attempt_20160723_0009_m_003213_20890: Committed
16/07/11 14:02:11 INFO Executor: Finished task 3213.0 in stage 9.0 (TID
20890). 1721 bytes result sent to driver
16/07/11 14:02:11 INFO TaskSetManager: Finished task 3213.0 in stage 9.0
(TID 20890) in 82 ms on localhost (3214/3214)
16/07/11 14:02:11 INFO TaskSchedulerImpl: Removed TaskSet 9.0, whose tasks
have all completed, from pool
*16/07/11 14:02:11 INFO DAGScheduler: ResultStage 9 (saveAsTextFile at
SfCountsDumper.scala:13) finished in 42.294 s*
*16/07/11 14:02:11 INFO DAGScheduler: Job 1 finished: saveAsTextFile at
SfCountsDumper.scala:13, took 9517.124624 s*
16/07/11 14:28:46 INFO BlockManagerInfo: Removed broadcast_0_piece0 on
10.101.230.154:35192 in memory (size: 15.8 KB, free: 37.1 GB)
16/07/11 14:28:46 INFO ContextCleaner: Cleaned shuffle 7
16/07/11 14:28:46 INFO ContextCleaner: Cleaned shuffle 6
16/07/11 14:28:46 INFO ContextCleaner: Cleaned shuffle 5
16/07/11 14:28:46 INFO ContextCleaner: Cleaned shuffle 4
16/07/11 14:28:46 INFO ContextCleaner: Cleaned shuffle 3
16/07/11 14:28:46 INFO ContextCleaner: Cleaned shuffle 2
16/07/11 14:28:46 INFO ContextCleaner: Cleaned shuffle 1
16/07/11 14:28:46 INFO BlockManager: Removing RDD 14
16/07/11 14:28:46 INFO ContextCleaner: Cleaned RDD 14
16/07/11 14:28:46 INFO BlockManagerInfo: Removed broadcast_11_piece0 on
10.101.230.154:35192 in memory (size: 25.5 KB, free: 37.1 GB)
...

In fact, the job is still running: Spark's UI shows an uptime of 20.6 hours,
with the last job having finished at least 18 hours ago.

On Mon, 11 Jul 2016 at 23:23 dhruve ashar  wrote:

> Hi,
>
> Can you check the time when the job actually finished from the logs. The
> logs provided are too short and do not reveal meaningful information.
>
>
>
> On Mon, Jul 11, 2016 at 9:50 AM, velvetbaldmime  wrote:
>
>> Spark 2.0.0-preview
>>
>> We've got an app that uses a fairly big broadcast variable. We run this
>> on a
>> big EC2 instance, so deployment is in client-mode. Broadcasted variable
>> is a
>> massive Map[String, Array[String]].
>>
>> At the end of saveAsTextFile, the output in the fo

Re: Using Spark on Hive with Hive also using Spark as its execution engine

2016-07-12 Thread Mich Talebzadeh
This is the whole idea. Spark uses DAG + in-memory computing; MR is the classic model.


This is for Hive on Spark

hive> explain select max(id) from dummy_parquet;
OK
STAGE DEPENDENCIES:
  Stage-1 is a root stage
  Stage-0 depends on stages: Stage-1
STAGE PLANS:
  Stage: Stage-1
Spark
  Edges:
Reducer 2 <- Map 1 (GROUP, 1)

  DagName: hduser_20160712083219_632c2749-7387-478f-972d-9eaadd9932c6:1
  Vertices:
Map 1
Map Operator Tree:
TableScan
  alias: dummy_parquet
  Statistics: Num rows: 1 Data size: 7
Basic stats: COMPLETE Column stats: NONE
  Select Operator
expressions: id (type: int)
outputColumnNames: id
Statistics: Num rows: 1 Data size: 7
Basic stats: COMPLETE Column stats: NONE
Group By Operator
  aggregations: max(id)
  mode: hash
  outputColumnNames: _col0
  Statistics: Num rows: 1 Data size: 4 Basic stats:
COMPLETE Column stats: NONE
  Reduce Output Operator
sort order:
Statistics: Num rows: 1 Data size: 4 Basic stats:
COMPLETE Column stats: NONE
value expressions: _col0 (type: int)
Reducer 2
Reduce Operator Tree:
  Group By Operator
aggregations: max(VALUE._col0)
mode: mergepartial
outputColumnNames: _col0
Statistics: Num rows: 1 Data size: 4 Basic stats: COMPLETE
Column stats: NONE
File Output Operator
  compressed: false
  Statistics: Num rows: 1 Data size: 4 Basic stats:
COMPLETE Column stats: NONE
  table:
  input format: org.apache.hadoop.mapred.TextInputFormat
  output format:
org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
  serde:
org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
  Stage: Stage-0
Fetch Operator
  limit: -1
  Processor Tree:
ListSink
Time taken: 2.801 seconds, Fetched: 50 row(s)

And this is with setting the execution engine to MR

hive> set hive.execution.engine=mr;
Hive-on-MR is deprecated in Hive 2 and may not be available in the future
versions. Consider using a different execution engine (i.e. spark, tez) or
using Hive 1.X releases.

hive> explain select max(id) from dummy_parquet;
OK
STAGE DEPENDENCIES:
  Stage-1 is a root stage
  Stage-0 depends on stages: Stage-1
STAGE PLANS:
  Stage: Stage-1
Map Reduce
  Map Operator Tree:
  TableScan
alias: dummy_parquet
Statistics: Num rows: 1 Data size: 7 Basic
stats: COMPLETE Column stats: NONE
Select Operator
  expressions: id (type: int)
  outputColumnNames: id
  Statistics: Num rows: 1 Data size: 7 Basic
stats: COMPLETE Column stats: NONE
  Group By Operator
aggregations: max(id)
mode: hash
outputColumnNames: _col0
Statistics: Num rows: 1 Data size: 4 Basic stats: COMPLETE
Column stats: NONE
Reduce Output Operator
  sort order:
  Statistics: Num rows: 1 Data size: 4 Basic stats:
COMPLETE Column stats: NONE
  value expressions: _col0 (type: int)
  Reduce Operator Tree:
Group By Operator
  aggregations: max(VALUE._col0)
  mode: mergepartial
  outputColumnNames: _col0
  Statistics: Num rows: 1 Data size: 4 Basic stats: COMPLETE Column
stats: NONE
  File Output Operator
compressed: false
Statistics: Num rows: 1 Data size: 4 Basic stats: COMPLETE
Column stats: NONE
table:
input format: org.apache.hadoop.mapred.TextInputFormat
output format:
org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
  Stage: Stage-0
Fetch Operator
  limit: -1
  Processor Tree:
ListSink
Time taken: 0.1 seconds, Fetched: 44 row(s)


HTH


Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 12 July 2016 at 08:16, Markovitz, Dudu  wrote:

> This is a simple task –
>
>

Re: question about UDAF

2016-07-12 Thread luohui20001
Hi Pedro,

Thanks for your advice. I got my code working as below.

Code in main:

val hc = new org.apache.spark.sql.hive.HiveContext(sc)
val hiveTable = hc.sql("select lp_location_id,id,pv from house_id_pv_location_top50")
val jsonArray = new JsonArray
val middleResult = hiveTable.groupBy("lp_location_id").agg(jsonArray(col("id"), col("pv")).substr(2, 2048).as("id_pv"))
middleResult.collect.foreach(println)
middleResult.write.saveAsTable("house_id_pv_top50_json")

Code in my UDAF:

class JsonArray extends UserDefinedAggregateFunction {
  def inputSchema: org.apache.spark.sql.types.StructType =
    StructType(StructField("id", IntegerType) :: StructField("pv", IntegerType) :: Nil)

  def bufferSchema: StructType = StructType(
    StructField("id", StringType) :: StructField("pv", StringType) :: Nil)
  def dataType: DataType = StringType
  def deterministic: Boolean = true
  def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = ""
  }
  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    buffer(0) = buffer.getAs[String](0).concat(",{\"id\":\""+input.getInt(0).toString()+"\",\"pv\":\""+input.getInt(1).toString()+"\"}")
  }
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    val s1 = buffer1.getAs[Int](0).toString()
    val s2 = buffer2.getAs[Int](0).toString()
    buffer1(0) = s1.concat(s2)
  }

And the result is what I am expecting, as in the attached file.



 

Thanks&Best regards!
San.Luo

- Original Message -
From: Pedro Rodriguez
To: luohui20...@sina.com
Cc: user
Subject: Re: question about UDAF
Date: 2016-07-12 04:17

I am not sure I understand your code entirely, but I worked with UDAFs Friday 
and over the weekend 
(https://gist.github.com/EntilZha/3951769a011389fef25e930258c20a2a).
I think what is going on is that your "update" function is not defined
correctly. Update should take a possibly initialized or in-progress buffer and
integrate the new results into it. Right now, you ignore the input row: the update
is effectively setting the buffer equal to its own current value, which is the
initialization value "".
Merge is responsible for taking two buffers and merging them together. In this
case, both buffers are "", since initialize makes them "" and update keeps them "", so
the result is just "". I am not sure it matters, but you probably also want to
use buffer.getString(0).
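For illustration only, a minimal sketch of an update/merge pair that actually folds the
input row into a string buffer (untested, and using the single id column from your example):

  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    // append the incoming id to the running string instead of ignoring the input
    buffer(0) = buffer.getString(0).concat(",").concat(input.getInt(0).toString)
  }
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    // both partial buffers hold strings, so just concatenate them
    buffer1(0) = buffer1.getString(0).concat(buffer2.getString(0))
  }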
Pedro
On Mon, Jul 11, 2016 at 3:04 AM,   wrote:
Hello guys,

I have a DF and a UDAF. This DF has 2 columns, lp_location_id and id, both of
Int type. I want to group by lp_location_id and aggregate all values of id
into 1 string. So I used a UDAF to do this transformation: multiple Int values to
1 String. However my UDAF returns empty values, as in the attachment.

Here is the code of my main class:

val hc = new org.apache.spark.sql.hive.HiveContext(sc)
val hiveTable = hc.sql("select lp_location_id,id from house_id_pv_location_top50")

val jsonArray = new JsonArray
val result = hiveTable.groupBy("lp_location_id").agg(jsonArray(col("id")).as("jsonArray")).collect.foreach(println)

Here is the code of my UDAF:
class JsonArray extends UserDefinedAggregateFunction {
  def inputSchema: org.apache.spark.sql.types.StructType =
StructType(StructField("id", IntegerType) :: Nil)

  def bufferSchema: StructType = StructType(
StructField("id", StringType) :: Nil)
  def dataType: DataType = StringType
  def deterministic: Boolean = true
  def initialize(buffer: MutableAggregationBuffer): Unit = {
buffer(0) = ""
  }
  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
buffer(0) = buffer.getAs[Int](0)
  }
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
val s1 = buffer1.getAs[Int](0).toString()
val s2 = buffer2.getAs[Int](0).toString()
buffer1(0) = s1.concat(s2)
  }
  def evaluate(buffer: Row): Any = {
buffer(0)
  }
}

I don't quite understand why I get an empty result from my UDAF. I guess there may
be 2 reasons:
1. wrong initialization with "" in the initialize method
2. the buffer didn't get written successfully.
Can anyone share an idea about this? Thank you.





 

Thanks&Best regards!
San.Luo



-

To unsubscribe e-mail: user-unsubscr...@spark.apache.org


-- 
Pedro Rodriguez
PhD Student in Distributed Machine Learning | CU Boulder
UC Berkeley AMPLab Alumni
ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423
Github: github.com/EntilZha | LinkedIn: https://www.linkedin.com/in/pedrorodriguezscience




-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org

Re: Using Spark on Hive with Hive also using Spark as its execution engine

2016-07-12 Thread Mich Talebzadeh
That is only a plan, not what the execution engine is doing.

As I stated before Spark uses DAG + in-memory computing. MR is serial on
disk.

The key is the execution here or rather the execution engine.

In general

Standard MapReduce, as I understand it, reads the data from HDFS, applies the
map-reduce algorithm and writes back to HDFS. If there are many iterations
of map-reduce, then there will be many intermediate writes to HDFS. These are
all serial writes to disk. Each map-reduce step is completely independent
of other steps, and the executing engine does not have any global knowledge
of what map-reduce steps are going to come after each map-reduce step. For
many iterative algorithms this is inefficient, as the data between each
map-reduce pair gets written to and read from the file system.

The equivalent to this kind of parallelism in Big Data is deploying what is known as a
Directed Acyclic Graph (DAG) algorithm. In a
nutshell, deploying a DAG results in a fuller picture of global optimisation
by deploying parallelism, pipelining consecutive map steps into one and not
writing intermediate data to HDFS. In short, this avoids writing data
back and forth after every reduce step, which for me is a significant
improvement compared to the classical MapReduce algorithm.
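As a small illustration of the pipelining (my own sketch, with made-up paths), consecutive
narrow transformations in Spark run as a single stage and nothing is written to HDFS in
between; only the final action touches storage:

  // assumes an existing SparkContext sc
  val lines  = sc.textFile("hdfs:///some/input")            // one read from HDFS
  val result = lines.map(_.toUpperCase)                     // map step 1
                    .filter(_.contains("ERROR"))            // map step 2, pipelined with step 1
                    .map(_.split(",")(0))                   // map step 3, still the same stage
  result.saveAsTextFile("hdfs:///some/output")              // single write at the end

In classic MapReduce each of those steps would typically be its own job, with its
intermediate output written back to HDFS.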

Now Tez is basically MR with DAG. With Spark you get DAG + in-memory
computing. Think of it as a comparison between a classic RDBMS like Oracle
and an IMDB (in-memory database) like Oracle TimesTen with in-memory processing.

The outcome is that Hive using Spark as its execution engine is pretty
impressive. You have the advantage of Hive's CBO + in-memory computing. If
you use Spark for all this (say Spark SQL) but no Hive, Spark uses its own
optimizer, called Catalyst, which does not yet have a CBO, plus in-memory
computing.

As usual your mileage varies.

HTH


Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 12 July 2016 at 09:33, Markovitz, Dudu  wrote:

> I don’t see how this explains the time differences.
>
>
>
> Dudu
>
>
>
> *From:* Mich Talebzadeh [mailto:mich.talebza...@gmail.com]
> *Sent:* Tuesday, July 12, 2016 10:56 AM
> *To:* user 
> *Cc:* user @spark 
>
> *Subject:* Re: Using Spark on Hive with Hive also using Spark as its
> execution engine
>
>
>
> This the whole idea. Spark uses DAG + IM, MR is classic
>
>
>
>
>
> This is for Hive on Spark
>
>
>
> hive> explain select max(id) from dummy_parquet;
> OK
> STAGE DEPENDENCIES:
>   Stage-1 is a root stage
>   Stage-0 depends on stages: Stage-1
>
> STAGE PLANS:
>   Stage: Stage-1
> Spark
>   Edges:
> Reducer 2 <- Map 1 (GROUP, 1)
> *  DagName:
> hduser_20160712083219_632c2749-7387-478f-972d-9eaadd9932c6:1*
>   Vertices:
> Map 1
> Map Operator Tree:
> TableScan
>   alias: dummy_parquet
>   Statistics: Num rows: 1 Data size: 7
> Basic stats: COMPLETE Column stats: NONE
>   Select Operator
> expressions: id (type: int)
> outputColumnNames: id
> Statistics: Num rows: 1 Data size: 7
> Basic stats: COMPLETE Column stats: NONE
> Group By Operator
>   aggregations: max(id)
>   mode: hash
>   outputColumnNames: _col0
>   Statistics: Num rows: 1 Data size: 4 Basic stats:
> COMPLETE Column stats: NONE
>   Reduce Output Operator
> sort order:
> Statistics: Num rows: 1 Data size: 4 Basic stats:
> COMPLETE Column stats: NONE
> value expressions: _col0 (type: int)
> Reducer 2
> Reduce Operator Tree:
>   Group By Operator
> aggregations: max(VALUE._col0)
> mode: mergepartial
> outputColumnNames: _col0
> Statistics: Num rows: 1 Data size: 4 Basic stats: COMPLETE
> Column stats: NONE
> File Output Operator
>   compressed: false
>   Statistics: Num rows: 1 Data size: 4 Basic stats:
> COMPLETE Column stats: NONE
>   table:
>   input format:
> org.apache.hadoop.mapred.TextInputFormat
>   output format:
> org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
>   serde:
> org.apache.hadoop.hive.se

Handling categorical variables in StreamingLogisticRegressionwithSGD

2016-07-12 Thread kundan kumar
Hi ,

I am trying to use StreamingLogisticRegressionwithSGD to build a CTR
prediction model.

The document :

http://spark.apache.org/docs/latest/mllib-linear-methods.html#streaming-linear-regression

mentions that the numFeatures should be *constant*.

The problem that I am facing is this:
Since most of my variables are categorical, the numFeatures value should
be the total number of features after encoding and parsing the categorical
variables into labeled-point format.

Suppose that, for a categorical variable x1, I have 10 distinct values in the current
window.

But in the next window some new values/items get added to x1 and the
number of distinct values increases. How should I handle the numFeatures
variable in this case, given that it will now change?

Basically, my question is how should I handle the new values of the
categorical variables in streaming model.

Thanks,
Kundan


Re: How to Register Permanent User-Defined-Functions (UDFs) in SparkSQL

2016-07-12 Thread Daniel Darabos
Hi Lokesh,
There is no way to do that. The SQLContext.newSession documentation says:

Returns a SQLContext as new session, with separated SQL configurations,
temporary tables, registered functions, but sharing the same SparkContext,
CacheManager, SQLListener and SQLTab.

You have two options: either use the same SQLContext instead of creating
new SQLContexts, or have a function for creating SQLContexts, and this
function can also register the UDFs in every created SQLContext.
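Something along these lines is what I mean by the second option (a rough sketch, reusing
the sample_fn from your example):

  def newSessionWithUdfs(base: SQLContext): SQLContext = {
    val session = base.newSession()
    // re-register every UDF you need in the freshly created session
    session.udf.register("sample_fn", sample_fn _)
    session
  }

  // always go through the factory instead of calling newSession() directly
  val ctx = newSessionWithUdfs(sqlContext)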

On Sun, Jul 10, 2016 at 6:14 PM, Lokesh Yadav 
wrote:

> Hi
> with sqlContext we can register a UDF like
> this: sqlContext.udf.register("sample_fn", sample_fn _ )
> But this UDF is limited to that particular sqlContext only. I wish to make
> the registration persistent, so that I can access the same UDF in any
> subsequent sqlcontext.
> Or is there any other way to register UDFs in sparkSQL so that they remain
> persistent?
>
> Regards
> Lokesh
>


Error in Spark job

2016-07-12 Thread Saurav Sinha
Hi,

I am running into an issue where the job is running with a large number of
partitions, around 21000 parts.


Settings:

Driver memory = 5G
Executor memory = 10G
Total executor cores = 32

It is failing when I am trying to write to Aerospike; earlier it was working
fine. I suspect the number of partitions is the reason.

Kindly help to solve this.

It is giving this error:


16/07/12 14:53:54 INFO MapOutputTrackerMaster: Size of output statuses for
shuffle 37 is 9436142 bytes
16/07/12 14:58:46 WARN HeartbeatReceiver: Removing executor 0 with no
recent heartbeats: 150060 ms exceeds timeout 12 ms
16/07/12 14:58:48 WARN DAGScheduler: Creating new stage failed due to
exception - job: 14
java.lang.IllegalStateException: unread block data
at
java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2421)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1382)
at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1706)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1344)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at
org.apache.spark.MapOutputTracker$$anonfun$deserializeMapStatuses$2.apply(MapOutputTracker.scala:371)
at
org.apache.spark.MapOutputTracker$$anonfun$deserializeMapStatuses$2.apply(MapOutputTracker.scala:371)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1285)
at
org.apache.spark.MapOutputTracker$.deserializeMapStatuses(MapOutputTracker.scala:372)
at
org.apache.spark.scheduler.DAGScheduler.newOrUsedShuffleStage(DAGScheduler.scala:292)
at
org.apache.spark.scheduler.DAGScheduler.registerShuffleDependencies(DAGScheduler.scala:343)
at org.apache.spark.scheduler.DAGScheduler.org
$apache$spark$scheduler$DAGScheduler$$getShuffleMapStage(DAGScheduler.scala:221)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$visit$1$1.apply(DAGScheduler.scala:324)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$visit$1$1.apply(DAGScheduler.scala:321)
at scala.collection.immutable.List.foreach(List.scala:318)
at org.apache.spark.scheduler.DAGScheduler.visit$1(DAGScheduler.scala:321)
at
org.apache.spark.scheduler.DAGScheduler.getParentStages(DAGScheduler.scala:333)
at
org.apache.spark.scheduler.DAGScheduler.getParentStagesAndId(DAGScheduler.scala:234)
at
org.apache.spark.scheduler.DAGScheduler.newResultStage(DAGScheduler.scala:270)
at
org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:768)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1426)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1418)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
16/07/12 14:58:48 ERROR TaskSchedulerImpl: Lost executor 0 : Executor
heartbeat timed out after 150060 ms
16/07/12 14:58:48 INFO DAGScheduler: Job 14 failed: foreachPartition at
WriteToAerospike.java:47, took 338.345827 s
16/07/12 14:58:48 ERROR MinervaLauncher: Job failed due to exception
=java.lang.IllegalStateException: unread block data
java.lang.IllegalStateException: unread block data
at
java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2421)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1382)
at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1706)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1344)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at
org.apache.spark.MapOutputTracker$$anonfun$deserializeMapStatuses$2.apply(MapOutputTracker.scala:371)
at
org.apache.spark.MapOutputTracker$$anonfun$deserializeMapStatuses$2.apply(MapOutputTracker.scala:371)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1285)
at
org.apache.spark.MapOutputTracker$.deserializeMapStatuses(MapOutputTracker.scala:372)
at
org.apache.spark.scheduler.DAGScheduler.newOrUsedShuffleStage(DAGScheduler.scala:292)
at
org.apache.spark.scheduler.DAGScheduler.registerShuffleDependencies(DAGScheduler.scala:343)
at org.apache.spark.scheduler.DAGScheduler.org
$apache$spark$scheduler$DAGScheduler$$getShuffleMapStage(DAGScheduler.scala:221)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$visit$1$1.apply(DAGScheduler.scala:324)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$visit$1$1.apply(DAGScheduler.scala:321)
at scala.collection.immutable.List.foreach(List.scala:318)
at org.apache.spark.scheduler.DAGScheduler.visit$1(DAGScheduler.scala:321)
at
org.apache.spark.scheduler.DAGScheduler.getParentStages(DAGScheduler.scala:333)
at
org.apache.spark.scheduler.DAGScheduler.getParentStagesAndId(DAGScheduler.scala:234)
at
org.apache.spark.scheduler.DAGScheduler.newResultStage(DAGScheduler.scala:270)
at
org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:768)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1426)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1418)
at org.apache.spark.util.EventL

Re: Error in Spark job

2016-07-12 Thread Yash Sharma
Looks like the write to Aerospike is taking too long.

Could you try writing the RDD directly to the filesystem, skipping the
Aerospike write?

foreachPartition at WriteToAerospike.java:47, took 338.345827 s
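Something along these lines (resultRdd and the path are just placeholders for your own)
would isolate the Aerospike write from the rest of the job:

  // write the same RDD to plain files instead of calling the Aerospike client
  resultRdd.saveAsTextFile("hdfs:///tmp/aerospike-debug-output")

If that finishes quickly, the partition count is probably not the issue and the client
write is the bottleneck.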

- Thanks, via mobile,  excuse brevity.
On Jul 12, 2016 8:08 PM, "Saurav Sinha"  wrote:

> Hi,
>
> I am getting into an issue where job is running in multiple partition
> around 21000 parts.
>
>
> Setting
>
> Driver = 5G
> Executor memory = 10G
> Total executor core =32
> It us falling when I am trying to write to aerospace earlier it is working
> fine. I am suspecting number of partition as reason.
>
> Kindly help to solve this.
>
> It is giving error :
>
>
> 16/07/12 14:53:54 INFO MapOutputTrackerMaster: Size of output statuses for
> shuffle 37 is 9436142 bytes
> 16/07/12 14:58:46 WARN HeartbeatReceiver: Removing executor 0 with no
> recent heartbeats: 150060 ms exceeds timeout 12 ms
> 16/07/12 14:58:48 WARN DAGScheduler: Creating new stage failed due to
> exception - job: 14
> java.lang.IllegalStateException: unread block data
> at
> java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2421)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1382)
> at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1706)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1344)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
> at
> org.apache.spark.MapOutputTracker$$anonfun$deserializeMapStatuses$2.apply(MapOutputTracker.scala:371)
> at
> org.apache.spark.MapOutputTracker$$anonfun$deserializeMapStatuses$2.apply(MapOutputTracker.scala:371)
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1285)
> at
> org.apache.spark.MapOutputTracker$.deserializeMapStatuses(MapOutputTracker.scala:372)
> at
> org.apache.spark.scheduler.DAGScheduler.newOrUsedShuffleStage(DAGScheduler.scala:292)
> at
> org.apache.spark.scheduler.DAGScheduler.registerShuffleDependencies(DAGScheduler.scala:343)
> at org.apache.spark.scheduler.DAGScheduler.org
> $apache$spark$scheduler$DAGScheduler$$getShuffleMapStage(DAGScheduler.scala:221)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$visit$1$1.apply(DAGScheduler.scala:324)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$visit$1$1.apply(DAGScheduler.scala:321)
> at scala.collection.immutable.List.foreach(List.scala:318)
> at org.apache.spark.scheduler.DAGScheduler.visit$1(DAGScheduler.scala:321)
> at
> org.apache.spark.scheduler.DAGScheduler.getParentStages(DAGScheduler.scala:333)
> at
> org.apache.spark.scheduler.DAGScheduler.getParentStagesAndId(DAGScheduler.scala:234)
> at
> org.apache.spark.scheduler.DAGScheduler.newResultStage(DAGScheduler.scala:270)
> at
> org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:768)
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1426)
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1418)
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> 16/07/12 14:58:48 ERROR TaskSchedulerImpl: Lost executor 0 : Executor
> heartbeat timed out after 150060 ms
> 16/07/12 14:58:48 INFO DAGScheduler: Job 14 failed: foreachPartition at
> WriteToAerospike.java:47, took 338.345827 s
> 16/07/12 14:58:48 ERROR MinervaLauncher: Job failed due to exception
> =java.lang.IllegalStateException: unread block data
> java.lang.IllegalStateException: unread block data
> at
> java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2421)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1382)
> at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1706)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1344)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
> at
> org.apache.spark.MapOutputTracker$$anonfun$deserializeMapStatuses$2.apply(MapOutputTracker.scala:371)
> at
> org.apache.spark.MapOutputTracker$$anonfun$deserializeMapStatuses$2.apply(MapOutputTracker.scala:371)
> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1285)
> at
> org.apache.spark.MapOutputTracker$.deserializeMapStatuses(MapOutputTracker.scala:372)
> at
> org.apache.spark.scheduler.DAGScheduler.newOrUsedShuffleStage(DAGScheduler.scala:292)
> at
> org.apache.spark.scheduler.DAGScheduler.registerShuffleDependencies(DAGScheduler.scala:343)
> at org.apache.spark.scheduler.DAGScheduler.org
> $apache$spark$scheduler$DAGScheduler$$getShuffleMapStage(DAGScheduler.scala:221)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$visit$1$1.apply(DAGScheduler.scala:324)
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$visit$1$1.apply(DAGScheduler.scala:321)
> at scala.collection.immutable.List.foreach(List.scala:318)
> at org.apache.spark.scheduler.DAGScheduler.visit$1(DAGScheduler.scala:321)
> at
> org.apache.spark.scheduler.DAGScheduler.getParentStages(DAGScheduler.scala

Re: KEYS file?

2016-07-12 Thread Steve Loughran

On 11 Jul 2016, at 04:48, Shuai Lin  wrote:

at least links to the keys used to sign releases on the
download page

+1 for that.


Really, all release keys for ASF projects should be signed by others in the
project and the broader ASF community; it's really time for the next ApacheCons
and similar events to do key auth sessions. Oh, and you should be verifying full
signatures; generating collisions in short signatures is now computationally
feasible.

I've authenticated Patrick's key  EEDA BD1C 71C5 48D6 F006  61D3 7C6C 105F FC8E
D089 and pushed that fact up to the MIT keyservers; I'm willing to do the same
for others over Skype/F2F.

And at some point someone needs to enhance Ivy/Maven to check GPG signatures of
artifacts on the public repos. Checksum validation is meaningless unless you
are getting the checksums from a trusted HTTPS server *and* the version of the
HTTP client you have gets its HTTPS signature logic right (something the ASF
commons HTTP libs haven't always done).


Re: Handling categorical variables in StreamingLogisticRegressionwithSGD

2016-07-12 Thread Sean Owen
Yeah, for this to work, you need to know the number of distinct values
a categorical feature will take on, ever. Sometimes that's known,
sometimes it's not.

One option is to use an algorithm that can use categorical features
directly, like decision trees.

You could consider hashing your features if so. So, you'd have maybe
10 indicator columns and you hash the feature into one of those 10
columns to figure out which one it corresponds to. Of course, when you
have an 11th value it collides with one of them and they get
conflated, but, at least you can sort of proceed.

This is more usually done with a large number of feature values, but
maybe that's what you have. It's more problematic the smaller your
hash space is.
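A rough sketch of what I mean, hashing one categorical value into a fixed number of
indicator columns (the bucket count is up to you):

  import org.apache.spark.mllib.linalg.{Vector, Vectors}

  val numBuckets = 10
  def encode(value: String): Vector = {
    // non-negative bucket index for the categorical value
    val idx = ((value.hashCode % numBuckets) + numBuckets) % numBuckets
    // one-hot style sparse vector of fixed size numBuckets
    Vectors.sparse(numBuckets, Array(idx), Array(1.0))
  }

An unseen 11th value simply lands in one of the existing buckets, so numFeatures stays
constant as new category values show up.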

On Tue, Jul 12, 2016 at 10:21 AM, kundan kumar  wrote:
> Hi ,
>
> I am trying to use StreamingLogisticRegressionwithSGD to build a CTR
> prediction model.
>
> The document :
>
> http://spark.apache.org/docs/latest/mllib-linear-methods.html#streaming-linear-regression
>
> mentions that the numFeatures should be constant.
>
> The problem that I am facing is :
> Since most of my variables are categorical, the numFeatures variable should
> be the final set of variables after encoding and parsing the categorical
> variables in labeled point format.
>
> Suppose, for a categorical variable x1 I have 10 distinct values in current
> window.
>
> But in the next window some new values/items gets added to x1 and the number
> of distinct values increases. How should I handle the numFeatures variable
> in this case, because it will change now ?
>
> Basically, my question is how should I handle the new values of the
> categorical variables in streaming model.
>
> Thanks,
> Kundan
>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Using Spark on Hive with Hive also using Spark as its execution engine

2016-07-12 Thread Jörn Franke

I think the comparison with the Oracle RDBMS and Oracle TimesTen is not so good.
There are times when the in-memory database of Oracle is slower than the RDBMS
(especially in the case of Exadata) because in-memory - as in Spark -
means everything is in memory and everything is always processed (no storage
indexes, no bloom filters etc.), which explains this behavior quite well.

Hence, I do not agree with the statement that Tez is basically MR with DAG (or
that LLAP is basically in-memory, which is also not correct). This is a wrong
oversimplification and I do not think it is useful for the community; it is
better to understand when something can be used and when not. In-memory is
also not the solution to everything, and if you look, for example, behind SAP HANA
or NoSQL there is much more around this, which is not even on the roadmap of
Spark.

Anyway, discovering good use case patterns should be done on standardized
benchmarks going beyond "select count" etc.

> On 12 Jul 2016, at 11:16, Mich Talebzadeh  wrote:
> 
> That is only a plan not what execution engine is doing.
> 
> As I stated before Spark uses DAG + in-memory computing. MR is serial on 
> disk. 
> 
> The key is the execution here or rather the execution engine.
> 
> In general
> 
> The standard MapReduce  as I know reads the data from HDFS, apply map-reduce 
> algorithm and writes back to HDFS. If there are many iterations of map-reduce 
> then, there will be many intermediate writes to HDFS. This is all serial 
> writes to disk. Each map-reduce step is completely independent of other 
> steps, and the executing engine does not have any global knowledge of what 
> map-reduce steps are going to come after each map-reduce step. For many 
> iterative algorithms this is inefficient as the data between each map-reduce 
> pair gets written and read from the file system.
> 
> The equivalent to parallelism in Big Data is deploying what is known as 
> Directed Acyclic Graph (DAG) algorithm. In a nutshell deploying DAG results 
> in a fuller picture of global optimisation by deploying parallelism, 
> pipelining consecutive map steps into one and not writing intermediate data 
> to HDFS. So in short this prevents writing data back and forth after every 
> reduce step which for me is a significant improvement, compared to the 
> classical MapReduce algorithm.
> 
> Now Tez is basically MR with DAG. With Spark you get DAG + in-memory 
> computing. Think of it as a comparison between a classic RDBMS like Oracle 
> and IMDB like Oracle TimesTen with in-memory processing.
> 
> The outcome is that Hive using Spark as execution engine is pretty 
> impressive. You have the advantage of Hive CBO + In-memory computing. If you 
> use Spark for all this (say Spark SQL) but no Hive, Spark uses its own 
> optimizer called Catalyst that does not have CBO yet plus in memory computing.
> 
> As usual your mileage varies.
> 
> HTH
> 
> 
> Dr Mich Talebzadeh
>  
> LinkedIn  
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>  
> http://talebzadehmich.wordpress.com
> 
> Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
> damage or destruction of data or any other property which may arise from 
> relying on this email's technical content is explicitly disclaimed. The 
> author will in no case be liable for any monetary damages arising from such 
> loss, damage or destruction.
>  
> 
>> On 12 July 2016 at 09:33, Markovitz, Dudu  wrote:
>> I don’t see how this explains the time differences.
>> 
>>  
>> 
>> Dudu
>> 
>>  
>> 
>> From: Mich Talebzadeh [mailto:mich.talebza...@gmail.com] 
>> Sent: Tuesday, July 12, 2016 10:56 AM
>> To: user 
>> Cc: user @spark 
>> 
>> 
>> Subject: Re: Using Spark on Hive with Hive also using Spark as its execution 
>> engine
>>  
>> 
>> This the whole idea. Spark uses DAG + IM, MR is classic
>> 
>>  
>> 
>>  
>> 
>> This is for Hive on Spark
>> 
>>  
>> 
>> hive> explain select max(id) from dummy_parquet;
>> OK
>> STAGE DEPENDENCIES:
>>   Stage-1 is a root stage
>>   Stage-0 depends on stages: Stage-1
>> 
>> STAGE PLANS:
>>   Stage: Stage-1
>> Spark
>>   Edges:
>> Reducer 2 <- Map 1 (GROUP, 1)
>>   DagName: hduser_20160712083219_632c2749-7387-478f-972d-9eaadd9932c6:1
>>   Vertices:
>> Map 1
>> Map Operator Tree:
>> TableScan
>>   alias: dummy_parquet
>>   Statistics: Num rows: 1 Data size: 7 Basic 
>> stats: COMPLETE Column stats: NONE
>>   Select Operator
>> expressions: id (type: int)
>> outputColumnNames: id
>> Statistics: Num rows: 1 Data size: 7 
>> Basic stats: COMPLETE Column stats: NONE
>> Group By Operator
>>   aggregations: max(id)
>>   mode: hash
>>   outputColumnNames: _col0
>> 

Large files with wholetextfile()

2016-07-12 Thread Bahubali Jain
Hi,
We have a requirement wherein we need to process a set of XML files; each of
the XML files contains several records (e.g.:

 data of record 1..

 data of record 2..

Expected output is   

Since we needed the file name as well in the output, we chose wholeTextFiles(). We
had to decide against using StreamXmlRecordReader and StreamInputFormat since I
could not find a way to retrieve the filename.

These XML files could be pretty big; occasionally they could reach a size
of 1 GB. Since the contents of each file would be put into a single partition, would
such big files be an issue?
The AWS cluster (50 nodes) that we use is fairly strong, with each machine
having around 60 GB of memory.
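For reference, the read itself is roughly this (just a sketch; it assumes an existing
SparkContext sc, and the path is only an example):

  // wholeTextFiles gives one (fileName, fileContent) pair per file
  val files = sc.wholeTextFiles("s3://our-bucket/xml-input/")
  // the file name travels with the content, which is why we picked this API
  files.map { case (fileName, content) => (fileName, content.length) }
       .collect()
       .foreach(println)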

Thanks,
Baahu


Re: Large files with wholetextfile()

2016-07-12 Thread Prashant Sharma
Hi Baahu,

That should not be a problem, given you allocate sufficient buffer for
reading.

I was just working on implementing a patch [1] to support the feature of
reading whole text files in SQL. This can actually be a slightly better
approach, because here we read into off-heap memory for holding the data (using
the unsafe interface).

1. https://github.com/apache/spark/pull/14151

Thanks,



--Prashant


On Tue, Jul 12, 2016 at 6:24 PM, Bahubali Jain  wrote:

> Hi,
> We have a requirement where in we need to process set of xml files, each
> of the xml files contain several records (eg:
> 
>  data of record 1..
> 
>
> 
> data of record 2..
> 
>
> Expected output is   
>
> Since we needed file name as well in output ,we chose wholetextfile() . We
> had to go against using StreamXmlRecordReader and StreamInputFormat since I
> could not find a way to retreive the filename.
>
> These xml files could be pretty big, occasionally they could reach a size
> of 1GB.Since contents of each file would be put into a single partition,would
> such big files be a issue ?
> The AWS cluster(50 Nodes) that we use is fairly strong , with each machine
> having memory of around 60GB.
>
> Thanks,
> Baahu
>


Re: Using Spark on Hive with Hive also using Spark as its execution engine

2016-07-12 Thread Mich Talebzadeh
Sorry, I completely miss your points.

I was NOT talking about Exadata. I was comparing Oracle 12c caching with
that of Oracle TimesTen. No one mentioned Exadata here, nor storage
indexes etc.


So if Tez is not MR with DAG, could you give me an example of how it works?
No opinions, just what is relevant to this point. I do not know much about Tez, as I
stated before.

Case in point: if Tez could do the job on its own, why is Tez used in
conjunction with LLAP, as Marcin also alluded to in this thread?


Having said that, I would be interested if you could provide a working example
of Hive on Tez compared to Hive on MR.

One experiment is worth hundreds of opinions.





Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 12 July 2016 at 13:31, Jörn Franke  wrote:

>
> I think the comparison with Oracle rdbms and oracle times ten is not so
> good. There are times when the in-memory database of Oracle is slower than
> the rdbms (especially in case of Exadata) due to the issue that in-memory -
> as in Spark - means everything is in memory and everything is always
> processed (no storage indexes , no bloom filters etc) which explains this
> behavior quiet well.
>
> Hence, I do not agree with the statement that tez is basically mr with dag
> (or that llap is basically in-memory which is also not correct). This is a
> wrong oversimplification and I do not think this is useful for the
> community, but better is to understand when something can be used and when
> not. In-memory is also not the solution to everything and if you look for
> example behind SAP Hana or NoSql there is much more around this, which is
> not even on the roadmap of Spark.
>
> Anyway, discovering good use case patterns should be done on standardized
> benchmarks going beyond the select count etc
>
> On 12 Jul 2016, at 11:16, Mich Talebzadeh 
> wrote:
>
> That is only a plan not what execution engine is doing.
>
> As I stated before Spark uses DAG + in-memory computing. MR is serial on
> disk.
>
> The key is the execution here or rather the execution engine.
>
> In general
>
> The standard MapReduce  as I know reads the data from HDFS, apply
> map-reduce algorithm and writes back to HDFS. If there are many iterations
> of map-reduce then, there will be many intermediate writes to HDFS. This is
> all serial writes to disk. Each map-reduce step is completely independent
> of other steps, and the executing engine does not have any global knowledge
> of what map-reduce steps are going to come after each map-reduce step. For
> many iterative algorithms this is inefficient as the data between each
> map-reduce pair gets written and read from the file system.
>
> The equivalent to parallelism in Big Data is deploying what is known as
> Directed Acyclic Graph (DAG
> ) algorithm. In a
> nutshell deploying DAG results in a fuller picture of global optimisation
> by deploying parallelism, pipelining consecutive map steps into one and not
> writing intermediate data to HDFS. So in short this prevents writing data
> back and forth after every reduce step which for me is a significant
> improvement, compared to the classical MapReduce algorithm.
>
> Now Tez is basically MR with DAG. With Spark you get DAG + in-memory
> computing. Think of it as a comparison between a classic RDBMS like Oracle
> and IMDB like Oracle TimesTen with in-memory processing.
>
> The outcome is that Hive using Spark as execution engine is pretty
> impressive. You have the advantage of Hive CBO + In-memory computing. If
> you use Spark for all this (say Spark SQL) but no Hive, Spark uses its own
> optimizer called Catalyst that does not have CBO yet plus in memory
> computing.
>
> As usual your mileage varies.
>
> HTH
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 12 July 2016 at 09:33, Markovitz, Dudu  wrote:
>
>> I don’t see how this explains the time differences.
>>
>>
>>

Send real-time alert using Spark

2016-07-12 Thread Priya Ch
Hi All,

 I am building a real-time anomaly detection system where I am using k-means
to detect anomalies. Now, in order to send an alert to a mobile phone or by email,
how do I send it using Spark itself?

Thanks,
Padma CH


Re: Using Spark on Hive with Hive also using Spark as its execution engine

2016-07-12 Thread Mich Talebzadeh
Hi Ayan,

This is a very valid question and  I have not seen any available
instrumentation in Spark that allows one to measure this in a practical way
in a cluster.

Classic example:


   1. If you have a memory issue, do you upgrade your RAM or scale out
   horizontally by adding a couple more nodes?
   2. In Spark, is there a use case where you actually get better
   performance by scaling up as opposed to scaling out?
   3. A Very Large Table (VLT) should be viewed in the context of the available
   resources within the cluster.


If anyone has answers to the above and more, I would be interested to know.

HTH





Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 12 July 2016 at 03:22, ayan guha  wrote:

> Hi Mich
>
> Thanks for showing examples, makes perfect sense.
>
> One question: "...I agree that on VLT (very large tables), the limitation
> in available memory may be the overriding factor in using Spark"...have you
> observed any specific threshold for VLT which tilts the favor against
> Spark. For example, if I have a 10 node cluster with (say) 64G RAM and
> 8CPU, where I should expect Spark to crumble? What if my node is 128G RAM?
>
> I know its difficult to answer these values empirically and YMMV depending
> on cluster load, data format,  query etc. But is there a guesstimate around?
>
> Best
> Ayan
>
> On Tue, Jul 12, 2016 at 9:22 AM, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> Another point with Hive on spark and Hive on Tez + LLAP, I am thinking
>> loud :)
>>
>>
>>1. I am using Hive on Spark and I have a table of 10GB say with 100
>>users concurrently accessing the same partition of ORC table  (last one
>>hour or so)
>>2. Spark takes data and puts in in memory. I gather only data for
>>that partition will be loaded for 100 users. In other words there will be
>>100 copies.
>>3. Spark unlike RDBMS does not have the notion of hot cache or Most
>>Recently Used (MRU) or Least Recently Used. So once the user finishes data
>>is released from Spark memory. The next user will load that data again.
>>Potentially this is somehow wasteful of resources?
>>4. With Tez we only have DAG. It is MR with DAG. So the same
>>algorithm will be applied to 100 users session but no memory usage
>>5. If I add LLAP, will that be more efficient in terms of memory
>>usage compared to Hive or not? Will it keep the data in memory for reuse 
>> or
>>not.
>>6. What I don't understand what makes Tez and LLAP more efficient
>>compared to Spark!
>>
>> Cheers
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>> On 11 July 2016 at 21:54, Mich Talebzadeh 
>> wrote:
>>
>>> In my test I did like for like keeping the systematic the same namely:
>>>
>>>
>>>1. Table was a parquet table of 100 Million rows
>>>2. The same set up was used for both Hive on Spark and Hive on MR
>>>3. Spark was very impressive compared to MR on this particular test.
>>>
>>>
>>> Just to see any issues I created an ORC table in in the image of Parquet
>>> (insert/select from Parquet to ORC) with stats updated for columns etc
>>>
>>> These were the results of the same run using ORC table this time:
>>>
>>> hive> select max(id) from oraclehadoop.dummy;
>>>
>>> Starting Spark Job = b886b869-5500-4ef7-aab9-ae6fb4dad22b
>>> Query Hive on Spark job[1] stages:
>>> 2
>>> 3
>>> Status: Running (Hive on Spark job[1])
>>> Job Progress Format
>>> CurrentTime StageId_StageAttemptId:
>>> SucceededTasksCount(+RunningTasksCount-FailedTasksCount)/TotalTasksCount
>>> [StageCost]
>>> 2016-07-11 21:35:45,020 Stage-2_0: 0(+8)/23 Stage-3_0: 0/1
>>> 2016-07-11 21:35:48,033 Stage-2_0: 0(+8)/23 Stage-3_0: 0/1
>>> 2016-07-11 21:35:51,046 Stage-2_0: 1(+8)/23 Stage-3_0: 0/1
>>> 2016-07-11 21:35:52,050 Stage-2_0: 3(+8)/23 Stage-3_0: 0/1
>>> 2016-07-11 21:35:53,055 Stage-2_0: 8(+4)/23 Stage-3_0:

Re: Send real-time alert using Spark

2016-07-12 Thread Priya Ch
I mean the model training on incoming data is taken care of by Spark. For detected
anomalies, we need to send an alert. Could we do this with Spark, or would some other
framework like Akka or a REST API do it?

Thanks,
Padma CH

On Tue, Jul 12, 2016 at 7:30 PM, Marcin Tustin 
wrote:

> Priya,
>
> You wouldn't necessarily "use spark" to send the alert. Spark is in an
> important sense one library among many. You can have your application use
> any other library available for your language to send the alert.
>
> Marcin
>
> On Tue, Jul 12, 2016 at 9:25 AM, Priya Ch 
> wrote:
>
>> Hi All,
>>
>>  I am building Real-time Anomaly detection system where I am using
>> k-means to detect anomaly. Now in-order to send alert to mobile or an email
>> alert how do i send it using Spark itself ?
>>
>> Thanks,
>> Padma CH
>>
>
>


Re: Using Spark on Hive with Hive also using Spark as its execution engine

2016-07-12 Thread Mich Talebzadeh
I guess that is what DAG adds up to with Tez



Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 12 July 2016 at 14:40, Marcin Tustin  wrote:

> More like 2x than 10x as I recall.
>
> On Tue, Jul 12, 2016 at 9:39 AM, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> thanks Marcin.
>>
>> What Is your guesstimate on the order of "faster" please?
>>
>> Cheers
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>> On 12 July 2016 at 14:35, Marcin Tustin  wrote:
>>
>>> Quick note - my experience (no benchmarks) is that Tez without LLAP
>>> (we're still not on hive 2) is faster than MR by some way. I haven't dug
>>> into why that might be.
>>>
>>> On Tue, Jul 12, 2016 at 9:19 AM, Mich Talebzadeh <
>>> mich.talebza...@gmail.com> wrote:
>>>
 sorry I completely miss your points

 I was NOT talking about Exadata. I was comparing Oracle 12c caching
 with that of Oracle TimesTen. no one mentioned Exadata here and neither
 storeindex etc..


 so if Tez is not MR with DAG could you give me an example of how it
 works. No opinions but relevant to this point. I do not know much about Tez
 as I stated it before

 Case in point if Tez could do the job on its own why Tez is used in
 conjunction with LLAP as Martin alluded to as well in this thread.


 Having said that , I would be interested if you provide a working
 example of Hive on Tez, compared to Hive on MR.

 One experiment is worth hundreds of opinions





 Dr Mich Talebzadeh



 LinkedIn * 
 https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
 *



 http://talebzadehmich.wordpress.com


 *Disclaimer:* Use it at your own risk. Any and all responsibility for
 any loss, damage or destruction of data or any other property which may
 arise from relying on this email's technical content is explicitly
 disclaimed. The author will in no case be liable for any monetary damages
 arising from such loss, damage or destruction.



 On 12 July 2016 at 13:31, Jörn Franke  wrote:

>
> I think the comparison with Oracle rdbms and oracle times ten is not
> so good. There are times when the in-memory database of Oracle is slower
> than the rdbms (especially in case of Exadata) due to the issue that
> in-memory - as in Spark - means everything is in memory and everything is
> always processed (no storage indexes , no bloom filters etc) which 
> explains
> this behavior quiet well.
>
> Hence, I do not agree with the statement that tez is basically mr with
> dag (or that llap is basically in-memory which is also not correct). This
> is a wrong oversimplification and I do not think this is useful for the
> community, but better is to understand when something can be used and when
> not. In-memory is also not the solution to everything and if you look for
> example behind SAP Hana or NoSql there is much more around this, which is
> not even on the roadmap of Spark.
>
> Anyway, discovering good use case patterns should be done on
> standardized benchmarks going beyond the select count etc
>
> On 12 Jul 2016, at 11:16, Mich Talebzadeh 
> wrote:
>
> That is only a plan not what execution engine is doing.
>
> As I stated before Spark uses DAG + in-memory computing. MR is serial
> on disk.
>
> The key is the execution here or rather the execution engine.
>
> In general
>
> The standard MapReduce  as I know reads the data from HDFS, apply
> map-reduce algorithm and writes back to HDFS. If there are many iterations
> of map-reduce then, there will be many intermediate writes

ml and mllib persistence

2016-07-12 Thread aka.fe2s
What is the reason Spark has individual implementations of read/write
routines for every model in mllib and ml (the Saveable and MLWritable trait
impls)?

Wouldn't a generic implementation via the Java serialization mechanism work? I
would like to use it to store the models in a custom storage.

--
Oleksiy


Re: Send real-time alert using Spark

2016-07-12 Thread Sivakumaran S
What language are you coding in? Use a mail client library to send out a custom 
mail to the required recipient. If you want to send an alert to a mobile, you 
may have to install a GSM card in the machine and then use it to send an SMS.
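For example, if you are on the JVM, a plain JavaMail send from inside the job is enough
(a sketch; the SMTP host and addresses are placeholders):

  import java.util.Properties
  import javax.mail.{Message, Session, Transport}
  import javax.mail.internet.{InternetAddress, MimeMessage}

  def sendAlert(body: String): Unit = {
    val props = new Properties()
    props.put("mail.smtp.host", "smtp.example.com")    // your SMTP relay
    val session = Session.getInstance(props)
    val msg = new MimeMessage(session)
    msg.setFrom(new InternetAddress("alerts@example.com"))
    msg.setRecipients(Message.RecipientType.TO, "oncall@example.com")
    msg.setSubject("Anomaly detected")
    msg.setText(body)                                  // details of the detected anomaly
    Transport.send(msg)
  }

Call it from the driver (or inside foreachRDD) whenever k-means flags an anomaly.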

HTH,

Regards,

Sivakumaran

> On 12-Jul-2016, at 3:35 PM, Priya Ch  wrote:
> 
> I mean model training on incoming data is taken care by Spark. For detected 
> anomalies, need to send alert. Could we do this with Spark or any other 
> framework like Akka/REST API would do it ?
> 
> Thanks,
> Padma CH
> 
> On Tue, Jul 12, 2016 at 7:30 PM, Marcin Tustin  > wrote:
> Priya,
> 
> You wouldn't necessarily "use spark" to send the alert. Spark is in an 
> important sense one library among many. You can have your application use any 
> other library available for your language to send the alert. 
> 
> Marcin
> 
> On Tue, Jul 12, 2016 at 9:25 AM, Priya Ch  > wrote:
> Hi All,
> 
>  I am building Real-time Anomaly detection system where I am using k-means to 
> detect anomaly. Now in-order to send alert to mobile or an email alert how do 
> i send it using Spark itself ?
> 
> Thanks,
> Padma CH
> 
> 
> 
> 
> 



Re: Spark SQL: Merge Arrays/Sets

2016-07-12 Thread Pedro Rodriguez
I saw that answer before, but as the response mentions its quite expensive.
I was able to do so with a UDAF, but was curious if I was just missing
something.
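
For reference, a minimal sketch of the UDAF route (assuming Spark 1.6's UserDefinedAggregateFunction API; the class and column names are illustrative):

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

// Concatenates array<string> values across the rows of each group.
class MergeArrays extends UserDefinedAggregateFunction {
  def inputSchema: StructType = StructType(StructField("words", ArrayType(StringType)) :: Nil)
  def bufferSchema: StructType = StructType(StructField("acc", ArrayType(StringType)) :: Nil)
  def dataType: DataType = ArrayType(StringType)
  def deterministic: Boolean = true

  def initialize(buffer: MutableAggregationBuffer): Unit = buffer(0) = Seq.empty[String]

  def update(buffer: MutableAggregationBuffer, input: Row): Unit =
    if (!input.isNullAt(0)) buffer(0) = buffer.getSeq[String](0) ++ input.getSeq[String](0)

  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit =
    buffer1(0) = buffer1.getSeq[String](0) ++ buffer2.getSeq[String](0)

  def evaluate(buffer: Row): Any = buffer.getSeq[String](0)  // call .distinct here for the set variant
}

// Usage:
// val mergeArrays = new MergeArrays
// df.groupBy($"id").agg(mergeArrays($"words"))

As noted below, this works but loses code generation and type genericity compared with a built-in function.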

A more general question: what are the requirements for deciding that a new
Spark SQL function should be added? Being able to make UDAFs is great, but
they don't have native code generation and they don't have support for
"generics".

Pedro

On Mon, Jul 11, 2016 at 11:52 PM, Yash Sharma  wrote:

> This answers exactly what you are looking for -
>
> http://stackoverflow.com/a/34204640/1562474
>
> On Tue, Jul 12, 2016 at 6:40 AM, Pedro Rodriguez 
> wrote:
>
>> Is it possible with Spark SQL to merge columns whose types are Arrays or
>> Sets?
>>
>> My use case would be something like this:
>>
>> DF types
>> id: String
>> words: Array[String]
>>
>> I would want to do something like
>>
>> df.groupBy('id).agg(merge_arrays('words)) -> list of all words
>> df.groupBy('id).agg(merge_sets('words)) -> list of distinct words
>>
>> Thanks,
>> --
>> Pedro Rodriguez
>> PhD Student in Distributed Machine Learning | CU Boulder
>> UC Berkeley AMPLab Alumni
>>
>> ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423
>> Github: github.com/EntilZha | LinkedIn:
>> https://www.linkedin.com/in/pedrorodriguezscience
>>
>>
>


-- 
Pedro Rodriguez
PhD Student in Distributed Machine Learning | CU Boulder
UC Berkeley AMPLab Alumni

ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423
Github: github.com/EntilZha | LinkedIn:
https://www.linkedin.com/in/pedrorodriguezscience


RDD for loop vs foreach

2016-07-12 Thread philipghu
Hi,

I'm new to Spark and Scala as well. I understand that we can use foreach to
apply a function to each element of an RDD, like rdd.foreach
(x=>println(x)), but I saw we can also do a for loop to print each element
of an RDD, like 

for (x <- rdd) {
  println(x)
}

Does defining the foreach function in RDD make an RDD traversable like this?
Does the compiler automatically invoke the foreach function when it sees a
for loop? 


Thanks!
Phil





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/RDD-for-loop-vs-foreach-tp27326.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: RDD for loop vs foreach

2016-07-12 Thread aka.fe2s
Correct.

It's desugared into rdd.foreach() by the Scala compiler.
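
Concretely (a small illustration, assuming a running SparkContext `sc`; note that both forms run on the executors, so the println output ends up in executor logs, not on the driver):

val rdd = sc.parallelize(1 to 10)

// The for loop over the RDD
for (x <- rdd) println(x)
// is rewritten by the Scala compiler into
rdd.foreach(x => println(x))

// To see the elements on the driver, collect first (small RDDs only):
rdd.collect().foreach(println)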

--
Oleksiy Dyagilev

On Tue, Jul 12, 2016 at 6:58 PM, philipghu  wrote:

> Hi,
>
> I'm new to Spark and Scala as well. I understand that we can use foreach to
> apply a function to each element of an RDD, like rdd.foreach
> (x=>println(x)), but I saw we can also do a for loop to print each element
> of an RDD, like
>
> for (x <- rdd){
> println(x)
> }
>
> Does defining the foreach function in RDD make an RDD traversable like
> this?
> Does the compiler automatically invoke the foreach function when it sees a
> for loop?
>
>
> Thanks!
> Phil
>
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/RDD-for-loop-vs-foreach-tp27326.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Matrix Factorization Model model.save error "NullPointerException"

2016-07-12 Thread Zhou (Joe) Xing
Does anyone have an idea what this NPE issue below is about? Thank you!

cheers

zhou

On Jul 11, 2016, at 11:27 PM, Zhou (Joe) Xing (joe.x...@nextev.com) wrote:


Hi Guys,

I searched the archive and also googled this problem with saving the ALS-trained
Matrix Factorization Model to the local file system using the Model.save()
method. I found some hints, such as partitioning the model before saving, but
they do not seem to solve my problem. I'm always getting this NPE error when
running on a cluster of several nodes, while it's totally fine when running on a
local node.

I’m using spark 1.6.2, pyspark. Any hint would be appreciated! thank you

cheers

zhou




model = ALS.train(ratingsRDD, rank, numIter, lmbda, 5)



16/07/12 02:14:32 INFO ParquetFileReader: Initiating action with parallelism: 5
16/07/12 02:14:32 WARN ParquetOutputCommitter: could not write summary file for 
file:/home/ec2-user/myCollaborativeFilterNoTesting_2016_07_12_02_13_35.dat/data/product
java.lang.NullPointerException
at 
org.apache.parquet.hadoop.ParquetFileWriter.mergeFooters(ParquetFileWriter.java:456)
at 
org.apache.parquet.hadoop.ParquetFileWriter.writeMetadataFile(ParquetFileWriter.java:420)
at 
org.apache.parquet.hadoop.ParquetOutputCommitter.writeMetaDataFile(ParquetOutputCommitter.java:58)
at 
org.apache.parquet.hadoop.ParquetOutputCommitter.commitJob(ParquetOutputCommitter.java:48)
at 
org.apache.spark.sql.execution.datasources.BaseWriterContainer.commitJob(WriterContainer.scala:230)
at 
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelation.scala:149)
at 
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:106)
at 
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:106)
at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
at 
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:106)
at 
org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:58)
at 
org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:56)
at org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:70)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
at 
org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:55)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:55)
at 
org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:256)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:148)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:139)
at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:334)
at 
org.apache.spark.mllib.recommendation.MatrixFactorizationModel$SaveLoadV1_0$.save(MatrixFactorizationModel.scala:362)
at 
org.apache.spark.mllib.recommendation.MatrixFactorizationModel.save(MatrixFactorizationModel.scala:205)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:209)
at java.lang.Thread.run(Thread.java:745)



Re: location of a partition in the cluster/ how parallelize method distribute the RDD partitions over the cluster.

2016-07-12 Thread aka.fe2s
The local collection is distributed into the cluster when you run any
action http://spark.apache.org/docs/latest/programming-guide.html#actions
due to laziness of RDD.

If you want to control how the collection is split into parititions, you
can create your own RDD implementation and implement this logic
in getPartitions/compute methods. See the ParallelCollectionRDD as a
reference.
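
If the goal is just to see where each partition ends up being processed, a small sketch (assuming a running SparkContext `sc`; the element count and partition count are illustrative) is to run an action that reports the executor host per partition:

val rdd = sc.parallelize(1 to 100, 8)
rdd.mapPartitionsWithIndex { (idx, it) =>
  // this closure runs on the executor that processes the partition
  Iterator((idx, java.net.InetAddress.getLocalHost.getHostName, it.size))
}.collect().foreach { case (idx, host, n) =>
  println(s"partition $idx -> $host ($n elements)")
}

Before the first action, nothing has been shipped anywhere, so the "location" question only becomes meaningful once tasks run.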

--
Oleksiy Dyagilev

On Sun, Jul 10, 2016 at 3:58 PM, Mazen  wrote:

> Hi,
>
> Any hint about getting the location of a particular RDD partition on the
> cluster? a workaround?
>
>
> Parallelize method on RDDs partitions the RDD into splits  as specified or
> per as per the  default parallelism configuration. Does parallelize
> actually
> distribute the partitions into the cluster or the partitions are kept on
> the
> driver node. In the first case is there a protocol for assigning/mapping
> partitions (parallelocollectionpartition) to workers or it is just random.
> Otherwise, when partitions are distributed on the cluster? Is that when
> tasks are launched on partitions?
>
> thanks.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/location-of-a-partition-in-the-cluster-how-parallelize-method-distribute-the-RDD-partitions-over-the-tp27316.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: RDD for loop vs foreach

2016-07-12 Thread Deepak Sharma
Hi Phil
I guess for() is executed on the driver while foreach() will execute it in
parallel.
You can try this without collecting the RDD; try both.
foreach in this case would print on the executors, and you would not see
anything on the driver console.
Thanks
Deepak


On Tue, Jul 12, 2016 at 9:28 PM, philipghu  wrote:

> Hi,
>
> I'm new to Spark and Scala as well. I understand that we can use foreach to
> apply a function to each element of an RDD, like rdd.foreach
> (x=>println(x)), but I saw we can also do a for loop to print each element
> of an RDD, like
>
> for (x <- rdd){
> println(x)
> }
>
> Does defining the foreach function in RDD make an RDD traversable like
> this?
> Does the compiler automatically invoke the foreach function when it sees a
> for loop?
>
>
> Thanks!
> Phil
>
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/RDD-for-loop-vs-foreach-tp27326.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


-- 
Thanks
Deepak
www.bigdatabig.com
www.keosha.net


Re: ml and mllib persistence

2016-07-12 Thread aka.fe2s
Okay, I think I found an answer to my question. Some models (for instance
org.apache.spark.mllib.recommendation.MatrixFactorizationModel) hold RDDs,
so just serializing these objects will not work.
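
To illustrate the point (this is not a working persistence scheme, just why plain Java serialization falls short):

import org.apache.spark.mllib.recommendation.MatrixFactorizationModel
import org.apache.spark.rdd.RDD

// The model's state lives in RDDs on the executors; serializing the model object on
// the driver would only capture the RDD handles, not the factor data itself.
def factors(model: MatrixFactorizationModel): (RDD[(Int, Array[Double])], RDD[(Int, Array[Double])]) =
  (model.userFeatures, model.productFeatures)

// A custom store therefore has to persist the RDD contents explicitly, e.g.
// model.userFeatures.saveAsObjectFile("hdfs:///my/store/userFactors")  // path is illustrative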

--
Oleksiy Dyagilev

On Tue, Jul 12, 2016 at 5:40 PM, aka.fe2s  wrote:

> What is the reason Spark has an individual implementations of read/write
> routines for every model in mllib and ml? (Saveable and MLWritable trait
> impls)
>
> Wouldn't a generic implementation via Java serialization mechanism work? I
> would like to use it to store the models to a custom storage.
>
> --
> Oleksiy
>


Re: ml and mllib persistence

2016-07-12 Thread Reynold Xin
Also Java serialization isn't great for cross platform compatibility.

On Tuesday, July 12, 2016, aka.fe2s  wrote:

> Okay, I think I found an answer on my question. Some models (for instance
> org.apache.spark.mllib.recommendation.MatrixFactorizationModel) hold RDDs,
> so just serializing these objects will not work.
>
> --
> Oleksiy Dyagilev
>
> On Tue, Jul 12, 2016 at 5:40 PM, aka.fe2s  > wrote:
>
>> What is the reason Spark has an individual implementations of read/write
>> routines for every model in mllib and ml? (Saveable and MLWritable trait
>> impls)
>>
>> Wouldn't a generic implementation via Java serialization mechanism work?
>> I would like to use it to store the models to a custom storage.
>>
>> --
>> Oleksiy
>>
>
>


Feature importance IN random forest

2016-07-12 Thread pseudo oduesp
Hi,
I use pyspark 1.5.0.
Can I ask how I can get feature importance for a random forest
algorithm in pyspark? Please give me an example.
Thanks in advance.


Output Op Duration vs Job Duration: What's the difference?

2016-07-12 Thread Renxia Wang
Hi,

I am using Spark 1.6.1 on EMR running a streaming app on YARN. From the
Spark UI I see that for each batch, the *Output Op Duration* is larger
than the *Job Duration* (screenshot attached). What's the difference between
the two: does *Job Duration* count only the executor time of each job, while
*Output Op Duration* includes scheduler delay and other overheads?


Thanks!

Renxia


Re: QuantileDiscretizer not working properly with big dataframes

2016-07-12 Thread Pasquinell Urbani
In the forum mentioned above, the following solution is suggested.

The problem is in lines 113 and 114 of QuantileDiscretizer.scala and can be
fixed by changing line 113 like so:
before: val requiredSamples = math.max(numBins * numBins, 1)
after: val requiredSamples = math.max(numBins * numBins, 1.0)

Is there another way?
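
One possible workaround sketch that avoids patching Spark (assuming Spark 1.6 and that C4 is a DoubleType column in the df3 shown in the quoted code below; the sample fraction and probabilities are illustrative): estimate the quantile splits yourself on a sample and hand them to Bucketizer.

import org.apache.spark.ml.feature.Bucketizer

val probs = Array(0.2, 0.4, 0.6, 0.8)            // inner boundaries for 5 buckets
val sample = df3.select("C4")
  .sample(withReplacement = false, 0.01)         // 1% sample of the column
  .map(_.getDouble(0))
  .collect()
  .sorted
val raw = probs.map(p => sample(((sample.length - 1) * p).toInt))
// Bucketizer requires strictly increasing splits, hence the distinct
val splits = (Double.NegativeInfinity +: raw :+ Double.PositiveInfinity).distinct

val bucketizer = new Bucketizer()
  .setInputCol("C4")
  .setOutputCol("C4_Q")
  .setSplits(splits)

val result = bucketizer.transform(df3)
result.show()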


2016-07-11 18:28 GMT-04:00 Pasquinell Urbani <
pasquinell.urb...@exalitica.com>:

> Hi all,
>
> We have a dataframe with 2.5 million records and 13 features. We want
> to perform a logistic regression with this data, but first we need to divide
> each column into discrete values using QuantileDiscretizer. This will
> improve the performance of the model by avoiding outliers.
>
> For small dataframes QuantileDiscretizer works perfectly (see the ml
> example:
> https://spark.apache.org/docs/1.6.0/ml-features.html#quantilediscretizer),
> but for large dataframes it tends to split the column into only the values 0
> and 1 (even though the number of buckets is set to 5). Here is my
> code:
>
> val discretizer = new QuantileDiscretizer()
>   .setInputCol("C4")
>   .setOutputCol("C4_Q")
>   .setNumBuckets(5)
>
> val result = discretizer.fit(df3).transform(df3)
> result.show()
>
> I found the same problem presented here:
> https://issues.apache.org/jira/browse/SPARK-13444 . But there is no
> solution yet.
>
> Am I configuring the function in a bad way? Should I pre-process the
> data (e.g., z-scores)? Can somebody help me deal with this?
>
> Regards
>


Re: bisecting kmeans model tree

2016-07-12 Thread roni
Hi Spark/MLlib experts,
Anyone who can shine light on this?
Thanks
_R

On Thu, Apr 21, 2016 at 12:46 PM, roni  wrote:

> Hi ,
> I want to get the bisecting k-means tree structure to show a dendrogram on
> the heatmap I am generating based on the hierarchical clustering of data.
> How do I get that using mllib?
> Thanks
> -Roni
>


Tools for Balancing Partitions by Size

2016-07-12 Thread Pedro Rodriguez
Hi,

Are there any tools for partitioning RDD/DataFrames by size at runtime? The
idea would be to specify that I would like for each partition to be roughly
X number of megabytes then write that through to S3. I haven't found
anything off the shelf, and looking through stack overflow posts doesn't
seem to yield anything concrete.

Is there a way to programmatically get the size or a size estimate for an
RDD/DataFrame at runtime (eg size of one record would be sufficient)? I
gave SizeEstimator a try, but it seems like the results varied quite a bit
(tried on whole RDD and a sample). It would also be useful to get
programmatic access to the size of the RDD in memory if it is cached.

Thanks,
-- 
Pedro Rodriguez
PhD Student in Distributed Machine Learning | CU Boulder
UC Berkeley AMPLab Alumni

ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423
Github: github.com/EntilZha | LinkedIn:
https://www.linkedin.com/in/pedrorodriguezscience


Re: Spark hangs at "Removed broadcast_*"

2016-07-12 Thread dhruve ashar
Looking at the jstack, it seems that it doesn't contain all the threads.
Cannot find the main thread in the jstack.

I am not an expert on analyzing jstacks, but are you creating any threads
in your code? Shutting them down correctly?

This one is a non-daemon and doesn't seem to be coming from Spark.
*"Scheduler-2144644334"* #110 prio=5 os_prio=0 tid=0x7f8104001800
nid=0x715 waiting on condition [0x7f812cf95000]

Also, does the shutdown hook get called?


On Tue, Jul 12, 2016 at 2:35 AM, Anton Sviridov  wrote:

> Hi.
>
> Here's the last few lines before it starts removing broadcasts:
>
> 16/07/11 14:02:11 INFO FileOutputCommitter: Saved output of task
> 'attempt_20160723_0009_m_003209_20886' to
> file:/mnt/rendang/cache-main/RunWikistatsSFCounts727fc9d635f25d0922984e59a0d18fdd/stats/sf_counts/_temporary/0/task_20160723_0009_m_003209
> 16/07/11 14:02:11 INFO SparkHadoopMapRedUtil:
> attempt_20160723_0009_m_003209_20886: Committed
> 16/07/11 14:02:11 INFO TaskSetManager: Finished task 3211.0 in stage 9.0
> (TID 20888) in 95 ms on localhost (3209/3214)
> 16/07/11 14:02:11 INFO Executor: Finished task 3209.0 in stage 9.0 (TID
> 20886). 1721 bytes result sent to driver
> 16/07/11 14:02:11 INFO TaskSetManager: Finished task 3209.0 in stage 9.0
> (TID 20886) in 103 ms on localhost (3210/3214)
> 16/07/11 14:02:11 INFO FileOutputCommitter: Saved output of task
> 'attempt_20160723_0009_m_003208_20885' to
> file:/mnt/rendang/cache-main/RunWikistatsSFCounts727fc9d635f25d0922984e59a0d18fdd/stats/sf_counts/_temporary/0/task_20160723_0009_m_003208
> 16/07/11 14:02:11 INFO SparkHadoopMapRedUtil:
> attempt_20160723_0009_m_003208_20885: Committed
> 16/07/11 14:02:11 INFO Executor: Finished task 3208.0 in stage 9.0 (TID
> 20885). 1721 bytes result sent to driver
> 16/07/11 14:02:11 INFO TaskSetManager: Finished task 3208.0 in stage 9.0
> (TID 20885) in 109 ms on localhost (3211/3214)
> 16/07/11 14:02:11 INFO FileOutputCommitter: Saved output of task
> 'attempt_20160723_0009_m_003212_20889' to
> file:/mnt/rendang/cache-main/RunWikistatsSFCounts727fc9d635f25d0922984e59a0d18fdd/stats/sf_counts/_temporary/0/task_20160723_0009_m_003212
> 16/07/11 14:02:11 INFO SparkHadoopMapRedUtil:
> attempt_20160723_0009_m_003212_20889: Committed
> 16/07/11 14:02:11 INFO Executor: Finished task 3212.0 in stage 9.0 (TID
> 20889). 1721 bytes result sent to driver
> 16/07/11 14:02:11 INFO TaskSetManager: Finished task 3212.0 in stage 9.0
> (TID 20889) in 84 ms on localhost (3212/3214)
> 16/07/11 14:02:11 INFO FileOutputCommitter: Saved output of task
> 'attempt_20160723_0009_m_003210_20887' to
> file:/mnt/rendang/cache-main/RunWikistatsSFCounts727fc9d635f25d0922984e59a0d18fdd/stats/sf_counts/_temporary/0/task_20160723_0009_m_003210
> 16/07/11 14:02:11 INFO SparkHadoopMapRedUtil:
> attempt_20160723_0009_m_003210_20887: Committed
> 16/07/11 14:02:11 INFO Executor: Finished task 3210.0 in stage 9.0 (TID
> 20887). 1721 bytes result sent to driver
> 16/07/11 14:02:11 INFO TaskSetManager: Finished task 3210.0 in stage 9.0
> (TID 20887) in 100 ms on localhost (3213/3214)
> 16/07/11 14:02:11 INFO FileOutputCommitter: File Output Committer
> Algorithm version is 1
> 16/07/11 14:02:11 INFO FileOutputCommitter: Saved output of task
> 'attempt_20160723_0009_m_003213_20890' to
> file:/mnt/rendang/cache-main/RunWikistatsSFCounts727fc9d635f25d0922984e59a0d18fdd/stats/sf_counts/_temporary/0/task_20160723_0009_m_003213
> 16/07/11 14:02:11 INFO SparkHadoopMapRedUtil:
> attempt_20160723_0009_m_003213_20890: Committed
> 16/07/11 14:02:11 INFO Executor: Finished task 3213.0 in stage 9.0 (TID
> 20890). 1721 bytes result sent to driver
> 16/07/11 14:02:11 INFO TaskSetManager: Finished task 3213.0 in stage 9.0
> (TID 20890) in 82 ms on localhost (3214/3214)
> 16/07/11 14:02:11 INFO TaskSchedulerImpl: Removed TaskSet 9.0, whose tasks
> have all completed, from pool
> *16/07/11 14:02:11 INFO DAGScheduler: ResultStage 9 (saveAsTextFile at
> SfCountsDumper.scala:13) finished in 42.294 s*
> *16/07/11 14:02:11 INFO DAGScheduler: Job 1 finished: saveAsTextFile at
> SfCountsDumper.scala:13, took 9517.124624 s*
> 16/07/11 14:28:46 INFO BlockManagerInfo: Removed broadcast_0_piece0 on
> 10.101.230.154:35192 in memory (size: 15.8 KB, free: 37.1 GB)
> 16/07/11 14:28:46 INFO ContextCleaner: Cleaned shuffle 7
> 16/07/11 14:28:46 INFO ContextCleaner: Cleaned shuffle 6
> 16/07/11 14:28:46 INFO ContextCleaner: Cleaned shuffle 5
> 16/07/11 14:28:46 INFO ContextCleaner: Cleaned shuffle 4
> 16/07/11 14:28:46 INFO ContextCleaner: Cleaned shuffle 3
> 16/07/11 14:28:46 INFO ContextCleaner: Cleaned shuffle 2
> 16/07/11 14:28:46 INFO ContextCleaner: Cleaned shuffle 1
> 16/07/11 14:28:46 INFO BlockManager: Removing RDD 14
> 16/07/11 14:28:46 INFO ContextCleaner: Cleaned RDD 14
> 16/07/11 14:28:46 INFO BlockManagerInfo: Removed broadcast_11_piece0 on
> 10.101.230.154:35192 in memory (size: 25.5 KB, free: 37.1 GB)
> ...
>
> In fact

Re: Spark cluster tuning recommendation

2016-07-12 Thread Takeshi Yamamuro
Hi,

Have you seen this slide deck from Spark Summit 2016?
https://spark-summit.org/2016/events/top-5-mistakes-when-writing-spark-applications/
It is a good deck for your capacity planning.

// maropu

On Tue, Jul 12, 2016 at 2:31 PM, Yash Sharma  wrote:

> I would say use the dynamic allocation rather than number of executors.
> Provide some executor memory which you would like.
> Deciding the values requires couple of test runs and checking what works
> best for you.
>
> You could try something like -
>
> --driver-memory 1G \
> --executor-memory 2G \
> --executor-cores 2 \
> --conf spark.dynamicAllocation.enabled=true \
> --conf spark.dynamicAllocation.initialExecutors=8 \
>
>
>
> On Tue, Jul 12, 2016 at 1:27 PM, Anuj Kumar  wrote:
>
>> That configuration looks bad, with only two cores in use and 1 GB used by
>> the app. A few points:
>>
>> 1. Please oversubscribe those CPUs to at least twice the number of cores
>> you have to start with, and then tune if it freezes
>> 2. Allocate all of the CPU cores and memory to your running app (I assume
>> it is your test environment)
>> 3. Assuming that you are running quad-core machines, if you define cores
>> as 8 for your workers you will get 56 cores (CPU threads)
>> 4. Also, it depends on the source from where you are reading the data. If
>> you are reading from HDFS, what is your block size and part count?
>> 5. You may also have to tune the timeouts and frame-size based on the
>> dataset and errors that you are facing
>>
>> We have run terasort with a couple of high-end worker machines reading and
>> writing from HDFS, with 5-10 mount points allocated for HDFS and Spark local
>> dirs. We have used multiple configurations, like
>> 10W-10CPU-10GB and 25W-6CPU-6GB running on each of the two machines with
>> HDFS 512 MB blocks and 1000-2000 parts. All of these, chatting at 10GbE,
>> worked well.
>>
>> On Tue, Jul 12, 2016 at 3:39 AM, Kartik Mathur 
>> wrote:
>>
>>> I am trying a run terasort in spark , for a 7 node cluster with only 10g
>>> of data and executors get lost with GC overhead limit exceeded error.
>>>
>>> This is what my cluster looks like -
>>>
>>>
>>>- *Alive Workers:* 7
>>>- *Cores in use:* 28 Total, 2 Used
>>>- *Memory in use:* 56.0 GB Total, 1024.0 MB Used
>>>- *Applications:* 1 Running, 6 Completed
>>>- *Drivers:* 0 Running, 0 Completed
>>>- *Status:* ALIVE
>>>
>>> Each worker has 8 cores and 4GB memory.
>>>
>>> My questions is how do people running in production decide these
>>> properties -
>>>
>>> 1) --num-executors
>>> 2) --executor-cores
>>> 3) --executor-memory
>>> 4) num of partitions
>>> 5) spark.default.parallelism
>>>
>>> Thanks,
>>> Kartik
>>>
>>>
>>>
>>
>


-- 
---
Takeshi Yamamuro


Re: Tools for Balancing Partitions by Size

2016-07-12 Thread Takeshi Yamamuro
Hi,

There is no simple way to access the size on the driver side.
Since partitions of primitive-typed data (e.g., int) are compressed by
`DataFrame#cache`, the actual cached size can also differ a little from the
size of the partitions as they are processed.

// maropu

On Wed, Jul 13, 2016 at 4:53 AM, Pedro Rodriguez 
wrote:

> Hi,
>
> Are there any tools for partitioning RDD/DataFrames by size at runtime?
> The idea would be to specify that I would like for each partition to be
> roughly X number of megabytes then write that through to S3. I haven't
> found anything off the shelf, and looking through stack overflow posts
> doesn't seem to yield anything concrete.
>
> Is there a way to programmatically get the size or a size estimate for an
> RDD/DataFrame at runtime (eg size of one record would be sufficient)? I
> gave SizeEstimator a try, but it seems like the results varied quite a bit
> (tried on whole RDD and a sample). It would also be useful to get
> programmatic access to the size of the RDD in memory if it is cached.
>
> Thanks,
> --
> Pedro Rodriguez
> PhD Student in Distributed Machine Learning | CU Boulder
> UC Berkeley AMPLab Alumni
>
> ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423
> Github: github.com/EntilZha | LinkedIn:
> https://www.linkedin.com/in/pedrorodriguezscience
>
>


-- 
---
Takeshi Yamamuro


Re: Tools for Balancing Partitions by Size

2016-07-12 Thread Hatim Diab
Hi,

Since the final size depends on data types and compression, I've had to first
get a rough estimate of the data written to disk, then compute the number of
partitions.

partitions = int(ceil(size_data * conversion_ratio / block_size))

In my case block size 256mb, source txt & dest is snappy parquet, 
compression_ratio .6

df.repartition(partitions).write.parquet(output)

Which yields files in the range of 230mb.
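
As a concrete instance of that formula (the numbers are illustrative only, not taken from the original post):

// ~100 GB of input text, conversion ratio 0.6 to snappy parquet, 256 MB target files
val sizeDataMb = 100 * 1024.0
val conversionRatio = 0.6
val blockSizeMb = 256.0
val partitions = math.ceil(sizeDataMb * conversionRatio / blockSizeMb).toInt  // = 240

// then, as above:
// df.repartition(partitions).write.parquet(output)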

Another way was to count and come up with an empirical formula.

Cheers,
Hatim


> On Jul 12, 2016, at 9:07 PM, Takeshi Yamamuro  wrote:
> 
> Hi,
> 
> There is no simple way to access the size in a driver side.
> Since the partitions of primitive typed data (e.g., int) are compressed by 
> `DataFrame#cache`,
> the actual size is possibly a little bit different from processing partitions 
> size.
> 
> // maropu
> 
>> On Wed, Jul 13, 2016 at 4:53 AM, Pedro Rodriguez  
>> wrote:
>> Hi,
>> 
>> Are there any tools for partitioning RDD/DataFrames by size at runtime? The 
>> idea would be to specify that I would like for each partition to be roughly 
>> X number of megabytes then write that through to S3. I haven't found 
>> anything off the shelf, and looking through stack overflow posts doesn't 
>> seem to yield anything concrete.
>> 
>> Is there a way to programmatically get the size or a size estimate for an 
>> RDD/DataFrame at runtime (eg size of one record would be sufficient)? I gave 
>> SizeEstimator a try, but it seems like the results varied quite a bit (tried 
>> on whole RDD and a sample). It would also be useful to get programmatic 
>> access to the size of the RDD in memory if it is cached.
>> 
>> Thanks,
>> -- 
>> Pedro Rodriguez
>> PhD Student in Distributed Machine Learning | CU Boulder
>> UC Berkeley AMPLab Alumni
>> 
>> ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423
>> Github: github.com/EntilZha | LinkedIn: 
>> https://www.linkedin.com/in/pedrorodriguezscience
> 
> 
> 
> -- 
> ---
> Takeshi Yamamuro


Re: Large files with wholetextfile()

2016-07-12 Thread Hyukjin Kwon
Otherwise, please consider using https://github.com/databricks/spark-xml.

Actually, there is a function to find the input file name, which is..

input_file_name function,
https://github.com/apache/spark/blob/5f342049cce9102fb62b4de2d8d8fa691c2e8ac4/sql/core/src/main/scala/org/apache/spark/sql/functions.scala#L948

This is available from 1.6.0

Please refer to https://github.com/apache/spark/pull/13806 and
https://github.com/apache/spark/pull/13759
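
Putting the two suggestions together, a sketch (assuming Spark 1.6+ with the spark-xml package on the classpath and an existing sqlContext; the path and row tag are illustrative, and it is worth verifying that input_file_name is populated for this source):

import org.apache.spark.sql.functions.input_file_name

val records = sqlContext.read
  .format("com.databricks.spark.xml")
  .option("rowTag", "record")               // illustrative row tag
  .load("hdfs:///data/xml-input/*.xml")     // illustrative path
  .withColumn("source_file", input_file_name())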




2016-07-12 22:04 GMT+09:00 Prashant Sharma :

> Hi Baahu,
>
> That should not be a problem, given you allocate sufficient buffer for
> reading.
>
> I was just working on implementing a patch[1] to support the feature of
> reading whole text files in SQL. This can actually be a slightly better
> approach, because here we read into off-heap memory for holding data (using
> the unsafe interface).
>
> 1. https://github.com/apache/spark/pull/14151
>
> Thanks,
>
>
>
> --Prashant
>
>
> On Tue, Jul 12, 2016 at 6:24 PM, Bahubali Jain  wrote:
>
>> Hi,
>> We have a requirement where in we need to process set of xml files, each
>> of the xml files contain several records (eg:
>> 
>>  data of record 1..
>> 
>>
>> 
>> data of record 2..
>> 
>>
>> Expected output is   
>>
>> Since we needed file name as well in output ,we chose wholetextfile() .
>> We had to go against using StreamXmlRecordReader and StreamInputFormat
>> since I could not find a way to retreive the filename.
>>
>> These xml files could be pretty big; occasionally they could reach a size
>> of 1 GB. Since the contents of each file would be put into a single partition,
>> would such big files be an issue?
>> The AWS cluster(50 Nodes) that we use is fairly strong , with each
>> machine having memory of around 60GB.
>>
>> Thanks,
>> Baahu
>>
>
>


Re: Spark hangs at "Removed broadcast_*"

2016-07-12 Thread Sea
Please provide your jstack info.




------ Original message ------
From: "dhruve ashar"
Date: 2016-07-13 (Wed) 3:53
To: "Anton Sviridov"
Cc: "user"
Subject: Re: Spark hangs at "Removed broadcast_*"



Looking at the jstack, it seems that it doesn't contain all the threads. Cannot 
find the main thread in the jstack.

I am not an expert on analyzing jstacks, but are you creating any threads in 
your code? Shutting them down correctly?


This one is a non-daemon and doesn't seem to be coming from Spark. 
"Scheduler-2144644334" #110 prio=5 os_prio=0 tid=0x7f8104001800 nid=0x715 
waiting on condition [0x7f812cf95000]



Also, does the shutdown hook get called? 




On Tue, Jul 12, 2016 at 2:35 AM, Anton Sviridov  wrote:
Hi.

Here's the last few lines before it starts removing broadcasts:


16/07/11 14:02:11 INFO FileOutputCommitter: Saved output of task 
'attempt_20160723_0009_m_003209_20886' to 
file:/mnt/rendang/cache-main/RunWikistatsSFCounts727fc9d635f25d0922984e59a0d18fdd/stats/sf_counts/_temporary/0/task_20160723_0009_m_003209
16/07/11 14:02:11 INFO SparkHadoopMapRedUtil: 
attempt_20160723_0009_m_003209_20886: Committed
16/07/11 14:02:11 INFO TaskSetManager: Finished task 3211.0 in stage 9.0 (TID 
20888) in 95 ms on localhost (3209/3214)
16/07/11 14:02:11 INFO Executor: Finished task 3209.0 in stage 9.0 (TID 20886). 
1721 bytes result sent to driver
16/07/11 14:02:11 INFO TaskSetManager: Finished task 3209.0 in stage 9.0 (TID 
20886) in 103 ms on localhost (3210/3214)
16/07/11 14:02:11 INFO FileOutputCommitter: Saved output of task 
'attempt_20160723_0009_m_003208_20885' to 
file:/mnt/rendang/cache-main/RunWikistatsSFCounts727fc9d635f25d0922984e59a0d18fdd/stats/sf_counts/_temporary/0/task_20160723_0009_m_003208
16/07/11 14:02:11 INFO SparkHadoopMapRedUtil: 
attempt_20160723_0009_m_003208_20885: Committed
16/07/11 14:02:11 INFO Executor: Finished task 3208.0 in stage 9.0 (TID 20885). 
1721 bytes result sent to driver
16/07/11 14:02:11 INFO TaskSetManager: Finished task 3208.0 in stage 9.0 (TID 
20885) in 109 ms on localhost (3211/3214)
16/07/11 14:02:11 INFO FileOutputCommitter: Saved output of task 
'attempt_20160723_0009_m_003212_20889' to 
file:/mnt/rendang/cache-main/RunWikistatsSFCounts727fc9d635f25d0922984e59a0d18fdd/stats/sf_counts/_temporary/0/task_20160723_0009_m_003212
16/07/11 14:02:11 INFO SparkHadoopMapRedUtil: 
attempt_20160723_0009_m_003212_20889: Committed
16/07/11 14:02:11 INFO Executor: Finished task 3212.0 in stage 9.0 (TID 20889). 
1721 bytes result sent to driver
16/07/11 14:02:11 INFO TaskSetManager: Finished task 3212.0 in stage 9.0 (TID 
20889) in 84 ms on localhost (3212/3214)
16/07/11 14:02:11 INFO FileOutputCommitter: Saved output of task 
'attempt_20160723_0009_m_003210_20887' to 
file:/mnt/rendang/cache-main/RunWikistatsSFCounts727fc9d635f25d0922984e59a0d18fdd/stats/sf_counts/_temporary/0/task_20160723_0009_m_003210
16/07/11 14:02:11 INFO SparkHadoopMapRedUtil: 
attempt_20160723_0009_m_003210_20887: Committed
16/07/11 14:02:11 INFO Executor: Finished task 3210.0 in stage 9.0 (TID 20887). 
1721 bytes result sent to driver
16/07/11 14:02:11 INFO TaskSetManager: Finished task 3210.0 in stage 9.0 (TID 
20887) in 100 ms on localhost (3213/3214)
16/07/11 14:02:11 INFO FileOutputCommitter: File Output Committer Algorithm 
version is 1
16/07/11 14:02:11 INFO FileOutputCommitter: Saved output of task 
'attempt_20160723_0009_m_003213_20890' to 
file:/mnt/rendang/cache-main/RunWikistatsSFCounts727fc9d635f25d0922984e59a0d18fdd/stats/sf_counts/_temporary/0/task_20160723_0009_m_003213
16/07/11 14:02:11 INFO SparkHadoopMapRedUtil: 
attempt_20160723_0009_m_003213_20890: Committed
16/07/11 14:02:11 INFO Executor: Finished task 3213.0 in stage 9.0 (TID 20890). 
1721 bytes result sent to driver
16/07/11 14:02:11 INFO TaskSetManager: Finished task 3213.0 in stage 9.0 (TID 
20890) in 82 ms on localhost (3214/3214)
16/07/11 14:02:11 INFO TaskSchedulerImpl: Removed TaskSet 9.0, whose tasks have 
all completed, from pool
16/07/11 14:02:11 INFO DAGScheduler: ResultStage 9 (saveAsTextFile at 
SfCountsDumper.scala:13) finished in 42.294 s
16/07/11 14:02:11 INFO DAGScheduler: Job 1 finished: saveAsTextFile at 
SfCountsDumper.scala:13, took 9517.124624 s
16/07/11 14:28:46 INFO BlockManagerInfo: Removed broadcast_0_piece0 on 
10.101.230.154:35192 in memory (size: 15.8 KB, free: 37.1 GB)
16/07/11 14:28:46 INFO ContextCleaner: Cleaned shuffle 7
16/07/11 14:28:46 INFO ContextCleaner: Cleaned shuffle 6
16/07/11 14:28:46 INFO ContextCleaner: Cleaned shuffle 5
16/07/11 14:28:46 INFO ContextCleaner: Cleaned shuffle 4
16/07/11 14:28:46 INFO ContextCleaner: Cleaned shuffle 3
16/07/11 14:28:46 INFO ContextCleaner: Cleaned shuffle 2
16/07/11 14:28:46 INFO ContextCleaner: Cleaned shuffle 1
16/07/11 14:28:46 INFO BlockManager: Removing RDD 14
16/07/11 14:28:46 INFO ContextCleaner: Cleaned RDD 14
16/07/11 14:28:4

Inode for STS

2016-07-12 Thread ayan guha
Hi

We are running Spark Thrift Server as a long-running application. However,
it looks like it is filling up the /tmp/hive folder with lots of small files
and directories with no files in them, blowing out the inode limit and
preventing any connections with a "No Space Left in Device" error.

What is the best way to clean up those small files periodically?

-- 
Best Regards,
Ayan Guha


Re: Spark cache behaviour when the source table is modified

2016-07-12 Thread Chanh Le
Hi Anjali,
The cache is immutable; you can't update data in it.
The way to update the cache is to re-create it (uncache and cache the table again).
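
Against the example below, that looks like (a minimal sketch):

sqlContext.sql("insert into table person values ('Foo', 25)")
sqlContext.uncacheTable("person")   // drop the stale cached copy
sqlContext.cacheTable("person")     // re-cache; the new row is included when the cache is next materialized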


> On Jun 16, 2016, at 4:24 PM, Anjali Chadha  wrote:
> 
> Hi all,
> 
> I am having a hard time understanding the caching concepts in Spark.
> 
> I have a hive table("person"), which is cached in Spark.
> 
> sqlContext.sql("create table person (name string, age int)") //Create a new 
> table
> //Add some values to the table
> ...
> ...
> //Cache the table in Spark
> sqlContext.cacheTable("person") 
> sqlContext.isCached("person") //Returns true
> sqlContext.sql("insert into table person values ("Foo", 25)") // Insert some 
> other value in the table
> 
> //Check caching status again
> sqlContext.isCached("person") //Returns true
> sqlContext is HiveContext.
> 
> Will the entries inserted after cacheTable("person") statement be cached? In 
> other words, ("Foo", 25) entry is cached in Spark or not?
> 
> If not, how can I cache only the entries inserted later? I don't want to 
> first uncache and then again cache the whole table.
> 
> Any relevant web link or information will be appreciated.
> 
> - Anjali Chadha
> 



Re: Tools for Balancing Partitions by Size

2016-07-12 Thread Pedro Rodriguez
The primary goal for balancing partitions would be for the write to S3. We 
would like to prevent unbalanced partitions (can do with repartition), but also 
avoid partitions that are too small or too large.

So for that case, getting the cache size would work, Maropu, if it's roughly
accurate, but for data ingest we aren't caching, just writing straight through
to S3.

The idea for writing to disk and checking for the size is interesting Hatim. 
For certain jobs, it seems very doable to write a small percentage of the data 
to S3, check the file size through the AWS API, and use that to estimate the 
total size. Thanks for the idea.

—
Pedro Rodriguez
PhD Student in Large-Scale Machine Learning | CU Boulder
Systems Oriented Data Scientist
UC Berkeley AMPLab Alumni

pedrorodriguez.io | 909-353-4423
github.com/EntilZha | LinkedIn

On July 12, 2016 at 7:26:17 PM, Hatim Diab (timd...@gmail.com) wrote:

Hi,

Since the final size depends on data types and compression. I've had to first 
get a rough estimate of data, written to disk, then compute the number of 
partitions.

partitions = int(ceil(size_data * conversion_ratio / block_size))

In my case block size 256mb, source txt & dest is snappy parquet, 
compression_ratio .6

df.repartition(partitions).write.parquet(output)

Which yields files in the range of 230mb.

Another way was to count and come up with an imperial formula.

Cheers,
Hatim


On Jul 12, 2016, at 9:07 PM, Takeshi Yamamuro  wrote:

Hi,

There is no simple way to access the size in a driver side.
Since the partitions of primitive typed data (e.g., int) are compressed by 
`DataFrame#cache`,
the actual size is possibly a little bit different from processing partitions 
size.

// maropu

On Wed, Jul 13, 2016 at 4:53 AM, Pedro Rodriguez  
wrote:
Hi,

Are there any tools for partitioning RDD/DataFrames by size at runtime? The 
idea would be to specify that I would like for each partition to be roughly X 
number of megabytes then write that through to S3. I haven't found anything off 
the shelf, and looking through stack overflow posts doesn't seem to yield 
anything concrete.

Is there a way to programmatically get the size or a size estimate for an 
RDD/DataFrame at runtime (eg size of one record would be sufficient)? I gave 
SizeEstimator a try, but it seems like the results varied quite a bit (tried on 
whole RDD and a sample). It would also be useful to get programmatic access to 
the size of the RDD in memory if it is cached.

Thanks,
--
Pedro Rodriguez
PhD Student in Distributed Machine Learning | CU Boulder
UC Berkeley AMPLab Alumni

ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423
Github: github.com/EntilZha | LinkedIn: 
https://www.linkedin.com/in/pedrorodriguezscience




--
---
Takeshi Yamamuro


Spark Streaming: Refreshing broadcast value after each batch

2016-07-12 Thread Daniel Haviv
Hi,
I have a streaming application which uses a broadcast variable which I
populate from a database.
I would like every once in a while (or even every batch) to update/replace
the broadcast variable with the latest data from the database.

The only way I found online to do this is this "hackish" way (
http://stackoverflow.com/questions/28573816/periodic-broadcast-in-apache-spark-streaming),
and I'm not sure it gets re-executed per batch anyway:

val broadcastFactory = new TorrentBroadcastFactory()
broadcastFactory.unbroadcast(BroadcastId, true, true)
// append some ids to initIds
val broadcastcontent =
  broadcastFactory.newBroadcast[Set[String]](initIds, false, BroadcastId)


Is there a proper way to do that?

Thank you,
Daniel
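
A pattern that is often suggested instead (a sketch, not an official API; `loadIdsFromDb` is a hypothetical stand-in for the database read, and the TTL is illustrative) is to keep the broadcast in a driver-side variable and re-create it when it gets stale:

import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast

object RefreshableIds {
  @volatile private var current: Broadcast[Set[String]] = _
  @volatile private var lastRefresh = 0L
  private val ttlMs = 10 * 60 * 1000L  // refresh every 10 minutes

  // Called on the driver; re-broadcasts when the current value is older than the TTL.
  def get(sc: SparkContext, loadIdsFromDb: () => Set[String]): Broadcast[Set[String]] = synchronized {
    val now = System.currentTimeMillis()
    if (current == null || now - lastRefresh > ttlMs) {
      if (current != null) current.unpersist(blocking = false)
      current = sc.broadcast(loadIdsFromDb())
      lastRefresh = now
    }
    current
  }
}

// Usage: dstream.transform runs on the driver for every batch, so the staleness
// check happens once per batch while executors keep reading ids.value as usual.
// dstream.transform { rdd =>
//   val ids = RefreshableIds.get(rdd.sparkContext, () => loadIdsFromDb())
//   rdd.filter(r => ids.value.contains(r))
// }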


Re: How to run Zeppelin and Spark Thrift Server Together

2016-07-12 Thread Chanh Le
Hi Ayan,
Thank you for replying. 
But I want to create a table in Zeppelin and store the metadata in Alluxio, as I
tried to do with set hive.metastore.warehouse.dir=alluxio://master1:19998/metadb,
so I can share data with STS.

The way you mentioned, through JDBC, I already did and it works, but I can't
create tables the Spark way easily.

Regards,
Chanh


> On Jul 13, 2016, at 12:06 PM, ayan guha  wrote:
> 
> HI
> 
> I quickly tried with available hive interpreter 
> 
> 
> 
> Please try similarly. 
> 
> I will try with jdbc interpreter but I need to add it to zeppelin :)
> 
> Best
> Ayan
> 
> On Wed, Jul 13, 2016 at 1:53 PM, Chanh Le  > wrote:
> Hi Ayan,
> How to set hive metastore in Zeppelin. I tried but not success.
> The way I do I add into Spark Interpreter
> 
> 
> 
> And also try in a notebook by 
> %sql
> set hive.metastore.metadb.dir=alluxio://master1:19998/metadb
> 
> %sql 
> set hive.metastore.warehouse.dir=alluxio://master1:19998/metadb
> 
> %spark
> sqlContext.setConf("hive.metastore.warehouse.dir", "alluxio://master1:19998/metadb")
> sqlContext.setConf("hive.metastore.metadb.dir", "alluxio://master1:19998/metadb")
> sqlContext.read.parquet("alluxio://master1:19998/etl_info/WEBSITE").saveAsTable("tests_5")
> 
> But It’s 
> 
>> On Jul 11, 2016, at 1:26 PM, ayan guha > > wrote:
>> 
>> Hi
>> 
>> When you say "Zeppelin and STS", I am assuming you mean "Spark Interpreter" 
>> and "JDBC interpreter" respectively. 
>> 
>> Through Zeppelin, you can either run your own spark application (by using 
>> Zeppelin's own spark context) using spark interpreter OR you can access STS, 
>> which  is a spark application ie separate Spark Context using JDBC 
>> interpreter. There should not be any need for these 2 contexts to coexist. 
>> 
>> If you want to share data, save it to hive from either context, and you 
>> should be able to see the data from other context. 
>> 
>> Best
>> Ayan
>> 
>> 
>> 
>> On Mon, Jul 11, 2016 at 3:00 PM, Chanh Le > > wrote:
>> Hi Ayan,
>> I tested it and it works fine, but one more point of confusion: what if my (technical) users
>> want to write some code in Zeppelin to apply things to a Hive table?
>> Zeppelin and STS can't share a Spark Context; does that mean we need separate
>> processes? Is there any way to use the same Spark Context as STS?
>> 
>> Regards,
>> Chanh
>> 
>> 
>>> On Jul 11, 2016, at 10:05 AM, Takeshi Yamamuro >> > wrote:
>>> 
>>> Hi,
>>> 
>>> ISTM multiple sparkcontexts are not recommended in spark.
>>> See: https://issues.apache.org/jira/browse/SPARK-2243 
>>> 
>>> 
>>> // maropu
>>> 
>>> 
>>> On Mon, Jul 11, 2016 at 12:01 PM, ayan guha >> > wrote:
>>> Hi
>>> 
>>> Can you try using JDBC interpreter with STS? We are using Zeppelin+STS on 
>>> YARN for few months now without much issue. 
>>> 
>>> On Mon, Jul 11, 2016 at 12:48 PM, Chanh Le >> > wrote:
>>> Hi everybody,
>>> We are using Spark to query big data and currently we’re using Zeppelin to 
>>> provide a UI for technical users.
>>> Now we also need to provide a UI for business users so we use Oracle BI 
>>> tools and set up a Spark Thrift Server (STS) for it.
>>> 
>>> When I run both Zeppelin and STS throw error:
>>> 
>>> INFO [2016-07-11 09:40:21,905] ({pool-2-thread-4} 
>>> SchedulerFactory.java[jobStarted]:131) - Job 
>>> remoteInterpretJob_1468204821905 started by scheduler 
>>> org.apache.zeppelin.spark.SparkInterpreter835015739
>>>  INFO [2016-07-11 09:40:21,911] ({pool-2-thread-4} 
>>> Logging.scala[logInfo]:58) - Changing view acls to: giaosudau
>>>  INFO [2016-07-11 09:40:21,912] ({pool-2-thread-4} 
>>> Logging.scala[logInfo]:58) - Changing modify acls to: giaosudau
>>>  INFO [2016-07-11 09:40:21,912] ({pool-2-thread-4} 
>>> Logging.scala[logInfo]:58) - SecurityManager: authentication disabled; ui 
>>> acls disabled; users with view permissions: Set(giaosudau); users with 
>>> modify permissions: Set(giaosudau)
>>>  INFO [2016-07-11 09:40:21,918] ({pool-2-thread-4} 
>>> Logging.scala[logInfo]:58) - Starting HTTP Server
>>>  INFO [2016-07-11 09:40:21,919] ({pool-2-thread-4} 
>>> Server.java[doStart]:272) - jetty-8.y.z-SNAPSHOT
>>>  INFO [2016-07-11 09:40:21,920] ({pool-2-thread-4} 
>>> AbstractConnector.java[doStart]:338) - Started 
>>> SocketConnector@0.0.0.0:54818 
>>>  INFO [2016-07-11 09:40:21,922] ({pool-2-thread-4} 
>>> Logging.scala[logInfo]:58) - Successfully started service 'HTTP class 
>>> server' on port 54818.
>>>  INFO [2016-07-11 09:40:22,408] ({pool-2-thread-4} 
>>> SparkInterpreter.java[createSparkContext]:233) - -- Create new 
>>> SparkContext local[*] ---
>>>  WARN [2016-07-11 09:40:22,411] ({pool-2-thread-4} 
>>> Logging.scala[logWarning]:70) - Another SparkContext is being constructed 
>>> (or threw an exception in its constr

Any Idea about this error : IllegalArgumentException: File segment length cannot be negative ?

2016-07-12 Thread Dibyendu Bhattacharya
In a Spark Streaming job, I see a batch that failed with the following error. I haven't
seen anything like this earlier.

This happened during the shuffle for one batch (it hasn't reoccurred after
that). Just curious to know what can cause this error. I am running Spark
1.5.1

Regards,
Dibyendu


Job aborted due to stage failure: Task 2801 in stage 9421.0 failed 4
times, most recent failure: Lost task 2801.3 in stage 9421.0:
java.lang.IllegalArgumentException: requirement failed: File segment
length cannot be negative (got -68321)
at scala.Predef$.require(Predef.scala:233)
at org.apache.spark.storage.FileSegment.<init>(FileSegment.scala:28)
at 
org.apache.spark.storage.DiskBlockObjectWriter.fileSegment(DiskBlockObjectWriter.scala:216)
at 
org.apache.spark.util.collection.ExternalSorter.writePartitionedFile(ExternalSorter.scala:684)
at 
org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:80)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)