Re: repartition(n) should be deprecated/alerted

2022-06-22 Thread Igor Berman
I'd argue it's strange and unexpected.
I understand there is precision issues here, but I'm fine that result might
be slightly different each time for the specific column
What I'm not expecting(as end user for sure) is that presumably trivial
computation might under retries scenarios cause few hundreds rows to be
duplicated and same amount to be dropped(since one precision shift, might
shift few hundreds of rows in local sort done by repartiton(n))
Maybe what I'm trying to say is that repartition documentation is not
hinting in any way that this might happen and maybe it should.

* I'm aware of coalesce, but it has its own problems due to influence on
parallelism of all the transformations/filters up to last shuffle/exchange





On Wed, 22 Jun 2022 at 20:43, Sean Owen  wrote:

> Eh, there is a huge caveat - you are making your input non-deterministic,
> where determinism is assumed. I don't think that supports such a drastic
> statement.
>
> On Wed, Jun 22, 2022 at 12:39 PM Igor Berman 
> wrote:
>
>> Hi All
>> tldr; IMHO repartition(n) should be deprecated or red-flagged, so that
>> everybody will understand consequences of usage of this method
>>
>> Following conversation in
>> https://issues.apache.org/jira/browse/SPARK-38388 (still relevant for
>> recent versions of spark) I think it's very important to mark this function
>> somehow and to alert end-user about consequences of such usage
>>
>> Basically it may produce duplicates and data loss under retries for
>> several kinds of input: among them non-deterministic input, but more
>> importantly input that deterministic but might produce not exactly same
>> results due to precision of doubles(and floats) in very simple queries like
>> following
>>
>> sqlContext.sql(
>> " SELECT integerColumn, SUM(someDoubleTypeValue) AS value
>>   FROM data
>>   GROUP BY integerColumn "
>> ).repartition(3)
>>
>> (see comment from Tom in ticket)
>>
>> As an end-user I'd expect the retries mechanism to work in a consistent
>> way and not to drop data silently(neither to produce duplicates)
>>
>> Any thoughts?
>> thanks in advance
>> Igor
>>
>>


repartition(n) should be deprecated/alerted

2022-06-22 Thread Igor Berman
Hi All
tldr; IMHO repartition(n) should be deprecated or red-flagged, so that
everybody will understand consequences of usage of this method

Following conversation in https://issues.apache.org/jira/browse/SPARK-38388
(still relevant for recent versions of spark) I think it's very important
to mark this function somehow and to alert end-user about consequences of
such usage

Basically it may produce duplicates and data loss under retries for several
kinds of input: among them non-deterministic input, but more importantly
input that deterministic but might produce not exactly same results due to
precision of doubles(and floats) in very simple queries like following

sqlContext.sql(
" SELECT integerColumn, SUM(someDoubleTypeValue) AS value
  FROM data
  GROUP BY integerColumn "
).repartition(3)

(see comment from Tom in ticket)

As an end-user I'd expect the retries mechanism to work in a consistent way
and not to drop data silently(neither to produce duplicates)

Any thoughts?
thanks in advance
Igor


Initial job has not accepted any resources

2017-01-04 Thread Igor Berman
Hi All,
need your advice:
we see in some very rare cases following error in log
Initial job has not accepted any resources; check your cluster UI to ensure
that workers are registered and have sufficient resources

and in spark UI there are idle workers and application in WAITING state

in json endpoint I see

"cores" : 280,
  "coresused" : 0,
  "memory" : 2006561,
  "memoryused" : 0,
  "activeapps" : [ {
"starttime" : 1483534808858,
"id" : "app-20170104130008-0181",
"name" : "our name",
"cores" : -1,
"user" : "spark",
"memoryperslave" : 31744,
"submitdate" : "Wed Jan 04 13:00:08 UTC 2017",
"state" : "WAITING",
"duration" : 6568575
  } ],


when I kill the application and restart it - everything works fine,
ie. it's not an issue that some workers are not properly connected,
workers are there, and usually work fine

Is there some way to handle this? Maybe some timeout on this WAITING
state, so that it will exit automatically, because currently it might
be "WAITING" indefinitely...

I've thought of implementing periodic check(by calling rest api /json)
that will kill application when waiting time > 10-15 mins for some
activeapp

any advice will be appreciated,

thanks in advance

Igor


Re: need help to have a Java version of this scala script

2016-12-17 Thread Igor Berman
do you mind to show what you have in java?
in general $"bla" is col("bla") as soon as you import appropriate function
import static org.apache.spark.sql.functions.callUDF;
import static org.apache.spark.sql.functions.col;
udf should be callUDF e.g.
ds.withColumn("localMonth", callUDF("toLocalMonth", col("unixTs"),
col("tz")))

On 17 December 2016 at 09:54, Richard Xin 
wrote:

> what I am trying to do:
> I need to add column (could be complicated transformation based on value
> of a column) to a give dataframe.
>
> scala script:
> val hContext = new HiveContext(sc)
> import hContext.implicits._
> val df = hContext.sql("select x,y,cluster_no from test.dc")
> val len = udf((str: String) => str.length)
> val twice = udf { (x: Int) => println(s"Computed: twice($x)"); x * 2 }
> val triple = udf { (x: Int) => println(s"Computed: triple($x)"); x * 3}
> val df1 = df.withColumn("name-len", len($"x"))
> val df2 = df1.withColumn("twice", twice($"cluster_no"))
> val df3 = df2.withColumn("triple", triple($"cluster_no"))
>
> The scala script above seems to work ok, but I am having trouble to do it
> Java way (note that transformation based on value of a column could be
> complicated, not limited to simple add/minus etc.). is there a way in java?
> Thanks.
>


Re: S3 DirectParquetOutputCommitter + PartitionBy + SaveMode.Append

2016-10-01 Thread Igor Berman
Takeshi, why are you saying this, how have you checked it's only used from
2.7.3?
We use spark 2.0 which is shipped with hadoop dependency of 2.7.2 and we
use this setting.
We've sort of "verified" it's used by configuring log of file output
commiter

On 30 September 2016 at 03:12, Takeshi Yamamuro 
wrote:

> Hi,
>
> FYI: Seems 
> `sc.hadoopConfiguration.set("mapreduce.fileoutputcommitter.algorithm.version","2”)`
> is only available at hadoop-2.7.3+.
>
> // maropu
>
>
> On Thu, Sep 29, 2016 at 9:28 PM, joffe.tal  wrote:
>
>> You can use partition explicitly by adding "/=> value>" to
>> the end of the path you are writing to and then use overwrite.
>>
>> BTW in Spark 2.0 you just need to use:
>>
>> sc.hadoopConfiguration.set("mapreduce.fileoutputcommitter.al
>> gorithm.version","2”)
>> and use s3a://
>>
>> and you can work with regular output committer (actually
>> DirectParquetOutputCommitter is no longer available in Spark 2.0)
>>
>> so if you are planning on upgrading this could be another motivation
>>
>>
>>
>> --
>> View this message in context: http://apache-spark-user-list.
>> 1001560.n3.nabble.com/S3-DirectParquetOutputCommitter-Partit
>> ionBy-SaveMode-Append-tp26398p27810.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>
>
>
> --
> ---
> Takeshi Yamamuro
>


Re: Dataset doesn't have partitioner after a repartition on one of the columns

2016-09-28 Thread Igor Berman
Michael, can you explain please why bucketBy is supported when using
writeAsTable() to parquet by not with parquet()
Is it only difference between table api and dataframe/dataset api? or there
are some other?

org.apache.spark.sql.AnalysisException: 'save' does not support bucketing
right now;
at
org.apache.spark.sql.DataFrameWriter.assertNotBucketed(DataFrameWriter.scala:310)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:203)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:194)
at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:478)


thanks in advance


On 28 September 2016 at 21:26, Michael Armbrust 
wrote:

> Hi Darin,
>
> In SQL we have finer grained information about partitioning, so we don't
> use the RDD Partitioner.  Here's a notebook
> that
> walks through what we do expose and how it is used by the query planner.
>
> Michael
>
> On Tue, Sep 20, 2016 at 11:22 AM, McBeath, Darin W (ELS-STL) <
> d.mcbe...@elsevier.com> wrote:
>
>> I’m using Spark 2.0.
>>
>> I’ve created a dataset from a parquet file and repartition on one of the
>> columns (docId) and persist the repartitioned dataset.
>>
>> val om = ds.repartition($"docId”).persist(StorageLevel.MEMORY_AND_DISK)
>>
>> When I try to confirm the partitioner, with
>>
>> om.rdd.partitioner
>>
>> I get
>>
>> Option[org.apache.spark.Partitioner] = None
>>
>> I would have thought it would be HashPartitioner.
>>
>> Does anyone know why this would be None and not HashPartitioner?
>>
>> Thanks.
>>
>> Darin.
>>
>>
>>
>


Re: Missing output partition file in S3

2016-09-16 Thread Igor Berman
are you using speculation?

On 15 September 2016 at 21:37, Chen, Kevin  wrote:

> Hi,
>
> Has any one encountered an issue of missing output partition file in S3 ?
> My spark job writes output to a S3 location. Occasionally, I noticed one
> partition file is missing. As a result, one chunk of data was lost. If I
> rerun the same job, the problem usually goes away. This has been happening
> pretty random. I observed once or twice a week on a daily run job. I am
> using Spark 1.2.1.
>
> Very much appreciated on any input, suggestion of fix/workaround.
>
>
>
>


Re: Using spark to distribute jobs to standalone servers

2016-08-25 Thread Igor Berman
imho, you'll need to implement custom rdd with your locality settings(i.e.
custom implementation of discovering where each partition is located) +
setting for spark.locality.wait

On 24 August 2016 at 03:48, Mohit Jaggi  wrote:

> It is a bit hacky but possible. A lot depends on what kind of queries etc
> you want to run. You could write a data source that reads your data and
> keeps it partitioned the way you want, then use mapPartitions() to execute
> your code…
>
>
> Mohit Jaggi
> Founder,
> Data Orchard LLC
> www.dataorchardllc.com
>
>
>
>
> On Aug 22, 2016, at 7:59 AM, Larry White  wrote:
>
> Hi,
>
> I have a bit of an unusual use-case and would *greatly* *appreciate* some
> feedback as to whether it is a good fit for spark.
>
> I have a network of compute/data servers configured as a tree as shown
> below
>
>- controller
>- server 1
>   - server 2
>   - server 3
>   - etc.
>
> There are ~20 servers, but the number is increasing to ~100.
>
> Each server contains a different dataset, all in the same format. Each is
> hosted by a different organization, and the data on every individual server
> is unique to that organization.
>
> Data *cannot* be replicated across servers using RDDs or any other means,
> for privacy/ownership reasons.
>
> Data *cannot* be retrieved to the controller, except in aggregate form,
> as the result of a query, for example.
>
> Because of this, there are currently no operations that treats the data as
> if it were a single data set: We could run a classifier on each site
> individually, but cannot for legal reasons, pull all the data into a single
> *physical* dataframe to run the classifier on all of it together.
>
> The servers are located across a wide geographic region (1,000s of miles)
>
> We would like to send jobs from the controller to be executed in parallel
> on all the servers, and retrieve the results to the controller. The jobs
> would consist of SQL-Heavy Java code for 'production' queries, and python
> or R code for ad-hoc queries and predictive modeling.
>
> Spark seems to have the capability to meet many of the individual
> requirements, but is it a reasonable platform overall for building this
> application?
>
> Thank you very much for your assistance.
>
> Larry
>
>
>
>


Re: spark worker continuously trying to connect to master and failed in standalone mode

2016-07-20 Thread Igor Berman
in addition check what ip the master is binding to(with nestat)

On 20 July 2016 at 06:12, Andrew Ehrlich  wrote:

> Troubleshooting steps:
>
> $ telnet localhost 7077 (on master, to confirm port is open)
> $ telnet  7077 (on slave, to confirm port is blocked)
>
> If the port is available on the master from the master, but not on the
> master from the slave, check firewall settings on the master:
> https://help.ubuntu.com/lts/serverguide/firewall.html
>
> On Jul 19, 2016, at 6:25 PM, Neil Chang  wrote:
>
> Hi,
>   I have two virtual pcs on private cloud (ubuntu 14). I installed spark
> 2.0 preview on both machines. I then tried to test it with standalone mode.
> I have no problem start the master. However, when I start the worker
> (slave) on another machine, it makes many attempts to connect to master and
> failed at the end.
>   I can ssh from each machine to another without any problem. I can also
> run a master and worker at the same machine without any problem.
>
> What did I miss? Any clue?
>
> here are the messages:
>
> WARN NativeCodeLoader: Unable to load native-hadoop library for your
> platform ... using builtin-java classes where applicable
> ..
> INFO Worker: Connecting to master ip:7077 ...
> INFO Worker: Retrying connection to master (attempt #1)
> ..
> INFO Worker: Retrying connection to master (attempt #7)
> java.lang.IllegalArgumentException: requirement failed: TransportClient
> has not yet been set.
>at scala.Predef$.require(Predef.scala:224)
> ...
> WARN NettyRocEnv: Ignored failure: java.io.IOException: Connecting to
> ip:7077 timed out
> WARN Worker: Failed to connect to master ip.7077
>
>
>
> Thanks,
> Neil
>
>
>


streaming new data into bigger parquet file

2016-07-06 Thread Igor Berman
Hi
I was reading following tutorial
https://docs.cloud.databricks.com/docs/latest/databricks_guide/07%20Spark%20Streaming/08%20Write%20Output%20To%20S3.html


of streaming data to s3 of databricks_guide
and it states that sometimes I need to do compaction of small files(e.g.
from spark streaming) into compacted big file(I understand why - better
read performance, to solve "many small files" problem etc)

My questions are:
1. what happens when I have big parquet file partitioned by some field and
I want to append new small files into this big file? Is spark overrides
whole data or it can append the new data at the end?
2. while appending process happens - how can I ensure that readers of big
parquet files are not blocked and won't get any errors?(i.e. are files are
"available" when appending new data to them?)

I will highly appreciate any pointers

thanks in advance,
Igor


Re: Spark streaming readind avro from kafka

2016-06-01 Thread Igor Berman
Avro file contains metadata with schema(writer schema)
in Kafka there is no such thing, you should put message that will contain
some reference to known schema(put whole schema will have big overhead)
some people use schema registry solution

On 1 June 2016 at 21:02, justneeraj  wrote:

> +1
>
> I am trying to read avro from kafka and I don't want to limit to a small
> set
> of schema. So I want to dynamically load the schema from avro file (as avro
> contains schema as well). And then from this I want to create a dataframe
> and run some queries on that.
>
> Any help would be really thankful.
>
> Thanks,
> Neeraj
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-readind-avro-from-kafka-tp22425p27067.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


different SqlContext with same udf name with different meaning

2016-05-08 Thread Igor Berman
Hi,
suppose I have multitenant environment and I want to give my users
additional functions
but for each user/tenant the meaning of same function is dependent on
user's specific configuration

is it possible to register same function several times under different
SqlContexts?
are several SqlContexts can live together?


thanks in advance



something like :

SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc)

sqlContext.udf().register("my_function", func1...);


SQLContext sqlContext2 = new org.apache.spark.sql.SQLContext(sc)

sqlContext2.udf().register("my_function", func2...);


Re: Apache Flink

2016-04-17 Thread Igor Berman
latency in Flink is not eliminated, but it might be smaller since Flink
process each event 1-by-1 while Spark does microbatching(so you can't
achieve latency lesser than your microbatch config)
probably Spark will have better throughput due to this microbatching



On 17 April 2016 at 14:47, Ovidiu-Cristian MARCU <
ovidiu-cristian.ma...@inria.fr> wrote:

> You probably read this benchmark at Yahoo, any comments from Spark?
>
> https://yahooeng.tumblr.com/post/135321837876/benchmarking-streaming-computation-engines-at
>
>
> On 17 Apr 2016, at 12:41, andy petrella  wrote:
>
> Just adding one thing to the mix: `that the latency for streaming data is
> eliminated` is insane :-D
>
> On Sun, Apr 17, 2016 at 12:19 PM Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>>  It seems that Flink argues that the latency for streaming data is
>> eliminated whereas with Spark RDD there is this latency.
>>
>> I noticed that Flink does not support interactive shell much like Spark
>> shell where you can add jars to it to do kafka testing. The advice was to
>> add the streaming Kafka jar file to CLASSPATH but that does not work.
>>
>> Most Flink documentation also rather sparce with the usual example of
>> word count which is not exactly what you want.
>>
>> Anyway I will have a look at it further. I have a Spark Scala streaming
>> Kafka program that works fine in Spark and I want to recode it using Scala
>> for Flink with Kafka but have difficulty importing and testing libraries.
>>
>> Cheers
>>
>> Dr Mich Talebzadeh
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>> On 17 April 2016 at 02:41, Ascot Moss  wrote:
>>
>>> I compared both last month, seems to me that Flink's MLLib is not yet
>>> ready.
>>>
>>> On Sun, Apr 17, 2016 at 12:23 AM, Mich Talebzadeh <
>>> mich.talebza...@gmail.com> wrote:
>>>
 Thanks Ted. I was wondering if someone is using both :)

 Dr Mich Talebzadeh


 LinkedIn * 
 https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
 *


 http://talebzadehmich.wordpress.com



 On 16 April 2016 at 17:08, Ted Yu  wrote:

> Looks like this question is more relevant on flink mailing list :-)
>
> On Sat, Apr 16, 2016 at 8:52 AM, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> Hi,
>>
>> Has anyone used Apache Flink instead of Spark by any chance
>>
>> I am interested in its set of libraries for Complex Event Processing.
>>
>> Frankly I don't know if it offers far more than Spark offers.
>>
>> Thanks
>>
>> Dr Mich Talebzadeh
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>
>

>>>
>> --
> andy
>
>
>


Re: streaming app performance when would increasing execution size or adding more cores

2016-03-07 Thread Igor Berman
may be you are experiencing problem with FileOutputCommiter vs
DirectCommiter while working with s3? do you have hdfs so you can try it to
verify?

committing in s3 will copy 1-by-1 all partitions to your final destination
bucket from _temporary, so this stage might become a bottleneck(so reducing
number of partitions might "solve" it)
there is a thread in this mailing list regarding this problem few weeks ago



On 7 March 2016 at 23:53, Andy Davidson 
wrote:

> We just deployed our first streaming apps. The next step is running them
> so they run reliably
>
> We have spend a lot of time reading the various prog guides looking at the
> standalone cluster manager app performance web pages.
>
> Looking at the streaming tab and the stages tab have been the most helpful
> in tuning our app. However we do not understand the connection between
> memory  and # cores will effect throughput and performance. Usually
> adding memory is the cheapest way to improve performance.
>
> When we have a single receiver call spark-submit --total-executor-cores
> 2. Changing the value does not seem to change throughput. our bottle neck
> was s3 write time, saveAsTextFile(). Reducing the number of partitions
> dramatically reduces s3 write times.
>
> Adding memory also does not improve performance
>
> *I would think that adding more cores would allow more concurrent tasks
> run. That is to say reducing num partions would slow things down*
>
> What are best practices?
>
> Kind regards
>
> Andy
>
>
>
>
>
>
>


Re: S3 DirectParquetOutputCommitter + PartitionBy + SaveMode.Append

2016-03-05 Thread Igor Berman
it's not safe to use direct committer with append mode, you may loose your
data..

On 4 March 2016 at 22:59, Jelez Raditchkov  wrote:

> Working on a streaming job with DirectParquetOutputCommitter to S3
> I need to use PartitionBy and hence SaveMode.Append
>
> Apparently when using SaveMode.Append spark automatically defaults to the
> default parquet output committer and ignores DirectParquetOutputCommitter.
>
> My problems are:
> 1. the copying to _temporary takes alot of time
> 2. I get job failures with: java.io.FileNotFoundException: File
> s3n://jelez/parquet-data/_temporary/0/task_201603040904_0544_m_07 does
> not exist.
>
> I have set:
> sparkConfig.set("spark.speculation", "false")
> sc.hadoopConfiguration.set("mapreduce.map.speculative", "false")
> sc.hadoopConfiguration.set("mapreduce.reduce.speculative",
> "false")
>
> Any ideas? Opinions? Best practices?
>
>


Re: Avro SerDe Issue w/ Manual Partitions?

2016-03-03 Thread Igor Berman
your field name is
*enum1_values*

but you have data
{ "foo1": "test123", *"enum1"*: "BLUE" }

i.e. since you defined enum and not union(null, enum)
it tries to find value for enum1_values and doesn't find one...

On 3 March 2016 at 11:30, Chris Miller  wrote:

> I've been digging into this a little deeper. Here's what I've found:
>
> test1.avsc:
> 
> {
>   "namespace": "com.cmiller",
>   "name": "test1",
>   "type": "record",
>   "fields": [
> { "name":"foo1", "type":"string" }
>   ]
> }
> 
>
> test2.avsc:
> 
> {
>   "namespace": "com.cmiller",
>   "name": "test1",
>   "type": "record",
>   "fields": [
> { "name":"foo1", "type":"string" },
> { "name":"enum1", "type": { "type":"enum", "name":"enum1_values",
> "symbols":["BLUE","RED", "GREEN"]} }
>   ]
> }
> 
>
> test1.json (encoded and saved to test/test1.avro):
> 
> { "foo1": "test123" }
> 
>
> test2.json (encoded and saved to test/test1.avro):
> 
> { "foo1": "test123", "enum1": "BLUE" }
> 
>
> Here is how I create the tables and add the data:
>
> 
> CREATE TABLE test1
> PARTITIONED BY (ds STRING)
> ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
> STORED AS INPUTFORMAT
> 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
> OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
> TBLPROPERTIES ('avro.schema.url'='hdfs:///path/to/schemas/test1.avsc');
>
> ALTER TABLE test1 ADD PARTITION (ds='1') LOCATION
> 's3://spark-data/dev/test1';
>
>
> CREATE TABLE test2
> PARTITIONED BY (ds STRING)
> ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
> STORED AS INPUTFORMAT
> 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
> OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
> TBLPROPERTIES ('avro.schema.url'='hdfs:///path/to/schemas/test2.avsc');
>
> ALTER TABLE test2 ADD PARTITION (ds='1') LOCATION
> 's3://spark-data/dev/test2';
> 
>
> And here's what I get:
>
> 
> SELECT * FROM test1;
> -- works fine, shows data
>
> SELECT * FROM test2;
>
> org.apache.avro.AvroTypeException: Found com.cmiller.enum1_values,
> expecting union
> at
> org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:292)
> at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
> at
> org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:267)
> at
> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:155)
> at
> org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:193)
> at
> org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:183)
> at
> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
> at
> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:142)
> at
> org.apache.hadoop.hive.serde2.avro.AvroDeserializer$SchemaReEncoder.reencode(AvroDeserializer.java:111)
> at
> org.apache.hadoop.hive.serde2.avro.AvroDeserializer.deserialize(AvroDeserializer.java:175)
> at
> org.apache.hadoop.hive.serde2.avro.AvroSerDe.deserialize(AvroSerDe.java:201)
> at
> org.apache.spark.sql.hive.HadoopTableReader$$anonfun$fillObject$2.apply(TableReader.scala:409)
> at
> org.apache.spark.sql.hive.HadoopTableReader$$anonfun$fillObject$2.apply(TableReader.scala:408)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
> at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> at
> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
> at
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
> at
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
> at scala.collection.TraversableOnce$class.to
> (TraversableOnce.scala:273)
> at scala.collection.AbstractIterator.to(Iterator.scala:1157)
> at
> scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
> at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
> at
> scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
> at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212)
> at
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
> at
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
> at 

Re: Add Jars to Master/Worker classpath

2016-03-02 Thread Igor Berman
spark.driver.extraClassPath
spark.executor.extraClassPath

2016-03-02 18:01 GMT+02:00 Matthias Niehoff :

> Hi,
>
> we want to add jars to the Master and Worker class path mainly for logging
> reason (we have a redis appender to send logs to redis -> logstash ->
> elasticsearch).
>
> While it is working with setting SPARK_CLASSPATH, this solution is afaik
> deprecated and should not be used. Furthermore we are also using 
> —driver-java-options
> and spark.executor.extraClassPath which leads to exceptions when running
> our apps in standalone cluster mode.
>
> So what is the best way to add jars to the master and worker classpath?
>
> Thank you
>
> --
> Matthias Niehoff | IT-Consultant | Agile Software Factory  | Consulting
> codecentric AG | Zeppelinstr 2 | 76185 Karlsruhe | Deutschland
> tel: +49 (0) 721.9595-681 | fax: +49 (0) 721.9595-666 | mobil: +49 (0)
> 172.1702676
> www.codecentric.de | blog.codecentric.de | www.meettheexperts.de |
> www.more4fi.de
>
> Sitz der Gesellschaft: Solingen | HRB 25917| Amtsgericht Wuppertal
> Vorstand: Michael Hochgürtel . Mirko Novakovic . Rainer Vehns
> Aufsichtsrat: Patric Fedlmeier (Vorsitzender) . Klaus Jäger . Jürgen Schütz
>
> Diese E-Mail einschließlich evtl. beigefügter Dateien enthält vertrauliche
> und/oder rechtlich geschützte Informationen. Wenn Sie nicht der richtige
> Adressat sind oder diese E-Mail irrtümlich erhalten haben, informieren Sie
> bitte sofort den Absender und löschen Sie diese E-Mail und evtl.
> beigefügter Dateien umgehend. Das unerlaubte Kopieren, Nutzen oder Öffnen
> evtl. beigefügter Dateien sowie die unbefugte Weitergabe dieser E-Mail ist
> nicht gestattet
>


Re: .cache() changes contents of RDD

2016-02-27 Thread Igor Berman
are you using avro format by any chance?
there is some formats that need to be "deep"-copy before caching or
aggregating
try something like
val input = sc.newAPIHadoopRDD(...)
val rdd = input.map(deepCopyTransformation).map(...)
rdd.cache()
rdd.saveAsTextFile(...)

where deepCopyTransformation is function that deep copies every object

On 26 February 2016 at 19:41, Yan Yang  wrote:

> Hi
>
> I am pretty new to Spark, and after experimentation on our pipelines. I
> ran into this weird issue.
>
> The Scala code is as below:
>
> val input = sc.newAPIHadoopRDD(...)
> val rdd = input.map(...)
> rdd.cache()
> rdd.saveAsTextFile(...)
>
> I found rdd to consist of 80+K identical rows. To be more precise, the
> number of rows is right, but all are identical.
>
> The truly weird part is if I remove rdd.cache(), everything works just
> fine. I have encountered this issue on a few occasions.
>
> Thanks
> Yan
>
>
>
>
>


Re: DirectFileOutputCommiter

2016-02-27 Thread Igor Berman
Hi Reynold,
thanks for the response
Yes, speculation mode needs some coordination.
Regarding job failure :
correct me if I wrong - if one of jobs fails - client code will be sort of
"notified" by exception or something similar, so the client can decide to
re-submit action(job), i.e. it won't be "silent" failure.


On 26 February 2016 at 11:50, Reynold Xin <r...@databricks.com> wrote:

> It could lose data in speculation mode, or if any job fails.
>
> On Fri, Feb 26, 2016 at 3:45 AM, Igor Berman <igor.ber...@gmail.com>
> wrote:
>
>> Takeshi, do you know the reason why they wanted to remove this commiter
>> in SPARK-10063?
>> the jira has no info inside
>> as far as I understand the direct committer can't be used when either of
>> two is true
>> 1. speculation mode
>> 2. append mode(ie. not creating new version of data but appending to
>> existing data)
>>
>> On 26 February 2016 at 08:24, Takeshi Yamamuro <linguin@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> Great work!
>>> What is the concrete performance gain of the committer on s3?
>>> I'd like to know.
>>>
>>> I think there is no direct committer for files because these kinds of
>>> committer has risks
>>> to loss data (See: SPARK-10063).
>>> Until this resolved, ISTM files cannot support direct commits.
>>>
>>> thanks,
>>>
>>>
>>>
>>> On Fri, Feb 26, 2016 at 8:39 AM, Teng Qiu <teng...@gmail.com> wrote:
>>>
>>>> yes, should be this one
>>>> https://gist.github.com/aarondav/c513916e72101bbe14ec
>>>>
>>>> then need to set it in spark-defaults.conf :
>>>> https://github.com/zalando/spark/commit/3473f3f1ef27830813c1e0b3686e96a55f49269c#diff-f7a46208be9e80252614369be6617d65R13
>>>>
>>>> Am Freitag, 26. Februar 2016 schrieb Yin Yang :
>>>> > The header of DirectOutputCommitter.scala says Databricks.
>>>> > Did you get it from Databricks ?
>>>> > On Thu, Feb 25, 2016 at 3:01 PM, Teng Qiu <teng...@gmail.com> wrote:
>>>> >>
>>>> >> interesting in this topic as well, why the DirectFileOutputCommitter
>>>> not included?
>>>> >> we added it in our fork,
>>>> under 
>>>> core/src/main/scala/org/apache/spark/mapred/DirectOutputCommitter.scala
>>>> >> moreover, this DirectFileOutputCommitter is not working for the
>>>> insert operations in HiveContext, since the Committer is called by hive
>>>> (means uses dependencies in hive package)
>>>> >> we made some hack to fix this, you can take a look:
>>>> >>
>>>> https://github.com/apache/spark/compare/branch-1.6...zalando:branch-1.6-zalando
>>>> >>
>>>> >> may bring some ideas to other spark contributors to find a better
>>>> way to use s3.
>>>> >>
>>>> >> 2016-02-22 23:18 GMT+01:00 igor.berman <igor.ber...@gmail.com>:
>>>> >>>
>>>> >>> Hi,
>>>> >>> Wanted to understand if anybody uses DirectFileOutputCommitter or
>>>> alikes
>>>> >>> especially when working with s3?
>>>> >>> I know that there is one impl in spark distro for parquet format,
>>>> but not
>>>> >>> for files -  why?
>>>> >>>
>>>> >>> Imho, it can bring huge performance boost.
>>>> >>> Using default FileOutputCommiter with s3 has big overhead at commit
>>>> stage
>>>> >>> when all parts are copied one-by-one to destination dir from
>>>> _temporary,
>>>> >>> which is bottleneck when number of partitions is high.
>>>> >>>
>>>> >>> Also, wanted to know if there are some problems when using
>>>> >>> DirectFileOutputCommitter?
>>>> >>> If writing one partition directly will fail in the middle is spark
>>>> will
>>>> >>> notice this and will fail job(say after all retries)?
>>>> >>>
>>>> >>> thanks in advance
>>>> >>>
>>>> >>>
>>>> >>>
>>>> >>>
>>>> >>> --
>>>> >>> View this message in context:
>>>> http://apache-spark-user-list.1001560.n3.nabble.com/DirectFileOutputCommiter-tp26296.html
>>>> >>> Sent from the Apache Spark User List mailing list archive at
>>>> Nabble.com.
>>>> >>>
>>>> >>>
>>>> -
>>>> >>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>>> >>> For additional commands, e-mail: user-h...@spark.apache.org
>>>> >>>
>>>> >>
>>>> >
>>>> >
>>>>
>>>
>>>
>>>
>>> --
>>> ---
>>> Takeshi Yamamuro
>>>
>>
>>
>


Re: Bug in DiskBlockManager subDirs logic?

2016-02-26 Thread Igor Berman
I've experienced such kind of outputs when executor was killed(e.g. by OOM
killer) or was lost for some reason
i.e. try to look at machine if executor wasn't restarted...

On 26 February 2016 at 08:37, Takeshi Yamamuro 
wrote:

> Hi,
>
> Could you make simple codes to reproduce the issue?
> I'm not exactly sure why shuffle data on temp dir. are wrongly deleted.
>
> thanks,
>
>
>
> On Fri, Feb 26, 2016 at 6:00 AM, Zee Chen  wrote:
>
>> Hi,
>>
>> I am debugging a situation where SortShuffleWriter sometimes fail to
>> create a file, with the following stack trace:
>>
>> 16/02/23 11:48:46 ERROR Executor: Exception in task 13.0 in stage
>> 47827.0 (TID 1367089)
>> java.io.FileNotFoundException:
>>
>> /tmp/spark-9dd8dca9-6803-4c6c-bb6a-0e9c0111837c/executor-129dfdb8-9422-4668-989e-e789703526ad/blockmgr-dda6e340-7859-468f-b493-04e4162d341a/00/temp_shuffle_69fe1673-9ff2-462b-92b8-683d04669aad
>> (No such file or directory)
>> at java.io.FileOutputStream.open0(Native Method)
>> at java.io.FileOutputStream.open(FileOutputStream.java:270)
>> at java.io.FileOutputStream.(FileOutputStream.java:213)
>> at
>> org.apache.spark.storage.DiskBlockObjectWriter.open(DiskBlockObjectWriter.scala:88)
>> at
>> org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.insertAll(BypassMergeSortShuffleWriter.java:110)
>> at
>> org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:73)
>> at
>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>> at
>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>> at org.apache.spark.scheduler.Task.run(Task.scala:88)
>> at
>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>> at java.lang.Thread.run(Thread.java:745)
>>
>>
>> I checked the linux file system (ext4) and saw the /00/ subdir is
>> missing. I went through the heap dump of the
>> CoarseGrainedExecutorBackend jvm proc and found that
>> DiskBlockManager's subDirs list had more non-null 2-hex subdirs than
>> present on the file system! As a test I created all 64 2-hex subdirs
>> by hand and then the problem went away.
>>
>> So had anybody else seen this problem? Looking at the relevant logic
>> in DiskBlockManager and it hasn't changed much since the fix to
>> https://issues.apache.org/jira/browse/SPARK-6468
>>
>> My configuration:
>> spark-1.5.1, hadoop-2.6.0, standalone, oracle jdk8u60
>>
>> Thanks,
>> Zee
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>
>
> --
> ---
> Takeshi Yamamuro
>


Re: Standalone vs. Mesos for production installation on a smallish cluster

2016-02-26 Thread Igor Berman
Imho most of production clusters are standalone
there was some presentation from spark summit with some stats inside(can't
find right now), so standalone was at 1st place
it was from Matei
https://databricks.com/resources/slides

On 26 February 2016 at 13:40, Petr Novak  wrote:

> Hi all,
> I believe that it used to be in documentation that Standalone mode is not
> for production. I'm either wrong or it was already removed.
>
> Having a small cluster between 5-10 nodes is Standalone recommended for
> production? I would like to go with Mesos but the question is if there is
> real add-on value for production, mainly from stability perspective.
>
> Can I expect that adding Mesos will improve stability compared to
> Standalone to the extent to justify itself according to somewhat increased
> complexity?
>
> I know it is hard to answer because Mesos layer itself is going to add
> some bugs as well.
>
> Are there unique features enabled by Mesos specific to Spark? E.g.
> adaptive resources for jobs or whatever?
>
> In the future once cluster will grow and more services running on Mesos,
> we plan to use Mesos. The question is if it does worth to go with it
> immediately even maybe its utility is not directly needed at this point.
>
> Many thanks,
> Petr
>


Re: DirectFileOutputCommiter

2016-02-26 Thread Igor Berman
Takeshi, do you know the reason why they wanted to remove this commiter in
SPARK-10063?
the jira has no info inside
as far as I understand the direct committer can't be used when either of
two is true
1. speculation mode
2. append mode(ie. not creating new version of data but appending to
existing data)

On 26 February 2016 at 08:24, Takeshi Yamamuro 
wrote:

> Hi,
>
> Great work!
> What is the concrete performance gain of the committer on s3?
> I'd like to know.
>
> I think there is no direct committer for files because these kinds of
> committer has risks
> to loss data (See: SPARK-10063).
> Until this resolved, ISTM files cannot support direct commits.
>
> thanks,
>
>
>
> On Fri, Feb 26, 2016 at 8:39 AM, Teng Qiu  wrote:
>
>> yes, should be this one
>> https://gist.github.com/aarondav/c513916e72101bbe14ec
>>
>> then need to set it in spark-defaults.conf :
>> https://github.com/zalando/spark/commit/3473f3f1ef27830813c1e0b3686e96a55f49269c#diff-f7a46208be9e80252614369be6617d65R13
>>
>> Am Freitag, 26. Februar 2016 schrieb Yin Yang :
>> > The header of DirectOutputCommitter.scala says Databricks.
>> > Did you get it from Databricks ?
>> > On Thu, Feb 25, 2016 at 3:01 PM, Teng Qiu  wrote:
>> >>
>> >> interesting in this topic as well, why the DirectFileOutputCommitter
>> not included?
>> >> we added it in our fork,
>> under core/src/main/scala/org/apache/spark/mapred/DirectOutputCommitter.scala
>> >> moreover, this DirectFileOutputCommitter is not working for the insert
>> operations in HiveContext, since the Committer is called by hive (means
>> uses dependencies in hive package)
>> >> we made some hack to fix this, you can take a look:
>> >>
>> https://github.com/apache/spark/compare/branch-1.6...zalando:branch-1.6-zalando
>> >>
>> >> may bring some ideas to other spark contributors to find a better way
>> to use s3.
>> >>
>> >> 2016-02-22 23:18 GMT+01:00 igor.berman :
>> >>>
>> >>> Hi,
>> >>> Wanted to understand if anybody uses DirectFileOutputCommitter or
>> alikes
>> >>> especially when working with s3?
>> >>> I know that there is one impl in spark distro for parquet format, but
>> not
>> >>> for files -  why?
>> >>>
>> >>> Imho, it can bring huge performance boost.
>> >>> Using default FileOutputCommiter with s3 has big overhead at commit
>> stage
>> >>> when all parts are copied one-by-one to destination dir from
>> _temporary,
>> >>> which is bottleneck when number of partitions is high.
>> >>>
>> >>> Also, wanted to know if there are some problems when using
>> >>> DirectFileOutputCommitter?
>> >>> If writing one partition directly will fail in the middle is spark
>> will
>> >>> notice this and will fail job(say after all retries)?
>> >>>
>> >>> thanks in advance
>> >>>
>> >>>
>> >>>
>> >>>
>> >>> --
>> >>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/DirectFileOutputCommiter-tp26296.html
>> >>> Sent from the Apache Spark User List mailing list archive at
>> Nabble.com.
>> >>>
>> >>> -
>> >>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> >>> For additional commands, e-mail: user-h...@spark.apache.org
>> >>>
>> >>
>> >
>> >
>>
>
>
>
> --
> ---
> Takeshi Yamamuro
>


Re: DirectFileOutputCommiter

2016-02-26 Thread Igor Berman
Alexander,
implementation you've attaches supports both modes configured by property "
mapred.output.direct." + fs.getClass().getSimpleName()
as soon as you see _temporary dir probably the mode is off i.e. the default
impl is working and you experiencing some other problem.

On 26 February 2016 at 10:57, Alexander Pivovarov 
wrote:

> Amazon uses the following impl
> https://gist.github.com/apivovarov/bb215f08318318570567
> But for some reason Spark show error at the end of the job
>
> 16/02/26 08:16:54 INFO scheduler.DAGScheduler: ResultStage 0
> (saveAsTextFile at :28) finished in 14.305 s
> 16/02/26 08:16:54 INFO cluster.YarnScheduler: Removed TaskSet 0.0, whose
> tasks have all completed, from pool
> 16/02/26 08:16:54 INFO scheduler.DAGScheduler: Job 0 finished:
> saveAsTextFile at :28, took 14.467271 s
> java.io.FileNotFoundException: File
> s3n://my-backup/test/test1/_temporary/0 does not exist.
> at
> org.apache.hadoop.fs.s3native.NativeS3FileSystem.listStatus(NativeS3FileSystem.java:564)
> at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1485)
> at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1525)
> at
> org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.getAllCommittedTaskPaths(FileOutputCommitter.java:269)
> at
> org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:309)
> at
> org.apache.hadoop.mapred.FileOutputCommitter.commitJob(FileOutputCommitter.java:136)
> at
> org.apache.spark.SparkHadoopWriter.commitJob(SparkHadoopWriter.scala:112)
> at
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1214)
> at
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1156)
> at
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1156)
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
> at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
>
>
> Another implementation works fine
> https://gist.github.com/aarondav/c513916e72101bbe14ec
>
> On Thu, Feb 25, 2016 at 10:24 PM, Takeshi Yamamuro 
> wrote:
>
>> Hi,
>>
>> Great work!
>> What is the concrete performance gain of the committer on s3?
>> I'd like to know.
>>
>> I think there is no direct committer for files because these kinds of
>> committer has risks
>> to loss data (See: SPARK-10063).
>> Until this resolved, ISTM files cannot support direct commits.
>>
>> thanks,
>>
>>
>>
>> On Fri, Feb 26, 2016 at 8:39 AM, Teng Qiu  wrote:
>>
>>> yes, should be this one
>>> https://gist.github.com/aarondav/c513916e72101bbe14ec
>>>
>>> then need to set it in spark-defaults.conf :
>>> https://github.com/zalando/spark/commit/3473f3f1ef27830813c1e0b3686e96a55f49269c#diff-f7a46208be9e80252614369be6617d65R13
>>>
>>> Am Freitag, 26. Februar 2016 schrieb Yin Yang :
>>> > The header of DirectOutputCommitter.scala says Databricks.
>>> > Did you get it from Databricks ?
>>> > On Thu, Feb 25, 2016 at 3:01 PM, Teng Qiu  wrote:
>>> >>
>>> >> interesting in this topic as well, why the DirectFileOutputCommitter
>>> not included?
>>> >> we added it in our fork,
>>> under 
>>> core/src/main/scala/org/apache/spark/mapred/DirectOutputCommitter.scala
>>> >> moreover, this DirectFileOutputCommitter is not working for the
>>> insert operations in HiveContext, since the Committer is called by hive
>>> (means uses dependencies in hive package)
>>> >> we made some hack to fix this, you can take a look:
>>> >>
>>> https://github.com/apache/spark/compare/branch-1.6...zalando:branch-1.6-zalando
>>> >>
>>> >> may bring some ideas to other spark contributors to find a better way
>>> to use s3.
>>> >>
>>> >> 2016-02-22 23:18 GMT+01:00 igor.berman :
>>> >>>
>>> >>> Hi,
>>> >>> Wanted to understand if anybody uses DirectFileOutputCommitter or
>>> alikes
>>> >>> especially when working with s3?
>>> >>> I know that there is one impl in spark distro for parquet format,
>>> but not
>>> >>> for files -  why?
>>> >>>
>>> >>> Imho, it can bring huge performance boost.
>>> >>> Using default FileOutputCommiter with s3 has big overhead at commit
>>> stage
>>> >>> when all parts are copied one-by-one to destination dir from
>>> _temporary,
>>> >>> which is bottleneck when number of partitions is high.
>>> >>>
>>> >>> Also, wanted to know if there are some problems when using
>>> >>> DirectFileOutputCommitter?
>>> >>> If writing one partition directly will fail in the middle is spark
>>> will
>>> >>> notice this and will fail job(say after all retries)?
>>> >>>
>>> >>> thanks in advance
>>> >>>
>>> >>>
>>> >>>
>>> >>>
>>> >>> --
>>> >>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/DirectFileOutputCommiter-tp26296.html
>>> 

Re: DirectFileOutputCommiter

2016-02-26 Thread Igor Berman
the performance gain is for commit stage when data is moved from _temporary
directory to distination directory
since s3 is key-value really the move operation is like copy operation


On 26 February 2016 at 08:24, Takeshi Yamamuro 
wrote:

> Hi,
>
> Great work!
> What is the concrete performance gain of the committer on s3?
> I'd like to know.
>
> I think there is no direct committer for files because these kinds of
> committer has risks
> to loss data (See: SPARK-10063).
> Until this resolved, ISTM files cannot support direct commits.
>
> thanks,
>
>
>
> On Fri, Feb 26, 2016 at 8:39 AM, Teng Qiu  wrote:
>
>> yes, should be this one
>> https://gist.github.com/aarondav/c513916e72101bbe14ec
>>
>> then need to set it in spark-defaults.conf :
>> https://github.com/zalando/spark/commit/3473f3f1ef27830813c1e0b3686e96a55f49269c#diff-f7a46208be9e80252614369be6617d65R13
>>
>> Am Freitag, 26. Februar 2016 schrieb Yin Yang :
>> > The header of DirectOutputCommitter.scala says Databricks.
>> > Did you get it from Databricks ?
>> > On Thu, Feb 25, 2016 at 3:01 PM, Teng Qiu  wrote:
>> >>
>> >> interesting in this topic as well, why the DirectFileOutputCommitter
>> not included?
>> >> we added it in our fork,
>> under core/src/main/scala/org/apache/spark/mapred/DirectOutputCommitter.scala
>> >> moreover, this DirectFileOutputCommitter is not working for the insert
>> operations in HiveContext, since the Committer is called by hive (means
>> uses dependencies in hive package)
>> >> we made some hack to fix this, you can take a look:
>> >>
>> https://github.com/apache/spark/compare/branch-1.6...zalando:branch-1.6-zalando
>> >>
>> >> may bring some ideas to other spark contributors to find a better way
>> to use s3.
>> >>
>> >> 2016-02-22 23:18 GMT+01:00 igor.berman :
>> >>>
>> >>> Hi,
>> >>> Wanted to understand if anybody uses DirectFileOutputCommitter or
>> alikes
>> >>> especially when working with s3?
>> >>> I know that there is one impl in spark distro for parquet format, but
>> not
>> >>> for files -  why?
>> >>>
>> >>> Imho, it can bring huge performance boost.
>> >>> Using default FileOutputCommiter with s3 has big overhead at commit
>> stage
>> >>> when all parts are copied one-by-one to destination dir from
>> _temporary,
>> >>> which is bottleneck when number of partitions is high.
>> >>>
>> >>> Also, wanted to know if there are some problems when using
>> >>> DirectFileOutputCommitter?
>> >>> If writing one partition directly will fail in the middle is spark
>> will
>> >>> notice this and will fail job(say after all retries)?
>> >>>
>> >>> thanks in advance
>> >>>
>> >>>
>> >>>
>> >>>
>> >>> --
>> >>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/DirectFileOutputCommiter-tp26296.html
>> >>> Sent from the Apache Spark User List mailing list archive at
>> Nabble.com.
>> >>>
>> >>> -
>> >>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> >>> For additional commands, e-mail: user-h...@spark.apache.org
>> >>>
>> >>
>> >
>> >
>>
>
>
>
> --
> ---
> Takeshi Yamamuro
>


Re: reasonable number of executors

2016-02-23 Thread Igor Berman
http://www.slideshare.net/cloudera/top-5-mistakes-to-avoid-when-writing-apache-spark-applications

there is a section that is connected to your question

On 23 February 2016 at 16:49, Alex Dzhagriev  wrote:

> Hello all,
>
> Can someone please advise me on the pros and cons on how to allocate the
> resources: many small heap machines with 1 core or few machines with big
> heaps and many cores? I'm sure that depends on the data flow and there is
> no best practise solution. E.g. with bigger heap I can perform map-side
> join with bigger table. What other considerations should I keep in mind in
> order to choose the right configuration?
>
> Thanks, Alex.
>


Re: SPARK-9559

2016-02-18 Thread Igor Berman
what are you trying to solve?
killing worker jvm is like killing yarn node manager...why would you do
this?
usually worker jvm is "agent" on each worker machine which opens executors
per each application, so it doesn't works hard or has big memory footprint
yes it can fail, but it rather corner situation which also might be handled
with monitoring/automatic restarts etc



On 18 February 2016 at 17:21, Daniel Darabos <
daniel.dara...@lynxanalytics.com> wrote:

> YARN may be a workaround.
>
> On Thu, Feb 18, 2016 at 4:13 PM, Ashish Soni 
> wrote:
>
>> Hi All ,
>>
>> Just wanted to know if there is any work around or resolution for below
>> issue in Stand alone mode
>>
>> https://issues.apache.org/jira/browse/SPARK-9559
>>
>> Ashish
>>
>
>


Re: newbie unable to write to S3 403 forbidden error

2016-02-12 Thread Igor Berman
 String dirPath = "s3n://s3-us-west-1.amazonaws.com/com.pws.twitter/*json” *

not sure, but
can you try to remove s3-us-west-1.amazonaws.com
 from path ?

On 11 February 2016 at 23:15, Andy Davidson 
wrote:

> I am using spark 1.6.0 in a cluster created using the spark-ec2 script. I
> am using the standalone cluster manager
>
> My java streaming app is not able to write to s3. It appears to be some
> for of permission problem.
>
> Any idea what the problem might be?
>
> I tried use the IAM simulator to test the policy. Everything seems okay.
> Any idea how I can debug this problem?
>
> Thanks in advance
>
> Andy
>
> JavaSparkContext jsc = new JavaSparkContext(conf);
>
> // I did not include the full key in my email
>// the keys do not contain ‘\’
>// these are the keys used to create the cluster. They belong to
> the IAM user andy
>
> jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", "AKIAJREX"
> );
>
> jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey",
> "uBh9v1hdUctI23uvq9qR");
>
>
>
>   private static void saveTweets(JavaDStream jsonTweets, String
> outputURI) {
>
> jsonTweets.foreachRDD(new VoidFunction2() {
>
> private static final long serialVersionUID = 1L;
>
>
> @Override
>
> public void call(JavaRDD rdd, Time time) throws
> Exception {
>
> if(!rdd.isEmpty()) {
>
> // bucket name is ‘com.pws.twitter’ it has a folder ‘json'
>
> String dirPath = "s3n://
> s3-us-west-1.amazonaws.com/com.pws.twitter/*json” *+ "-" + time
> .milliseconds();
>
> rdd.saveAsTextFile(dirPath);
>
> }
>
> }
>
> });
>
>
>
>
> Bucket name : com.pws.titter
> Bucket policy (I replaced the account id)
>
> {
> "Version": "2012-10-17",
> "Id": "Policy1455148808376",
> "Statement": [
> {
> "Sid": "Stmt1455148797805",
> "Effect": "Allow",
> "Principal": {
> "AWS": "arn:aws:iam::123456789012:user/andy"
> },
> "Action": "s3:*",
> "Resource": "arn:aws:s3:::com.pws.twitter/*"
> }
> ]
> }
>
>
>


Re: Shuffle memory woes

2016-02-08 Thread Igor Berman
It's interesting to see what spark dev people will say.
Corey do you have presentation available online?

On 8 February 2016 at 05:16, Corey Nolet <cjno...@gmail.com> wrote:

> Charles,
>
> Thank you for chiming in and I'm glad someone else is experiencing this
> too and not just me. I know very well how the Spark shuffles work and I've
> done deep dive presentations @ Spark meetups in the past. This problem is
> somethng that goes beyond that and, I believe, it exposes a fundamental
> paradigm flaw in the design of Spark, unfortunately. Good thing is, I think
> it can be fixed.
>
> Also- in regards to how much data actually gets shuffled- believe it or
> not this problem can take a 30-40 minute job and make it run for 4 or more
> hours. If  let the job run for 4+ hours the amount of data being shuffled
> for this particular dataset will be 100 or more TB. Usually, however, I end
> up killing the job long before that point because I realize it should not
> be taking this long. The particular dataset we're doing is not for
> real-time exploration. These are very large joins we're doing for jobs that
> we run a few times a day.
>
> On Sun, Feb 7, 2016 at 9:56 PM, Charles Chao <xpnc54byp...@gmail.com>
> wrote:
>
>>  "The dataset is 100gb at most, the spills can up to 10T-100T"
>>
>> -- I have had the same experiences, although not to this extreme (the
>> spills were < 10T while the input was ~ 100s gb) and haven't found any
>> solution yet. I don't believe this is related to input data format. in my
>> case, I got my input data by loading from Hive tables.
>>
>> On Sun, Feb 7, 2016 at 6:28 AM, Sea <261810...@qq.com> wrote:
>>
>>> Hi,Corey:
>>>"The dataset is 100gb at most, the spills can up to 10T-100T", Are
>>> your input files lzo format, and you use sc.text() ? If memory is not
>>> enough, spark will spill 3-4x of input data to disk.
>>>
>>>
>>> -- 原始邮件 --
>>> *发件人:* "Corey Nolet";<cjno...@gmail.com>;
>>> *发送时间:* 2016年2月7日(星期天) 晚上8:56
>>> *收件人:* "Igor Berman"<igor.ber...@gmail.com>;
>>> *抄送:* "user"<user@spark.apache.org>;
>>> *主题:* Re: Shuffle memory woes
>>>
>>> As for the second part of your questions- we have a fairly complex join
>>> process which requires a ton of stage orchestration from our driver. I've
>>> written some code to be able to walk down our DAG tree and execute siblings
>>> in the tree concurrently where possible (forcing cache to disk on children
>>> that that have multiple chiildren themselves so that they can be run
>>> concurrently). Ultimatey, we have seen significant speedup in our jobs by
>>> keeping tasks as busy as possible processing concurrent stages. Funny
>>> enough though, the stage that is causing problems with shuffling for us has
>>> a lot of children and doesn't even run concurrently with any other stages
>>> so I ruled out the concurrency of the stages as a culprit for the
>>> shuffliing problem we're seeing.
>>>
>>> On Sun, Feb 7, 2016 at 7:49 AM, Corey Nolet <cjno...@gmail.com> wrote:
>>>
>>>> Igor,
>>>>
>>>> I don't think the question is "why can't it fit stuff in memory". I
>>>> know why it can't fit stuff in memory- because it's a large dataset that
>>>> needs to have a reduceByKey() run on it. My understanding is that when it
>>>> doesn't fit into memory it needs to spill in order to consolidate
>>>> intermediary files into a single file. The more data you need to run
>>>> through this, the more it will need to spill. My findings is that once it
>>>> gets stuck in this spill chain with our dataset it's all over @ that point
>>>> because it will spill and spill and spill and spill and spill. If I give
>>>> the shuffle enough memory it won't- irrespective of the number of
>>>> partitions we have (i've done everything from repartition(500) to
>>>> repartition(2500)). It's not a matter of running out of memory on a single
>>>> node because the data is skewed. It's more a matter of the shuffle buffer
>>>> filling up and needing to spill. I think what may be happening is that it
>>>> gets to a point where it's spending more time reading/writing from disk
>>>> while doing the spills then it is actually processing any data. I can tell
>>>> this because I can see that the spills sometimes get up into the 10's to
>>>> 100's of TB where the input da

Re: Imported CSV file content isn't identical to the original file

2016-02-07 Thread Igor Berman
show has argument of truncate
pass false so it wont truncate your results

On 7 February 2016 at 11:01, SLiZn Liu  wrote:

> Plus, I’m using *Spark 1.5.2*, with *spark-csv 1.3.0*. Also tried
> HiveContext, but the result is exactly the same.
> ​
>
> On Sun, Feb 7, 2016 at 3:44 PM SLiZn Liu  wrote:
>
>> Hi Spark Users Group,
>>
>> I have a csv file to analysis with Spark, but I’m troubling with
>> importing as DataFrame.
>>
>> Here’s the minimal reproducible example. Suppose I’m having a
>> *10(rows)x2(cols)* *space-delimited csv* file, shown as below:
>>
>> 1446566430 2015-11-0400:00:30
>> 1446566430 2015-11-0400:00:30
>> 1446566430 2015-11-0400:00:30
>> 1446566430 2015-11-0400:00:30
>> 1446566430 2015-11-0400:00:30
>> 1446566431 2015-11-0400:00:31
>> 1446566431 2015-11-0400:00:31
>> 1446566431 2015-11-0400:00:31
>> 1446566431 2015-11-0400:00:31
>> 1446566431 2015-11-0400:00:31
>>
>> the  in column 2 represents sub-delimiter within that column, and
>> this file is stored on HDFS, let’s say the path is hdfs:///tmp/1.csv
>>
>> I’m using *spark-csv* to import this file as Spark *DataFrame*:
>>
>> sqlContext.read.format("com.databricks.spark.csv")
>> .option("header", "false") // Use first line of all files as header
>> .option("inferSchema", "false") // Automatically infer data types
>> .option("delimiter", " ")
>> .load("hdfs:///tmp/1.csv")
>> .show
>>
>> Oddly, the output shows only a part of each column:
>>
>> [image: Screenshot from 2016-02-07 15-27-51.png]
>>
>> and even the boundary of the table wasn’t shown correctly. I also used
>> the other way to read csv file, by sc.textFile(...).map(_.split(" "))
>> and sqlContext.createDataFrame, and the result is the same. Can someone
>> point me out where I did it wrong?
>>
>> —
>> BR,
>> Todd Leo
>> ​
>>
>


Re: Shuffle memory woes

2016-02-07 Thread Igor Berman
so can you provide code snippets: especially it's interesting to see what
are your transformation chain, how many partitions are there on each side
of shuffle operation

the question is why it can't fit stuff in memory when you are shuffling -
maybe your partitioner on "reduce" side is not configured properly? I mean
if map side is ok, and you just reducing by key or something it should be
ok, so some detail is missing...skewed data? aggregate by key?

On 6 February 2016 at 20:13, Corey Nolet <cjno...@gmail.com> wrote:

> Igor,
>
> Thank you for the response but unfortunately, the problem I'm referring to
> goes beyond this. I have set the shuffle memory fraction to be 90% and set
> the cache memory to be 0. Repartitioning the RDD helped a tad on the map
> side but didn't do much for the spilling when there was no longer any
> memory left for the shuffle. Also the new auto-memory management doesn't
> seem like it'll have too much of an effect after i've already given most
> the memory i've allocated to the shuffle. The problem I'm having is most
> specifically related to the shuffle performing declining by several orders
> of magnitude when it needs to spill multiple times (it ends up spilling
> several hundred for me when it can't fit stuff into memory).
>
>
>
> On Sat, Feb 6, 2016 at 6:40 AM, Igor Berman <igor.ber...@gmail.com> wrote:
>
>> Hi,
>> usually you can solve this by 2 steps
>> make rdd to have more partitions
>> play with shuffle memory fraction
>>
>> in spark 1.6 cache vs shuffle memory fractions are adjusted automatically
>>
>> On 5 February 2016 at 23:07, Corey Nolet <cjno...@gmail.com> wrote:
>>
>>> I just recently had a discovery that my jobs were taking several hours
>>> to completely because of excess shuffle spills. What I found was that when
>>> I hit the high point where I didn't have enough memory for the shuffles to
>>> store all of their file consolidations at once, it could spill so many
>>> times that it causes my job's runtime to increase by orders of magnitude
>>> (and sometimes fail altogether).
>>>
>>> I've played with all the tuning parameters I can find. To speed the
>>> shuffles up, I tuned the akka threads to different values. I also tuned the
>>> shuffle buffering a tad (both up and down).
>>>
>>> I feel like I see a weak point here. The mappers are sharing memory
>>> space with reducers and the shuffles need enough memory to consolidate and
>>> pull otherwise they will need to spill and spill and spill. What i've
>>> noticed about my jobs is that this is a difference between them taking 30
>>> minutes and 4 hours or more. Same job- just different memory tuning.
>>>
>>> I've found that, as a result of the spilling, I'm better off not caching
>>> any data in memory and lowering my storage fraction to 0 and still hoping I
>>> was able to give my shuffles enough memory that my data doesn't
>>> continuously spill. Is this the way it's supposed to be? It makes it hard
>>> because it seems like it forces the memory limits on my job- otherwise it
>>> could take orders of magnitude longer to execute.
>>>
>>>
>>
>


Re: Shuffle memory woes

2016-02-06 Thread Igor Berman
Hi,
usually you can solve this by 2 steps
make rdd to have more partitions
play with shuffle memory fraction

in spark 1.6 cache vs shuffle memory fractions are adjusted automatically

On 5 February 2016 at 23:07, Corey Nolet  wrote:

> I just recently had a discovery that my jobs were taking several hours to
> completely because of excess shuffle spills. What I found was that when I
> hit the high point where I didn't have enough memory for the shuffles to
> store all of their file consolidations at once, it could spill so many
> times that it causes my job's runtime to increase by orders of magnitude
> (and sometimes fail altogether).
>
> I've played with all the tuning parameters I can find. To speed the
> shuffles up, I tuned the akka threads to different values. I also tuned the
> shuffle buffering a tad (both up and down).
>
> I feel like I see a weak point here. The mappers are sharing memory space
> with reducers and the shuffles need enough memory to consolidate and pull
> otherwise they will need to spill and spill and spill. What i've noticed
> about my jobs is that this is a difference between them taking 30 minutes
> and 4 hours or more. Same job- just different memory tuning.
>
> I've found that, as a result of the spilling, I'm better off not caching
> any data in memory and lowering my storage fraction to 0 and still hoping I
> was able to give my shuffles enough memory that my data doesn't
> continuously spill. Is this the way it's supposed to be? It makes it hard
> because it seems like it forces the memory limits on my job- otherwise it
> could take orders of magnitude longer to execute.
>
>


Re: multi-threaded Spark jobs

2016-01-25 Thread Igor Berman
IMHO, you are making mistake.
spark manages tasks and cores internally. when you open new threads inside
executor - meaning you "over-provisioning" executor(e.g. tasks on other
cores will be preempted)



On 26 January 2016 at 07:59, Elango Cheran  wrote:

> Hi everyone,
> I've gone through the effort of figuring out how to modify a Spark job to
> have an operation become multi-threaded inside an executor.  I've written
> up an explanation of what worked, what didn't work, and why:
>
>
> http://www.elangocheran.com/blog/2016/01/using-clojure-to-create-multi-threaded-spark-jobs/
>
> I think the ideas there should be applicable generally -- which would
> include Scala and Java since the JVM is genuinely multi-threaded -- and
> therefore may be of interest to others.  I will need to convert this code
> to Scala for personal requirements in the near future, anyways.
>
> I hope this helps.
>
> -- Elango
>


Re: Converting CSV files to Avro

2016-01-17 Thread Igor Berman
https://github.com/databricks/spark-avro ?

On 17 January 2016 at 13:46, Gideon  wrote:

> Hi everyone,
>
> I'm writing a Scala program which uses  Spark CSV
>    to read CSV files from a
> directory. After reading the CSVs as data frames I need to convert them to
> Avro format since I need to eventually convert that data to a
> GenericRecord
> <
> https://avro.apache.org/docs/1.7.6/api/java/org/apache/avro/generic/GenericData.Record.html
> >
> for further processing.
> I know I can do toJSON on the DF and get a valid JSON format, however I
> need
> it to be Avro compliant (I have some nullable fields in these CSV files
> which require  special handling
> <
> http://stackoverflow.com/questions/27485580/how-to-fix-expected-start-union-got-value-number-int-when-converting-json-to-av
> >
> when converting to Avro)
>
> Does anyone have any idea on how this can be done besides normalizing all
> the fields by myself?
>
> Thanks in advance
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Converting-CSV-files-to-Avro-tp25985.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Spark job uses only one Worker

2016-01-07 Thread Igor Berman
read about *--total-executor-cores*
not sure why you specify port 6066 in master...usually it's 7077
verify in master ui(usually port 8080) how many cores are there(depends on
other configs, but usually workers connect to master with all their cores)

On 7 January 2016 at 23:46, Michael Pisula <michael.pis...@tngtech.com>
wrote:

> Hi,
>
> I start the cluster using the spark-ec2 scripts, so the cluster is in
> stand-alone mode.
> Here is how I submit my job:
> spark/bin/spark-submit --class demo.spark.StaticDataAnalysis --master
> spark://:6066 --deploy-mode cluster demo/Demo-1.0-SNAPSHOT-all.jar
>
> Cheers,
> Michael
>
>
> On 07.01.2016 22:41, Igor Berman wrote:
>
> share how you submit your job
> what cluster(yarn, standalone)
>
> On 7 January 2016 at 23:24, Michael Pisula <michael.pis...@tngtech.com>
> wrote:
>
>> Hi there,
>>
>> I ran a simple Batch Application on a Spark Cluster on EC2. Despite
>> having 3
>> Worker Nodes, I could not get the application processed on more than one
>> node, regardless if I submitted the Application in Cluster or Client mode.
>> I also tried manually increasing the number of partitions in the code, no
>> effect. I also pass the master into the application.
>> I verified on the nodes themselves that only one node was active while the
>> job was running.
>> I pass enough data to make the job take 6 minutes to process.
>> The job is simple enough, reading data from two S3 files, joining records
>> on
>> a shared field, filtering out some records and writing the result back to
>> S3.
>>
>> Tried all kinds of stuff, but could not make it work. I did find similar
>> questions, but had already tried the solutions that worked in those cases.
>> Would be really happy about any pointers.
>>
>> Cheers,
>> Michael
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-job-uses-only-one-Worker-tp25909.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>
> --
> Michael Pisula * michael.pis...@tngtech.com * +49-174-3180084
> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
> Sitz: Unterföhring * Amtsgericht München * HRB 135082
>
>


Re: Spark job uses only one Worker

2016-01-07 Thread Igor Berman
do you see in master ui that workers connected to master & before you are
running your app there are 2 available cores in master ui per each worker?
I understand that there are 2 cores on each worker - the question is do
they got registered under master

regarding port it's very strange, please post what is problem connecting to
7077

use *--total-executor-cores 4 in your submit*

if you can post master ui screen after you submitted your app


On 8 January 2016 at 00:02, Michael Pisula <michael.pis...@tngtech.com>
wrote:

> I had tried several parameters, including --total-executor-cores, no
> effect.
> As for the port, I tried 7077, but if I remember correctly I got some kind
> of error that suggested to try 6066, with which it worked just fine (apart
> from this issue here).
>
> Each worker has two cores. I also tried increasing cores, again no effect.
> I was able to increase the number of cores the job was using on one worker,
> but it would not use any other worker (and it would not start if the number
> of cores the job wanted was higher than the number available on one worker).
>
>
> On 07.01.2016 22:51, Igor Berman wrote:
>
> read about *--total-executor-cores*
> not sure why you specify port 6066 in master...usually it's 7077
> verify in master ui(usually port 8080) how many cores are there(depends on
> other configs, but usually workers connect to master with all their cores)
>
> On 7 January 2016 at 23:46, Michael Pisula <michael.pis...@tngtech.com>
> wrote:
>
>> Hi,
>>
>> I start the cluster using the spark-ec2 scripts, so the cluster is in
>> stand-alone mode.
>> Here is how I submit my job:
>> spark/bin/spark-submit --class demo.spark.StaticDataAnalysis --master
>> spark://:6066 --deploy-mode cluster demo/Demo-1.0-SNAPSHOT-all.jar
>>
>> Cheers,
>> Michael
>>
>>
>> On 07.01.2016 22:41, Igor Berman wrote:
>>
>> share how you submit your job
>> what cluster(yarn, standalone)
>>
>> On 7 January 2016 at 23:24, Michael Pisula < <michael.pis...@tngtech.com>
>> michael.pis...@tngtech.com> wrote:
>>
>>> Hi there,
>>>
>>> I ran a simple Batch Application on a Spark Cluster on EC2. Despite
>>> having 3
>>> Worker Nodes, I could not get the application processed on more than one
>>> node, regardless if I submitted the Application in Cluster or Client
>>> mode.
>>> I also tried manually increasing the number of partitions in the code, no
>>> effect. I also pass the master into the application.
>>> I verified on the nodes themselves that only one node was active while
>>> the
>>> job was running.
>>> I pass enough data to make the job take 6 minutes to process.
>>> The job is simple enough, reading data from two S3 files, joining
>>> records on
>>> a shared field, filtering out some records and writing the result back to
>>> S3.
>>>
>>> Tried all kinds of stuff, but could not make it work. I did find similar
>>> questions, but had already tried the solutions that worked in those
>>> cases.
>>> Would be really happy about any pointers.
>>>
>>> Cheers,
>>> Michael
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> <http://apache-spark-user-list.1001560.n3.nabble.com/Spark-job-uses-only-one-Worker-tp25909.html>
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-job-uses-only-one-Worker-tp25909.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe, e-mail: <user-unsubscr...@spark.apache.org>
>>> user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: <user-h...@spark.apache.org>
>>> user-h...@spark.apache.org
>>>
>>>
>>
>> --
>> Michael Pisula * michael.pis...@tngtech.com * +49-174-3180084
>> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
>> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
>> Sitz: Unterföhring * Amtsgericht München * HRB 135082
>>
>>
>
> --
> Michael Pisula * michael.pis...@tngtech.com * +49-174-3180084
> TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
> Geschäftsführer: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
> Sitz: Unterföhring * Amtsgericht München * HRB 135082
>
>


Re: Spark job uses only one Worker

2016-01-07 Thread Igor Berman
share how you submit your job
what cluster(yarn, standalone)

On 7 January 2016 at 23:24, Michael Pisula 
wrote:

> Hi there,
>
> I ran a simple Batch Application on a Spark Cluster on EC2. Despite having
> 3
> Worker Nodes, I could not get the application processed on more than one
> node, regardless if I submitted the Application in Cluster or Client mode.
> I also tried manually increasing the number of partitions in the code, no
> effect. I also pass the master into the application.
> I verified on the nodes themselves that only one node was active while the
> job was running.
> I pass enough data to make the job take 6 minutes to process.
> The job is simple enough, reading data from two S3 files, joining records
> on
> a shared field, filtering out some records and writing the result back to
> S3.
>
> Tried all kinds of stuff, but could not make it work. I did find similar
> questions, but had already tried the solutions that worked in those cases.
> Would be really happy about any pointers.
>
> Cheers,
> Michael
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-job-uses-only-one-Worker-tp25909.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: coalesce(1).saveAsTextfile() takes forever?

2016-01-05 Thread Igor Berman
another option will be to try
rdd.toLocalIterator()
not sure if it will help though

I had same problem and ended up to move all parts to local disk(with Hadoop
FileSystem api) and then processing them locally


On 5 January 2016 at 22:08, Alexander Pivovarov 
wrote:

> try coalesce(1, true).
>
> On Tue, Jan 5, 2016 at 11:58 AM, unk1102  wrote:
>
>> hi I am trying to save many partitions of Dataframe into one CSV file and
>> it
>> take forever for large data sets of around 5-6 GB.
>>
>>
>> sourceFrame.coalesce(1).write().format("com.databricks.spark.csv").option("gzip").save("/path/hadoop")
>>
>> For small data above code works well but for large data it hangs forever
>> does not move on because of only one partitions has to shuffle data of GBs
>> please help me
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/coalesce-1-saveAsTextfile-takes-forever-tp25886.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: partitioning json data in spark

2015-12-27 Thread Igor Berman
have you tried to specify format of your output, might be parquet is
default format?
df.write().format("json").mode(SaveMode.Overwrite).save("/tmp/path");

On 27 December 2015 at 15:18, Նարեկ Գալստեան  wrote:

> Hey all!
> I am willing to partition *json *data by a column name and store the
> result as a collection of json files to be loaded to another database.
>
> I could use spark's built in *partitonBy *function but it only outputs in
> parquet format which is not desirable for me.
>
> Could you suggest me a way to deal with this problem?
> Narek Galstyan
>
> Նարեկ Գալստյան
>


Re: Stuck with DataFrame df.select("select * from table");

2015-12-25 Thread Igor Berman
sqlContext.sql("select * from table limit 5").show() (not sure if limit 5
supported)

or use Dmitriy's solution. select() defines your projection when you've
specified entire query

On 25 December 2015 at 15:42, Василец Дмитрий 
wrote:

> hello
> you can try to use df.limit(5).show()
> just trick :)
>
> On Fri, Dec 25, 2015 at 2:34 PM, Eugene Morozov <
> evgeny.a.moro...@gmail.com> wrote:
>
>> Hello, I'm basically stuck as I have no idea where to look;
>>
>> Following simple code, given that my Datasource is working gives me an
>> exception.
>>
>> DataFrame df = sqlc.load(filename, "com.epam.parso.spark.ds.DefaultSource");
>> df.cache();
>> df.printSchema();   <-- prints the schema perfectly fine!
>>
>> df.show();  <-- Works perfectly fine (shows table with 
>> 20 lines)!
>> df.registerTempTable("table");
>> df.select("select * from table limit 5").show(); <-- gives weird exception
>>
>> Exception is:
>>
>> AnalysisException: cannot resolve 'select * from table limit 5' given input 
>> columns VER, CREATED, SOC, SOCC, HLTC, HLGTC, STATUS
>>
>> I can do a collect on a dataframe, but cannot select any specific columns
>> either "select * from table" or "select VER, CREATED from table".
>>
>> I use spark 1.5.2.
>> The same code perfectly works through Zeppelin 0.5.5.
>>
>> Thanks.
>> --
>> Be well!
>> Jean Morozov
>>
>
>


Re: Fat jar can't find jdbc

2015-12-22 Thread Igor Berman
David, can you verify that mysql connector classes indeed in your single
jar?
open it with zip tool available at your platform

another options that might be a problem - if there is some dependency in
MANIFEST(not sure though this is the case of mysql connector) then it might
be broken after preparing single jar
so you need to verify that it's ok(in maven usually it's possible to define
merging policy for resources while creating single jar)

On 22 December 2015 at 10:04, Vijay Kiran  wrote:

> Can you paste your libraryDependencies from build.sbt ?
>
> ./Vijay
>
> > On 22 Dec 2015, at 06:12, David Yerrington  wrote:
> >
> > Hi Everyone,
> >
> > I'm building a prototype that fundamentally grabs data from a MySQL
> instance, crunches some numbers, and then moves it on down the pipeline.
> I've been using SBT with assembly tool to build a single jar for deployment.
> >
> > I've gone through the paces of stomping out many dependency problems and
> have come down to one last (hopefully) zinger.
> >
> > java.lang.ClassNotFoundException: Failed to load class for data source:
> jdbc.
> >
> > at
> org.apache.spark.sql.execution.datasources.ResolvedDataSource$.lookupDataSource(ResolvedDataSource.scala:67)
> >
> > at
> org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:87)
> >
> > at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:114)
> >
> > at org.apache.spark.sql.SQLContext.load(SQLContext.scala:1203)
> >
> > at her.recommender.getDataframe(her.recommender.scala:45)
> >
> > at her.recommender.getRecommendations(her.recommender.scala:60)
> >
> >
> > I'm assuming this has to do with mysql-connector because this is the
> problem I run into when I'm working with spark-shell and I forget to
> include my classpath with my mysql-connect jar file.
> >
> > I've tried:
> >   • Using different versions of mysql-connector-java in my build.sbt
> file
> >   • Copying the connector jar to my_project/src/main/lib
> >   • Copying the connector jar to my_project/lib <-- (this is where I
> keep my build.sbt)
> > Everything loads fine and works, except my call that does
> "sqlContext.load("jdbc", myOptions)".  I know this is a total newbie
> question but in my defense, I'm fairly new to Scala, and this is my first
> go at deploying a fat jar with sbt-assembly.
> >
> > Thanks for any advice!
> >
> > --
> > David Yerrington
> > yerrington.net
>
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Fat jar can't find jdbc

2015-12-22 Thread Igor Berman
imho, if you succeeded to fetch something from your mysql with same jar in
classpath, then Manifest is ok and you indeed should look at your spark sql
- jdbc configs

On 22 December 2015 at 12:21, David Yerrington <da...@yerrington.net> wrote:

> Igor, I think it's available.  After I extract the jar file, I see a
> directory with class files that look very relevant in "/com/mysql/jdbc".
>
> After reading this, I started to wonder if MySQL connector was really the
> problem.  Perhaps it's something to do with SQLcontext?  I just wired a
> test endpoint to run a very basic mysql query, outside of Spark, and it
> worked just fine (yay!).  I copied and pasted this example to verify my
> MySQL connector availability, and it worked just fine:
> https://mkaz.github.io/2011/05/27/using-scala-with-jdbc-to-connect-to-mysql/
>
> As far as the Maven manifest goes, I'm really not sure.  I will research
> it though.  Now I'm wondering if my mergeStrategy is to blame?  I'm going
> to try there next.
>
> Thank you for the help!
>
> On Tue, Dec 22, 2015 at 1:18 AM, Igor Berman <igor.ber...@gmail.com>
> wrote:
>
>> David, can you verify that mysql connector classes indeed in your single
>> jar?
>> open it with zip tool available at your platform
>>
>> another options that might be a problem - if there is some dependency in
>> MANIFEST(not sure though this is the case of mysql connector) then it might
>> be broken after preparing single jar
>> so you need to verify that it's ok(in maven usually it's possible to
>> define merging policy for resources while creating single jar)
>>
>> On 22 December 2015 at 10:04, Vijay Kiran <m...@vijaykiran.com> wrote:
>>
>>> Can you paste your libraryDependencies from build.sbt ?
>>>
>>> ./Vijay
>>>
>>> > On 22 Dec 2015, at 06:12, David Yerrington <da...@yerrington.net>
>>> wrote:
>>> >
>>> > Hi Everyone,
>>> >
>>> > I'm building a prototype that fundamentally grabs data from a MySQL
>>> instance, crunches some numbers, and then moves it on down the pipeline.
>>> I've been using SBT with assembly tool to build a single jar for deployment.
>>> >
>>> > I've gone through the paces of stomping out many dependency problems
>>> and have come down to one last (hopefully) zinger.
>>> >
>>> > java.lang.ClassNotFoundException: Failed to load class for data
>>> source: jdbc.
>>> >
>>> > at
>>> org.apache.spark.sql.execution.datasources.ResolvedDataSource$.lookupDataSource(ResolvedDataSource.scala:67)
>>> >
>>> > at
>>> org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:87)
>>> >
>>> > at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:114)
>>> >
>>> > at org.apache.spark.sql.SQLContext.load(SQLContext.scala:1203)
>>> >
>>> > at her.recommender.getDataframe(her.recommender.scala:45)
>>> >
>>> > at her.recommender.getRecommendations(her.recommender.scala:60)
>>> >
>>> >
>>> > I'm assuming this has to do with mysql-connector because this is the
>>> problem I run into when I'm working with spark-shell and I forget to
>>> include my classpath with my mysql-connect jar file.
>>> >
>>> > I've tried:
>>> >   • Using different versions of mysql-connector-java in my
>>> build.sbt file
>>> >   • Copying the connector jar to my_project/src/main/lib
>>> >   • Copying the connector jar to my_project/lib <-- (this is where
>>> I keep my build.sbt)
>>> > Everything loads fine and works, except my call that does
>>> "sqlContext.load("jdbc", myOptions)".  I know this is a total newbie
>>> question but in my defense, I'm fairly new to Scala, and this is my first
>>> go at deploying a fat jar with sbt-assembly.
>>> >
>>> > Thanks for any advice!
>>> >
>>> > --
>>> > David Yerrington
>>> > yerrington.net
>>>
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>
>
>
> --
> David Yerrington
> yerrington.net
>


Re: fishing for help!

2015-12-21 Thread Igor Berman
look for differences: packages versions, cpu/network/memory diff etc etc


On 21 December 2015 at 14:53, Eran Witkon  wrote:

> Hi,
> I know it is a wide question but can you think of reasons why a pyspark
> job which runs on from server 1 using user 1 will run faster then the same
> job when running on server 2 with user 1
> Eran
>


Re: Spark with log4j

2015-12-21 Thread Igor Berman
I think log4j.properties that are under conf dir are those that are
relevant for workers jvms and not the one that you pack withing your jar


On 21 December 2015 at 14:07, Kalpesh Jadhav 
wrote:

> Hi Ted,
>
>
>
> Thanks for your response, But it doesn’t solve my issue.
>
> Still print logs on console only.
>
>
>
>
>
> Thanks,
>
> Kalpesh Jadhav.
>
>
>
> *From:* Ted Yu [mailto:yuzhih...@gmail.com]
> *Sent:* Friday, December 18, 2015 9:15 PM
> *To:* Kalpesh Jadhav
> *Cc:* user
> *Subject:* Re: Spark with log4j
>
>
>
> See this thread:
>
> http://search-hadoop.com/m/q3RTtEor1vYWbsW
>
>
>
> which mentioned:
>
> SPARK-11105 Disitribute the log4j.properties files from the client to the
> executors
>
>
>
> FYI
>
>
>
> On Fri, Dec 18, 2015 at 7:23 AM, Kalpesh Jadhav <
> kalpesh.jad...@citiustech.com> wrote:
>
> Hi all,
>
>
>
> I am new to spark, I am trying to use log4j for logging my application.
>
> But any how the logs are not getting written at specified file.
>
>
>
> I have created application using maven, and kept log.properties file at
> resources folder.
>
> Application written in scala .
>
>
>
> If there is any alternative instead of log4j then also it will work, but I
> wanted to see logs in file.
>
>
>
> If any changes need to be done in hortonworks
> 
> for spark configuration, please mentioned that as well.
>
>
>
> If anyone has done before or on github any source available please respond.
>
>
>
>
>
> Thanks,
>
> Kalpesh Jadhav
>
> ===
> DISCLAIMER: The information contained in this message (including any
> attachments) is confidential and may be privileged. If you have received it
> by mistake please notify the sender by return e-mail and permanently delete
> this message and any attachments from your system. Any dissemination, use,
> review, distribution, printing or copying of this message in whole or in
> part is strictly prohibited. Please note that e-mails are susceptible to
> change. CitiusTech shall not be liable for the improper or incomplete
> transmission of the information contained in this communication nor for any
> delay in its receipt or damage to your system. CitiusTech does not
> guarantee that the integrity of this communication has been maintained or
> that this communication is free of viruses, interceptions or interferences.
> 
>
>
>
> ===
> DISCLAIMER: The information contained in this message (including any
> attachments) is confidential and may be privileged. If you have received it
> by mistake please notify the sender by return e-mail and permanently delete
> this message and any attachments from your system. Any dissemination, use,
> review, distribution, printing or copying of this message in whole or in
> part is strictly prohibited. Please note that e-mails are susceptible to
> change. CitiusTech shall not be liable for the improper or incomplete
> transmission of the information contained in this communication nor for any
> delay in its receipt or damage to your system. CitiusTech does not
> guarantee that the integrity of this communication has been maintained or
> that this communication is free of viruses, interceptions or interferences.
> 
>
>


Re: NPE in using AvroKeyValueInputFormat for newAPIHadoopFile

2015-12-16 Thread Igor Berman
check version compatibility
I think avro lib should be 1.7.4
check that no other lib brings transitive dependency of other avro version


On 16 December 2015 at 09:44, Jinyuan Zhou  wrote:

> Hi, I tried to load avro files in hdfs but keep getting NPE.
>  I am using AvroKeyValueInputFormat inside newAPIHadoopFile method. Anyone
> have any clue? Here is stack trace
>
> Exception in thread "main" org.apache.spark.SparkException: Job aborted
> due to stage failure: Task 4 in stage 0.0 failed 4 times, most recent
> failure: Lost task 4.3 in stage 0.0 (TID 11, xyz.abc.com):
> java.lang.NullPointerException
>
> at org.apache.avro.Schema.getAliases(Schema.java:1415)
>
> at org.apache.avro.Schema.getAliases(Schema.java:1429)
>
> at org.apache.avro.Schema.applyAliases(Schema.java:1340)
>
> at
> org.apache.avro.generic.GenericDatumReader.getResolver(GenericDatumReader.java:125)
>
> at
> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:140)
>
> at org.apache.avro.file.DataFileStream.next(DataFileStream.java:233)
>
> at
> org.apache.avro.mapreduce.AvroRecordReaderBase.nextKeyValue(AvroRecordReaderBase.java:118)
>
> at
> org.apache.avro.mapreduce.AvroKeyValueRecordReader.nextKeyValue(AvroKeyValueRecordReader.java:62)
>
> at
> org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:143)
>
> at
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>
> at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1626)
>
> at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1099)
>
> at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1099)
>
> at
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1767)
>
> at
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1767)
>
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
>
> at org.apache.spark.scheduler.Task.run(Task.scala:70)
>
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>
> at java.lang.Thread.run(Thread.java:744)
>
>
> Thanks,
>
> Jack
> Jinyuan (Jack) Zhou
>


Re: Preventing an RDD from shuffling

2015-12-16 Thread Igor Berman
imho, you should implement your own rdd with mongo sharding awareness, then
this rdd will have this mongo aware partitioner, and then incoming data
will be partitioned by this partitioner in join
not sure if it's simple task...but you have to get partitioner in you mongo
rdd.

On 16 December 2015 at 12:23, sparkuser2345  wrote:

> Is there a way to prevent an RDD from shuffling in a join operation without
> repartitioning it?
>
> I'm reading an RDD from sharded MongoDB, joining that with an RDD of
> incoming data (+ some additional calculations), and writing the resulting
> RDD back to MongoDB. It would make sense to shuffle only the incoming data
> RDD so that the joined RDD would already be partitioned correctly according
> to the MondoDB shard key.
>
> I know I can prevent an RDD from shuffling in a join operation by
> partitioning it beforehand but partitioning would already shuffle the RDD.
> In addition, I'm only doing the join once per RDD read from MongoDB. Is
> there a way to tell Spark to shuffle only the incoming data RDD?
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Preventing-an-RDD-from-shuffling-tp25717.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Need Help Diagnosing/operating/tuning

2015-11-23 Thread Igor Berman
you should check why executor is killed. as soon as it's killed you can get
all kind of strange exceptions...
either give your executors more memory(4G is rather small for spark )
or try to decrease your input or maybe split it into more partitions in
input format
23G in lzo might expand to x? in memory - it depends on your format

in general each executor has 4G of memory, when only part of it is used for
caching/shuffling(see spark configuration of diff fraction params
then you should divide this memory to number of cores in each executor
then you can understand approx what is your partition size...you can make
this arithmetic opposite way from size of partition to memory needed by
each executor

no point to make 300 retries...there is no magic in spark...if it fails
after 3 retry it will fail...

ui metrics can give you hints regarding partition size etc

On 23 November 2015 at 03:30, Jeremy Davis  wrote:

> It seems like the problem is related to —executor-cores. Is there possibly
> some sort of race condition when using multiple cores per executor?
>
>
> On Nov 22, 2015, at 12:38 PM, Jeremy Davis  wrote:
>
>
> Hello,
> I’m at a loss trying to diagnose why my spark job is failing. (works fine
> on small data)
> It is failing during the repartition, or on the subsequent steps.. which
> then seem to fail and fall back to repartitioning..
> I’ve tried adjusting every parameter I can find, but have had no success.
> Input is only 23GB of LZO )probably 8x compression), and I’ve verified all
> files are valid (not corrupted).
> I’ve tried more and less of : memory, partitions, executors, cores...
> I’ve set maxFailures up to 300.
> Setting 4GB heap usually makes it through repartitioning, but fails on
> subsequent steps (Sometimes being killed from running past memory limits).
> Larger Heaps usually don’t even make it through the first repartition due
> to all kinds of weird errors that look like read errors...
>
> I’m at a loss on how to debug this thing.
> Is there a tutorial somewhere?
>
> ——
>
>
> Spark 1.4.1
> Java 7
> Cluster has 3TB of memory, and 400 cores.
>
>
> Here are a collection of exceptions
>
>
> java.io.FileNotFoundException: 
> /var/storage/sdd3/nm-local/usercache/jeremy/appcache/application_1447722466442_1649/blockmgr-9ed5583f-cac1-4701-9f70-810c215b954f/13/shuffle_0_5_0.data
>  (No such file or directory)
>   at java.io.FileOutputStream.open(Native Method)
>   at java.io.FileOutputStream.(FileOutputStream.java:221)
>   at 
> org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:128)
>   at 
> org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:215)
>   at 
> org.apache.spark.util.collection.ChainedBuffer.read(ChainedBuffer.scala:56)
>   at 
> org.apache.spark.util.collection.PartitionedSerializedPairBuffer$$anon$2.writeNext(PartitionedSerializedPairBuffer.scala:137)
>   at 
> org.apache.spark.util.collection.ExternalSorter.writePartitionedFile(ExternalSorter.scala:757)
>   at 
> org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:70)
>   at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
>   at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>   at org.apache.spark.scheduler.Task.run(Task.scala:70)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>   at java.lang.Thread.run(Thread.java:744)
>
>
>
>
> java.lang.InternalError: lzo1x_decompress_safe returned: -6
>   at 
> com.hadoop.compression.lzo.LzoDecompressor.decompressBytesDirect(Native 
> Method)
>   at 
> com.hadoop.compression.lzo.LzoDecompressor.decompress(LzoDecompressor.java:315)
>   at 
> com.hadoop.compression.lzo.LzopDecompressor.decompress(LzopDecompressor.java:122)
>   at 
> com.hadoop.compression.lzo.LzopInputStream.decompress(LzopInputStream.java:247)
>   at 
> org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:85)
>   at java.io.InputStream.read(InputStream.java:101)
>   at org.apache.hadoop.util.LineReader.fillBuffer(LineReader.java:180)
>   at 
> org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:216)
>   at org.apache.hadoop.util.LineReader.readLine(LineReader.java:174)
>   at 
> org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:209)
>   at 
> org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:47)
>   at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:248)
>   at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:216)
>   at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>   at 
> 

Re: newbie: unable to use all my cores and memory

2015-11-20 Thread Igor Berman
u've asked total cores to be 2 + 1 for driver(since you are running in
cluster mode, so it's running on one of the slaves)
change total cores to be 3*2
change submit mode to be client - you'll have full utilization
(btw it's not advisable to use all cores of slave...since there is OS
processes and other processes...)

On 20 November 2015 at 02:02, Andy Davidson 
wrote:

> I am having a heck of a time figuring out how to utilize my cluster
> effectively. I am using the stand alone cluster manager. I have a master
> and 3 slaves. Each machine has 2 cores.
>
> I am trying to run a streaming app in cluster mode and pyspark at the same
> time.
>
> t1) On my console I see
>
> * Alive Workers: 3
> * Cores in use: 6 Total, 0 Used
> * Memory in use: 18.8 GB Total, 0.0 B Used
> * Applications: 0 Running, 15 Completed
> * Drivers: 0 Running, 2 Completed
> * Status: ALIVE
>
> t2) I start my streaming app
>
> $SPARK_ROOT/bin/spark-submit \
> --class "com.pws.spark.streaming.IngestDriver" \
> --master $MASTER_URL \
> --total-executor-cores 2 \
> --deploy-mode cluster \
> $jarPath --clusterMode  $*
>
> t3) on my console I see
>
> * Alive Workers: 3
> * Cores in use: 6 Total, 3 Used
> * Memory in use: 18.8 GB Total, 13.0 GB Used
> * Applications: 1 Running, 15 Completed
> * Drivers: 1 Running, 2 Completed
> * Status: ALIVE
>
> Looks like pyspark should be able to use the 3 remaining cores and 5.8 GB
> of memory
>
> t4) I start pyspark
>
> export PYSPARK_PYTHON=python3.4
> export PYSPARK_DRIVER_PYTHON=python3.4
> export IPYTHON_OPTS="notebook --no-browser --port=7000
> --log-level=WARN"
>
> $SPARK_ROOT/bin/pyspark --master $MASTER_URL
> --total-executor-cores 3
> --executor-memory 2g
>
> t5) on my console I see
>
> * Alive Workers: 3
> * Cores in use: 6 Total, 4 Used
> * Memory in use: 18.8 GB Total, 15.0 GB Used
> * Applications: 2 Running, 18 Completed
> * Drivers: 1 Running, 2 Completed
> * Status: ALIVE
>
>
> I have 2 unused cores and a lot of memory left over. My pyspark
> application is going getting 1 core. If streaming app is not running
> pyspark would be assigned 2 cores each on a different worker. I have tried
> using various combinations of --executor-cores and --total-executor-cores.
> Any idea how to get pyspark to use more cores and memory?
>
>
> Kind regards
>
> Andy
>
> P.s.  Using different values I have wound up with  pyspark status ==
> ³waiting² I think this is because there are not enough cores available?
>
>
>
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Configuring Log4J (Spark 1.5 on EMR 4.1)

2015-11-20 Thread Igor Berman
try to assemble log4j.xml or log4j.properties in your jar...probably you'll
get what you want, however pay attention that when you'll move to multinode
cluster - there will be difference

On 20 November 2015 at 05:10, Afshartous, Nick 
wrote:

>
> < log4j.properties file only exists on the master and not the slave nodes,
> so you are probably running into
> https://issues.apache.org/jira/browse/SPARK-11105, which has already been
> fixed in the not-yet-released Spark 1.6.0. EMR will upgrade to Spark 1.6.0
> once it is released.
>
> Thanks for the info, though this is a single-node cluster so that can't be
> the cause of the error (which is in the driver log).
> --
>   Nick
> 
> From: Jonathan Kelly [jonathaka...@gmail.com]
> Sent: Thursday, November 19, 2015 6:45 PM
> To: Afshartous, Nick
> Cc: user@spark.apache.org
> Subject: Re: Configuring Log4J (Spark 1.5 on EMR 4.1)
>
> This file only exists on the master and not the slave nodes, so you are
> probably running into https://issues.apache.org/jira/browse/SPARK-11105,
> which has already been fixed in the not-yet-released Spark 1.6.0. EMR will
> upgrade to Spark 1.6.0 once it is released.
>
> ~ Jonathan
>
> On Thu, Nov 19, 2015 at 1:30 PM, Afshartous, Nick  > wrote:
>
> Hi,
>
> On Spark 1.5 on EMR 4.1 the message below appears in stderr in the Yarn UI.
>
>   ERROR StatusLogger No log4j2 configuration file found. Using default
> configuration: logging only errors to the console.
>
> I do see that there is
>
>/usr/lib/spark/conf/log4j.properties
>
> Can someone please advise on how to setup log4j properly.
>
> Thanks,
> --
>   Nick
>
> Notice: This communication is for the intended recipient(s) only and may
> contain confidential, proprietary, legally protected or privileged
> information of Turbine, Inc. If you are not the intended recipient(s),
> please notify the sender at once and delete this communication.
> Unauthorized use of the information in this communication is strictly
> prohibited and may be unlawful. For those recipients under contract with
> Turbine, Inc., the information in this communication is subject to the
> terms and conditions of any applicable contracts or agreements.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org user-unsubscr...@spark.apache.org>
> For additional commands, e-mail: user-h...@spark.apache.org user-h...@spark.apache.org>
>
>
>
> Notice: This communication is for the intended recipient(s) only and may
> contain confidential, proprietary, legally protected or privileged
> information of Turbine, Inc. If you are not the intended recipient(s),
> please notify the sender at once and delete this communication.
> Unauthorized use of the information in this communication is strictly
> prohibited and may be unlawful. For those recipients under contract with
> Turbine, Inc., the information in this communication is subject to the
> terms and conditions of any applicable contracts or agreements.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: status of slaves in standalone cluster rest/rpc call

2015-11-09 Thread Igor Berman
further reading code of MasterPage gave me what I want:
http://:8080/json returns json view of all info presented in
main page

On 9 November 2015 at 22:41, Igor Berman <igor.ber...@gmail.com> wrote:

> Hi,
> How do I get status of workers(slaves) from driver?
> why I need it - I want to autoscale new workers and want to poll status of
> cluster(e.g. number of alive slaves connected) so that I'll submit job only
> after expected number of slaves joined cluster
>
>
> I've found MasterPage class that produces ui html status page
> Is there any possibility to get it with rest call or somehow to get
> actorSystem(rpcEndpoint) and get this status from it?
>
> thanks in advance.
>
>
>


status of slaves in standalone cluster rest/rpc call

2015-11-09 Thread Igor Berman
Hi,
How do I get status of workers(slaves) from driver?
why I need it - I want to autoscale new workers and want to poll status of
cluster(e.g. number of alive slaves connected) so that I'll submit job only
after expected number of slaves joined cluster


I've found MasterPage class that produces ui html status page
Is there any possibility to get it with rest call or somehow to get
actorSystem(rpcEndpoint) and get this status from it?

thanks in advance.


Re: Whether Spark is appropriate for our use case.

2015-11-07 Thread Igor Berman
1. if you have join by some specific field(e.g. user id or account-id or
whatever) you may try to partition parquet file by this field and then join
will be more efficient.
2. you need to see in spark metrics what is performance of particular join,
how much partitions is there, what is shuffle size...in general - tune for
the shuffle performance(e.g. shuffle fraction)

On 21 October 2015 at 00:29, Aliaksei Tsyvunchyk 
wrote:

> Hello all community members,
>
> I need opinion of people who was using Spark before and can share there
> experience to help me select technical approach.
> I have a project in Proof Of Concept phase, where we are evaluating
> possibility of Spark usage for our use case.
> Here is brief task description.
> We should process big amount of raw data to calculate ratings. We have
> different type of textual source data. This is just text lines which
> represents different type of input data (we call them type 20, type 24,
> type 26, type 33, etc).
> To perform calculations we should make joins between diffrerent type of
> raw data - event records (which represents actual user action) and users
> description records (which represents person which performs action) and
> sometimes with userGroup record (we group all users by some criteria).
> All ratings are calculated on daily basis and our dataset could be
> partitioned by date (except probably reference data).
>
>
> So we have tried to implement it using possibly most obvious way, we parse
> text file, store data in parquet format and trying to use sparkSQL to query
> data and perform calculation.
> Experimenting with sparkSQL I’ve noticed that SQL query speed decreased
> proportionally to data size growth. Base on this I assume that SparkSQL
> performs full records scan while servicing my SQL queries.
>
> So here are the questions I’m trying to find answers:
> 1.  Is parquet format appropriate for storing data in our case (to
> efficiently query data)? Could it be more suitable to have some DB as
> storage which could filter data efficiently before it gets to Spark
> processing engine ?
> 2.  For now we assume that joins we are doing for calculations slowing
> down execution. As alternatives we consider denormalizing data and join it
> on parsing phase, but this increase data volume Spark should handle (due to
> the duplicates we will get). Is it valid approach? Would it be better if we
> create 2 RDD, from Parquet files filter them out and next join without
> sparkSQL involvement?  Or joins in SparkSQL are fine and we should look for
> performance bottlenecks in different place?
> 3.  Should we look closer on Cloudera Impala? As I know it is working over
> the same parquet files and I’m wondering whether it gives better
> performance for data querying ?
> 4.  90% of results we need could be pre-calculated since they are not
> change after one day of data is loaded. So I think it makes sense to keep
> this pre-calculated data in some DB storage which give me best performance
> while querying by key. Now I’m consider to use Cassandra for this purpose
> due to it’s perfect scalability and performance. Could somebody provide any
> other options we can consider ?
>
> Thanks in Advance,
> Any opinion will be helpful and greatly appreciated
> --
>
>
> CONFIDENTIALITY NOTICE: This email and files attached to it are
> confidential. If you are not the intended recipient you are hereby notified
> that using, copying, distributing or taking any action in reliance on the
> contents of this information is strictly prohibited. If you have received
> this email in error please notify the sender and delete this email.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Efficient approach to store an RDD as a file in HDFS and read it back as an RDD?

2015-11-05 Thread Igor Berman
Hi,
we are using avro with compression(snappy). As soon as you have enough
partitions, the saving won't be a problem imho.
in general hdfs is pretty fast, s3 is less so
the issue with storing data is that you will loose your partitioner(even
though rdd has it) at loading moment. There is PR that tries to solve this.


On 5 November 2015 at 01:09, swetha  wrote:

> Hi,
>
> What is the efficient approach to save an RDD as a file in HDFS and
> retrieve
> it back? I was thinking between Avro, Parquet and SequenceFileFormart. We
> currently use SequenceFileFormart for one of our use cases.
>
> Any example on how to store and retrieve an RDD in an Avro and Parquet file
> formats would be of great help.
>
> Thanks,
> Swetha
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Efficient-approach-to-store-an-RDD-as-a-file-in-HDFS-and-read-it-back-as-an-RDD-tp25279.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Efficient approach to store an RDD as a file in HDFS and read it back as an RDD?

2015-11-05 Thread Igor Berman
java/scala? I think there is everything in dataframes tutorial
*e.g. if u have dataframe and working from java - toJavaRDD
<https://spark.apache.org/docs/1.4.0/api/java/org/apache/spark/sql/DataFrame.html#toJavaRDD()>*
()

On 5 November 2015 at 21:13, swetha kasireddy <swethakasire...@gmail.com>
wrote:

> How to convert a parquet file that is saved in hdfs to an RDD after
> reading the file from hdfs?
>
> On Thu, Nov 5, 2015 at 10:02 AM, Igor Berman <igor.ber...@gmail.com>
> wrote:
>
>> Hi,
>> we are using avro with compression(snappy). As soon as you have enough
>> partitions, the saving won't be a problem imho.
>> in general hdfs is pretty fast, s3 is less so
>> the issue with storing data is that you will loose your partitioner(even
>> though rdd has it) at loading moment. There is PR that tries to solve this.
>>
>>
>> On 5 November 2015 at 01:09, swetha <swethakasire...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> What is the efficient approach to save an RDD as a file in HDFS and
>>> retrieve
>>> it back? I was thinking between Avro, Parquet and SequenceFileFormart. We
>>> currently use SequenceFileFormart for one of our use cases.
>>>
>>> Any example on how to store and retrieve an RDD in an Avro and Parquet
>>> file
>>> formats would be of great help.
>>>
>>> Thanks,
>>> Swetha
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Efficient-approach-to-store-an-RDD-as-a-file-in-HDFS-and-read-it-back-as-an-RDD-tp25279.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>
>


Re: Storing Compressed data in HDFS into Spark

2015-10-22 Thread Igor Berman
check spark.rdd.compress

On 19 October 2015 at 21:13, ahaider3  wrote:

> Hi,
> A lot of the data I have in HDFS is compressed. I noticed when I load this
> data into spark and cache it, Spark unrolls the data like normal but stores
> the data uncompressed in memory. For example, suppose /data/ is an RDD with
> compressed partitions on HDFS. I then cache the data. When I call
> /data.count()/, the data is rightly decompressed since it needs to find the
> value of /.count()/. But, the data that is cached is also decompressed. Can
> a partition be compressed in spark? I know spark allows for data to be
> compressed, after serialization. But what if, I only want the partitions
> compressed.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Storing-Compressed-data-in-HDFS-into-Spark-tp25123.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Spark 1.5.1+Hadoop2.6 .. unable to write to S3 (HADOOP-12420)

2015-10-22 Thread Igor Berman
many use it.
how do you add aws sdk to classpath?
check in environment ui what is in cp.
you should make sure that in your cp the version is compatible with one
that spark compiled with
I think 1.7.4 is compatible(at least we use it)

make sure that you don't get other versions from other transitive
dependencies(effective pom)

On 22 October 2015 at 17:12, Ashish Shrowty 
wrote:

> I understand that there is some incompatibility with the API between Hadoop
> 2.6/2.7 and Amazon AWS SDK where they changed a signature of
>
> com.amazonaws.services.s3.transfer.TransferManagerConfiguration.setMultipartUploadThreshold.
> The JIRA indicates that this would be fixed in Hadoop 2.8.
> (https://issues.apache.org/jira/browse/HADOOP-12420)
>
> My question is - what are people doing today to access S3? I am unable to
> find an older JAR of the AWS SDK to test with.
>
> Thanks,
> Ashish
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-5-1-Hadoop2-6-unable-to-write-to-S3-HADOOP-12420-tp25163.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: spark straggle task

2015-10-20 Thread Igor Berman
We know that the JobScheduler have the function to assign the straggle task
to another node. - only if you enable and configure spark.speculation

On 20 October 2015 at 15:20, Triones,Deng(vip.com)  wrote:

> Hi All
>
> We run an application with version 1.4.1 standalone mode. We saw two tasks
> in one stage which runs very slow seems it is hang. We know that the
> JobScheduler have the function to assign the straggle task to another node.
> But what we saw it does not reassign. So we want to know is there anyone
> know that is there ant para to open this function.
> What's more, we saw that these two straggle tasks hangs at socket read. Is
> it because it cannot be interrupted so the reassign function does not work,
> the Thread stack as below:
>
>
> java.net.SocketInputStream.socketRead0(Native Method)
> java.net.SocketInputStream.read(SocketInputStream.java:152)
> java.net.SocketInputStream.read(SocketInputStream.java:122)
> java.io.BufferedInputStream.fill(BufferedInputStream.java:235)
> java.io.BufferedInputStream.read(BufferedInputStream.java:254)
> org.apache.commons.httpclient.HttpParser.readRawLine(HttpParser.java:78)
> org.apache.commons.httpclient.HttpParser.readLine(HttpParser.java:106)
>
> org.apache.commons.httpclient.HttpConnection.readLine(HttpConnection.java:1116)
>
> org.apache.commons.httpclient.MultiThreadedHttpConnectionManager$HttpConnectionAdapter.readLine(MultiThreadedHttpConnectionManager.java:1413)
>
> org.apache.commons.httpclient.HttpMethodBase.readStatusLine(HttpMethodBase.java:1973)
>
> org.apache.commons.httpclient.HttpMethodBase.readResponse(HttpMethodBase.java:1735)
>
> org.apache.commons.httpclient.HttpMethodBase.execute(HttpMethodBase.java:1098)
>
> org.apache.commons.httpclient.HttpMethodDirector.executeWithRetry(HttpMethodDirector.java:398)
>
> org.apache.commons.httpclient.HttpMethodDirector.executeMethod(HttpMethodDirector.java:171)
> org.apache.commons.httpclient.HttpClient.executeMethod(HttpClient.java:397)
> org.apache.commons.httpclient.HttpClient.executeMethod(HttpClient.java:323)
> com.vip.logview.tsdb.HttpClientUtils.postHandler(HttpClientUtils.java:80)
>
> com.vip.logview.function.DomainHostStatusSaveFunction$1.call(DomainHostStatusSaveFunction.java:75)
>
> com.vip.logview.function.DomainHostStatusSaveFunction$1.call(DomainHostStatusSaveFunction.java:30)
>
> org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:219)
>
> org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:219)
>
> org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:878)
>
> org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:878)
>
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1767)
>
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1767)
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
> org.apache.spark.scheduler.Task.run(Task.scala:70)
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> java.lang.Thread.run(Thread.java:745)
>
>
> Thanks
>
>
>
> 本电子邮件可能为保密文件。如果阁下非电子邮件所指定之收件人,谨请立即通知本人。敬请阁下不要使用、保存、复印、打印、散布本电子邮件及其内容,或将其用于其他任何目的或向任何人披露。谢谢您的合作!
> This communication is intended only for the addressee(s) and may contain
> information that is privileged and confidential. You are hereby notified
> that, if you are not an intended recipient listed above, or an authorized
> employee or agent of an addressee of this communication responsible for
> delivering e-mail messages to an intended recipient, any dissemination,
> distribution or reproduction of this communication (including any
> attachments hereto) is strictly prohibited. If you have received this
> communication in error, please notify us immediately by a reply e-mail
> addressed to the sender and permanently delete the original e-mail
> communication and any attachments from all storage devices without making
> or otherwise retaining a copy.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: In-memory computing and cache() in Spark

2015-10-19 Thread Igor Berman
Does ur iterations really submit job? I dont see any action there
On Oct 17, 2015 00:03, "Jia Zhan"  wrote:

> Hi all,
>
> I am running Spark locally in one node and trying to sweep the memory size
> for performance tuning. The machine has 8 CPUs and 16G main memory, the
> dataset in my local disk is about 10GB. I have several quick questions and
> appreciate any comments.
>
> 1. Spark performs in-memory computing, but without using RDD.cache(), will
> anything be cached in memory at all? My guess is that, without RDD.cache(),
> only a small amount of data will be stored in OS buffer cache, and every
> iteration of computation will still need to fetch most data from disk every
> time, is that right?
>
> 2. To evaluate how caching helps with iterative computation, I wrote a
> simple program as shown below, which basically consists of one saveAsText()
> and three reduce() actions/stages. I specify "spark.driver.memory" to
> "15g", others by default. Then I run three experiments.
>
> *   val* *conf* = *new* *SparkConf*().setAppName(*"wordCount"*)
>
>*val* *sc* = *new* *SparkContext*(conf)
>
>*val* *input* = sc.textFile(*"/InputFiles"*)
>
>   *val* *words* = input.flatMap(line *=>* line.split(*" "*)).map(word
> *=>* (word, *1*)).reduceByKey(_+_).saveAsTextFile(*"/OutputFiles"*)
>
>   *val* *ITERATIONS* = *3*
>
>   *for* (i *<-* *1* to *ITERATIONS*) {
>
>   *val* *totallength* = input.filter(line*=>*line.contains(*"the"*
> )).map(s*=>*s.length).reduce((a,b)*=>*a+b)
>
>   }
>
> (I) The first run: no caching at all. The application finishes in ~12
> minutes (2.6min+3.3min+3.2min+3.3min)
>
> (II) The second run, I modified the code so that the input will be cached:
>  *val input = sc.textFile("/InputFiles").cache()*
>  The application finishes in ~11 mins!! (5.4min+1.9min+1.9min+2.0min)!
>  The storage page in Web UI shows 48% of the dataset  is cached, which
> makes sense due to large java object overhead, and
> spark.storage.memoryFraction is 0.6 by default.
>
> (III) However, the third run, same program as the second one, but I
> changed "spark.driver.memory" to be "2g".
>The application finishes in just 3.6 minutes (3.0min + 9s + 9s + 9s)!!
> And UI shows 6% of the data is cached.
>
> *From the results we can see the reduce stages finish in seconds, how
> could that happen with only 6% cached? Can anyone explain?*
>
> I am new to Spark and would appreciate any help on this. Thanks!
>
> Jia
>
>
>
>


Re: our spark gotchas report while creating batch pipeline

2015-10-18 Thread Igor Berman
thanks Ted :)


On 18 October 2015 at 19:07, Ted Yu  wrote:

> Interesting reading material.
>
> bq. transformations that loose partitioner
>
> lose partitioner
>
> bq. Spark looses the partitioner
>
> loses the partitioner
>
> bq. Tunning number of partitions
>
> Should be tuning.
>
> bq. or increase shuffle fraction
> bq. ShuffleMemoryManager: Thread 61 ...
>
> Hopefully SPARK-1 would alleviate the above situation.
>
> Cheers
>
> On Sun, Oct 18, 2015 at 8:51 AM, igor.berman 
> wrote:
>
>> might be somebody will find it useful
>> goo.gl/0yfvBd
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/our-spark-gotchas-report-while-creating-batch-pipeline-tp25112.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: Question about GraphX connected-components

2015-10-10 Thread Igor Berman
let's start from some basics: might be u need to split your data into more
partitions?
spilling depends on your configuration when you create graph(look for
storage level param) and your global configuration.
in addition, you assumption of 64GB/100M is probably wrong, since spark
divides memory into 3 regions - for in memory caching, for shuffling and
for "workspace" of serialization/deserialization etc see fraction
parameters.

so depending on number of your partitions might be worker will try to
ingest too much data at once(#cores * memory pressure of one task per one
partition)

there is no such thing as "right" configuration. It depends on your
application. You can post your configuration and people will suggest some
tunning, still best way is to try what is best for ur case depending on
what u see in spark ui metrics(as starting point)

On 10 October 2015 at 00:13, John Lilley  wrote:

> Greetings,
>
> We are looking into using the GraphX connected-components algorithm on
> Hadoop for grouping operations.  Our typical data is on the order of
> 50-200M vertices with an edge:vertex ratio between 2 and 30.  While there
> are pathological cases of very large groups, they tend to be small.  I am
> trying to get a handle on the level of performance and scaling we should
> expect, and how to best configure GraphX/Spark to get there.  After some
> trying, we cannot get to 100M vertices/edges without running out of memory
> on a small cluster (8 nodes with 4 cores and 8GB available for YARN on each
> node).  This limit seems low, as 64GB/100M is 640 bytes per vertex, which
> should be enough.  Is this within reason?  Does anyone have sample they can
> share that has the right configurations for succeeding with this size of
> data and cluster?  What level of performance should we expect?  What
> happens when the data set exceed memory, does it spill to disk “nicely” or
> degrade catastrophically?
>
>
>
> Thanks,
>
> *John Lilley*
>
>
>


Re: Issue with the class generated from avro schema

2015-10-09 Thread Igor Berman
I think there is deepCopy method of generated avro classes.

On 9 October 2015 at 23:32, Bartłomiej Alberski <albers...@gmail.com> wrote:

> I knew that one possible solution will be to map loaded object into
> another class just after reading from HDFS.
> I was looking for solution enabling reuse of avro generated classes.
> It could be useful in situation when your record have more 22 records,
> because you do not need to write boilerplate code for mapping from and to
> the class,  i.e loading class as instance of class generated from avro,
> updating some fields, removing duplicates, and saving those results with
> exactly the same schema.
>
> Thank you for the answer, at least I know that there is no way to make it
> works.
>
>
> 2015-10-09 20:19 GMT+02:00 Igor Berman <igor.ber...@gmail.com>:
>
>> u should create copy of your avro data before working with it, i.e. just
>> after loadFromHDFS map it into new instance that is deap copy of the object
>> it's connected to the way spark/avro reader reads avro files(it reuses
>> some buffer or something)
>>
>> On 9 October 2015 at 19:05, alberskib <albers...@gmail.com> wrote:
>>
>>> Hi all,
>>>
>>> I have piece of code written in spark that loads data from HDFS into java
>>> classes generated from avro idl. On RDD created in that way I am
>>> executing
>>> simple operation which results depends on fact whether I cache RDD
>>> before it
>>> or not i.e if I run code below
>>>
>>> val loadedData = loadFromHDFS[Data](path,...)
>>> println(loadedData.map(x => x.getUserId + x.getDate).distinct().count())
>>> //
>>> 20
>>> program will print 20, on the other hand executing next code
>>>
>>> val loadedData = loadFromHDFS[Data](path,...).cache()
>>> println(loadedData.map(x => x.getUserId + x.getDate).distinct().count())
>>> //
>>> 1
>>> result in 1 printed to stdout.
>>>
>>> When I inspect values of the fields after reading cached data it seems
>>>
>>> I am pretty sure that root cause of described problem is issue with
>>> serialization of classes generated from avro idl, but I do not know how
>>> to
>>> resolve it. I tried to use Kryo, registering generated class (Data),
>>> registering different serializers from chill_avro for given class
>>> (SpecificRecordSerializer, SpecificRecordBinarySerializer, etc), but
>>> none of
>>> those ideas helps me.
>>>
>>> I post exactly the same question on stackoverflow but I did not receive
>>> any
>>> repsponse.  link
>>> <
>>> http://stackoverflow.com/questions/33027851/spark-issue-with-the-class-generated-from-avro-schema
>>> >
>>>
>>> What is more I created minimal working example, thanks to which it will
>>> be
>>> easy to reproduce problem.
>>> link <https://github.com/alberskib/spark-avro-serialization-issue>
>>>
>>> How I can solve this problem?
>>>
>>>
>>> Thanks,
>>> Bartek
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Issue-with-the-class-generated-from-avro-schema-tp24997.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>
>


Re: Issue with the class generated from avro schema

2015-10-09 Thread Igor Berman
u should create copy of your avro data before working with it, i.e. just
after loadFromHDFS map it into new instance that is deap copy of the object
it's connected to the way spark/avro reader reads avro files(it reuses some
buffer or something)

On 9 October 2015 at 19:05, alberskib  wrote:

> Hi all,
>
> I have piece of code written in spark that loads data from HDFS into java
> classes generated from avro idl. On RDD created in that way I am executing
> simple operation which results depends on fact whether I cache RDD before
> it
> or not i.e if I run code below
>
> val loadedData = loadFromHDFS[Data](path,...)
> println(loadedData.map(x => x.getUserId + x.getDate).distinct().count()) //
> 20
> program will print 20, on the other hand executing next code
>
> val loadedData = loadFromHDFS[Data](path,...).cache()
> println(loadedData.map(x => x.getUserId + x.getDate).distinct().count()) //
> 1
> result in 1 printed to stdout.
>
> When I inspect values of the fields after reading cached data it seems
>
> I am pretty sure that root cause of described problem is issue with
> serialization of classes generated from avro idl, but I do not know how to
> resolve it. I tried to use Kryo, registering generated class (Data),
> registering different serializers from chill_avro for given class
> (SpecificRecordSerializer, SpecificRecordBinarySerializer, etc), but none
> of
> those ideas helps me.
>
> I post exactly the same question on stackoverflow but I did not receive any
> repsponse.  link
> <
> http://stackoverflow.com/questions/33027851/spark-issue-with-the-class-generated-from-avro-schema
> >
>
> What is more I created minimal working example, thanks to which it will be
> easy to reproduce problem.
> link 
>
> How I can solve this problem?
>
>
> Thanks,
> Bartek
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Issue-with-the-class-generated-from-avro-schema-tp24997.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: RDD of ImmutableList

2015-10-05 Thread Igor Berman
kryo doesn't support guava's collections by default
I remember encountered project in github that fixes this(not sure though).
I've ended to stop using guava collections as soon as spark rdds are
concerned.

On 5 October 2015 at 21:04, Jakub Dubovsky 
wrote:

> Hi all,
>
>   I would like to have an advice on how to use ImmutableList with RDD. Small
> presentation of an essence of my problem in spark-shell with guava jar
> added:
>
> scala> import com.google.common.collect.ImmutableList
> import com.google.common.collect.ImmutableList
>
> scala> val arr = Array(ImmutableList.of(1,2), ImmutableList.of(2,4),
> ImmutableList.of(3,6))
> arr: Array[com.google.common.collect.ImmutableList[Int]] = Array([1, 2],
> [2, 4], [3, 6])
>
> scala> val rdd = sc.parallelize(arr)
> rdd:
> org.apache.spark.rdd.RDD[com.google.common.collect.ImmutableList[Int]] =
> ParallelCollectionRDD[0] at parallelize at :24
>
> scala> rdd.count
>
>  This results in kryo exception saying that it cannot add a new element to
> list instance while deserialization:
>
> java.io.IOException: java.lang.UnsupportedOperationException
> at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1163)
> at
> org.apache.spark.rdd.ParallelCollectionPartition.readObject(ParallelCollectionRDD.scala:70)
> ...
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.UnsupportedOperationException
> at
> com.google.common.collect.ImmutableCollection.add(ImmutableCollection.java:91)
> at
> com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:109)
> at
> com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:18)
> ...
>
>   It somehow makes sense. But I cannot think of a workaround and I do not
> believe that using ImmutableList with RDD is not possible. How this is
> solved?
>
>   Thank you in advance!
>
>Jakub Dubovsky
>
>


Re: Getting spark application driver ID programmatically

2015-10-02 Thread Igor Berman
if driver id is application id then yes you can do it with
String appId = ctx.sc().applicationId(); //when ctx is java context


On 1 October 2015 at 20:44, Snehal Nagmote  wrote:

> Hi ,
>
> I have use case where we need to automate start/stop of spark streaming
> application.
>
> To stop spark job, we need driver/application id of the job .
>
> For example :
>
> /app/spark-master/bin/spark-class org.apache.spark.deploy.Client kill
> spark://10.65.169.242:7077 $driver_id
>
> I am thinking to get the driver id when we submit the job in verbose mode
> , by parsing the output .
>
> Does spark provide any api where it provides driver id of application .
>
> Is there any better or cleaner way to get driver ID ?
>
>
> Any suggestions would be helpful  ,
>
> Thanks,
> Snehal
>
>


Re: Troubleshooting "Task not serializable" in Spark/Scala environments

2015-09-21 Thread Igor Berman
Try to broadcasr header
On Sep 22, 2015 08:07, "Balaji Vijayan"  wrote:

> Howdy,
>
> I'm a relative novice at Spark/Scala and I'm puzzled by some behavior that
> I'm seeing in 2 of my local Spark/Scala environments (Scala for Jupyter and
> Scala IDE) but not the 3rd (Spark Shell). The following code throws the
> following stack trace error in the former 2 environments but executes
> successfully in the 3rd. I'm not sure how to go about troubleshooting my
> former 2 environments so any assistance is greatly appreciated.
>
> Code:
>
> //get file
> val logFile = "s3n://file"
> val logData  = sc.textFile(logFile)
> // header
> val header =  logData.first
> // filter out header
> val sample = logData.filter(!_.contains(header)).map {
>  line => line.replaceAll("['\"]","").substring(0,line.length()-1)
> }.takeSample(false,100,12L)
>
> Stack Trace:
>
> org.apache.spark.SparkException: Task not serializable
>   
> org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:315)
>   
> org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:305)
>   org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:132)
>   org.apache.spark.SparkContext.clean(SparkContext.scala:1893)
>   org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:311)
>   org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:310)
>   
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
>   
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
>   org.apache.spark.rdd.RDD.withScope(RDD.scala:286)
>   org.apache.spark.rdd.RDD.filter(RDD.scala:310)
>   cmd6$$user$$anonfun$3.apply(Main.scala:134)
>   cmd6$$user$$anonfun$3.apply(Main.scala:133)
> java.io.NotSerializableException: org.apache.spark.SparkConf
> Serialization stack:
>   - object not serializable (class: org.apache.spark.SparkConf, value: 
> org.apache.spark.SparkConf@309ed441)
>   - field (class: cmd2$$user, name: conf, type: class 
> org.apache.spark.SparkConf)
>   - object (class cmd2$$user, cmd2$$user@75a88665)
>   - field (class: cmd6, name: $ref$cmd2, type: class cmd2$$user)
>   - object (class cmd6, cmd6@5e9e8f0b)
>   - field (class: cmd6$$user, name: $outer, type: class cmd6)
>   - object (class cmd6$$user, cmd6$$user@692f81c)
>   - field (class: cmd6$$user$$anonfun$3, name: $outer, type: class 
> cmd6$$user)
>   - object (class cmd6$$user$$anonfun$3, )
>   - field (class: cmd6$$user$$anonfun$3$$anonfun$apply$1, name: $outer, 
> type: class cmd6$$user$$anonfun$3)
>   - object (class cmd6$$user$$anonfun$3$$anonfun$apply$1, )
>   
> org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
>   
> org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
>   
> org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:81)
>   
> org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:312)
>   
> org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:305)
>   org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:132)
>   org.apache.spark.SparkContext.clean(SparkContext.scala:1893)
>   org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:311)
>   org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:310)
>   
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
>   
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
>   org.apache.spark.rdd.RDD.withScope(RDD.scala:286)
>   org.apache.spark.rdd.RDD.filter(RDD.scala:310)
>   cmd6$$user$$anonfun$3.apply(Main.scala:134)
>   cmd6$$user$$anonfun$3.apply(Main.scala:133)
>
> Thanks,
> Balaji
>


Re: Spark on Yarn: Kryo throws ClassNotFoundException for class included in fat jar

2015-09-08 Thread Igor Berman
as a starting point, attach your stacktrace...
ps: look for duplicates in your classpath, maybe you include another jar
with same class

On 8 September 2015 at 06:38, Nicholas R. Peterson 
wrote:

> I'm trying to run a Spark 1.4.1 job on my CDH5.4 cluster, through Yarn.
> Serialization is set to use Kryo.
>
> I have a large object which I send to the executors as a Broadcast. The
> object seems to serialize just fine. When it attempts to deserialize,
> though, Kryo throws a ClassNotFoundException... for a class that I include
> in the fat jar that I spark-submit.
>
> What could be causing this classpath issue with Kryo on the executors?
> Where should I even start looking to try to diagnose the problem? I
> appreciate any help you can provide.
>
> Thank you!
>
> -- Nick
>


Re: Spark on Yarn: Kryo throws ClassNotFoundException for class included in fat jar

2015-09-08 Thread Igor Berman
olver.java:115)
>   at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:610)
>   at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:721)
>   at 
> com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:134)
>   at 
> com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:17)
>   at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:626)
>   at 
> com.lumiata.patientanalysis.utils.CachedGraph.loadCacheFromSerializedData(CachedGraph.java:221)
>   at 
> com.lumiata.patientanalysis.utils.CachedGraph.(CachedGraph.java:182)
>   at 
> com.lumiata.patientanalysis.utils.CachedGraph.(CachedGraph.java:178)
>   ... 38 more
> Caused by: java.lang.ClassNotFoundException: com.i2028.Document.Document
>   at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>   at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>   at java.lang.Class.forName0(Native Method)
>   at java.lang.Class.forName(Class.java:348)
>   at 
> com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136)
>   ... 47 more
>
>
>
>> On Tue, Sep 8, 2015 at 6:01 AM Igor Berman <igor.ber...@gmail.com> wrote:
>>
>>> I wouldn't build on this. local mode & yarn are different so that jars
>>> you use in spark submit are handled differently
>>>
>>> On 8 September 2015 at 15:43, Nicholas R. Peterson <nrpeter...@gmail.com
>>> > wrote:
>>>
>>>> Thans, Igor; I've got it running again right now, and can attach the
>>>> stack trace when it finishes.
>>>>
>>>> In the mean time, I've noticed something interesting: in the Spark UI,
>>>> the application jar that I submit is not being included on the classpath.
>>>> It has been successfully uploaded to the nodes -- in the nodemanager
>>>> directory for the application, I see __app__.jar and __spark__.jar.  The
>>>> directory itself is on the classpath, and __spark__.jar and __hadoop_conf__
>>>> are as well.  When I do everything the same but switch the master to
>>>> local[*], the jar I submit IS added to the classpath.
>>>>
>>>> This seems like a likely culprit.  What could cause this, and how can I
>>>> fix it?
>>>>
>>>> Best,
>>>> Nick
>>>>
>>>> On Tue, Sep 8, 2015 at 1:14 AM Igor Berman <igor.ber...@gmail.com>
>>>> wrote:
>>>>
>>>>> as a starting point, attach your stacktrace...
>>>>> ps: look for duplicates in your classpath, maybe you include another
>>>>> jar with same class
>>>>>
>>>>> On 8 September 2015 at 06:38, Nicholas R. Peterson <
>>>>> nrpeter...@gmail.com> wrote:
>>>>>
>>>>>> I'm trying to run a Spark 1.4.1 job on my CDH5.4 cluster, through
>>>>>> Yarn. Serialization is set to use Kryo.
>>>>>>
>>>>>> I have a large object which I send to the executors as a Broadcast.
>>>>>> The object seems to serialize just fine. When it attempts to deserialize,
>>>>>> though, Kryo throws a ClassNotFoundException... for a class that I 
>>>>>> include
>>>>>> in the fat jar that I spark-submit.
>>>>>>
>>>>>> What could be causing this classpath issue with Kryo on the
>>>>>> executors? Where should I even start looking to try to diagnose the
>>>>>> problem? I appreciate any help you can provide.
>>>>>>
>>>>>> Thank you!
>>>>>>
>>>>>> -- Nick
>>>>>>
>>>>>
>>>>>
>>>


Re: Spark on Yarn: Kryo throws ClassNotFoundException for class included in fat jar

2015-09-08 Thread Igor Berman
hmm...out of ideas.
can you check in spark ui environment tab that this jar is not somehow
appears 2 times or more...? or more generally - any 2 jars that can contain
this class by any chance

regarding your question about classloader - no idea, probably there is, I
remember stackoverflow has some examples on how to print all classes, but
how to print all classes of kryo classloader - no idea.

On 8 September 2015 at 16:43, Nick Peterson <nrpeter...@gmail.com> wrote:

> Yes, the jar contains the class:
>
> $ jar -tf lumiata-evaluation-assembly-1.0.jar | grep 2028/Document/Document
> com/i2028/Document/Document$1.class
> com/i2028/Document/Document.class
>
> What else can I do?  Is there any way to get more information about the
> classes available to the particular classloader kryo is using?
>
> On Tue, Sep 8, 2015 at 6:34 AM Igor Berman <igor.ber...@gmail.com> wrote:
>
>> java.lang.ClassNotFoundException: com.i2028.Document.Document
>>
>> 1. so have you checked that jar that you create(fat jar) contains this class?
>>
>> 2. might be there is some stale cache issue...not sure though
>>
>>
>> On 8 September 2015 at 16:12, Nicholas R. Peterson <nrpeter...@gmail.com>
>> wrote:
>>
>>> Here is the stack trace:  (Sorry for the duplicate, Igor -- I forgot to 
>>> include the list.)
>>>
>>>
>>> 15/09/08 05:56:43 WARN scheduler.TaskSetManager: Lost task 183.0 in stage 
>>> 41.0 (TID 193386, ds-compute2.lumiata.com): java.io.IOException: 
>>> com.esotericsoftware.kryo.KryoException: Error constructing instance of 
>>> class: com.lumiata.patientanalysis.utils.CachedGraph
>>> at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1257)
>>> at 
>>> org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:165)
>>> at 
>>> org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
>>> at 
>>> org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
>>> at 
>>> org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:88)
>>> at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
>>> at 
>>> com.lumiata.evaluation.analysis.prod.ProductionAnalyzer$$anonfun$apply$1.apply(ProductionAnalyzer.scala:44)
>>> at 
>>> com.lumiata.evaluation.analysis.prod.ProductionAnalyzer$$anonfun$apply$1.apply(ProductionAnalyzer.scala:43)
>>> at 
>>> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
>>> at 
>>> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
>>> at 
>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>> at 
>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>> at 
>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
>>> at org.apache.spark.scheduler.Task.run(Task.scala:70)
>>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>>> at 
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>> at 
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>> at java.lang.Thread.run(Thread.java:745)
>>> Caused by: com.esotericsoftware.kryo.KryoException: Error constructing 
>>> instance of class: com.lumiata.patientanalysis.utils.CachedGraph
>>> at 
>>> com.twitter.chill.Instantiators$$anon$1.newInstance(KryoBase.scala:126)
>>> at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1065)
>>> at 
>>> com.esotericsoftware.kryo.serializers.FieldSerializer.create(FieldSerializer.java:228)
>>> at 
>>> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:217)
>>> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>>> at 
>>> org.apache.spark.serializer.Kr

Re: Java vs. Scala for Spark

2015-09-08 Thread Igor Berman
we are using java7..its much more verbose that java8 or scala examples
in addition there sometimes libraries that has no java  api, so you need to
write them by yourself(e.g. graphx)
on the other hand, scala is not trivial language like java, so it depends
on your team

On 8 September 2015 at 17:44, Bryan Jeffrey  wrote:

> Thank you for the quick responses.  It's useful to have some insight from
> folks already extensively using Spark.
>
> Regards,
>
> Bryan Jeffrey
>
> On Tue, Sep 8, 2015 at 10:28 AM, Sean Owen  wrote:
>
>> Why would Scala vs Java performance be different Ted? Relatively
>> speaking there is almost no runtime difference; it's the same APIs or
>> calls via a thin wrapper. Scala/Java vs Python is a different story.
>>
>> Java libraries can be used in Scala. Vice-versa too, though calling
>> Scala-generated classes can be clunky in Java. What's your concern
>> about interoperability Jeffrey?
>>
>> I disagree that Java 7 vs Scala usability is sooo different, but it's
>> certainly much more natural to use Spark in Scala. Java 8 closes a lot
>> of the usability gap with Scala, but not all of it. Enough that it's
>> not crazy for a Java shop to stick to Java 8 + Spark and not be at a
>> big disadvantage.
>>
>> The downsides of Scala IMHO are that it provides too much: lots of
>> nice features (closures! superb collections!), lots of rope to hang
>> yourself too (implicits sometimes!) and some WTF features (XML
>> literals!) Learning the good useful bits of Scala isn't hard. You can
>> always write Scala code as much like Java as you like, I find.
>>
>> Scala tooling is different from Java tooling; that's an
>> underappreciated barrier. For example I think SBT is good for
>> development, bad for general project lifecycle management compared to
>> Maven, but in any event still less developed. SBT/scalac are huge
>> resource hogs, since so much of Scala is really implemented in the
>> compiler; prepare to update your laptop to develop in Scala on your
>> IDE of choice, and start to think about running long-running compile
>> servers like we did in the year 2000.
>>
>> Still net-net I would choose Scala, FWIW.
>>
>> On Tue, Sep 8, 2015 at 3:07 PM, Ted Yu  wrote:
>> > Performance wise, Scala is by far the best choice when you use Spark.
>> >
>> > The cost of learning Scala is not negligible but not insurmountable
>> either.
>> >
>> > My personal opinion.
>> >
>> > On Tue, Sep 8, 2015 at 6:50 AM, Bryan Jeffrey 
>> > wrote:
>> >>
>> >> All,
>> >>
>> >> We're looking at language choice in developing a simple streaming
>> >> processing application in spark.  We've got a small set of example code
>> >> built in Scala.  Articles like the following:
>> >>
>> http://www.bigdatatidbits.cc/2015/02/navigating-from-scala-to-spark-for.html
>> >> would seem to indicate that Scala is great for use in distributed
>> >> programming (including Spark).  However, there is a large group of
>> folks
>> >> that seem to feel that interoperability with other Java libraries is
>> much to
>> >> be desired, and that the cost of learning (yet another) language is
>> quite
>> >> high.
>> >>
>> >> Has anyone looked at Scala for Spark dev in an enterprise environment?
>> >> What was the outcome?
>> >>
>> >> Regards,
>> >>
>> >> Bryan Jeffrey
>> >
>> >
>>
>
>


Re: Spark on Yarn: Kryo throws ClassNotFoundException for class included in fat jar

2015-09-08 Thread Igor Berman
another idea - you can add this fat jar explicitly to the classpath of
executors...it's not a solution, but might be it work...
I mean place it somewhere locally on executors and add it to cp with
spark.executor.extraClassPath

On 8 September 2015 at 18:30, Nick Peterson <nrpeter...@gmail.com> wrote:

> Yeah... none of the jars listed on the classpath contain this class.  The
> only jar that does is the fat jar that I'm submitting with spark-submit,
> which as mentioned isn't showing up on the classpath anywhere.
>
> -- Nick
>
> On Tue, Sep 8, 2015 at 8:26 AM Igor Berman <igor.ber...@gmail.com> wrote:
>
>> hmm...out of ideas.
>> can you check in spark ui environment tab that this jar is not somehow
>> appears 2 times or more...? or more generally - any 2 jars that can contain
>> this class by any chance
>>
>> regarding your question about classloader - no idea, probably there is, I
>> remember stackoverflow has some examples on how to print all classes, but
>> how to print all classes of kryo classloader - no idea.
>>
>> On 8 September 2015 at 16:43, Nick Peterson <nrpeter...@gmail.com> wrote:
>>
>>> Yes, the jar contains the class:
>>>
>>> $ jar -tf lumiata-evaluation-assembly-1.0.jar | grep
>>> 2028/Document/Document
>>> com/i2028/Document/Document$1.class
>>> com/i2028/Document/Document.class
>>>
>>> What else can I do?  Is there any way to get more information about the
>>> classes available to the particular classloader kryo is using?
>>>
>>> On Tue, Sep 8, 2015 at 6:34 AM Igor Berman <igor.ber...@gmail.com>
>>> wrote:
>>>
>>>> java.lang.ClassNotFoundException: com.i2028.Document.Document
>>>>
>>>> 1. so have you checked that jar that you create(fat jar) contains this 
>>>> class?
>>>>
>>>> 2. might be there is some stale cache issue...not sure though
>>>>
>>>>
>>>> On 8 September 2015 at 16:12, Nicholas R. Peterson <
>>>> nrpeter...@gmail.com> wrote:
>>>>
>>>>> Here is the stack trace:  (Sorry for the duplicate, Igor -- I forgot to 
>>>>> include the list.)
>>>>>
>>>>>
>>>>> 15/09/08 05:56:43 WARN scheduler.TaskSetManager: Lost task 183.0 in stage 
>>>>> 41.0 (TID 193386, ds-compute2.lumiata.com): java.io.IOException: 
>>>>> com.esotericsoftware.kryo.KryoException: Error constructing instance of 
>>>>> class: com.lumiata.patientanalysis.utils.CachedGraph
>>>>>   at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1257)
>>>>>   at 
>>>>> org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:165)
>>>>>   at 
>>>>> org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
>>>>>   at 
>>>>> org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
>>>>>   at 
>>>>> org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:88)
>>>>>   at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
>>>>>   at 
>>>>> com.lumiata.evaluation.analysis.prod.ProductionAnalyzer$$anonfun$apply$1.apply(ProductionAnalyzer.scala:44)
>>>>>   at 
>>>>> com.lumiata.evaluation.analysis.prod.ProductionAnalyzer$$anonfun$apply$1.apply(ProductionAnalyzer.scala:43)
>>>>>   at 
>>>>> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
>>>>>   at 
>>>>> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
>>>>>   at 
>>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>>>>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>>>>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>>>>   at 
>>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>>>>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>>>>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>>>>   at 
>>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>>>>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>>>>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
>>>>>   at org.apache.spark

Re: Problem to persist Hibernate entity from Spark job

2015-09-06 Thread Igor Berman
how do you create your session? do you reuse it across threads? how do you
create/close session manager?
look for the problem in session creation, probably something deadlocked, as
far as I remember hib.session should be created per thread

On 6 September 2015 at 07:11, Zoran Jeremic  wrote:

> Hi,
>
> I'm developing long running process that should find RSS feeds that all
> users in the system have registered to follow, parse these RSS feeds,
> extract new entries and store it back to the database as Hibernate
> entities, so user can retrieve it. I want to use Apache Spark to enable
> parallel processing, since this process might take several hours depending
> on the number of users.
>
> The approach I thought should work was to use
> *useridsRDD.foreachPartition*, so I can have separate hibernate session
> for each partition. I created Database session manager that is initialized
> for each partition which keeps hibernate session alive until the process is
> over.
>
> Once all RSS feeds from one source are parsed and Feed entities are
> created, I'm sending the whole list to Database Manager method that saves
> the whole list in batch:
>
>> public   void saveInBatch(List entities) {
>> try{
>>   boolean isActive = session.getTransaction().isActive();
>> if ( !isActive) {
>> session.beginTransaction();
>> }
>>for(Object entity:entities){
>>  session.save(entity);
>> }
>>session.getTransaction().commit();
>>  }catch(Exception ex){
>> if(session.getTransaction()!=null) {
>> session.getTransaction().rollback();
>> ex.printStackTrace();
>>}
>>   }
>>
>> However, this works only if I have one Spark partition. If there are two
> or more partitions, the whole process is blocked once I try to save the
> first entity. In order to make the things simpler, I tried to simplify Feed
> entity, so it doesn't refer and is not referred from any other entity. It
> also doesn't have any collection.
>
> I hope that some of you have already tried something similar and could
> give me idea how to solve this problem
>
> Thanks,
> Zoran
>
>


Re: Tuning - tasks per core

2015-09-03 Thread Igor Berman
suppose you have 1 job that do some transformation, suppose you have X
cores in your cluster and you are willing to give all of them to your job
suppose you have no shuffles(to keep it simple)

set number of partitions of your input data to be 3X or 2X, thus you'll get
2/3 tasks per each core

On 3 September 2015 at 15:56, Hans van den Bogert 
wrote:

> The tuning documentations tells us to have 2-3 tasks per CPU core
>
> > In general, we recommend 2-3 tasks per CPU core in your cluster.
>
> I’m wondering how you’d actually accomplish this.
> Setting spark.task.cpus to a fraction like 0.5 or 0.3 does not work.
>
> Perhaps I’m misunderstanding, any advice is welcome,
>
> Regards,
>
> Hans
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Managing httpcomponent dependency in Spark/Solr

2015-09-03 Thread Igor Berman
not sure if it will help, but have you checked
https://maven.apache.org/plugins/maven-shade-plugin/examples/class-relocation.html

On 31 August 2015 at 19:33, Oliver Schrenk  wrote:

> Hi,
>
> We are running a distibuted indexing service for Solr (4.7) on a Spark
> (1.2) cluster. Now we wanted to upgrade to Solr 5.3 and are running into
> problems with dependencies.
>
> Solr 5 brings in org.apache.httpcomponents httpclient 4.4.1 (1) and the
> prebuilt binary for Spark 1.2.2 for CDH 4 brings in httpclient 4.2.5 which
> crashes indexing Solr via SolrJ.
>
>
>
> Is there a way of not bringing this dependency in? Is there a way of
> having a different classloader for my client application? I saw that there
> is `spark.driver.userClassPathFirst`, is it something that would help?
>
>
>
> Cheers,
> Olive
>
>
>
> (1)
> https://github.com/apache/lucene-solr/blob/trunk/lucene/ivy-versions.properties#L163
> (2) cd $HOME/Downloads
> wget http://apache.xl-mirror.nl/spark/spark-1.2.2/spark-1.2.2-bin-cdh4.tgz
> tar xzvf spark-1.2.2-bin-cdh4.tgz
> unzip -p
> spark-1.2.2-bin-cdh4/lib/spark-assembly-1.2.2-hadoop2.0.0-mr1-cdh4.2.0.jar
> org/apache/http/version.properties | grep release
> info.release   = 4.2.5
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: bulk upload to Elasticsearch and shuffle behavior

2015-09-01 Thread Igor Berman
Hi Eric,
I see that you solved your problem. Imho, when you do repartition you split
your work into 2 stages, so your hbase lookup happens at first stage, and
upload to ES happens after shuffle on next stage, so without repartition
it's hard to tell where is ES upload and where is Hbase lookup time.

If you don't mind it's interesting if you reduce number of partitions
before uploading to ES? Do you have some rule of thumb on how much
partitions should be there before uploading to ES?
We have kind of same pipeline and we reduce # of partitions to 8 or so
before uploading to ES(probably depends on ES cluster strength)


On 1 September 2015 at 06:05, Eric Walker  wrote:

> I think I have found out what was causing me difficulties.  It seems I was
> reading too much into the stage description shown in the "Stages" tab of
> the Spark application UI.  While it said "repartition at
> NativeMethodAccessorImpl.java:-2", I can infer from the network traffic and
> from its response to changes I subsequently made that the actual code that
> was running was the code doing the HBase lookups.  I suspect the actual
> shuffle, once it occurred, required on the same order of network IO as the
> upload to Elasticsearch that followed.
>
> Eric
>
>
>
> On Mon, Aug 31, 2015 at 6:09 PM, Eric Walker 
> wrote:
>
>> Hi,
>>
>> I am working on a pipeline that carries out a number of stages, the last
>> of which is to build some large JSON objects from information in the
>> preceding stages.  The JSON objects are then uploaded to Elasticsearch in
>> bulk.
>>
>> If I carry out a shuffle via a `repartition` call after the JSON
>> documents have been created, the upload to ES is fast.  But the shuffle
>> itself takes many tens of minutes and is IO-bound.
>>
>> If I omit the repartition, the upload to ES takes a long time due to a
>> complete lack of parallelism.
>>
>> Currently, the step that precedes the assembling of the JSON documents,
>> which goes into the final repartition call, is the querying of pairs of
>> object ids.  In a mapper the ids are resolved to documents by querying
>> HBase.  The initial pairs of ids are obtained via a query against the SQL
>> context, and the query result is repartitioned before going into the mapper
>> that resolves the ids into documents.
>>
>> It's not clear to me why the final repartition preceding the upload to ES
>> is required.  I would like to omit it, since it is so expensive and
>> involves so much network IO, but have not found a way to do this yet.  If I
>> omit the repartition, the job takes much longer.
>>
>> Does anyone know what might be going on here, and what I might be able to
>> do to get rid of the last `repartition` call before the upload to ES?
>>
>> Eric
>>
>>
>


Re: Help Explain Tasks in WebUI:4040

2015-08-31 Thread Igor Berman
are there other processes on sk3? or more generally are you sharing
resources with somebody else, virtualization etc

does your transformation consumes other services?(e.g. reading from s3, so
it can happen that s3 latency plays the role...)
can it be that task per some key will take longer than same task on other
key(I mean your business logic...) I see that some tasks take ~1min and
other ~1h which is strange




On 28 August 2015 at 21:47, Muler  wrote:

> I have a 7 node cluster running in standalone mode (1 executor per node,
> 100g/executor, 18 cores/executor)
>
> Attached is the Task status for two of my nodes. I'm not clear why some of
> my tasks are taking too long:
>
>1. [node sk5, green] task 197 took 35 mins while task 218 took less
>than 2 mins. But if you look into the size of output size/records they have
>almost same size. Even more strange, the size of shuffle spill for memory
>and disk is 0 for task 197 and yet it is taking a long time
>
> Same issue for my other node (sk3, red)
>
> Can you please explain what is going on?
>
> Thanks,
>
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>


Re: spark-submit issue

2015-08-31 Thread Igor Berman
might be you need to drain stdout/stderr of subprocess...otherwise
subprocess can deadlock
http://stackoverflow.com/questions/3054531/correct-usage-of-processbuilder

On 27 August 2015 at 16:11, pranay  wrote:

> I have a java program that does this - (using Spark 1.3.1 ) Create a
> command
> string that uses "spark-submit" in it ( with my Class file etc ), and i
> store this string in a temp file somewhere as a shell script Using
> Runtime.exec, i execute this script and wait for its completion, using
> process.waitFor Doing ps -ef shows me SparkSubmitDriverBootstrapper , the
> script running my class ... parent child relationship..
>
> The job gets triggered on spark-cluster and gets over but
> SparkSubmitDriverBootstrapper still shows up, due to this the
> process.waitFor never comes out and i can't detect the execution end...
>
> If i run the /temp file independently. things work file... only when i
> trigger /temp scrict inside Runtime.exec , this issue occurs... Any
> comments
> ?
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/spark-submit-issue-tp24474.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: spark-submit issue

2015-08-31 Thread Igor Berman
1. think once again if you want to call spark submit in such way...I'm not
sure why you do it, but please consider just opening spark context inside
your jvm(you need to add spark jars to classpath..)
2. use https://commons.apache.org/proper/commons-exec/ with
PumpStreamHandler

On 31 August 2015 at 10:42, Pranay Tonpay <pranay.ton...@impetus.co.in>
wrote:

> Igor,, this seems to be the cause, however i am not sure at the moment how
> to resolve it ... what i tried just now was that after "
>
> SparkSubmitDriverBootstrapper" process reaches the hung stage... i went
> inside /proc//fd  and just tailed "2" (stderr) and the process
> immediately exits .
>
>
> *From:* Igor Berman <igor.ber...@gmail.com>
> *Sent:* Monday, August 31, 2015 12:41 PM
> *To:* Pranay Tonpay
> *Cc:* user
> *Subject:* Re: spark-submit issue
>
> might be you need to drain stdout/stderr of subprocess...otherwise
> subprocess can deadlock
> http://stackoverflow.com/questions/3054531/correct-usage-of-processbuilder
>
> On 27 August 2015 at 16:11, pranay <pranay.ton...@impetus.co.in> wrote:
>
>> I have a java program that does this - (using Spark 1.3.1 ) Create a
>> command
>> string that uses "spark-submit" in it ( with my Class file etc ), and i
>> store this string in a temp file somewhere as a shell script Using
>> Runtime.exec, i execute this script and wait for its completion, using
>> process.waitFor Doing ps -ef shows me SparkSubmitDriverBootstrapper , the
>> script running my class ... parent child relationship..
>>
>> The job gets triggered on spark-cluster and gets over but
>> SparkSubmitDriverBootstrapper still shows up, due to this the
>> process.waitFor never comes out and i can't detect the execution end...
>>
>> If i run the /temp file independently. things work file... only when i
>> trigger /temp scrict inside Runtime.exec , this issue occurs... Any
>> comments
>> ?
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/spark-submit-issue-tp24474.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>
> --
>
>
>
>
>
>
> NOTE: This message may contain information that is confidential,
> proprietary, privileged or otherwise protected by law. The message is
> intended solely for the named addressee. If received in error, please
> destroy and notify the sender. Any use of this email is prohibited when
> received in error. Impetus does not represent, warrant and/or guarantee,
> that the integrity of this communication has been maintained nor that the
> communication is free of errors, virus, interception or interference.
>


Re: Parallel execution of RDDs

2015-08-31 Thread Igor Berman
what is size of the pool you submitting spark jobs from(futures you've
mentioned)? is it 8? I think you have fixed thread pool of 8 so there can't
be more than 8 parallel jobs running...so try to increase it
what is number of partitions of each of your rdds?
how many cores has your worker machine(those 15 you've mentioned)
e.g. if you have 15 * 8 cores but your rdd with 1000 partitions - there is
no way you'll get parallel job execution since only 1 job already fills all
cores with tasks(unless you are going to manage resources per each
submit/job)



On 31 August 2015 at 16:51, Brian Parker  wrote:

> Hi, I have a large number of RDDs that I need to process separately.
> Instead of submitting these jobs to the Spark scheduler one by one, I'd
> like to submit them in parallel in order to maximize cluster utilization.
>
> I've tried to process the RDDs as Futures, but the number of Active jobs
> maxes out at 8 and the run time is no faster than serial processing (even
> with a 15 node cluster).  What is the limitation on number of Active jobs
> in the Spark scheduler?
>
> What are some strategies to maximize cluster utilization with
> many(possibly small) RDDs ?  Is this a good use case for Spark Streaming?
>


Re: Strange shuffle behaviour difference between Zeppelin and Spark-shell

2015-08-19 Thread Igor Berman
any differences in number of cores, memory settings for executors?


On 19 August 2015 at 09:49, Rick Moritz rah...@gmail.com wrote:

 Dear list,

 I am observing a very strange difference in behaviour between a Spark
 1.4.0-rc4 REPL (locally compiled with Java 7) and a Spark 1.4.0 zeppelin
 interpreter (compiled with Java 6 and sourced from maven central).

 The workflow loads data from Hive, applies a number of transformations
 (including quite a lot of shuffle operations) and then presents an enriched
 dataset. The code (an resulting DAGs) are identical in each case.

 The following particularities are noted:
 Importing the HiveRDD and caching it yields identical results on both
 platforms.
 Applying case classes, leads to a 2-2.5MB increase in dataset size per
 partition (excepting empty partitions).

 Writing shuffles shows this much more significant result:

 Zeppelin:
 *Total Time Across All Tasks: * 2,6 min
 *Input Size / Records: * 2.4 GB / 7314771
 *Shuffle Write: * 673.5 MB / 7314771

 vs

 Spark-shell:
 *Total Time Across All Tasks: * 28 min
 *Input Size / Records: * 3.6 GB / 7314771
 *Shuffle Write: * 9.0 GB / 7314771

 This is one of the early stages, which reads from a cached partition and
 then feeds into a join-stage. The latter stages show similar behaviour in
 producing excessive shuffle spills.

 Quite often the excessive shuffle volume will lead to massive shuffle
 spills which ultimately kill not only performance, but the actual executors
 as well.

 I have examined the Environment tab in the SParkUI and identified no
 notable difference besides FAIR (Zeppelin) vs FIFO (spark-shell) scheduling
 mode. I fail to see how this would impact shuffle writes in such a drastic
 way, since it should be on the inter-job level, while this happens at the
 inter-stage level.

 I was somewhat supicious of maybe compression or serialization playing a
 role, but the SparkConf points to those being set to the default. Also
 Zeppelin's interpreter adds no relevant additional default parameters.
 I performed a diff between rc4 (which was later released) and 1.4.0 and as
 expected there were no differences, besides a single class (remarkably, a
 shuffle-relevant class:
 /org/apache/spark/shuffle/unsafe/UnsafeShuffleExternalSorter.class )
 differing in its binary representation due to being compiled with Java 7
 instead of Java 6. The decompiled sources of those two are again identical.

 I may attempt as a next step to simply replace that file in the packaged
 jar, to ascertain that indeed there is no difference between the two
 versions, but would consider this to be a major bg, if a simple compiler
 change leads to this kind of issue.

 I a also open for any other ideas, in particular to verify that the same
 compression/serialization is indeed happening, and regarding ways to
 determin what exactly is written into these shuffles -- currently I only
 know that the tuples are bigger (or smaller) than they ought to be. The
 Zeppelin-obtained results do appear to be consistent at least, thus the
 suspicion is, that there is an issue with the process launched from
 spark-shell. I will also attempt to build a spark job and spark-submit it
 using different spark-binaries to further explore the issue.

 Best Regards,

 Rick Moritz

 PS: I already tried to send this mail yesterday, but it never made it onto
 the list, as far as I can tell -- I apologize should anyone receive this as
 a second copy.




Re: Strange shuffle behaviour difference between Zeppelin and Spark-shell

2015-08-19 Thread Igor Berman
i would compare spark ui metrics for both cases and see any
differences(number of partitions, number of spills etc)
why can't you make repl to be consistent with zepellin spark version?
 might be rc has issues...




On 19 August 2015 at 14:42, Rick Moritz rah...@gmail.com wrote:

 No, the setup is one driver with 32g of memory, and three executors each
 with 8g of memory in both cases. No core-number has been specified, thus it
 should default to single-core (though I've seen the yarn-owned jvms
 wrapping the executors take up to 3 cores in top). That is, unless, as I
 suggested, there are different defaults for the two means of job submission
 that come into play in a non-transparent fashion (i.e. not visible in
 SparkConf).

 On Wed, Aug 19, 2015 at 1:36 PM, Igor Berman igor.ber...@gmail.com
 wrote:

 any differences in number of cores, memory settings for executors?


 On 19 August 2015 at 09:49, Rick Moritz rah...@gmail.com wrote:

 Dear list,

 I am observing a very strange difference in behaviour between a Spark
 1.4.0-rc4 REPL (locally compiled with Java 7) and a Spark 1.4.0 zeppelin
 interpreter (compiled with Java 6 and sourced from maven central).

 The workflow loads data from Hive, applies a number of transformations
 (including quite a lot of shuffle operations) and then presents an enriched
 dataset. The code (an resulting DAGs) are identical in each case.

 The following particularities are noted:
 Importing the HiveRDD and caching it yields identical results on both
 platforms.
 Applying case classes, leads to a 2-2.5MB increase in dataset size per
 partition (excepting empty partitions).

 Writing shuffles shows this much more significant result:

 Zeppelin:
 *Total Time Across All Tasks: * 2,6 min
 *Input Size / Records: * 2.4 GB / 7314771
 *Shuffle Write: * 673.5 MB / 7314771

 vs

 Spark-shell:
 *Total Time Across All Tasks: * 28 min
 *Input Size / Records: * 3.6 GB / 7314771
 *Shuffle Write: * 9.0 GB / 7314771

 This is one of the early stages, which reads from a cached partition and
 then feeds into a join-stage. The latter stages show similar behaviour in
 producing excessive shuffle spills.

 Quite often the excessive shuffle volume will lead to massive shuffle
 spills which ultimately kill not only performance, but the actual executors
 as well.

 I have examined the Environment tab in the SParkUI and identified no
 notable difference besides FAIR (Zeppelin) vs FIFO (spark-shell) scheduling
 mode. I fail to see how this would impact shuffle writes in such a drastic
 way, since it should be on the inter-job level, while this happens at the
 inter-stage level.

 I was somewhat supicious of maybe compression or serialization playing a
 role, but the SparkConf points to those being set to the default. Also
 Zeppelin's interpreter adds no relevant additional default parameters.
 I performed a diff between rc4 (which was later released) and 1.4.0 and
 as expected there were no differences, besides a single class (remarkably,
 a shuffle-relevant class:
 /org/apache/spark/shuffle/unsafe/UnsafeShuffleExternalSorter.class )
 differing in its binary representation due to being compiled with Java 7
 instead of Java 6. The decompiled sources of those two are again identical.

 I may attempt as a next step to simply replace that file in the packaged
 jar, to ascertain that indeed there is no difference between the two
 versions, but would consider this to be a major bg, if a simple compiler
 change leads to this kind of issue.

 I a also open for any other ideas, in particular to verify that the same
 compression/serialization is indeed happening, and regarding ways to
 determin what exactly is written into these shuffles -- currently I only
 know that the tuples are bigger (or smaller) than they ought to be. The
 Zeppelin-obtained results do appear to be consistent at least, thus the
 suspicion is, that there is an issue with the process launched from
 spark-shell. I will also attempt to build a spark job and spark-submit it
 using different spark-binaries to further explore the issue.

 Best Regards,

 Rick Moritz

 PS: I already tried to send this mail yesterday, but it never made it
 onto the list, as far as I can tell -- I apologize should anyone receive
 this as a second copy.






Re: blogs/articles/videos on how to analyse spark performance

2015-08-19 Thread Igor Berman
you don't need to register, search in youtube for this video...

On 19 August 2015 at 18:34, Gourav Sengupta gourav.sengu...@gmail.com
wrote:

 Excellent resource: http://www.oreilly.com/pub/e/3330

 And more amazing is the fact that the presenter actually responds to your
 questions.

 Regards,
 Gourav Sengupta

 On Wed, Aug 19, 2015 at 4:12 PM, Todd bit1...@163.com wrote:

 Hi,
 I would ask if there are some blogs/articles/videos on how to analyse
 spark performance during runtime,eg, tools that can be used or something
 related.





Re: how do I execute a job on a single worker node in standalone mode

2015-08-18 Thread Igor Berman
by default standalone creates 1 executor on every worker machine per
application
number of overall cores is configured with --total-executor-cores
so in general if you'll specify --total-executor-cores=1 then there would
be only 1 core on some executor and you'll get what you want

on the other hand, if you application needs all cores of your cluster and
only some specific job should run on single executor there are few methods
to achieve this
e.g. coallesce(1) or dummyRddWithOnePartitionOnly.foreachPartition


On 18 August 2015 at 01:36, Axel Dahl a...@whisperstream.com wrote:

 I have a 4 node cluster and have been playing around with the
 num-executors parameters, executor-memory and executor-cores

 I set the following:
 --executor-memory=10G
 --num-executors=1
 --executor-cores=8

 But when I run the job, I see that each worker, is running one executor
 which has  2 cores and 2.5G memory.

 What I'd like to do instead is have Spark just allocate the job to a
 single worker node?

 Is that possible in standalone mode or do I need a job/resource scheduler
 like Yarn to do that?

 Thanks in advance,

 -Axel





Re: All masters are unresponsive! Giving up.

2015-08-07 Thread Igor Berman
check on which ip/port master listens
netstat -a -t --numeric-ports


On 7 August 2015 at 20:48, Jeff Jones jjo...@adaptivebiotech.com wrote:

 Thanks. Added this to both the client and the master but still not getting
 any more information. I confirmed the flag with ps.



 jjones53222  2.7  0.1 19399412 549656 pts/3 Sl   17:17   0:44
 /opt/jdk1.8/bin/java -cp
 /home/jjones/bin/spark-1.4.1-bin-hadoop2.6/sbin/../conf/:/home/jjones/bin/spark-1.4.1-bin-hadoop2.6/lib/spark-assembly-1.4.1-hadoop2.6.0.jar:/home/jjones/bin/spark-1.4.1-bin-hadoop2.6/lib/datanucleus-core-3.2.10.jar:/home/jjones/bin/spark-1.4.1-bin-hadoop2.6/lib/datanucleus-api-jdo-3.2.6.jar:/home/jjones/bin/spark-1.4.1-bin-hadoop2.6/lib/datanucleus-rdbms-3.2.9.jar
 -Dsun.io.serialization.extendedDebugInfo=true -Xms512m -Xmx512m
 org.apache.spark.deploy.master.Master --ip p3.ourdomain.com --port 7077
 --webui-port 8080’



 Error message(s) the same:



 15/08/07 17:23:26 ERROR Remoting: org.apache.spark.deploy.Command; local
 class incompatible: stream classdesc serialVersionUID =
 -7098307370860582211, local class serialVersionUID = -3335312719467547622

 java.io.InvalidClassException: org.apache.spark.deploy.Command; local
 class incompatible: stream classdesc serialVersionUID =
 -7098307370860582211, local class serialVersionUID = -3335312719467547622

 at
 java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:621)

 at
 java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1623)

 at
 java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)

 at
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)

 at
 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)

 at
 java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)

 at
 java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)

 at
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)

 at
 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)

 at
 java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)

 at
 java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)

 at
 java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)

 at
 java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)

 at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)

 at
 akka.serialization.JavaSerializer$$anonfun$1.apply(Serializer.scala:136)

 at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)

 at
 akka.serialization.JavaSerializer.fromBinary(Serializer.scala:136)

 at
 akka.serialization.Serialization$$anonfun$deserialize$1.apply(Serialization.scala:104)

 at scala.util.Try$.apply(Try.scala:161)

 at
 akka.serialization.Serialization.deserialize(Serialization.scala:98)

 at
 akka.remote.serialization.MessageContainerSerializer.fromBinary(MessageContainerSerializer.scala:63)

 at
 akka.serialization.Serialization$$anonfun$deserialize$1.apply(Serialization.scala:104)

 at scala.util.Try$.apply(Try.scala:161)

 at
 akka.serialization.Serialization.deserialize(Serialization.scala:98)

 at
 akka.remote.MessageSerializer$.deserialize(MessageSerializer.scala:23)

 at
 akka.remote.DefaultMessageDispatcher.payload$lzycompute$1(Endpoint.scala:58)

 at
 akka.remote.DefaultMessageDispatcher.payload$1(Endpoint.scala:58)

 at
 akka.remote.DefaultMessageDispatcher.payloadClass$1(Endpoint.scala:59)

 at akka.remote.DefaultMessageDispatcher.dispatch(Endpoint.scala:99)

 at
 akka.remote.EndpointReader$$anonfun$receive$2.applyOrElse(Endpoint.scala:937)

 at akka.actor.Actor$class.aroundReceive(Actor.scala:465)

 at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:415)

 at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)

 at akka.actor.ActorCell.invoke(ActorCell.scala:487)

 at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)

 at akka.dispatch.Mailbox.run(Mailbox.scala:220)

 at
 akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)



 *From:* Sonal Goyal [mailto:sonalgoy...@gmail.com]
 *Sent:* Thursday, August 6, 2015 11:22 PM
 *To:* Jeff Jones
 *Cc:* user@spark.apache.org
 *Subject:* Re: All masters are unresponsive! Giving up.



 There seems  to be a version mismatch somewhere. You can try and find out
 the cause with debug serialization information. I think the jvm flag
 -Dsun.io.*serialization*.*extendedDebugInfo*=true should help.


 Best Regards,
 Sonal
 Founder, Nube Technologies http://www.nubetech.co

 Check out Reifier at Spark Summit 2015
 https://spark-summit.org/2015/events/real-time-fuzzy-matching-with-spark-and-elastic-search/





 On 

Re: Enum values in custom objects mess up RDD operations

2015-08-06 Thread Igor Berman
enums hashcode is jvm instance specific(ie. different jvms will give you
different values), so  you can use ordinal in hashCode computation or use
hashCode on enums ordinal as part of hashCode computation

On 6 August 2015 at 11:41, Warfish sebastian.ka...@gmail.com wrote:

 Hi everyone,

 I was working with Spark for a little while now and have encountered a very
 strange behaviour that caused me a lot of headaches:

 I have written my own POJOs to encapsulate my data and this data is held in
 some JavaRDDs. Part of these POJOs is a member variable of a custom enum
 type. Whenever I do some operations on these RDDs such as subtract,
 groupByKey, reduce or similar things, the results are inconsistent and
 non-sensical. However, this happens only when the application runs in
 standalone cluster mode (10 nodes). When running locally on my developer
 machine, the code executes just fine. If you want to reproduce this
 behaviour,  here
 
 http://apache-spark-user-list.1001560.n3.nabble.com/file/n24149/SparkTest.zip
 
 is the complete Maven project that you can run out of the box. I am running
 Spark 1.4.0 and submitting the application using
 /usr/local/spark-1.4.0-bin-hadoop2.4/bin/spark-submit --class
 de.spark.test.Main ./SparkTest-1.0-SNAPSHOT.jar



 Consider the following code for my custom object:


 package de.spark.test;

 import java.io.Serializable;
 import java.util.Objects;

 public class MyObject implements Serializable {

 private MyEnum myEnum;

 public MyObject(MyEnum myEnum) {
 this.myEnum = myEnum;
 }

 public MyEnum getMyEnum() {
 return myEnum;
 }

 public void setMyEnum(MyEnum myEnum) {
 this.myEnum = myEnum;
 }

 @Override
 public int hashCode() {
 int hash = 5;
 hash = 41 * hash + Objects.hashCode(this.myEnum);
 return hash;
 }

 @Override
 public boolean equals(Object obj) {
 if (obj == null) {
 return false;
 }
 if (getClass() != obj.getClass()) {
 return false;
 }
 final MyObject other = (MyObject) obj;
 if (this.myEnum != other.myEnum) {
 return false;
 }
 return true;
 }

 @Override
 public String toString() {
 return MyObject{ + myEnum= + myEnum + '}';
 }

 }


 As you can see, I have overriden equals() and hashCode() (both are
 auto-generated). The enum is given as follows:


 package de.spark.test;

 import java.io.Serializable;

 public enum MyEnum implements Serializable {
   VALUE1, VALUE2
 }


 The main() method is defined by:


 package de.spark.test;

 import java.util.ArrayList;
 import java.util.List;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;

 public class Main {

   public static void main(String[] args) {
 SparkConf conf = new SparkConf().setAppName(Spark Test)
 .setMaster(myMaster);

 JavaSparkContext jsc = new JavaSparkContext(conf);

 System.out.println(///
 Object generation);

 ListMyObject l1 = new ArrayList();

 for(int i = 0; i  1000; i++) {
 l1.add(new MyObject(MyEnum.VALUE1));
 }

 JavaRDDMyObject myObjectRDD1 = jsc.parallelize(l1);
 JavaRDDMyObject myObjectRDD2 = jsc.parallelize(l1);

 System.out.println(myObjectRDD1 count  =  +
 myObjectRDD1.count());
 System.out.println(myObjectRDD2 count  =  +
 myObjectRDD2.count());

 System.out.println(///
 Distinct);

 JavaRDDMyObject myObjectRDD1Distinct = myObjectRDD1.distinct();
 JavaRDDMyObject myObjectRDD2Distinct = myObjectRDD2.distinct();

 System.out.println(myObjectRDD1Distinct count  =  +
 myObjectRDD1Distinct.count());
 System.out.println(myObjectRDD2Distinct count  =  +
 myObjectRDD2Distinct.count());

 System.out.println(///
 Subtract);

 JavaRDDMyObject myObjectRDD1Minus1 =
 myObjectRDD1.subtract(myObjectRDD1);
 JavaRDDMyObject myObjectRDD1Minus2 =
 myObjectRDD1.subtract(myObjectRDD2);
 JavaRDDMyObject myObjectRDD2Minus1 =
 myObjectRDD2.subtract(myObjectRDD1);

 System.out.println(myObjectRDD1Minus1 count=  +
 myObjectRDD1Minus1.count());
 System.out.println(myObjectRDD1Minus2 count=  +
 myObjectRDD1Minus2.count());
 System.out.println(myObjectRDD2Minus1 count=  +
 myObjectRDD2Minus1.count());

 System.out.println(///
 End);
   }

 }


 Both RDDs contain 1000 exactly equal objects, one would expect each call of
 distinct() to result in 1 and subtract(JavaRDDMyObject) to result in
 empty
 RDDs. However here is some sample output:


 /// Object generation
 myObjectRDD1 count  = 1000
 myObjectRDD2 

Re: Combining Spark Files with saveAsTextFile

2015-08-05 Thread Igor Berman
using coalesce might be dangerous, since 1 worker process will need to
handle whole file and if the file is huge you'll get OOM, however it
depends on implementation, I'm not sure how it will be done
nevertheless, worse to try the coallesce method(please post your results)

another option would be to use FileUtil.copyMerge which copies each
partition one after another into destination stream(file); so as soon as
you've written your hdfs file with spark with multiple partitions in
parallel(as usual), you can then make another step to merge it into any
destination you want

On 5 August 2015 at 07:43, Mohammed Guller moham...@glassbeam.com wrote:

 Just to further clarify, you can first call coalesce with argument 1 and
 then call saveAsTextFile. For example,



 rdd.coalesce(1).saveAsTextFile(...)







 Mohammed



 *From:* Mohammed Guller
 *Sent:* Tuesday, August 4, 2015 9:39 PM
 *To:* 'Brandon White'; user
 *Subject:* RE: Combining Spark Files with saveAsTextFile



 One options is to use the coalesce method in the RDD class.



 Mohammed



 *From:* Brandon White [mailto:bwwintheho...@gmail.com
 bwwintheho...@gmail.com]
 *Sent:* Tuesday, August 4, 2015 7:23 PM
 *To:* user
 *Subject:* Combining Spark Files with saveAsTextFile



 What is the best way to make saveAsTextFile save as only a single file?



Re: Combining Spark Files with saveAsTextFile

2015-08-05 Thread Igor Berman
seems that coallesce do work, see following thread
https://www.mail-archive.com/user%40spark.apache.org/msg00928.html

On 5 August 2015 at 09:47, Igor Berman igor.ber...@gmail.com wrote:

 using coalesce might be dangerous, since 1 worker process will need to
 handle whole file and if the file is huge you'll get OOM, however it
 depends on implementation, I'm not sure how it will be done
 nevertheless, worse to try the coallesce method(please post your results)

 another option would be to use FileUtil.copyMerge which copies each
 partition one after another into destination stream(file); so as soon as
 you've written your hdfs file with spark with multiple partitions in
 parallel(as usual), you can then make another step to merge it into any
 destination you want

 On 5 August 2015 at 07:43, Mohammed Guller moham...@glassbeam.com wrote:

 Just to further clarify, you can first call coalesce with argument 1 and
 then call saveAsTextFile. For example,



 rdd.coalesce(1).saveAsTextFile(...)







 Mohammed



 *From:* Mohammed Guller
 *Sent:* Tuesday, August 4, 2015 9:39 PM
 *To:* 'Brandon White'; user
 *Subject:* RE: Combining Spark Files with saveAsTextFile



 One options is to use the coalesce method in the RDD class.



 Mohammed



 *From:* Brandon White [mailto:bwwintheho...@gmail.com
 bwwintheho...@gmail.com]
 *Sent:* Tuesday, August 4, 2015 7:23 PM
 *To:* user
 *Subject:* Combining Spark Files with saveAsTextFile



 What is the best way to make saveAsTextFile save as only a single file?





Re: About memory leak in spark 1.4.1

2015-08-04 Thread Igor Berman
sorry, can't disclose info about my prod cluster

nothing jumps into my mind regarding your config
we don't use lz4 compression, don't know what is spark.deploy.spreadOut(there
is no documentation regarding this)

If you are sure that you don't have memory leak in your business logic I
would try to reset each property to default(or just remove it from your
config) and try to run your job to see if it's not
somehow connected

my config(nothing special really)
spark.shuffle.consolidateFiles true
spark.speculation false
spark.executor.extraJavaOptions -XX:+UseStringCache
-XX:+UseCompressedStrings -XX:+PrintGC -XX:+PrintGCDetails
-XX:+PrintGCTimeStamps -Xloggc:gc.log -verbose:gc
spark.executor.logs.rolling.maxRetainedFiles 1000
spark.executor.logs.rolling.strategy time
spark.worker.cleanup.enabled true
spark.logConf true
spark.rdd.compress true





On 4 August 2015 at 12:59, Sea 261810...@qq.com wrote:

 How much machines are there in your standalone cluster?
 I am not using tachyon.

 GC can not help me... Can anyone help ?

 my configuration:

 spark.deploy.spreadOut false
 spark.eventLog.enabled true
 spark.executor.cores 24

 spark.ui.retainedJobs 10
 spark.ui.retainedStages 10
 spark.history.retainedApplications 5
 spark.deploy.retainedApplications 10
 spark.deploy.retainedDrivers  10
 spark.streaming.ui.retainedBatches 10
 spark.sql.thriftserver.ui.retainedSessions 10
 spark.sql.thriftserver.ui.retainedStatements 100

 spark.file.transferTo false
 spark.driver.maxResultSize 4g
 spark.sql.hive.metastore.jars=/spark/spark-1.4.1/hive/*

 spark.eventLog.dirhdfs://mycluster/user/spark/historylog
 spark.history.fs.logDirectory hdfs://mycluster/user/spark/historylog

 spark.driver.extraClassPath=/spark/spark-1.4.1/extlib/*
 spark.executor.extraClassPath=/spark/spark-1.4.1/extlib/*

 spark.sql.parquet.binaryAsString true
 spark.serializerorg.apache.spark.serializer.KryoSerializer
 spark.kryoserializer.buffer 32
 spark.kryoserializer.buffer.max 256
 spark.shuffle.consolidateFiles true
 spark.io.compression.codec org.apache.spark.io.LZ4CompressionCodec





 -- 原始邮件 --
 *发件人:* Igor Berman;igor.ber...@gmail.com;
 *发送时间:* 2015年8月3日(星期一) 晚上7:56
 *收件人:* Sea261810...@qq.com;
 *抄送:* Barak Gitsisbar...@similarweb.com; Ted Yuyuzhih...@gmail.com;
 user@spark.apache.orguser@spark.apache.org; rxinr...@databricks.com;
 joshrosenjoshro...@databricks.com; daviesdav...@databricks.com;
 *主题:* Re: About memory leak in spark 1.4.1

 in general, what is your configuration? use --conf spark.logConf=true

 we have 1.4.1 in production standalone cluster and haven't experienced
 what you are describing
 can you verify in web-ui that indeed spark got your 50g per executor
 limit? I mean in configuration page..

 might be you are using offheap storage(Tachyon)?


 On 3 August 2015 at 04:58, Sea 261810...@qq.com wrote:

 spark uses a lot more than heap memory, it is the expected behavior.
  It didn't exist in spark 1.3.x
 What does a lot more than means?  It means that I lose control of it!
 I try to  apply 31g, but it still grows to 55g and continues to grow!!!
 That is the point!
 I have tried set memoryFraction to 0.2,but it didn't help.
 I don't know whether it will still exist in the next release 1.5, I wish
 not.



 -- 原始邮件 --
 *发件人:* Barak Gitsis;bar...@similarweb.com;
 *发送时间:* 2015年8月2日(星期天) 晚上9:55
 *收件人:* Sea261810...@qq.com; Ted Yuyuzhih...@gmail.com;
 *抄送:* user@spark.apache.orguser@spark.apache.org; rxin
 r...@databricks.com; joshrosenjoshro...@databricks.com; davies
 dav...@databricks.com;
 *主题:* Re: About memory leak in spark 1.4.1

 spark uses a lot more than heap memory, it is the expected behavior.
 in 1.4 off-heap memory usage is supposed to grow in comparison to 1.3

 Better use as little memory as you can for heap, and since you are not
 utilizing it already, it is safe for you to reduce it.
 memoryFraction helps you optimize heap usage for your data/application
 profile while keeping it tight.






 On Sun, Aug 2, 2015 at 12:54 PM Sea 261810...@qq.com wrote:

 spark.storage.memoryFraction is in heap memory, but my situation is that
 the memory is more than heap memory !

 Anyone else use spark 1.4.1 in production?


 -- 原始邮件 --
 *发件人:* Ted Yu;yuzhih...@gmail.com;
 *发送时间:* 2015年8月2日(星期天) 下午5:45
 *收件人:* Sea261810...@qq.com;
 *抄送:* Barak Gitsisbar...@similarweb.com; user@spark.apache.org
 user@spark.apache.org; rxinr...@databricks.com; joshrosen
 joshro...@databricks.com; daviesdav...@databricks.com;
 *主题:* Re: About memory leak in spark 1.4.1

 http://spark.apache.org/docs/latest/tuning.html does mention 
 spark.storage.memoryFraction
 in two places.
 One is under Cache Size Tuning section.

 FYI

 On Sun, Aug 2, 2015 at 2:16 AM, Sea 261810...@qq.com wrote:

 Hi, Barak
 It is ok with spark 1.3.0, the problem is with spark 1.4.1.
 I don't think spark.storage.memoryFraction will make any

Re: About memory leak in spark 1.4.1

2015-08-03 Thread Igor Berman
in general, what is your configuration? use --conf spark.logConf=true

we have 1.4.1 in production standalone cluster and haven't experienced what
you are describing
can you verify in web-ui that indeed spark got your 50g per executor limit?
I mean in configuration page..

might be you are using offheap storage(Tachyon)?


On 3 August 2015 at 04:58, Sea 261810...@qq.com wrote:

 spark uses a lot more than heap memory, it is the expected behavior.
  It didn't exist in spark 1.3.x
 What does a lot more than means?  It means that I lose control of it!
 I try to  apply 31g, but it still grows to 55g and continues to grow!!!
 That is the point!
 I have tried set memoryFraction to 0.2,but it didn't help.
 I don't know whether it will still exist in the next release 1.5, I wish
 not.



 -- 原始邮件 --
 *发件人:* Barak Gitsis;bar...@similarweb.com;
 *发送时间:* 2015年8月2日(星期天) 晚上9:55
 *收件人:* Sea261810...@qq.com; Ted Yuyuzhih...@gmail.com;
 *抄送:* user@spark.apache.orguser@spark.apache.org; rxin
 r...@databricks.com; joshrosenjoshro...@databricks.com; davies
 dav...@databricks.com;
 *主题:* Re: About memory leak in spark 1.4.1

 spark uses a lot more than heap memory, it is the expected behavior.
 in 1.4 off-heap memory usage is supposed to grow in comparison to 1.3

 Better use as little memory as you can for heap, and since you are not
 utilizing it already, it is safe for you to reduce it.
 memoryFraction helps you optimize heap usage for your data/application
 profile while keeping it tight.






 On Sun, Aug 2, 2015 at 12:54 PM Sea 261810...@qq.com wrote:

 spark.storage.memoryFraction is in heap memory, but my situation is that
 the memory is more than heap memory !

 Anyone else use spark 1.4.1 in production?


 -- 原始邮件 --
 *发件人:* Ted Yu;yuzhih...@gmail.com;
 *发送时间:* 2015年8月2日(星期天) 下午5:45
 *收件人:* Sea261810...@qq.com;
 *抄送:* Barak Gitsisbar...@similarweb.com; user@spark.apache.org
 user@spark.apache.org; rxinr...@databricks.com; joshrosen
 joshro...@databricks.com; daviesdav...@databricks.com;
 *主题:* Re: About memory leak in spark 1.4.1

 http://spark.apache.org/docs/latest/tuning.html does mention 
 spark.storage.memoryFraction
 in two places.
 One is under Cache Size Tuning section.

 FYI

 On Sun, Aug 2, 2015 at 2:16 AM, Sea 261810...@qq.com wrote:

 Hi, Barak
 It is ok with spark 1.3.0, the problem is with spark 1.4.1.
 I don't think spark.storage.memoryFraction will make any sense,
 because it is still in heap memory.


 -- 原始邮件 --
 *发件人:* Barak Gitsis;bar...@similarweb.com;
 *发送时间:* 2015年8月2日(星期天) 下午4:11
 *收件人:* Sea261810...@qq.com; useruser@spark.apache.org;
 *抄送:* rxinr...@databricks.com; joshrosenjoshro...@databricks.com;
 daviesdav...@databricks.com;
 *主题:* Re: About memory leak in spark 1.4.1

 Hi,
 reducing spark.storage.memoryFraction did the trick for me. Heap
 doesn't get filled because it is reserved..
 My reasoning is:
 I give executor all the memory i can give it, so that makes it a
 boundary.
 From here i try to make the best use of memory I can.
 storage.memoryFraction is in a sense user data space.  The rest can be used
 by the system.
 If you don't have so much data that you MUST store in memory for
 performance, better give spark more space..
 ended up setting it to 0.3

 All that said, it is on spark 1.3 on cluster

 hope that helps

 On Sat, Aug 1, 2015 at 5:43 PM Sea 261810...@qq.com wrote:

 Hi, all
 I upgrage spark to 1.4.1, many applications failed... I find the heap
 memory is not full , but the process of CoarseGrainedExecutorBackend will
 take more memory than I expect, and it will increase as time goes on,
 finally more than max limited of the server, the worker will die.

 Any can help?

 Mode:standalone

 spark.executor.memory 50g

 25583 xiaoju20   0 75.5g  55g  28m S 1729.3 88.1   2172:52 java

 55g more than 50g I apply

 --
 *-Barak*


 --
 *-Barak*



Re: How to increase parallelism of a Spark cluster?

2015-08-02 Thread Igor Berman
What kind of cluster? How many cores on each worker? Is there config for
http solr client? I remember standard httpclient has limit per route/host.
On Aug 2, 2015 8:17 PM, Sujit Pal sujitatgt...@gmail.com wrote:

 No one has any ideas?

 Is there some more information I should provide?

 I am looking for ways to increase the parallelism among workers. Currently
 I just see number of simultaneous connections to Solr equal to the number
 of workers. My number of partitions is (2.5x) larger than number of
 workers, and the workers seem to be large enough to handle more than one
 task at a time.

 I am creating a single client per partition in my mapPartition call. Not
 sure if that is creating the gating situation? Perhaps I should use a Pool
 of clients instead?

 Would really appreciate some pointers.

 Thanks in advance for any help you can provide.

 -sujit


 On Fri, Jul 31, 2015 at 1:03 PM, Sujit Pal sujitatgt...@gmail.com wrote:

 Hello,

 I am trying to run a Spark job that hits an external webservice to get
 back some information. The cluster is 1 master + 4 workers, each worker has
 60GB RAM and 4 CPUs. The external webservice is a standalone Solr server,
 and is accessed using code similar to that shown below.

 def getResults(keyValues: Iterator[(String, Array[String])]):
 Iterator[(String, String)] = {
 val solr = new HttpSolrClient()
 initializeSolrParameters(solr)
 keyValues.map(keyValue = (keyValue._1, process(solr, keyValue)))
 }
 myRDD.repartition(10)

  .mapPartitions(keyValues = getResults(keyValues))


 The mapPartitions does some initialization to the SolrJ client per
 partition and then hits it for each record in the partition via the
 getResults() call.

 I repartitioned in the hope that this will result in 10 clients hitting
 Solr simultaneously (I would like to go upto maybe 30-40 simultaneous
 clients if I can). However, I counted the number of open connections using
 netstat -anp | grep :8983.*ESTABLISHED in a loop on the Solr box and
 observed that Solr has a constant 4 clients (ie, equal to the number of
 workers) over the lifetime of the run.

 My observation leads me to believe that each worker processes a single
 stream of work sequentially. However, from what I understand about how
 Spark works, each worker should be able to process number of tasks
 parallelly, and that repartition() is a hint for it to do so.

 Is there some SparkConf environment variable I should set to increase
 parallelism in these workers, or should I just configure a cluster with
 multiple workers per machine? Or is there something I am doing wrong?

 Thank you in advance for any pointers you can provide.

 -sujit





Re: How to increase parallelism of a Spark cluster?

2015-08-02 Thread Igor Berman
so how many cores you configure per node?
do u have something like total-executor-cores or maybe
--num-executors config(I'm
not sure what kind of cluster databricks platform provides, if it's
standalone then first option should be used)? if you have 4 cores at total,
then even though you have 4 cores per machine only 1 is working on each
machine...which could be a cause.
another option - you are hitting some default config of limiting number of
concurrent routes or max total connection from jvm,
look at
https://hc.apache.org/httpcomponents-client-ga/tutorial/html/connmgmt.html
 (assuming you are using HttpClient from 4.x and not 3.x version)
not sure what are the defaults...



On 2 August 2015 at 23:42, Sujit Pal sujitatgt...@gmail.com wrote:

 Hi Igor,

 The cluster is a Databricks Spark cluster. It consists of 1 master + 4
 workers, each worker has 60GB RAM and 4 CPUs. The original mail has some
 more details (also the reference to the HttpSolrClient in there should be
 HttpSolrServer, sorry about that, mistake while writing the email).

 There is no additional configuration on the external Solr host from my
 code, I am using the default HttpClient provided by HttpSolrServer.
 According to the Javadocs, you can pass in a HttpClient object as well. Is
 there some specific configuration you would suggest to get past any limits?

 On another project, I faced a similar problem but I had more leeway (was
 using a Spark cluster from EC2) and less time, my workaround was to use
 python multiprocessing to create a program that started up 30 python
 JSON/HTTP clients and wrote output into 30 output files, which were then
 processed by Spark. Reason I mention this is that I was using default
 configurations there as well, just needed to increase the number of
 connections against Solr to a higher number.

 This time round, I would like to do this through Spark because it makes
 the pipeline less complex.

 -sujit


 On Sun, Aug 2, 2015 at 10:52 AM, Igor Berman igor.ber...@gmail.com
 wrote:

 What kind of cluster? How many cores on each worker? Is there config for
 http solr client? I remember standard httpclient has limit per route/host.
 On Aug 2, 2015 8:17 PM, Sujit Pal sujitatgt...@gmail.com wrote:

 No one has any ideas?

 Is there some more information I should provide?

 I am looking for ways to increase the parallelism among workers.
 Currently I just see number of simultaneous connections to Solr equal to
 the number of workers. My number of partitions is (2.5x) larger than number
 of workers, and the workers seem to be large enough to handle more than one
 task at a time.

 I am creating a single client per partition in my mapPartition call. Not
 sure if that is creating the gating situation? Perhaps I should use a Pool
 of clients instead?

 Would really appreciate some pointers.

 Thanks in advance for any help you can provide.

 -sujit


 On Fri, Jul 31, 2015 at 1:03 PM, Sujit Pal sujitatgt...@gmail.com
 wrote:

 Hello,

 I am trying to run a Spark job that hits an external webservice to get
 back some information. The cluster is 1 master + 4 workers, each worker has
 60GB RAM and 4 CPUs. The external webservice is a standalone Solr server,
 and is accessed using code similar to that shown below.

 def getResults(keyValues: Iterator[(String, Array[String])]):
 Iterator[(String, String)] = {
 val solr = new HttpSolrClient()
 initializeSolrParameters(solr)
 keyValues.map(keyValue = (keyValue._1, process(solr, keyValue)))
 }
 myRDD.repartition(10)

  .mapPartitions(keyValues = getResults(keyValues))


 The mapPartitions does some initialization to the SolrJ client per
 partition and then hits it for each record in the partition via the
 getResults() call.

 I repartitioned in the hope that this will result in 10 clients hitting
 Solr simultaneously (I would like to go upto maybe 30-40 simultaneous
 clients if I can). However, I counted the number of open connections using
 netstat -anp | grep :8983.*ESTABLISHED in a loop on the Solr box and
 observed that Solr has a constant 4 clients (ie, equal to the number of
 workers) over the lifetime of the run.

 My observation leads me to believe that each worker processes a single
 stream of work sequentially. However, from what I understand about how
 Spark works, each worker should be able to process number of tasks
 parallelly, and that repartition() is a hint for it to do so.

 Is there some SparkConf environment variable I should set to increase
 parallelism in these workers, or should I just configure a cluster with
 multiple workers per machine? Or is there something I am doing wrong?

 Thank you in advance for any pointers you can provide.

 -sujit






Re: Too many open files

2015-07-29 Thread Igor Berman
you probably should increase file handles limit for user that all processes
are running with(spark master  workers)
e.g.
http://www.cyberciti.biz/faq/linux-increase-the-maximum-number-of-open-files/

On 29 July 2015 at 18:39, saif.a.ell...@wellsfargo.com wrote:

  Hello,

 I’ve seen a couple emails on this issue but could not find anything to
 solve my situation.

 Tried to reduce the partitioning level, enable consolidateFiles and
 increase the sizeInFlight limit, but still no help. Spill manager is sort,
 which is the default, any advice?

 15/07/29 10:37:01 WARN TaskSetManager: Lost task 34.0 in stage 11.0 (TID
 331, localhost): FetchFailed(BlockManagerId(driver, localhost, 43437),
 shuffleId=3, mapId=0, reduceId=34, message=
 org.apache.spark.shuffle.FetchFailedException:
 /tmp/spark-71109b28-0f89-4e07-a521-5ff0a943472a/blockmgr-eda0751d-fd21-4229-93b0-2ee2546edf5a/0d/shuffle_3_0_0.index
 (Too many open files)
 ..
 ..
 15/07/29 10:37:01 INFO Executor: Executor is trying to kill task 9.0 in
 stage 11.0 (TID 306)
 org.apache.spark.SparkException: Job aborted due to stage failure: Task 20
 in stage 11.0 failed 1 times, most recent failure: Lost task 20.0 in stage
 11.0 (TID 317, localhost): java.io.FileNotFoundException:
 /tmp/spark-71109b28-0f89-4e07-a521-5ff0a943472a/blockmgr-eda0751d-fd21-4229-93b0-2ee2546edf5a/1b/temp_shuffle_a3a9815a-677a-4342-94a2-1e083d758bcc
 (Too many open files)

 my fs is ext4 and currently ulist –n is 1024

 Thanks
 Saif




Re: Spark Number of Partitions Recommendations

2015-07-29 Thread Igor Berman
imho, you need to take into account size of your data too
if your cluster is relatively small, you may cause memory pressure on your
executors if trying to repartition to some #cores connected number of
partitions

better to take some max between initial number of partitions(assuming your
data is on hdfs with 64Mb block size) and between number you get from your
formula



On 29 July 2015 at 12:31, ponkin alexey.pon...@ya.ru wrote:

 Hi Rahul,

 Where did you see such a recommendation?
 I personally define partitions with the following formula

 partitions = nextPrimeNumberAbove( K*(--num-executors * --executor-cores )
 )

 where
 nextPrimeNumberAbove(x) - prime number which is greater than x
 K - multiplicator  to calculate start with 1 and encrease untill join
 perfomance start to degrade




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Number-of-Partitions-Recommendations-tp24022p24059.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Is spark suitable for real time query

2015-07-22 Thread Igor Berman
you can use spark rest job server(or any other solution that provides long
running spark context) so that you won't pay this bootstrap time on each
query
in addition : if you have some rdd that u want your queries to be executed
on, you can cache this rdd in memory(depends on ur cluster memory size) so
that you wont pay reading from disk time


On 22 July 2015 at 14:46, Louis Hust louis.h...@gmail.com wrote:

 I do a simple test using spark in standalone mode(not cluster),
  and found a simple action take a few seconds, the data size is small,
 just few rows.
 So each spark job will cost some time for init or prepare work no matter
 what the job is?
 I mean if the basic framework of spark job will cost seconds?

 2015-07-22 19:17 GMT+08:00 Robin East robin.e...@xense.co.uk:

 Real-time is, of course, relative but you’ve mentioned microsecond level.
 Spark is designed to process large amounts of data in a distributed
 fashion. No distributed system I know of could give any kind of guarantees
 at the microsecond level.

 Robin

  On 22 Jul 2015, at 11:14, Louis Hust louis.h...@gmail.com wrote:
 
  Hi, all
 
  I am using spark jar in standalone mode, fetch data from different
 mysql instance and do some action, but i found the time is at second level.
 
  So i want to know if spark job is suitable for real time query which at
 microseconds?





Re: Create RDD from output of unix command

2015-07-14 Thread Igor Berman
haven't you thought about spark streaming? there is thread that could help
https://www.mail-archive.com/user%40spark.apache.org/msg30105.html

On 14 July 2015 at 18:20, Hafsa Asif hafsa.a...@matchinguu.com wrote:

 Your question is very interesting. What I suggest is, that copy your output
 in some text file. Read text file in your code and apply RDD. Just consider
 wordcount example by Spark. I love this example with Java client. Well,
 Spark is an analytical engine and it has a slogan to analyze big big data
 so
 from my point of view your assumption is wrong.

 You can also save your data in any respository in some structured form.
 This
 will give you more exposure of Spark behavior.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Create-RDD-from-output-of-unix-command-tp23723p23830.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: Dependency Injection with Spark Java

2015-06-26 Thread Igor Berman
asked myself same question today...actually depends on what you are trying
to do
if you want injection into workers code I think it will be a bit hard...
if only in code that driver executes i.e. in main, it's straight forward
imho, just create your classes from injector(e.g. spring's application
context)

On 26 June 2015 at 15:49, Michal Čizmazia mici...@gmail.com wrote:

 How to use Dependency Injection with Spark Java? Please could you point me
 to any articles/frameworks?

 Thanks!



Re: Spark standalone cluster - resource management

2015-06-23 Thread Igor Berman
probably there are already running jobs there
in addition, memory is also a resource, so if you are running 1 application
that took all your memory and then you are trying to run another
application that asks
for the memory the cluster doesn't have then the second app wont be running

so why are u specifying 22g as executor memory? how much memory you have
for each machine?

On 23 June 2015 at 09:33, nizang ni...@windward.eu wrote:

 to give a bit more data on what I'm trying to get -

 I have many tasks I want to run in parallel, so I want each task to catch
 small part of the cluster (- only limited part of my 20 cores in the
 cluster)

 I have important tasks that I want them to get 10 cores, and I have small
 tasks that I want to run with only 1 or 2 cores)



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-standalone-cluster-resource-management-tp23444p23445.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: How to set KryoRegistrator class in spark-shell

2015-06-11 Thread Igor Berman
Another option would be to close sc and open new context with your custom
configuration
On Jun 11, 2015 01:17, bhomass bhom...@gmail.com wrote:

 you need to register using spark-default.xml as explained here


 https://books.google.com/books?id=WE_GBwAAQBAJpg=PA239lpg=PA239dq=spark+shell+register+kryo+serializationsource=blots=vCxgEfz1-2sig=dHU8FY81zVoBqYIJbCFuRwyFjAwhl=ensa=Xved=0CEwQ6AEwB2oVChMIn_iujpCGxgIVDZmICh3kYADW#v=onepageq=spark%20shell%20register%20kryo%20serializationf=false



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/How-to-set-KryoRegistrator-class-in-spark-shell-tp12498p23265.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: SparkContext Threading

2015-06-05 Thread Igor Berman
+1 to question about serializaiton. SparkContext is still in driver
process(even if it has several threads from which you submit jobs)
as for the problem, check your classpath, scala version, spark version etc.
such errors usually happens when there is some conflict in classpath. Maybe
you compiled your jar with different versions?

On 5 June 2015 at 21:55, Lee McFadden splee...@gmail.com wrote:

 You can see an example of the constructor for the class which executes a
 job in my opening post.

 I'm attempting to instantiate and run the class using the code below:

 ```
 val conf = new SparkConf()
   .setAppName(appNameBase.format(Test))

 val connector = CassandraConnector(conf)

 val sc = new SparkContext(conf)

 // Set up the threadpool for running Jobs.
 val pool = Executors.newFixedThreadPool(10)

 pool.execute(new SecondRollup(sc, connector, start))
 ```

 There is some surrounding code that then waits for all the jobs entered
 into the thread pool to complete, although it's not really required at the
 moment as I am only submitting one job until I get this issue straightened
 out :)

 Thanks,

 Lee

 On Fri, Jun 5, 2015 at 11:50 AM Marcelo Vanzin van...@cloudera.com
 wrote:

 On Fri, Jun 5, 2015 at 11:48 AM, Lee McFadden splee...@gmail.com wrote:

 Initially I had issues passing the SparkContext to other threads as it
 is not serializable.  Eventually I found that adding the @transient
 annotation prevents a NotSerializableException.


 This is really puzzling. How are you passing the context around that you
 need to do serialization?

 Threads run all in the same process so serialization should not be needed
 at all.

 --
 Marcelo




Re: SparkContext Threading

2015-06-05 Thread Igor Berman
Lee, what cluster do you use? standalone, yarn-cluster, yarn-client, mesos?
in yarn-cluster the driver program is executed inside one of nodes in
cluster, so might be that driver code needs to be serialized to be sent to
some node

On 5 June 2015 at 22:55, Lee McFadden splee...@gmail.com wrote:


 On Fri, Jun 5, 2015 at 12:30 PM Marcelo Vanzin van...@cloudera.com
 wrote:

 Ignoring the serialization thing (seems like a red herring):


 People seem surprised that I'm getting the Serialization exception at all
 - I'm not convinced it's a red herring per se, but on to the blocking
 issue...




 You might be using this Cassandra library with an incompatible version of
 Spark; the `TaskMetrics` class has changed in the past, and the method it's
 looking for does not exist at least in 1.4.


 You are correct, I was being a bone head.  We recently downgraded to Spark
 1.2.1 and I was running the compiled jar using Spark 1.3.1 on my local
 machine.  Running the job with threading on my 1.2.1 cluster worked.  Thank
 you for finding the obvious mistake :)

 Regarding serialization, I'm still confused as to why I was getting a
 serialization error in the first place as I'm executing these Runnable
 classes from a java thread pool.  I'm fairly new to Scala/JVM world and
 there doesn't seem to be any Spark documentation to explain *why* I need to
 declare the sc variable as @transient (or even that I should).

 I was under the impression that objects only need to be serializable when
 they are sent over the network, and that doesn't seem to be occurring as
 far as I can tell.

 Apologies if this is simple stuff, but I don't like fixing things
 without knowing the full reason why the changes I made fixed things :)

 Thanks again for your time!



Re: Managing spark processes via supervisord

2015-06-03 Thread Igor Berman
assuming you are talking about standalone cluster
imho, with workers you won't get any problems and it's straightforward
since they are usually foreground processes
with master it's a bit more complicated, ./sbin/start-master.sh goes
background which is not good for supervisor, but anyway I think it's
doable(going to setup it too in a few days)

On 3 June 2015 at 21:46, Mike Trienis mike.trie...@orcsol.com wrote:

 Hi All,

 I am curious to know if anyone has successfully deployed a spark cluster
 using supervisord?

- http://supervisord.org/

 Currently I am using the cluster launch scripts which are working greater,
 however, every time I reboot my VM or development environment I need to
 re-launch the cluster.

 I am considering using supervisord to control all the processes (worker,
 master, ect.. ) in order to have the cluster up an running after boot-up;
 although I'd like to understand if it will cause more issues than it
 solves.

 Thanks, Mike.




Re: union and reduceByKey wrong shuffle?

2015-06-02 Thread Igor Berman
Hi,
small mock data doesn't reproduce the problem. IMHO problem is reproduced
when we make shuffle big enough to split data into disk.
We will work on it to understand and reproduce the problem(not first
priority though...)


On 1 June 2015 at 23:02, Josh Rosen rosenvi...@gmail.com wrote:

 How much work is to produce a small standalone reproduction?  Can you
 create an Avro file with some mock data, maybe 10 or so records, then
 reproduce this locally?

 On Mon, Jun 1, 2015 at 12:31 PM, Igor Berman igor.ber...@gmail.com
 wrote:

 switching to use simple pojos instead of using avro for spark
 serialization solved the problem(I mean reading avro from s3 and than
 mapping each avro object to it's pojo serializable counterpart with same
 fields, pojo is registered withing kryo)
 Any thought where to look for a problem/misconfiguration?

 On 31 May 2015 at 22:48, Igor Berman igor.ber...@gmail.com wrote:

 Hi
 We are using spark 1.3.1
 Avro-chill (tomorrow will check if its important) we register avro
 classes from java
 Avro 1.7.6
 On May 31, 2015 22:37, Josh Rosen rosenvi...@gmail.com wrote:

 Which Spark version are you using?  I'd like to understand whether this
 change could be caused by recent Kryo serializer re-use changes in master /
 Spark 1.4.

 On Sun, May 31, 2015 at 11:31 AM, igor.berman igor.ber...@gmail.com
 wrote:

 after investigation the problem is somehow connected to avro
 serialization
 with kryo + chill-avro(mapping avro object to simple scala case class
 and
 running reduce on these case class objects solves the problem)




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/union-and-reduceByKey-wrong-shuffle-tp23092p23093.html
 Sent from the Apache Spark User List mailing list archive at
 Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org







  1   2   >