Re: BlockNotFoundException when running spark word count on Tachyon

2015-08-26 Thread Dibyendu Bhattacharya
Some time back I was playing with Spark and Tachyon and I also found this
issue. The issue here is that TachyonBlockManager puts the blocks with the
WriteType.TRY_CACHE configuration. Because of this, blocks are evicted
from the Tachyon cache when memory is full, and when Spark tries to find
a block it throws a BlockNotFoundException.

To solve this I tried Hierarchical Storage on Tachyon
(http://tachyon-project.org/Hierarchy-Storage-on-Tachyon.html), and that
seems to have worked: I did not see any Spark job fail due to
BlockNotFoundException.
Below are the Hierarchical Storage settings I used:

  -Dtachyon.worker.hierarchystore.level.max=2
  -Dtachyon.worker.hierarchystore.level0.alias=MEM
  -Dtachyon.worker.hierarchystore.level0.dirs.path=$TACHYON_RAM_FOLDER

-Dtachyon.worker.hierarchystore.level0.dirs.quota=$TACHYON_WORKER_MEMORY_SIZE
  -Dtachyon.worker.hierarchystore.level1.alias=HDD
  -Dtachyon.worker.hierarchystore.level1.dirs.path=/mnt/tachyon
  -Dtachyon.worker.hierarchystore.level1.dirs.quota=50GB
  -Dtachyon.worker.allocate.strategy=MAX_FREE
  -Dtachyon.worker.evict.strategy=LRU

Regards,
Dibyendu

On Wed, Aug 26, 2015 at 12:25 PM, Todd  wrote:

>
> I am using Tachyon in the Spark program below, but I encounter a
> BlockNotFoundException.
> Does someone know what's wrong, and is there a guide on how to configure
> Spark to work with Tachyon? Thanks!
>
> conf.set("spark.externalBlockStore.url", "tachyon://10.18.19.33:19998
> ")
> conf.set("spark.externalBlockStore.baseDir","/spark")
> val sc = new SparkContext(conf)
> import org.apache.spark.storage.StorageLevel
> val rdd = sc.parallelize(List(1, 2, 3, 4, 5, 6))
> rdd.persist(StorageLevel.OFF_HEAP)
> val count = rdd.count()
>val sum = rdd.reduce(_ + _)
> println(s"The count: $count, The sum is: $sum")
>
>
> 15/08/26 14:52:03 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks
> have all completed, from pool
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 5
> in stage 0.0 failed 1 times, most recent failure: Lost task 5.0 in stage
> 0.0 (TID 5, localhost): java.lang.RuntimeException:
> org.apache.spark.storage.BlockNotFoundException: Block rdd_0_5 not found
> at
> org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:308)
> at
> org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
> at
> org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> at
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
> at
> scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
> at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
> at
> org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)
> at
> org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:114)
> at
> org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:87)
> at
> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:101)
> at
> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51)
> at
> io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
> at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
> at
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
> at
> io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:254)
> at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
> at
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
> at
> io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
> at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
> at
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
> at
> io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163)
> at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
> at
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
> at
> io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipelin

Re: BlockNotFoundException when running spark word count on Tachyon

2015-08-26 Thread Dibyendu Bhattacharya
The URL seems to have changed; here is the current one:
http://tachyon-project.org/documentation/Tiered-Storage-on-Tachyon.html



On Wed, Aug 26, 2015 at 12:32 PM, Dibyendu Bhattacharya <
dibyendu.bhattach...@gmail.com> wrote:

> Some time back I was playing with Spark and Tachyon and I also found this
> issue. The issue here is that TachyonBlockManager puts the blocks with the
> WriteType.TRY_CACHE configuration. Because of this, blocks are evicted
> from the Tachyon cache when memory is full, and when Spark tries to find
> the block it throws a BlockNotFoundException.
>
> To solve this I tried Hierarchical Storage on Tachyon
> (http://tachyon-project.org/Hierarchy-Storage-on-Tachyon.html), and that
> seems to have worked: I did not see any Spark job fail due to
> BlockNotFoundException.
> Below are the Hierarchical Storage settings I used:
>
>   -Dtachyon.worker.hierarchystore.level.max=2
>   -Dtachyon.worker.hierarchystore.level0.alias=MEM
>   -Dtachyon.worker.hierarchystore.level0.dirs.path=$TACHYON_RAM_FOLDER
>
> -Dtachyon.worker.hierarchystore.level0.dirs.quota=$TACHYON_WORKER_MEMORY_SIZE
>   -Dtachyon.worker.hierarchystore.level1.alias=HDD
>   -Dtachyon.worker.hierarchystore.level1.dirs.path=/mnt/tachyon
>   -Dtachyon.worker.hierarchystore.level1.dirs.quota=50GB
>   -Dtachyon.worker.allocate.strategy=MAX_FREE
>   -Dtachyon.worker.evict.strategy=LRU
>
> Regards,
> Dibyendu
>
> On Wed, Aug 26, 2015 at 12:25 PM, Todd  wrote:
>
>>
>> I am using Tachyon in the Spark program below, but I encounter a
>> BlockNotFoundException.
>> Does someone know what's wrong, and is there a guide on how to
>> configure Spark to work with Tachyon? Thanks!
>>
>> conf.set("spark.externalBlockStore.url", "tachyon://10.18.19.33:19998
>> ")
>> conf.set("spark.externalBlockStore.baseDir","/spark")
>> val sc = new SparkContext(conf)
>> import org.apache.spark.storage.StorageLevel
>> val rdd = sc.parallelize(List(1, 2, 3, 4, 5, 6))
>> rdd.persist(StorageLevel.OFF_HEAP)
>> val count = rdd.count()
>>val sum = rdd.reduce(_ + _)
>> println(s"The count: $count, The sum is: $sum")
>>
>>
>> 15/08/26 14:52:03 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose
>> tasks have all completed, from pool
>> org.apache.spark.SparkException: Job aborted due to stage failure: Task 5
>> in stage 0.0 failed 1 times, most recent failure: Lost task 5.0 in stage
>> 0.0 (TID 5, localhost): java.lang.RuntimeException:
>> org.apache.spark.storage.BlockNotFoundException: Block rdd_0_5 not found
>> at
>> org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:308)
>> at
>> org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
>> at
>> org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
>> at
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>> at
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>> at
>> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>> at
>> scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>> at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
>> at
>> org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)
>> at
>> org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:114)
>> at
>> org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:87)
>> at
>> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:101)
>> at
>> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51)
>> at
>> io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
>> at
>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>> at
>> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>> at
>> io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:254)
>> at
>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>> at
>> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>> at
>> io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
>> at
>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>> at
>> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>> at
>> io.netty.handler.codec.ByteToMessageDecoder.channelRead(Byt

RE: SparkR: exported functions

2015-08-26 Thread Felix Cheung
I believe that is done explicitly while the final API is being figured out.
For the moment you could use DataFrame read.df()
 
> From: csgilles...@gmail.com
> Date: Tue, 25 Aug 2015 18:26:50 +0100
> Subject: SparkR: exported functions
> To: user@spark.apache.org
> 
> Hi,
> 
> I've just started playing about with SparkR (Spark 1.4.1), and noticed
> that a number of the functions haven't been exported. For example,
> the textFile function
> 
> https://github.com/apache/spark/blob/master/R/pkg/R/context.R
> 
> isn't exported, i.e. the function isn't in the NAMESPACE file. This is 
> obviously
> due to the ' missing in the roxygen2 directives.
> 
> Is this intentional?
> 
> Thanks
> 
> Colin
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
> 
  

Re: MLlib Prefixspan implementation

2015-08-26 Thread alexis GILLAIN
A first use case of the gap constraint is included in the article.
Another application would be customer-shopping sequence analysis, where you
want to put a constraint on the duration between two purchases for them to
be considered a pertinent sequence.

Additional question regarding the code: what's the point of using
ReversedPrefix in LocalPrefixSpan? The prefix is used neither in finding
frequent items of a projected database nor in computing a new projected
database, so it looks like it's appended in reverse order just to be
reversed again when transformed to a sequence.

2015-08-25 12:15 GMT+08:00 Feynman Liang :

> CCing the mailing list again.
>
> It's currently not on the radar. Do you have a use case for it? I can
> bring it up during 1.6 roadmap planning tomorrow.
>
> On Mon, Aug 24, 2015 at 8:28 PM, alexis GILLAIN 
> wrote:
>
>> Hi,
>>
>> I just realized the article I mentioned is cited in the jira and not in
>> the code so I guess you didn't use this result.
>>
>> Do you plan to implement sequence with timestamp and gap constraint as in
>> :
>>
>> https://people.mpi-inf.mpg.de/~rgemulla/publications/miliaraki13mg-fsm.pdf
>>
>> 2015-08-25 7:06 GMT+08:00 Feynman Liang :
>>
>>> Hi Alexis,
>>>
>>> Unfortunately, both of the papers you referenced appear to be
>>> translations and are quite difficult to understand. We followed
>>> http://doi.org/10.1109/ICDE.2001.914830 when implementing PrefixSpan.
>>> Perhaps you can find the relevant lines in there so I can elaborate further?
>>>
>>> Feynman
>>>
>>> On Thu, Aug 20, 2015 at 9:07 AM, alexis GILLAIN 
>>> wrote:
>>>
 I want to use PrefixSpan so I had a look at the code and the cited
 paper: "Distributed PrefixSpan Algorithm Based on MapReduce".

 There is a result in the paper I didn't really understand and I
 couldn't find where it is used in the code.

 Suppose a sequence database S = {1, 2, ..., n}, a sequence  is a
 length-(L-1) (2≤L≤n) sequential pattern, in projected databases which is a
 prefix of a length-(L-1) sequential pattern , when the support count
 of  is not less than min_support, it is equal to obtaining a length-L
 sequential pattern < a ... a > from projected databases that obtaining a
 length-L sequential pattern < a ... a > from a sequence database S.

 According to the paper it's supposed to add a pruning step in the
 reduce function but I couldn't find where.

 This result seems to come from a previous paper : "Wang Linlin, Fan
 Jun. Improved Algorithm for Sequential Pattern Mining Based on PrefixSpan
 [J]. Computer Engineering, 2009, 35(23): 56-61" but it didn't help me to
 understand it and how it can improve the algorithm.

>>>
>>>
>>
>


Re: MLlib Prefixspan implementation

2015-08-26 Thread Feynman Liang
ReversedPrefix is used because Scala's List is a linked list, which has
constant-time prepend (append at the head) but linear-time append at the tail.
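As a small illustration of the trade-off (generic Scala, not the MLlib code itself):

// Prepending to an immutable List is O(1); appending is O(n).
val items = Seq(1, 2, 3, 4)
val reversedPrefix = items.foldLeft(List.empty[Int])((acc, x) => x :: acc)  // List(4, 3, 2, 1)
val prefix = reversedPrefix.reverse  // reverse once at the end: List(1, 2, 3, 4)

Building the prefix head-first and reversing it once when the full sequence is needed keeps each extension constant time.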

I'm aware that there are use cases for the gap constraints. My question was
more about whether any users of Spark/MLlib have an immediate application
for these features.

On Wed, Aug 26, 2015 at 12:10 AM, alexis GILLAIN  wrote:

> A first use case of the gap constraint is included in the article.
> Another application would be customer-shopping sequence analysis, where you
> want to put a constraint on the duration between two purchases for them to
> be considered a pertinent sequence.
>
> Additional question regarding the code: what's the point of using
> ReversedPrefix in LocalPrefixSpan? The prefix is used neither in finding
> frequent items of a projected database nor in computing a new projected
> database, so it looks like it's appended in reverse order just to be
> reversed again when transformed to a sequence.
>
> 2015-08-25 12:15 GMT+08:00 Feynman Liang :
>
>> CCing the mailing list again.
>>
>> It's currently not on the radar. Do you have a use case for it? I can
>> bring it up during 1.6 roadmap planning tomorrow.
>>
>> On Mon, Aug 24, 2015 at 8:28 PM, alexis GILLAIN 
>> wrote:
>>
>>> Hi,
>>>
>>> I just realized the article I mentioned is cited in the jira and not in
>>> the code so I guess you didn't use this result.
>>>
>>> Do you plan to implement sequence with timestamp and gap constraint as
>>> in :
>>>
>>>
>>> https://people.mpi-inf.mpg.de/~rgemulla/publications/miliaraki13mg-fsm.pdf
>>>
>>> 2015-08-25 7:06 GMT+08:00 Feynman Liang :
>>>
 Hi Alexis,

 Unfortunately, both of the papers you referenced appear to be
 translations and are quite difficult to understand. We followed
 http://doi.org/10.1109/ICDE.2001.914830 when implementing PrefixSpan.
 Perhaps you can find the relevant lines in there so I can elaborate 
 further?

 Feynman

 On Thu, Aug 20, 2015 at 9:07 AM, alexis GILLAIN 
 wrote:

> I want to use PrefixSpan so I had a look at the code and the cited
> paper: "Distributed PrefixSpan Algorithm Based on MapReduce".
>
> There is a result in the paper I didn't really understand and I
> couldn't find where it is used in the code.
>
> Suppose a sequence database S = {1, 2, ..., n}, a sequence  is a
> length-(L-1) (2≤L≤n) sequential pattern, in projected databases which is a
> prefix of a length-(L-1) sequential pattern , when the support count
> of  is not less than min_support, it is equal to obtaining a length-L
> sequential pattern < a ... a > from projected databases that obtaining a
> length-L sequential pattern < a ... a > from a sequence database S.
>
> According to the paper it's supposed to add a pruning step in the
> reduce function but I couldn't find where.
>
> This result seems to come from a previous paper : "Wang Linlin, Fan
> Jun. Improved Algorithm for Sequential Pattern Mining Based on PrefixSpan
> [J]. Computer Engineering, 2009, 35(23): 56-61" but it didn't help me to
> understand it and how it can improve the algorithm.
>


>>>
>>
>


Re: Question on take function - Spark Java API

2015-08-26 Thread Sonal Goyal
You can try using wholeTextFiles, which will give you a pair RDD of (fileName,
content). flatMap through this and manipulate the content.
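For example, a rough Scala sketch of that approach (the layout assumed from the question below: line 1 is the asset name, line 2 the CSV header, the rest data rows; the path is a placeholder), runnable in spark-shell where sc is already defined:

val files = sc.wholeTextFiles("hdfs:///path/to/folder")   // (fileName, fullContent) per file

val rows = files.flatMap { case (_, content) =>
  val lines = content.split("\n")
  val asset = lines.head.trim                 // e.g. "TestAsset01"
  lines.drop(2)                               // skip the asset line and the CSV header
    .filter(_.trim.nonEmpty)
    .map(line => (asset, line.split(",")))    // keep the asset name with every data row
}

rows.take(5).foreach { case (asset, fields) => println(s"$asset -> ${fields.mkString("|")}") }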

Best Regards,
Sonal
Founder, Nube Technologies 
Check out Reifier at Spark Summit 2015

On Wed, Aug 26, 2015 at 8:25 AM, Pankaj Wahane 
wrote:

> Hi community members,
>
>
> Apache Spark is Fantastic and very easy to learn.. Awesome work!!!
>
> *Question:*
>
> I have multiple files in a folder, and the first line in each file is the
> name of the asset that the file belongs to. The second line is the CSV header
> row and data starts from the third row.
>
> Ex: File 1
>
> TestAsset01
> Time,dp_1,dp_2,dp_3
> 11-01-2015 15:00:00,123,456,789
> 11-01-2015 15:00:01,123,456,789
> . . .
>
> Ex: File 2
>
> TestAsset02
> Time,dp_1,dp_2,dp_3
> 11-01-2015 15:00:00,1230,4560,7890
> 11-01-2015 15:00:01,1230,4560,7890
> . . .
>
> I have nearly 1000 files in each folder, sizing ~10G.
>
> I am using the Apache Spark Java API to read all these files.
>
> Following is the code extract that I am using:
>
> try (JavaSparkContext sc = new JavaSparkContext(conf)) {
>     Map readingTypeMap = getReadingTypesMap(sc);
>     // Read files
>     JavaRDD<String> data = sc.textFile(resourceBundle.getString(FOLDER_NAME));
>     // Get asset
>     String asset = data.take(1).get(0);
>     // Extract time series data
>     JavaRDD<String> actualData = data.filter(line -> line.contains(DELIMERTER));
>     // Strip header
>     String header = actualData.take(1).get(0);
>     String[] headers = header.split(DELIMERTER);
>     // Extract actual data
>     JavaRDD<String> timeSeriesLines = actualData.filter(line -> !line.equals(header));
>     // Extract valid records
>     JavaRDD<String> validated = timeSeriesLines.filter(line -> validate(line));
>     // Find granularity
>     Integer granularity = toInt(resourceBundle.getString(GRANULARITY));
>     // Transform to TSD objects
>     JavaRDD<TimeSeriesData> tsdFlatMap = transformTotimeSeries(validated, asset, readingTypeMap, headers, granularity);
>
>     // Save to Cassandra
>     javaFunctions(tsdFlatMap).writerBuilder(resourceBundle.getString("cassandra.tsd.keyspace"),
>             "time_series_data", mapToRow(TimeSeriesData.class)).saveToCassandra();
>
>     System.out.println("Total Records: " + timeSeriesLines.count());
>     System.out.println("Valid Records: " + validated.count());
> }
>
> Within the TimeSeriesData object I need to set the asset name for the reading,
> so I need the output of data.take(1) to be different for different files.
>
> Thank You.
>
> Best Regards,
> Pankaj
>
>
>
>
> QIO Technologies Limited is a limited company registered in England &
> Wales at 1 Curzon Street, London, England, W1J 5HD, with registered number
> 09368431
>
> This message and the information contained within it is intended solely
> for the addressee and may contain confidential or privileged information.
> If you have received this message in error please notify QIO Technologies
> Limited immediately and then permanently delete this message. If you are
> not the intended addressee then you must not copy, transmit, disclose or
> rely on the information contained in this message or in any attachment to
> it, all such use is prohibited to maximum extent possible by law.
>


SPARK_DIST_CLASSPATH, primordial class loader & app ClassNotFound

2015-08-26 Thread Night Wolf
Hey all,

I'm trying to do some stuff with a YAML file in the Spark driver, using the
SnakeYAML library in Scala.

When I put the snakeyaml v1.14 jar on the SPARK_DIST_CLASSPATH and try to
de-serialize some objects from YAML into classes in my app JAR on the
driver (only the driver), I get the exception below.

Yet when I don't have the snakeyaml jar on the SPARK_DIST_CLASSPATH but
instead create a fat jar for my application (so it has the snakeyaml jar
baked inside), then everything works fine. If I have the jar both on
DIST_CLASSPATH and fat-jarred (with sbt assembly), it still fails with the
same exception.

I'm guessing that jars on SPARK_DIST_CLASSPATH end up in the 'primordial'
class loader; because SnakeYAML then lives in this class loader, it can't
find my application jar classes, since they are loaded at a later point by
a different class loader.

What is the workaround for this?
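For what it's worth, here is a sketch of how I would test that guess (assuming SnakeYAML's CustomClassLoaderConstructor; this is an illustration, not a verified fix for this setup): hand the parser the class loader that can actually see the application classes.

import org.yaml.snakeyaml.Yaml
import org.yaml.snakeyaml.constructor.CustomClassLoaderConstructor

val yamlText = "key: value"                                         // placeholder YAML
val appClassLoader = Thread.currentThread().getContextClassLoader   // loader that can see the app jar
val yaml = new Yaml(new CustomClassLoaderConstructor(appClassLoader))
val config: Object = yaml.load(yamlText)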

Thanks
~N


Exception in thread "main" com.sai.cfg.InvalidConfigurationPropertyException
at
com.sai.cfg.ConfigurationParser$.getConfig(ConfigurationParser.scala:184)
at
com.sai.cfg.ConfigurationParser$.getGlobalConfig(ConfigurationParser.scala:171)
at
com.sai.cfg.ConfigurationParser.globalConfig$lzycompute(ConfigurationParser.scala:26)
at
com.sai.cfg.ConfigurationParser.globalConfig(ConfigurationParser.scala:26)
at
com.sai.cfg.ConfigurationParser.(ConfigurationParser.scala:44)
at
com.sai.strategy.StrategyEngineMain$.delayedEndpoint$au$com$quantium$personalisation$strategy$StrategyEngineMain$1(StrategyEngineMain.scala:39)
at
com.sai.strategy.StrategyEngineMain$delayedInit$body.apply(StrategyEngineMain.scala:15)
at scala.Function0$class.apply$mcV$sp(Function0.scala:40)
at
scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.collection.immutable.List.foreach(List.scala:381)
at
scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
at scala.App$class.main(App.scala:76)
at
com.sai.strategy.StrategyEngineMain$.main(StrategyEngineMain.scala:15)
at
com.sai.strategy.StrategyEngineMain.main(StrategyEngineMain.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665)
at
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170)
at
org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: org.yaml.snakeyaml.error.YAMLException: Class not found:
com.sai.cfg.models.GlobalConfiguration
at
org.yaml.snakeyaml.constructor.Constructor.getClassForNode(Constructor.java:647)
at
org.yaml.snakeyaml.constructor.Constructor$ConstructYamlObject.getConstructor(Constructor.java:330)
at
org.yaml.snakeyaml.constructor.Constructor$ConstructYamlObject.construct(Constructor.java:340)
at
org.yaml.snakeyaml.constructor.BaseConstructor.constructObject(BaseConstructor.java:182)
at
org.yaml.snakeyaml.constructor.BaseConstructor.constructDocument(BaseConstructor.java:141)
at
org.yaml.snakeyaml.constructor.BaseConstructor.getSingleData(BaseConstructor.java:127)
at org.yaml.snakeyaml.Yaml.loadFromReader(Yaml.java:481)
at org.yaml.snakeyaml.Yaml.load(Yaml.java:400)
at
com.sai.cfg.ConfigurationParser$.getConfig(ConfigurationParser.scala:181)
... 24 more


Re: reduceByKey not working on JavaPairDStream

2015-08-26 Thread Sean Owen
I don't see that you invoke any action in this code. It won't do
anything unless you tell it to perform an action that requires the
transformations.
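A minimal Scala sketch of the point (the source and the names here are placeholders, not taken from the code below): transformations on a DStream stay lazy until an output operation is registered and the context is started.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("action-sketch").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(10))

val lines  = ssc.socketTextStream("localhost", 9999)            // placeholder source
val counts = lines.map(word => (word, 1L)).reduceByKey(_ + _)   // lazy: nothing runs, nothing is logged

counts.print()   // output operation (or counts.foreachRDD { rdd => ... }); this triggers execution

ssc.start()
ssc.awaitTermination()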

On Wed, Aug 26, 2015 at 7:05 AM, Deepesh Maheshwari
 wrote:
> Hi,
> I have applied mapToPair and then a reduceByKey on a DStream to obtain a
> JavaPairDStream<String, Map<String, Object>>.
> I have to apply a flatMapToPair and reduceByKey on the DStream obtained
> above.
> But I do not see any logs from the reduceByKey operation.
> Can anyone explain why this is happening?
>
>
> Find my code below -
>
> /***
>  * GroupLevel1 Groups - articleId, host and tags
>  */
> JavaPairDStream<String, Map<String, Object>> groupLevel1 = inputDataMap
>         .mapToPair(
>                 new PairFunction<Map<String, Object>, String, Map<String, Object>>() {
>
>                     private static final long serialVersionUID = 5196132687044875422L;
>
>                     @Override
>                     public Tuple2<String, Map<String, Object>> call(
>                             Map<String, Object> map) throws Exception {
>                         String host = (String) map.get("host");
>                         String articleId = (String) map.get("articleId");
>                         List<String> tags = (List<String>) map.get("tags");
>
>                         if (host == null || articleId == null) {
>                             logger.error("*** Error Doc \n" + map);
>                         }
>                         String key = "articleId_" + articleId + "_host_" + host + "_tags_" + tags.toString();
>
>                         //logger.info(key);
>                         System.out.println("Printing Key - " + key);
>                         map.put("articlecount", 1L);
>
>                         return new Tuple2<String, Map<String, Object>>(key, map);
>                     }
>                 })
>         .reduceByKey(
>                 new Function2<Map<String, Object>, Map<String, Object>, Map<String, Object>>() {
>
>                     private static final long serialVersionUID = 1L;
>
>                     @Override
>                     public Map<String, Object> call(
>                             Map<String, Object> map1,
>                             Map<String, Object> map2) throws Exception {
>                         Long count1 = (Long) map1.get("articlecount");
>                         Long count2 = (Long) map2.get("articlecount");
>
>                         map1.put("articlecount", count1 + count2);
>                         return map1;
>                     }
>                 });
>
>
> /***
>  * Grouping level 1 groups on articleId+host+tags.
>  * Tags can be multiple for an article.
>  * Grouping level 2 does -
>  *  1. For each tag in a row, find occurrences of that tag in other rows.
>  *  2. If one tag is found in another row, then add the articleCount of the
>  *     current and new row and put it as the articleCount for that tag.
>  *  Note -
>  *  The idea behind this grouping is to get all article counts that contain
>  *  a particular tag and preserve this value.
>  */
>
> JavaPairDStream<String, Map<String, Object>> groupLevel2 = groupLevel1
>         .flatMapToPair(new PairFlatMapFunction<Tuple2<String, Map<String, Object>>, String, Map<String, Object>>() {
>             @Override
>             public Iterable<Tuple2<String, Map<String, Object>>> call(
>                     Tuple2<String, Map<String, Object>> stringMapTuple2) throws Exception {
>                 System.out.println("group level 2 tuple 1 -" + stringMapTuple2._1());
>                 System.out.println("group level 2 tuple 2 -" + stringMapTuple2._2());
>                 ArrayList<String> tagList = (ArrayList<String>) stringMapTuple2._2().get("tags");
>                 ArrayList<Tuple2<String, Map<String, Object>>> tagKeyList = new ArrayList<Tuple2<String, Map<String, Object>>>();
>                 String host = (String) stringMapTuple2._2().get("host");
>                 StringBuilder key;
>                 for (String tag : tagList) {
>                     key = new StringBuilder("host_").append(host).append("_tag_").append(tag);
>                     System.out.println("generated Key - " + key);
>                     tagKeyList.add(new Tuple2<String, Map<String, Object>>(key.toString(), stringMapTuple2._2()));
>                 }
>                 return tagKeyList;
>             }
>         });
>
> groupLevel2 = groupLevel2.reduceByKey(
>         new Function2<Map<String, Object>, Map<String, Object>, Map<String, Object>>() {
>             @Override
>             public Map<String, Object> call(Map<String, Object> dataMap1,
>                     Map<String, Object> dataMap2) throws Exception {
>                 System.out.println("Type of article map in 1 " + dataMap1.get("articleId").getClass());
>                 System.out.println("Type of article map in 2 " + dataMap2.get("articleId").getClass());
>                 Map<String, Object> articleMap1 = (Map<String, Object>) dataMap1.get("articleId");
>                 Map<String, Object> articleMap2 = (Map<String, Object>) dataMap2.get("articleId");
>
>                 if (articleMap1 == null || articleMap1.isEmpty()) {
>                     System.out.println("returning because map 1 null");
>                     return

Relation between threads and executor core

2015-08-26 Thread Samya
Hi All,

A few basic queries:
1. Is there a way we can control the number of threads per executor core?
2. Does the “executor-cores” parameter also have a say in deciding how many
threads are run?

Regards,
Sam



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Relation-between-threads-and-executor-core-tp24456.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: DataFrame Parquet Writer doesn't keep schema

2015-08-26 Thread Petr Novak
The same as
https://mail.google.com/mail/#label/Spark%2Fuser/14f64c75c15f5ccd

Please follow the discussion there.
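For reference, a minimal spark-shell sketch of the round trip described in the quoted mail below (paths are placeholders):

val original = sqlContext.read.parquet("/tmp/in.parquet")
original.printSchema()                       // fields show nullable = false here

original.write.parquet("/tmp/out.parquet")
sqlContext.read.parquet("/tmp/out.parquet")
  .printSchema()                             // the same fields come back as nullable = true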

On Tue, Aug 25, 2015 at 5:02 PM, Petr Novak  wrote:

> Hi all,
> when I read parquet files with "required" fields, aka nullable=false, they
> are read correctly. Then I save them (df.write.parquet) and read them again;
> all my fields are saved and read as optional, aka nullable=true, which means
> I suddenly have files with incompatible schemas. This happens on 1.3.0-1.4.1
> and even on 1.5.1-rc1.
>
> Should I set some write option to keep nullability? Is there a specific
> reason why nullability is always overridden to true?
>
> Many thanks,
> Peter
>
>
>


Re:Re:Re: How to increase data scale in Spark SQL Perf

2015-08-26 Thread Todd

Sorry for the noise, it's my bad... I have worked it out now.

At 2015-08-26 13:20:57, "Todd"  wrote:



I think the answer is no. I only see such messages on the console, and #2 is the
thread stack trace.
My thinking is that Spark SQL Perf forks many dsdgen processes to generate
data when the scale factor is increased, which eventually exhausts the JVM.
When the thread exception is thrown on the console and I leave it there for a
while (about 15 min), then eventually I will see OutOfMemory occur.

Can you guys try to run it if you have the environment? I think you may
reproduce it. Thanks!







At 2015-08-26 13:01:34, "Ted Yu"  wrote:

The error in #1 below was not informative. 


Are you able to get a more detailed error message?


Thanks




On Aug 25, 2015, at 6:57 PM, Todd  wrote:




Thanks Ted Yu.

Following are the error messages:
1. The exception that is shown on the UI is:
Exception in thread "Thread-113" Exception in thread "Thread-126" Exception in 
thread "Thread-64" Exception in thread "Thread-90" Exception in thread 
"Thread-117" Exception in thread "Thread-80" Exception in thread "Thread-115" 
Exception in thread "ResponseProcessor for block 
BP-1564562096-172.18.149.132-1435294011279:blk_1073846767_105984" Exception in 
thread "qtp1270119920-57" Exception in thread "Thread-77" Exception in thread 
"Thread-132" Exception in thread "Thread-68" Exception in thread "Thread-61" 
Exception in thread "Thread-70" Exception in thread "qtp1270119920-52" 
Exception in thread "Thread-88" Exception in thread "qtp318933312-47" Exception 
in thread "qtp1270119920-56"

2. When I jstack the process, I see a bunch of the following messages:

Thread 31258: (state = BLOCKED)
 - java.lang.Object.wait(long) @bci=0 (Interpreted frame)
 - java.lang.Object.wait() @bci=2, line=503 (Interpreted frame)
 - java.lang.UNIXProcess.waitFor() @bci=8, line=263 (Interpreted frame)
 - scala.sys.process.ProcessImpl$SimpleProcess.exitValue() @bci=4, line=218 
(Interpreted frame)
 - 
scala.sys.process.ProcessBuilderImpl$AbstractBuilder$$anonfun$lines$1.apply$mcV$sp()
 @bci=11, line=142 (Interpreted frame)
 - scala.sys.process.ProcessImpl$Spawn$$anon$1.run() @bci=4, line=22 
(Interpreted frame)


Thread 31257: (state = BLOCKED)
 - java.lang.Object.wait(long) @bci=0 (Interpreted frame)
 - java.lang.Object.wait() @bci=2, line=503 (Interpreted frame)
 - java.lang.UNIXProcess.waitFor() @bci=8, line=263 (Interpreted frame)
 - scala.sys.process.ProcessImpl$SimpleProcess.exitValue() @bci=4, line=218 
(Interpreted frame)
 - 
scala.sys.process.ProcessBuilderImpl$AbstractBuilder$$anonfun$lines$1.apply$mcV$sp()
 @bci=11, line=142 (Interpreted frame)
 - scala.sys.process.ProcessImpl$Spawn$$anon$1.run() @bci=4, line=22 
(Interpreted frame)







At 2015-08-25 19:32:56, "Ted Yu"  wrote:

Looks like you were attaching images to your email which didn't go through.


Consider using a third-party site for images, or paste the error as text.


Cheers


On Tue, Aug 25, 2015 at 4:22 AM, Todd  wrote:

Hi,
Spark SQL Perf itself contains benchmark data generation. I am using spark-shell
to run Spark SQL Perf to generate the data, with 10G memory for both the
driver and the executor.
When I increase the scale factor to 30 and run the job, I get the
following error:



When I jstack it to see the status of the threads, I see the following: it looks
like it is waiting for the process that the Spark job kicks off.








Re: Relation between threads and executor core

2015-08-26 Thread Jem Tucker
Hi Samya,

When submitting an application with spark-submit the cores per executor can
be set with --executor-cores, meaning you can run that many tasks per
executor concurrently. The page below has some more details on submitting
applications:

https://spark.apache.org/docs/latest/submitting-applications.html

thanks,

Jem

On Wed, Aug 26, 2015 at 9:47 AM Samya  wrote:

> Hi All,
>
> A few basic queries:
> 1. Is there a way we can control the number of threads per executor core?
> 2. Does the “executor-cores” parameter also have a say in deciding how many
> threads are run?
>
> Regards,
> Sam
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Relation-between-threads-and-executor-core-tp24456.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: How to increase data scale in Spark SQL Perf

2015-08-26 Thread Ted Yu
Mind sharing how you fixed the issue?

Cheers



> On Aug 26, 2015, at 1:50 AM, Todd  wrote:
> 
> 
> Sorry  for the noise, It's my bad...I have worked it out now. 
> 
> At 2015-08-26 13:20:57, "Todd"  wrote:
> 
> I think the answer is No. I only see such message on the console..and #2 is 
> the thread stack trace。
> I am thinking is that in Spark SQL Perf forks many dsdgen process to generate 
> data when the scalafactor is increased which at last exhaust the JVM
> When thread exception is thrown on the console and I leave it there for some 
> while(15min about),then eventually I will see OutOfMemory occur
> 
> Can you guys try to run it if you have the environment ? I think you may 
> reproduce it. Thanks!
> 
> 
> 
> 
> 
> At 2015-08-26 13:01:34, "Ted Yu"  wrote:
> The error in #1 below was not informative. 
> 
> Are you able to get more detailed error message ?
> 
> Thanks
> 
> 
> 
>> On Aug 25, 2015, at 6:57 PM, Todd  wrote:
>> 
>> 
>> Thanks Ted Yu.
>> 
>> Following are the error message:
>> 1. The exception that is shown on the UI is :
>> Exception in thread "Thread-113" Exception in thread "Thread-126" Exception 
>> in thread "Thread-64" Exception in thread "Thread-90" Exception in thread 
>> "Thread-117" Exception in thread "Thread-80" Exception in thread 
>> "Thread-115" Exception in thread "ResponseProcessor for block 
>> BP-1564562096-172.18.149.132-1435294011279:blk_1073846767_105984" Exception 
>> in thread "qtp1270119920-57" Exception in thread "Thread-77" Exception in 
>> thread "Thread-132" Exception in thread "Thread-68" Exception in thread 
>> "Thread-61" Exception in thread "Thread-70" Exception in thread 
>> "qtp1270119920-52" Exception in thread "Thread-88" Exception in thread 
>> "qtp318933312-47" Exception in thread "qtp1270119920-56" 
>> 
>> 2. jstack the process, I see bunch of following message:
>> 
>> Thread 31258: (state = BLOCKED)
>>  - java.lang.Object.wait(long) @bci=0 (Interpreted frame)
>>  - java.lang.Object.wait() @bci=2, line=503 (Interpreted frame)
>>  - java.lang.UNIXProcess.waitFor() @bci=8, line=263 (Interpreted frame)
>>  - scala.sys.process.ProcessImpl$SimpleProcess.exitValue() @bci=4, line=218 
>> (Interpreted frame)
>>  - 
>> scala.sys.process.ProcessBuilderImpl$AbstractBuilder$$anonfun$lines$1.apply$mcV$sp()
>>  @bci=11, line=142 (Interpreted frame)
>>  - scala.sys.process.ProcessImpl$Spawn$$anon$1.run() @bci=4, line=22 
>> (Interpreted frame)
>> 
>> 
>> Thread 31257: (state = BLOCKED)
>>  - java.lang.Object.wait(long) @bci=0 (Interpreted frame)
>>  - java.lang.Object.wait() @bci=2, line=503 (Interpreted frame)
>>  - java.lang.UNIXProcess.waitFor() @bci=8, line=263 (Interpreted frame)
>>  - scala.sys.process.ProcessImpl$SimpleProcess.exitValue() @bci=4, line=218 
>> (Interpreted frame)
>>  - 
>> scala.sys.process.ProcessBuilderImpl$AbstractBuilder$$anonfun$lines$1.apply$mcV$sp()
>>  @bci=11, line=142 (Interpreted frame)
>>  - scala.sys.process.ProcessImpl$Spawn$$anon$1.run() @bci=4, line=22 
>> (Interpreted frame)
>> 
>> 
>> 
>> 
>> 
>> At 2015-08-25 19:32:56, "Ted Yu"  wrote:
>> Looks like you were attaching images to your email which didn't go through.
>> 
>> Consider using third party site for images - or paste error in text.
>> 
>> Cheers
>> 
>>> On Tue, Aug 25, 2015 at 4:22 AM, Todd  wrote:
>>> Hi,
>>> The spark sql perf itself contains benchmark data generation. I am using 
>>> spark shell to run the spark sql perf to generate the data with 10G memory 
>>> for both driver and executor. 
>>> When I increase the scalefactor to be 30,and run the job, Then I got the 
>>> following error:
>>> 
>>> 
>>> 
>>> When I jstack it to see the status of the thread. I see the following: 
>>> looks it is waiting for the process that the spark job kicks off.
>>> 
>>> 
>>> 
>>> 
>> 


Re:Re: How to increase data scale in Spark SQL Perf

2015-08-26 Thread Todd
Increase the number of executors, :-)



At 2015-08-26 16:57:48, "Ted Yu"  wrote:

Mind sharing how you fixed the issue ?


Cheers




On Aug 26, 2015, at 1:50 AM, Todd  wrote:



Sorry  for the noise, It's my bad...I have worked it out now.

At 2015-08-26 13:20:57, "Todd"  wrote:



I think the answer is No. I only see such message on the console..and #2 is the 
thread stack trace。
I am thinking is that in Spark SQL Perf forks many dsdgen process to generate 
data when the scalafactor is increased which at last exhaust the JVM
When thread exception is thrown on the console and I leave it there for some 
while(15min about),then eventually I will see OutOfMemory occur

Can you guys try to run it if you have the environment ? I think you may 
reproduce it. Thanks!







At 2015-08-26 13:01:34, "Ted Yu"  wrote:

The error in #1 below was not informative. 


Are you able to get more detailed error message ?


Thanks




On Aug 25, 2015, at 6:57 PM, Todd  wrote:




Thanks Ted Yu.

Following are the error message:
1. The exception that is shown on the UI is :
Exception in thread "Thread-113" Exception in thread "Thread-126" Exception in 
thread "Thread-64" Exception in thread "Thread-90" Exception in thread 
"Thread-117" Exception in thread "Thread-80" Exception in thread "Thread-115" 
Exception in thread "ResponseProcessor for block 
BP-1564562096-172.18.149.132-1435294011279:blk_1073846767_105984" Exception in 
thread "qtp1270119920-57" Exception in thread "Thread-77" Exception in thread 
"Thread-132" Exception in thread "Thread-68" Exception in thread "Thread-61" 
Exception in thread "Thread-70" Exception in thread "qtp1270119920-52" 
Exception in thread "Thread-88" Exception in thread "qtp318933312-47" Exception 
in thread "qtp1270119920-56"

2. jstack the process, I see bunch of following message:

Thread 31258: (state = BLOCKED)
 - java.lang.Object.wait(long) @bci=0 (Interpreted frame)
 - java.lang.Object.wait() @bci=2, line=503 (Interpreted frame)
 - java.lang.UNIXProcess.waitFor() @bci=8, line=263 (Interpreted frame)
 - scala.sys.process.ProcessImpl$SimpleProcess.exitValue() @bci=4, line=218 
(Interpreted frame)
 - 
scala.sys.process.ProcessBuilderImpl$AbstractBuilder$$anonfun$lines$1.apply$mcV$sp()
 @bci=11, line=142 (Interpreted frame)
 - scala.sys.process.ProcessImpl$Spawn$$anon$1.run() @bci=4, line=22 
(Interpreted frame)


Thread 31257: (state = BLOCKED)
 - java.lang.Object.wait(long) @bci=0 (Interpreted frame)
 - java.lang.Object.wait() @bci=2, line=503 (Interpreted frame)
 - java.lang.UNIXProcess.waitFor() @bci=8, line=263 (Interpreted frame)
 - scala.sys.process.ProcessImpl$SimpleProcess.exitValue() @bci=4, line=218 
(Interpreted frame)
 - 
scala.sys.process.ProcessBuilderImpl$AbstractBuilder$$anonfun$lines$1.apply$mcV$sp()
 @bci=11, line=142 (Interpreted frame)
 - scala.sys.process.ProcessImpl$Spawn$$anon$1.run() @bci=4, line=22 
(Interpreted frame)







At 2015-08-25 19:32:56, "Ted Yu"  wrote:

Looks like you were attaching images to your email which didn't go through.


Consider using third party site for images - or paste error in text.


Cheers


On Tue, Aug 25, 2015 at 4:22 AM, Todd  wrote:

Hi,
The spark sql perf itself contains benchmark data generation. I am using spark 
shell to run the spark sql perf to generate the data with 10G memory for both 
driver and executor.
When I increase the scalefactor to be 30,and run the job, Then I got the 
following error:



When I jstack it to see the status of the thread. I see the following: looks it 
is waiting for the process that the spark job kicks off.








RE: Relation between threads and executor core

2015-08-26 Thread Samya MAITI
Thanks Jem, I do understand your suggestion. Actually, --executor-cores alone
doesn’t control the number of tasks; it is also governed by spark.task.cpus
(the number of cores dedicated to each task’s execution).

Reframing my question: how many threads can be spawned per executor core? Is it
in user control?

Regards,
Sam

From: Jem Tucker [mailto:jem.tuc...@gmail.com]
Sent: Wednesday, August 26, 2015 2:26 PM
To: Samya MAITI ; user@spark.apache.org
Subject: Re: Relation between threads and executor core

Hi Samya,

When submitting an application with spark-submit the cores per executor can be 
set with --executor-cores, meaning you can run that many tasks per executor 
concurrently. The page below has some more details on submitting applications:

https://spark.apache.org/docs/latest/submitting-applications.html

thanks,

Jem

On Wed, Aug 26, 2015 at 9:47 AM Samya <samya.ma...@amadeus.com> wrote:
Hi All,

A few basic queries:
1. Is there a way we can control the number of threads per executor core?
2. Does the “executor-cores” parameter also have a say in deciding how many
threads are run?

Regards,
Sam



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Relation-between-threads-and-executor-core-tp24456.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: 
user-unsubscr...@spark.apache.org
For additional commands, e-mail: 
user-h...@spark.apache.org


Performance issue with Spark join

2015-08-26 Thread lucap
Hi,

I'm trying to perform an ETL using Spark, but as soon as I start performing
joins, performance degrades a lot. Let me explain what I'm doing and what I
have found out so far.

First of all, I'm reading Avro files that are on a Cloudera cluster, using
commands like this:
val tab1 = sc.hadoopFile("hdfs:///path/to/file",
classOf[org.apache.avro.mapred.AvroInputFormat[GenericRecord]],
classOf[org.apache.avro.mapred.AvroWrapper[GenericRecord]],
classOf[org.apache.hadoop.io.NullWritable], 10)

After this, I'm applying some filter functions to the data (to reproduce the
"where" clauses of the original query) and then I'm using one map for each
table in order to translate RDD elements into (key, record) format. Let's say
I'm doing this:
val elabTab1 = tab1.filter(...).map()

It is important to notice that if I do something like elabTab1.first or
elabTab1.count, the task is performed in a short time, let's say around
Impala's time. Now I need to do the following:
val joined = elabTab1.leftOuterJoin(elabTab2)
Then I tried something like joined.count to test performance, but it
degraded really a lot (let's say that a count on a single table takes like 4
seconds and the count on the joined table takes 12 minutes). I think there's
a problem with the configuration, but what might it be?

I'll give you some more information:
1] Spark is running on YARN on a Cloudera cluster
2] I'm starting spark-shell with a command like spark-shell
--executor-cores 5 --executor-memory 10G, which gives the shell approx 10
vcores and 25 GB of memory
3] The task seems to stall for a long time after the map tasks, with the
following message in the console: Asked to send map output locations for
shuffle ... to ...
4] If I open the stderr of the executors, I can read plenty of messages like
the following: Thread ... spilling in-memory map of ... MB to disk, where
the MBs are in the order of 300-400
5] I tried to raise the number of executors, but the situation didn't seem
to change much. I also tried to change the number of splits of the Avro
files (currently set to 10), but it didn't seem to change much either
6] Tables aren't particularly big; the bigger one should be a few GBs

Regards,
Luca



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Performance-issue-with-Spark-join-tp24458.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Performance issue with Spark join

2015-08-26 Thread Hemant Bhanawat
Spark joins are different from traditional database joins because of the
lack of support for indexes. Spark has to shuffle data between the various
nodes to perform joins, hence joins are bound to be much slower than count,
which is just a parallel scan of the data.

Still, to ensure that nothing is wrong with the setup, you may want to look
at your Spark task UI, in particular at the Shuffle Read and Shuffle Write
metrics.
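One general technique worth trying (a spark-shell sketch with toy data; the partition count is arbitrary) is to pre-partition both sides with the same partitioner and cache them, so the join itself does not have to re-shuffle each input:

import org.apache.spark.HashPartitioner

val partitioner = new HashPartitioner(200)   // assumed partition count

val left  = sc.parallelize(Seq(("k1", 1), ("k2", 2))).partitionBy(partitioner).cache()
val right = sc.parallelize(Seq(("k1", "a"), ("k3", "c"))).partitionBy(partitioner).cache()

// Both sides share the same partitioner, so the join does not re-shuffle each input.
val joined = left.leftOuterJoin(right)
joined.count()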

On Wed, Aug 26, 2015 at 3:08 PM, lucap  wrote:

> Hi,
>
> I'm trying to perform an ETL using Spark, but as soon as I start performing
> joins performance degrades a lot. Let me explain what I'm doing and what I
> found out until now.
>
> First of all, I'm reading avro files that are on a Cloudera cluster, using
> commands like this:
> /val tab1 = sc.hadoopFile("hdfs:///path/to/file",
> classOf[org.apache.avro.mapred.AvroInputFormat[GenericRecord]],
> classOf[org.apache.avro.mapred.AvroWrapper[GenericRecord]],
> classOf[org.apache.hadoop.io.NullWritable], 10)/
>
> After this, I'm applying some filter functions to data (to reproduce
> "where"
> clauses of the original query) and then I'm using one map for each table in
> order to translate RDD elements in (key,record) format. Let's say I'm doing
> this:
> /val elabTab1 = tab1.filter(...).map()/
>
> It is important to notice that if I do something like /elabTab1.first/ or
> /elabTab1.count/ the task is performed in a short time, let's say around
> impala's time. Now I need to do the following:
> /val joined = elabTab1.leftOuterJoin(elabTab2)/
> Then I tried something like /joined.count/ to test performance, but it
> degraded really a lot (let's say that a count on a single table takes like
> 4
> seconds and the count on the joined table takes 12 minutes). I think
> there's
> a problem with the configuration, but what might it be?
>
> I'll give you some more information:
> 1] Spark is running on YARN on a Cloudera cluster
> 2] I'm starting spark-shell with a command like /spark-shell
> --executor-cores 5 --executor-memory 10G/ that gives the shell approx 10
> vcores and 25 GB of memory
> 3] The task seems still for a lot of time after the map tasks, with the
> following message in console: /Asked to send map output locations for
> shuffle ... to .../
> 4] If I open the stderr of the executors, I can read plenty of messages
> like
> the following: /Thread ... spilling in-memory map of ... MB to disk/, where
> MBs are in the order of 300-400
> 5] I tried to raise the number of executors, but the situation didn't seem
> to change much. I also tried to change the number of splits of the avro
> files (currently set to 10), but it didn't seem to change much as well
> 6] Tables aren't particularly big, the bigger one should be few GBs
>
> Regards,
> Luca
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Performance-issue-with-Spark-join-tp24458.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


JobScheduler: Error generating jobs for time for custom InputDStream

2015-08-26 Thread Juan Rodríguez Hortalá
Hi,

I've developed a ScalaCheck property for testing Spark Streaming
transformations. To do that I had to develop a custom InputDStream, which
is very similar to QueueInputDStream but has a method for adding new test
cases for dstreams, which are objects of type Seq[Seq[A]], to the DStream.
You can see the code at
https://github.com/juanrh/sscheck/blob/32c2bff66aa5500182e0162a24ecca6d47707c42/src/main/scala/org/apache/spark/streaming/dstream/DynSeqQueueInputDStream.scala.
I have developed a few properties that run in local mode
https://github.com/juanrh/sscheck/blob/32c2bff66aa5500182e0162a24ecca6d47707c42/src/test/scala/es/ucm/fdi/sscheck/spark/streaming/ScalaCheckStreamingTest.scala.
The problem is that when the batch interval is too small, and the machine
cannot complete the batches fast enough, I get the following exceptions in
the Spark log

15/08/26 11:22:02 ERROR JobScheduler: Error generating jobs for time
1440580922500 ms
java.lang.NullPointerException
at
org.apache.spark.streaming.dstream.DStream$$anonfun$count$1$$anonfun$apply$18.apply(DStream.scala:587)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$count$1$$anonfun$apply$18.apply(DStream.scala:587)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$transform$1$$anonfun$apply$21.apply(DStream.scala:654)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$transform$1$$anonfun$apply$21.apply(DStream.scala:654)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$transform$2$$anonfun$5.apply(DStream.scala:668)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$transform$2$$anonfun$5.apply(DStream.scala:666)
at
org.apache.spark.streaming.dstream.TransformedDStream.compute(TransformedDStream.scala:41)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
at
org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
at scala.Option.orElse(Option.scala:257)
at
org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
at
org.apache.spark.streaming.dstream.ShuffledDStream.compute(ShuffledDStream.scala:41)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
at
org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
at scala.Option.orElse(Option.scala:257)
at
org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
at
org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
at
org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
at scala.Option.orElse(Option.scala:257)
at
org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
at
org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
at
org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)

Build k-NN graph for large dataset

2015-08-26 Thread Jaonary Rabarisoa
Dear all,

I'm trying to find an efficient way to build a k-NN graph for a large
dataset. Precisely, I have a large set of high-dimensional vectors (say d >>> 1)
and I want to build a graph where those high-dimensional points are the
vertices and each one is linked to its k nearest neighbors based on some
kind of similarity defined on the vertex space.
My problem is to implement an efficient algorithm to compute the weight
matrix of the graph. I need to compute N*N similarities, and the only way
I know is to use the "cartesian" operation followed by a "map" operation on
RDDs. But this is very slow when N is large. Is there a more clever way to
do this for an arbitrary similarity function?

Cheers,

Jao


Re: Build k-NN graph for large dataset

2015-08-26 Thread Robin East
You could try dimensionality reduction (PCA or SVD) first. I would imagine that 
even if you could successfully compute similarities in the high-dimensional 
space you would probably run into the curse of dimensionality.
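For example, a minimal sketch with MLlib's RowMatrix (toy data, and k chosen arbitrarily); the k-NN graph would then be built on the reduced vectors:

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.RowMatrix

val vectors = sc.parallelize(Seq(
  Vectors.dense(1.0, 0.0, 3.0, 5.0),
  Vectors.dense(2.0, 1.0, 0.0, 4.0),
  Vectors.dense(0.0, 1.0, 1.0, 1.0)))

val mat     = new RowMatrix(vectors)
val pc      = mat.computePrincipalComponents(2)   // d x k matrix of principal components
val reduced = mat.multiply(pc)                    // rows projected into the k-dimensional space

reduced.rows.take(3).foreach(println)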
> On 26 Aug 2015, at 12:35, Jaonary Rabarisoa  wrote:
> 
> Dear all,
> 
> I'm trying to find an efficient way to build a k-NN graph for a large
> dataset. Precisely, I have a large set of high-dimensional vectors (say d >>> 1)
> and I want to build a graph where those high-dimensional points are the
> vertices and each one is linked to its k nearest neighbors based on some
> kind of similarity defined on the vertex space.
> My problem is to implement an efficient algorithm to compute the weight
> matrix of the graph. I need to compute N*N similarities, and the only way I
> know is to use the "cartesian" operation followed by a "map" operation on
> RDDs. But this is very slow when N is large. Is there a more clever way to
> do this for an arbitrary similarity function?
> 
> Cheers,
> 
> Jao


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Relation between threads and executor core

2015-08-26 Thread Jem Tucker
Sam,

This may be of interest; as far as I can see it suggests that a Spark
'task' is always executed as a single thread in the JVM.

http://0x0fff.com/spark-architecture/

Thanks,

Jem



On Wed, Aug 26, 2015 at 10:06 AM Samya MAITI 
wrote:

> Thanks Jem, I do understand your suggestion. Actually --executor-cores
> alone doesn’t control the number of tasks, but is also governed by
> *spark.task.cpus* (amount of cores dedicated for each task’s execution).
>
>
>
> Reframing my Question*, How many threads can be spawned per executor
> core? Is it in user control? *
>
>
>
> Regards,
>
> Sam
>
>
>
> *From:* Jem Tucker [mailto:jem.tuc...@gmail.com]
> *Sent:* Wednesday, August 26, 2015 2:26 PM
> *To:* Samya MAITI ; user@spark.apache.org
> *Subject:* Re: Relation between threads and executor core
>
>
>
> Hi Samya,
>
>
>
> When submitting an application with spark-submit the cores per executor
> can be set with --executor-cores, meaning you can run that many tasks per
> executor concurrently. The page below has some more details on submitting
> applications:
>
>
>
> https://spark.apache.org/docs/latest/submitting-applications.html
>
>
>
> thanks,
>
>
>
> Jem
>
>
>
> On Wed, Aug 26, 2015 at 9:47 AM Samya  wrote:
>
> Hi All,
>
> A few basic queries:
> 1. Is there a way we can control the number of threads per executor core?
> 2. Does the “executor-cores” parameter also have a say in deciding how many
> threads are run?
>
> Regards,
> Sam
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Relation-between-threads-and-executor-core-tp24456.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Custom Offset Management

2015-08-26 Thread Deepesh Maheshwari
Hi Folks,

My Spark application interacts with Kafka to get data through the Java API.
I am using the Direct Approach (No Receivers), which uses Kafka’s simple
consumer API to read data.
So Kafka offsets need to be handled explicitly.

In case of a Spark failure I need to save the Kafka offset state so that I can
resume from the failure point.
I am saving these offsets in MongoDB.

Please tell me how to initialize the Kafka DirectStream with saved offset
points.
I want to initialize the Kafka stream in Spark Streaming at the required
offsets.

There is a method I found on the web.

KafkaUtils.createDirectStream(jssc, String.class, String.class,
StringDecoder.class, StringDecoder.class, kafkaParams,
topicsSet, fromOffsets, arg8);

arg8 - kafka.message.MessageAndMetadata

Please tell me how to handle and initialize this.

Regards,
Deepesh


Re: Custom Offset Management

2015-08-26 Thread Cody Koeninger
That argument takes a function from MessageAndMetadata to whatever you want
your stream to contain.

See

https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/TransactionalPerBatch.scala#L57
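
For example, a minimal Scala sketch (topic, partition, saved offset, `ssc` and
`kafkaParams` are placeholders; the Java API takes the same arguments plus the
Class objects, as in the quoted snippet below):

  import kafka.common.TopicAndPartition
  import kafka.message.MessageAndMetadata
  import kafka.serializer.StringDecoder
  import org.apache.spark.streaming.kafka.KafkaUtils

  // Offsets previously saved (e.g. read back from MongoDB); values here are examples.
  val fromOffsets: Map[TopicAndPartition, Long] =
    Map(TopicAndPartition("my-topic", 0) -> 12345L)

  // The last argument maps each MessageAndMetadata to whatever the DStream should
  // contain, here a (key, value) pair.
  val messageHandler =
    (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)

  val stream = KafkaUtils.createDirectStream[
    String, String, StringDecoder, StringDecoder, (String, String)](
    ssc, kafkaParams, fromOffsets, messageHandler)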

On Wed, Aug 26, 2015 at 7:55 AM, Deepesh Maheshwari <
deepesh.maheshwar...@gmail.com> wrote:

> Hi Folks,
>
> My Spark application interacts with kafka for getting data through Java
> Api.
> I am using Direct Approach (No Receivers) - which use Kafka’s simple
> consumer API to Read data.
> So, kafka offsets need to be handles explicitly.
>
> In case of Spark failure i need to save the offset state of kafka for
> resuming from the failure point.
> I am saving these points in MongoDB.
>
> Please tell he how to initialize Kafka DirectStream with saved offset
> points.
> I want to initialize kafka stream in Spark Streaming with required offset
> points.
>
> There is method i gets on web.
>
> KafkaUtils.createDirectStream(jssc, String.class, String.class,
> StringDecoder.class, StringDecoder.class, kafkaParams,
> topicsSet, fromOffsets, arg8);
>
> arg8 - kafka.message.MessageAndMetadata
>
> Please tell me how to handle and initialize this.
>
> Regards,
> Deepesh
>


Re: Build k-NN graph for large dataset

2015-08-26 Thread Kristina Rogale Plazonic
If you don't want to compute all N^2 similarities, you need to implement
some kind of blocking first. For example, LSH (locality-sensitive hashing).
A quick search gave this link to a Spark implementation:

http://stackoverflow.com/questions/2771/spark-implementation-for-locality-sensitive-hashing
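
A minimal sketch of that blocking idea (lshBucket stands in for a real LSH
family; similar points should hash to the same bucket), so similarities are
only computed within buckets instead of across all N*N pairs:

  import org.apache.spark.rdd.RDD

  def blockedPairs(points: RDD[(Long, Array[Double])],
                   lshBucket: Array[Double] => Int)
    : RDD[((Long, Array[Double]), (Long, Array[Double]))] = {
    val byBucket = points.keyBy { case (_, v) => lshBucket(v) }  // (bucket, point)
    byBucket.join(byBucket)                                      // candidate pairs share a bucket
      .values
      .filter { case ((i, _), (j, _)) => i < j }                 // drop self-pairs and mirrored duplicates
  }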

On Wed, Aug 26, 2015 at 7:35 AM, Jaonary Rabarisoa 
wrote:

> Dear all,
>
> I'm trying to find an efficient way to build a k-NN graph for a large
> dataset. Precisely, I have a large set of high dimensional vector (say d
> >>> 1) and I want to build a graph where those high dimensional points
> are the vertices and each one is linked to the k-nearest neighbor based on
> some kind similarity defined on the vertex spaces.
> My problem is to implement an efficient algorithm to compute the weight
> matrix of the graph. I need to compute a N*N similarities and the only way
> I know is to use "cartesian" operation follow by "map" operation on RDD.
> But, this is very slow when the N is large. Is there a more cleaver way to
> do this for an arbitrary similarity function ?
>
> Cheers,
>
> Jao
>


Spark-on-YARN LOCAL_DIRS location

2015-08-26 Thread michael.england
Hi,

I am having issues with /tmp space filling up during Spark jobs because 
Spark-on-YARN uses the yarn.nodemanager.local-dirs for shuffle space. I noticed 
this message appears when submitting Spark-on-YARN jobs:

WARN SparkConf: In Spark 1.0 and later spark.local.dir will be overridden by 
the value set by the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone 
and LOCAL_DIRS in YARN).

I can’t find much documentation on where to set the LOCAL_DIRS property. Please 
can someone advise whether this is a yarn-env.sh or a spark-env.sh property and 
whether it would then use the directory specified by this env variable as a 
shuffle area instead of the default yarn.nodemanager.local-dirs location?
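
For what it's worth, the LOCAL_DIRS that YARN exposes to containers is normally
derived from the NodeManager's local-dirs setting in yarn-site.xml (paths below
are examples), so pointing that at larger disks is usually the lever for shuffle
space:

  <!-- yarn-site.xml on each NodeManager; example paths -->
  <property>
    <name>yarn.nodemanager.local-dirs</name>
    <value>/data1/yarn/local,/data2/yarn/local</value>
  </property>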

Thanks,
Mike





Setting number of CORES from inside the Topology (JAVA code )

2015-08-26 Thread anshu shukla
Hey ,

I need to set the number of cores from inside the topology. It works fine
when I set it in spark-env.sh, but I am unable to do it by setting a key/value
on the conf.

SparkConf sparkConf = new
SparkConf().setAppName("JavaCustomReceiver").setMaster("local[4]");

if(toponame.equals("IdentityTopology"))
{
sparkConf.setExecutorEnv("SPARK_WORKER_CORES","1");
}
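
For comparison, a hedged sketch of the configuration keys usually used for this
(whether each one applies depends on the cluster manager; the values are
examples):

  import org.apache.spark.SparkConf

  val conf = new SparkConf()
    .setAppName("JavaCustomReceiver")
    .setMaster("local[4]")            // in local mode, [4] already fixes the number of task threads
    .set("spark.cores.max", "4")      // standalone/Mesos: cap on total cores for the app
    .set("spark.executor.cores", "1") // YARN: cores per executor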




-- 
Thanks & Regards,
Anshu Shukla


Fwd: Issue with building Spark v1.4.1-rc4 with Scala 2.11

2015-08-26 Thread Felix Neutatz
Hi everybody,

I tried to build Spark v1.4.1-rc4 with Scala 2.11:
../apache-maven-3.3.3/bin/mvn -Dscala-2.11 -DskipTests clean install

Before running this, I deleted:
../.m2/repository/org/apache/spark
../.m2/repository/org/spark-project

My changes to the code:
I just changed line 174 of org.apache.spark.executor.Executor$TaskRunner
to:
logInfo(s"test Executor is trying to kill $taskName (TID $taskId)")

Everything builds without an error, but I have an issue.

When I look into the jar of spark-core_2.10, I can see the changed string
in Executor$TaskRunner$$anonfun$kill$1.class. But when I look
into spark-core_2.11 the corresponding string didn't change. It seems like
it downloads the jar from maven.

Do you know what I did wrong?

I also tried to run "mvn -Dscala-2.11 -DskipTests clean install" on the
current master and got the following error:

[ERROR] Failed to execute goal
org.apache.maven.plugins:maven-enforcer-plugin:1.4:enforce
(enforce-versions) on project spark-parent_2.10: Some Enforcer rules have
failed. Look above for specific messages explaining why the rule failed. ->
[Help 1]
[ERROR]
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e
switch.
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR]
[ERROR] For more information about the errors and possible solutions,
please read the following articles:
[ERROR] [Help 1]
http://cwiki.apache.org/confluence/display/MAVEN/MojoExecutionException

Thank you for your help.

Best regards,
Felix


Re: Spark Streaming Checkpointing Restarts with 0 Event Batches

2015-08-26 Thread Cody Koeninger
The first thing that stands out to me is
createOnError = true

Are you sure the checkpoint is actually loading, as opposed to failing and
starting the job anyway?  There should be info lines that look like

INFO DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData:
Restoring KafkaRDD for time 144059718 ms [(test,1,162,220)


You should be able to tell from those whether the offset ranges being
loaded from the checkpoint look reasonable.

Also, is there a reason you're calling

directKStream.checkpoint(checkpointDuration)

Just calling checkpoint on the streaming context should be sufficient to
save the metadata



On Tue, Aug 25, 2015 at 2:36 PM, Susan Zhang  wrote:

> Sure thing!
>
> The main looks like:
>
>
> --
>
>
> val kafkaBrokers = conf.getString(s"$varPrefix.metadata.broker.list")
>
> val kafkaConf = Map(
>   "zookeeper.connect" -> zookeeper,
>   "group.id" -> options.group,
>   "zookeeper.connection.timeout.ms" -> "1",
>   "auto.commit.interval.ms" -> "1000",
>   "rebalance.max.retries" -> "25",
>   "bootstrap.servers" -> kafkaBrokers
> )
>
> val ssc = StreamingContext.getOrCreate(checkpointDirectory,
>   () => {
> createContext(kafkaConf, checkpointDirectory, topic, numThreads,
> isProd)
>   }, createOnError = true)
>
> ssc.start()
> ssc.awaitTermination()
>
>
>
> --
>
>
> And createContext is defined as:
>
>
>
> --
>
>
> val batchDuration = Seconds(5)
> val checkpointDuration = Seconds(20)
>
> private val AUTO_OFFSET_COMMIT = "auto.commit.enable"
>
> def createContext(kafkaConf: Map[String, String],
> checkpointDirectory: String,
> topic: String,
> numThreads: Int,
> isProd: Boolean)
>   : StreamingContext = {
>
> val sparkConf = new SparkConf().setAppName("***")
> val ssc = new StreamingContext(sparkConf, batchDuration)
> ssc.checkpoint(checkpointDirectory)
>
> val topicSet = topic.split(",").toSet
> val groupId = kafkaConf.getOrElse("group.id", "")
>
> val directKStream = KafkaUtils.createDirectStream[String, String,
> StringDecoder, StringDecoder](ssc, kafkaConf, topicSet)
> directKStream.checkpoint(checkpointDuration)
>
> val table = ***
>
> directKStream.foreachRDD { rdd =>
>   val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>   rdd.flatMap(rec => someFunc(rec))
> .reduceByKey((i1: Long, i2: Long) => if (i1 > i2) i1 else i2)
> .foreachPartition { partitionRec =>
>   val dbWrite = DynamoDBWriter()
>   partitionRec.foreach {
> /* Update Dynamo Here */
>   }
> }
>
>   /** Set up ZK Connection **/
>   val props = new Properties()
>   kafkaConf.foreach(param => props.put(param._1, param._2))
>
>   props.setProperty(AUTO_OFFSET_COMMIT, "false")
>
>   val consumerConfig = new ConsumerConfig(props)
>   assert(!consumerConfig.autoCommitEnable)
>
>   val zkClient = new ZkClient(consumerConfig.zkConnect,
> consumerConfig.zkSessionTimeoutMs,
> consumerConfig.zkConnectionTimeoutMs, ZKStringSerializer)
>
>   offsetRanges.foreach { osr =>
> val topicDirs = new ZKGroupTopicDirs(groupId, osr.topic)
> val zkPath = s"${topicDirs.consumerOffsetDir}/${osr.partition}"
> ZkUtils.updatePersistentPath(zkClient, zkPath,
> osr.untilOffset.toString)
>   }
> }
> ssc
>   }
>
>
>
> On Tue, Aug 25, 2015 at 12:07 PM, Cody Koeninger 
> wrote:
>
>> Sounds like something's not set up right... can you post a minimal code
>> example that reproduces the issue?
>>
>> On Tue, Aug 25, 2015 at 1:40 PM, Susan Zhang 
>> wrote:
>>
>>> Yeah. All messages are lost while the streaming job was down.
>>>
>>> On Tue, Aug 25, 2015 at 11:37 AM, Cody Koeninger 
>>> wrote:
>>>
 Are you actually losing messages then?

 On Tue, Aug 25, 2015 at 1:15 PM, Susan Zhang 
 wrote:

> No; first batch only contains messages received after the second job
> starts (messages come in at a steady rate of about 400/second).
>
> On Tue, Aug 25, 2015 at 11:07 AM, Cody Koeninger 
> wrote:
>
>> Does the first batch after restart contain all the messages received
>> while the job was down?
>>
>> On Tue, Aug 25, 2015 at 12:53 PM, suchenzang 
>> wrote:
>>
>>> Hello,
>>>
>>> I'm using direct spark streaming (from kafka) with checkpointing, and
>>> everything works well until a restart. When I shut down (^C) the
>>> first
>>> streaming job, wait 1 minute, then re-submit, there is somehow a
>>> series of 0
>>> event batches that get queued (corresponding to th

Re: Issue with building Spark v1.4.1-rc4 with Scala 2.11

2015-08-26 Thread Ted Yu
Have you run dev/change-version-to-2.11.sh ?
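
i.e. roughly this sequence from the top of the Spark source tree (reusing the
mvn invocation from the original mail; the script name is the one referenced
above):

  ./dev/change-version-to-2.11.sh
  ../apache-maven-3.3.3/bin/mvn -Dscala-2.11 -DskipTests clean install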

Cheers

On Wed, Aug 26, 2015 at 7:07 AM, Felix Neutatz 
wrote:

> Hi everybody,
>
> I tried to build Spark v1.4.1-rc4 with Scala 2.11:
> ../apache-maven-3.3.3/bin/mvn -Dscala-2.11 -DskipTests clean install
>
> Before running this, I deleted:
> ../.m2/repository/org/apache/spark
> ../.m2/repository/org/spark-project
>
> My changes to the code:
> I just changed line 174 of org.apache.spark.executor.Executor$TaskRunner
> to:
> logInfo(s"test Executor is trying to kill $taskName (TID $taskId)")
>
> Everything builds without an error, but I have an issue.
>
> When I look into the jar of spark-core_2.10, I can see the changed string
> in Executor$TaskRunner$$anonfun$kill$1.class. But when I look
> into spark-core_2.11 the corresponding string didn't change. It seems like
> it downloads the jar from maven.
>
> Do you know what I did wrong?
>
> I also tried to run "mvn -Dscala-2.11 -DskipTests clean install" on the
> current master and got the following error:
>
> [ERROR] Failed to execute goal
> org.apache.maven.plugins:maven-enforcer-plugin:1.4:enforce
> (enforce-versions) on project spark-parent_2.10: Some Enforcer rules have
> failed. Look above for specific messages explaining why the rule failed. ->
> [Help 1]
> [ERROR]
> [ERROR] To see the full stack trace of the errors, re-run Maven with the
> -e switch.
> [ERROR] Re-run Maven using the -X switch to enable full debug logging.
> [ERROR]
> [ERROR] For more information about the errors and possible solutions,
> please read the following articles:
> [ERROR] [Help 1]
> http://cwiki.apache.org/confluence/display/MAVEN/MojoExecutionException
>
> Thank you for your help.
>
> Best regards,
> Felix
>
>


Re: Finding the number of executors.

2015-08-26 Thread Virgil Palanciuc
As I was writing a long-ish message to explain why it doesn't work, it
dawned on me that maybe the driver connects to executors only after there's
some work to do (while I was trying to find the number of executors BEFORE
starting the actual work).

So the solution was to simply execute a dummy task (
sparkContext.parallelize(1 until 1000, 200).reduce(_+_) ) before attempting
to retrieve the executors. It works now :)
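
Putting the two pieces together, a minimal sketch (the dummy job sizes are
arbitrary; `sc` is the active SparkContext):

  // Force executors to register by running a trivial job first...
  sc.parallelize(1 until 1000, 200).reduce(_ + _)

  // ...then every registered BlockManager except the driver's should be visible.
  val driverHost = sc.getConf.get("spark.driver.host")
  val executors = sc.getExecutorMemoryStatus.keys
    .filter(addr => !addr.split(":")(0).equals(driverHost))
  println(s"Registered executors: ${executors.size}")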

Virgil.

On Sat, Aug 22, 2015 at 12:44 AM, Du Li  wrote:

> Following is a method that retrieves the list of executors registered to a
> spark context. It worked perfectly with spark-submit in standalone mode for
> my project.
>
> /**
>* A simplified method that just returns the current active/registered
> executors
>* excluding the driver.
>* @param sc
>*   The spark context to retrieve registered executors.
>* @return
>* A list of executors each in the form of host:port.
>*/
>   def currentActiveExecutors(sc: SparkContext): Seq[String] = {
> val allExecutors = sc.getExecutorMemoryStatus.map(_._1)
> val driverHost: String = sc.getConf.get("spark.driver.host")
> allExecutors.filter(! _.split(":")(0).equals(driverHost)).toList
>   }
>
>
>
>
> On Friday, August 21, 2015 1:53 PM, Virgil Palanciuc 
> wrote:
>
>
> Hi Akhil,
>
> I'm using spark 1.4.1.
> Number of executors is not in the command line, not in the 
> getExecutorMemoryStatus
> (I already mentioned that I tried that, works in spark-shell but not when
> executed via spark-submit). I tried looking at "defaultParallelism" too,
> it's 112 (7 executors * 16 cores) when ran via spark-shell, but just 2 when
> ran via spark-submit.
>
> But the scheduler obviously knows this information. It *must* know it. How
> can I access it? Other that parsing the HTML of the WebUI, that is...
> that's pretty much guaranteed to work, and maybe I'll do that, but it's
> extremely convoluted.
>
> Regards,
> Virgil.
>
> On Fri, Aug 21, 2015 at 11:35 PM, Akhil Das 
> wrote:
>
> Which version spark are you using? There was a discussion happened over
> here
>
> http://apache-spark-user-list.1001560.n3.nabble.com/Determine-number-of-running-executors-td19453.html
>
> http://mail-archives.us.apache.org/mod_mbox/spark-user/201411.mbox/%3ccacbyxk+ya1rbbnkwjheekpnbsbh10rykuzt-laqgpdanvhm...@mail.gmail.com%3E
> On Aug 21, 2015 7:42 AM, "Virgil Palanciuc"  wrote:
>
> Is there any reliable way to find out the number of executors
> programatically - regardless of how the job  is run? A method that
> preferably works for spark-standalone, yarn, mesos, regardless whether the
> code runs from the shell or not?
>
> Things that I tried and don't work:
> - sparkContext.getExecutorMemoryStatus.size - 1 // works from the shell,
> does not work if task submitted via  spark-submit
> - sparkContext.getConf.getInt("spark.executor.instances", 1) - doesn't
> work unless explicitly configured
> - call to http://master:8080/json (this used to work, but doesn't
> anymore?)
>
> I guess I could parse the output html from the Spark UI... but that seems
> dumb. is there really no better way?
>
> Thanks,
> Virgil.
>
>
>
>
>
>


application logs for long lived job on YARN

2015-08-26 Thread Chen Song
When running long-lived job on YARN like Spark Streaming, I found that
container logs gone after days on executor nodes, although the job itself
is still running.


I am using cdh5.4.0 and have aggregated logs enabled. Because the local
logs are gone on executor nodes, I don't see any aggregated logs on hdfs
after the job is killed or failed.

Is there a YARN config to keep the logs from being deleted for long-lived
streaming job?

-- 
Chen Song


Spark 1.3.1 saveAsParquetFile hangs on app exit

2015-08-26 Thread cingram
I have a simple test that is hanging when using s3a with spark 1.3.1. Is
there something I need to do to cleanup the S3A file system? The write to S3
appears to have worked but this job hangs in the spark-shell and using
spark-submit. Any help would be greatly appreciated. TIA.

import sqlContext.implicits._
import com.datastax.spark.connector._
case class LU(userid: String, timestamp: Long, lat: Double, lon: Double)
val uid ="testuser"
val lue = sc.cassandraTable[LU]("test", "foo").where("userid=?", uid).toDF
lue.saveAsParquetFile("s3a://twc-scratch/craig_lues")



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-3-1-saveAsParquetFile-hangs-on-app-exit-tp24460.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: JDBC Streams

2015-08-26 Thread Chen Song
Piggyback on this question.

I have a similar use case but a bit different. My job is consuming a stream
from Kafka and I need to join the Kafka stream with some reference table
from MySQL (kind of data validation and enrichment). I need to process this
stream every 1 min. The data in MySQL does not change very often, maybe once
every few days.

So my requirement is:

* I cannot easily use a broadcast variable because the data does change,
although not very often.
* I am not sure if it is good practice to read data from MySQL in every
batch (in my case, 1 min).

Has anyone done this before? Any suggestions and feedback are appreciated.

Chen


On Sun, Jul 5, 2015 at 11:50 AM, Ashic Mahtab  wrote:

> If it is indeed a reactive use case, then Spark Streaming would be a good
> choice.
>
> One approach worth considering - is it possible to receive a message via
> kafka (or some other queue). That'd not need any polling, and you could use
> standard consumers. If polling isn't an issue, then writing a custom
> receiver will work fine. The way a receiver works is this:
>
> * Your receiver has a receive() function, where you'd typically start a
> loop. In your loop, you'd fetch items, and call store(entry).
> * You control everything in the receiver. If you're listening on a queue,
> you receive messages, store() and ack your queue. If you're polling, it's
> up to you to ensure delays between db calls.
> * The things you store() go on to make up the rdds in your DStream. So,
> intervals, windowing, etc. apply to those. The receiver is the boundary
> between your data source and the DStream RDDs. In other words, if your
> interval is 15 seconds with no windowing, then the things that went to
> store() every 15 seconds are bunched up into an RDD of your DStream. That's
> kind of a simplification, but should give you the idea that your "db
> polling" interval and streaming interval are not tied together.
>
> -Ashic.
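
To make the receive()/store() contract described above concrete, here is a
minimal sketch of such a polling receiver (the fetch logic, poll interval and
storage level are placeholder assumptions):

  import org.apache.spark.storage.StorageLevel
  import org.apache.spark.streaming.receiver.Receiver

  class JdbcPollingReceiver extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

    override def onStart(): Unit = {
      // Run the polling loop on its own thread so onStart() returns immediately.
      new Thread("jdbc-polling-thread") {
        override def run(): Unit = poll()
      }.start()
    }

    override def onStop(): Unit = {}   // the loop below exits once isStopped() is true

    private def poll(): Unit = {
      while (!isStopped()) {
        fetchChangedRows().foreach(row => store(row))  // store() hands records to Spark Streaming
        Thread.sleep(60 * 1000L)                       // delay between db calls (example: 1 minute)
      }
    }

    // Placeholder for the actual JDBC query against the CDC/trigger table.
    private def fetchChangedRows(): Seq[String] = Seq.empty
  }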
>
> --
> Date: Mon, 6 Jul 2015 01:12:34 +1000
> Subject: Re: JDBC Streams
> From: guha.a...@gmail.com
> To: as...@live.com
> CC: ak...@sigmoidanalytics.com; user@spark.apache.org
>
>
> Hi
>
> Thanks for the reply. Here is my situation: I have a DB which enables
> synchronous CDC; think of this as a DB trigger which writes to a table with
> "changed" values as soon as something changes in the production table. My job
> will need to pick up the data "as soon as it arrives", which can be every 1
> min interval. Ideally it will pick up the changes, transform them into
> JSON and put them to Kinesis. In short, I am emulating a Kinesis producer
> with a DB source (don't even ask why, let's say these are the constraints :) )
>
> Please advise: (a) is Spark a good choice here, and (b) what is your suggestion
> either way?
>
> I understand I can easily do it using a simple Java/Python app, but I am a
> little worried about managing scaling/fault tolerance and that's where my
> concern is.
>
> TIA
> Ayan
>
> On Mon, Jul 6, 2015 at 12:51 AM, Ashic Mahtab  wrote:
>
> Hi Ayan,
> How "continuous" is your workload? As Akhil points out, with streaming,
> you'll give up at least one core for receiving, will need at most one more
> core for processing. Unless you're running on something like Mesos, this
> means that those cores are dedicated to your app, and can't be leveraged by
> other apps / jobs.
>
> If it's something periodic (once an hour, once every 15 minutes, etc.),
> then I'd simply write a "normal" spark application, and trigger it
> periodically. There are many things that can take care of that - sometimes
> a simple cronjob is enough!
>
> --
> Date: Sun, 5 Jul 2015 22:48:37 +1000
> Subject: Re: JDBC Streams
> From: guha.a...@gmail.com
> To: ak...@sigmoidanalytics.com
> CC: user@spark.apache.org
>
>
> Thanks Akhil. In case I go with Spark Streaming, I guess I have to
> implement a custom receiver and Spark Streaming will call this receiver
> every batch interval, is that correct? Any gotchas you see in this plan?
> TIA...Best, Ayan
>
> On Sun, Jul 5, 2015 at 5:40 PM, Akhil Das 
> wrote:
>
> If you want a long running application, then go with spark streaming
> (which kind of blocks your resources). On the other hand, if you use job
> server then you can actually use the resources (CPUs) for other jobs also
> when your dbjob is not using them.
>
> Thanks
> Best Regards
>
> On Sun, Jul 5, 2015 at 5:28 AM, ayan guha  wrote:
>
> Hi All
>
> I have a requirement to connect to a DB every few minutes and bring data to
> HBase. Can anyone suggest if Spark Streaming would be appropriate for this
> scenario or should I look into the job server?
>
> Thanks in advance
>
> --
> Best Regards,
> Ayan Guha
>
>
>
>
>
> --
> Best Regards,
> Ayan Guha
>
>
>
>
> --
> Best Regards,
> Ayan Guha
>



-- 
Chen Song


Re: Spark 1.3.1 saveAsParquetFile hangs on app exit

2015-08-26 Thread Cheng Lian

Could you please show the jstack result of the hung process? Thanks!
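
(A typical way to capture that dump, assuming the JVM is still running locally;
the pid is whatever jps reports for the hanging process:)

  jps -lm                                  # find the pid of the spark-shell / driver JVM
  jstack -l <pid> > spark-hang-jstack.txt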

Cheng

On 8/26/15 10:46 PM, cingram wrote:

I have a simple test that is hanging when using s3a with spark 1.3.1. Is
there something I need to do to cleanup the S3A file system? The write to S3
appears to have worked but this job hangs in the spark-shell and using
spark-submit. Any help would be greatly appreciated. TIA.

import sqlContext.implicits._
import com.datastax.spark.connector._
case class LU(userid: String, timestamp: Long, lat: Double, lon: Double)
val uid ="testuser"
val lue = sc.cassandraTable[LU]("test", "foo").where("userid=?", uid).toDF
lue.saveAsParquetFile("s3a://twc-scratch/craig_lues")



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-3-1-saveAsParquetFile-hangs-on-app-exit-tp24460.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org





-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: JDBC Streams

2015-08-26 Thread Cody Koeninger
If your data only changes every few days, why not restart the job every few
days, and just broadcast the data?

Or you can keep a local per-jvm cache with an expiry (e.g. guava cache) to
avoid many mysql reads
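
A rough sketch of the per-JVM cache idea using Guava's LoadingCache (the value
type, expiry and loadFromMysql are placeholder assumptions):

  import java.util.concurrent.TimeUnit
  import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}

  // One instance per executor JVM: referenced from tasks, loaded lazily,
  // and re-read from MySQL once an entry expires.
  object ReferenceData {
    private val cache: LoadingCache[String, Map[String, String]] =
      CacheBuilder.newBuilder()
        .expireAfterWrite(30, TimeUnit.MINUTES)   // refresh interval is an example value
        .build(new CacheLoader[String, Map[String, String]] {
          override def load(table: String): Map[String, String] = loadFromMysql(table)
        })

    def lookup(table: String): Map[String, String] = cache.get(table)

    // Placeholder for the actual JDBC read of the reference table.
    private def loadFromMysql(table: String): Map[String, String] = Map.empty
  }

Calling ReferenceData.lookup(...) from inside mapPartitions/foreachPartition
then hits MySQL at most once per executor per expiry window rather than once
per batch.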

On Wed, Aug 26, 2015 at 9:46 AM, Chen Song  wrote:

> Piggyback on this question.
>
> I have a similar use case but a bit different. My job is consuming a
> stream from Kafka and I need to join the Kafka stream with some reference
> table from MySQL (kind of data validation and enrichment). I need to
> process this stream every 1 min. The data in MySQL is not changed very
> often, maybe once a few days.
>
> So my requirement is:
>
> * I cannot easily use broadcast variable because the data does change,
> although not very often.
> * I am not sure if it is good practice to read data from MySQL in every
> batch (in my case, 1 min).
>
> Anyone has done this before, any suggestions and feedback is appreciated.
>
> Chen
>
>
> On Sun, Jul 5, 2015 at 11:50 AM, Ashic Mahtab  wrote:
>
>> If it is indeed a reactive use case, then Spark Streaming would be a good
>> choice.
>>
>> One approach worth considering - is it possible to receive a message via
>> kafka (or some other queue). That'd not need any polling, and you could use
>> standard consumers. If polling isn't an issue, then writing a custom
>> receiver will work fine. The way a receiver works is this:
>>
>> * Your receiver has a receive() function, where you'd typically start a
>> loop. In your loop, you'd fetch items, and call store(entry).
>> * You control everything in the receiver. If you're listening on a queue,
>> you receive messages, store() and ack your queue. If you're polling, it's
>> up to you to ensure delays between db calls.
>> * The things you store() go on to make up the rdds in your DStream. So,
>> intervals, windowing, etc. apply to those. The receiver is the boundary
>> between your data source and the DStream RDDs. In other words, if your
>> interval is 15 seconds with no windowing, then the things that went to
>> store() every 15 seconds are bunched up into an RDD of your DStream. That's
>> kind of a simplification, but should give you the idea that your "db
>> polling" interval and streaming interval are not tied together.
>>
>> -Ashic.
>>
>> --
>> Date: Mon, 6 Jul 2015 01:12:34 +1000
>> Subject: Re: JDBC Streams
>> From: guha.a...@gmail.com
>> To: as...@live.com
>> CC: ak...@sigmoidanalytics.com; user@spark.apache.org
>>
>>
>> Hi
>>
>> Thanks for the reply. here is my situation: I hve a DB which enbles
>> synchronus CDC, think this as a DBtrigger which writes to a taable with
>> "changed" values as soon as something changes in production table. My job
>> will need to pick up the data "as soon as it arrives" which can be every 1
>> min interval. Ideally it will pick up the changes, transform it into a
>> jsonand puts it to kinesis. In short, I am emulating a Kinesis producer
>> with a DB source (dont even ask why, lets say these are the constraints :) )
>>
>> Please advice (a) is spark a good choice here (b)  whats your suggestion
>> either way.
>>
>> I understand I can easily do it using a simple java/python app but I am
>> little worried about managing scaling/fault tolerance and thats where my
>> concern is.
>>
>> TIA
>> Ayan
>>
>> On Mon, Jul 6, 2015 at 12:51 AM, Ashic Mahtab  wrote:
>>
>> Hi Ayan,
>> How "continuous" is your workload? As Akhil points out, with streaming,
>> you'll give up at least one core for receiving, will need at most one more
>> core for processing. Unless you're running on something like Mesos, this
>> means that those cores are dedicated to your app, and can't be leveraged by
>> other apps / jobs.
>>
>> If it's something periodic (once an hour, once every 15 minutes, etc.),
>> then I'd simply write a "normal" spark application, and trigger it
>> periodically. There are many things that can take care of that - sometimes
>> a simple cronjob is enough!
>>
>> --
>> Date: Sun, 5 Jul 2015 22:48:37 +1000
>> Subject: Re: JDBC Streams
>> From: guha.a...@gmail.com
>> To: ak...@sigmoidanalytics.com
>> CC: user@spark.apache.org
>>
>>
>> Thanks Akhil. In case I go with spark streaming, I guess I have to
>> implment a custom receiver and spark streaming will call this receiver
>> every batch interval, is that correct? Any gotcha you see in this plan?
>> TIA...Best, Ayan
>>
>> On Sun, Jul 5, 2015 at 5:40 PM, Akhil Das 
>> wrote:
>>
>> If you want a long running application, then go with spark streaming
>> (which kind of blocks your resources). On the other hand, if you use job
>> server then you can actually use the resources (CPUs) for other jobs also
>> when your dbjob is not using them.
>>
>> Thanks
>> Best Regards
>>
>> On Sun, Jul 5, 2015 at 5:28 AM, ayan guha  wrote:
>>
>> Hi All
>>
>> I have a requireent to connect to a DB every few minutes and bring data
>> to HBase. Can anyone suggest if spark streaming would be appropriate for
>> this senario or I sho

Building spark-examples takes too much time using Maven

2015-08-26 Thread Muhammad Haseeb Javed
I checked out the master branch and started playing around with the
examples. I want to build a jar of the examples because I wish to run them using
the modified Spark jar that I have. However, packaging spark-examples takes
too much time because Maven tries to download the jar dependencies rather than
use the jars that are already present on the system, even though I extended and
packaged Spark itself locally.


Re: Build k-NN graph for large dataset

2015-08-26 Thread Michael Malak
Yes. And a paper that describes using grids (actually varying grids) is 
http://research.microsoft.com/en-us/um/people/jingdw/pubs%5CCVPR12-GraphConstruction.pdf
 In the Spark GraphX In Action book that Robin East and I are writing, we 
implement a drastically simplified version of this in chapter 7, which should 
become available in the MEAP mid-September. 
http://www.manning.com/books/spark-graphx-in-action

  From: Kristina Rogale Plazonic 
 To: Jaonary Rabarisoa  
Cc: user  
 Sent: Wednesday, August 26, 2015 7:24 AM
 Subject: Re: Build k-NN graph for large dataset
   
If you don't want to compute all N^2 similarities, you need to implement some 
kind of blocking first. For example, LSH (locally sensitive hashing). A quick 
search gave this link to a Spark implementation: 
http://stackoverflow.com/questions/2771/spark-implementation-for-locality-sensitive-hashing


On Wed, Aug 26, 2015 at 7:35 AM, Jaonary Rabarisoa  wrote:

Dear all,
I'm trying to find an efficient way to build a k-NN graph for a large dataset. 
Precisely, I have a large set of high dimensional vectors (say d >>> 1) and 
I want to build a graph where those high dimensional points are the vertices 
and each one is linked to its k nearest neighbors based on some kind of similarity 
defined on the vertex space. 
My problem is to implement an efficient algorithm to compute the weight matrix 
of the graph. I need to compute N*N similarities, and the only way I know is 
to use the "cartesian" operation followed by a "map" operation on the RDD. But this is 
very slow when N is large. Is there a cleverer way to do this for an 
arbitrary similarity function ? 
Cheers,
Jao



  

Re: Build k-NN graph for large dataset

2015-08-26 Thread Charlie Hack
+1 to all of the above, esp. dimensionality reduction and locality sensitive 
hashing / min hashing. 


There's also an algorithm implemented in MLlib called DIMSUM which was 
developed at Twitter for this purpose. I've been meaning to try it and would be 
interested to hear about results you get. 





https://blog.twitter.com/2014/all-pairs-similarity-via-dimsum
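
If you want to experiment with it, DIMSUM is exposed in MLlib as
RowMatrix.columnSimilarities; a toy sketch (assumes a SparkContext `sc`; note it
computes similarities between columns, so the data may need to be laid out
transposed so each entity is a column):

  import org.apache.spark.mllib.linalg.Vectors
  import org.apache.spark.mllib.linalg.distributed.RowMatrix

  // Toy data; each column of the matrix is one entity.
  val rows = sc.parallelize(Seq(
    Vectors.dense(1.0, 0.0, 3.0),
    Vectors.dense(4.0, 5.0, 0.0)))

  val mat = new RowMatrix(rows)
  val sims = mat.columnSimilarities(0.1)   // threshold > 0 turns on DIMSUM sampling (example value)
  // sims is an upper-triangular CoordinateMatrix of estimated cosine similarities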

Charlie

—
Sent from Mailbox

On Wednesday, Aug 26, 2015 at 09:57, Michael Malak 
, wrote:


Yes. And a paper that describes using grids (actually varying grids) is 
http://research.microsoft.com/en-us/um/people/jingdw/pubs%5CCVPR12-GraphConstruction.pdf
 In the Spark GraphX In Action book that Robin East and I are writing, we 
implement a drastically simplified version of this in chapter 7, which should 
become available in the MEAP mid-September. 
http://www.manning.com/books/spark-graphx-in-action

If you don't want to compute all N^2 similarities, you need to implement some 
kind of blocking first. For example, LSH (locally sensitive hashing). A quick 
search gave this link to a Spark implementation: 

http://stackoverflow.com/questions/2771/spark-implementation-for-locality-sensitive-hashing

On Wed, Aug 26, 2015 at 7:35 AM, Jaonary Rabarisoa  wrote:

Dear all,


I'm trying to find an efficient way to build a k-NN graph for a large dataset. 
Precisely, I have a large set of high dimensional vectors (say d >>> 1) and 
I want to build a graph where those high dimensional points are the vertices 
and each one is linked to its k nearest neighbors based on some kind of similarity 
defined on the vertex space. 
My problem is to implement an efficient algorithm to compute the weight matrix 
of the graph. I need to compute N*N similarities, and the only way I know is 
to use the "cartesian" operation followed by a "map" operation on the RDD. But this is 
very slow when N is large. Is there a cleverer way to do this for an 
arbitrary similarity function ? 

Cheers,

Jao

Re: DataFrame/JDBC very slow performance

2015-08-26 Thread Dhaval Patel
Thanks Michael, much appreciated!

Nothing should be held in memory for a query like this (other than a single
count per partition), so I don't think that is the problem.  There is
likely an error buried somewhere.

For your above comments - I don't get any error but just get NULL as the
return value. I have tried digging deeper in the logs etc. but couldn't spot
anything. Are there any other suggestions for spotting such buried errors?

Thanks,
Dhaval

On Mon, Aug 24, 2015 at 6:38 PM, Michael Armbrust 
wrote:

> Much appreciated! I am not comparing with "select count(*)" for
>> performance, but it was one simple thing I tried to check the performance
>> :). I think it now makes sense since Spark tries to extract all records
>> before doing the count. I thought having an aggregated function query
>> submitted over JDBC/Teradata would let Teradata do the heavy lifting.
>>
>
> We currently only push down filters since there is a lot of variability in
> what types of aggregations various databases support.  You can manually
> pushdown whatever you want by replacing the table name with a subquery
> (i.e. "(SELECT ... FROM ...)")
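
A minimal sketch of that trick (url, driver and table name are placeholders):

  // The "dbtable" option may be any parenthesised subquery with an alias, so the
  // aggregation runs inside the source database instead of in Spark.
  val counts = sqlContext.read.format("jdbc").options(Map(
    "url"     -> "jdbc:teradata://host/DBS_PORT=1025",     // placeholder URL
    "driver"  -> "com.teradata.jdbc.TeraDriver",           // placeholder driver class
    "dbtable" -> "(SELECT COUNT(*) AS cnt FROM my_big_table) t"
  )).load()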
>
>- How come my second query for (5B) records didn't return anything
>> even after a long processing? If I understood correctly, Spark would try to
>> fit it in memory and if not then might use disk space, which I have
>> available?
>>
>
> Nothing should be held in memory for a query like this (other than a
> single count per partition), so I don't think that is the problem.  There
> is likely an error buried somewhere.
>
>
>>  - Am I supposed to do any Spark related tuning to make it work?
>>
>> My main need is to access data from these large table(s) on demand and
>> provide aggregated and calculated results much quicker, for that  I was
>> trying out Spark. Next step I am thinking to export data in Parque files
>> and give it a try. Do you have any suggestions for to deal with the problem?
>>
>
> Exporting to parquet will likely be a faster option than trying to query
> through JDBC, since we have many more opportunities for parallelism here.
>


Re: JDBC Streams

2015-08-26 Thread Chen Song
Thanks Cody.

Are you suggesting to put the cache in global context in each executor JVM,
in a Scala object for example. Then have a scheduled task to refresh the
cache (or triggered by the expiry if Guava)?

Chen

On Wed, Aug 26, 2015 at 10:51 AM, Cody Koeninger  wrote:

> If your data only changes every few days, why not restart the job every
> few days, and just broadcast the data?
>
> Or you can keep a local per-jvm cache with an expiry (e.g. guava cache) to
> avoid many mysql reads
>
> On Wed, Aug 26, 2015 at 9:46 AM, Chen Song  wrote:
>
>> Piggyback on this question.
>>
>> I have a similar use case but a bit different. My job is consuming a
>> stream from Kafka and I need to join the Kafka stream with some reference
>> table from MySQL (kind of data validation and enrichment). I need to
>> process this stream every 1 min. The data in MySQL is not changed very
>> often, maybe once a few days.
>>
>> So my requirement is:
>>
>> * I cannot easily use broadcast variable because the data does change,
>> although not very often.
>> * I am not sure if it is good practice to read data from MySQL in every
>> batch (in my case, 1 min).
>>
>> Anyone has done this before, any suggestions and feedback is appreciated.
>>
>> Chen
>>
>>
>> On Sun, Jul 5, 2015 at 11:50 AM, Ashic Mahtab  wrote:
>>
>>> If it is indeed a reactive use case, then Spark Streaming would be a
>>> good choice.
>>>
>>> One approach worth considering - is it possible to receive a message via
>>> kafka (or some other queue). That'd not need any polling, and you could use
>>> standard consumers. If polling isn't an issue, then writing a custom
>>> receiver will work fine. The way a receiver works is this:
>>>
>>> * Your receiver has a receive() function, where you'd typically start a
>>> loop. In your loop, you'd fetch items, and call store(entry).
>>> * You control everything in the receiver. If you're listening on a
>>> queue, you receive messages, store() and ack your queue. If you're polling,
>>> it's up to you to ensure delays between db calls.
>>> * The things you store() go on to make up the rdds in your DStream. So,
>>> intervals, windowing, etc. apply to those. The receiver is the boundary
>>> between your data source and the DStream RDDs. In other words, if your
>>> interval is 15 seconds with no windowing, then the things that went to
>>> store() every 15 seconds are bunched up into an RDD of your DStream. That's
>>> kind of a simplification, but should give you the idea that your "db
>>> polling" interval and streaming interval are not tied together.
>>>
>>> -Ashic.
>>>
>>> --
>>> Date: Mon, 6 Jul 2015 01:12:34 +1000
>>> Subject: Re: JDBC Streams
>>> From: guha.a...@gmail.com
>>> To: as...@live.com
>>> CC: ak...@sigmoidanalytics.com; user@spark.apache.org
>>>
>>>
>>> Hi
>>>
>>> Thanks for the reply. here is my situation: I hve a DB which enbles
>>> synchronus CDC, think this as a DBtrigger which writes to a taable with
>>> "changed" values as soon as something changes in production table. My job
>>> will need to pick up the data "as soon as it arrives" which can be every 1
>>> min interval. Ideally it will pick up the changes, transform it into a
>>> jsonand puts it to kinesis. In short, I am emulating a Kinesis producer
>>> with a DB source (dont even ask why, lets say these are the constraints :) )
>>>
>>> Please advice (a) is spark a good choice here (b)  whats your suggestion
>>> either way.
>>>
>>> I understand I can easily do it using a simple java/python app but I am
>>> little worried about managing scaling/fault tolerance and thats where my
>>> concern is.
>>>
>>> TIA
>>> Ayan
>>>
>>> On Mon, Jul 6, 2015 at 12:51 AM, Ashic Mahtab  wrote:
>>>
>>> Hi Ayan,
>>> How "continuous" is your workload? As Akhil points out, with streaming,
>>> you'll give up at least one core for receiving, will need at most one more
>>> core for processing. Unless you're running on something like Mesos, this
>>> means that those cores are dedicated to your app, and can't be leveraged by
>>> other apps / jobs.
>>>
>>> If it's something periodic (once an hour, once every 15 minutes, etc.),
>>> then I'd simply write a "normal" spark application, and trigger it
>>> periodically. There are many things that can take care of that - sometimes
>>> a simple cronjob is enough!
>>>
>>> --
>>> Date: Sun, 5 Jul 2015 22:48:37 +1000
>>> Subject: Re: JDBC Streams
>>> From: guha.a...@gmail.com
>>> To: ak...@sigmoidanalytics.com
>>> CC: user@spark.apache.org
>>>
>>>
>>> Thanks Akhil. In case I go with spark streaming, I guess I have to
>>> implment a custom receiver and spark streaming will call this receiver
>>> every batch interval, is that correct? Any gotcha you see in this plan?
>>> TIA...Best, Ayan
>>>
>>> On Sun, Jul 5, 2015 at 5:40 PM, Akhil Das 
>>> wrote:
>>>
>>> If you want a long running application, then go with spark streaming
>>> (which kind of blocks your resources). On the other hand, if you use job

Re: JDBC Streams

2015-08-26 Thread Cody Koeninger
Yes

On Wed, Aug 26, 2015 at 10:23 AM, Chen Song  wrote:

> Thanks Cody.
>
> Are you suggesting to put the cache in global context in each executor
> JVM, in a Scala object for example. Then have a scheduled task to refresh
> the cache (or triggered by the expiry if Guava)?
>
> Chen
>
> On Wed, Aug 26, 2015 at 10:51 AM, Cody Koeninger 
> wrote:
>
>> If your data only changes every few days, why not restart the job every
>> few days, and just broadcast the data?
>>
>> Or you can keep a local per-jvm cache with an expiry (e.g. guava cache)
>> to avoid many mysql reads
>>
>> On Wed, Aug 26, 2015 at 9:46 AM, Chen Song 
>> wrote:
>>
>>> Piggyback on this question.
>>>
>>> I have a similar use case but a bit different. My job is consuming a
>>> stream from Kafka and I need to join the Kafka stream with some reference
>>> table from MySQL (kind of data validation and enrichment). I need to
>>> process this stream every 1 min. The data in MySQL is not changed very
>>> often, maybe once a few days.
>>>
>>> So my requirement is:
>>>
>>> * I cannot easily use broadcast variable because the data does change,
>>> although not very often.
>>> * I am not sure if it is good practice to read data from MySQL in every
>>> batch (in my case, 1 min).
>>>
>>> Anyone has done this before, any suggestions and feedback is appreciated.
>>>
>>> Chen
>>>
>>>
>>> On Sun, Jul 5, 2015 at 11:50 AM, Ashic Mahtab  wrote:
>>>
 If it is indeed a reactive use case, then Spark Streaming would be a
 good choice.

 One approach worth considering - is it possible to receive a message
 via kafka (or some other queue). That'd not need any polling, and you could
 use standard consumers. If polling isn't an issue, then writing a custom
 receiver will work fine. The way a receiver works is this:

 * Your receiver has a receive() function, where you'd typically start a
 loop. In your loop, you'd fetch items, and call store(entry).
 * You control everything in the receiver. If you're listening on a
 queue, you receive messages, store() and ack your queue. If you're polling,
 it's up to you to ensure delays between db calls.
 * The things you store() go on to make up the rdds in your DStream. So,
 intervals, windowing, etc. apply to those. The receiver is the boundary
 between your data source and the DStream RDDs. In other words, if your
 interval is 15 seconds with no windowing, then the things that went to
 store() every 15 seconds are bunched up into an RDD of your DStream. That's
 kind of a simplification, but should give you the idea that your "db
 polling" interval and streaming interval are not tied together.

 -Ashic.

 --
 Date: Mon, 6 Jul 2015 01:12:34 +1000
 Subject: Re: JDBC Streams
 From: guha.a...@gmail.com
 To: as...@live.com
 CC: ak...@sigmoidanalytics.com; user@spark.apache.org


 Hi

 Thanks for the reply. here is my situation: I hve a DB which enbles
 synchronus CDC, think this as a DBtrigger which writes to a taable with
 "changed" values as soon as something changes in production table. My job
 will need to pick up the data "as soon as it arrives" which can be every 1
 min interval. Ideally it will pick up the changes, transform it into a
 jsonand puts it to kinesis. In short, I am emulating a Kinesis producer
 with a DB source (dont even ask why, lets say these are the constraints :) 
 )

 Please advice (a) is spark a good choice here (b)  whats your
 suggestion either way.

 I understand I can easily do it using a simple java/python app but I am
 little worried about managing scaling/fault tolerance and thats where my
 concern is.

 TIA
 Ayan

 On Mon, Jul 6, 2015 at 12:51 AM, Ashic Mahtab  wrote:

 Hi Ayan,
 How "continuous" is your workload? As Akhil points out, with streaming,
 you'll give up at least one core for receiving, will need at most one more
 core for processing. Unless you're running on something like Mesos, this
 means that those cores are dedicated to your app, and can't be leveraged by
 other apps / jobs.

 If it's something periodic (once an hour, once every 15 minutes, etc.),
 then I'd simply write a "normal" spark application, and trigger it
 periodically. There are many things that can take care of that - sometimes
 a simple cronjob is enough!

 --
 Date: Sun, 5 Jul 2015 22:48:37 +1000
 Subject: Re: JDBC Streams
 From: guha.a...@gmail.com
 To: ak...@sigmoidanalytics.com
 CC: user@spark.apache.org


 Thanks Akhil. In case I go with spark streaming, I guess I have to
 implment a custom receiver and spark streaming will call this receiver
 every batch interval, is that correct? Any gotcha you see in this plan?
 TIA...Best, Ayan

 On Sun, Jul 5,

spark streaming 1.3 kafka buffer size

2015-08-26 Thread Shushant Arora
What's the default buffer in Spark Streaming 1.3 for Kafka messages?

Say in this run it has to fetch messages from offset 1 to 1. Will it
fetch them all in one go, or does it internally fetch messages in smaller
batches?

Is there any setting to configure the number of offsets fetched in one batch?


Re: How to add a new column with date duration from 2 date columns in a dataframe

2015-08-26 Thread Dhaval Patel
Thanks Davies. HiveContext seems neat to use :)

On Thu, Aug 20, 2015 at 3:02 PM, Davies Liu  wrote:

> As Aram said, there two options in Spark 1.4,
>
> 1) Use the HiveContext, then you got datediff from Hive,
> df.selectExpr("datediff(d2, d1)")
> 2) Use Python UDF:
> ```
> >>> from datetime import date
> >>> df = sqlContext.createDataFrame([(date(2008, 8, 18), date(2008, 9,
> 26))], ['d1', 'd2'])
> >>> from pyspark.sql.functions import udf
> >>> from pyspark.sql.types import IntegerType
> >>> diff = udf(lambda a, b: (a - b).days, IntegerType())
> >>> df.select(diff(df.d1, df.d2)).show()
> +-----------------+
> |PythonUDF#(d1,d2)|
> +-----------------+
> |              -39|
> +-----------------+
> ```
>
> On Thu, Aug 20, 2015 at 7:45 AM, Aram Mkrtchyan
>  wrote:
> > Hi,
> >
> > hope this will help you
> >
> > import org.apache.spark.sql.functions._
> > import sqlContext.implicits._
> > import java.sql.Timestamp
> >
> > val df = sc.parallelize(Array((date1, date2))).toDF("day1", "day2")
> >
> > val dateDiff = udf[Long, Timestamp, Timestamp]((value1, value2) =>
> >   Days.daysBetween(new DateTime(value2.getTime), new
> > DateTime(value1.getTime)).getDays)
> > df.withColumn("diff", dateDiff(df("day2"), df("day1"))).show()
> >
> > or you can write sql query using hiveql's datediff function.
> >  https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF
> >
> > On Thu, Aug 20, 2015 at 4:57 PM, Dhaval Patel 
> wrote:
> >>
> >> More update on this question..I am using spark 1.4.1.
> >>
> >> I was just reading documentation of spark 1.5 (still in development)
> and I
> >> think there will be a new func *datediff* that will solve the issue. So
> >> please let me know if there is any work-around until spark 1.5 is out
> :).
> >>
> >> pyspark.sql.functions.datediff(end, start)[source]
> >>
> >> Returns the number of days from start to end.
> >>
> >> >>> df = sqlContext.createDataFrame([('2015-04-08','2015-05-10')],
> ['d1',
> >> >>> 'd2'])
> >> >>> df.select(datediff(df.d2, df.d1).alias('diff')).collect()
> >> [Row(diff=32)]
> >>
> >> New in version 1.5.
> >>
> >>
> >> On Thu, Aug 20, 2015 at 8:26 AM, Dhaval Patel 
> >> wrote:
> >>>
> >>> Apologies, sent too early accidentally. Actual message is below
> >>> 
> >>>
> >>> A dataframe has 2 datecolumns (datetime type) and I would like to add
> >>> another column that would have difference between these two dates.
> Dataframe
> >>> snippet is below.
> >>>
> >>> new_df.show(5)
> >>> +-----------+----------+--------------+
> >>> |      PATID|   SVCDATE|next_diag_date|
> >>> +-----------+----------+--------------+
> >>> |12345655545|2012-02-13|    2012-02-13|
> >>> |12345655545|2012-02-13|    2012-02-13|
> >>> |12345655545|2012-02-13|    2012-02-27|
> >>> +-----------+----------+--------------+
> >>>
> >>>
> >>>
> >>> Here is what I have tried so far:
> >>>
> >>> -> new_df.withColumn('SVCDATE2',
> >>> (new_df.next_diag_date-new_df.SVCDATE)).show()
> >>> Error: DateType does not support numeric operations
> >>>
> >>> -> new_df.withColumn('SVCDATE2',
> >>> (new_df.next_diag_date-new_df.SVCDATE).days).show()
> >>> Error: Can't extract value from (next_diag_date#927 - SVCDATE#377);
> >>>
> >>>
> >>> However this simple python code works fine with pySpark:
> >>>
> >>> from datetime import date
> >>> d0 = date(2008, 8, 18)
> >>> d1 = date(2008, 9, 26)
> >>> delta = d0 - d1
> >>> print (d0 - d1).days
> >>>
> >>> # -39
> >>>
> >>>
> >>> Any suggestions would be appreciated! Also is there a way to add a new
> >>> column in dataframe without using column expression (e.g. like in
> pandas or
> >>> R. df$new_col = 'new col value')?
> >>>
> >>>
> >>> Thanks,
> >>> Dhaval
> >>>
> >>>
> >>>
> >>> On Thu, Aug 20, 2015 at 8:18 AM, Dhaval Patel 
> >>> wrote:
> 
>  new_df.withColumn('SVCDATE2',
>  (new_df.next_diag_date-new_df.SVCDATE).days).show()
> 
>  +-----------+----------+--------------+
>  |      PATID|   SVCDATE|next_diag_date|
>  +-----------+----------+--------------+
>  |12345655545|2012-02-13|    2012-02-13|
>  |12345655545|2012-02-13|    2012-02-13|
>  |12345655545|2012-02-13|    2012-02-27|
>  +-----------+----------+--------------+
> >>>
> >>>
> >>
> >
>


Efficient sampling from a Hive table

2015-08-26 Thread Thomas Dudziak
I have a sizeable table (2.5T, 1b rows) that I want to get ~100m rows from
and I don't particularly care which rows. Doing a LIMIT unfortunately
results in two stages where the first stage reads the whole table, and the
second then performs the limit with a single worker, which is not very
efficient.
Is there a better way to sample a subset of rows in Spark, ideally in a
single stage and without reading all partitions?

cheers,
Tom


Re: spark streaming 1.3 kafka buffer size

2015-08-26 Thread Cody Koeninger
see http://kafka.apache.org/documentation.html#consumerconfigs

fetch.message.max.bytes

in the kafka params passed to the constructor
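
i.e. something along these lines (broker list, topic and the 8 MB value are
examples; `ssc` is assumed to be an existing StreamingContext):

  import kafka.serializer.StringDecoder
  import org.apache.spark.streaming.kafka.KafkaUtils

  // fetch.message.max.bytes caps the bytes fetched per partition in a single
  // request, which bounds how much is buffered at once.
  val kafkaParams = Map(
    "metadata.broker.list"    -> "broker1:9092,broker2:9092",
    "fetch.message.max.bytes" -> (8 * 1024 * 1024).toString)

  val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
    ssc, kafkaParams, Set("my-topic"))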


On Wed, Aug 26, 2015 at 10:39 AM, Shushant Arora 
wrote:

> whats the default buffer in spark streaming 1.3 for  kafka messages.
>
> Say In this run it has to fetch messages from offset 1 to 1. will it
> fetch all in one go or internally it fetches messages in  few messages
> batch.
>
> Is there any setting to configure this no of offsets fetched in one batch?
>


Re: Building spark-examples takes too much time using Maven

2015-08-26 Thread Ted Yu
Can you provide a bit more information ?

Are Spark artifacts packaged by you have the same names / paths (in maven
repo) as the ones published by Apache Spark ?

Is Zinc running on the machine where you performed the build ?

Cheers

On Wed, Aug 26, 2015 at 7:56 AM, Muhammad Haseeb Javed <
11besemja...@seecs.edu.pk> wrote:

> I checked out the master branch and started playing around with the
> examples. I want to build a jar  of the examples as I wish run them using
> the modified spark jar that I have. However, packaging spark-examples takes
> too much time as maven tries to download the jar dependencies rather than
> use the jar that are already present int the system as I extended and
> packaged spark itself locally?
>


Re: Efficient sampling from a Hive table

2015-08-26 Thread Thomas Dudziak
Sorry, I meant without reading from all splits. This is a single partition
in the table.

On Wed, Aug 26, 2015 at 8:53 AM, Thomas Dudziak  wrote:

> I have a sizeable table (2.5T, 1b rows) that I want to get ~100m rows from
> and I don't particularly care which rows. Doing a LIMIT unfortunately
> results in two stages where the first stage reads the whole table, and the
> second then performs the limit with a single worker, which is not very
> efficient.
> Is there a better way to sample a subset of rows in Spark without, ideally
> in a single stage without reading all partitions.
>
> cheers,
> Tom
>


Just Released V1.0.4 Low Level Receiver Based Kafka-Spark-Consumer in Spark Packages having built-in Back Pressure Controller

2015-08-26 Thread Dibyendu Bhattacharya
Dear All,

Just now released the 1.0.4 version of Low Level Receiver based
Kafka-Spark-Consumer in spark-packages.org .  You can find the latest
release here :
http://spark-packages.org/package/dibbhatt/kafka-spark-consumer

Here is github location : https://github.com/dibbhatt/kafka-spark-consumer

This consumer now has a built-in PID ( Proportional, Integral,
Derivative ) rate controller to control Spark back-pressure.

This consumer implements the rate limiting logic not by controlling the
number of messages per block ( as is done in Spark's out-of-the-box
consumers), but by the size of the blocks per batch, i.e. for any given batch
this consumer enforces the rate limit by controlling the size of the
blocks. As Spark memory is driven by block size rather than the number of
messages, I think rate limiting by block size is more appropriate. For example,
let's assume Kafka contains messages of very small sizes ( say a few hundred
bytes ) up to larger messages ( a few hundred KB ) for the same topic. Now if we
control the rate limit by number of messages, block sizes may vary
drastically based on what type of messages get pulled per block. Whereas,
if I control my rate limiting by block size, my block size remains
constant across batches (even though the number of messages differs across
blocks ) and it helps to tune my memory settings more correctly, as I know
exactly how much memory my block is going to consume.


This consumer has its own PID ( Proportional, Integral, Derivative )
controller built in and controls Spark back-pressure by
modifying the size of the blocks it consumes at run time. The PID controller
rate feedback mechanism is built using ZooKeeper. Again, the logic to
control back-pressure is not to control the number of messages ( as is
done in Spark 1.5, SPARK-7398) but to alter the size of the block consumed per
batch from Kafka. As the back-pressure handling is built into the consumer, this
consumer can be used with any version of Spark if anyone wants to have a
back-pressure controlling mechanism in their existing Spark / Kafka
environment.

Regards,
Dibyendu


Re: Efficient sampling from a Hive table

2015-08-26 Thread Jörn Franke
Have you tried tablesample? You can find the exact syntax in the documentation,
but it does exactly what you want.
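
Something along these lines, issued through a HiveContext (table name and
bucket count are examples; exact TABLESAMPLE support depends on the Hive/Spark
SQL version):

  // Reads roughly 1/25th of the table's data without a full scan followed by LIMIT.
  val sample = sqlContext.sql(
    "SELECT * FROM my_table TABLESAMPLE(BUCKET 1 OUT OF 25 ON rand()) s")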

On Wed, Aug 26, 2015 at 6:12 PM, Thomas Dudziak  wrote:

> Sorry, I meant without reading from all splits. This is a single partition
> in the table.
>
> On Wed, Aug 26, 2015 at 8:53 AM, Thomas Dudziak  wrote:
>
>> I have a sizeable table (2.5T, 1b rows) that I want to get ~100m rows
>> from and I don't particularly care which rows. Doing a LIMIT unfortunately
>> results in two stages where the first stage reads the whole table, and the
>> second then performs the limit with a single worker, which is not very
>> efficient.
>> Is there a better way to sample a subset of rows in Spark without,
>> ideally in a single stage without reading all partitions.
>>
>> cheers,
>> Tom
>>
>
>


Re: JDBC Streams

2015-08-26 Thread Jörn Franke
I would use Sqoop. It has been designed exactly for these types of
scenarios. Spark streaming does not make sense here

On Sun, Jul 5, 2015 at 1:59 AM, ayan guha  wrote:

> Hi All
>
> I have a requireent to connect to a DB every few minutes and bring data to
> HBase. Can anyone suggest if spark streaming would be appropriate for this
> senario or I shoud look into jobserver?
>
> Thanks in advance
>
>
> --
> Best Regards,
> Ayan Guha
>


Persisting sorted parquet tables for future sort merge joins

2015-08-26 Thread Jason
I want to persist a large _sorted_ table to Parquet on S3 and then read
this in and join it using the Sorted Merge Join strategy against another
large sorted table.

The problem is: even though I sort these tables on the join key beforehand,
once I persist them to Parquet, they lose the information about their
sortedness. Is there any way to hint to Spark that they do not need to be
re-sorted the next time I read them in?

I've been trying this on 1.5 and I keep getting plans looking like:
[== Physical Plan ==]
[TungstenProject [pos#28400,workf...#28399]]
[ SortMergeJoin [CHROM#28403,pos#28400], [CHROM#28399,pos#28332]]
[ TungstenSort [CHROM#28403 ASC,pos#28400 ASC], false, 0]
[ TungstenExchange hashpartitioning(CHROM#28403,pos#28400)]
[ ConvertToUnsafe]
[ Scan ParquetRelation[file:/sorted.parquet][pos#2848424]]
[ TungstenSort [CHROM#28399 ASC,pos#28332 ASC], false, 0]
[ TungstenExchange hashpartitioning(CHROM#28399,pos#28332)]
[ ConvertToUnsafe]
[ Scan ParquetRelation[file:exploded_sorted.parquet][pos#2.399]]

As you can see, this plan exchanges and sorts the data before performing
the SortMergeJoin even though these parquet tables are already sorted.
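
For reference, a minimal sketch of the pattern I mean (paths and the join
condition are simplified):

// Write both tables pre-sorted on the join key
df1.sort("CHROM", "pos").write.parquet("s3://bucket/sorted.parquet")
df2.sort("CHROM", "pos").write.parquet("s3://bucket/exploded_sorted.parquet")

// Read them back and join; the plan still re-partitions and re-sorts both sides
val left  = sqlContext.read.parquet("s3://bucket/sorted.parquet")
val right = sqlContext.read.parquet("s3://bucket/exploded_sorted.parquet")
val joined = left.join(right,
  left("CHROM") === right("CHROM") && left("pos") === right("pos"))
joined.explain()  // TungstenExchange + TungstenSort before SortMergeJoin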

Thanks,
Jason


Re: Spark 1.3.1 saveAsParquetFile hangs on app exit

2015-08-26 Thread cingram
spark-shell-hang-on-exit.tdump

  





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-3-1-saveAsParquetFile-hangs-on-app-exit-tp24460p24461.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Spark cluster multi tenancy

2015-08-26 Thread Sadhan Sood
Hi All,

We've set up our spark cluster on aws running on yarn (running on hadoop
2.3) with fair scheduling and preemption turned on. The cluster is shared
for prod and dev work where prod runs with a higher fair share and can
preempt dev jobs if there are not enough resources available for it.
It appears that dev jobs which get preempted often become unstable after
losing some executors: the whole job either gets stuck (without making any
progress) or ends up getting restarted (and hence loses all the work done).
Has anyone encountered this before? Is the solution just to set
spark.task.maxFailures to a really high value to recover from task failures
in such scenarios? Are there other approaches people have taken for Spark
multi-tenancy that work better in this scenario?
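
(For reference, the setting I mean would be bumped roughly like this; the
app name and value are just examples:)

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("dev-job")                 // illustrative name
  .set("spark.task.maxFailures", "32")   // example value; the default is 4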

Thanks,
Sadhan


Re: How to unit test HiveContext without OutOfMemoryError (using sbt)

2015-08-26 Thread Mike Trienis
Thanks for your response Yana,

I can increase the MaxPermSize parameter and it will allow me to run the
unit test a few more times before I run out of memory.

However, the primary issue is that running the same unit test multiple
times in the same JVM results in increased memory usage with each run, and
I believe it has something to do with HiveContext not reclaiming memory
after it is finished (or I'm not shutting it down properly).

It could very well be related to sbt, however, it's not clear to me.


On Tue, Aug 25, 2015 at 1:12 PM, Yana Kadiyska 
wrote:

> The PermGen space error is controlled with MaxPermSize parameter. I run
> with this in my pom, I think copied pretty literally from Spark's own
> tests... I don't know what the sbt equivalent is but you should be able to
> pass it...possibly via SBT_OPTS?
>
>
>  
>   org.scalatest
>   scalatest-maven-plugin
>   1.0
>   
>
> ${project.build.directory}/surefire-reports
>   false
>   .
>   SparkTestSuite.txt
>   -Xmx3g -XX:MaxPermSize=256m
> -XX:ReservedCodeCacheSize=512m
>   
>   
>   true
>   1
>   false
>
> true
>   
>   
>   
>   
>   test
>   
>   test
>   
>   
>   
>   
>   
>
>
> On Tue, Aug 25, 2015 at 2:10 PM, Mike Trienis 
> wrote:
>
>> Hello,
>>
>> I am using sbt and created a unit test where I create a `HiveContext` and
>> execute some query and then return. Each time I run the unit test the JVM
>> will increase it's memory usage until I get the error:
>>
>> Internal error when running tests: java.lang.OutOfMemoryError: PermGen
>> space
>> Exception in thread "Thread-2" java.io.EOFException
>>
>> As a work-around, I can fork a new JVM each time I run the unit test,
>> however, it seems like a bad solution as takes a while to run the unit
>> test.
>>
>> By the way, I tried to importing the TestHiveContext:
>>
>>- import org.apache.spark.sql.hive.test.TestHiveContext
>>
>> However, it suffers from the same memory issue. Has anyone else suffered
>> from the same problem? Note that I am running these unit tests on my mac.
>>
>> Cheers, Mike.
>>
>>
>


Re: Efficient sampling from a Hive table

2015-08-26 Thread Thomas Dudziak
Using TABLESAMPLE(0.1) is actually way worse. Spark first spends 12 minutes
to discover all split files on all hosts (for some reason) before it even
starts the job, and then it creates 3.5 million tasks (the partition has
~32k split files).
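
What I was hoping for is something closer to reading only a handful of the
partition's files, roughly like the sketch below (paths are hypothetical,
and this assumes the underlying files are directly readable, e.g. Parquet):

import org.apache.hadoop.fs.{FileSystem, Path}

// Illustrative only: list the partition directory and read just a few splits
val fs = FileSystem.get(sc.hadoopConfiguration)
val someFiles = fs.listStatus(new Path("/warehouse/db/table/part=2015-08-26"))
  .map(_.getPath.toString)
  .take(100)
val sampleDF = sqlContext.read.parquet(someFiles: _*)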

On Wed, Aug 26, 2015 at 9:36 AM, Jörn Franke  wrote:

>
> Have you tried tablesample? You find the exact syntax in the
> documentation, but it exlxactly does what you want
>
> Le mer. 26 août 2015 à 18:12, Thomas Dudziak  a écrit :
>
>> Sorry, I meant without reading from all splits. This is a single
>> partition in the table.
>>
>> On Wed, Aug 26, 2015 at 8:53 AM, Thomas Dudziak  wrote:
>>
>>> I have a sizeable table (2.5T, 1b rows) that I want to get ~100m rows
>>> from and I don't particularly care which rows. Doing a LIMIT unfortunately
>>> results in two stages where the first stage reads the whole table, and the
>>> second then performs the limit with a single worker, which is not very
>>> efficient.
>>> Is there a better way to sample a subset of rows in Spark, ideally in a
>>> single stage, without reading all partitions?
>>>
>>> cheers,
>>> Tom
>>>
>>
>>


Re: Spark Streaming Checkpointing Restarts with 0 Event Batches

2015-08-26 Thread Susan Zhang
Thanks for the suggestions! I tried the following:

I removed

createOnError = true

And reran the same process to reproduce. Double checked that checkpoint is
loading:

15/08/26 10:10:40 INFO
DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData: Restoring
KafkaRDD for time 1440608825000 ms [(install-json,5,825898270,825898528),
(install-json,4,825400856,825401058), (install-json,1,831453228,831453396),
(install-json,0,1295759888,1295760378),
(install-json,2,824443526,82409), (install-json,3,
811222580,811222874)]
15/08/26 10:10:40 INFO
DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData: Restoring
KafkaRDD for time 144060883 ms [(install-json,5,825898528,825898791),
(install-json,4,825401058,825401249), (install-json,1,831453396,831453603),
(install-json,0,1295760378,1295760809),
(install-json,2,82409,824445510), (install-json,3,
811222874,811223285)]
...

And the same issue is appearing as before (with 0 event batches getting
queued corresponding to dropped messages). Our kafka brokers are on version
0.8.2.0, if that makes a difference.

Also as a sanity check, I took out the ZK updates and reran (just in case
that was somehow causing problems), and that didn't change anything as
expected.

Furthermore, the 0 event batches seem to take longer to process than
batches with the regular load of events: processing time for 0 event
batches can be upwards of 1 - 2 minutes, whereas processing time for ~2000
event batches is consistently < 1s. Why would that happen?


As for the checkpoint call:

directKStream.checkpoint(checkpointDuration)

was an attempt to set the checkpointing interval (at some multiple of the
batch interval), whereas StreamingContext.checkpoint seems like it will
only set the checkpoint directory.
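
In other words, a simplified sketch of what I have (the directory path is
made up):

ssc.checkpoint("hdfs:///checkpoints/my-app")  // sets the checkpoint directory (metadata)
directKStream.checkpoint(checkpointDuration)  // sets the checkpoint interval for this DStream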



Thanks for all the help,

Susan


On Wed, Aug 26, 2015 at 7:12 AM, Cody Koeninger  wrote:

> The first thing that stands out to me is
> createOnError = true
>
> Are you sure the checkpoint is actually loading, as opposed to failing and
> starting the job anyway?  There should be info lines that look like
>
> INFO DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData:
> Restoring KafkaRDD for time 144059718 ms [(test,1,162,220)
>
>
> You should be able to tell from those whether the offset ranges being
> loaded from the checkpoint look reasonable.
>
> Also, is there a reason you're calling
>
> directKStream.checkpoint(checkpointDuration)
>
> Just calling checkpoint on the streaming context should be sufficient to
> save the metadata
>
>
>
> On Tue, Aug 25, 2015 at 2:36 PM, Susan Zhang  wrote:
>
>> Sure thing!
>>
>> The main looks like:
>>
>>
>> --
>>
>>
>> val kafkaBrokers = conf.getString(s"$varPrefix.metadata.broker.list")
>>
>> val kafkaConf = Map(
>>   "zookeeper.connect" -> zookeeper,
>>   "group.id" -> options.group,
>>   "zookeeper.connection.timeout.ms" -> "1",
>>   "auto.commit.interval.ms" -> "1000",
>>   "rebalance.max.retries" -> "25",
>>   "bootstrap.servers" -> kafkaBrokers
>> )
>>
>> val ssc = StreamingContext.getOrCreate(checkpointDirectory,
>>   () => {
>> createContext(kafkaConf, checkpointDirectory, topic, numThreads,
>> isProd)
>>   }, createOnError = true)
>>
>> ssc.start()
>> ssc.awaitTermination()
>>
>>
>>
>> --
>>
>>
>> And createContext is defined as:
>>
>>
>>
>> --
>>
>>
>> val batchDuration = Seconds(5)
>> val checkpointDuration = Seconds(20)
>>
>> private val AUTO_OFFSET_COMMIT = "auto.commit.enable"
>>
>> def createContext(kafkaConf: Map[String, String],
>> checkpointDirectory: String,
>> topic: String,
>> numThreads: Int,
>> isProd: Boolean)
>>   : StreamingContext = {
>>
>> val sparkConf = new SparkConf().setAppName("***")
>> val ssc = new StreamingContext(sparkConf, batchDuration)
>> ssc.checkpoint(checkpointDirectory)
>>
>> val topicSet = topic.split(",").toSet
>> val groupId = kafkaConf.getOrElse("group.id", "")
>>
>> val directKStream = KafkaUtils.createDirectStream[String, String,
>> StringDecoder, StringDecoder](ssc, kafkaConf, topicSet)
>> directKStream.checkpoint(checkpointDuration)
>>
>> val table = ***
>>
>> directKStream.foreachRDD { rdd =>
>>   val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>>   rdd.flatMap(rec => someFunc(rec))
>> .reduceByKey((i1: Long, i2: Long) => if (i1 > i2) i1 else i2)
>> .foreachPartition { partitionRec =>
>>   val dbWrite = DynamoDBWriter()
>>   partitionRec.foreach {
>> /* Update Dynamo Here */
>>   }
>> }
>>
>>   /** Set up ZK Connection **/
>>   v

Feedback: Feature request

2015-08-26 Thread Murphy, James
Hey all,

In working with the DecisionTree classifier, I found it difficult to extract 
rules that could easily facilitate visualization with libraries like D3.

So for example, using print(model.toDebugString()), I get the following
result:

   If (feature 0 <= -35.0)
  If (feature 24 <= 176.0)
Predict: 2.1
  If (feature 24 = 176.0)
Predict: 4.2
  Else (feature 24 > 176.0)
Predict: 6.3
Else (feature 0 > -35.0)
  If (feature 24 <= 11.0)
Predict: 4.5
  Else (feature 24 > 11.0)
Predict: 10.2

But ideally, I could see results in a more parseable format like JSON:


{
  "node": [
    {
      "name": "node1",
      "rule": "feature 0 <= -35.0",
      "children": [
        {
          "name": "node2",
          "rule": "feature 24 <= 176.0",
          "children": [
            { "name": "node4", "rule": "feature 20 < 116.0", "predict": 2.1 },
            { "name": "node5", "rule": "feature 20 = 116.0", "predict": 4.2 },
            { "name": "node5", "rule": "feature 20 > 116.0", "predict": 6.3 }
          ]
        },
        {
          "name": "node3",
          "rule": "feature 0 > -35.0",
          "children": [
            { "name": "node7", "rule": "feature 3 <= 11.0", "predict": 4.5 },
            { "name": "node8", "rule": "feature 3 > 11.0", "predict": 10.2 }
          ]
        }
      ]
    }
  ]
}
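
In the meantime, here is a rough Scala sketch of how one might walk the
model to build such a structure (this assumes the MLlib tree Node API;
categorical splits and JSON escaping are ignored, and it is untested):

import org.apache.spark.mllib.tree.model.{DecisionTreeModel, Node}

// Recursively render a node and its children as a JSON-like string.
def nodeToJson(node: Node): String = {
  if (node.isLeaf) {
    s"""{"name":"node${node.id}","predict":${node.predict.predict}}"""
  } else {
    // Only continuous splits handled here; categorical splits need more work
    val rule = node.split.map(s => s"feature ${s.feature} <= ${s.threshold}").getOrElse("")
    val children = Seq(node.leftNode, node.rightNode).flatten.map(nodeToJson).mkString(",")
    s"""{"name":"node${node.id}","rule":"$rule","children":[$children]}"""
  }
}

def modelToJson(model: DecisionTreeModel): String =
  s"""{"node":[${nodeToJson(model.topNode)}]}"""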

Food for thought!

Thanks,

Jim



query avro hive table in spark sql

2015-08-26 Thread gpatcham
Hi,

I'm trying to query a Hive table which is based on Avro in Spark SQL and
I'm seeing the errors below.

15/08/26 17:51:12 WARN avro.AvroSerdeUtils: Encountered AvroSerdeException
determining schema. Returning signal schema to indicate problem
org.apache.hadoop.hive.serde2.avro.AvroSerdeException: Neither
avro.schema.literal nor avro.schema.url specified, can't determine table
schema
at
org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils.determineSchemaOrThrowException(AvroSerdeUtils.java:68)
at
org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils.determineSchemaOrReturnErrorSchema(AvroSerdeUtils.java:93)
at
org.apache.hadoop.hive.serde2.avro.AvroSerDe.initialize(AvroSerDe.java:60)
at
org.apache.hadoop.hive.metastore.MetaStoreUtils.getDeserializer(MetaStoreUtils.java:375)
at
org.apache.hadoop.hive.ql.metadata.Partition.getDeserializer(Partition.java:249)


It's not able to determine the schema. The Hive table points to the Avro
schema using a URL. I'm stuck and couldn't find more info on this.

Any pointers ?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/query-avro-hive-table-in-spark-sql-tp24462.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark cluster multi tenancy

2015-08-26 Thread Sadhan Sood
Attaching log for when the dev job gets stuck (once all its executors are
lost due to preemption). This is a spark-shell job running in yarn-client
mode.

On Wed, Aug 26, 2015 at 10:45 AM, Sadhan Sood  wrote:

> Hi All,
>
> We've set up our spark cluster on aws running on yarn (running on hadoop
> 2.3) with fair scheduling and preemption turned on. The cluster is shared
> for prod and dev work where prod runs with a higher fair share and can
> preempt dev jobs if there are not enough resources available for it.
> It appears that dev jobs which get preempted often get unstable after
> losing some executors and the whole jobs gets stuck (without making any
> progress) or end up getting restarted (and hence losing all the work done).
> Has someone encountered this before ? Is the solution just to set 
> spark.task.maxFailures
> to a really high value to recover from task failures in such scenarios? Are
> there other approaches that people have taken for spark multi tenancy that
> works better in such scenario?
>
> Thanks,
> Sadhan
>


spark_job_stuck.log
Description: Binary data

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

Re: Spark Streaming Checkpointing Restarts with 0 Event Batches

2015-08-26 Thread Cody Koeninger
When the kafka rdd is actually being iterated on the worker, there should
be an info line of the form

log.info(s"Computing topic ${part.topic}, partition ${part.partition} "
+

  s"offsets ${part.fromOffset} -> ${part.untilOffset}")


You should be able to compare that to log of offsets during checkpoint
loading, to see if they line up.

Just out of curiosity, does removing the call to checkpoint on the stream
affect anything?



On Wed, Aug 26, 2015 at 1:04 PM, Susan Zhang  wrote:

> Thanks for the suggestions! I tried the following:
>
> I removed
>
> createOnError = true
>
> And reran the same process to reproduce. Double checked that checkpoint is
> loading:
>
> 15/08/26 10:10:40 INFO
> DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData: Restoring
> KafkaRDD for time 1440608825000 ms [(install-json,5,825898270,825898528),
> (install-json,4,825400856,825401058),
> (install-json,1,831453228,831453396),
> (install-json,0,1295759888,1295760378),
> (install-json,2,824443526,82409), (install-json,3,
> 811222580,811222874)]
> 15/08/26 10:10:40 INFO
> DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData: Restoring
> KafkaRDD for time 144060883 ms [(install-json,5,825898528,825898791),
> (install-json,4,825401058,825401249),
> (install-json,1,831453396,831453603),
> (install-json,0,1295760378,1295760809),
> (install-json,2,82409,824445510), (install-json,3,
> 811222874,811223285)]
> ...
>
> And the same issue is appearing as before (with 0 event batches getting
> queued corresponding to dropped messages). Our kafka brokers are on version
> 0.8.2.0, if that makes a difference.
>
> Also as a sanity check, I took out the ZK updates and reran (just in case
> that was somehow causing problems), and that didn't change anything as
> expected.
>
> Furthermore, the 0 event batches seem to take longer to process than
> batches with the regular load of events: processing time for 0 event
> batches can be upwards of 1 - 2 minutes, whereas processing time for ~2000
> event batches is consistently < 1s. Why would that happen?
>
>
> As for the checkpoint call:
>
> directKStream.checkpoint(checkpointDuration)
>
> was an attempt to set the checkpointing interval (at some multiple of the
> batch interval), whereas StreamingContext.checkpoint seems like it will
> only set the checkpoint directory.
>
>
>
> Thanks for all the help,
>
> Susan
>
>
> On Wed, Aug 26, 2015 at 7:12 AM, Cody Koeninger 
> wrote:
>
>> The first thing that stands out to me is
>> createOnError = true
>>
>> Are you sure the checkpoint is actually loading, as opposed to failing
>> and starting the job anyway?  There should be info lines that look like
>>
>> INFO DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData:
>> Restoring KafkaRDD for time 144059718 ms [(test,1,162,220)
>>
>>
>> You should be able to tell from those whether the offset ranges being
>> loaded from the checkpoint look reasonable.
>>
>> Also, is there a reason you're calling
>>
>> directKStream.checkpoint(checkpointDuration)
>>
>> Just calling checkpoint on the streaming context should be sufficient to
>> save the metadata
>>
>>
>>
>> On Tue, Aug 25, 2015 at 2:36 PM, Susan Zhang 
>> wrote:
>>
>>> Sure thing!
>>>
>>> The main looks like:
>>>
>>>
>>> --
>>>
>>>
>>> val kafkaBrokers = conf.getString(s"$varPrefix.metadata.broker.list")
>>>
>>> val kafkaConf = Map(
>>>   "zookeeper.connect" -> zookeeper,
>>>   "group.id" -> options.group,
>>>   "zookeeper.connection.timeout.ms" -> "1",
>>>   "auto.commit.interval.ms" -> "1000",
>>>   "rebalance.max.retries" -> "25",
>>>   "bootstrap.servers" -> kafkaBrokers
>>> )
>>>
>>> val ssc = StreamingContext.getOrCreate(checkpointDirectory,
>>>   () => {
>>> createContext(kafkaConf, checkpointDirectory, topic, numThreads,
>>> isProd)
>>>   }, createOnError = true)
>>>
>>> ssc.start()
>>> ssc.awaitTermination()
>>>
>>>
>>>
>>> --
>>>
>>>
>>> And createContext is defined as:
>>>
>>>
>>>
>>> --
>>>
>>>
>>> val batchDuration = Seconds(5)
>>> val checkpointDuration = Seconds(20)
>>>
>>> private val AUTO_OFFSET_COMMIT = "auto.commit.enable"
>>>
>>> def createContext(kafkaConf: Map[String, String],
>>> checkpointDirectory: String,
>>> topic: String,
>>> numThreads: Int,
>>> isProd: Boolean)
>>>   : StreamingContext = {
>>>
>>> val sparkConf = new SparkConf().setAppName("***")
>>> val ssc = new StreamingContext(sparkConf, batchDuration)
>>> ssc.checkpoint(checkpointDirectory)
>>>
>>> val topicSet = topic.split(",").toSet
>>> val groupId = kafkaConf.getOrElse("group.id", "")
>>>
>>> 

Re: Spark cluster multi tenancy

2015-08-26 Thread Sadhan Sood
Interestingly, if there is nothing running on the dev spark-shell, it
recovers successfully and regains the lost executors. Attaching the log for
that. Notice the "Registering block manager ..." statements at the very
end, after all executors were lost.

On Wed, Aug 26, 2015 at 11:27 AM, Sadhan Sood  wrote:

> Attaching log for when the dev job gets stuck (once all its executors are
> lost due to preemption). This is a spark-shell job running in yarn-client
> mode.
>
> On Wed, Aug 26, 2015 at 10:45 AM, Sadhan Sood 
> wrote:
>
>> Hi All,
>>
>> We've set up our spark cluster on aws running on yarn (running on hadoop
>> 2.3) with fair scheduling and preemption turned on. The cluster is shared
>> for prod and dev work where prod runs with a higher fair share and can
>> preempt dev jobs if there are not enough resources available for it.
>> It appears that dev jobs which get preempted often get unstable after
>> losing some executors and the whole jobs gets stuck (without making any
>> progress) or end up getting restarted (and hence losing all the work done).
>> Has someone encountered this before ? Is the solution just to set 
>> spark.task.maxFailures
>> to a really high value to recover from task failures in such scenarios? Are
>> there other approaches that people have taken for spark multi tenancy that
>> works better in such scenario?
>>
>> Thanks,
>> Sadhan
>>
>
>


spark_job_recovers.log
Description: Binary data

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

Re: Question on take function - Spark Java API

2015-08-26 Thread Pankaj Wahane
Thanks Sonal.. I shall try doing that..
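
For the archives, a rough sketch of how I read the suggestion (Scala, with
simplified parsing; the path and field handling are illustrative):

// Each element is (fileName, fullContent), so the per-file asset name can
// be kept with every record.
val perRecord = sc.wholeTextFiles("/path/to/folder").flatMap {
  case (file, content) =>
    val lines  = content.split("\n")
    val asset  = lines.head.trim              // line 1: asset name
    val header = lines(1).split(",")          // line 2: csv header
    lines.drop(2).filter(_.nonEmpty).map(line => (asset, header, line.split(",")))
}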

> On 26-Aug-2015, at 1:05 pm, Sonal Goyal  wrote:
> 
> You can try using wholeTextFiles, which will give you a pair RDD of (fileName, 
> content). flatMap through this and manipulate the content. 
> 
> Best Regards,
> Sonal
> Founder, Nube Technologies  
> Check out Reifier at Spark Summit 2015 
> 
> 
>  
> 
> 
> 
> On Wed, Aug 26, 2015 at 8:25 AM, Pankaj Wahane  > wrote:
> Hi community members,
> 
> 
>> Apache Spark is Fantastic and very easy to learn.. Awesome work!!!
>> 
>> Question:
>> 
>> I have multiple files in a folder and and the first line in each file is 
>> name of the asset that the file belongs to. Second line is csv header row 
>> and data starts from third row..
>> 
>> Ex: File 1
>> 
>> TestAsset01
>> Time,dp_1,dp_2,dp_3
>> 11-01-2015 15:00:00,123,456,789
>> 11-01-2015 15:00:01,123,456,789
>> . . .
>> 
>> Ex: File 2
>> 
>> TestAsset02
>> Time,dp_1,dp_2,dp_3
>> 11-01-2015 15:00:00,1230,4560,7890
>> 11-01-2015 15:00:01,1230,4560,7890
>> . . .
>> 
>> I have got nearly 1000 files in each folder sizing ~10G
>> 
>> I am using apache spark Java api to read all this files.
>> 
>> Following is code extract that I am using:
>> 
>> try (JavaSparkContext sc = new JavaSparkContext(conf)) {
>> Map readingTypeMap = getReadingTypesMap(sc);
>> //Read File
>> JavaRDD data = 
>> sc.textFile(resourceBundle.getString(FOLDER_NAME));
>> //Get Asset
>> String asset = data.take(1).get(0);
>> //Extract Time Series Data
>> JavaRDD actualData = data.filter(line -> 
>> line.contains(DELIMERTER));
>> //Strip header
>> String header = actualData.take(1).get(0);
>> String[] headers = header.split(DELIMERTER);
>> //Extract actual data
>> JavaRDD timeSeriesLines = actualData.filter(line -> 
>> !line.equals(header));
>> //Extract valid records
>> JavaRDD validated = timeSeriesLines.filter(line -> 
>> validate(line));
>> //Find Granularity
>> Integer granularity = 
>> toInt(resourceBundle.getString(GRANULARITY));
>> //Transform to TSD objects
>> JavaRDD tsdFlatMap = 
>> transformTotimeSeries(validated, asset, readingTypeMap, headers, 
>> granularity);
>> 
>> //Save to Cassandra
>> 
>> javaFunctions(tsdFlatMap).writerBuilder(resourceBundle.getString("cassandra.tsd.keyspace"),
>> "time_series_data", 
>> mapToRow(TimeSeriesData.class)).saveToCassandra();
>> 
>> System.out.println("Total Records: " + timeSeriesLines.count());
>> System.out.println("Valid Records: " + validated.count());
>> }
>> Within TimeSeriesData Object I need to set the asset name for the reading, 
>> so I need output of data.take(1) to be different for different files.
>> 
>> 
>> Thank You.
>> 
>> Best Regards,
>> Pankaj
>> 
>> 
> 
> 
> QIO Technologies Limited is a limited company registered in England & Wales 
> at 1 Curzon Street, London, England, W1J 5HD, with registered number 09368431 
> 
> This message and the information contained within it is intended solely for 
> the addressee and may contain confidential or privileged information. If you 
> have received this message in error please notify QIO Technologies Limited 
> immediately and then permanently delete this message. If you are not the 
> intended addressee then you must not copy, transmit, disclose or rely on the 
> information contained in this message or in any attachment to it, all such 
> use is prohibited to maximum extent possible by law.
> 
> 




Can RDD be shared accross the cluster by other drivers?

2015-08-26 Thread Tao Lu
Hi, Guys,

Is it possible for an RDD created by driver A to be used by driver B?

Thanks!


Re: Spark Streaming Checkpointing Restarts with 0 Event Batches

2015-08-26 Thread Susan Zhang
Compared offsets, and it continues from checkpoint loading:

15/08/26 11:24:54 INFO
DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData: Restoring
KafkaRDD for time 1440612035000 ms [(install-json,5,826112083,826112446),
(install-json,4,825772921,825773536), (install-json,1,831654775,831655076),
(install-json,0,1296018451,1296018810),
(install-json,2,824785282,824785696), (install-json,3,
811428882,811429181)]

15/08/26 11:25:19 INFO kafka.KafkaRDD: Computing topic install-json,
partition 0 offsets 1296018451 -> 1296018810
15/08/26 11:25:28 INFO kafka.KafkaRDD: Computing topic install-json,
partition 4 offsets 825773536 -> 825907428
15/08/26 11:25:28 INFO kafka.KafkaRDD: Computing topic install-json,
partition 2 offsets 824785696 -> 824889957
15/08/26 11:25:28 INFO kafka.KafkaRDD: Computing topic install-json,
partition 3 offsets 811429181 -> 811529084
15/08/26 11:25:28 INFO kafka.KafkaRDD: Computing topic install-json,
partition 1 offsets 831655076 -> 831729964
...

But for some reason the streaming UI shows it as computing 0 events.

Removing the call to checkpoint does remove the queueing of 0 event
batches, since offsets just skip to the latest (checked that the first
part.fromOffset in the restarted job is larger than the last
part.untilOffset before restart).




On Wed, Aug 26, 2015 at 11:19 AM, Cody Koeninger  wrote:

> When the kafka rdd is actually being iterated on the worker, there should
> be an info line of the form
>
> log.info(s"Computing topic ${part.topic}, partition ${part.partition}
> " +
>
>   s"offsets ${part.fromOffset} -> ${part.untilOffset}")
>
>
> You should be able to compare that to log of offsets during checkpoint
> loading, to see if they line up.
>
> Just out of curiosity, does removing the call to checkpoint on the stream
> affect anything?
>
>
>
> On Wed, Aug 26, 2015 at 1:04 PM, Susan Zhang  wrote:
>
>> Thanks for the suggestions! I tried the following:
>>
>> I removed
>>
>> createOnError = true
>>
>> And reran the same process to reproduce. Double checked that checkpoint
>> is loading:
>>
>> 15/08/26 10:10:40 INFO
>> DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData: Restoring
>> KafkaRDD for time 1440608825000 ms [(install-json,5,825898270,825898528),
>> (install-json,4,825400856,825401058),
>> (install-json,1,831453228,831453396),
>> (install-json,0,1295759888,1295760378),
>> (install-json,2,824443526,82409), (install-json,3,
>> 811222580,811222874)]
>> 15/08/26 10:10:40 INFO
>> DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData: Restoring
>> KafkaRDD for time 144060883 ms [(install-json,5,825898528,825898791),
>> (install-json,4,825401058,825401249),
>> (install-json,1,831453396,831453603),
>> (install-json,0,1295760378,1295760809),
>> (install-json,2,82409,824445510), (install-json,3,
>> 811222874,811223285)]
>> ...
>>
>> And the same issue is appearing as before (with 0 event batches getting
>> queued corresponding to dropped messages). Our kafka brokers are on version
>> 0.8.2.0, if that makes a difference.
>>
>> Also as a sanity check, I took out the ZK updates and reran (just in case
>> that was somehow causing problems), and that didn't change anything as
>> expected.
>>
>> Furthermore, the 0 event batches seem to take longer to process than
>> batches with the regular load of events: processing time for 0 event
>> batches can be upwards of 1 - 2 minutes, whereas processing time for ~2000
>> event batches is consistently < 1s. Why would that happen?
>>
>>
>> As for the checkpoint call:
>>
>> directKStream.checkpoint(checkpointDuration)
>>
>> was an attempt to set the checkpointing interval (at some multiple of the
>> batch interval), whereas StreamingContext.checkpoint seems like it will
>> only set the checkpoint directory.
>>
>>
>>
>> Thanks for all the help,
>>
>> Susan
>>
>>
>> On Wed, Aug 26, 2015 at 7:12 AM, Cody Koeninger 
>> wrote:
>>
>>> The first thing that stands out to me is
>>> createOnError = true
>>>
>>> Are you sure the checkpoint is actually loading, as opposed to failing
>>> and starting the job anyway?  There should be info lines that look like
>>>
>>> INFO DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData:
>>> Restoring KafkaRDD for time 144059718 ms [(test,1,162,220)
>>>
>>>
>>> You should be able to tell from those whether the offset ranges being
>>> loaded from the checkpoint look reasonable.
>>>
>>> Also, is there a reason you're calling
>>>
>>> directKStream.checkpoint(checkpointDuration)
>>>
>>> Just calling checkpoint on the streaming context should be sufficient to
>>> save the metadata
>>>
>>>
>>>
>>> On Tue, Aug 25, 2015 at 2:36 PM, Susan Zhang 
>>> wrote:
>>>
 Sure thing!

 The main looks like:


 --


 val kafkaBrokers = conf.getString(s"$varPrefix.metadata.broker.list")

 val kafkaConf = Map(
   "zookeeper.connect" ->

Re: Spark Streaming Checkpointing Restarts with 0 Event Batches

2015-08-26 Thread Cody Koeninger
I'd be less concerned about what the streaming ui shows than what's
actually going on with the job.  When you say you were losing messages, how
were you observing that?  The UI, or actual job output?

The log lines you posted indicate that the checkpoint was restored and
those offsets were processed; what are the log lines for the following
KafkaRDD ?


On Wed, Aug 26, 2015 at 2:04 PM, Susan Zhang  wrote:

> Compared offsets, and it continues from checkpoint loading:
>
> 15/08/26 11:24:54 INFO
> DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData: Restoring
> KafkaRDD for time 1440612035000 ms [(install-json,5,826112083,826112446),
> (install-json,4,825772921,825773536),
> (install-json,1,831654775,831655076),
> (install-json,0,1296018451,1296018810),
> (install-json,2,824785282,824785696), (install-json,3,
> 811428882,811429181)]
>
> 15/08/26 11:25:19 INFO kafka.KafkaRDD: Computing topic install-json,
> partition 0 offsets 1296018451 -> 1296018810
> 15/08/26 11:25:28 INFO kafka.KafkaRDD: Computing topic install-json,
> partition 4 offsets 825773536 -> 825907428
> 15/08/26 11:25:28 INFO kafka.KafkaRDD: Computing topic install-json,
> partition 2 offsets 824785696 -> 824889957
> 15/08/26 11:25:28 INFO kafka.KafkaRDD: Computing topic install-json,
> partition 3 offsets 811429181 -> 811529084
> 15/08/26 11:25:28 INFO kafka.KafkaRDD: Computing topic install-json,
> partition 1 offsets 831655076 -> 831729964
> ...
>
> But for some reason the streaming UI shows it as computing 0 events.
>
> Removing the call to checkpoint does remove the queueing of 0 event
> batches, since offsets just skip to the latest (checked that the first
> part.fromOffset in the restarted job is larger than the last
> part.untilOffset before restart).
>
>
>
>
> On Wed, Aug 26, 2015 at 11:19 AM, Cody Koeninger 
> wrote:
>
>> When the kafka rdd is actually being iterated on the worker, there should
>> be an info line of the form
>>
>> log.info(s"Computing topic ${part.topic}, partition
>> ${part.partition} " +
>>
>>   s"offsets ${part.fromOffset} -> ${part.untilOffset}")
>>
>>
>> You should be able to compare that to log of offsets during checkpoint
>> loading, to see if they line up.
>>
>> Just out of curiosity, does removing the call to checkpoint on the stream
>> affect anything?
>>
>>
>>
>> On Wed, Aug 26, 2015 at 1:04 PM, Susan Zhang 
>> wrote:
>>
>>> Thanks for the suggestions! I tried the following:
>>>
>>> I removed
>>>
>>> createOnError = true
>>>
>>> And reran the same process to reproduce. Double checked that checkpoint
>>> is loading:
>>>
>>> 15/08/26 10:10:40 INFO
>>> DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData: Restoring
>>> KafkaRDD for time 1440608825000 ms [(install-json,5,825898270,825898528),
>>> (install-json,4,825400856,825401058),
>>> (install-json,1,831453228,831453396),
>>> (install-json,0,1295759888,1295760378),
>>> (install-json,2,824443526,82409), (install-json,3,
>>> 811222580,811222874)]
>>> 15/08/26 10:10:40 INFO
>>> DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData: Restoring
>>> KafkaRDD for time 144060883 ms [(install-json,5,825898528,825898791),
>>> (install-json,4,825401058,825401249),
>>> (install-json,1,831453396,831453603),
>>> (install-json,0,1295760378,1295760809),
>>> (install-json,2,82409,824445510), (install-json,3,
>>> 811222874,811223285)]
>>> ...
>>>
>>> And the same issue is appearing as before (with 0 event batches getting
>>> queued corresponding to dropped messages). Our kafka brokers are on version
>>> 0.8.2.0, if that makes a difference.
>>>
>>> Also as a sanity check, I took out the ZK updates and reran (just in
>>> case that was somehow causing problems), and that didn't change anything as
>>> expected.
>>>
>>> Furthermore, the 0 event batches seem to take longer to process than
>>> batches with the regular load of events: processing time for 0 event
>>> batches can be upwards of 1 - 2 minutes, whereas processing time for ~2000
>>> event batches is consistently < 1s. Why would that happen?
>>>
>>>
>>> As for the checkpoint call:
>>>
>>> directKStream.checkpoint(checkpointDuration)
>>>
>>> was an attempt to set the checkpointing interval (at some multiple of
>>> the batch interval), whereas StreamingContext.checkpoint seems like it will
>>> only set the checkpoint directory.
>>>
>>>
>>>
>>> Thanks for all the help,
>>>
>>> Susan
>>>
>>>
>>> On Wed, Aug 26, 2015 at 7:12 AM, Cody Koeninger 
>>> wrote:
>>>
 The first thing that stands out to me is
 createOnError = true

 Are you sure the checkpoint is actually loading, as opposed to failing
 and starting the job anyway?  There should be info lines that look like

 INFO DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData:
 Restoring KafkaRDD for time 144059718 ms [(test,1,162,220)


 You should be able to tell from those whether the offset ranges being
 loaded from the checkpoint look reasonable.

 Also, is there 

Re: Can RDD be shared accross the cluster by other drivers?

2015-08-26 Thread Ted Yu
Can you outline your use case ?

This seems to be related:
SPARK-2389 globally shared SparkContext / shared Spark "application"

FYI

On Wed, Aug 26, 2015 at 10:47 AM, Tao Lu  wrote:

> Hi, Guys,
>
> Is it possible for an RDD created by driver A to be used by driver B?
>
> Thanks!
>


Dataframe collect() work but count() fails

2015-08-26 Thread Srikanth
Hello,

I'm seeing strange behavior where count() on a DataFrame errors out as
shown below but collect() works fine.
This is what I tried from spark-shell. solrRDD.queryShards() returns a
JavaRDD.

val rdd = solrRDD.queryShards(sc, query, "_version_", 2).rdd
> rdd: org.apache.spark.rdd.RDD[org.apache.solr.common.SolrDocument] =
> MapPartitionsRDD[3] at flatMap at SolrRDD.java:335
>


> scala> val schema = solrRDD.getQuerySchema(query)
> schema: org.apache.spark.sql.types.StructType =
> StructType(StructField(ApplicationType,StringType,true),
> StructField(Language,StringType,true),
> StructField(MfgCode,StringType,true),
> StructField(OpSystemCode,StringType,true),
> StructField(ProductCode,StringType,true),
> StructField(ProductName,StringType,true),
> StructField(ProductVersion,StringType,true),
> StructField(_version_,LongType,true), StructField(id,StringType,true))



> scala> val rows = rdd.map(doc => RowFactory.create(schema.fieldNames.map(f
> => doc.getFirstValue(f))) ) //Convert RDD[SolrDocument] to RDD[Row]
> scala> val df = sqlContext.createDataFrame(rows, schema)


scala> val data = df.collect
> data: Array[org.apache.spark.sql.Row] = Array([[Ljava.lang.Object;@2135773a],
> [[Ljava.lang.Object;@3d2691de], [[Ljava.lang.Object;@2f32a52f],
> [[Ljava.lang.Object;@25fac8de]
>


> scala> df.count
> 15/08/26 14:53:28 WARN TaskSetManager: Lost task 1.3 in stage 6.0 (TID 42,
> 172.19.110.1): java.lang.AssertionError: assertion failed: Row column
> number mismatch, expected 9 columns, but got 1.
> Row content: [[Ljava.lang.Object;@1d962eb2]
> at scala.Predef$.assert(Predef.scala:179)
> at
> org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1.next(InMemoryColumnarTableScan.scala:140)
> at
> org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1.next(InMemoryColumnarTableScan.scala:124)
> at
> org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:277)


Any idea what is wrong here?
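
One thing I am wondering about, in case it matters: RowFactory.create is
varargs, so passing the whole array as a single argument may be producing a
one-column Row. A sketch of what I suspect was intended (untested):

import org.apache.spark.sql.Row

val rows = rdd.map(doc => Row.fromSeq(schema.fieldNames.map(f => doc.getFirstValue(f))))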

Srikanth


Re: Spark Streaming Checkpointing Restarts with 0 Event Batches

2015-08-26 Thread Susan Zhang
Ah, I was using the UI coupled with the job logs, which indicated that
offsets were being "processed" even though they corresponded to 0 events.
Looks like I wasn't matching up timestamps correctly: the 0-event batches
were queued/processed while offsets were getting skipped:

15/08/26 11:26:05 INFO storage.BlockManager: Removing RDD 0
15/08/26 11:26:05 INFO kafka.KafkaRDD: Beginning offset 831729964 is the
same as ending offset skipping install-json 1
15/08/26 11:26:05 INFO storage.ShuffleBlockFetcherIterator: Getting 0
non-empty blocks out of 6 blocks
15/08/26 11:26:08 INFO storage.BlockManager: Removing RDD 1

But eventually processing of offset 831729964 would resume:

15/08/26 11:27:18 INFO kafka.KafkaRDD: Computing topic install-json,
partition 1 offsets 831729964 -> 831729976

Lesson learned: will be more focused on reading the job logs properly in
the future.


Thanks for all the help on this!


On Wed, Aug 26, 2015 at 12:16 PM, Cody Koeninger  wrote:

> I'd be less concerned about what the streaming ui shows than what's
> actually going on with the job.  When you say you were losing messages, how
> were you observing that?  The UI, or actual job output?
>
> The log lines you posted indicate that the checkpoint was restored and
> those offsets were processed; what are the log lines for the following
> KafkaRDD ?
>
>
> On Wed, Aug 26, 2015 at 2:04 PM, Susan Zhang  wrote:
>
>> Compared offsets, and it continues from checkpoint loading:
>>
>> 15/08/26 11:24:54 INFO
>> DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData: Restoring
>> KafkaRDD for time 1440612035000 ms [(install-json,5,826112083,826112446),
>> (install-json,4,825772921,825773536),
>> (install-json,1,831654775,831655076),
>> (install-json,0,1296018451,1296018810),
>> (install-json,2,824785282,824785696), (install-json,3,
>> 811428882,811429181)]
>>
>> 15/08/26 11:25:19 INFO kafka.KafkaRDD: Computing topic install-json,
>> partition 0 offsets 1296018451 -> 1296018810
>> 15/08/26 11:25:28 INFO kafka.KafkaRDD: Computing topic install-json,
>> partition 4 offsets 825773536 -> 825907428
>> 15/08/26 11:25:28 INFO kafka.KafkaRDD: Computing topic install-json,
>> partition 2 offsets 824785696 -> 824889957
>> 15/08/26 11:25:28 INFO kafka.KafkaRDD: Computing topic install-json,
>> partition 3 offsets 811429181 -> 811529084
>> 15/08/26 11:25:28 INFO kafka.KafkaRDD: Computing topic install-json,
>> partition 1 offsets 831655076 -> 831729964
>> ...
>>
>> But for some reason the streaming UI shows it as computing 0 events.
>>
>> Removing the call to checkpoint does remove the queueing of 0 event
>> batches, since offsets just skip to the latest (checked that the first
>> part.fromOffset in the restarted job is larger than the last
>> part.untilOffset before restart).
>>
>>
>>
>>
>> On Wed, Aug 26, 2015 at 11:19 AM, Cody Koeninger 
>> wrote:
>>
>>> When the kafka rdd is actually being iterated on the worker, there
>>> should be an info line of the form
>>>
>>> log.info(s"Computing topic ${part.topic}, partition
>>> ${part.partition} " +
>>>
>>>   s"offsets ${part.fromOffset} -> ${part.untilOffset}")
>>>
>>>
>>> You should be able to compare that to log of offsets during checkpoint
>>> loading, to see if they line up.
>>>
>>> Just out of curiosity, does removing the call to checkpoint on the
>>> stream affect anything?
>>>
>>>
>>>
>>> On Wed, Aug 26, 2015 at 1:04 PM, Susan Zhang 
>>> wrote:
>>>
 Thanks for the suggestions! I tried the following:

 I removed

 createOnError = true

 And reran the same process to reproduce. Double checked that checkpoint
 is loading:

 15/08/26 10:10:40 INFO
 DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData: Restoring
 KafkaRDD for time 1440608825000 ms [(install-json,5,825898270,825898528),
 (install-json,4,825400856,825401058),
 (install-json,1,831453228,831453396),
 (install-json,0,1295759888,1295760378),
 (install-json,2,824443526,82409), (install-json,3,
 811222580,811222874)]
 15/08/26 10:10:40 INFO
 DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData: Restoring
 KafkaRDD for time 144060883 ms [(install-json,5,825898528,825898791),
 (install-json,4,825401058,825401249),
 (install-json,1,831453396,831453603),
 (install-json,0,1295760378,1295760809),
 (install-json,2,82409,824445510), (install-json,3,
 811222874,811223285)]
 ...

 And the same issue is appearing as before (with 0 event batches getting
 queued corresponding to dropped messages). Our kafka brokers are on version
 0.8.2.0, if that makes a difference.

 Also as a sanity check, I took out the ZK updates and reran (just in
 case that was somehow causing problems), and that didn't change anything as
 expected.

 Furthermore, the 0 event batches seem to take longer to process than
 batches with the regular load of events: processing time for 0 event
 batches can be u

suggest configuration for debugging spark streaming, kafka

2015-08-26 Thread Joanne Contact
Hi, I have an Ubuntu box with 4GB of memory and dual cores. Do you think it
won't be enough to run Spark Streaming and Kafka? I'm trying to install
standalone-mode Spark and Kafka so I can debug them in an IDE. Do I need to
install Hadoop?

Thanks!

J

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Does the driver program always run local to where you submit the job from?

2015-08-26 Thread Jerry
Assuming you're submitting the job from a terminal: when main() is called,
if I try to open a file locally, can I assume the machine is always the one
I submitted the job from? Currently I'm working off of a single machine, but
I'm wondering if I'll run into issues when I move over to a cluster. The
file I'm opening is purely for the driver program and not something the
worker nodes are going to read from.

Thanks,
 Jerry


Re: Does the driver program always run local to where you submit the job from?

2015-08-26 Thread Marcelo Vanzin
On Wed, Aug 26, 2015 at 2:03 PM, Jerry  wrote:
> Assuming your submitting the job from terminal; when main() is called, if I
> try to open a file locally, can I assume the machine is always the one I
> submitted the job from?

See the "--deploy-mode" option. "client" works as you describe;
"cluster" does not.

-- 
Marcelo

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Does the driver program always run local to where you submit the job from?

2015-08-26 Thread Jerry
Thanks!

On Wed, Aug 26, 2015 at 2:06 PM, Marcelo Vanzin  wrote:

> On Wed, Aug 26, 2015 at 2:03 PM, Jerry  wrote:
> > Assuming your submitting the job from terminal; when main() is called,
> if I
> > try to open a file locally, can I assume the machine is always the one I
> > submitted the job from?
>
> See the "--deploy-mode" option. "client" works as you describe;
> "cluster" does not.
>
> --
> Marcelo
>


Help! Stuck using withColumn

2015-08-26 Thread Saif.A.Ellafi
This simple command call:

val final_df = data.select("month_balance").withColumn("month_date", 
data.col("month_date_curr"))

Is throwing:

org.apache.spark.sql.AnalysisException: resolved attribute(s) 
month_date_curr#324 missing from month_balance#234 in operator !Project 
[month_balance#234, month_date_curr#324 AS month_date_curr#408];
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:38)
at 
org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:42)
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:121)
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:50)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:98)
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:50)
at 
org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:42)
at 
org.apache.spark.sql.SQLContext$QueryExecution.assertAnalyzed(SQLContext.scala:931)
at org.apache.spark.sql.DataFrame.(DataFrame.scala:131)
at 
org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$logicalPlanToDataFrame(DataFrame.scala:154)
at org.apache.spark.sql.DataFrame.select(DataFrame.scala:595)
at org.apache.spark.sql.DataFrame.withColumn(DataFrame.scala:1039)





RE: Help! Stuck using withColumn

2015-08-26 Thread Saif.A.Ellafi
I can reproduce this even simpler with the following:

val gf = sc.parallelize(Array(3,6,4,7,3,4,5,5,31,4,5,2)).toDF("ASD")
val ff = sc.parallelize(Array(4,6,2,3,5,1,4,6,23,6,4,7)).toDF("GFD")

gf.withColumn("DSA", ff.col("GFD"))

org.apache.spark.sql.AnalysisException: resolved attribute(s) GFD#421 missing 
from ASD#419 in operator !Project [ASD#419,GFD#421 AS DSA#422];
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:38)
at 
org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:42)
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:121)
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:50)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:98)
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:50)
at 
org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:42)
at 
org.apache.spark.sql.SQLContext$QueryExecution.assertAnalyzed(SQLContext.scala:931)
at org.apache.spark.sql.DataFrame.(DataFrame.scala:131)
at 
org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$logicalPlanToDataFrame(DataFrame.scala:154)
at org.apache.spark.sql.DataFrame.select(DataFrame.scala:595)
at org.apache.spark.sql.DataFrame.withColumn(DataFrame.scala:1039)


From: saif.a.ell...@wellsfargo.com [mailto:saif.a.ell...@wellsfargo.com]
Sent: Wednesday, August 26, 2015 6:47 PM
To: user@spark.apache.org
Subject: Help! Stuck using withColumn

This simple comand call:

val final_df = data.select("month_balance").withColumn("month_date", 
data.col("month_date_curr"))

Is throwing:

org.apache.spark.sql.AnalysisException: resolved attribute(s) 
month_date_curr#324 missing from month_balance#234 in operator !Project 
[month_balance#234, month_date_curr#324 AS month_date_curr#408];
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:38)
at 
org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:42)
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:121)
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:50)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:98)
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:50)
at 
org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:42)
at 
org.apache.spark.sql.SQLContext$QueryExecution.assertAnalyzed(SQLContext.scala:931)
at org.apache.spark.sql.DataFrame.(DataFrame.scala:131)
at 
org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$logicalPlanToDataFrame(DataFrame.scala:154)
at org.apache.spark.sql.DataFrame.select(DataFrame.scala:595)
at org.apache.spark.sql.DataFrame.withColumn(DataFrame.scala:1039)





Spark.ml vs Spark.mllib

2015-08-26 Thread njoshi
Hi,

We are in the process of developing a new product/Spark application. While
the official Spark 1.4.1 page invites users and developers to use
*Spark.mllib* and optionally contribute to *Spark.ml*, this StackOverflow
post refers to the /design doc/, saying that Spark.mllib will eventually be
deprecated.

Could you please confirm which of these is true, and whether we need to
worry if we are planning to develop the app using Spark.mllib? What would
be the timeline for this migration?

Thanks in advance,
Nikhil



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-ml-vs-Spark-mllib-tp24465.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Help! Stuck using withColumn

2015-08-26 Thread Silvio Fiorito
Hi Saif,

In both cases you’re referencing columns that don’t exist in the current 
DataFrame.

In the first email you did a select and then a withColumn for
'month_date_curr' on the resulting DF, but that column does not exist
because you selected only 'month_balance'.

In the second email you're using two different DFs and trying to select a
column from one in a withColumn on the other; that just won't work. Also,
there are no explicit column names given to either DF, so that column
doesn't exist.

Did you intend to do a join instead?
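
For example, something like this for the first case (a sketch; adjust the
column names as needed):

// Keep both columns in the select, then rename the second one
val final_df = data
  .select("month_balance", "month_date_curr")
  .withColumnRenamed("month_date_curr", "month_date")

For the two separate DataFrames in the second example, you would need an
actual join key, since withColumn can only reference columns of the same
DataFrame.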

Thanks,
Silvio

From: "saif.a.ell...@wellsfargo.com"
Date: Wednesday, August 26, 2015 at 6:06 PM
To: "saif.a.ell...@wellsfargo.com", 
"user@spark.apache.org"
Subject: RE: Help! Stuck using withColumn

I can reproduce this even simpler with the following:

val gf = sc.parallelize(Array(3,6,4,7,3,4,5,5,31,4,5,2)).toDF("ASD")
val ff = sc.parallelize(Array(4,6,2,3,5,1,4,6,23,6,4,7)).toDF("GFD")

gf.withColumn("DSA", ff.col("GFD"))

org.apache.spark.sql.AnalysisException: resolved attribute(s) GFD#421 missing 
from ASD#419 in operator !Project [ASD#419,GFD#421 AS DSA#422];
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:38)
at 
org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:42)
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:121)
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:50)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:98)
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:50)
at 
org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:42)
at 
org.apache.spark.sql.SQLContext$QueryExecution.assertAnalyzed(SQLContext.scala:931)
at org.apache.spark.sql.DataFrame.(DataFrame.scala:131)
at 
org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$logicalPlanToDataFrame(DataFrame.scala:154)
at org.apache.spark.sql.DataFrame.select(DataFrame.scala:595)
at org.apache.spark.sql.DataFrame.withColumn(DataFrame.scala:1039)


From: saif.a.ell...@wellsfargo.com 
[mailto:saif.a.ell...@wellsfargo.com]
Sent: Wednesday, August 26, 2015 6:47 PM
To: user@spark.apache.org
Subject: Help! Stuck using withColumn

This simple comand call:

val final_df = data.select("month_balance").withColumn("month_date", 
data.col("month_date_curr"))

Is throwing:

org.apache.spark.sql.AnalysisException: resolved attribute(s) 
month_date_curr#324 missing from month_balance#234 in operator !Project 
[month_balance#234, month_date_curr#324 AS month_date_curr#408];
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:38)
at 
org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:42)
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:121)
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:50)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:98)
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:50)
at 
org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:42)
at 
org.apache.spark.sql.SQLContext$QueryExecution.assertAnalyzed(SQLContext.scala:931)
at org.apache.spark.sql.DataFrame.(DataFrame.scala:131)
at 
org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$logicalPlanToDataFrame(DataFrame.scala:154)
at org.apache.spark.sql.DataFrame.select(DataFrame.scala:595)
at org.apache.spark.sql.DataFrame.withColumn(DataFrame.scala:1039)





Re: Join with multiple conditions (In reference to SPARK-7197)

2015-08-26 Thread Michal Monselise
Davies, I created an issue - SPARK-10246


On Tue, Aug 25, 2015 at 12:53 PM, Davies Liu  wrote:

> It's good to support this, could you create a JIRA for it and target for
> 1.6?
>
> On Tue, Aug 25, 2015 at 11:21 AM, Michal Monselise
>  wrote:
> >
> > Hello All,
> >
> > PySpark currently has two ways of performing a join: specifying a join
> condition or column names.
> >
> > I would like to perform a join using a list of columns that appear in
> both the left and right DataFrames. I have created an example in this
> question on Stack Overflow.
> >
> > Basically, I would like to do the following as specified in the
> documentation in  /spark/python/pyspark/sql/dataframe.py row 560 and
> specify a list of column names:
> >
> > >>> df.join(df4, ['name', 'age']).select(df.name, df.age).collect()
> >
> > However, this produces an error.
> >
> > In JIRA issue SPARK-7197, it is mentioned that the syntax is actually
> different from the one specified in the documentation for joining using a
> condition.
> >
> > Documentation:
> > >>> cond = [df.name == df3.name, df.age == df3.age] >>> df.join(df3,
> cond, 'outer').select(df.name, df3.age).collect()
> >
> > JIRA Issue:
> >
> > a.join(b, (a.year==b.year) & (a.month==b.month), 'inner')
> >
> >
> > In other words. the join function cannot take a list.
> > I was wondering if you could also clarify what is the correct syntax for
> providing a list of columns.
> >
> >
> > Thanks,
> > Michal
> >
> >
>


error accessing vertexRDD

2015-08-26 Thread dizzy5112
Hi all, a question on an issue I'm having with a VertexRDD. If I kick off
my spark shell with something like this:



then run:


it will finish and give me the count, but I see a few errors (see below).
This is okay for this small dataset, but when trying with a large data set
it doesn't finish because of the number of errors. This works okay if I kick
off my spark shell with master = local. Any help appreciated.





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/error-accessing-vertexRDD-tp24466.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: query avro hive table in spark sql

2015-08-26 Thread Michael Armbrust
I'd suggest looking at
http://spark-packages.org/package/databricks/spark-avro
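
e.g., roughly like this (the package coordinates depend on your Spark
version, and the path is just an example):

// Read the Avro files directly with spark-avro instead of the Hive serde
val df = sqlContext.read
  .format("com.databricks.spark.avro")
  .load("/path/to/avro/files")
df.registerTempTable("my_avro_table")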

On Wed, Aug 26, 2015 at 11:32 AM, gpatcham  wrote:

> Hi,
>
> I'm trying to query hive table which is based on avro in spark SQL and
> seeing below errors.
>
> 15/08/26 17:51:12 WARN avro.AvroSerdeUtils: Encountered AvroSerdeException
> determining schema. Returning signal schema to indicate problem
> org.apache.hadoop.hive.serde2.avro.AvroSerdeException: Neither
> avro.schema.literal nor avro.schema.url specified, can't determine table
> schema
> at
>
> org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils.determineSchemaOrThrowException(AvroSerdeUtils.java:68)
> at
>
> org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils.determineSchemaOrReturnErrorSchema(AvroSerdeUtils.java:93)
> at
> org.apache.hadoop.hive.serde2.avro.AvroSerDe.initialize(AvroSerDe.java:60)
> at
>
> org.apache.hadoop.hive.metastore.MetaStoreUtils.getDeserializer(MetaStoreUtils.java:375)
> at
>
> org.apache.hadoop.hive.ql.metadata.Partition.getDeserializer(Partition.java:249)
>
>
> Its not able to determine schema. Hive table is pointing to avro schema
> using url. I'm stuck and couldn't find more info on this.
>
> Any pointers ?
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/query-avro-hive-table-in-spark-sql-tp24462.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Spark cluster multi tenancy

2015-08-26 Thread Jerrick Hoang
Would be interested to know the answer too.

On Wed, Aug 26, 2015 at 11:45 AM, Sadhan Sood  wrote:

> Interestingly, if nothing is running on the dev spark-shell, it recovers
> successfully and regains the lost executors. Attaching the log for that.
> Notice the "Registering block manager .." statements at the very end, after
> all executors were lost.
>
> On Wed, Aug 26, 2015 at 11:27 AM, Sadhan Sood 
> wrote:
>
>> Attaching log for when the dev job gets stuck (once all its executors are
>> lost due to preemption). This is a spark-shell job running in yarn-client
>> mode.
>>
>> On Wed, Aug 26, 2015 at 10:45 AM, Sadhan Sood 
>> wrote:
>>
>>> Hi All,
>>>
>>> We've set up our spark cluster on aws running on yarn (running on hadoop
>>> 2.3) with fair scheduling and preemption turned on. The cluster is shared
>>> for prod and dev work where prod runs with a higher fair share and can
>>> preempt dev jobs if there are not enough resources available for it.
>>> It appears that dev jobs which get preempted often become unstable after
>>> losing some executors: the whole job gets stuck (without making any
>>> progress) or ends up getting restarted (and hence loses all the work done).
>>> Has anyone encountered this before? Is the solution just to set
>>> spark.task.maxFailures to a really high value to recover from task failures
>>> in such scenarios? Are there other approaches that people have taken for
>>> Spark multi-tenancy that work better in this scenario?
>>>
>>> Thanks,
>>> Sadhan
>>>
>>
>>
>
>
>
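
A minimal sketch (not from this thread) of raising the retry limits so a
preempted job keeps retrying instead of failing fast; the values are purely
illustrative and the YARN property assumes Spark on YARN 1.x:

  import org.apache.spark.{SparkConf, SparkContext}

  val conf = new SparkConf()
    .setAppName("dev-job")                          // hypothetical app name
    .set("spark.task.maxFailures", "16")            // tolerate more lost tasks
    .set("spark.yarn.max.executor.failures", "64")  // tolerate more lost executors
  val sc = new SparkContext(conf)

Whether this alone keeps heavily preempted jobs stable is the open question in
this thread.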


Re: How to unit test HiveContext without OutOfMemoryError (using sbt)

2015-08-26 Thread Michael Armbrust
I'd suggest setting sbt to fork when running tests.
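
A minimal build.sbt sketch (not from this thread), in the 0.13-era syntax:
running the tests in a forked JVM means HiveContext's PermGen usage is
reclaimed when the forked JVM exits, and the fork gets its own memory flags.

  // build.sbt (sketch)
  fork in Test := true
  javaOptions in Test ++= Seq(
    "-Xmx2g",
    "-XX:MaxPermSize=256m",
    "-XX:ReservedCodeCacheSize=512m")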

On Wed, Aug 26, 2015 at 10:51 AM, Mike Trienis 
wrote:

> Thanks for your response Yana,
>
> I can increase the MaxPermSize parameter and it will allow me to run the
> unit test a few more times before I run out of memory.
>
> However, the primary issue is that running the same unit test multiple times
> in the same JVM results in increased memory usage with each run, and I believe
> it has something to do with HiveContext not reclaiming memory after it is
> finished (or I'm not shutting it down properly).
>
> It could very well be related to sbt, however, it's not clear to me.
>
>
> On Tue, Aug 25, 2015 at 1:12 PM, Yana Kadiyska 
> wrote:
>
>> The PermGen space error is controlled with the MaxPermSize parameter. I run
>> with this in my pom, copied pretty literally, I think, from Spark's own
>> tests... I don't know what the sbt equivalent is, but you should be able to
>> pass it... possibly via SBT_OPTS?
>>
>>
>>   <plugin>
>>     <groupId>org.scalatest</groupId>
>>     <artifactId>scalatest-maven-plugin</artifactId>
>>     <version>1.0</version>
>>     <configuration>
>>       <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
>>       <junitxml>.</junitxml>
>>       <filereports>SparkTestSuite.txt</filereports>
>>       <argLine>-Xmx3g -XX:MaxPermSize=256m -XX:ReservedCodeCacheSize=512m</argLine>
>>       <!-- additional settings whose element names were stripped by the
>>            archive (values: false, true, 1, false, true) -->
>>     </configuration>
>>     <executions>
>>       <execution>
>>         <id>test</id>
>>         <goals>
>>           <goal>test</goal>
>>         </goals>
>>       </execution>
>>     </executions>
>>   </plugin>
>>
>>
>> On Tue, Aug 25, 2015 at 2:10 PM, Mike Trienis 
>> wrote:
>>
>>> Hello,
>>>
>>> I am using sbt and created a unit test where I create a `HiveContext`,
>>> execute a query, and then return. Each time I run the unit test the
>>> JVM increases its memory usage until I get the error:
>>>
>>> Internal error when running tests: java.lang.OutOfMemoryError: PermGen
>>> space
>>> Exception in thread "Thread-2" java.io.EOFException
>>>
>>> As a work-around, I can fork a new JVM each time I run the unit test;
>>> however, it seems like a bad solution as it takes a while to run the unit
>>> test.
>>>
>>> By the way, I tried importing the TestHiveContext:
>>>
>>>- import org.apache.spark.sql.hive.test.TestHiveContext
>>>
>>> However, it suffers from the same memory issue. Has anyone else suffered
>>> from the same problem? Note that I am running these unit tests on my mac.
>>>
>>> Cheers, Mike.
>>>
>>>
>>
>


Re: Differing performance in self joins

2015-08-26 Thread Michael Armbrust
-dev +user

I'd suggest running .explain() on both dataframes to understand the
performance better.  The problem is likely that we have a pattern that
looks for cases where you have an equality predicate where each side can
be evaluated using only one side of the join.  We turn this into a hash join.

(df("eday") - laggard("p_eday")) === 1) is pretty tricky for us to
understand, and so the pattern misses the possible optimized plan.
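
A quick sketch (not from the thread) of checking what the planner picked,
using the df/laggard frames from the question below:

  val joined = df.join(laggard,
    (df("series") === laggard("p_series")) &&
    (df("eday") === (laggard("p_eday") + 1)))

  joined.explain()   // a hash join should appear here, not a cartesian product
  joined.count()

For the 1..7-day window, one workaround (an assumption on my part, not
something verified in this thread) is to union seven single-day joins, or to
materialize each lag explicitly, so that every predicate stays a plain
equality.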

On Wed, Aug 26, 2015 at 6:10 PM, David Smith  wrote:

> I've noticed that two queries, which return identical results, have very
> different performance. I'd be interested in any hints about how to avoid
> problems like this.
>
> The DataFrame df contains a string field "series" and an integer "eday",
> the
> number of days since (or before) the 1970-01-01 epoch.
>
> I'm doing some analysis over a sliding date window and, for now, avoiding
> UDAFs. I'm therefore using a self join. First, I create
>
> val laggard = df.withColumnRenamed("series",
> "p_series").withColumnRenamed("eday", "p_eday")
>
> Then, the following query runs in 16s:
>
> df.join(laggard, (df("series") === laggard("p_series")) && (df("eday") ===
> (laggard("p_eday") + 1))).count
>
> while the following query runs in 4 - 6 minutes:
>
> df.join(laggard, (df("series") === laggard("p_series")) && ((df("eday") -
> laggard("p_eday")) === 1)).count
>
> It's worth noting that the series term is necessary to keep the query from
> doing a complete cartesian product over the data.
>
> Ideally, I'd like to look at lags of more than one day, but the following
> is
> equally slow:
>
> df.join(laggard, (df("series") === laggard("p_series")) && (df("eday") -
> laggard("p_eday")).between(1,7)).count
>
> Any advice about the general principle at work here would be welcome.
>
> Thanks,
> David
>
>
>
> --
> View this message in context:
> http://apache-spark-developers-list.1001551.n3.nabble.com/Differing-performance-in-self-joins-tp13864.html
> Sent from the Apache Spark Developers List mailing list archive at
> Nabble.com.
>
>
>


spark streaming 1.3 kafka topic error

2015-08-26 Thread Shushant Arora
Hi

My streaming application gets killed with the error below:

5/08/26 21:55:20 ERROR kafka.DirectKafkaInputDStream:
ArrayBuffer(kafka.common.NotLeaderForPartitionException,
kafka.common.NotLeaderForPartitionException,
kafka.common.NotLeaderForPartitionException,
kafka.common.NotLeaderForPartitionException,
kafka.common.NotLeaderForPartitionException,
org.apache.spark.SparkException: Couldn't find leader offsets for
Set([testtopic,223], [testtopic,205], [testtopic,64], [testtopic,100],
[testtopic,193]))
15/08/26 21:55:20 ERROR scheduler.JobScheduler: Error generating jobs for
time 144062612 ms
org.apache.spark.SparkException:
ArrayBuffer(kafka.common.NotLeaderForPartitionException,
org.apache.spark.SparkException: Couldn't find leader offsets for
Set([testtopic,115]))
at
org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:94)
at
org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:116)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299)
at



The Kafka params printed in the job logs are:
 value.serializer = class
org.apache.kafka.common.serialization.StringSerializer
key.serializer = class
org.apache.kafka.common.serialization.StringSerializer
block.on.buffer.full = true
retry.backoff.ms = 100
buffer.memory = 1048576
batch.size = 16384
metrics.sample.window.ms = 3
metadata.max.age.ms = 30
receive.buffer.bytes = 32768
timeout.ms = 3
max.in.flight.requests.per.connection = 5
bootstrap.servers = [broker1:9092, broker2:9092, broker3:9092]
metric.reporters = []
client.id =
compression.type = none
retries = 0
max.request.size = 1048576
send.buffer.bytes = 131072
acks = all
reconnect.backoff.ms = 10
linger.ms = 0
metrics.num.samples = 2
metadata.fetch.timeout.ms = 6


Is a Kafka broker going down and killing the job? What is the best way to
handle this?
Will increasing the retries and backoff time help, and to what values should
they be set so that the streaming application never fails outright, but
instead keeps retrying every few seconds and sends an event, so that my custom
code can send a "Kafka broker down" notification if that is the cause?
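
A sketch (not from this thread) of the knobs usually involved here, with
illustrative values: the leader-refresh backoff passed in kafkaParams, and
Spark's own retry count for looking up leader offsets in the driver (the call
that fails in the stack trace above).

  import kafka.serializer.StringDecoder
  import org.apache.spark.SparkConf
  import org.apache.spark.streaming.{Seconds, StreamingContext}
  import org.apache.spark.streaming.kafka.KafkaUtils

  val conf = new SparkConf()
    .setAppName("kafka-direct")                     // hypothetical app name
    .set("spark.streaming.kafka.maxRetries", "5")   // retries for leader-offset lookups

  val ssc = new StreamingContext(conf, Seconds(30))

  val kafkaParams = Map(
    "metadata.broker.list"      -> "broker1:9092,broker2:9092,broker3:9092",
    "refresh.leader.backoff.ms" -> "2000")          // wait before re-resolving a leader

  val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
    ssc, kafkaParams, Set("testtopic"))

Neither setting makes the application survive a prolonged broker outage on its
own; they only give the cluster more time to elect a new leader.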


Thanks


Re: spark streaming 1.3 kafka buffer size

2015-08-26 Thread Shushant Arora
Can I change the fetch.message.max.bytes parameter or
spark.streaming.kafka.maxRatePerPartition at run time, across batches?
Say I detect some failure condition in my system and decide to consume only 10
messages per partition in the next batch interval, and if that succeeds I reset
the max limit to unlimited again.
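
For reference, a sketch (not from this thread) of where the two settings live;
both appear to be read when the stream is set up, so changing them between
batches would at least mean recreating the stream (an assumption on my part,
not something confirmed here):

  import org.apache.spark.SparkConf

  val conf = new SparkConf()
    .setAppName("kafka-direct")
    // upper bound on records read per partition per batch
    .set("spark.streaming.kafka.maxRatePerPartition", "10")

  val kafkaParams = Map(
    "metadata.broker.list"    -> "broker1:9092,broker2:9092,broker3:9092",
    // max bytes fetched per partition in a single fetch request
    "fetch.message.max.bytes" -> "1048576")

The kafkaParams map is the one passed to the createDirectStream constructor, as
Cody notes below.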

On Wed, Aug 26, 2015 at 9:32 PM, Cody Koeninger  wrote:

> see http://kafka.apache.org/documentation.html#consumerconfigs
>
> fetch.message.max.bytes
>
> in the kafka params passed to the constructor
>
>
> On Wed, Aug 26, 2015 at 10:39 AM, Shushant Arora <
> shushantaror...@gmail.com> wrote:
>
>> What's the default buffer in Spark Streaming 1.3 for Kafka messages?
>>
>> Say in this run it has to fetch messages from offset 1 to 1. Will it
>> fetch them all in one go, or does it internally fetch the messages in
>> smaller batches?
>>
>> Is there any setting to configure the number of offsets fetched in one batch?
>>
>
>


Re: Build k-NN graph for large dataset

2015-08-26 Thread Jaonary Rabarisoa
Thank you all for these links. I'll check them.
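
A minimal sketch (not from this thread) of the DIMSUM route mentioned below,
via MLlib's RowMatrix.columnSimilarities. Note the caveats: it computes cosine
similarity (not an arbitrary similarity function), and it works column-wise,
so each point has to be laid out as a column of the matrix.

  import org.apache.spark.mllib.linalg.Vectors
  import org.apache.spark.mllib.linalg.distributed.RowMatrix

  // Toy data: each row is one dimension, each column is one point.
  val rows = sc.parallelize(Seq(
    Vectors.dense(1.0, 0.0, 2.0),
    Vectors.dense(0.0, 3.0, 1.0)))
  val mat = new RowMatrix(rows)

  // With a threshold, DIMSUM samples aggressively and may drop pairs whose
  // similarity falls below it, which is what makes this cheaper than all-pairs.
  val sims = mat.columnSimilarities(threshold = 0.1)
  sims.entries.take(5).foreach(println)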

On Wed, Aug 26, 2015 at 5:05 PM, Charlie Hack 
wrote:

> +1 to all of the above esp.  Dimensionality reduction and locality
> sensitive hashing / min hashing.
>
> There's also an algorithm implemented in MLlib called DIMSUM which was
> developed at Twitter for this purpose. I've been meaning to try it and
> would be interested to hear about results you get.
>
> https://blog.twitter.com/2014/all-pairs-similarity-via-dimsum
>
> Charlie
>
>
> — Sent from Mailbox
>
> On Wednesday, Aug 26, 2015 at 09:57, Michael Malak <
> michaelma...@yahoo.com.invalid>, wrote:
>
>> Yes. And a paper that describes using grids (actually varying grids) is
>> http://research.microsoft.com/en-us/um/people/jingdw/pubs%5CCVPR12-GraphConstruction.pdf
>>  In
>> the Spark GraphX In Action book that Robin East and I are writing, we
>> implement a drastically simplified version of this in chapter 7, which
>> should become available in the MEAP mid-September.
>> http://www.manning.com/books/spark-graphx-in-action
>>
>>
>> --
>>
>> If you don't want to compute all N^2 similarities, you need to implement
>> some kind of blocking first. For example, LSH (locality-sensitive hashing).
>> A quick search gave this link to a Spark implementation:
>>
>>
>> http://stackoverflow.com/questions/2771/spark-implementation-for-locality-sensitive-hashing
>>
>>
>>
>> On Wed, Aug 26, 2015 at 7:35 AM, Jaonary Rabarisoa 
>> wrote:
>>
>> Dear all,
>>
>> I'm trying to find an efficient way to build a k-NN graph for a large
>> dataset. Precisely, I have a large set of high dimensional vectors (say d
>> >>> 1) and I want to build a graph where those high dimensional points
>> are the vertices and each one is linked to its k nearest neighbors based on
>> some kind of similarity defined on the vertex space.
>> My problem is to implement an efficient algorithm to compute the weight
>> matrix of the graph. I need to compute N*N similarities, and the only way
>> I know is to use a "cartesian" operation followed by a "map" operation on
>> the RDD. But this is very slow when N is large. Is there a cleverer way to
>> do this for an arbitrary similarity function?
>>
>> Cheers,
>>
>> Jao
>>
>>
>>
>>
>>