Executors killed in Workers with Error: invalid log directory

2016-06-22 Thread Yiannis Gkoufas
Hi there,

I have been getting a strange error in spark-1.6.1
The submitted job uses only the executor launched on the master node, while
the other workers stay idle.
When I check the errors from the web UI to investigate the killed
executors, I see the error:
Error: invalid log directory
/disk/spark-1.6.1-bin-hadoop2.6/work/app-20160622195714-0001/2/

I made sure I gave the correct permissions on the directory, plus the
Worker logs are clean.
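
The permission check I have in mind looks roughly like this (a sketch; that
the worker runs as a dedicated "spark" user is an assumption):

ls -ld /disk/spark-1.6.1-bin-hadoop2.6/work
sudo -u spark touch /disk/spark-1.6.1-bin-hadoop2.6/work/probe   # must succeed for the worker user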
Any hints on that?

Thanks a lot!


Re: Spark Metrics Framework?

2016-04-01 Thread Yiannis Gkoufas
Hi Mike,

I am forwarding you a mail I sent a while ago regarding some related work I
did; hope you find it useful.

Hi all,

I recently sent a message to the dev mailing list about this contribution, but
I thought it might be useful to post it here as well, since I have seen a lot
of people asking about OS-level metrics for Spark. This is the result of the
work we have been doing recently at IBM Research around Spark.

Essentially, we have extended Spark's metrics system to utilize the Hyperic
Sigar library to capture OS-level metrics, and we modified the Web UI to
visualize those metrics per application.

The above functionalities can be configured in the metrics.properties and
spark-defaults.conf files.
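
A rough sketch of the wiring (the SigarSource class name below is an
illustrative assumption; the CSV sink keys are standard Spark metrics
properties, and the repo linked below has the actual configuration):

# metrics.properties (sketch)
*.source.sigar.class=org.apache.spark.metrics.source.SigarSource
*.sink.csv.class=org.apache.spark.metrics.sink.CsvSink
*.sink.csv.directory=/tmp/spark-metrics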

We have recorded a small demo that shows those capabilities, which you can
find here: https://ibm.app.box.com/s/vyaedlyb444a4zna1215c7puhxliqxdg


There is a blog post which gives more details on the functionality here:
www.spark.tc/sparkoscope-enabling-spark-optimization-through-cross-stack-monitoring-and-visualization-2/


and also there is a public repo where anyone can try it:
https://github.com/ibm-research-ireland/sparkoscope


Hope someone finds it useful!

Thanks a lot!

Yiannis
On 1 Apr 2016 19:10, "Mike Sukmanowsky"  wrote:

> Thanks Silvio, JIRA submitted
> https://issues.apache.org/jira/browse/SPARK-14332.
>
> On Fri, 25 Mar 2016 at 12:46 Silvio Fiorito 
> wrote:
>
>> Hi Mike,
>>
>> Sorry got swamped with work and didn’t get a chance to reply.
>>
>> I misunderstood what you were trying to do. I thought you were just
>> looking to create custom metrics vs looking for the existing Hadoop Output
>> Format counters.
>>
>> I’m not familiar enough with the Hadoop APIs, but I think it would require
>> a change to the SparkHadoopWriter class, since it generates the JobContext
>> which is required to read the counters. Then it could publish the counters
>> to the Spark metrics system.
>>
>> I would suggest going ahead and submitting a JIRA request if there isn’t
>> one already.
>>
>> Thanks,
>> Silvio
>>
>> From: Mike Sukmanowsky 
>> Date: Friday, March 25, 2016 at 10:48 AM
>>
>> To: Silvio Fiorito , "
>> user@spark.apache.org" 
>> Subject: Re: Spark Metrics Framework?
>>
>> Pinging again - any thoughts?
>>
>> On Wed, 23 Mar 2016 at 09:17 Mike Sukmanowsky 
>> wrote:
>>
>>> Thanks Ted and Silvio. I think I'll need a bit more hand holding here,
>>> sorry. The way we use ES Hadoop is in pyspark via
>>> org.elasticsearch.hadoop.mr.EsOutputFormat in a saveAsNewAPIHadoopFile
>>> call. Given the Hadoop interop, I wouldn't assume that the EsOutputFormat
>>> class could be modified to define a new Source and register it via
>>> MetricsSystem.createMetricsSystem. This feels like a good feature request
>>> for Spark actually: "Support Hadoop Counters in Input/OutputFormats as
>>> Spark metrics" but I wanted some feedback first to see if that makes sense.
>>>
>>> That said, some of the custom RDD classes could probably be modified to
>>> register a new Source when they perform reading/writing from/to
>>> Elasticsearch.
>>>
>>> On Tue, 22 Mar 2016 at 15:17 Silvio Fiorito <
>>> silvio.fior...@granturing.com> wrote:
>>>
 Hi Mike,

 It’s been a while since I worked on a custom Source but I think all you
 need to do is make your Source 
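
For reference, a minimal custom Source looks roughly like this (a sketch:
MyCustomSource and its gauge are illustrative, and the Source trait is
private[spark] in many Spark versions, so real code may need to live under an
org.apache.spark.* package):

import com.codahale.metrics.{Gauge, MetricRegistry}
import org.apache.spark.metrics.source.Source

class MyCustomSource extends Source {
  override val sourceName: String = "myCustomSource"
  override val metricRegistry: MetricRegistry = new MetricRegistry()

  // Illustrative gauge; replace the body with a real measurement.
  metricRegistry.register(MetricRegistry.name("exampleValue"), new Gauge[Long] {
    override def getValue: Long = 0L
  })
}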

Re: spark metrics question

2016-02-03 Thread Yiannis Gkoufas
Hi Matt,

there is some related work I recently did at IBM Research on visualizing the
metrics produced.
You can read about it here:
http://www.spark.tc/sparkoscope-enabling-spark-optimization-through-cross-stack-monitoring-and-visualization-2/
We recently open-sourced it, in case you are interested in taking a deeper
look: https://github.com/ibm-research-ireland/sparkoscope

Thanks,
Yiannis

On 3 February 2016 at 13:32, Matt K  wrote:

> Hi guys,
>
> I'm looking to create a custom sync based on Spark's Metrics System:
>
> https://github.com/apache/spark/blob/9f603fce78fcc997926e9a72dec44d48cbc396fc/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
>
> If I want to collect metrics from the Driver, Master, and Executor nodes,
> should the jar with the custom class be installed on Driver, Master, and
> Executor nodes?
>
> Also, on Executor nodes, does the MetricsSystem run inside the Executor's
> JVM?
>
> Thanks,
> -Matt
>


Re: spark metrics question

2016-02-03 Thread Yiannis Gkoufas
Hi Matt,

does the custom class you want to package report metrics for each Executor?

Thanks

On 3 February 2016 at 15:56, Matt K <matvey1...@gmail.com> wrote:

> Thanks for sharing Yiannis, looks very promising!
>
> Do you know if I can package a custom class with my application, or does
> it have to be pre-deployed on all Executor nodes?
>
> On Wed, Feb 3, 2016 at 10:36 AM, Yiannis Gkoufas <johngou...@gmail.com>
> wrote:
>
>> Hi Matt,
>>
>> there is some related work I recently did at IBM Research on visualizing
>> the metrics produced.
>> You can read about it here:
>> http://www.spark.tc/sparkoscope-enabling-spark-optimization-through-cross-stack-monitoring-and-visualization-2/
>> We recently open-sourced it, in case you are interested in taking a deeper
>> look: https://github.com/ibm-research-ireland/sparkoscope
>>
>> Thanks,
>> Yiannis
>>
>> On 3 February 2016 at 13:32, Matt K <matvey1...@gmail.com> wrote:
>>
>>> Hi guys,
>>>
>>> I'm looking to create a custom sync based on Spark's Metrics System:
>>>
>>> https://github.com/apache/spark/blob/9f603fce78fcc997926e9a72dec44d48cbc396fc/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
>>>
>>> If I want to collect metrics from the Driver, Master, and Executor
>>> nodes, should the jar with the custom class be installed on Driver, Master,
>>> and Executor nodes?
>>>
>>> Also, on Executor nodes, does the MetricsSystem run inside the
>>> Executor's JVM?
>>>
>>> Thanks,
>>> -Matt
>>>
>>
>>
>
>
> --
> www.calcmachine.com - easy online calculator.
>


SparkOscope: Enabling Spark Optimization through Cross-stack Monitoring and Visualization

2016-02-03 Thread Yiannis Gkoufas
Hi all,

I recently sent a message to the dev mailing list about this contribution, but
I thought it might be useful to post it here as well, since I have seen a lot
of people asking about OS-level metrics for Spark. This is the result of the
work we have been doing recently at IBM Research around Spark.
Essentially, we have extended Spark's metrics system to utilize the Hyperic
Sigar library to capture OS-level metrics, and we modified the Web UI to
visualize those metrics per application.
The above functionalities can be configured in the metrics.properties and
spark-defaults.conf files.
We have recorded a small demo that shows those capabilities, which you can
find here: https://ibm.app.box.com/s/vyaedlyb444a4zna1215c7puhxliqxdg
There is a blog post which gives more details on the functionality here:
www.spark.tc/sparkoscope-enabling-spark-optimization-through-cross-stack-monitoring-and-visualization-2/
and also there is a public repo where anyone can try it:
https://github.com/ibm-research-ireland/sparkoscope

Hope someone finds it useful!

Thanks a lot!
Yiannis


Trying to understand dynamic resource allocation

2016-01-11 Thread Yiannis Gkoufas
Hi,

I am exploring the dynamic resource allocation provided by the standalone
cluster mode, and I was wondering whether the behavior I am experiencing is
expected.
In my configuration I have 3 slaves with 24 cores each.
I have in my spark-defaults.conf:

spark.shuffle.service.enabled true
spark.dynamicAllocation.enabled true
spark.dynamicAllocation.minExecutors 1
spark.dynamicAllocation.maxExecutors 6
spark.executor.cores 4

When I submit a first job, it takes up all 72 cores of the cluster.
When I submit a second job while the first one is running, I get the error:

Initial job has not accepted any resources; check your cluster UI to ensure
that workers are registered and have sufficient resources

Is this the expected behavior?
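
For context, standalone mode also has a per-application core cap, which the
configuration above leaves unset; in plain standalone mode (no dynamic
allocation) the first application claims every available core without it. A
minimal sketch of the extra line:

spark.cores.max 24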

Thanks a lot


Networking problems in Spark 1.6.0

2016-01-05 Thread Yiannis Gkoufas
Hi there,

I have been using Spark 1.5.2 on my cluster without a problem and wanted to
try Spark 1.6.0.
I have the exact same configuration on both clusters.
I am able to start the Standalone Cluster but I fail to submit a job
getting errors like the following:

16/01/05 14:24:14 INFO AppClient$ClientEndpoint: Connecting to master
spark://my-ip:7077...
16/01/05 14:24:34 INFO AppClient$ClientEndpoint: Connecting to master
spark://my-ip:7077...
16/01/05 14:24:54 INFO AppClient$ClientEndpoint: Connecting to master
spark://my-ip:7077...
16/01/05 14:24:54 INFO AppClient$ClientEndpoint: Connecting to master
spark://my-ip:7077...
16/01/05 14:24:54 WARN TransportChannelHandler: Exception in connection
from my-ip/X.XXX.XX.XX:7077
java.lang.NoSuchMethodError:
java.util.concurrent.ConcurrentHashMap.keySet()Ljava/util/concurrent/ConcurrentHashMap$KeySetView;
at org.apache.spark.rpc.netty.Dispatcher.postToAll(Dispatcher.scala:106)
at
org.apache.spark.rpc.netty.NettyRpcHandler.internalReceive(NettyRpcEnv.scala:586)
at org.apache.spark.rpc.netty.NettyRpcHandler.receive(NettyRpcEnv.scala:577)
at
org.apache.spark.network.server.TransportRequestHandler.processOneWayMessage(TransportRequestHandler.java:170)
at
org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:104)
at
org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:104)
at
org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51)
at
io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
at
io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:266)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
at
io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
at
org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:86)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
at
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
at
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
at
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
at
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at
io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
at java.lang.Thread.run(Thread.java:745)

Has anyone else had similar problems?

Thanks a lot


Re: Networking problems in Spark 1.6.0

2016-01-05 Thread Yiannis Gkoufas
Yes, that was the case: the app was built with Java 8. But that was also the
case with Spark 1.5.2, and it didn't complain.
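
For anyone hitting the same thing, pinning the bytecode target at build time
avoids it; a minimal build.sbt sketch (standard sbt/scalac options; the 1.7
values are what this setup would need, and strict cross-compilation also
wants -bootclasspath pointed at a JDK 7 rt.jar):

javacOptions ++= Seq("-source", "1.7", "-target", "1.7")
scalacOptions += "-target:jvm-1.7"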

On 5 January 2016 at 16:40, Dean Wampler <deanwamp...@gmail.com> wrote:

> ​Still, it would be good to know what happened exactly. Why did the netty
> dependency expect Java 8?  Did you build your app on a machine with Java 8
> and deploy on a Java 7 machine?​
>
> Anyway, I played with the 1.6.0 spark-shell using Java 7 and it worked
> fine. I also looked at the distribution's class files using e.g.,
>
> $ cd $HOME/spark/spark-1.6.0-bin-hadoop2.6
> $ jar xf lib/spark-assembly-1.6.0-hadoop2.6.0.jar org/apache/spark/rpc/netty/Dispatcher.class
> $ javap -classpath . -verbose org.apache.spark.rpc.netty.Dispatcher | grep version
>   minor version: 0
>   major version: 50
>
> So, it was compiled with Java 6 (see
> https://en.wikipedia.org/wiki/Java_class_file). So, it doesn't appear to
> be a Spark build issue.
>
> dean
>
> Dean Wampler, Ph.D.
> Author: Programming Scala, 2nd Edition
> <http://shop.oreilly.com/product/0636920033073.do> (O'Reilly)
> Typesafe <http://typesafe.com>
> @deanwampler <http://twitter.com/deanwampler>
> http://polyglotprogramming.com
>
> On Tue, Jan 5, 2016 at 9:01 AM, Yiannis Gkoufas <johngou...@gmail.com>
> wrote:
>
>> Hi Dean,
>>
>> thanks so much for the response! It works without a problem now!
>>
>> On 5 January 2016 at 14:33, Dean Wampler <deanwamp...@gmail.com> wrote:
>>
>>> ConcurrentHashMap.keySet() returning a KeySetView is a Java 8 method.
>>> The Java 7 method returns a Set. Are you running Java 7? What happens if
>>> you run Java 8?
>>>
>>> Dean Wampler, Ph.D.
>>> Author: Programming Scala, 2nd Edition
>>> <http://shop.oreilly.com/product/0636920033073.do> (O'Reilly)
>>> Typesafe <http://typesafe.com>
>>> @deanwampler <http://twitter.com/deanwampler>
>>> http://polyglotprogramming.com
>>>
>>> On Tue, Jan 5, 2016 at 8:29 AM, Yiannis Gkoufas <johngou...@gmail.com>
>>> wrote:
>>>
>>>> Hi there,
>>>>
>>>> I have been using Spark 1.5.2 on my cluster without a problem and
>>>> wanted to try Spark 1.6.0.
>>>> I have the exact same configuration on both clusters.
>>>> I am able to start the Standalone Cluster but I fail to submit a job
>>>> getting errors like the following:
>>>>
>>>> 16/01/05 14:24:14 INFO AppClient$ClientEndpoint: Connecting to master
>>>> spark://my-ip:7077...
>>>> 16/01/05 14:24:34 INFO AppClient$ClientEndpoint: Connecting to master
>>>> spark://my-ip:7077...
>>>> 16/01/05 14:24:54 INFO AppClient$ClientEndpoint: Connecting to master
>>>> spark://my-ip:7077...
>>>> 16/01/05 14:24:54 INFO AppClient$ClientEndpoint: Connecting to master
>>>> spark://my-ip:7077...
>>>> 16/01/05 14:24:54 WARN TransportChannelHandler: Exception in connection
>>>> from my-ip/X.XXX.XX.XX:7077
>>>> java.lang.NoSuchMethodError:
>>>> java.util.concurrent.ConcurrentHashMap.keySet()Ljava/util/concurrent/ConcurrentHashMap$KeySetView;
>>>> at org.apache.spark.rpc.netty.Dispatcher.postToAll(Dispatcher.scala:106)
>>>> at
>>>> org.apache.spark.rpc.netty.NettyRpcHandler.internalReceive(NettyRpcEnv.scala:586)
>>>> at
>>>> org.apache.spark.rpc.netty.NettyRpcHandler.receive(NettyRpcEnv.scala:577)
>>>> at
>>>> org.apache.spark.network.server.TransportRequestHandler.processOneWayMessage(TransportRequestHandler.java:170)
>>>> at
>>>> org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:104)
>>>> at
>>>> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:104)
>>>> at
>>>> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51)
>>>> at
>>>> io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
>>>> at
>>>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
>>>> at
>>>> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
>>>> at
>>>> io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:266)
>>>> at
>>>> io.netty.channel.AbstractCha

Re: Networking problems in Spark 1.6.0

2016-01-05 Thread Yiannis Gkoufas
Hi Dean,

thanks so much for the response! It works without a problem now!

On 5 January 2016 at 14:33, Dean Wampler <deanwamp...@gmail.com> wrote:

> ConcurrentHashMap.keySet() returning a KeySetView is a Java 8 method. The
> Java 7 method returns a Set. Are you running Java 7? What happens if you
> run Java 8?
>
> Dean Wampler, Ph.D.
> Author: Programming Scala, 2nd Edition
> <http://shop.oreilly.com/product/0636920033073.do> (O'Reilly)
> Typesafe <http://typesafe.com>
> @deanwampler <http://twitter.com/deanwampler>
> http://polyglotprogramming.com
>
> On Tue, Jan 5, 2016 at 8:29 AM, Yiannis Gkoufas <johngou...@gmail.com>
> wrote:
>
>> Hi there,
>>
>> I have been using Spark 1.5.2 on my cluster without a problem and wanted
>> to try Spark 1.6.0.
>> I have the exact same configuration on both clusters.
>> I am able to start the Standalone Cluster but I fail to submit a job
>> getting errors like the following:
>>
>> 16/01/05 14:24:14 INFO AppClient$ClientEndpoint: Connecting to master
>> spark://my-ip:7077...
>> 16/01/05 14:24:34 INFO AppClient$ClientEndpoint: Connecting to master
>> spark://my-ip:7077...
>> 16/01/05 14:24:54 INFO AppClient$ClientEndpoint: Connecting to master
>> spark://my-ip:7077...
>> 16/01/05 14:24:54 INFO AppClient$ClientEndpoint: Connecting to master
>> spark://my-ip:7077...
>> 16/01/05 14:24:54 WARN TransportChannelHandler: Exception in connection
>> from my-ip/X.XXX.XX.XX:7077
>> java.lang.NoSuchMethodError:
>> java.util.concurrent.ConcurrentHashMap.keySet()Ljava/util/concurrent/ConcurrentHashMap$KeySetView;
>> at org.apache.spark.rpc.netty.Dispatcher.postToAll(Dispatcher.scala:106)
>> at
>> org.apache.spark.rpc.netty.NettyRpcHandler.internalReceive(NettyRpcEnv.scala:586)
>> at
>> org.apache.spark.rpc.netty.NettyRpcHandler.receive(NettyRpcEnv.scala:577)
>> at
>> org.apache.spark.network.server.TransportRequestHandler.processOneWayMessage(TransportRequestHandler.java:170)
>> at
>> org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:104)
>> at
>> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:104)
>> at
>> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51)
>> at
>> io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
>> at
>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
>> at
>> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
>> at
>> io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:266)
>> at
>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
>> at
>> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
>> at
>> io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
>> at
>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
>> at
>> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
>> at
>> org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:86)
>> at
>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
>> at
>> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
>> at
>> io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
>> at
>> io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
>> at
>> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
>> at
>> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>> at
>> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>> at
>> io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
>> at java.lang.Thread.run(Thread.java:745)
>>
>> Has anyone else had similar problems?
>>
>> Thanks a lot
>>
>>
>


Avoid Shuffling on Partitioned Data

2015-12-04 Thread Yiannis Gkoufas
Hi there,

I have my data stored in HDFS partitioned by month in Parquet format.
The directory looks like this:

-month=201411
-month=201412
-month=201501
-

I want to compute some aggregates for every timestamp.
How is it possible to achieve that by taking advantage of the existing
partitioning?
One naive way I am thinking of is issuing multiple SQL queries:

SELECT * FROM TABLE WHERE month=201411
SELECT * FROM TABLE WHERE month=201412
SELECT * FROM TABLE WHERE month=201501
.

computing the aggregates on the results of each query, and combining them at
the end.

I think there should be a better way, right?
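
A single grouped query that keeps the partition column in the grouping key
would at least avoid the per-month loop (a sketch; the timestamp and value
column names are illustrative):

SELECT month, timestamp, SUM(value)
FROM TABLE
GROUP BY month, timestamp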

Thanks


Re: Sorted Multiple Outputs

2015-08-14 Thread Yiannis Gkoufas
Hi Eugene,

in my case the list of values that I want to sort and write to a separate
file is fairly small, so I solved it the following way:

.groupByKey().foreach(e => {
  val hadoopConfig = new Configuration()
  val hdfs = FileSystem.get(hadoopConfig)
  val newPath = rootPath + "/" + e._1
  val dstream = hdfs.create(new Path(newPath))
  val bstream = new BufferedOutputStream(dstream, 100 * 1024)
  val writer = new PrintWriter(bstream)
  e._2.toList.sortBy(_._1).foreach(sub => {
    writer.println(Utils.getDateStr(sub._1) + "," + sub._2 + "," + sub._3)
  })
  writer.flush()
  writer.close()
})


Not sure what I changed in the way I write to HDFS, but this approach worked.


Thanks a lot!


On 13 August 2015 at 01:06, Eugene Morozov fathers...@list.ru wrote:

 Yiannis,

 sorry for late response,
 It is indeed not possible to create a new RDD inside of foreachPartitions,
 so you have to write the data manually. I haven’t tried that and haven’t got
 such an exception, but I’d assume you might try to write locally and then
 upload it into HDFS. FileSystem has a specific method for that:
 “copyFromLocalFile”.

 Another approach would be to try to split the RDD into multiple RDDs by key.
 You can get the distinct keys, collect them on the driver, and have a loop
 over the keys, filtering a new RDD out of the original one by each key:

 for (key <- keys) {
   rdd.filter(_._1 == key).saveAsTextFile(...)
 }

 It might help to cache the original RDD.

 On 16 Jul 2015, at 12:21, Yiannis Gkoufas johngou...@gmail.com wrote:

 Hi Eugene,

 thanks for your response!
 Your recommendation makes sense, that's what I more or less tried.
 The problem that I am facing is that inside foreachPartition() I cannot
 create a new rdd and use saveAsTextFile.
 It would probably make sense to write directly to HDFS using the Java API.
 When I tried that I was getting errors similar to this:

 Failed on local exception: java.io.InterruptedIOException: Interruped
 while waiting for IO on channel java.nio.channels.SocketChannel

 Probably it's hitting a race condition.

 Has anyone else faced this situation? Any suggestions?

 Thanks a lot!

 On 15 July 2015 at 14:04, Eugene Morozov fathers...@list.ru wrote:

 Yiannis ,

 It looks like you might explore other approach.

 sc.textFile("input/path")
   .map(...)          // your own implementation
   .partitionBy(new HashPartitioner(num))
   .groupBy(...)      // your own implementation; as a result, a PairRDD of key vs Iterable of values
   .foreachPartition(...)

 On the last step you could sort all values for the key and store them
 into separate file even into the same directory of all other files for
 other keys.
 HashPartitioner must guarantee that all values for a specific key will
 reside in just one partition, but it might happen that one partition
 contains more than one key (with values). This I’m not sure about, but that
 shouldn’t be a big deal, as you would iterate over tuple<key,
 Iterable<value>> and store one key to a specific file.

 On 15 Jul 2015, at 03:23, Yiannis Gkoufas johngou...@gmail.com wrote:

 Hi there,

 I have been using the approach described here:


 http://stackoverflow.com/questions/23995040/write-to-multiple-outputs-by-key-spark-one-spark-job

 In addition to that, I was wondering if there is a way to customize the
 order of the values contained in each file.

 Thanks a lot!


 Eugene Morozov
 fathers...@list.ru






 Eugene Morozov
 fathers...@list.ru







Re: Sorted Multiple Outputs

2015-07-16 Thread Yiannis Gkoufas
Hi Eugene,

thanks for your response!
Your recommendation makes sense, that's what I more or less tried.
The problem that I am facing is that inside foreachPartition() I cannot
create a new rdd and use saveAsTextFile.
It would probably make sense to write directly to HDFS using the Java API.
When I tried that I was getting errors similar to this:

Failed on local exception: java.io.InterruptedIOException: Interruped while
waiting for IO on channel java.nio.channels.SocketChannel

Probably it's hitting a race condition.

Has anyone else faced this situation? Any suggestions?

Thanks a lot!

On 15 July 2015 at 14:04, Eugene Morozov fathers...@list.ru wrote:

 Yiannis ,

 It looks like you might explore other approach.

 sc.textFile("input/path")
   .map(...)          // your own implementation
   .partitionBy(new HashPartitioner(num))
   .groupBy(...)      // your own implementation; as a result, a PairRDD of key vs Iterable of values
   .foreachPartition(...)

 On the last step you could sort all values for the key and store them into
 separate file even into the same directory of all other files for other
 keys.
 HashPartitioner must guarantee that all values for a specific key will
 reside in just one partition, but it might happen that one partition
 contains more than one key (with values). This I’m not sure about, but that
 shouldn’t be a big deal, as you would iterate over tuple<key,
 Iterable<value>> and store one key to a specific file.

 On 15 Jul 2015, at 03:23, Yiannis Gkoufas johngou...@gmail.com wrote:

 Hi there,

 I have been using the approach described here:


 http://stackoverflow.com/questions/23995040/write-to-multiple-outputs-by-key-spark-one-spark-job

 In addition to that, I was wondering if there is a way to customize the
 order of the values contained in each file.

 Thanks a lot!


 Eugene Morozov
 fathers...@list.ru







Sorted Multiple Outputs

2015-07-14 Thread Yiannis Gkoufas
Hi there,

I have been using the approach described here:

http://stackoverflow.com/questions/23995040/write-to-multiple-outputs-by-key-spark-one-spark-job

In addition to that, I was wondering if there is a way to customize the order
of the values contained in each file.
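
One primitive that looks relevant here is repartitionAndSortWithinPartitions
(available since Spark 1.2). A secondary-sort sketch with illustrative types:
partition by the real key but sort by (key, timestamp), so each key's values
reach the writer already ordered:

import org.apache.spark.{HashPartitioner, Partitioner}

// Route composite (key, timestamp) records by the key alone, so all of a
// key's values land in one partition while the sort uses the full tuple.
class KeyPartitioner(n: Int) extends Partitioner {
  private val delegate = new HashPartitioner(n)
  def numPartitions: Int = n
  def getPartition(composite: Any): Int = {
    val (key, _) = composite.asInstanceOf[(String, Long)]
    delegate.getPartition(key)
  }
}

// pairRdd: RDD[((String, Long), String)], i.e. ((key, timestamp), record)
// val ordered = pairRdd.repartitionAndSortWithinPartitions(new KeyPartitioner(16))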

Thanks a lot!


Re: How to share large resources like dictionaries while processing data with Spark ?

2015-06-04 Thread Yiannis Gkoufas
Hi there,

I would recommend checking out
https://github.com/spark-jobserver/spark-jobserver which I think gives the
functionality you are looking for.
I haven't tested it though.

BR

On 5 June 2015 at 01:35, Olivier Girardot ssab...@gmail.com wrote:

 You can use it as a broadcast variable, but if it's too large (more than
 1GB, I guess), you may need to share it by joining it to the other RDDs
 using some kind of key.
 But this is the kind of thing broadcast variables were designed for.
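
 A minimal broadcast sketch (assuming an ambient SparkContext sc, as in the
 shell; the small Map stands in for the large dictionary):

 val dictionary: Map[String, String] = Map("spark" -> "engine")
 val dictBc = sc.broadcast(dictionary)
 val resolved = sc.parallelize(Seq("spark", "hadoop"))
   .map(w => (w, dictBc.value.getOrElse(w, "unknown")))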

 Regards,

 Olivier.

 Le jeu. 4 juin 2015 à 23:50, dgoldenberg dgoldenberg...@gmail.com a
 écrit :

 We have some pipelines defined where sometimes we need to load potentially
 large resources such as dictionaries.

 What would be the best strategy for sharing such resources among the
 transformations/actions within a consumer?  Can they be shared somehow
 across the RDDs?

 I'm looking for a way to load such a resource once into the cluster memory
 and have it be available throughout the lifecycle of a consumer...

 Thanks.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/How-to-share-large-resources-like-dictionaries-while-processing-data-with-Spark-tp23162.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: DataFrame operation on parquet: GC overhead limit exceeded

2015-03-23 Thread Yiannis Gkoufas
Hi Yin,

Yes, I have set spark.executor.memory to 8g and the worker memory to 16g
without any success.
I cannot figure out how to increase the number of mapPartitions tasks.

Thanks a lot

On 20 March 2015 at 18:44, Yin Huai yh...@databricks.com wrote:

 spark.sql.shuffle.partitions only control the number of tasks in the
 second stage (the number of reducers). For your case, I'd say that the
 number of tasks in the first state (number of mappers) will be the number
 of files you have.

 Actually, have you changed spark.executor.memory (it controls the
 memory for an executor of your application)? I did not see it in your
 original email. The difference between worker memory and executor memory
 can be found at (http://spark.apache.org/docs/1.3.0/spark-standalone.html
 ),

 SPARK_WORKER_MEMORY
 Total amount of memory to allow Spark applications to use on the machine,
 e.g. 1000m, 2g (default: total memory minus 1 GB); note that each
 application's individual memory is configured using its
 spark.executor.memory property.


 On Fri, Mar 20, 2015 at 9:25 AM, Yiannis Gkoufas johngou...@gmail.com
 wrote:

 Actually I realized that the correct way is:

 sqlContext.sql("set spark.sql.shuffle.partitions=1000")

 but I am still experiencing the same behavior/error.

 On 20 March 2015 at 16:04, Yiannis Gkoufas johngou...@gmail.com wrote:

 Hi Yin,

 the way I set the configuration is:

 val sqlContext = new org.apache.spark.sql.SQLContext(sc)
 sqlContext.setConf("spark.sql.shuffle.partitions", "1000")

 it is the correct way right?
 In the mapPartitions task (the first task which is launched), I get
 again the same number of tasks and again the same error. :(

 Thanks a lot!

 On 19 March 2015 at 17:40, Yiannis Gkoufas johngou...@gmail.com wrote:

 Hi Yin,

 thanks a lot for that! Will give it a shot and let you know.

 On 19 March 2015 at 16:30, Yin Huai yh...@databricks.com wrote:

 Was the OOM thrown during the execution of first stage (map) or the
 second stage (reduce)? If it was the second stage, can you increase the
 value of spark.sql.shuffle.partitions and see if the OOM disappears?

 This setting controls the number of reduces Spark SQL will use and the
 default is 200. Maybe there are too many distinct values and the memory
 pressure on every task (of those 200 reducers) is pretty high. You can
 start with 400 and increase it until the OOM disappears. Hopefully this
 will help.

 Thanks,

 Yin


 On Wed, Mar 18, 2015 at 4:46 PM, Yiannis Gkoufas johngou...@gmail.com
  wrote:

 Hi Yin,

 Thanks for your feedback. I have 1700 parquet files, sized 100MB each.
 The number of tasks launched is equal to the number of parquet files.
 Do you have any idea on how to deal with this situation?

 Thanks a lot
 On 18 Mar 2015 17:35, Yin Huai yh...@databricks.com wrote:

 Seems there are too many distinct groups processed in a task, which
 trigger the problem.

 How many files do your dataset have and how large is a file? Seems
 your query will be executed with two stages, table scan and map-side
 aggregation in the first stage and the final round of reduce-side
 aggregation in the second stage. Can you take a look at the numbers of
 tasks launched in these two stages?

 Thanks,

 Yin

 On Wed, Mar 18, 2015 at 11:42 AM, Yiannis Gkoufas 
 johngou...@gmail.com wrote:

 Hi there, I set the executor memory to 8g but it didn't help

 On 18 March 2015 at 13:59, Cheng Lian lian.cs@gmail.com
 wrote:

 You should probably increase executor memory by setting
 spark.executor.memory.

 Full list of available configurations can be found here
 http://spark.apache.org/docs/latest/configuration.html

 Cheng


 On 3/18/15 9:15 PM, Yiannis Gkoufas wrote:

 Hi there,

 I was trying the new DataFrame API with some basic operations on
 a parquet dataset.
 I have 7 nodes of 12 cores and 8GB RAM allocated to each worker
 in a standalone cluster mode.
 The code is the following:

 val people = sqlContext.parquetFile("/data.parquet")
 val res = people.groupBy("name", "date").agg(sum("power"), sum("supply")).take(10)
 System.out.println(res)

 The dataset consists of 16 billion entries.
 The error I get is java.lang.OutOfMemoryError: GC overhead limit
 exceeded

 My configuration is:

 spark.serializer org.apache.spark.serializer.KryoSerializer
 spark.driver.memory 6g
 spark.executor.extraJavaOptions -XX:+UseCompressedOops
 spark.shuffle.manager sort

 Any idea how can I workaround this?

 Thanks a lot












How to handle under-performing nodes in the cluster

2015-03-20 Thread Yiannis Gkoufas
Hi all,

I have 6 nodes in the cluster and one of the nodes is clearly
under-performing:

[screenshot of per-node performance omitted]
I was wondering what the impact of having such an issue is. Also, what is the
recommended way to work around it?

Thanks a lot,
Yiannis


Re: DataFrame operation on parquet: GC overhead limit exceeded

2015-03-20 Thread Yiannis Gkoufas
Hi Yin,

the way I set the configuration is:

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
 sqlContext.setConf("spark.sql.shuffle.partitions", "1000")

it is the correct way right?
In the mapPartitions task (the first task which is launched), I get again
the same number of tasks and again the same error. :(

Thanks a lot!

On 19 March 2015 at 17:40, Yiannis Gkoufas johngou...@gmail.com wrote:

 Hi Yin,

 thanks a lot for that! Will give it a shot and let you know.

 On 19 March 2015 at 16:30, Yin Huai yh...@databricks.com wrote:

 Was the OOM thrown during the execution of first stage (map) or the
 second stage (reduce)? If it was the second stage, can you increase the
 value of spark.sql.shuffle.partitions and see if the OOM disappears?

 This setting controls the number of reduces Spark SQL will use and the
 default is 200. Maybe there are too many distinct values and the memory
 pressure on every task (of those 200 reducers) is pretty high. You can
 start with 400 and increase it until the OOM disappears. Hopefully this
 will help.

 Thanks,

 Yin


 On Wed, Mar 18, 2015 at 4:46 PM, Yiannis Gkoufas johngou...@gmail.com
 wrote:

 Hi Yin,

 Thanks for your feedback. I have 1700 parquet files, sized 100MB each.
 The number of tasks launched is equal to the number of parquet files. Do
 you have any idea on how to deal with this situation?

 Thanks a lot
 On 18 Mar 2015 17:35, Yin Huai yh...@databricks.com wrote:

 Seems there are too many distinct groups processed in a task, which
 trigger the problem.

 How many files do your dataset have and how large is a file? Seems your
 query will be executed with two stages, table scan and map-side aggregation
 in the first stage and the final round of reduce-side aggregation in the
 second stage. Can you take a look at the numbers of tasks launched in these
 two stages?

 Thanks,

 Yin

 On Wed, Mar 18, 2015 at 11:42 AM, Yiannis Gkoufas johngou...@gmail.com
  wrote:

 Hi there, I set the executor memory to 8g but it didn't help

 On 18 March 2015 at 13:59, Cheng Lian lian.cs@gmail.com wrote:

 You should probably increase executor memory by setting
 spark.executor.memory.

 Full list of available configurations can be found here
 http://spark.apache.org/docs/latest/configuration.html

 Cheng


 On 3/18/15 9:15 PM, Yiannis Gkoufas wrote:

 Hi there,

 I was trying the new DataFrame API with some basic operations on a
 parquet dataset.
 I have 7 nodes of 12 cores and 8GB RAM allocated to each worker in a
 standalone cluster mode.
 The code is the following:

 val people = sqlContext.parquetFile("/data.parquet")
 val res = people.groupBy("name", "date").agg(sum("power"), sum("supply")).take(10)
 System.out.println(res)

 The dataset consists of 16 billion entries.
 The error I get is java.lang.OutOfMemoryError: GC overhead limit
 exceeded

 My configuration is:

 spark.serializer org.apache.spark.serializer.KryoSerializer
 spark.driver.memory 6g
 spark.executor.extraJavaOptions -XX:+UseCompressedOops
 spark.shuffle.manager sort

 Any idea how can I workaround this?

 Thanks a lot









Re: DataFrame operation on parquet: GC overhead limit exceeded

2015-03-20 Thread Yiannis Gkoufas
Actually I realized that the correct way is:

sqlContext.sql(set spark.sql.shuffle.partitions=1000)

but I am still experiencing the same behavior/error.

On 20 March 2015 at 16:04, Yiannis Gkoufas johngou...@gmail.com wrote:

 Hi Yin,

 the way I set the configuration is:

 val sqlContext = new org.apache.spark.sql.SQLContext(sc)
 sqlContext.setConf("spark.sql.shuffle.partitions", "1000")

 it is the correct way right?
 In the mapPartitions task (the first task which is launched), I get again
 the same number of tasks and again the same error. :(

 Thanks a lot!

 On 19 March 2015 at 17:40, Yiannis Gkoufas johngou...@gmail.com wrote:

 Hi Yin,

 thanks a lot for that! Will give it a shot and let you know.

 On 19 March 2015 at 16:30, Yin Huai yh...@databricks.com wrote:

 Was the OOM thrown during the execution of first stage (map) or the
 second stage (reduce)? If it was the second stage, can you increase the
 value of spark.sql.shuffle.partitions and see if the OOM disappears?

 This setting controls the number of reduces Spark SQL will use and the
 default is 200. Maybe there are too many distinct values and the memory
 pressure on every task (of those 200 reducers) is pretty high. You can
 start with 400 and increase it until the OOM disappears. Hopefully this
 will help.

 Thanks,

 Yin


 On Wed, Mar 18, 2015 at 4:46 PM, Yiannis Gkoufas johngou...@gmail.com
 wrote:

 Hi Yin,

 Thanks for your feedback. I have 1700 parquet files, sized 100MB each.
 The number of tasks launched is equal to the number of parquet files. Do
 you have any idea on how to deal with this situation?

 Thanks a lot
 On 18 Mar 2015 17:35, Yin Huai yh...@databricks.com wrote:

 Seems there are too many distinct groups processed in a task, which
 trigger the problem.

 How many files do your dataset have and how large is a file? Seems
 your query will be executed with two stages, table scan and map-side
 aggregation in the first stage and the final round of reduce-side
 aggregation in the second stage. Can you take a look at the numbers of
 tasks launched in these two stages?

 Thanks,

 Yin

 On Wed, Mar 18, 2015 at 11:42 AM, Yiannis Gkoufas 
 johngou...@gmail.com wrote:

 Hi there, I set the executor memory to 8g but it didn't help

 On 18 March 2015 at 13:59, Cheng Lian lian.cs@gmail.com wrote:

 You should probably increase executor memory by setting
 spark.executor.memory.

 Full list of available configurations can be found here
 http://spark.apache.org/docs/latest/configuration.html

 Cheng


 On 3/18/15 9:15 PM, Yiannis Gkoufas wrote:

 Hi there,

 I was trying the new DataFrame API with some basic operations on a
 parquet dataset.
 I have 7 nodes of 12 cores and 8GB RAM allocated to each worker in
 a standalone cluster mode.
 The code is the following:

 val people = sqlContext.parquetFile("/data.parquet")
 val res = people.groupBy("name", "date").agg(sum("power"), sum("supply")).take(10)
 System.out.println(res)

 The dataset consists of 16 billion entries.
 The error I get is java.lang.OutOfMemoryError: GC overhead limit
 exceeded

 My configuration is:

 spark.serializer org.apache.spark.serializer.KryoSerializer
 spark.driver.memory 6g
 spark.executor.extraJavaOptions -XX:+UseCompressedOops
 spark.shuffle.manager sort

 Any idea how can I workaround this?

 Thanks a lot










Re: DataFrame operation on parquet: GC overhead limit exceeded

2015-03-19 Thread Yiannis Gkoufas
Hi Yin,

thanks a lot for that! Will give it a shot and let you know.

On 19 March 2015 at 16:30, Yin Huai yh...@databricks.com wrote:

 Was the OOM thrown during the execution of first stage (map) or the second
 stage (reduce)? If it was the second stage, can you increase the value
 of spark.sql.shuffle.partitions and see if the OOM disappears?

 This setting controls the number of reduces Spark SQL will use and the
 default is 200. Maybe there are too many distinct values and the memory
 pressure on every task (of those 200 reducers) is pretty high. You can
 start with 400 and increase it until the OOM disappears. Hopefully this
 will help.

 Thanks,

 Yin


 On Wed, Mar 18, 2015 at 4:46 PM, Yiannis Gkoufas johngou...@gmail.com
 wrote:

 Hi Yin,

 Thanks for your feedback. I have 1700 parquet files, sized 100MB each.
 The number of tasks launched is equal to the number of parquet files. Do
 you have any idea on how to deal with this situation?

 Thanks a lot
 On 18 Mar 2015 17:35, Yin Huai yh...@databricks.com wrote:

 Seems there are too many distinct groups processed in a task, which
 trigger the problem.

 How many files do your dataset have and how large is a file? Seems your
 query will be executed with two stages, table scan and map-side aggregation
 in the first stage and the final round of reduce-side aggregation in the
 second stage. Can you take a look at the numbers of tasks launched in these
 two stages?

 Thanks,

 Yin

 On Wed, Mar 18, 2015 at 11:42 AM, Yiannis Gkoufas johngou...@gmail.com
 wrote:

 Hi there, I set the executor memory to 8g but it didn't help

 On 18 March 2015 at 13:59, Cheng Lian lian.cs@gmail.com wrote:

 You should probably increase executor memory by setting
 spark.executor.memory.

 Full list of available configurations can be found here
 http://spark.apache.org/docs/latest/configuration.html

 Cheng


 On 3/18/15 9:15 PM, Yiannis Gkoufas wrote:

 Hi there,

 I was trying the new DataFrame API with some basic operations on a
 parquet dataset.
 I have 7 nodes of 12 cores and 8GB RAM allocated to each worker in a
 standalone cluster mode.
 The code is the following:

 val people = sqlContext.parquetFile("/data.parquet")
 val res = people.groupBy("name", "date").agg(sum("power"), sum("supply")).take(10)
 System.out.println(res)

 The dataset consists of 16 billion entries.
 The error I get is java.lang.OutOfMemoryError: GC overhead limit
 exceeded

 My configuration is:

 spark.serializer org.apache.spark.serializer.KryoSerializer
 spark.driver.memory 6g
 spark.executor.extraJavaOptions -XX:+UseCompressedOops
 spark.shuffle.manager sort

 Any idea how can I workaround this?

 Thanks a lot








DataFrame operation on parquet: GC overhead limit exceeded

2015-03-18 Thread Yiannis Gkoufas
Hi there,

I was trying the new DataFrame API with some basic operations on a parquet
dataset.
I have 7 nodes of 12 cores and 8GB RAM allocated to each worker in a
standalone cluster mode.
The code is the following:

val people = sqlContext.parquetFile("/data.parquet")
val res = people.groupBy("name", "date").agg(sum("power"), sum("supply")).take(10)
System.out.println(res)

The dataset consists of 16 billion entries.
The error I get is java.lang.OutOfMemoryError: GC overhead limit exceeded

My configuration is:

spark.serializer org.apache.spark.serializer.KryoSerializer
spark.driver.memory 6g
spark.executor.extraJavaOptions -XX:+UseCompressedOops
spark.shuffle.manager sort

Any idea how can I workaround this?

Thanks a lot


Re: DataFrame operation on parquet: GC overhead limit exceeded

2015-03-18 Thread Yiannis Gkoufas
Hi there, I set the executor memory to 8g but it didn't help

On 18 March 2015 at 13:59, Cheng Lian lian.cs@gmail.com wrote:

 You should probably increase executor memory by setting
 spark.executor.memory.

 Full list of available configurations can be found here
 http://spark.apache.org/docs/latest/configuration.html

 Cheng


 On 3/18/15 9:15 PM, Yiannis Gkoufas wrote:

 Hi there,

 I was trying the new DataFrame API with some basic operations on a
 parquet dataset.
 I have 7 nodes of 12 cores and 8GB RAM allocated to each worker in a
 standalone cluster mode.
 The code is the following:

 val people = sqlContext.parquetFile("/data.parquet")
 val res = people.groupBy("name", "date").agg(sum("power"), sum("supply")).take(10)
 System.out.println(res)

 The dataset consists of 16 billion entries.
 The error I get is java.lang.OutOfMemoryError: GC overhead limit exceeded

 My configuration is:

 spark.serializer org.apache.spark.serializer.KryoSerializer
 spark.driver.memory 6g
 spark.executor.extraJavaOptions -XX:+UseCompressedOops
 spark.shuffle.manager sort

 Any idea how can I workaround this?

 Thanks a lot





Re: DataFrame operation on parquet: GC overhead limit exceeded

2015-03-18 Thread Yiannis Gkoufas
Hi Yin,

Thanks for your feedback. I have 1700 parquet files, sized 100MB each. The
number of tasks launched is equal to the number of parquet files. Do you
have any idea on how to deal with this situation?

Thanks a lot
On 18 Mar 2015 17:35, Yin Huai yh...@databricks.com wrote:

 Seems there are too many distinct groups processed in a task, which
 trigger the problem.

 How many files do your dataset have and how large is a file? Seems your
 query will be executed with two stages, table scan and map-side aggregation
 in the first stage and the final round of reduce-side aggregation in the
 second stage. Can you take a look at the numbers of tasks launched in these
 two stages?

 Thanks,

 Yin

 On Wed, Mar 18, 2015 at 11:42 AM, Yiannis Gkoufas johngou...@gmail.com
 wrote:

 Hi there, I set the executor memory to 8g but it didn't help

 On 18 March 2015 at 13:59, Cheng Lian lian.cs@gmail.com wrote:

 You should probably increase executor memory by setting
 spark.executor.memory.

 Full list of available configurations can be found here
 http://spark.apache.org/docs/latest/configuration.html

 Cheng


 On 3/18/15 9:15 PM, Yiannis Gkoufas wrote:

 Hi there,

 I was trying the new DataFrame API with some basic operations on a
 parquet dataset.
 I have 7 nodes of 12 cores and 8GB RAM allocated to each worker in a
 standalone cluster mode.
 The code is the following:

 val people = sqlContext.parquetFile("/data.parquet")
 val res = people.groupBy("name", "date").agg(sum("power"), sum("supply")).take(10)
 System.out.println(res)

 The dataset consists of 16 billion entries.
 The error I get is java.lang.OutOfMemoryError: GC overhead limit
 exceeded

 My configuration is:

 spark.serializer org.apache.spark.serializer.KryoSerializer
 spark.driver.memory 6g
 spark.executor.extraJavaOptions -XX:+UseCompressedOops
 spark.shuffle.manager sort

 Any idea how can I workaround this?

 Thanks a lot







Problems running version 1.3.0-rc1

2015-03-02 Thread Yiannis Gkoufas
Hi all,

I have downloaded version 1.3.0-rc1 from
https://github.com/apache/spark/archive/v1.3.0-rc1.zip, extracted it and
built it using:
mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.5.0 -DskipTests clean package

It doesn't complain about any issues, but when I call sbin/start-all.sh I get
the following in the logs:

15/03/02 21:28:24 ERROR ActorSystemImpl: Uncaught fatal error from thread
[sparkWorker-akka.remote.default-remote-dispatcher-6] shutting down
ActorSystem [sparkWorker]
java.lang.NoClassDefFoundError: akka/event/LogSource
at java.lang.Class.getDeclaredConstructors0(Native Method)
at java.lang.Class.privateGetDeclaredConstructors(Class.java:2663)
at java.lang.Class.getConstructor0(Class.java:3067)
at java.lang.Class.getDeclaredConstructor(Class.java:2170)
at
akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$2.apply(DynamicAccess.scala:76)
at scala.util.Try$.apply(Try.scala:161)
at
akka.actor.ReflectiveDynamicAccess.createInstanceFor(DynamicAccess.scala:73)
at
akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$3.apply(DynamicAccess.scala:84)
at
akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$3.apply(DynamicAccess.scala:84)
at scala.util.Success.flatMap(Try.scala:200)
at
akka.actor.ReflectiveDynamicAccess.createInstanceFor(DynamicAccess.scala:84)
at akka.remote.EndpointManager$$anonfun$9.apply(Remoting.scala:692)
at akka.remote.EndpointManager$$anonfun$9.apply(Remoting.scala:684)
at
scala.collection.TraversableLike$WithFilter$$anonfun$map$2.apply(TraversableLike.scala:722)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at
scala.collection.TraversableLike$WithFilter.map(TraversableLike.scala:721)
at
akka.remote.EndpointManager.akka$remote$EndpointManager$$listens(Remoting.scala:684)
at
akka.remote.EndpointManager$$anonfun$receive$2.applyOrElse(Remoting.scala:492)
at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
at akka.remote.EndpointManager.aroundReceive(Remoting.scala:395)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.ClassNotFoundException: akka.event.LogSource
at java.net.URLClassLoader$1.run(URLClassLoader.java:372)
at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:360)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 32 more

I tried to search online but couldn't find anything similar.
Any ideas what the error could be?
I tried compiling with Java 7 and Java 8, but with the same result.

Thanks a lot!


Re: Brodcast Variable updated from one transformation and used from another

2015-02-25 Thread Yiannis Gkoufas
What I think is happening is that the map operations are executed concurrently,
and the map operation in rdd2 has the initial copy of myObjectBroadcasted.
Is there a way to apply the transformations sequentially? First materialize
rdd1 and then rdd2.
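
For reference, the pattern that sidesteps this entirely (a sketch reusing
MyObject from the snippet below; broadcast values should be treated as
read-only on executors) is to finish building the object on the driver and
only then broadcast the finished copy:

val keys = sc.textFile("/file1").collect()
val myObject = new MyObject()
keys.foreach(k => myObject.insert(k))
val myObjectBroadcasted = sc.broadcast(myObject)

val rdd2 = sc.textFile("/file2").map(e =>
  (e, myObjectBroadcasted.value.lookup(e)))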

Thanks a lot!

On 24 February 2015 at 18:49, Yiannis Gkoufas johngou...@gmail.com wrote:

 Sorry for the mistake, I actually have it this way:

 val myObject = new MyObject();
 val myObjectBroadcasted = sc.broadcast(myObject);

 val rdd1 = sc.textFile("/file1").map(e => {
   myObjectBroadcasted.value.insert(e._1)
   (e._1, 1)
 })
 rdd1.cache.count() // to make sure it is transformed

 val rdd2 = sc.textFile("/file2").map(e => {
   val lookedUp = myObjectBroadcasted.value.lookup(e._1)
   (e._1, lookedUp)
 })

 On 24 February 2015 at 17:36, Ganelin, Ilya ilya.gane...@capitalone.com
 wrote:

  You're not using the broadcasted variable within your map operations.
 You're attempting to modify myObject directly, which won't work because you
 are modifying the serialized copy on the executor. You want to do
 myObjectBroadcasted.value.insert and myObjectBroadcasted.value.lookup.



 Sent with Good (www.good.com)



 -Original Message-
 *From: *Yiannis Gkoufas [johngou...@gmail.com]
 *Sent: *Tuesday, February 24, 2015 12:12 PM Eastern Standard Time
 *To: *user@spark.apache.org
 *Subject: *Brodcast Variable updated from one transformation and used
 from another

 Hi all,

 I am trying to do the following.

 val myObject = new MyObject();
 val myObjectBroadcasted = sc.broadcast(myObject);

  val rdd1 = sc.textFile("/file1").map(e => {
    myObject.insert(e._1)
    (e._1, 1)
  })
  rdd1.cache.count() // to make sure it is transformed

  val rdd2 = sc.textFile("/file2").map(e => {
    val lookedUp = myObject.lookup(e._1)
    (e._1, lookedUp)
  })

  When I check the contents of myObject within the map of rdd1, everything
  seems ok.
  On the other hand, when I check the contents of myObject within the map of
  rdd2, it seems to be empty.
  Am I doing something wrong?

 Thanks a lot!

 --

 The information contained in this e-mail is confidential and/or
 proprietary to Capital One and/or its affiliates. The information
 transmitted herewith is intended only for use by the individual or entity
 to which it is addressed.  If the reader of this message is not the
 intended recipient, you are hereby notified that any review,
 retransmission, dissemination, distribution, copying or other use of, or
 taking of any action in reliance upon this information is strictly
 prohibited. If you have received this communication in error, please
 contact the sender and delete the material from your computer.





Re: Brodcast Variable updated from one transformation and used from another

2015-02-24 Thread Yiannis Gkoufas
Sorry for the mistake, I actually have it this way:

val myObject = new MyObject();
val myObjectBroadcasted = sc.broadcast(myObject);

val rdd1 = sc.textFile("/file1").map(e => {
  myObjectBroadcasted.value.insert(e._1)
  (e._1, 1)
})
rdd1.cache.count() // to make sure it is transformed

val rdd2 = sc.textFile("/file2").map(e => {
  val lookedUp = myObjectBroadcasted.value.lookup(e._1)
  (e._1, lookedUp)
})

On 24 February 2015 at 17:36, Ganelin, Ilya ilya.gane...@capitalone.com
wrote:

  You're not using the broadcasted variable within your map operations.
 You're attempting to modify myObject directly, which won't work because you
 are modifying the serialized copy on the executor. You want to do
 myObjectBroadcasted.value.insert and myObjectBroadcasted.value.lookup.



 Sent with Good (www.good.com)



 -Original Message-
 *From: *Yiannis Gkoufas [johngou...@gmail.com]
 *Sent: *Tuesday, February 24, 2015 12:12 PM Eastern Standard Time
 *To: *user@spark.apache.org
 *Subject: *Brodcast Variable updated from one transformation and used
 from another

 Hi all,

 I am trying to do the following.

 val myObject = new MyObject();
 val myObjectBroadcasted = sc.broadcast(myObject);

  val rdd1 = sc.textFile("/file1").map(e => {
    myObject.insert(e._1)
    (e._1, 1)
  })
  rdd1.cache.count() // to make sure it is transformed

  val rdd2 = sc.textFile("/file2").map(e => {
    val lookedUp = myObject.lookup(e._1)
    (e._1, lookedUp)
  })

  When I check the contents of myObject within the map of rdd1, everything
  seems ok.
  On the other hand, when I check the contents of myObject within the map of
  rdd2, it seems to be empty.
  Am I doing something wrong?

 Thanks a lot!

 --

 The information contained in this e-mail is confidential and/or
 proprietary to Capital One and/or its affiliates. The information
 transmitted herewith is intended only for use by the individual or entity
 to which it is addressed.  If the reader of this message is not the
 intended recipient, you are hereby notified that any review,
 retransmission, dissemination, distribution, copying or other use of, or
 taking of any action in reliance upon this information is strictly
 prohibited. If you have received this communication in error, please
 contact the sender and delete the material from your computer.



Brodcast Variable updated from one transformation and used from another

2015-02-24 Thread Yiannis Gkoufas
Hi all,

I am trying to do the following.

val myObject = new MyObject();
val myObjectBroadcasted = sc.broadcast(myObject);

val rdd1 = sc.textFile("/file1").map(e => {
  myObject.insert(e._1)
  (e._1, 1)
})
rdd1.cache.count() // to make sure it is transformed

val rdd2 = sc.textFile("/file2").map(e => {
  val lookedUp = myObject.lookup(e._1)
  (e._1, lookedUp)
})

When I check the contents of myObject within the map of rdd1, everything
seems ok.
On the other hand, when I check the contents of myObject within the map of
rdd2, it seems to be empty.
Am I doing something wrong?

Thanks a lot!


Re: Running out of space (when there's no shortage)

2015-02-24 Thread Yiannis Gkoufas
Hi there,

I assume you are using Spark 1.2.1, right?
I faced the exact same issue, switched to 1.1.1 with the same configuration,
and it was solved.
On 24 Feb 2015 19:22, Ted Yu yuzhih...@gmail.com wrote:

 Here is a tool which may give you some clue:
 http://file-leak-detector.kohsuke.org/

 Cheers

 On Tue, Feb 24, 2015 at 11:04 AM, Vladimir Rodionov 
 vrodio...@splicemachine.com wrote:

 Usually this happens on Linux when an application deletes a file without
 double-checking that there are no open FDs (a resource leak). In this case,
 Linux holds all the allocated space and does not release it until the
 application exits (crashes, in your case). You check the file system and
 everything looks normal: you have enough space, yet the application reports
 no space left on device.

 Just a guess.

 -Vladimir Rodionov
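
The "deleted but still open" scenario described above is easy to reproduce
outside Spark. A minimal illustrative sketch: while the stream below is open,
df keeps counting the file's blocks as used and lsof +L1 lists the unlinked
file; the space only comes back on close.

import java.io.{File, FileOutputStream}

object DeletedButOpen {
  def main(args: Array[String]): Unit = {
    val f = new File("/tmp/deleted-but-open")
    val out = new FileOutputStream(f)
    out.write(new Array[Byte](100 * 1024 * 1024)) // occupy ~100 MB on disk
    f.delete() // unlink the file while the descriptor is still open

    // `df` still reports the 100 MB as used at this point, even though
    // the file is gone from the directory tree.
    Thread.sleep(60 * 1000)

    out.close() // only now does the filesystem reclaim the space
  }
}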

 On Tue, Feb 24, 2015 at 8:34 AM, Joe Wass jw...@crossref.org wrote:

 I'm running a cluster of 3 Amazon EC2 machines (small number because
 it's expensive when experiments keep crashing after a day!).

 Today's crash looks like this (stacktrace at end of message).
 org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output
 location for shuffle 0

 On my three nodes, I have plenty of space and inodes:

 A $ df -i
 Filesystem      Inodes  IUsed      IFree IUse% Mounted on
 /dev/xvda1      524288  97937     426351   19% /
 tmpfs          1909200      1    1909199    1% /dev/shm
 /dev/xvdb      2457600     54    2457546    1% /mnt
 /dev/xvdc      2457600     24    2457576    1% /mnt2
 /dev/xvds    831869296  23844  831845452    1% /vol0

 A $ df -h
 Filesystem  Size  Used Avail Use% Mounted on
 /dev/xvda1  7.9G  3.4G  4.5G  44% /
 tmpfs       7.3G     0  7.3G   0% /dev/shm
 /dev/xvdb    37G  1.2G   34G   4% /mnt
 /dev/xvdc    37G  177M   35G   1% /mnt2
 /dev/xvds  1000G  802G  199G  81% /vol0

 B $ df -i
 Filesystem      Inodes  IUsed      IFree IUse% Mounted on
 /dev/xvda1      524288  97947     426341   19% /
 tmpfs          1906639      1    1906638    1% /dev/shm
 /dev/xvdb      2457600     54    2457546    1% /mnt
 /dev/xvdc      2457600     24    2457576    1% /mnt2
 /dev/xvds    816200704  24223  816176481    1% /vol0

 B $ df -h
 Filesystem  Size  Used Avail Use% Mounted on
 /dev/xvda1  7.9G  3.6G  4.3G  46% /
 tmpfs       7.3G     0  7.3G   0% /dev/shm
 /dev/xvdb    37G  1.2G   34G   4% /mnt
 /dev/xvdc    37G  177M   35G   1% /mnt2
 /dev/xvds  1000G  805G  195G  81% /vol0

 C $ df -i
 Filesystem      Inodes  IUsed      IFree IUse% Mounted on
 /dev/xvda1      524288  97938     426350   19% /
 tmpfs          1906897      1    1906896    1% /dev/shm
 /dev/xvdb      2457600     54    2457546    1% /mnt
 /dev/xvdc      2457600     24    2457576    1% /mnt2
 /dev/xvds    755218352  24024  755194328    1% /vol0

 C $ df -h
 Filesystem  Size  Used Avail Use% Mounted on
 /dev/xvda1  7.9G  3.4G  4.5G  44% /
 tmpfs       7.3G     0  7.3G   0% /dev/shm
 /dev/xvdb    37G  1.2G   34G   4% /mnt
 /dev/xvdc    37G  177M   35G   1% /mnt2
 /dev/xvds  1000G  820G  181G  82% /vol0

 The devices may be ~80% full but that still leaves ~200G free on each.
 My spark-env.sh has

 export SPARK_LOCAL_DIRS=/vol0/spark

 I have manually verified that on each slave the only temporary files are
 stored on /vol0, all looking something like this


 /vol0/spark/spark-f05d407c/spark-fca3e573/spark-78c06215/spark-4f0c4236/20/rdd_8_884

 So it looks like all the files are being stored on the large drives
 (incidentally they're AWS EBS volumes, but that's the only way to get
 enough storage). My process crashed before with a slightly different
 exception under the same circumstances: kryo.KryoException:
 java.io.IOException: No space left on device

 These both happen after several hours and several GB of temporary files.

 Why does Spark think it's run out of space?

 TIA

 Joe
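
One mitigation worth trying while the root cause is unclear: SPARK_LOCAL_DIRS
accepts a comma-separated list of directories, so shuffle and spill files
could be spread across the largely idle /mnt and /mnt2 volumes as well as
/vol0. A sketch for spark-env.sh, with the paths assumed from the df output
above:

export SPARK_LOCAL_DIRS=/vol0/spark,/mnt/spark,/mnt2/spark

Spark spreads temporary files across all listed directories, which also
spreads the I/O load.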

 Stack trace 1:

 org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output
 location for shuffle 0
 at
 org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:384)
 at
 org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:381)
 at
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at
 scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
 at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
 at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
 at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
 at
 

Re: Running out of space (when there's no shortage)

2015-02-24 Thread Yiannis Gkoufas
No problem, Joe. There you go:
https://issues.apache.org/jira/browse/SPARK-5081
There is also https://issues.apache.org/jira/browse/SPARK-5715,
which is marked as resolved.

On 24 February 2015 at 21:51, Joe Wass jw...@crossref.org wrote:

 Thanks everyone.

 Yiannis, do you know if there's a bug report for this regression? For some
 other (possibly connected) reason I upgraded from 1.1.1 to 1.2.1, but I
 can't remember what the bug was.

 Joe




 On 24 February 2015 at 19:26, Yiannis Gkoufas johngou...@gmail.com
 wrote:

 Hi there,

 I assume you are using Spark 1.2.1, right?
 I faced the exact same issue and switched to 1.1.1 with the same
 configuration, which solved it.
 On 24 Feb 2015 19:22, Ted Yu yuzhih...@gmail.com wrote:

 Here is a tool which may give you some clue:
 http://file-leak-detector.kohsuke.org/

 Cheers

 On Tue, Feb 24, 2015 at 11:04 AM, Vladimir Rodionov 
 vrodio...@splicemachine.com wrote:

 Usually this happens on Linux when an application deletes a file without
 double-checking that there are no open file descriptors on it (a resource
 leak). In that case, Linux keeps the space allocated and does not release
 it until the application exits (crashes, in your case). You check the file
 system and everything looks normal: you have enough space, yet the
 application reports no space left on device.

 Just a guess.

 -Vladimir Rodionov

 On Tue, Feb 24, 2015 at 8:34 AM, Joe Wass jw...@crossref.org wrote:

 I'm running a cluster of 3 Amazon EC2 machines (small number because
 it's expensive when experiments keep crashing after a day!).

 Today's crash looks like this (stacktrace at end of message).
 org.apache.spark.shuffle.MetadataFetchFailedException: Missing an
 output location for shuffle 0

 On my three nodes, I have plenty of space and inodes:

 A $ df -i
 Filesystem      Inodes  IUsed      IFree IUse% Mounted on
 /dev/xvda1      524288  97937     426351   19% /
 tmpfs          1909200      1    1909199    1% /dev/shm
 /dev/xvdb      2457600     54    2457546    1% /mnt
 /dev/xvdc      2457600     24    2457576    1% /mnt2
 /dev/xvds    831869296  23844  831845452    1% /vol0

 A $ df -h
 Filesystem  Size  Used Avail Use% Mounted on
 /dev/xvda1  7.9G  3.4G  4.5G  44% /
 tmpfs       7.3G     0  7.3G   0% /dev/shm
 /dev/xvdb    37G  1.2G   34G   4% /mnt
 /dev/xvdc    37G  177M   35G   1% /mnt2
 /dev/xvds  1000G  802G  199G  81% /vol0

 B $ df -i
 Filesystem      Inodes  IUsed      IFree IUse% Mounted on
 /dev/xvda1      524288  97947     426341   19% /
 tmpfs          1906639      1    1906638    1% /dev/shm
 /dev/xvdb      2457600     54    2457546    1% /mnt
 /dev/xvdc      2457600     24    2457576    1% /mnt2
 /dev/xvds    816200704  24223  816176481    1% /vol0

 B $ df -h
 Filesystem  Size  Used Avail Use% Mounted on
 /dev/xvda1  7.9G  3.6G  4.3G  46% /
 tmpfs       7.3G     0  7.3G   0% /dev/shm
 /dev/xvdb    37G  1.2G   34G   4% /mnt
 /dev/xvdc    37G  177M   35G   1% /mnt2
 /dev/xvds  1000G  805G  195G  81% /vol0

 C $ df -i
 Filesystem      Inodes  IUsed      IFree IUse% Mounted on
 /dev/xvda1      524288  97938     426350   19% /
 tmpfs          1906897      1    1906896    1% /dev/shm
 /dev/xvdb      2457600     54    2457546    1% /mnt
 /dev/xvdc      2457600     24    2457576    1% /mnt2
 /dev/xvds    755218352  24024  755194328    1% /vol0

 C $ df -h
 Filesystem  Size  Used Avail Use% Mounted on
 /dev/xvda1  7.9G  3.4G  4.5G  44% /
 tmpfs       7.3G     0  7.3G   0% /dev/shm
 /dev/xvdb    37G  1.2G   34G   4% /mnt
 /dev/xvdc    37G  177M   35G   1% /mnt2
 /dev/xvds  1000G  820G  181G  82% /vol0

 The devices may be ~80% full but that still leaves ~200G free on each.
 My spark-env.sh has

 export SPARK_LOCAL_DIRS=/vol0/spark

 I have manually verified that on each slave the only temporary files
 are stored on /vol0, all looking something like this


 /vol0/spark/spark-f05d407c/spark-fca3e573/spark-78c06215/spark-4f0c4236/20/rdd_8_884

 So it looks like all the files are being stored on the large drives
 (incidentally they're AWS EBS volumes, but that's the only way to get
 enough storage). My process crashed before with a slightly different
 exception under the same circumstances: kryo.KryoException:
 java.io.IOException: No space left on device

 These both happen after several hours and several GB of temporary
 files.

 Why does Spark think it's run out of space?

 TIA

 Joe

 Stack trace 1:

 org.apache.spark.shuffle.MetadataFetchFailedException: Missing an
 output location for shuffle 0
 at
 org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$1.apply(MapOutputTracker.scala:384)
 at
 org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark

Re: Worker and Nodes

2015-02-21 Thread Yiannis Gkoufas
Hi,

I have experienced the same behavior. You are talking about standalone
cluster mode, right?

BR

On 21 February 2015 at 14:37, Deep Pradhan pradhandeep1...@gmail.com
wrote:

 Hi,
 I have been running some jobs on my local single-node standalone cluster.
 I am varying the number of worker instances for the same job, and the time
 taken for the job to complete increases as the number of workers increases.
 I repeated some experiments varying the number of nodes in a cluster too,
 and the same behavior is seen.
 Can the idea of worker instances be extrapolated to the nodes in a cluster?

 Thank You



Setting the number of executors in standalone mode

2015-02-20 Thread Yiannis Gkoufas
Hi there,

I am trying to increase the number of executors per worker in standalone mode
but have failed to achieve that.
I loosely followed the instructions in this thread:
http://stackoverflow.com/questions/26645293/spark-configuration-memory-instance-cores

and set:
spark.executor.memory 1g
SPARK_WORKER_MEMORY=8g

hoping to get 8 executors per worker, but it's still 1.
And the --num-executors option is not available in standalone mode.

Thanks a lot!


Re: Setting the number of executors in standalone mode

2015-02-20 Thread Yiannis Gkoufas
Hi Mohammed,

Thanks a lot for the reply.
OK, so from what I understand, I cannot control the number of executors per
worker in standalone cluster mode.
Is that correct?

BR

On 20 February 2015 at 17:46, Mohammed Guller moham...@glassbeam.com
wrote:

  SPARK_WORKER_MEMORY=8g

 will allocate 8 GB of memory to Spark on each worker node. It has nothing
 to do with the number of executors.





 Mohammed



 *From:* Yiannis Gkoufas [mailto:johngou...@gmail.com]
 *Sent:* Friday, February 20, 2015 4:55 AM
 *To:* user@spark.apache.org
 *Subject:* Setting the number of executors in standalone mode



 Hi there,



 I am trying to increase the number of executors per worker in standalone
 mode but have failed to achieve that.

 I loosely followed the instructions in this thread:
 http://stackoverflow.com/questions/26645293/spark-configuration-memory-instance-cores



 and set:

 spark.executor.memory   1g

 SPARK_WORKER_MEMORY=8g



 hoping to get 8 executors per worker, but it's still 1.

 And the --num-executors option is not available in standalone mode.



 Thanks a lot!
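
For what it's worth, standalone mode at this time did have a documented knob
that amounts to the same thing: SPARK_WORKER_INSTANCES, which runs several
worker JVMs per node, each of which can then host one executor for the
application. A sketch for spark-env.sh, assuming the goal is 8 executors on
an 8 GB node (exact behavior varies by Spark version):

SPARK_WORKER_INSTANCES=8   # 8 worker JVMs per node
SPARK_WORKER_CORES=1       # cores each worker may hand out
SPARK_WORKER_MEMORY=1g     # memory each worker may hand out

combined with spark.executor.memory 1g in spark-defaults.conf, so each worker
has room for exactly one executor.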