Getting info from DecisionTreeClassificationModel

2015-10-21 Thread rake
I’m trying to use Spark ml to create a classification tree model and examine 
the resulting model.

I have managed to create a DecisionTreeClassificationModel (class 
org.apache.spark.ml.classification.DecisionTreeClassificationModel), but have 
not been able to obtain basic information from the model.

For example, I am able to obtain the root node of the tree via the ‘rootNode’ 
field.  But how do I get to other nodes?

If I get to a Node object ( class = org.apache.park.ml.tree.Node ), how do I 
get essential information from a particular node, such as what are the counts 
or probabilities of each of the target classes at that node in the training 
data?  It seems like the only piece of info I’m allowed to know is the 
predicted class, via the ‘prediction’ field.


Randy Kerber






-
Randy Kerber
Data Science Consultant
San Jose, California

--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Getting-info-from-DecisionTreeClassificationModel-tp25152.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: Issue in spark batches

2015-10-21 Thread Tathagata Das
Unfortunately, you will have to write that code yourself.

TD

On Tue, Oct 20, 2015 at 11:28 PM, varun sharma 
wrote:

> Hi TD,
> Is there any way in spark  I can fail/retry batch in case of any
> exceptions or do I have to write code to explicitly keep on retrying?
> Also If some batch fail, I want to block further batches to be processed
> as it would create inconsistency in updation of zookeeper offsets and maybe
> kill the job itself after lets say 3 retries.
>
> Any pointers to achieve same are appreciated.
>
> On Wed, Oct 21, 2015 at 1:15 AM, Tathagata Das 
> wrote:
>
>> That is actually a bug in the UI that got fixed in 1.5.1. The batch is
>> actually completing with exception, the UI does not update correctly.
>>
>> On Tue, Oct 20, 2015 at 8:38 AM, varun sharma 
>> wrote:
>>
>>> Also, As you can see the timestamps in attached image. batches coming
>>> after the Cassandra server comes up(21:04) are processed and batches which
>>> are in hung state(21:03) never get processed.
>>> So, How do I fail those batches so that those can be processed again.
>>>
>>> On Tue, Oct 20, 2015 at 9:02 PM, varun sharma >> > wrote:
>>>
 Hi TD,
 Yes saveToCassandra throws exception. How do I fail that task
 explicitly if i catch any exceptions?.
 Right now that batch doesn't fail and remain in hung state. Is there
 any way I fail that batch so that it can be tried again.

 Thanks
 Varun

 On Tue, Oct 20, 2015 at 2:50 AM, Tathagata Das 
 wrote:

> If cassandra is down, does saveToCassandra throw an exception? If it
> does, you can catch that exception and write your own logic to retry 
> and/or
> no update. Once the foreachRDD function completes, that batch will be
> internally marked as completed.
>
> TD
>
> On Mon, Oct 19, 2015 at 5:48 AM, varun sharma <
> varunsharman...@gmail.com> wrote:
>
>> Hi,
>> I am facing this issue consistently in spark-cassandra-kafka *streaming
>> job.*
>> *Spark 1.4.0*
>> *cassandra connector 1.4.0-M3*
>> *Issue is:*
>>
>> I am reading data from *Kafka* using DirectStream, writing to
>> *Cassandra* after parsing the json and the subsequently updating the
>> offsets in *zookeeper*.
>> If Cassandra cluster is down, it throws exception but the batch which
>> arrives in that time window is not processed ever though the offsets are
>> updated in zookeeper.
>> It is resulting data loss.
>> Once the Cassandra cluster is up, this job process the data normally.
>> PFA the screenshots of hung batches and code.
>>
>> *Code:*
>>
>> data_rdd.foreachRDD(rdd=> {
>>   val stream = rdd
>> .map(x =>JsonUtility.deserialize(x))
>>   stream.saveToCassandra(CASSANDRA_KEY_SPACE, SIGNATURE_TABLE, 
>> StreamModel.getColumns)
>>
>>
>>   //commit the offsets once everything is done
>>   ZookeeperManager.updateOffsetsinZk(zkProperties, rdd)
>> })
>>
>> *I have even tried this variant:*
>>
>> data_rdd.foreachRDD(rdd=> {
>>   val stream = rdd
>> .map(x =>JsonUtility.deserialize(x))
>>   stream.saveToCassandra(CASSANDRA_KEY_SPACE, SIGNATURE_TABLE, 
>> StreamModel.getColumns)
>> })
>>
>> data_rdd.foreachRDD(rdd=> {
>>
>>   //commit the offsets once everything is done
>>
>>   ZookeeperManager.updateOffsetsinZk(zkProperties, rdd)
>>
>> }
>>
>> Exception when cassandra cluster is down:
>> [2015-10-19 12:49:20] [JobScheduler] [ERROR]
>> [org.apache.spark.streaming.scheduler.JobScheduler] - Error running job
>> streaming job 144523914 ms.3
>> java.io.IOException: Failed to open native connection to Cassandra at
>> {..}
>>
>> --
>> *VARUN SHARMA*
>> *Flipkart*
>> *Bangalore*
>>
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>
>


 --
 *VARUN SHARMA*
 *Flipkart*
 *Bangalore*

>>>
>>>
>>>
>>> --
>>> *VARUN SHARMA*
>>> *Flipkart*
>>> *Bangalore*
>>>
>>
>>
>
>
> --
> *VARUN SHARMA*
> *Flipkart*
> *Bangalore*
>


Re: Job splling to disk and memory in Spark Streaming

2015-10-21 Thread Tathagata Das
Well, reduceByKey needs to shutffle if your intermediate data is not
already partitioned in the same way as reduceByKey's partitioning.

reduceByKey() has other signatures that take in a partitioner, or simply
number of partitions. So you can set the same partitioner as your previous
stage. Without any further insight into the structure of your code its hard
to say anything more.

On Tue, Oct 20, 2015 at 5:59 PM, swetha  wrote:

> Hi,
>
> Currently I have a job that has spills to disk and memory due to usage of
> reduceByKey and a lot of intermediate data in reduceByKey that gets
> shuffled.
>
> How to use custom partitioner in Spark Streaming for  an intermediate stage
> so that  the next stage that uses reduceByKey does not have to do shuffles?
>
> Thanks,
> Swetha
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Job-splling-to-disk-and-memory-in-Spark-Streaming-tp25149.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Ahhhh... Spark creates >30000 partitions... What can I do?

2015-10-21 Thread Ranadip Chatterjee
T3l,

Did Sean Owen's suggestion help? If not, can you please share the behaviour?

Cheers.
On 20 Oct 2015 11:02 pm, "Lan Jiang"  wrote:

> I think the data file is binary per the original post. So in this case,
> sc.binaryFiles should be used. However, I still recommend against using so
> many small binary files as
>
> 1. They are not good for batch I/O
> 2. They put too many memory pressure on namenode.
>
> Lan
>
>
> On Oct 20, 2015, at 11:20 AM, Deenar Toraskar 
> wrote:
>
> also check out wholeTextFiles
>
>
> https://spark.apache.org/docs/1.4.0/api/java/org/apache/spark/SparkContext.html#wholeTextFiles(java.lang.String,%20int)
>
> On 20 October 2015 at 15:04, Lan Jiang  wrote:
>
>> As Francois pointed out, you are encountering a classic small file
>> anti-pattern. One solution I used in the past is to wrap all these small
>> binary files into a sequence file or avro file. For example, the avro
>> schema can have two fields: filename: string and binaryname:byte[]. Thus
>> your file is splittable and will not create so many partitions.
>>
>> Lan
>>
>>
>> On Oct 20, 2015, at 8:03 AM, François Pelletier <
>> newslett...@francoispelletier.org> wrote:
>>
>> You should aggregate your files in larger chunks before doing anything
>> else. HDFS is not fit for small files. It will bloat it and cause you a lot
>> of performance issues. Target a few hundred MB chunks partition size and
>> then save those files back to hdfs and then delete the original ones. You
>> can read, use coalesce and the saveAsXXX on the result.
>>
>> I had the same kind of problem once and solved it in bunching 100's of
>> files together in larger ones. I used text files with bzip2 compression.
>>
>>
>>
>> Le 2015-10-20 08:42, Sean Owen a écrit :
>>
>> coalesce without a shuffle? it shouldn't be an action. It just treats
>> many partitions as one.
>>
>> On Tue, Oct 20, 2015 at 1:00 PM, t3l  wrote:
>>
>>>
>>> I have dataset consisting of 5 binary files (each between 500kb and
>>> 2MB). They are stored in HDFS on a Hadoop cluster. The datanodes of the
>>> cluster are also the workers for Spark. I open the files as a RDD using
>>> sc.binaryFiles("hdfs:///path_to_directory").When I run the first action
>>> that
>>> involves this RDD, Spark spawns a RDD with more than 3 partitions.
>>> And
>>> this takes ages to process these partitions even if you simply run
>>> "count".
>>> Performing a "repartition" directly after loading does not help, because
>>> Spark seems to insist on materializing the RDD created by binaryFiles
>>> first.
>>>
>>> How I can get around this?
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/A-Spark-creates-3-partitions-What-can-I-do-tp25140.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com
>>> .
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>
>>
>>
>
>


Re: Job splling to disk and memory in Spark Streaming

2015-10-21 Thread Adrian Tanase
+1 – you can definitely make it work by making sure you are using the same 
partitioner (including the same number of partitions).

For most operations like reduceByKey, updateStateByKey – simply specifying it 
enough.

There are some gotchas for other operations:

  *   mapValues and flatMapValues preserve partitioning
  *   map and flatMap don’t as they can’t be sure of your logic. If you are 
absolutely sure that the emitted values will remain on the same partition then 
you can also override the partitioner to avoid shuffle
  *   Union on 2 Dstreams throws away partitioning. Again, if you know that 
it’s safe to do it, then you need to look at transformWith and push down to 
RDD.union which preserves partitioning

By using these tricks I’ve successfully forced a pretty complex streaming 
pipeline (including 2 updateStateByKey, unions, flatmaps, repartitions, custom 
partitioner, etc) to execute in a single stage.

Hope this helps,
-adrian

From: Tathagata Das
Date: Wednesday, October 21, 2015 at 10:36 AM
To: swetha
Cc: user
Subject: Re: Job splling to disk and memory in Spark Streaming

Well, reduceByKey needs to shutffle if your intermediate data is not already 
partitioned in the same way as reduceByKey's partitioning.

reduceByKey() has other signatures that take in a partitioner, or simply number 
of partitions. So you can set the same partitioner as your previous stage. 
Without any further insight into the structure of your code its hard to say 
anything more.

On Tue, Oct 20, 2015 at 5:59 PM, swetha 
mailto:swethakasire...@gmail.com>> wrote:
Hi,

Currently I have a job that has spills to disk and memory due to usage of
reduceByKey and a lot of intermediate data in reduceByKey that gets
shuffled.

How to use custom partitioner in Spark Streaming for  an intermediate stage
so that  the next stage that uses reduceByKey does not have to do shuffles?

Thanks,
Swetha



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Job-splling-to-disk-and-memory-in-Spark-Streaming-tp25149.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: 
user-unsubscr...@spark.apache.org
For additional commands, e-mail: 
user-h...@spark.apache.org




Re: [spark1.5.1] HiveQl.parse throws org.apache.spark.sql.AnalysisException: null

2015-10-21 Thread Sebastian Nadorp
What we're trying to achieve is a fast way of testing the validity of our
SQL queries within Unit tests without going through the time consuming task
of setting up an Hive Test Context.
If there is any way to speed this step up, any help would be appreciated.

Thanks,
Sebastian

*Sebastian Nadorp*
Software Developer

nugg.ad AG - Predictive Behavioral Targeting
Rotherstraße 16 - 10245 Berlin

sebastian.nad...@nugg.ad

www.nugg.ad * http://blog.nugg.ad/ * www.twitter.com/nuggad *
www.facebook.com/nuggad

*Registergericht/District court*: Charlottenburg HRB 102226 B
*Vorsitzender des Aufsichtsrates/Chairman of the supervisory board: *Dr.
Detlev Ruland
*Vorstand/Executive board:* Martin Hubert

*nugg.ad  is a company of Deutsche Post DHL.*


On Tue, Oct 20, 2015 at 9:20 PM, Xiao Li  wrote:

> Just curious why you are using parseSql APIs?
>
> It works well if you use the external APIs. For example, in your case:
>
> val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
> hiveContext.sql("CREATE EXTERNAL TABLE IF NOT EXISTS `t`(`id` STRING,
> `foo` INT) PARTITIONED BY (year INT, month INT, day INT) STORED AS PARQUET
> Location 'temp'")
>
> Good luck,
>
> Xiao Li
>
>
> 2015-10-20 10:23 GMT-07:00 Michael Armbrust :
>
>> Thats not really intended to be a public API as there is some internal
>> setup that needs to be done for Hive to work.  Have you created a
>> HiveContext in the same thread?  Is there more to that stacktrace?
>>
>> On Tue, Oct 20, 2015 at 2:25 AM, Ayoub 
>> wrote:
>>
>>> Hello,
>>>
>>> when upgrading to spark 1.5.1 from 1.4.1 the following code crashed on
>>> runtime. It is mainly used to parse HiveQL queries and check that they
>>> are
>>> valid.
>>>
>>> package org.apache.spark.sql.hive
>>>
>>> val sql = "CREATE EXTERNAL TABLE IF NOT EXISTS `t`(`id` STRING, `foo`
>>> INT)
>>> PARTITIONED BY (year INT, month INT, day INT) STORED AS PARQUET Location
>>> 'temp'"
>>>
>>> HiveQl.parseSql(sql)
>>>
>>> org.apache.spark.sql.AnalysisException: null;
>>> at org.apache.spark.sql.hive.HiveQl$.createPlan(HiveQl.scala:303)
>>> at
>>>
>>> org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:41)
>>> at
>>>
>>> org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:40)
>>> at
>>> scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136)
>>> at
>>> scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135)
>>> at
>>>
>>> scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
>>> at
>>>
>>> scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
>>> at
>>> scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
>>> at
>>>
>>> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
>>> at
>>>
>>> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
>>> at
>>> scala.util.parsing.combinator.Parsers$Failure.append(Parsers.scala:202)
>>> at
>>>
>>> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
>>> at
>>>
>>> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
>>> at
>>> scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
>>> at
>>>
>>> scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
>>> at
>>>
>>> scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
>>> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>>> at
>>> scala.util.parsing.combinator.Parsers$$anon$2.apply(Parsers.scala:890)
>>> at
>>>
>>> scala.util.parsing.combinator.PackratParsers$$anon$1.apply(PackratParsers.scala:110)
>>> at
>>>
>>> org.apache.spark.sql.catalyst.AbstractSparkSQLParser.parse(AbstractSparkSQLParser.scala:34)
>>> at org.apache.spark.sql.hive.HiveQl$.parseSql(HiveQl.scala:277)
>>> at
>>> org.apache.spark.sql.hive.SQLChecker$$anonfun$1.apply(SQLChecker.scala:9)
>>> at
>>> org.apache.spark.sql.hive.SQLChecker$$anonfun$1.apply(SQLChecker.scala:9)
>>>
>>> Should that be done differently on spark 1.5.1 ?
>>>
>>> Thanks,
>>> Ayoub
>>>
>>>
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/spark1-5-1-HiveQl-parse-throws-org-apache-spark-sql-AnalysisException-null-tp25138.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>
>


How to use And Operator in filter (PySpark)

2015-10-21 Thread Jeff Zhang
I can do it in scala api, but not sure what's the syntax in pyspark.
(Didn't find it in python api)

Here's what I tried, both failed

>>> df.filter(df.age>3 & df.name=="Andy").collect()
>>> df.filter(df.age>3 and df.name=="Andy").collect()

-- 
Best Regards

Jeff Zhang


Spark-Testing-Base Q/A

2015-10-21 Thread Mark Vervuurt
Hi Everyone,

I am busy trying out ‘Spark-Testing-Base 
’. I have the following 
questions?

Can you test Spark Streaming Jobs using Java?
Can I use Spark-Testing-Base 1.3.0_0.1.1 together with Spark 1.3.1?

Thanks.

Greetings,
Mark

Re: spark-shell (1.5.1) not starting cleanly on Windows.

2015-10-21 Thread Steve Loughran
you've hit this

https://wiki.apache.org/hadoop/WindowsProblems

the next version of hadoop will fail with a more useful message, including that 
wiki link


On 21 Oct 2015, at 00:36, Renato Perini 
mailto:renato.per...@gmail.com>> wrote:

java.lang.RuntimeException: java.lang.NullPointerException
at 
org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:522)
at 
org.apache.spark.sql.hive.client.ClientWrapper.(ClientWrapper.scala:171)
at 
org.apache.spark.sql.hive.HiveContext.executionHive$lzycompute(HiveContext.scala:162)
at 
org.apache.spark.sql.hive.HiveContext.executionHive(HiveContext.scala:160)
at org.apache.spark.sql.hive.HiveContext.(HiveContext.scala:167)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
at 
org.apache.spark.repl.SparkILoop.createSQLContext(SparkILoop.scala:1028)
at $iwC$$iwC.(:9)
at $iwC.(:18)
at (:20)
at .(:24)
at .()
at .(:7)
at .()
at $print()
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at 
org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
at 
org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1340)
at 
org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
at 
org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857)



Spark on Yarn

2015-10-21 Thread Raghuveer Chanda
Hi all,

I am trying to run spark on yarn in quickstart cloudera vm.It already has
spark 1.3 and Hadoop 2.6.0-cdh5.4.0 installed.(I am not using spark-submit
since I want to run a different version of spark).

I am able to run spark 1.3 on yarn but get the below error for spark 1.4.

The log shows its running on spark 1.4 but still gives a error on a method
which is present in 1.4 and not 1.3. Even the fat jar contains the class
files of 1.4.

As far as running in yarn the installed spark version shouldnt matter, but
still its running on the other version.


*Hadoop Version:*
Hadoop 2.6.0-cdh5.4.0
Subversion http://github.com/cloudera/hadoop -r
c788a14a5de9ecd968d1e2666e8765c5f018c271
Compiled by jenkins on 2015-04-21T19:18Z
Compiled with protoc 2.5.0
>From source with checksum cd78f139c66c13ab5cee96e15a629025
This command was run using /usr/lib/hadoop/hadoop-common-2.6.0-cdh5.4.0.jar

*Error:*
LogType:stderr
Log Upload Time:Tue Oct 20 21:58:56 -0700 2015
LogLength:2334
Log Contents:
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in
[jar:file:/usr/lib/zookeeper/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in
[jar:file:/var/lib/hadoop-yarn/cache/yarn/nm-local-dir/filecache/10/simple-yarn-app-1.1.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
15/10/20 21:58:50 INFO spark.SparkContext: *Running Spark version 1.4.0*
15/10/20 21:58:53 INFO spark.SecurityManager: Changing view acls to: yarn
15/10/20 21:58:53 INFO spark.SecurityManager: Changing modify acls to: yarn
15/10/20 21:58:53 INFO spark.SecurityManager: SecurityManager:
authentication disabled; ui acls disabled; users with view permissions:
Set(yarn); users with modify permissions: Set(yarn)
*Exception in thread "main" java.lang.NoSuchMethodError:
org.apache.spark.network.util.JavaUtils.timeStringAsSec(Ljava/lang/String;)J*
at org.apache.spark.util.Utils$.timeStringAsSeconds(Utils.scala:1027)
at org.apache.spark.SparkConf.getTimeAsSeconds(SparkConf.scala:194)
at
org.apache.spark.util.AkkaUtils$.org$apache$spark$util$AkkaUtils$$doCreateActorSystem(AkkaUtils.scala:68)
at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:54)
at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:53)
at
org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1991)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1982)
at org.apache.spark.util.AkkaUtils$.createActorSystem(AkkaUtils.scala:56)
at org.apache.spark.rpc.akka.AkkaRpcEnvFactory.create(AkkaRpcEnv.scala:245)
at org.apache.spark.rpc.RpcEnv$.create(RpcEnv.scala:52)
at org.apache.spark.SparkEnv$.create(SparkEnv.scala:247)
at org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:188)
at org.apache.spark.SparkContext.createSparkEnv(SparkContext.scala:267)
at org.apache.spark.SparkContext.(SparkContext.scala:424)
at
org.apache.spark.api.java.JavaSparkContext.(JavaSparkContext.scala:61)
at com.hortonworks.simpleyarnapp.HelloWorld.main(HelloWorld.java:50)
15/10/20 21:58:53 INFO util.Utils: Shutdown hook called

Please help :)

--
Regards and Thanks,
Raghuveer Chanda


Problem with applying Multivariate Gaussian Model

2015-10-21 Thread Eyal Sharon
  Hi ,

I have been trying to apply an Anomaly Detection model  using Spark MLib.
I am using this library

org.apache.spark.mllib.stat.distribution.MultivariateGaussian


As an input, I give the model a mean vector and a Covariance matrix,
assuming my features have Covariance , hence the covariane matrix has
all non zero elements.

The model returns zero for each data point.

While using a model with out covariance, meaning the matrix has zero
in all element except the diagonal. The model is working.

Now, there is a little documentation for this model.

Does anyone have experience applying this model ?

any reference will be welcome too

These are the model input


*mu vector - *

1054.8, 1069.8, 1.3 ,1040.1

*cov matrix - *

165496.0 , 167996.0,  11.0 , 163037.0
167996.0,  170631.0,  19.0,  165405.0
11.0,   19.0 , 0.0,   2.0
163037.0,   165405.0 2.0 ,  160707.0

*and for non covariance case *

165496.0,  0.0 ,   0.0,   0.0
0.0,   170631.0,   0.0,   0.0
0.0 ,   0.0 ,   0.8,   0.0
0.0 ,   0.0,0.0,  160594.2


Thanks !


Eyal

-- 


*This email and any files transmitted with it are confidential and intended 
solely for the use of the individual or entity to whom they are 
addressed. Please note that any disclosure, copying or distribution of the 
content of this information is strictly forbidden. If you have received 
this email message in error, please destroy it immediately and notify its 
sender.*


Re: Spark-Testing-Base Q/A

2015-10-21 Thread Holden Karau
On Wednesday, October 21, 2015, Mark Vervuurt 
wrote:

> Hi Everyone,
>
> I am busy trying out ‘Spark-Testing-Base
> ’. I have the following
> questions?
>
>
>- Can you test Spark Streaming Jobs using Java?
>
> The current base class for testing streaming jobs is implemented using a
Scala test library (and one in Python too), I can add one using a junit
base for streaming if it would be useful for you.

>
>- Can I use Spark-Testing-Base 1.3.0_0.1.1 together with Spark 1.3.1?
>
>  You should be able to, the API changes were small enough I didn't publish
a seperate package, but if you run into any issues let me know.

>
>
>
> Thanks.
>
> Greetings,
> Mark
>


-- 
Cell : 425-233-8271
Twitter: https://twitter.com/holdenkarau


RE: Spark on Yarn

2015-10-21 Thread Jean-Baptiste Onofré


Hi
The compiled version (master side) and client version diverge on spark network 
JavaUtils. You should use the same/aligned version.
RegardsJB


Sent from my Samsung device

 Original message 
From: Raghuveer Chanda  
Date: 21/10/2015  12:33  (GMT+01:00) 
To: user@spark.apache.org 
Subject: Spark on Yarn 

Hi all,
I am trying to run spark on yarn in quickstart cloudera vm.It already has spark 
1.3 and Hadoop 2.6.0-cdh5.4.0 installed.(I am not using spark-submit since I 
want to run a different version of spark). I am able to run spark 1.3 on yarn 
but get the below error for spark 1.4.
The log shows its running on spark 1.4 but still gives a error on a method 
which is present in 1.4 and not 1.3. Even the fat jar contains the class files 
of 1.4.

As far as running in yarn the installed spark version shouldnt matter, but 
still its running on the other version.

Hadoop Version:Hadoop 2.6.0-cdh5.4.0Subversion 
http://github.com/cloudera/hadoop -r 
c788a14a5de9ecd968d1e2666e8765c5f018c271Compiled by jenkins on 
2015-04-21T19:18ZCompiled with protoc 2.5.0From source with checksum 
cd78f139c66c13ab5cee96e15a629025This command was run using 
/usr/lib/hadoop/hadoop-common-2.6.0-cdh5.4.0.jar
Error:LogType:stderrLog Upload Time:Tue Oct 20 21:58:56 -0700 
2015LogLength:2334Log Contents:SLF4J: Class path contains multiple SLF4J 
bindings.SLF4J: Found binding in 
[jar:file:/usr/lib/zookeeper/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]SLF4J:
 Found binding in 
[jar:file:/var/lib/hadoop-yarn/cache/yarn/nm-local-dir/filecache/10/simple-yarn-app-1.1.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]SLF4J:
 See http://www.slf4j.org/codes.html#multiple_bindings for an 
explanation.SLF4J: Actual binding is of type 
[org.slf4j.impl.Log4jLoggerFactory]15/10/20 21:58:50 INFO spark.SparkContext: 
Running Spark version 1.4.015/10/20 21:58:53 INFO spark.SecurityManager: 
Changing view acls to: yarn15/10/20 21:58:53 INFO spark.SecurityManager: 
Changing modify acls to: yarn15/10/20 21:58:53 INFO spark.SecurityManager: 
SecurityManager: authentication disabled; ui acls disabled; users with view 
permissions: Set(yarn); users with modify permissions: Set(yarn)Exception in 
thread "main" java.lang.NoSuchMethodError: 
org.apache.spark.network.util.JavaUtils.timeStringAsSec(Ljava/lang/String;)J   
at org.apache.spark.util.Utils$.timeStringAsSeconds(Utils.scala:1027)   at 
org.apache.spark.SparkConf.getTimeAsSeconds(SparkConf.scala:194) at 
org.apache.spark.util.AkkaUtils$.org$apache$spark$util$AkkaUtils$$doCreateActorSystem(AkkaUtils.scala:68)
at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:54) at 
org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:53) at 
org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1991)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141) at 
org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1982)at 
org.apache.spark.util.AkkaUtils$.createActorSystem(AkkaUtils.scala:56)   at 
org.apache.spark.rpc.akka.AkkaRpcEnvFactory.create(AkkaRpcEnv.scala:245) at 
org.apache.spark.rpc.RpcEnv$.create(RpcEnv.scala:52) at 
org.apache.spark.SparkEnv$.create(SparkEnv.scala:247)at 
org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:188)   at 
org.apache.spark.SparkContext.createSparkEnv(SparkContext.scala:267) at 
org.apache.spark.SparkContext.(SparkContext.scala:424) at 
org.apache.spark.api.java.JavaSparkContext.(JavaSparkContext.scala:61) at 
com.hortonworks.simpleyarnapp.HelloWorld.main(HelloWorld.java:50)15/10/20 
21:58:53 INFO util.Utils: Shutdown hook called
Please help :)

--Regards and Thanks,Raghuveer Chanda




Re: Spark on Yarn

2015-10-21 Thread Raghuveer Chanda
Hi,

So does this mean I can't run spark 1.4 fat jar on yarn without installing
spark 1.4.

I am including spark 1.4 in my pom.xml so doesn't this mean its compiling
in 1.4.


On Wed, Oct 21, 2015 at 4:38 PM, Jean-Baptiste Onofré 
wrote:

> Hi
>
> The compiled version (master side) and client version diverge on spark
> network JavaUtils. You should use the same/aligned version.
>
> Regards
> JB
>
>
>
> Sent from my Samsung device
>
>
>  Original message 
> From: Raghuveer Chanda 
> Date: 21/10/2015 12:33 (GMT+01:00)
> To: user@spark.apache.org
> Subject: Spark on Yarn
>
> Hi all,
>
> I am trying to run spark on yarn in quickstart cloudera vm.It already has
> spark 1.3 and Hadoop 2.6.0-cdh5.4.0 installed.(I am not using
> spark-submit since I want to run a different version of spark).
>
> I am able to run spark 1.3 on yarn but get the below error for spark 1.4.
>
> The log shows its running on spark 1.4 but still gives a error on a method
> which is present in 1.4 and not 1.3. Even the fat jar contains the class
> files of 1.4.
>
> As far as running in yarn the installed spark version shouldnt matter, but
> still its running on the other version.
>
>
> *Hadoop Version:*
> Hadoop 2.6.0-cdh5.4.0
> Subversion http://github.com/cloudera/hadoop -r
> c788a14a5de9ecd968d1e2666e8765c5f018c271
> Compiled by jenkins on 2015-04-21T19:18Z
> Compiled with protoc 2.5.0
> From source with checksum cd78f139c66c13ab5cee96e15a629025
> This command was run using /usr/lib/hadoop/hadoop-common-2.6.0-cdh5.4.0.jar
>
> *Error:*
> LogType:stderr
> Log Upload Time:Tue Oct 20 21:58:56 -0700 2015
> LogLength:2334
> Log Contents:
> SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in
> [jar:file:/usr/lib/zookeeper/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in
> [jar:file:/var/lib/hadoop-yarn/cache/yarn/nm-local-dir/filecache/10/simple-yarn-app-1.1.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
> explanation.
> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
> 15/10/20 21:58:50 INFO spark.SparkContext: *Running Spark version 1.4.0*
> 15/10/20 21:58:53 INFO spark.SecurityManager: Changing view acls to: yarn
> 15/10/20 21:58:53 INFO spark.SecurityManager: Changing modify acls to: yarn
> 15/10/20 21:58:53 INFO spark.SecurityManager: SecurityManager:
> authentication disabled; ui acls disabled; users with view permissions:
> Set(yarn); users with modify permissions: Set(yarn)
> *Exception in thread "main" java.lang.NoSuchMethodError:
> org.apache.spark.network.util.JavaUtils.timeStringAsSec(Ljava/lang/String;)J*
> at org.apache.spark.util.Utils$.timeStringAsSeconds(Utils.scala:1027)
> at org.apache.spark.SparkConf.getTimeAsSeconds(SparkConf.scala:194)
> at
> org.apache.spark.util.AkkaUtils$.org$apache$spark$util$AkkaUtils$$doCreateActorSystem(AkkaUtils.scala:68)
> at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:54)
> at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:53)
> at
> org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1991)
> at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
> at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1982)
> at org.apache.spark.util.AkkaUtils$.createActorSystem(AkkaUtils.scala:56)
> at org.apache.spark.rpc.akka.AkkaRpcEnvFactory.create(AkkaRpcEnv.scala:245)
> at org.apache.spark.rpc.RpcEnv$.create(RpcEnv.scala:52)
> at org.apache.spark.SparkEnv$.create(SparkEnv.scala:247)
> at org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:188)
> at org.apache.spark.SparkContext.createSparkEnv(SparkContext.scala:267)
> at org.apache.spark.SparkContext.(SparkContext.scala:424)
> at
> org.apache.spark.api.java.JavaSparkContext.(JavaSparkContext.scala:61)
> at com.hortonworks.simpleyarnapp.HelloWorld.main(HelloWorld.java:50)
> 15/10/20 21:58:53 INFO util.Utils: Shutdown hook called
>
> Please help :)
>
> --
> Regards and Thanks,
> Raghuveer Chanda
>



-- 
Regards,
Raghuveer Chanda
Computer Science and Engineering
IIT Kharagpur
+91-9475470374


Re: Spark-Testing-Base Q/A

2015-10-21 Thread Mark Vervuurt
Hi Holden,

Thanks for the information, I think that a Java Base Class in order to test 
SparkStreaming using Java would be useful for the community. Unfortunately not 
all of our customers are willing to use Scala or Python.

If i am not wrong it’s 4:00 AM for you in California ;)

Regards,
Mark

> On 21 Oct 2015, at 12:42, Holden Karau  wrote:
> 
> 
> 
> On Wednesday, October 21, 2015, Mark Vervuurt  > wrote:
> Hi Everyone,
> 
> I am busy trying out ‘Spark-Testing-Base 
> ’. I have the following 
> questions?
> 
> Can you test Spark Streaming Jobs using Java?
> The current base class for testing streaming jobs is implemented using a 
> Scala test library (and one in Python too), I can add one using a junit base 
> for streaming if it would be useful for you. 
> Can I use Spark-Testing-Base 1.3.0_0.1.1 together with Spark 1.3.1?
>  You should be able to, the API changes were small enough I didn't publish a 
> seperate package, but if you run into any issues let me know.
> 
> Thanks.
> 
> Greetings,
> Mark
> 
> 
> -- 
> Cell : 425-233-8271
> Twitter: https://twitter.com/holdenkarau 



Re: Spark on Yarn

2015-10-21 Thread Adrian Tanase
The question is the spark dependency is marked as provided or is included in 
the fat jar.

For example, we are compiling the spark distro separately for java 8 + scala 
2.11 + hadoop 2.6 (with maven) and marking it as provided in sbt.

-adrian

From: Raghuveer Chanda
Date: Wednesday, October 21, 2015 at 2:14 PM
To: Jean-Baptiste Onofré
Cc: "user@spark.apache.org"
Subject: Re: Spark on Yarn

Hi,

So does this mean I can't run spark 1.4 fat jar on yarn without installing 
spark 1.4.

I am including spark 1.4 in my pom.xml so doesn't this mean its compiling in 
1.4.


On Wed, Oct 21, 2015 at 4:38 PM, Jean-Baptiste Onofré 
mailto:j...@nanthrax.net>> wrote:
Hi

The compiled version (master side) and client version diverge on spark network 
JavaUtils. You should use the same/aligned version.

Regards
JB



Sent from my Samsung device


 Original message 
From: Raghuveer Chanda 
mailto:raghuveer.cha...@gmail.com>>
Date: 21/10/2015 12:33 (GMT+01:00)
To: user@spark.apache.org
Subject: Spark on Yarn

Hi all,

I am trying to run spark on yarn in quickstart cloudera vm.It already has spark 
1.3 and Hadoop 2.6.0-cdh5.4.0 installed.(I am not using spark-submit since I 
want to run a different version of spark).

I am able to run spark 1.3 on yarn but get the below error for spark 1.4.

The log shows its running on spark 1.4 but still gives a error on a method 
which is present in 1.4 and not 1.3. Even the fat jar contains the class files 
of 1.4.

As far as running in yarn the installed spark version shouldnt matter, but 
still its running on the other version.


Hadoop Version:
Hadoop 2.6.0-cdh5.4.0
Subversion http://github.com/cloudera/hadoop -r 
c788a14a5de9ecd968d1e2666e8765c5f018c271
Compiled by jenkins on 2015-04-21T19:18Z
Compiled with protoc 2.5.0
From source with checksum cd78f139c66c13ab5cee96e15a629025
This command was run using /usr/lib/hadoop/hadoop-common-2.6.0-cdh5.4.0.jar

Error:
LogType:stderr
Log Upload Time:Tue Oct 20 21:58:56 -0700 2015
LogLength:2334
Log Contents:
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in 
[jar:file:/usr/lib/zookeeper/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in 
[jar:file:/var/lib/hadoop-yarn/cache/yarn/nm-local-dir/filecache/10/simple-yarn-app-1.1.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
15/10/20 21:58:50 INFO spark.SparkContext: Running Spark version 1.4.0
15/10/20 21:58:53 INFO spark.SecurityManager: Changing view acls to: yarn
15/10/20 21:58:53 INFO spark.SecurityManager: Changing modify acls to: yarn
15/10/20 21:58:53 INFO spark.SecurityManager: SecurityManager: authentication 
disabled; ui acls disabled; users with view permissions: Set(yarn); users with 
modify permissions: Set(yarn)
Exception in thread "main" java.lang.NoSuchMethodError: 
org.apache.spark.network.util.JavaUtils.timeStringAsSec(Ljava/lang/String;)J
at org.apache.spark.util.Utils$.timeStringAsSeconds(Utils.scala:1027)
at org.apache.spark.SparkConf.getTimeAsSeconds(SparkConf.scala:194)
at 
org.apache.spark.util.AkkaUtils$.org$apache$spark$util$AkkaUtils$$doCreateActorSystem(AkkaUtils.scala:68)
at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:54)
at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:53)
at 
org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1991)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1982)
at org.apache.spark.util.AkkaUtils$.createActorSystem(AkkaUtils.scala:56)
at org.apache.spark.rpc.akka.AkkaRpcEnvFactory.create(AkkaRpcEnv.scala:245)
at org.apache.spark.rpc.RpcEnv$.create(RpcEnv.scala:52)
at org.apache.spark.SparkEnv$.create(SparkEnv.scala:247)
at org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:188)
at org.apache.spark.SparkContext.createSparkEnv(SparkContext.scala:267)
at org.apache.spark.SparkContext.(SparkContext.scala:424)
at org.apache.spark.api.java.JavaSparkContext.(JavaSparkContext.scala:61)
at com.hortonworks.simpleyarnapp.HelloWorld.main(HelloWorld.java:50)
15/10/20 21:58:53 INFO util.Utils: Shutdown hook called

Please help :)

--
Regards and Thanks,
Raghuveer Chanda



--
Regards,
Raghuveer Chanda
Computer Science and Engineering
IIT Kharagpur
+91-9475470374


Inner Joins on Cassandra RDDs

2015-10-21 Thread Priya Ch
Hello All,

   I have two Cassandra RDDs. I am using joinWithCassandraTable which is
doing a cartesian join because of which we are getting unwanted rows.


How to perform inner join on Cassandra RDDs ? If I intend to use normal
join, i have to read entire table which is costly.

Is there any specific transformations available that enable inner joins ??

Regards,
Padma CH


Re: Spark on Yarn

2015-10-21 Thread Raghuveer Chanda
Please find the attached pom.xml. I am using maven to build the fat jar and
trying to run it in yarn using

*hadoop jar simple-yarn-app-master/target/simple-yarn-app-1.1.0-shaded.jar
com.hortonworks.simpleyarnapp.Client
hdfs://quickstart.cloudera:8020/simple-yarn-app-1.1.0-shaded.jar*

Basically I am following the below code and changed the Application Master
to run a Spark application class.

https://github.com/hortonworks/simple-yarn-app

It works for 1.3 the installed version in cdh but throws error for 1.4.
When I am bundling the spark within the jar it shouldn't be the case right ?



On Wed, Oct 21, 2015 at 5:11 PM, Adrian Tanase  wrote:

> The question is the spark dependency is marked as provided or is included
> in the fat jar.
>
> For example, we are compiling the spark distro separately for java 8 +
> scala 2.11 + hadoop 2.6 (with maven) and marking it as provided in sbt.
>
> -adrian
>
> From: Raghuveer Chanda
> Date: Wednesday, October 21, 2015 at 2:14 PM
> To: Jean-Baptiste Onofré
> Cc: "user@spark.apache.org"
> Subject: Re: Spark on Yarn
>
> Hi,
>
> So does this mean I can't run spark 1.4 fat jar on yarn without installing
> spark 1.4.
>
> I am including spark 1.4 in my pom.xml so doesn't this mean its compiling
> in 1.4.
>
>
> On Wed, Oct 21, 2015 at 4:38 PM, Jean-Baptiste Onofré 
> wrote:
>
>> Hi
>>
>> The compiled version (master side) and client version diverge on spark
>> network JavaUtils. You should use the same/aligned version.
>>
>> Regards
>> JB
>>
>>
>>
>> Sent from my Samsung device
>>
>>
>>  Original message 
>> From: Raghuveer Chanda 
>> Date: 21/10/2015 12:33 (GMT+01:00)
>> To: user@spark.apache.org
>> Subject: Spark on Yarn
>>
>> Hi all,
>>
>> I am trying to run spark on yarn in quickstart cloudera vm.It already
>> has spark 1.3 and Hadoop 2.6.0-cdh5.4.0 installed.(I am not using
>> spark-submit since I want to run a different version of spark).
>>
>> I am able to run spark 1.3 on yarn but get the below error for spark 1.4.
>>
>> The log shows its running on spark 1.4 but still gives a error on a
>> method which is present in 1.4 and not 1.3. Even the fat jar contains the
>> class files of 1.4.
>>
>> As far as running in yarn the installed spark version shouldnt matter,
>> but still its running on the other version.
>>
>>
>> *Hadoop Version:*
>> Hadoop 2.6.0-cdh5.4.0
>> Subversion http://github.com/cloudera/hadoop -r
>> c788a14a5de9ecd968d1e2666e8765c5f018c271
>> Compiled by jenkins on 2015-04-21T19:18Z
>> Compiled with protoc 2.5.0
>> From source with checksum cd78f139c66c13ab5cee96e15a629025
>> This command was run using
>> /usr/lib/hadoop/hadoop-common-2.6.0-cdh5.4.0.jar
>>
>> *Error:*
>> LogType:stderr
>> Log Upload Time:Tue Oct 20 21:58:56 -0700 2015
>> LogLength:2334
>> Log Contents:
>> SLF4J: Class path contains multiple SLF4J bindings.
>> SLF4J: Found binding in
>> [jar:file:/usr/lib/zookeeper/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>> SLF4J: Found binding in
>> [jar:file:/var/lib/hadoop-yarn/cache/yarn/nm-local-dir/filecache/10/simple-yarn-app-1.1.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
>> explanation.
>> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
>> 15/10/20 21:58:50 INFO spark.SparkContext: *Running Spark version 1.4.0*
>> 15/10/20 21:58:53 INFO spark.SecurityManager: Changing view acls to: yarn
>> 15/10/20 21:58:53 INFO spark.SecurityManager: Changing modify acls to:
>> yarn
>> 15/10/20 21:58:53 INFO spark.SecurityManager: SecurityManager:
>> authentication disabled; ui acls disabled; users with view permissions:
>> Set(yarn); users with modify permissions: Set(yarn)
>> *Exception in thread "main" java.lang.NoSuchMethodError:
>> org.apache.spark.network.util.JavaUtils.timeStringAsSec(Ljava/lang/String;)J*
>> at org.apache.spark.util.Utils$.timeStringAsSeconds(Utils.scala:1027)
>> at org.apache.spark.SparkConf.getTimeAsSeconds(SparkConf.scala:194)
>> at
>> org.apache.spark.util.AkkaUtils$.org$apache$spark$util$AkkaUtils$$doCreateActorSystem(AkkaUtils.scala:68)
>> at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:54)
>> at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:53)
>> at
>> org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1991)
>> at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
>> at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1982)
>> at org.apache.spark.util.AkkaUtils$.createActorSystem(AkkaUtils.scala:56)
>> at
>> org.apache.spark.rpc.akka.AkkaRpcEnvFactory.create(AkkaRpcEnv.scala:245)
>> at org.apache.spark.rpc.RpcEnv$.create(RpcEnv.scala:52)
>> at org.apache.spark.SparkEnv$.create(SparkEnv.scala:247)
>> at org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:188)
>> at org.apache.spark.SparkContext.createSparkEnv(SparkContext.scala:267)
>> at org.apache.spark.SparkContext.(SparkContext.scala:424)

spark SQL thrift server - support for more features via jdbc (table catalog)

2015-10-21 Thread rkrist
Hello,

there seems to be missing support for some operations in spark SQL thrift
server. To be more specific - when connected to our spark SQL instance
(1.5.1, standallone deployment) from standard jdbc sql client (squirrel SQL
and few others) via the thrift server, sql query processing seem to work
fine except of one thing - sql client is unable to fetch the list of tables
(objects) present in the spark SQL instance. Tables can be used in sql
queries, which work perfect, but are not visible in the list of objects,
which displays single database/tablespace (default) containing empty
sub-folders (INDEX_TABLE, TABLE, VIEW, UDT).
This makes advanced features of sql client (like code completion and syntax
highlighting) not to work as expected and writing sql queries is not as
comfortable as with ordinary DBMS.

After a short debugging session I found out, that problem seems to be in the
implementation of
org.apache.spark.sql.hive.thriftserver.server.SparkSQLOperationManager,
which overrides only single kind of operation - execute statement, leaving
other operations to their default Hive implementations (returning empty
database in our case). SQL clients seem to be using standard JDBC API calls
to query database tables and their metadata which will end up in
org.apache.hive.service.cli.operation.OperationManager (others than execute
statement - get catalogs, get tables,...) and as there is no Spark
SQL-specific operation, tables registered within Spark SQL are not visible
in those calls.

Is it a known/desired behavior, or is there any plan to add the
implementation of the missing jdbc operations to the
SparkSQLOperationManager to provide the full-featured JDBC functionality?

Thank You for Your answer (and for the great work You are doing there).

Best regards

Rastislav Krist





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/spark-SQL-thrift-server-support-for-more-features-via-jdbc-table-catalog-tp25155.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Lost leader exception in Kafka Direct for Streaming

2015-10-21 Thread Cody Koeninger
You can try running the driver in the cluster manager with --supervise, but
that's basically the same as restarting it when it fails.

There is no reasonable automatic "recovery" when something is fundamentally
wrong with your kafka cluster.

On Wed, Oct 21, 2015 at 12:46 AM, swetha kasireddy <
swethakasire...@gmail.com> wrote:

> Hi Cody,
>
> What other options do I have other than monitoring and restarting the job?
> Can the job recover automatically?
>
> Thanks,
> Sweth
>
> On Thu, Oct 1, 2015 at 7:18 AM, Cody Koeninger  wrote:
>
>> Did you check you kafka broker logs to see what was going on during that
>> time?
>>
>> The direct stream will handle normal leader loss / rebalance by retrying
>> tasks.
>>
>> But the exception you got indicates that something with kafka was wrong,
>> such that offsets were being re-used.
>>
>> ie. your job already processed up through beginning offset 15027734702
>>
>> but when asking kafka for the highest available offsets, it returns
>> ending offset 15027725493
>>
>> which is lower, in other words kafka lost messages.  This might happen
>> because you lost a leader and recovered from a replica that wasn't in sync,
>> or someone manually screwed up a topic, or ... ?
>>
>> If you really want to just blindly "recover" from this situation (even
>> though something is probably wrong with your data), the most
>> straightforward thing to do is monitor and restart your job.
>>
>>
>>
>>
>> On Wed, Sep 30, 2015 at 4:31 PM, swetha 
>> wrote:
>>
>>>
>>> Hi,
>>>
>>> I see this sometimes in our Kafka Direct approach in our Streaming job.
>>> How
>>> do we make sure that the job recovers from such errors and works normally
>>> thereafter?
>>>
>>> 15/09/30 05:14:18 ERROR KafkaRDD: Lost leader for topic x_stream
>>> partition
>>> 19,  sleeping for 200ms
>>> 15/09/30 05:14:18 ERROR KafkaRDD: Lost leader for topic x_stream
>>> partition
>>> 5,  sleeping for 200ms
>>>
>>> Followed by every task failing with something like this:
>>>
>>> 15/09/30 05:26:20 ERROR Executor: Exception in task 4.0 in stage 84281.0
>>> (TID 818804)
>>> kafka.common.NotLeaderForPartitionException
>>>
>>> And:
>>>
>>> org.apache.spark.SparkException: Job aborted due to stage failure: Task
>>> 15
>>> in stage 84958.0 failed 4 times, most recent failure: Lost task 15.3 in
>>> stage 84958.0 (TID 819461, 10.227.68.102): java.lang.AssertionError:
>>> assertion failed: Beginning offset 15027734702 is after the ending
>>> offset
>>> 15027725493 for topic hubble_stream partition 12. You either provided an
>>> invalid fromOffset, or the Kafka topic has been damaged
>>>
>>>
>>> Thanks,
>>> Swetha
>>>
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Lost-leader-exception-in-Kafka-Direct-for-Streaming-tp24891.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>
>


Can we add an unsubscribe link in the footer of every email?

2015-10-21 Thread Nicholas Chammas
Every week or so someone emails the list asking to unsubscribe.

Of course, that's not the right way to do it. You're supposed to email
a different
address  than this one to
unsubscribe, yet this is not in-your-face obvious, so many people miss it.
And someone steps up almost every time to point people in the right
direction.

The vast majority of mailing lists I'm familiar with include a small footer
at the bottom of each email with a link to unsubscribe. I think this is
what most people expect, and it's where they check first.

Can we add a footer like that?

I think it would cut down on the weekly emails from people wanting to
unsubscribe, and it would match existing mailing list conventions elsewhere.

Nick


Spark 1.5.1 with Hive 0.13.1

2015-10-21 Thread Sébastien Rainville
Hi,

I'm trying to get Spark 1.5.1 to work with Hive 0.13.1. I set the following
properties in spark-defaults.conf:

spark.sql.hive.metastore.version 0.13.1
spark.sql.hive.metastore.jars
/usr/lib/hadoop/client/*:/opt/hive/current/lib/*


but I get the following exception when launching the shell:

java.lang.ClassNotFoundException: java.lang.NoClassDefFoundError:
com/google/common/base/Preconditions when creating Hive client using
classpath: ...

Please make sure that jars for your version of hive and hadoop are included
in the paths passed to SQLConfEntry(key = spark.sql.hive.metastore.jars,
defaultValue=builtin, doc=

 Location of the jars that should be used to instantiate the
HiveMetastoreClient.

 This property can be one of three options: "

 1. "builtin"

   Use Hive 1.2.1, which is bundled with the Spark assembly jar when

   -Phive is enabled. When this option is chosen,

   spark.sql.hive.metastore.version must be either

   1.2.1 or not defined.

 2. "maven"

   Use Hive jars of specified version downloaded from Maven repositories.

 3. A classpath in the standard format for both Hive and Hadoop.

, isPublic = true).

at
org.apache.spark.sql.hive.client.IsolatedClientLoader.liftedTree1$1(IsolatedClientLoader.scala:189)

at
org.apache.spark.sql.hive.client.IsolatedClientLoader.(IsolatedClientLoader.scala:179)

at
org.apache.spark.sql.hive.HiveContext.metadataHive$lzycompute(HiveContext.scala:263)

at
org.apache.spark.sql.hive.HiveContext.metadataHive(HiveContext.scala:185)

at
org.apache.spark.sql.hive.HiveContext.setConf(HiveContext.scala:392)

at
org.apache.spark.sql.SQLContext$$anonfun$5.apply(SQLContext.scala:235)

at
org.apache.spark.sql.SQLContext$$anonfun$5.apply(SQLContext.scala:234)

at scala.collection.Iterator$class.foreach(Iterator.scala:727)

at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)

at
scala.collection.IterableLike$class.foreach(IterableLike.scala:72)

at scala.collection.AbstractIterable.foreach(Iterable.scala:54)

at org.apache.spark.sql.SQLContext.(SQLContext.scala:234)

at
org.apache.spark.sql.hive.HiveContext.(HiveContext.scala:72)


I cut the classpath portion for brevity, but it does include
guava-11.0.2.jar, so in theory that class should be available. It works if
I add it to the driver's classpath:

spark.driver.extraClassPath /usr/lib/hadoop/client/guava-11.0.2.jar

This works in most of my use cases except when my own assembly jar has a
dependency on a different version of guava and then my job fails with
runtime exceptions because of incompatible guava classes.

Sounds like I shouldn't set spark.driver.extraClassPath for this. Am I
doing something wrong? Is this a bug in Spark? Could it be somehow related
to the shading of guava that spark does? The following line seem suspicious
because it basically says that for guava classes the regular spark class
loader should be used but then it cannot find them:

https://github.com/apache/spark/blob/v1.5.1/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/IsolatedClientLoader.scala#L135

Thanks,

- Sebastien


Re: Can we add an unsubscribe link in the footer of every email?

2015-10-21 Thread Ted Yu
The number of occurrences of such incidence is low.

I think currently we don't need to add the footer. I checked several other
Apache projects whose user@ I subscribe to - there is no such footer.

Cheers

On Wed, Oct 21, 2015 at 7:38 AM, Nicholas Chammas <
nicholas.cham...@gmail.com> wrote:

> Every week or so someone emails the list asking to unsubscribe.
>
> Of course, that's not the right way to do it. You're supposed to email a 
> different
> address  than this one to
> unsubscribe, yet this is not in-your-face obvious, so many people miss it.
> And someone steps up almost every time to point people in the right
> direction.
>
> The vast majority of mailing lists I'm familiar with include a small
> footer at the bottom of each email with a link to unsubscribe. I think this
> is what most people expect, and it's where they check first.
>
> Can we add a footer like that?
>
> I think it would cut down on the weekly emails from people wanting to
> unsubscribe, and it would match existing mailing list conventions elsewhere.
>
> Nick
>
>


How to check whether the RDD is empty or not

2015-10-21 Thread diplomatic Guru
Hello All,

I have a Spark Streaming job that should  do some action only if the RDD is
not empty. This can be done easily with the spark batch RDD as I could
.take(1) and check whether it is empty or  not. But this cannot been done
in Spark Streaming DStrem


JavaPairInputDStream input =
ssc.fileStream(iFolder, LongWritable.class,Text.class,
TextInputFormat.class);

 if(inputLines!=null){
//do some action if it is not empty
}

Any ideas please?


Re: How to check whether the RDD is empty or not

2015-10-21 Thread diplomatic Guru
I tried below code but still carrying out the action even though there
is no new data.

JavaPairInputDStream input =
ssc.fileStream(iFolder, LongWritable.class,Text.class,
TextInputFormat.class);

 if(input != null){
//do some action if it is not empty
}


On 21 October 2015 at 18:00, diplomatic Guru 
wrote:

>
> Hello All,
>
> I have a Spark Streaming job that should  do some action only if the RDD
> is not empty. This can be done easily with the spark batch RDD as I could
> .take(1) and check whether it is empty or  not. But this cannot been done
> in Spark Streaming DStrem
>
>
> JavaPairInputDStream input = ssc.fileStream(iFolder, 
> LongWritable.class,Text.class, TextInputFormat.class);
>
>  if(inputLines!=null){
> //do some action if it is not empty
> }
>
> Any ideas please?
>
>
>
>


[Spark Streaming] Design Patterns forEachRDD

2015-10-21 Thread Nipun Arora
Hi All,

Can anyone provide a design pattern for the following code shown in the
Spark User Manual, in JAVA ? I have the same exact use-case, and for some
reason the design pattern for Java is missing.

 Scala version taken from :
http://spark.apache.org/docs/latest/streaming-programming-guide.html#design-patterns-for-using-foreachrdd

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
val connection = createNewConnection()
partitionOfRecords.foreach(record => connection.send(record))
connection.close()
  }}


I have googled for it and haven't really found a solution. This seems to be
an important piece of information, especially for people who need to ship
their code necessarily in Java because of constraints in the company (like
me) :)

I'd really appreciate any help

Thanks
Nipun


Re: Spark_1.5.1_on_HortonWorks

2015-10-21 Thread Ajay Chander
Thanks for your kind inputs. Right now I am running spark-1.3.1 on YARN(4
node cluster) on a HortonWorks distribution. Now I want to upgrade
spark-1.3.1 to
spark-1.5.1. So at this point of time, do I have to manually go and copy
spark-1.5.1 tarbal to all the nodes or is there any alternative so that I
can get it upgraded through Ambari UI ? If possible can anyone point me to
a documentation online? Thank you.

Regards,
Ajay

On Wednesday, October 21, 2015, Saisai Shao  wrote:

> Hi Frans,
>
> You could download Spark 1.5.1-hadoop 2.6 pre-built tarball and copy into
> HDP 2.3 sandbox or master node. Then copy all the conf files from
> /usr/hdp/current/spark-client/ to your /conf, or you could
> refer to this tech preview (
> http://hortonworks.com/hadoop-tutorial/apache-spark-1-4-1-technical-preview-with-hdp/
> ), in "installing chapter", step 4 ~ 8 is what you need to do.
>
> Thanks
> Saisai
>
> On Wed, Oct 21, 2015 at 1:27 PM, Frans Thamura  > wrote:
>
>> Doug
>>
>> is it possible to put in HDP 2.3?
>>
>> esp in Sandbox
>>
>> can share how do you install it?
>>
>>
>> F
>> --
>> Frans Thamura (曽志胜)
>> Java Champion
>> Shadow Master and Lead Investor
>> Meruvian.
>> Integrated Hypermedia Java Solution Provider.
>>
>> Mobile: +628557888699
>> Blog: http://blogs.mervpolis.com/roller/flatburger (id)
>>
>> FB: http://www.facebook.com/meruvian
>> TW: http://www.twitter.com/meruvian / @meruvian
>> Website: http://www.meruvian.org
>>
>> "We grow because we share the same belief."
>>
>>
>> On Wed, Oct 21, 2015 at 12:24 PM, Doug Balog > > wrote:
>> > I have been running 1.5.1 with Hive in secure mode on HDP 2.2.4 without
>> any problems.
>> >
>> > Doug
>> >
>> >> On Oct 21, 2015, at 12:05 AM, Ajay Chander > > wrote:
>> >>
>> >> Hi Everyone,
>> >>
>> >> Any one has any idea if spark-1.5.1 is available as a service on
>> HortonWorks ? I have spark-1.3.1 installed on the Cluster and it is a
>> HortonWorks distribution. Now I want upgrade it to spark-1.5.1. Anyone here
>> have any idea about it? Thank you in advance.
>> >>
>> >> Regards,
>> >> Ajay
>> >
>> >
>> > -
>> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> 
>> > For additional commands, e-mail: user-h...@spark.apache.org
>> 
>> >
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> 
>> For additional commands, e-mail: user-h...@spark.apache.org
>> 
>>
>>
>


Re: [Spark Streaming] Design Patterns forEachRDD

2015-10-21 Thread Sandip Mehta
Does this help ?

final JavaHBaseContext hbaseContext = new JavaHBaseContext(javaSparkContext, 
conf);
customerModels.foreachRDD(new Function, Void>() {
  private static final long serialVersionUID = 1L;
  @Override
  public Void call(JavaRDD currentRDD) throws Exception {
JavaRDD customerWithPromotion = 
hbaseContext.mapPartition(currentRDD, new PromotionLookupFunction());
customerWithPromotion.persist(StorageLevel.MEMORY_AND_DISK_SER());
customerWithPromotion.foreachPartition();
  }
});


> On 21-Oct-2015, at 10:55 AM, Nipun Arora  wrote:
> 
> Hi All,
> 
> Can anyone provide a design pattern for the following code shown in the Spark 
> User Manual, in JAVA ? I have the same exact use-case, and for some reason 
> the design pattern for Java is missing.
> 
>  Scala version taken from : 
> http://spark.apache.org/docs/latest/streaming-programming-guide.html#design-patterns-for-using-foreachrdd
>  
> 
> 
> dstream.foreachRDD { rdd =>
>   rdd.foreachPartition { partitionOfRecords =>
> val connection = createNewConnection()
> partitionOfRecords.foreach(record => connection.send(record))
> connection.close()
>   }
> }
> 
> I have googled for it and haven't really found a solution. This seems to be 
> an important piece of information, especially for people who need to ship 
> their code necessarily in Java because of constraints in the company (like 
> me) :)
> 
> I'd really appreciate any help
> 
> Thanks
> Nipun



dataframe average error: Float does not take parameters

2015-10-21 Thread Carol McDonald
This used to work :

// What's the min number of bids per item? what's the average? what's the
max?
auction.groupBy("item", "auctionid").count.agg(min("count"),
avg("count"),max("count")).show

// MIN(count) AVG(count)MAX(count)
// 1  16.992025518341308 75

but this now gives an error

val res = auction.groupBy("item", "auctionid").count.agg(min("count"),
avg("count"),max("count"))

:42: error: Float does not take parameters

val res = auction.groupBy("item", "auctionid").count.agg(min("count"),
avg("count"),max("count"))

min and max still work .

Do I need to cast the count to a float ?

auction.groupBy("item", "auctionid").count.agg(min("count"),
max("count")).show

MIN(count) MAX(count)
 1  75


Re: How to distinguish columns when joining DataFrames with shared parent?

2015-10-21 Thread Ali Tajeldin EDU
Furthermore, even adding aliasing as suggested by the warning doesn't seem to 
help either.  Slight modification to example below:

> scala> val largeValues = df.filter('value >= 10).as("lv")

And just looking at the join results:
> scala> val j = smallValues
>   .join(largeValues, smallValues("key") === largeValues("key"))

scala> j.select($"value").show
This will throw an exception indicating that "value" is ambiguous (to be 
expected).

scala> j.select(smallValues("value")).show
This will show the left (small values) "values" column as expected.

scala> j.select(largeValues("value")).show
This will show the left (small values) "values" column (resolved to the wrong 
column)

scala> j.select(largeValues("lv.value")).show
This will show the left (small values) "values" column (resolved to the wrong 
column even though we explicitly specified the alias and used the right hand df)

scala> j.select($"lv.value").show
Produces a cannot resolve 'lv.value' exception (so the lv alias is not 
preserved in the join result).

Anyone know the appropriate way to use the aliases in DataFrame operations or 
is this a bug?
--
Ali


On Oct 20, 2015, at 5:23 PM, Isabelle Phan  wrote:

> Hello,
> 
> When joining 2 DataFrames which originate from the same initial DataFrame, 
> why can't org.apache.spark.sql.DataFrame.apply(colName: String) method 
> distinguish which column to read?
> 
> Let me illustrate this question with a simple example (ran on Spark 1.5.1):
> 
> //my initial DataFrame
> scala> df
> res39: org.apache.spark.sql.DataFrame = [key: int, value: int]
> 
> scala> df.show
> +---+-+
> |key|value|
> +---+-+
> |  1|1|
> |  1|   10|
> |  2|3|
> |  3|   20|
> |  3|5|
> |  4|   10|
> +---+-+
> 
> 
> //2 children DataFrames
> scala> val smallValues = df.filter('value < 10)
> smallValues: org.apache.spark.sql.DataFrame = [key: int, value: int]
> 
> scala> smallValues.show
> +---+-+
> |key|value|
> +---+-+
> |  1|1|
> |  2|3|
> |  3|5|
> +---+-+
> 
> 
> scala> val largeValues = df.filter('value >= 10)
> largeValues: org.apache.spark.sql.DataFrame = [key: int, value: int]
> 
> scala> largeValues.show
> +---+-+
> |key|value|
> +---+-+
> |  1|   10|
> |  3|   20|
> |  4|   10|
> +---+-+
> 
> 
> //Joining the children
> scala> smallValues
>   .join(largeValues, smallValues("key") === largeValues("key"))
>   .withColumn("diff", smallValues("value") - largeValues("value"))
>   .show
> 15/10/20 16:59:59 WARN Column: Constructing trivially true equals predicate, 
> 'key#41 = key#41'. Perhaps you need to use aliases.
> +---+-+---+-++
> |key|value|key|value|diff|
> +---+-+---+-++
> |  1|1|  1|   10|   0|
> |  3|5|  3|   20|   0|
> +---+-+---+-++
> 
> 
> This last command issued a warning, but still executed the join correctly 
> (rows with key 2 and 4 don't appear in result set). However, the "diff" 
> column is incorrect.
> 
> Is this a bug or am I missing something here?
> 
> 
> Thanks a lot for any input,
> 
> Isabelle



Mapping to multiple groups in Apache Spark

2015-10-21 Thread Jeffrey Richley
I am in a situation where I am using Apache Spark and its map/reduce
functionality. I am now at a stage where I have been able to map to a data
set that conceptually has many "rows" of data.

Now what I am needing is to do a reduce which usually is a straight forward
thing. My real need though is to reduce on "overlapping" rows. For example,
the first reduce uses "rows" 1-30, the second uses 11-40, the third 21-50
and so on. How would this work in a Spark environment?

I appreciate any insight or directions anyone can give,

Jeff Richley


Re: dataframe average error: Float does not take parameters

2015-10-21 Thread Ali Tajeldin EDU
Which version of Spark are you using?  I just tried the example below on 1.5.1 
and it seems to work as expected:

scala> val res = df.groupBy("key").count.agg(min("count"), avg("count"))
res: org.apache.spark.sql.DataFrame = [min(count): bigint, avg(count): double]

scala> res.show
+--+--+
|min(count)|avg(count)|
+--+--+
| 1|   1.0|
+--+--+


scala> res.printSchema
root
 |-- min(count): long (nullable = true)
 |-- avg(count): double (nullable = true)

On Oct 21, 2015, at 11:12 AM, Carol McDonald  wrote:

> This used to work : 
> 
> // What's the min number of bids per item? what's the average? what's the 
> max? 
> auction.groupBy("item", "auctionid").count.agg(min("count"), 
> avg("count"),max("count")).show
> 
> // MIN(count) AVG(count)MAX(count)
> // 1  16.992025518341308 75
> 
> but this now gives an error
> 
> val res = auction.groupBy("item", "auctionid").count.agg(min("count"), 
> avg("count"),max("count"))
> 
> :42: error: Float does not take parameters
> 
> val res = auction.groupBy("item", "auctionid").count.agg(min("count"), 
> avg("count"),max("count"))
> 
> min and max still work . 
> 
> Do I need to cast the count to a float ? 
> 
> auction.groupBy("item", "auctionid").count.agg(min("count"), 
> max("count")).show
> 
> MIN(count) MAX(count)  
>  1  75  



Re: can I use Spark as alternative for gem fire cache ?

2015-10-21 Thread Jags Ramnarayanan
Kali,
   This is possible depending on the access pattern by your ETL logic. If
you only read (no point mutations) and you can pay the additional price of
having to scan your dimension data each time you have to lookup something
then spark could work out. Note that a KV RDD isn't really a Map
internally. Each partition will scan looking for your key.

btw, if you are planning to avoid using GemFire being a commercial product,
it is now also a Apache Incubation project  - Geode
.
And, Geode also has a Spark connector

that makes working with GemFire region data (in parallel from each spark
partition) a breeze - any region is visible as a RDD.

-- Jags
(www.snappydata.io)

On Tue, Oct 20, 2015 at 2:42 AM, Deenar Toraskar 
wrote:

> Kali
>
> >> can I cache a RDD in memory for a whole day ? as of I know RDD will get
> empty once the spark code finish executing (correct me if I am wrong).
>
> Spark can definitely be used as a replacement for in memory databases for
> certain use cases. Spark RDDs are not shared amongst contextss. You need a
> long running Spark context and a REST API  (see JobServer) or some other
> RPC mechanism to allow clients access information from the cached RDD in
> the long running context.
>
> Things to note are RDDs are immutable and do not support granular updates
> and operations like key value lookups out of the box (though IndexedRDD
> addresses some of these use cases). Spark will not be suitable for all IMDB
> usecases. If you are using IMDBs for aggregation and reporting, Spark is a
> much better fit. If you are using IMDBs for maintaining shared mutable
> state then Spark is not designed for these use cases.
>
> Hope that helps.
>
> Deenar
>
>
>
> Deenar
>
> On 17 October 2015 at 19:05, Ndjido Ardo Bar  wrote:
>
>> Hi Kali,
>>
>> If I do understand you well, Tachyon ( http://tachyon-project.org) can
>> be good alternative. You can use Spark Api to load and persist data into
>> Tachyon.
>> Hope that will help.
>>
>> Ardo
>>
>> > On 17 Oct 2015, at 15:28, "kali.tumm...@gmail.com" <
>> kali.tumm...@gmail.com> wrote:
>> >
>> > Hi All,
>> >
>> > Can spark be used as an alternative to gem fire cache ? we use gem fire
>> > cache to save (cache) dimension data in memory which is later used by
>> our
>> > Java custom made ETL tool can I do something like below ?
>> >
>> > can I cache a RDD in memory for a whole day ? as of I know RDD will get
>> > empty once the spark code finish executing (correct me if I am wrong).
>> >
>> > Spark:-
>> > create a RDD
>> > rdd.persistance
>> >
>> > Thanks
>> >
>> >
>> >
>> >
>> >
>> >
>> > --
>> > View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/can-I-use-Spark-as-alternative-for-gem-fire-cache-tp25106.html
>> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
>> >
>> > -
>> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> > For additional commands, e-mail: user-h...@spark.apache.org
>> >
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: How to distinguish columns when joining DataFrames with shared parent?

2015-10-21 Thread Michael Armbrust
Unfortunately, the mechanisms that we use to differentiate columns
automatically don't work particularly well in the presence of self joins.
However, you can get it work if you use the $"column" syntax consistently:

val df = Seq((1, 1), (1, 10), (2, 3), (3, 20), (3, 5), (4,
10)).toDF("key", "value")val smallValues = df.filter('value <
10).as("sv")val largeValues = df.filter('value >= 10).as("lv")
​
smallValues
  .join(largeValues, $"sv.key" === $"lv.key")
  .select($"sv.key".as("key"), $"sv.value".as("small_value"),
$"lv.value".as("large_value"))
  .withColumn("diff", $"small_value" - $"large_value")
  .show()
+---+---+---++|key|small_value|large_value|diff|+---+---+---++|
 1|  1| 10|  -9||  3|  5| 20|
-15|+---+---+---++


The problem with the other cases is that calling smallValues("columnName")
or largeValues("columnName") is eagerly resolving the attribute to the same
column (since the data is actually coming from the same place).  By the
time we realize that you are joining the data with itself (at which point
we rewrite one side of the join to use different expression ids) its too
late.  At the core the problem is that in Scala we have no easy way to
differentiate largeValues("columnName") from smallValues("columnName").
This is because the data is coming from the same DataFrame and we don't
actually know which variable name you are using.  There are things we can
change here, but its pretty hard to change the semantics without breaking
other use cases.

So, this isn't a straight forward "bug", but its definitely a usability
issue.  For now, my advice would be: only use unresolved columns (i.e.
$"[alias.]column" or col("[alias.]column")) when working with self joins.

Michael


Kafka Streaming and Filtering > 3000 partitons

2015-10-21 Thread Dave Ariens
Hey folks,

I have a very large number of Kafka topics (many thousands of partitions) that 
I want to consume, filter based on topic-specific filters, then produce back to 
filtered topics in Kafka.

Using the receiver-less based approach with Spark 1.4.1 (described 
here) 
I am able to use either KafkaUtils.createDirectStream or KafkaUtils.createRDD, 
consume from many topics, and filter them with the same filters but I can't 
seem to wrap my head around how to apply topic-specific filters, or to finally 
produce to topic-specific destination topics.

Another point would be that I will need to checkpoint the metadata after each 
successful batch and set starting offsets per partition back to ZK.  I expect I 
can do that on the final RDDs after casting them accordingly, but if anyone has 
any expertise/guidance doing that and is willing to share, I'd be pretty 
grateful.


Re: SF Spark Office Hours Experiment - Friday Afternoon

2015-10-21 Thread Jacek Laskowski
Hi Holden,

What a great idea! I'd love to join, but since I'm in Europe it's not
gonna happen by this Fri. Any plans to visit Europe or perhaps Warsaw,
Poland and host office hours here? ;-)

p.s. What about an virtual event with Google Hangout on Air on?

Pozdrawiam,
Jacek

--
Jacek Laskowski | http://blog.japila.pl | http://blog.jaceklaskowski.pl
Follow me at https://twitter.com/jaceklaskowski
Upvote at http://stackoverflow.com/users/1305344/jacek-laskowski


On Wed, Oct 21, 2015 at 12:55 AM, Holden Karau  wrote:
> Hi SF based folks,
>
> I'm going to try doing some simple office hours this Friday afternoon
> outside of Paramo Coffee. If no one comes by I'll just be drinking coffee
> hacking on some Spark PRs so if you just want to hangout and hack on Spark
> as a group come by too. (See
> https://twitter.com/holdenkarau/status/656592409455779840 ). If you have any
> questions you'd like to ask in particular shoot me an e-mail in advance I'll
> even try and be prepared.
>
> Cheers,
>
> Holden :)
> --
> Cell : 425-233-8271
> Twitter: https://twitter.com/holdenkarau

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Using spark in cluster mode

2015-10-21 Thread Jacek Laskowski
Hi,

Start here -> 
http://spark.apache.org/docs/latest/programming-guide.html#resilient-distributed-datasets-rdds
and then hop to
http://spark.apache.org/docs/latest/spark-standalone.html. Once done,
be back with your questions. I think it's gonna help a lot.

Pozdrawiam,
Jacek

--
Jacek Laskowski | http://blog.japila.pl | http://blog.jaceklaskowski.pl
Follow me at https://twitter.com/jaceklaskowski
Upvote at http://stackoverflow.com/users/1305344/jacek-laskowski


On Tue, Oct 20, 2015 at 5:48 PM, masoom alam  wrote:
> Dear all
>
> I want to setup spark in cluster mode. The problem is that each worker node
> is looking for a file to process.in its local directory.is it
> possible to setup some thing hdfs so that each worker node take  its part of
> a file from hdfsany good tutorials for this?
>
> Thanks

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Kafka Streaming and Filtering > 3000 partitons

2015-10-21 Thread Cody Koeninger
The rdd partitions are 1:1 with kafka topicpartitions, so you can use
offsets ranges to figure out which topic a given rdd partition is for and
proceed accordingly.  See the kafka integration guide in the spark
streaming docs for more details, or
https://github.com/koeninger/kafka-exactly-once

As far as setting offsets in ZK, there's a private interface in the spark
codebase that would make it a little easier for you to do that.  You can
see that code for reference, or there's an outstanding ticket for making it
public https://issues.apache.org/jira/browse/SPARK-10963

On Wed, Oct 21, 2015 at 1:50 PM, Dave Ariens  wrote:

> Hey folks,
>
>
>
> I have a very large number of Kafka topics (many thousands of partitions)
> that I want to consume, filter based on topic-specific filters, then
> produce back to filtered topics in Kafka.
>
>
>
> Using the receiver-less based approach with Spark 1.4.1 (described here
> )
> I am able to use either KafkaUtils.createDirectStream or
> KafkaUtils.createRDD, consume from many topics, and filter them with the
> same filters but I can't seem to wrap my head around how to apply
> topic-specific filters, or to finally produce to topic-specific destination
> topics.
>
>
>
> Another point would be that I will need to checkpoint the metadata after
> each successful batch and set starting offsets per partition back to ZK.  I
> expect I can do that on the final RDDs after casting them accordingly, but
> if anyone has any expertise/guidance doing that and is willing to share,
> I'd be pretty grateful.
>


Re: How to check whether the RDD is empty or not

2015-10-21 Thread Tathagata Das
What do you mean by checking when a "DStream is empty"? DStream represents
an endless stream of data, and at point of time checking whether it is
empty or not does not make sense.

FYI, there is RDD.isEmpty()



On Wed, Oct 21, 2015 at 10:03 AM, diplomatic Guru 
wrote:

> I tried below code but still carrying out the action even though there is no 
> new data.
>
> JavaPairInputDStream input = ssc.fileStream(iFolder, 
> LongWritable.class,Text.class, TextInputFormat.class);
>
>  if(input != null){
> //do some action if it is not empty
> }
>
>
> On 21 October 2015 at 18:00, diplomatic Guru 
> wrote:
>
>>
>> Hello All,
>>
>> I have a Spark Streaming job that should  do some action only if the RDD
>> is not empty. This can be done easily with the spark batch RDD as I could
>> .take(1) and check whether it is empty or  not. But this cannot been done
>> in Spark Streaming DStrem
>>
>>
>> JavaPairInputDStream input = ssc.fileStream(iFolder, 
>> LongWritable.class,Text.class, TextInputFormat.class);
>>
>>  if(inputLines!=null){
>> //do some action if it is not empty
>> }
>>
>> Any ideas please?
>>
>>
>>
>>
>


Mapping to multiple groups in Apache Spark

2015-10-21 Thread jeffrichley
I am in a situation where I am using Apache Spark and its map/reduce
functionality. I am now at a stage where I have been able to map to a data
set that conceptually has many "rows" of data.

Now what I am needing is to do a reduce which usually is a straight forward
thing. My real need though is to reduce on "overlapping" rows. For example,
the first reduce uses "rows" 1-30, the second uses 11-40, the third 21-50
and so on. How would this work in a Spark environment?

I appreciate any insight or directions anyone can give,

Jeff Richley



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Mapping-to-multiple-groups-in-Apache-Spark-tp25156.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: SF Spark Office Hours Experiment - Friday Afternoon

2015-10-21 Thread Holden Karau
Probably no trips to Warsaw planned by me in the next little while, but a
few people have asked for a hangouts office hours. I'll try and schedule
one after Spark Summit Europe :)

On Wed, Oct 21, 2015 at 11:54 AM, Jacek Laskowski  wrote:

> Hi Holden,
>
> What a great idea! I'd love to join, but since I'm in Europe it's not
> gonna happen by this Fri. Any plans to visit Europe or perhaps Warsaw,
> Poland and host office hours here? ;-)
>
> p.s. What about an virtual event with Google Hangout on Air on?
>
> Pozdrawiam,
> Jacek
>
> --
> Jacek Laskowski | http://blog.japila.pl | http://blog.jaceklaskowski.pl
> Follow me at https://twitter.com/jaceklaskowski
> Upvote at http://stackoverflow.com/users/1305344/jacek-laskowski
>
>
> On Wed, Oct 21, 2015 at 12:55 AM, Holden Karau 
> wrote:
> > Hi SF based folks,
> >
> > I'm going to try doing some simple office hours this Friday afternoon
> > outside of Paramo Coffee. If no one comes by I'll just be drinking coffee
> > hacking on some Spark PRs so if you just want to hangout and hack on
> Spark
> > as a group come by too. (See
> > https://twitter.com/holdenkarau/status/656592409455779840 ). If you
> have any
> > questions you'd like to ask in particular shoot me an e-mail in advance
> I'll
> > even try and be prepared.
> >
> > Cheers,
> >
> > Holden :)
> > --
> > Cell : 425-233-8271
> > Twitter: https://twitter.com/holdenkarau
>



-- 
Cell : 425-233-8271
Twitter: https://twitter.com/holdenkarau


Re: How to check whether the RDD is empty or not

2015-10-21 Thread Gerard Maas
As TD mentions, there's no such thing as an 'empty DStream'. Some intervals
of a DStream could be empty, in which case the related RDD will be empty.
This means that you should express such condition based on the RDD's of the
DStream. Translated in code:

dstream.foreachRDD{ rdd =>
 if (!rdd.isEmpty) {
...do stuff ...
}
}


On Wed, Oct 21, 2015 at 9:00 PM, Tathagata Das  wrote:

> What do you mean by checking when a "DStream is empty"? DStream represents
> an endless stream of data, and at point of time checking whether it is
> empty or not does not make sense.
>
> FYI, there is RDD.isEmpty()
>
>
>
> On Wed, Oct 21, 2015 at 10:03 AM, diplomatic Guru <
> diplomaticg...@gmail.com> wrote:
>
>> I tried below code but still carrying out the action even though there is no 
>> new data.
>>
>> JavaPairInputDStream input = ssc.fileStream(iFolder, 
>> LongWritable.class,Text.class, TextInputFormat.class);
>>
>>  if(input != null){
>> //do some action if it is not empty
>> }
>>
>>
>> On 21 October 2015 at 18:00, diplomatic Guru 
>> wrote:
>>
>>>
>>> Hello All,
>>>
>>> I have a Spark Streaming job that should  do some action only if the RDD
>>> is not empty. This can be done easily with the spark batch RDD as I could
>>> .take(1) and check whether it is empty or  not. But this cannot been done
>>> in Spark Streaming DStrem
>>>
>>>
>>> JavaPairInputDStream input = ssc.fileStream(iFolder, 
>>> LongWritable.class,Text.class, TextInputFormat.class);
>>>
>>>  if(inputLines!=null){
>>> //do some action if it is not empty
>>> }
>>>
>>> Any ideas please?
>>>
>>>
>>>
>>>
>>
>


Slow activation using Spark Streaming's new receiver scheduling mechanism

2015-10-21 Thread Budde, Adam
Hi all,

My team uses Spark Streaming to implement the batch processing component of a 
lambda architecture with 5 min intervals. We process roughly 15 TB/day using 
three discrete Spark clusters and about 250 receivers per cluster. We've been 
having some issues migrating our platform from Spark 1.4.x to Spark 1.5.x.

The first issue we've been having relates to receiver scheduling. Under Spark 
1.4.x, each receiver becomes active almost immediately and the application 
quickly reaches its peak input throughput. Under the new receiver scheduling 
mechanism introduced in Spark 1.5.x 
(SPARK-8882) we see that it 
takes quite a while for our receivers to become active. I haven't spent too 
much time gathering hard numbers on this, but my estimate would be that it 
takes over half an hour for half the receivers to become active and well over 
an hour for all of them to become active.

I spent some time digging into the code for the ReceiverTracker, 
ReceiverSchedulingPolicy, and ReceiverSupervisor classes and recompiling Spark 
with some added debug logging. As far as I can tell, this is what is happening:

  *   On program start, the ReceiverTracker RPC endpoint receives a 
StartAllReceivers message via its own launchReceivers() method (itself invoked 
by start())
  *   The handler for StartAllReceivers invokes 
ReceiverSchedulingPolicy.scheduleReceivers() to generate a desired receiver to 
executor mapping and calls ReceiverTracker.startReceiver() for each receiver
  *   startReceiver() uses the SparkContext to submit a job that creates an 
instance of ReceiverSupervisorImpl to run the receiver on a random executor
  *   While bootstrapping the receiver, the 
ReceiverSupervisorImpl.onReceiverStart() sends a RegisterReceiver message to 
the ReceiverTracker RPC endpoint
  *   The handler for RegisterReceiver checks if the randomly-selected executor 
was the one the receiver was assigned to by 
ReceiverSchedulingPolicy.scheduleReceivers() and fails the job if it isn't
  *   ReceiverTracker restarts the failed receiver job and this process 
continues until all receivers are assigned to their proper executor

Assuming this order of operations is correct, I have the following questions:

  1.  Is there any way to coerce SparkContext.submitJob() into scheduling a job 
on a specific executor? Put another way, is there a mechanism we can use to 
ensure that each receiver job is run on the executor it was assigned to on the 
first call to ReceiverSchedulingPolicy.scheduleReceivers()?
  2.  If (1) is not possible, is there anything we can do to speed up the 
StartReceiver -> RegisterReceiver -> RestartReceiver loop? Right now, it seems 
to take about 30-40 sec between attempts to invoke RegisterReceiver on a given 
receiver.

Thanks for the help!

Adam


Re: Spark_1.5.1_on_HortonWorks

2015-10-21 Thread Artem Ervits
You can use these steps
http://hortonworks.com/hadoop-tutorial/apache-spark-1-4-1-technical-preview-with-hdp/

1.5.1 is not officially supported yet but should be coming in a month or so.
On Oct 21, 2015 1:56 PM, "Ajay Chander"  wrote:

> Thanks for your kind inputs. Right now I am running spark-1.3.1 on YARN(4
> node cluster) on a HortonWorks distribution. Now I want to upgrade 
> spark-1.3.1 to
> spark-1.5.1. So at this point of time, do I have to manually go and copy
> spark-1.5.1 tarbal to all the nodes or is there any alternative so that I
> can get it upgraded through Ambari UI ? If possible can anyone point me to
> a documentation online? Thank you.
>
> Regards,
> Ajay
>
> On Wednesday, October 21, 2015, Saisai Shao 
> wrote:
>
>> Hi Frans,
>>
>> You could download Spark 1.5.1-hadoop 2.6 pre-built tarball and copy into
>> HDP 2.3 sandbox or master node. Then copy all the conf files from
>> /usr/hdp/current/spark-client/ to your /conf, or you could
>> refer to this tech preview (
>> http://hortonworks.com/hadoop-tutorial/apache-spark-1-4-1-technical-preview-with-hdp/
>> ), in "installing chapter", step 4 ~ 8 is what you need to do.
>>
>> Thanks
>> Saisai
>>
>> On Wed, Oct 21, 2015 at 1:27 PM, Frans Thamura 
>> wrote:
>>
>>> Doug
>>>
>>> is it possible to put in HDP 2.3?
>>>
>>> esp in Sandbox
>>>
>>> can share how do you install it?
>>>
>>>
>>> F
>>> --
>>> Frans Thamura (曽志胜)
>>> Java Champion
>>> Shadow Master and Lead Investor
>>> Meruvian.
>>> Integrated Hypermedia Java Solution Provider.
>>>
>>> Mobile: +628557888699
>>> Blog: http://blogs.mervpolis.com/roller/flatburger (id)
>>>
>>> FB: http://www.facebook.com/meruvian
>>> TW: http://www.twitter.com/meruvian / @meruvian
>>> Website: http://www.meruvian.org
>>>
>>> "We grow because we share the same belief."
>>>
>>>
>>> On Wed, Oct 21, 2015 at 12:24 PM, Doug Balog 
>>> wrote:
>>> > I have been running 1.5.1 with Hive in secure mode on HDP 2.2.4
>>> without any problems.
>>> >
>>> > Doug
>>> >
>>> >> On Oct 21, 2015, at 12:05 AM, Ajay Chander 
>>> wrote:
>>> >>
>>> >> Hi Everyone,
>>> >>
>>> >> Any one has any idea if spark-1.5.1 is available as a service on
>>> HortonWorks ? I have spark-1.3.1 installed on the Cluster and it is a
>>> HortonWorks distribution. Now I want upgrade it to spark-1.5.1. Anyone here
>>> have any idea about it? Thank you in advance.
>>> >>
>>> >> Regards,
>>> >> Ajay
>>> >
>>> >
>>> > -
>>> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> > For additional commands, e-mail: user-h...@spark.apache.org
>>> >
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>


Re: How to check whether the RDD is empty or not

2015-10-21 Thread diplomatic Guru
Tathagata, thank you for the response.

I have two receivers in my Spark Stream job;  1 reads an endless stream of
data from flume and the other reads data from HDFS directory. However,
files do not get moved into HDFS frequently (let's say it gets moved every
10 minutes). This is where I need to check of there are any events in the
HDFS before doing any action on it.

The RDD.isEmpty() is available in JavaRDD and JavaPairRDD but
not JavaDStream and JavaPairDStream, but I could use foreach and then check
the RDD but it's long winded.

On 21 October 2015 at 20:00, Tathagata Das  wrote:

> What do you mean by checking when a "DStream is empty"? DStream represents
> an endless stream of data, and at point of time checking whether it is
> empty or not does not make sense.
>
> FYI, there is RDD.isEmpty()
>
>
>
> On Wed, Oct 21, 2015 at 10:03 AM, diplomatic Guru <
> diplomaticg...@gmail.com> wrote:
>
>> I tried below code but still carrying out the action even though there is no 
>> new data.
>>
>> JavaPairInputDStream input = ssc.fileStream(iFolder, 
>> LongWritable.class,Text.class, TextInputFormat.class);
>>
>>  if(input != null){
>> //do some action if it is not empty
>> }
>>
>>
>> On 21 October 2015 at 18:00, diplomatic Guru 
>> wrote:
>>
>>>
>>> Hello All,
>>>
>>> I have a Spark Streaming job that should  do some action only if the RDD
>>> is not empty. This can be done easily with the spark batch RDD as I could
>>> .take(1) and check whether it is empty or  not. But this cannot been done
>>> in Spark Streaming DStrem
>>>
>>>
>>> JavaPairInputDStream input = ssc.fileStream(iFolder, 
>>> LongWritable.class,Text.class, TextInputFormat.class);
>>>
>>>  if(inputLines!=null){
>>> //do some action if it is not empty
>>> }
>>>
>>> Any ideas please?
>>>
>>>
>>>
>>>
>>
>


spark streaming 1.51. uses very old version of twitter4j

2015-10-21 Thread Andy Davidson
While digging around the spark source today I discovered it depends on
version 3.0.3 of twitter4j. This version was released on dec 2 2012. I
noticed that the current version is 4.0.4 and was released on 6/23/2015

I am not aware of any particular problems. Are they any plans to upgrade?
What is the spark policy on upgrading dependencies in general?

Kind regards

Andy

https://github.com/yusuke/twitter4j/releases

http://twitter4j.org/en/index.html#sourceCode


Maven Integration
You can integrate the latest Twitter4J build easily by just including the
following lines in your pom.xml.
   
  
   org.twitter4j
   twitter4j-core
   [4.0,)
   
   ...
   




Distributed caching of a file in SPark Streaming

2015-10-21 Thread swetha
Hi,

I need to cache a file in a distributed fashion like Hadoop Distributed
Cache and to be able to use it when needed. Is doing the following a right
way of doing the same? Also, by doing SparkFiles.get(fileName) , would it
just give all the contents in the form of a String?

SparkContext.addFile()

SparkFiles.get(fileName) 

Thanks,
Swetha



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Distributed-caching-of-a-file-in-SPark-Streaming-tp25157.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: SF Spark Office Hours Experiment - Friday Afternoon

2015-10-21 Thread Luciano Resende
On Tue, Oct 20, 2015 at 3:55 PM, Holden Karau  wrote:

> Hi SF based folks,
>
> I'm going to try doing some simple office hours this Friday afternoon
> outside of Paramo Coffee. If no one comes by I'll just be drinking coffee
> hacking on some Spark PRs so if you just want to hangout and hack on Spark
> as a group come by too. (See
> https://twitter.com/holdenkarau/status/656592409455779840 ). If you have
> any questions you'd like to ask in particular shoot me an e-mail in advance
> I'll even try and be prepared.
>
> Cheers,
>
> Holden :)
> --
> Cell : 425-233-8271
> Twitter: https://twitter.com/holdenkarau
>

+1, Very good initiative !!! Will try to attend some as time permits.

-- 
Luciano Resende
http://people.apache.org/~lresende
http://twitter.com/lresende1975
http://lresende.blogspot.com/


Re: dataframe average error: Float does not take parameters

2015-10-21 Thread Carol McDonald
version 1.3.1

scala> auction.printSchema

root

 |-- auctionid: string (nullable = true)

 |-- bid: float (nullable = false)

 |-- bidtime: float (nullable = false)

 |-- bidder: string (nullable = true)

 |-- bidderrate: integer (nullable = true)

 |-- openbid: float (nullable = false)

 |-- price: float (nullable = false)

 |-- item: string (nullable = true)

 |-- daystolive: integer (nullable = true)


scala> auction.groupBy("auctionid", "item").count.show

auctionid  itemcount



3016429446 palm10

8211851222 xbox28



On Wed, Oct 21, 2015 at 2:38 PM, Ali Tajeldin EDU 
wrote:

> Which version of Spark are you using?  I just tried the example below on
> 1.5.1 and it seems to work as expected:
>
> scala> val res = df.groupBy("key").count.agg(min("count"), avg("count"))
> res: org.apache.spark.sql.DataFrame = [min(count): bigint, avg(count):
> double]
>
> scala> res.show
> +--+--+
> |min(count)|avg(count)|
> +--+--+
> | 1|   1.0|
> +--+--+
>
>
> scala> res.printSchema
> root
>  |-- min(count): long (nullable = true)
>  |-- avg(count): double (nullable = true)
>
> On Oct 21, 2015, at 11:12 AM, Carol McDonald 
> wrote:
>
> This used to work :
>
> // What's the min number of bids per item? what's the average? what's the
> max?
> auction.groupBy("item", "auctionid").count.agg(min("count"),
> avg("count"),max("count")).show
>
> // MIN(count) AVG(count)MAX(count)
> // 1  16.992025518341308 75
>
> but this now gives an error
>
> val res = auction.groupBy("item", "auctionid").count.agg(min("count"),
> avg("count"),max("count"))
>
> :42: error: Float does not take parameters
>
> val res = auction.groupBy("item", "auctionid").count.agg(min("count"),
> avg("count"),max("count"))
>
> min and max still work .
>
> Do I need to cast the count to a float ?
>
> auction.groupBy("item", "auctionid").count.agg(min("count"),
> max("count")).show
>
> MIN(count) MAX(count)
>  1  75
>
>
>


Poor use cases for Spark

2015-10-21 Thread Ben Thompson
Hello,

I'm interested in hearing use cases and parallelism problems where Spark
was *not* a good fit for you. This is an effort to understand the limits of
MapReduce style parallelism.

Some broad things that pop out:
-- recursion
-- problems where the task graph is not known ahead of time
-- some graph problems (
http://www.frankmcsherry.org/graph/scalability/cost/2015/01/15/COST.html)

This is not in anyway an attack on Spark. It's an amazing tool that does
it's job very well. I'm just curious where it starts breaking down. Let me
know if you have any experiences!

Thanks very much,
Ben


Re: How to distinguish columns when joining DataFrames with shared parent?

2015-10-21 Thread Isabelle Phan
Thanks Michael and Ali for the reply!

I'll make sure to use unresolved columns when working with self joins then.

As pointed by Ali, isn't there still an issue with the aliasing? It works
when using org.apache.spark.sql.functions.col(colName: String) method, but
not when using org.apache.spark.sql.DataFrame.apply(colName: String):

scala> j.select(col("lv.value")).show
+-+
|value|
+-+
|   10|
|   20|
+-+


scala> j.select(largeValues("lv.value")).show
+-+
|value|
+-+
|1|
|5|
+-+

Or does this behavior have the same root cause as detailed in Michael's
email?


-Isabelle




On Wed, Oct 21, 2015 at 11:46 AM, Michael Armbrust 
wrote:

> Unfortunately, the mechanisms that we use to differentiate columns
> automatically don't work particularly well in the presence of self joins.
> However, you can get it work if you use the $"column" syntax consistently:
>
> val df = Seq((1, 1), (1, 10), (2, 3), (3, 20), (3, 5), (4, 10)).toDF("key", 
> "value")val smallValues = df.filter('value < 10).as("sv")val largeValues = 
> df.filter('value >= 10).as("lv")
> ​
> smallValues
>   .join(largeValues, $"sv.key" === $"lv.key")
>   .select($"sv.key".as("key"), $"sv.value".as("small_value"), 
> $"lv.value".as("large_value"))
>   .withColumn("diff", $"small_value" - $"large_value")
>   .show()
> +---+---+---++|key|small_value|large_value|diff|+---+---+---++|
>   1|  1| 10|  -9||  3|  5| 20| 
> -15|+---+---+---++
>
>
> The problem with the other cases is that calling smallValues("columnName")
> or largeValues("columnName") is eagerly resolving the attribute to the
> same column (since the data is actually coming from the same place).  By
> the time we realize that you are joining the data with itself (at which
> point we rewrite one side of the join to use different expression ids) its
> too late.  At the core the problem is that in Scala we have no easy way to
> differentiate largeValues("columnName") from smallValues("columnName").
> This is because the data is coming from the same DataFrame and we don't
> actually know which variable name you are using.  There are things we can
> change here, but its pretty hard to change the semantics without breaking
> other use cases.
>
> So, this isn't a straight forward "bug", but its definitely a usability
> issue.  For now, my advice would be: only use unresolved columns (i.e.
> $"[alias.]column" or col("[alias.]column")) when working with self joins.
>
> Michael
>


Re: How to distinguish columns when joining DataFrames with shared parent?

2015-10-21 Thread Michael Armbrust
Yeah, I was suggesting that you avoid using
org.apache.spark.sql.DataFrame.apply(colName:
String) when you are working with selfjoins as it eagerly binds to a
specific column in a what that breaks when we do the rewrite of one side of
the query.  Using the apply method constructs a resolved column eagerly
(which looses the alias information).

On Wed, Oct 21, 2015 at 1:49 PM, Isabelle Phan  wrote:

> Thanks Michael and Ali for the reply!
>
> I'll make sure to use unresolved columns when working with self joins then.
>
> As pointed by Ali, isn't there still an issue with the aliasing? It works
> when using org.apache.spark.sql.functions.col(colName: String) method, but
> not when using org.apache.spark.sql.DataFrame.apply(colName: String):
>
> scala> j.select(col("lv.value")).show
> +-+
> |value|
> +-+
> |   10|
> |   20|
> +-+
>
>
> scala> j.select(largeValues("lv.value")).show
> +-+
> |value|
> +-+
> |1|
> |5|
> +-+
>
> Or does this behavior have the same root cause as detailed in Michael's
> email?
>
>
> -Isabelle
>
>
>
>
> On Wed, Oct 21, 2015 at 11:46 AM, Michael Armbrust  > wrote:
>
>> Unfortunately, the mechanisms that we use to differentiate columns
>> automatically don't work particularly well in the presence of self joins.
>> However, you can get it work if you use the $"column" syntax
>> consistently:
>>
>> val df = Seq((1, 1), (1, 10), (2, 3), (3, 20), (3, 5), (4, 10)).toDF("key", 
>> "value")val smallValues = df.filter('value < 10).as("sv")val largeValues = 
>> df.filter('value >= 10).as("lv")
>> ​
>> smallValues
>>   .join(largeValues, $"sv.key" === $"lv.key")
>>   .select($"sv.key".as("key"), $"sv.value".as("small_value"), 
>> $"lv.value".as("large_value"))
>>   .withColumn("diff", $"small_value" - $"large_value")
>>   .show()
>> +---+---+---++|key|small_value|large_value|diff|+---+---+---++|
>>   1|  1| 10|  -9||  3|  5| 20| 
>> -15|+---+---+---++
>>
>>
>> The problem with the other cases is that calling
>> smallValues("columnName") or largeValues("columnName") is eagerly
>> resolving the attribute to the same column (since the data is actually
>> coming from the same place).  By the time we realize that you are joining
>> the data with itself (at which point we rewrite one side of the join to use
>> different expression ids) its too late.  At the core the problem is that in
>> Scala we have no easy way to differentiate largeValues("columnName")
>> from smallValues("columnName").  This is because the data is coming from
>> the same DataFrame and we don't actually know which variable name you are
>> using.  There are things we can change here, but its pretty hard to change
>> the semantics without breaking other use cases.
>>
>> So, this isn't a straight forward "bug", but its definitely a usability
>> issue.  For now, my advice would be: only use unresolved columns (i.e.
>> $"[alias.]column" or col("[alias.]column")) when working with self joins.
>>
>> Michael
>>
>
>


Poor use cases for Spark

2015-10-21 Thread tbenthompson
Hello,

I'm interested in hearing use cases and parallelism problems where Spark was
*not* a good fit for you. This is an effort to understand the limits of
MapReduce style parallelism. 

Some broad things that pop out:
-- recursion
-- problems where the task graph is not known ahead of time
-- some graph problems
(http://www.frankmcsherry.org/graph/scalability/cost/2015/01/15/COST.html)

This is not in anyway an attack on Spark. It's an amazing tool that does
it's job very well. I'm just curious where it starts breaking down. Let me
know if you have any experiences!

Thanks very much,
Ben



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Poor-use-cases-for-Spark-tp25158.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: Kafka Streaming and Filtering > 3000 partitons

2015-10-21 Thread Dave Ariens
Cody,

First off--thanks for your contributions and blog post, I actually linked to in 
my original question. You'll have to forgive me as I've only been using Spark 
and writing Scala for a few days. I'm aware that the RDD partitions are 1:1 
with Kafka topic partitions and you can get the offset ranges.  But my 
understand is that the below code would need to be executed after the stream 
has been processed.

Let's say we're storing our filters in a key value map where the key is the 
topic name, and the value is a string that a message within a partition of that 
topic must contain to match.

Is this the approach you're suggesting (using your example code)?

// This would get built up on the driver, likely fetching the topic and filters 
from ZK
val topicFilters = Map("topic1" -> "this text must match", "topic2" -> "this 
other text must match")


val stream = KafkaUtils.createDirectStream(...)
  ...
  stream.foreachRDD { rdd =>
// Cast the rdd to an interface that lets us get an array of OffsetRange
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

rdd.foreachPartition { iter =>
  // index to get the correct offset range for the rdd partition we're 
working on
  val osr: OffsetRange = offsetRanges(TaskContext.get.partitionId)

  // get any needed data from the offset range
  val topic = osr.topic
  val kafkaPartitionId = osr.partition
  val begin = osr.fromOffset
  val end = osr.untilOffset

  // Now we know the topic name, we can filter something
  // Or could we have referenced the topic name from
  // offsetRanges(TaskContext.get.partitionId) earlier
  // before we entered into stream.foreachRDD...?





From: Cody Koeninger [mailto:c...@koeninger.org]
Sent: Wednesday, October 21, 2015 3:01 PM
To: Dave Ariens
Cc: user@spark.apache.org
Subject: Re: Kafka Streaming and Filtering > 3000 partitons

The rdd partitions are 1:1 with kafka topicpartitions, so you can use offsets 
ranges to figure out which topic a given rdd partition is for and proceed 
accordingly.  See the kafka integration guide in the spark streaming docs for 
more details, or  https://github.com/koeninger/kafka-exactly-once

As far as setting offsets in ZK, there's a private interface in the spark 
codebase that would make it a little easier for you to do that.  You can see 
that code for reference, or there's an outstanding ticket for making it public 
https://issues.apache.org/jira/browse/SPARK-10963

On Wed, Oct 21, 2015 at 1:50 PM, Dave Ariens 
mailto:dari...@blackberry.com>> wrote:
Hey folks,

I have a very large number of Kafka topics (many thousands of partitions) that 
I want to consume, filter based on topic-specific filters, then produce back to 
filtered topics in Kafka.

Using the receiver-less based approach with Spark 1.4.1 (described 
here) 
I am able to use either KafkaUtils.createDirectStream or KafkaUtils.createRDD, 
consume from many topics, and filter them with the same filters but I can't 
seem to wrap my head around how to apply topic-specific filters, or to finally 
produce to topic-specific destination topics.

Another point would be that I will need to checkpoint the metadata after each 
successful batch and set starting offsets per partition back to ZK.  I expect I 
can do that on the final RDDs after casting them accordingly, but if anyone has 
any expertise/guidance doing that and is willing to share, I'd be pretty 
grateful.



Re: spark streaming 1.51. uses very old version of twitter4j

2015-10-21 Thread Luciano Resende
Thanks for catching that, I have created a JIRA to track it, and hopefully
I can submit a fix for the next release

https://issues.apache.org/jira/browse/SPARK-11245

On Wed, Oct 21, 2015 at 1:11 PM, Andy Davidson <
a...@santacruzintegration.com> wrote:

> While digging around the spark source today I discovered it depends on
> version 3.0.3 of twitter4j. This version was released on dec 2 2012. I
> noticed that the current version is 4.0.4 and was released on 6/23/2015
>
> I am not aware of any particular problems. Are they any plans to upgrade?
> What is the spark policy on upgrading dependencies in general?
>
> Kind regards
>
> Andy
>
> https://github.com/yusuke/twitter4j/releases
>
> http://twitter4j.org/en/index.html#sourceCode
>
>
> Maven Integration
>
> You can integrate the latest Twitter4J build easily by just including the
> following lines in your pom.xml.
>
>
>   
>org.twitter4j
>twitter4j-core
>[4.0,)
>
>...
>
>
>


-- 
Luciano Resende
http://people.apache.org/~lresende
http://twitter.com/lresende1975
http://lresende.blogspot.com/


Re: Kafka Streaming and Filtering > 3000 partitons

2015-10-21 Thread Cody Koeninger
Yeah, that's the general idea.

Regarding the question in your code comments ... The code inside of
foreachPartition is what's running on the executor.  It wouldn't make any
sense to try to get a partition ID before that block.

On Wed, Oct 21, 2015 at 4:07 PM, Dave Ariens  wrote:

> Cody,
>
>
>
> First off--thanks for your contributions and blog post, I actually linked
> to in my original question. You'll have to forgive me as I've only been
> using Spark and writing Scala for a few days. I'm aware that the RDD
> partitions are 1:1 with Kafka topic partitions and you can get the offset
> ranges.  But my understand is that the below code would need to be executed
> after the stream has been processed.
>
>
>
> Let's say we're storing our filters in a key value map where the key is
> the topic name, and the value is a string that a message within a partition
> of that topic must contain to match.
>
>
>
> Is this the approach you're suggesting (using your example code)?
>
>
>
> // This would get built up on the driver, likely fetching the topic and
> filters from ZK
>
> val topicFilters = Map("topic1" -> "this text must match", "topic2" ->
> "this other text must match")
>
>
>
>
>
> val stream = KafkaUtils.createDirectStream(...)
>
>   ...
>
>   stream.foreachRDD { rdd =>
>
> // Cast the rdd to an interface that lets us get an array of
> OffsetRange
>
> val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>
>
>
> rdd.foreachPartition { iter =>
>
>   // index to get the correct offset range for the rdd partition we're
> working on
>
>   val osr: OffsetRange = offsetRanges(TaskContext.get.partitionId)
>
>
>
>   // get any needed data from the offset range
>
>   val topic = osr.topic
>
>   val kafkaPartitionId = osr.partition
>
>   val begin = osr.fromOffset
>
>   val end = osr.untilOffset
>
>
>
>   // Now we know the topic name, we can filter something
>
>   // Or could we have referenced the topic name from
>
>   // offsetRanges(TaskContext.get.partitionId) earlier
>
>   // before we entered into stream.foreachRDD...?
>
>
>
>
>
>
>
>
>
>
>
> *From:* Cody Koeninger [mailto:c...@koeninger.org]
> *Sent:* Wednesday, October 21, 2015 3:01 PM
> *To:* Dave Ariens
> *Cc:* user@spark.apache.org
> *Subject:* Re: Kafka Streaming and Filtering > 3000 partitons
>
>
>
> The rdd partitions are 1:1 with kafka topicpartitions, so you can use
> offsets ranges to figure out which topic a given rdd partition is for and
> proceed accordingly.  See the kafka integration guide in the spark
> streaming docs for more details, or
> https://github.com/koeninger/kafka-exactly-once
>
>
>
> As far as setting offsets in ZK, there's a private interface in the spark
> codebase that would make it a little easier for you to do that.  You can
> see that code for reference, or there's an outstanding ticket for making it
> public https://issues.apache.org/jira/browse/SPARK-10963
>
>
>
> On Wed, Oct 21, 2015 at 1:50 PM, Dave Ariens 
> wrote:
>
> Hey folks,
>
>
>
> I have a very large number of Kafka topics (many thousands of partitions)
> that I want to consume, filter based on topic-specific filters, then
> produce back to filtered topics in Kafka.
>
>
>
> Using the receiver-less based approach with Spark 1.4.1 (described here
> )
> I am able to use either KafkaUtils.createDirectStream or
> KafkaUtils.createRDD, consume from many topics, and filter them with the
> same filters but I can't seem to wrap my head around how to apply
> topic-specific filters, or to finally produce to topic-specific destination
> topics.
>
>
>
> Another point would be that I will need to checkpoint the metadata after
> each successful batch and set starting offsets per partition back to ZK.  I
> expect I can do that on the final RDDs after casting them accordingly, but
> if anyone has any expertise/guidance doing that and is willing to share,
> I'd be pretty grateful.
>
>
>


--jars option not working for spark on Mesos in cluster mode

2015-10-21 Thread Virag Kothari
Hi,

I am trying to run a spark job on mesos in cluster mode using the following
command
./bin/spark-submit  --deploy-mode cluster --master mesos://172.17.0.1:7077
 —-jars  http://172.17.0.2:18630/mesos/extraJars.jar --class MyClass
http://172.17.0.2:18630/mesos/foo.jar

The application jar “foo.jar” is downloaded to the working directory of
mesos slave, however the jars passed through —-jars option (extraJars.jar)
are ignored. The client mode works fine.
Is it a bug or am I doing something wrong?

Thanks,
Virag


Re: Spark-Testing-Base Q/A

2015-10-21 Thread Holden Karau
On Wednesday, October 21, 2015, Mark Vervuurt 
wrote:

> Hi Holden,
>
> Thanks for the information, I think that a Java Base Class in order to
> test SparkStreaming using Java would be useful for the community.
> Unfortunately not all of our customers are willing to use Scala or Python.
>
Sounds reasonable, I'll add it this week.

>
> If i am not wrong it’s 4:00 AM for you in California ;)
>
> Yup, I'm not great a regular schedules but I make up for it by doing stuff
when I've had too much coffee to sleep :p

> Regards,
> Mark
>
> On 21 Oct 2015, at 12:42, Holden Karau  wrote:
>
>
>
> On Wednesday, October 21, 2015, Mark Vervuurt 
> wrote:
>
>> Hi Everyone,
>>
>> I am busy trying out ‘Spark-Testing-Base
>> ’. I have the following
>> questions?
>>
>>
>>- Can you test Spark Streaming Jobs using Java?
>>
>> The current base class for testing streaming jobs is implemented using a
> Scala test library (and one in Python too), I can add one using a junit
> base for streaming if it would be useful for you.
>
>>
>>- Can I use Spark-Testing-Base 1.3.0_0.1.1 together with Spark 1.3.1?
>>
>>  You should be able to, the API changes were small enough I didn't
> publish a seperate package, but if you run into any issues let me know.
>
>>
>>
>>
>> Thanks.
>>
>> Greetings,
>> Mark
>>
>
>
> --
> Cell : 425-233-8271
> Twitter: https://twitter.com/holdenkarau
>
>
>


Re: How to distinguish columns when joining DataFrames with shared parent?

2015-10-21 Thread Isabelle Phan
Ok, got it.
Thanks a lot Michael for the detailed reply!
On Oct 21, 2015 1:54 PM, "Michael Armbrust"  wrote:

> Yeah, I was suggesting that you avoid using  
> org.apache.spark.sql.DataFrame.apply(colName:
> String) when you are working with selfjoins as it eagerly binds to a
> specific column in a what that breaks when we do the rewrite of one side of
> the query.  Using the apply method constructs a resolved column eagerly
> (which looses the alias information).
>
> On Wed, Oct 21, 2015 at 1:49 PM, Isabelle Phan  wrote:
>
>> Thanks Michael and Ali for the reply!
>>
>> I'll make sure to use unresolved columns when working with self joins
>> then.
>>
>> As pointed by Ali, isn't there still an issue with the aliasing? It works
>> when using org.apache.spark.sql.functions.col(colName: String) method, but
>> not when using org.apache.spark.sql.DataFrame.apply(colName: String):
>>
>> scala> j.select(col("lv.value")).show
>> +-+
>> |value|
>> +-+
>> |   10|
>> |   20|
>> +-+
>>
>>
>> scala> j.select(largeValues("lv.value")).show
>> +-+
>> |value|
>> +-+
>> |1|
>> |5|
>> +-+
>>
>> Or does this behavior have the same root cause as detailed in Michael's
>> email?
>>
>>
>> -Isabelle
>>
>>
>>
>>
>> On Wed, Oct 21, 2015 at 11:46 AM, Michael Armbrust <
>> mich...@databricks.com> wrote:
>>
>>> Unfortunately, the mechanisms that we use to differentiate columns
>>> automatically don't work particularly well in the presence of self joins.
>>> However, you can get it work if you use the $"column" syntax
>>> consistently:
>>>
>>> val df = Seq((1, 1), (1, 10), (2, 3), (3, 20), (3, 5), (4, 10)).toDF("key", 
>>> "value")val smallValues = df.filter('value < 10).as("sv")val largeValues = 
>>> df.filter('value >= 10).as("lv")
>>> ​
>>> smallValues
>>>   .join(largeValues, $"sv.key" === $"lv.key")
>>>   .select($"sv.key".as("key"), $"sv.value".as("small_value"), 
>>> $"lv.value".as("large_value"))
>>>   .withColumn("diff", $"small_value" - $"large_value")
>>>   .show()
>>> +---+---+---++|key|small_value|large_value|diff|+---+---+---++|
>>>   1|  1| 10|  -9||  3|  5| 20| 
>>> -15|+---+---+---++
>>>
>>>
>>> The problem with the other cases is that calling
>>> smallValues("columnName") or largeValues("columnName") is eagerly
>>> resolving the attribute to the same column (since the data is actually
>>> coming from the same place).  By the time we realize that you are joining
>>> the data with itself (at which point we rewrite one side of the join to use
>>> different expression ids) its too late.  At the core the problem is that in
>>> Scala we have no easy way to differentiate largeValues("columnName")
>>> from smallValues("columnName").  This is because the data is coming
>>> from the same DataFrame and we don't actually know which variable name you
>>> are using.  There are things we can change here, but its pretty hard to
>>> change the semantics without breaking other use cases.
>>>
>>> So, this isn't a straight forward "bug", but its definitely a usability
>>> issue.  For now, my advice would be: only use unresolved columns (i.e.
>>> $"[alias.]column" or col("[alias.]column")) when working with self
>>> joins.
>>>
>>> Michael
>>>
>>
>>
>


java.util.NoSuchElementException: key not found error

2015-10-21 Thread Sourav Mazumder
In 1.5.0 if I use randomSplit on a data frame I get this error.

Here is teh code snippet -

val splitData = merged.randomSplit(Array(70,30))
val trainData = splitData(0).persist()
val testData = splitData(1)

trainData.registerTempTable("trn")

%sql select * from trn

The exception goes like this -

java.util.NoSuchElementException: key not found: 1910 at
scala.collection.MapLike$class.default(MapLike.scala:228) at
scala.collection.AbstractMap.default(Map.scala:58) at
scala.collection.mutable.HashMap.apply(HashMap.scala:64) at
org.apache.spark.sql.columnar.compression.DictionaryEncoding$Encoder.compress(compressionSchemes.scala:258)
at
org.apache.spark.sql.columnar.compression.CompressibleColumnBuilder$class.build(CompressibleColumnBuilder.scala:110)
at
org.apache.spark.sql.columnar.NativeColumnBuilder.build(ColumnBuilder.scala:87)
at
org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1$$anonfun$next$2.apply(InMemoryColumnarTableScan.scala:152)
at
org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1$$anonfun$next$2.apply(InMemoryColumnarTableScan.scala:152)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) at
scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at
scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108) at
org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1.next(InMemoryColumnarTableScan.scala:152)
at
org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1.next(InMemoryColumnarTableScan.scala:120)
at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:278)
at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78) at
org.apache.spark.rdd.RDD.iterator(RDD.scala:262) at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297) at
org.apache.spark.rdd.RDD.iterator(RDD.scala:264) at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297) at
org.apache.spark.rdd.RDD.iterator(RDD.scala:264) at
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) at
org.apache.spark.scheduler.Task.run(Task.scala:88) at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)


Any idea ?

regards,
Sourav


problems with spark 1.5.1 streaming TwitterUtils.createStream()

2015-10-21 Thread Andy Davidson
Hi

I want to use twitters public streaming api to follow a set of ids. I want
to implement my driver using java. The current TwitterUtils is a wrapper
around twitter4j and does not expose the full twitter streaming api.

I started by digging through the source code. Unfortunately I do not know
scala

spark-1.5.1/external/twitter/src/main/scala/org/apache/spark/streaming/twitt
er/TwitterUtils.scala
spark-1.5.1/external/twitter/src/main/scala/org/apache/spark/streaming/twitt
er/TwitterInputDStream.scala

   String[] filter = {³topic1², ³topic2²};

JavaDStream tweets = TwitterUtils.createStream(ssc,
twitterAuth, filter);


Does anyone know why filters is defined as String[]? Internally spark
creates a twitter4J FilterQueryClass. Ideally I would like to pass an
filterQuery object. It exposes the part of the twitter streaming api I need
to use to follow a set of user.

Here is a link to the 4.0.4 version of the java doc
http://twitter4j.org/oldjavadocs/4.0.4/index.html

Turns out spark 1.5.1 uses version 3.0.3.
http://twitter4j.org/oldjavadocs/3.0.3/index.html . Both versions implement
java.io.Serializable

I put a comment in where I think the change needs to go. It looks like it
might be trivial. I guess in the short term I can try and rewrite the
TwitterUtils and TwitterReceiver to do what I need to do in Java

Thanks in advance

Andy

object TwitterUtils {

  /**

   * Create a input stream that returns tweets received from Twitter.

   * @param ssc StreamingContext object

   * @param twitterAuth Twitter4J authentication, or None to use Twitter4J's
default OAuth

   *authorization; this uses the system properties
twitter4j.oauth.consumerKey,

   *twitter4j.oauth.consumerSecret, twitter4j.oauth.accessToken and

   *twitter4j.oauth.accessTokenSecret

   * @param filters Set of filter strings to get only those tweets that
match them

   * @param storageLevel Storage level to use for storing the received
objects

   */

  def createStream(

  ssc: StreamingContext,

  twitterAuth: Option[Authorization],

  filters: Seq[String] = Nil, // ??? Can we pass a FilterQuery object
instead

  storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2

): ReceiverInputDStream[Status] = {

new TwitterInputDStream(ssc, twitterAuth, filters, storageLevel)

  }





private[streaming]

class TwitterReceiver(

twitterAuth: Authorization,

filters: Seq[String],

storageLevel: StorageLevel

  ) extends Receiver[Status](storageLevel) with Logging {



  @volatile private var twitterStream: TwitterStream = _

  @volatile private var stopped = false



  def onStart() {

try {

  val newTwitterStream = new
TwitterStreamFactory().getInstance(twitterAuth)

  newTwitterStream.addListener(new StatusListener {

def onStatus(status: Status): Unit = {

  store(status)

}

// Unimplemented

def onDeletionNotice(statusDeletionNotice: StatusDeletionNotice) {}

def onTrackLimitationNotice(i: Int) {}

def onScrubGeo(l: Long, l1: Long) {}

def onStallWarning(stallWarning: StallWarning) {}

def onException(e: Exception) {

  if (!stopped) {

restart("Error receiving tweets", e)

  }

}

  })



  val query = new FilterQuery // ??? Can we pass a FilterQuery object
instead

  if (filters.size > 0) {

query.track(filters.toArray)

newTwitterStream.filter(query)

  } else {

newTwitterStream.sample()

  }

  setTwitterStream(newTwitterStream)

  logInfo("Twitter receiver started")

  stopped = false

} catch {

  case e: Exception => restart("Error starting Twitter stream", e)

}

  }








Sporadic error after moving from kafka receiver to kafka direct stream

2015-10-21 Thread Conor Fennell
Hi,

Firstly want to say a big thanks to Cody for contributing the kafka direct
stream.

I have been using the receiver based approach for months but the direct
stream is a much better solution for my use case.

The job in question is now ported over to the direct stream doing
idempotent outputs to Cassandra and outputting to kafka.
I am also saving the offsets to Cassandra.

But unfortunately I am sporadically getting the error below.
It recovers and continues but gives a large spike in the processing delay.
And it can happen in every 3 or 4 batches.
I still have other receiver jobs running and they never throw these
exceptions.

I would be very appreciative for any direction and I can happily provide
more detail.

Thanks,
Conor

15/10/21 23:30:31 INFO consumer.SimpleConsumer: Reconnect due to
socket error: java.nio.channels.ClosedChannelException
15/10/21 23:31:01 ERROR executor.Executor: Exception in task 6.0 in
stage 66.0 (TID 406)
java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78)
at 
kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)
at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:112)
at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:111)
at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:110)
at 
org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:192)
at 
org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:208)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at 
org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:209)
at 
org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:73)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
15/10/21 23:31:01 INFO executor.CoarseGrainedExecutorBackend: Got
assigned task 407


Re: java.util.NoSuchElementException: key not found error

2015-10-21 Thread Josh Rosen
This is https://issues.apache.org/jira/browse/SPARK-10422, which has been
fixed in Spark 1.5.1.

On Wed, Oct 21, 2015 at 4:40 PM, Sourav Mazumder <
sourav.mazumde...@gmail.com> wrote:

> In 1.5.0 if I use randomSplit on a data frame I get this error.
>
> Here is teh code snippet -
>
> val splitData = merged.randomSplit(Array(70,30))
> val trainData = splitData(0).persist()
> val testData = splitData(1)
>
> trainData.registerTempTable("trn")
>
> %sql select * from trn
>
> The exception goes like this -
>
> java.util.NoSuchElementException: key not found: 1910 at
> scala.collection.MapLike$class.default(MapLike.scala:228) at
> scala.collection.AbstractMap.default(Map.scala:58) at
> scala.collection.mutable.HashMap.apply(HashMap.scala:64) at
> org.apache.spark.sql.columnar.compression.DictionaryEncoding$Encoder.compress(compressionSchemes.scala:258)
> at
> org.apache.spark.sql.columnar.compression.CompressibleColumnBuilder$class.build(CompressibleColumnBuilder.scala:110)
> at
> org.apache.spark.sql.columnar.NativeColumnBuilder.build(ColumnBuilder.scala:87)
> at
> org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1$$anonfun$next$2.apply(InMemoryColumnarTableScan.scala:152)
> at
> org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1$$anonfun$next$2.apply(InMemoryColumnarTableScan.scala:152)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> at
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) at
> scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at
> scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108) at
> org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1.next(InMemoryColumnarTableScan.scala:152)
> at
> org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1.next(InMemoryColumnarTableScan.scala:120)
> at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:278)
> at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171)
> at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78) at
> org.apache.spark.rdd.RDD.iterator(RDD.scala:262) at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297) at
> org.apache.spark.rdd.RDD.iterator(RDD.scala:264) at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297) at
> org.apache.spark.rdd.RDD.iterator(RDD.scala:264) at
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) at
> org.apache.spark.scheduler.Task.run(Task.scala:88) at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
>
> Any idea ?
>
> regards,
> Sourav
>


Sporadic error after moving from kafka receiver to kafka direct stream

2015-10-21 Thread Conor Fennell
Hi,

Firstly want to say a big thanks to Cody for contributing the kafka direct
stream.

I have been using the receiver based approach for months but the direct
stream is a much better solution for my use case.

The job in question is now ported over to the direct stream doing
idempotent outputs to Cassandra and outputting to kafka.
I am also saving the offsets to Cassandra.

But unfortunately I am sporadically getting the error below.
It recovers and continues but gives a large spike in the processing delay.
And it can happen in every 3 or 4 batches.
I still have other receiver jobs running and they never throw these
exceptions.

I would be very appreciative for any direction and I can happily provide
more detail.

Thanks,
Conor

15/10/21 23:30:31 INFO consumer.SimpleConsumer: Reconnect due to
socket error: java.nio.channels.ClosedChannelException
15/10/21 23:31:01 ERROR executor.Executor: Exception in task 6.0 in
stage 66.0 (TID 406)
java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78)
at 
kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)
at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:112)
at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:111)
at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
at 
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:110)
at 
org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:192)
at 
org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:208)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at 
org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:209)
at 
org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:73)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
15/10/21 23:31:01 INFO executor.CoarseGrainedExecutorBackend: Got
assigned task 407


Re: Spark_1.5.1_on_HortonWorks

2015-10-21 Thread Saisai Shao
Hi Ajay,

You don't need to copy tarball to all the nodes, only one node you want to
run spark application is enough (mostly the master node), Yarn will help to
distribute the Spark dependencies. The link I mentioned before is the one
you could follow, please read my previous mail.

Thanks
Saisai



On Thu, Oct 22, 2015 at 1:56 AM, Ajay Chander  wrote:

> Thanks for your kind inputs. Right now I am running spark-1.3.1 on YARN(4
> node cluster) on a HortonWorks distribution. Now I want to upgrade 
> spark-1.3.1 to
> spark-1.5.1. So at this point of time, do I have to manually go and copy
> spark-1.5.1 tarbal to all the nodes or is there any alternative so that I
> can get it upgraded through Ambari UI ? If possible can anyone point me to
> a documentation online? Thank you.
>
> Regards,
> Ajay
>
>
> On Wednesday, October 21, 2015, Saisai Shao 
> wrote:
>
>> Hi Frans,
>>
>> You could download Spark 1.5.1-hadoop 2.6 pre-built tarball and copy into
>> HDP 2.3 sandbox or master node. Then copy all the conf files from
>> /usr/hdp/current/spark-client/ to your /conf, or you could
>> refer to this tech preview (
>> http://hortonworks.com/hadoop-tutorial/apache-spark-1-4-1-technical-preview-with-hdp/
>> ), in "installing chapter", step 4 ~ 8 is what you need to do.
>>
>> Thanks
>> Saisai
>>
>> On Wed, Oct 21, 2015 at 1:27 PM, Frans Thamura 
>> wrote:
>>
>>> Doug
>>>
>>> is it possible to put in HDP 2.3?
>>>
>>> esp in Sandbox
>>>
>>> can share how do you install it?
>>>
>>>
>>> F
>>> --
>>> Frans Thamura (曽志胜)
>>> Java Champion
>>> Shadow Master and Lead Investor
>>> Meruvian.
>>> Integrated Hypermedia Java Solution Provider.
>>>
>>> Mobile: +628557888699
>>> Blog: http://blogs.mervpolis.com/roller/flatburger (id)
>>>
>>> FB: http://www.facebook.com/meruvian
>>> TW: http://www.twitter.com/meruvian / @meruvian
>>> Website: http://www.meruvian.org
>>>
>>> "We grow because we share the same belief."
>>>
>>>
>>> On Wed, Oct 21, 2015 at 12:24 PM, Doug Balog 
>>> wrote:
>>> > I have been running 1.5.1 with Hive in secure mode on HDP 2.2.4
>>> without any problems.
>>> >
>>> > Doug
>>> >
>>> >> On Oct 21, 2015, at 12:05 AM, Ajay Chander 
>>> wrote:
>>> >>
>>> >> Hi Everyone,
>>> >>
>>> >> Any one has any idea if spark-1.5.1 is available as a service on
>>> HortonWorks ? I have spark-1.3.1 installed on the Cluster and it is a
>>> HortonWorks distribution. Now I want upgrade it to spark-1.5.1. Anyone here
>>> have any idea about it? Thank you in advance.
>>> >>
>>> >> Regards,
>>> >> Ajay
>>> >
>>> >
>>> > -
>>> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> > For additional commands, e-mail: user-h...@spark.apache.org
>>> >
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>


Spark_sql

2015-10-21 Thread Ajay Chander
Hi Everyone,

I have a use case where I have to create a DataFrame inside the map()
function. To create a DataFrame it need sqlContext or hiveContext. Now how
do I pass the context to my map function ? And I am doing it in java. I
tried creating a class "TestClass" which implements "Function"
and inside the call method I want to create the DataFrame, so I created a
parameterized constructor to pass context from driver program to TestClass
and use that context to create DataFrame. But it seems like it's a wrong
way of doing. Can anyone help me in this? Thanks in advance.

Regards,
Aj


Spark_1.5.1_on_HortonWorks

2015-10-21 Thread Ajay Chander
Hi Sasai,

Thanks for your time. I have followed your inputs and downloaded
"spark-1.5.1-bin-hadoop2.6" on one of the node say node1. And when I did a
pie test everything seems to be working fine, except that the spark-history
-server running on this node1 has gone down. It was complaining about
 missing class:

15/10/21 16:41:28 INFO HistoryServer: Registered signal handlers for [TERM,
HUP, INT]
15/10/21 16:41:28 WARN SparkConf: The configuration key
'spark.yarn.applicationMaster.waitTries' has been deprecated as of Spark
1.3 and and may be removed in the future. Please use the new key
'spark.yarn.am.waitTime' instead.
15/10/21 16:41:29 WARN NativeCodeLoader: Unable to load native-hadoop
library for your platform... using builtin-java classes where applicable
15/10/21 16:41:29 INFO SecurityManager: Changing view acls to: root
15/10/21 16:41:29 INFO SecurityManager: Changing modify acls to: root
15/10/21 16:41:29 INFO SecurityManager: SecurityManager: authentication
disabled; ui acls disabled; users with view permissions: Set(root); users
with modify permissions: Set(root)
Exception in thread "main" java.lang.ClassNotFoundException:
org.apache.spark.deploy.yarn.history.YarnHistoryProvider
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at org.apache.spark.util.Utils$.classForName(Utils.scala:173)
at
org.apache.spark.deploy.history.HistoryServer$.main(HistoryServer.scala:231)
at
org.apache.spark.deploy.history.HistoryServer.main(HistoryServer.scala)


I went to the lib folder and noticed that
"spark-assembly-1.5.1-hadoop2.6.0.jar" is missing that class. I was able to
get the spark history server started with 1.3.1 but not 1.5.1. Any inputs
on this?

Really appreciate your help. Thanks

Regards,
Ajay



On Wednesday, October 21, 2015, Saisai Shao > wrote:

> Hi Ajay,
>
> You don't need to copy tarball to all the nodes, only one node you want to
> run spark application is enough (mostly the master node), Yarn will help to
> distribute the Spark dependencies. The link I mentioned before is the one
> you could follow, please read my previous mail.
>
> Thanks
> Saisai
>
>
>
> On Thu, Oct 22, 2015 at 1:56 AM, Ajay Chander  wrote:
>
>> Thanks for your kind inputs. Right now I am running spark-1.3.1 on YARN(4
>> node cluster) on a HortonWorks distribution. Now I want to upgrade 
>> spark-1.3.1 to
>> spark-1.5.1. So at this point of time, do I have to manually go and copy
>> spark-1.5.1 tarbal to all the nodes or is there any alternative so that I
>> can get it upgraded through Ambari UI ? If possible can anyone point me to
>> a documentation online? Thank you.
>>
>> Regards,
>> Ajay
>>
>>
>> On Wednesday, October 21, 2015, Saisai Shao 
>> wrote:
>>
>>> Hi Frans,
>>>
>>> You could download Spark 1.5.1-hadoop 2.6 pre-built tarball and copy
>>> into HDP 2.3 sandbox or master node. Then copy all the conf files from
>>> /usr/hdp/current/spark-client/ to your /conf, or you could
>>> refer to this tech preview (
>>> http://hortonworks.com/hadoop-tutorial/apache-spark-1-4-1-technical-preview-with-hdp/
>>> ), in "installing chapter", step 4 ~ 8 is what you need to do.
>>>
>>> Thanks
>>> Saisai
>>>
>>> On Wed, Oct 21, 2015 at 1:27 PM, Frans Thamura 
>>> wrote:
>>>
 Doug

 is it possible to put in HDP 2.3?

 esp in Sandbox

 can share how do you install it?


 F
 --
 Frans Thamura (曽志胜)
 Java Champion
 Shadow Master and Lead Investor
 Meruvian.
 Integrated Hypermedia Java Solution Provider.

 Mobile: +628557888699
 Blog: http://blogs.mervpolis.com/roller/flatburger (id)

 FB: http://www.facebook.com/meruvian
 TW: http://www.twitter.com/meruvian / @meruvian
 Website: http://www.meruvian.org

 "We grow because we share the same belief."


 On Wed, Oct 21, 2015 at 12:24 PM, Doug Balog 
 wrote:
 > I have been running 1.5.1 with Hive in secure mode on HDP 2.2.4
 without any problems.
 >
 > Doug
 >
 >> On Oct 21, 2015, at 12:05 AM, Ajay Chander 
 wrote:
 >>
 >> Hi Everyone,
 >>
 >> Any one has any idea if spark-1.5.1 is available as a service on
 HortonWorks ? I have spark-1.3.1 installed on the Cluster and it is a
 HortonWorks distribution. Now I want upgrade it to spark-1.5.1. Anyone here
 have any idea about it? Thank you in advance.
 >>
 >> Regards,
 >> Ajay
 >
 >
 > -
 > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 > For additional commands, e-mail: user-h...@spark.apache.org
 >

 -

Re: Spark_1.5.1_on_HortonWorks

2015-10-21 Thread Saisai Shao
How you start history server, do you still use the history server of 1.3.1,
or you started the history server in 1.5.1?

The Spark tarball you used is the community version, so Application
TimelineServer based history provider is not supported, you could comment
this configuration "spark.history.provider", so it will use default
FsHistoryProvider, or you could configure "spark.history.provider" to
"org.apache.spark.deploy.history.FsHistoryProvider".

If you still want to use this ATS based history server, you have to wait
for the technical preview release of Hortonworks.

Thanks
Saisai


On Thu, Oct 22, 2015 at 9:47 AM, Ajay Chander  wrote:

> Hi Sasai,
>
> Thanks for your time. I have followed your inputs and downloaded
> "spark-1.5.1-bin-hadoop2.6" on one of the node say node1. And when I did a
> pie test everything seems to be working fine, except that the spark-history
> -server running on this node1 has gone down. It was complaining about
>  missing class:
>
> 15/10/21 16:41:28 INFO HistoryServer: Registered signal handlers for
> [TERM, HUP, INT]
> 15/10/21 16:41:28 WARN SparkConf: The configuration key
> 'spark.yarn.applicationMaster.waitTries' has been deprecated as of Spark
> 1.3 and and may be removed in the future. Please use the new key
> 'spark.yarn.am.waitTime' instead.
> 15/10/21 16:41:29 WARN NativeCodeLoader: Unable to load native-hadoop
> library for your platform... using builtin-java classes where applicable
> 15/10/21 16:41:29 INFO SecurityManager: Changing view acls to: root
> 15/10/21 16:41:29 INFO SecurityManager: Changing modify acls to: root
> 15/10/21 16:41:29 INFO SecurityManager: SecurityManager: authentication
> disabled; ui acls disabled; users with view permissions: Set(root); users
> with modify permissions: Set(root)
> Exception in thread "main" java.lang.ClassNotFoundException:
> org.apache.spark.deploy.yarn.history.YarnHistoryProvider
> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:348)
> at org.apache.spark.util.Utils$.classForName(Utils.scala:173)
> at
> org.apache.spark.deploy.history.HistoryServer$.main(HistoryServer.scala:231)
> at
> org.apache.spark.deploy.history.HistoryServer.main(HistoryServer.scala)
>
>
> I went to the lib folder and noticed that
> "spark-assembly-1.5.1-hadoop2.6.0.jar" is missing that class. I was able to
> get the spark history server started with 1.3.1 but not 1.5.1. Any inputs
> on this?
>
> Really appreciate your help. Thanks
>
> Regards,
> Ajay
>
>
>
> On Wednesday, October 21, 2015, Saisai Shao 
> wrote:
>
>> Hi Ajay,
>>
>> You don't need to copy tarball to all the nodes, only one node you want
>> to run spark application is enough (mostly the master node), Yarn will help
>> to distribute the Spark dependencies. The link I mentioned before is the
>> one you could follow, please read my previous mail.
>>
>> Thanks
>> Saisai
>>
>>
>>
>> On Thu, Oct 22, 2015 at 1:56 AM, Ajay Chander 
>> wrote:
>>
>>> Thanks for your kind inputs. Right now I am running spark-1.3.1 on
>>> YARN(4 node cluster) on a HortonWorks distribution. Now I want to upgrade
>>> spark-1.3.1 to spark-1.5.1. So at this point of time, do I have to
>>> manually go and copy spark-1.5.1 tarbal to all the nodes or is there any
>>> alternative so that I can get it upgraded through Ambari UI ? If possible
>>> can anyone point me to a documentation online? Thank you.
>>>
>>> Regards,
>>> Ajay
>>>
>>>
>>> On Wednesday, October 21, 2015, Saisai Shao 
>>> wrote:
>>>
 Hi Frans,

 You could download Spark 1.5.1-hadoop 2.6 pre-built tarball and copy
 into HDP 2.3 sandbox or master node. Then copy all the conf files from
 /usr/hdp/current/spark-client/ to your /conf, or you could
 refer to this tech preview (
 http://hortonworks.com/hadoop-tutorial/apache-spark-1-4-1-technical-preview-with-hdp/
 ), in "installing chapter", step 4 ~ 8 is what you need to do.

 Thanks
 Saisai

 On Wed, Oct 21, 2015 at 1:27 PM, Frans Thamura 
 wrote:

> Doug
>
> is it possible to put in HDP 2.3?
>
> esp in Sandbox
>
> can share how do you install it?
>
>
> F
> --
> Frans Thamura (曽志胜)
> Java Champion
> Shadow Master and Lead Investor
> Meruvian.
> Integrated Hypermedia Java Solution Provider.
>
> Mobile: +628557888699
> Blog: http://blogs.mervpolis.com/roller/flatburger (id)
>
> FB: http://www.facebook.com/meruvian
> TW: http://www.twitter.com/meruvian / @meruvian
> Website: http://www.meruvian.org
>
> "We grow because we share the same belief."
>
>
> On Wed, Oct 21, 2015 at 12:24 PM, Doug 

how to use Trees and ensembles: class probabilities

2015-10-21 Thread r7raul1...@163.com
how to use trees and ensembles: class probabilities in spark 1.5.0  . Any 
example or document ?



r7raul1...@163.com


Re: Spark_sql

2015-10-21 Thread Ted Yu
I don't think passing sqlContext to map() is supported.

Can you describe your use case in more detail ? Why do you need to create a
DataFrame inside the map() function ?

Cheers

On Wed, Oct 21, 2015 at 6:32 PM, Ajay Chander  wrote:

> Hi Everyone,
>
> I have a use case where I have to create a DataFrame inside the map()
> function. To create a DataFrame it need sqlContext or hiveContext. Now how
> do I pass the context to my map function ? And I am doing it in java. I
> tried creating a class "TestClass" which implements "Function"
> and inside the call method I want to create the DataFrame, so I created a
> parameterized constructor to pass context from driver program to TestClass
> and use that context to create DataFrame. But it seems like it's a wrong
> way of doing. Can anyone help me in this? Thanks in advance.
>
> Regards,
> Aj
>


Re: Spark_1.5.1_on_HortonWorks

2015-10-21 Thread Frans Thamura
talking about spark in hdp

Is there reference about Spark-R, and what version should we install in R?
--
Frans Thamura (曽志胜)
Java Champion
Shadow Master and Lead Investor
Meruvian.
Integrated Hypermedia Java Solution Provider.

Mobile: +628557888699
Blog: http://blogs.mervpolis.com/roller/flatburger (id)

FB: http://www.facebook.com/meruvian
TW: http://www.twitter.com/meruvian / @meruvian
Website: http://www.meruvian.org

"We grow because we share the same belief."


On Thu, Oct 22, 2015 at 8:56 AM, Saisai Shao  wrote:
> How you start history server, do you still use the history server of 1.3.1,
> or you started the history server in 1.5.1?
>
> The Spark tarball you used is the community version, so Application
> TimelineServer based history provider is not supported, you could comment
> this configuration "spark.history.provider", so it will use default
> FsHistoryProvider, or you could configure "spark.history.provider" to
> "org.apache.spark.deploy.history.FsHistoryProvider".
>
> If you still want to use this ATS based history server, you have to wait for
> the technical preview release of Hortonworks.
>
> Thanks
> Saisai
>
>
> On Thu, Oct 22, 2015 at 9:47 AM, Ajay Chander  wrote:
>>
>> Hi Sasai,
>>
>> Thanks for your time. I have followed your inputs and downloaded
>> "spark-1.5.1-bin-hadoop2.6" on one of the node say node1. And when I did a
>> pie test everything seems to be working fine, except that the spark-history
>> -server running on this node1 has gone down. It was complaining about
>> missing class:
>>
>> 15/10/21 16:41:28 INFO HistoryServer: Registered signal handlers for
>> [TERM, HUP, INT]
>> 15/10/21 16:41:28 WARN SparkConf: The configuration key
>> 'spark.yarn.applicationMaster.waitTries' has been deprecated as of Spark 1.3
>> and and may be removed in the future. Please use the new key
>> 'spark.yarn.am.waitTime' instead.
>> 15/10/21 16:41:29 WARN NativeCodeLoader: Unable to load native-hadoop
>> library for your platform... using builtin-java classes where applicable
>> 15/10/21 16:41:29 INFO SecurityManager: Changing view acls to: root
>> 15/10/21 16:41:29 INFO SecurityManager: Changing modify acls to: root
>> 15/10/21 16:41:29 INFO SecurityManager: SecurityManager: authentication
>> disabled; ui acls disabled; users with view permissions: Set(root); users
>> with modify permissions: Set(root)
>> Exception in thread "main" java.lang.ClassNotFoundException:
>> org.apache.spark.deploy.yarn.history.YarnHistoryProvider
>> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>> at java.lang.Class.forName0(Native Method)
>> at java.lang.Class.forName(Class.java:348)
>> at org.apache.spark.util.Utils$.classForName(Utils.scala:173)
>> at
>> org.apache.spark.deploy.history.HistoryServer$.main(HistoryServer.scala:231)
>> at
>> org.apache.spark.deploy.history.HistoryServer.main(HistoryServer.scala)
>>
>>
>> I went to the lib folder and noticed that
>> "spark-assembly-1.5.1-hadoop2.6.0.jar" is missing that class. I was able to
>> get the spark history server started with 1.3.1 but not 1.5.1. Any inputs on
>> this?
>>
>> Really appreciate your help. Thanks
>>
>> Regards,
>> Ajay
>>
>>
>>
>> On Wednesday, October 21, 2015, Saisai Shao 
>> wrote:
>>>
>>> Hi Ajay,
>>>
>>> You don't need to copy tarball to all the nodes, only one node you want
>>> to run spark application is enough (mostly the master node), Yarn will help
>>> to distribute the Spark dependencies. The link I mentioned before is the one
>>> you could follow, please read my previous mail.
>>>
>>> Thanks
>>> Saisai
>>>
>>>
>>>
>>> On Thu, Oct 22, 2015 at 1:56 AM, Ajay Chander 
>>> wrote:

 Thanks for your kind inputs. Right now I am running spark-1.3.1 on
 YARN(4 node cluster) on a HortonWorks distribution. Now I want to upgrade
 spark-1.3.1 to spark-1.5.1. So at this point of time, do I have to manually
 go and copy spark-1.5.1 tarbal to all the nodes or is there any alternative
 so that I can get it upgraded through Ambari UI ? If possible can anyone
 point me to a documentation online? Thank you.

 Regards,
 Ajay


 On Wednesday, October 21, 2015, Saisai Shao 
 wrote:
>
> Hi Frans,
>
> You could download Spark 1.5.1-hadoop 2.6 pre-built tarball and copy
> into HDP 2.3 sandbox or master node. Then copy all the conf files from
> /usr/hdp/current/spark-client/ to your /conf, or you could
> refer to this tech preview (
> http://hortonworks.com/hadoop-tutorial/apache-spark-1-4-1-technical-preview-with-hdp/
> ), in "installing chapter", step 4 ~ 8 is what you need to do.
>
> Thanks
> Saisai
>
> On Wed, Oct 21, 2015 at 1:27 PM, Frans Thamura 
> wrote:
>>
>>

Re: Spark_1.5.1_on_HortonWorks

2015-10-21 Thread Saisai Shao
SparkR is shipped with Hortonworks version of Spark 1.4.1, there's no
difference compared to community version, you could refer to the docs of
Apache Spark. It would be better to ask HDP related questions in (
http://hortonworks.com/community/forums/forum/spark/ ). Sorry for not so
familiar with SparkR related things.

Thanks
Saisai

On Thu, Oct 22, 2015 at 11:02 AM, Frans Thamura  wrote:

> talking about spark in hdp
>
> Is there reference about Spark-R, and what version should we install in R?
> --
> Frans Thamura (曽志胜)
> Java Champion
> Shadow Master and Lead Investor
> Meruvian.
> Integrated Hypermedia Java Solution Provider.
>
> Mobile: +628557888699
> Blog: http://blogs.mervpolis.com/roller/flatburger (id)
>
> FB: http://www.facebook.com/meruvian
> TW: http://www.twitter.com/meruvian / @meruvian
> Website: http://www.meruvian.org
>
> "We grow because we share the same belief."
>
>
> On Thu, Oct 22, 2015 at 8:56 AM, Saisai Shao 
> wrote:
> > How you start history server, do you still use the history server of
> 1.3.1,
> > or you started the history server in 1.5.1?
> >
> > The Spark tarball you used is the community version, so Application
> > TimelineServer based history provider is not supported, you could comment
> > this configuration "spark.history.provider", so it will use default
> > FsHistoryProvider, or you could configure "spark.history.provider" to
> > "org.apache.spark.deploy.history.FsHistoryProvider".
> >
> > If you still want to use this ATS based history server, you have to wait
> for
> > the technical preview release of Hortonworks.
> >
> > Thanks
> > Saisai
> >
> >
> > On Thu, Oct 22, 2015 at 9:47 AM, Ajay Chander 
> wrote:
> >>
> >> Hi Sasai,
> >>
> >> Thanks for your time. I have followed your inputs and downloaded
> >> "spark-1.5.1-bin-hadoop2.6" on one of the node say node1. And when I
> did a
> >> pie test everything seems to be working fine, except that the
> spark-history
> >> -server running on this node1 has gone down. It was complaining about
> >> missing class:
> >>
> >> 15/10/21 16:41:28 INFO HistoryServer: Registered signal handlers for
> >> [TERM, HUP, INT]
> >> 15/10/21 16:41:28 WARN SparkConf: The configuration key
> >> 'spark.yarn.applicationMaster.waitTries' has been deprecated as of
> Spark 1.3
> >> and and may be removed in the future. Please use the new key
> >> 'spark.yarn.am.waitTime' instead.
> >> 15/10/21 16:41:29 WARN NativeCodeLoader: Unable to load native-hadoop
> >> library for your platform... using builtin-java classes where applicable
> >> 15/10/21 16:41:29 INFO SecurityManager: Changing view acls to: root
> >> 15/10/21 16:41:29 INFO SecurityManager: Changing modify acls to: root
> >> 15/10/21 16:41:29 INFO SecurityManager: SecurityManager: authentication
> >> disabled; ui acls disabled; users with view permissions: Set(root);
> users
> >> with modify permissions: Set(root)
> >> Exception in thread "main" java.lang.ClassNotFoundException:
> >> org.apache.spark.deploy.yarn.history.YarnHistoryProvider
> >> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> >> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> >> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
> >> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> >> at java.lang.Class.forName0(Native Method)
> >> at java.lang.Class.forName(Class.java:348)
> >> at org.apache.spark.util.Utils$.classForName(Utils.scala:173)
> >> at
> >>
> org.apache.spark.deploy.history.HistoryServer$.main(HistoryServer.scala:231)
> >> at
> >> org.apache.spark.deploy.history.HistoryServer.main(HistoryServer.scala)
> >>
> >>
> >> I went to the lib folder and noticed that
> >> "spark-assembly-1.5.1-hadoop2.6.0.jar" is missing that class. I was
> able to
> >> get the spark history server started with 1.3.1 but not 1.5.1. Any
> inputs on
> >> this?
> >>
> >> Really appreciate your help. Thanks
> >>
> >> Regards,
> >> Ajay
> >>
> >>
> >>
> >> On Wednesday, October 21, 2015, Saisai Shao 
> >> wrote:
> >>>
> >>> Hi Ajay,
> >>>
> >>> You don't need to copy tarball to all the nodes, only one node you want
> >>> to run spark application is enough (mostly the master node), Yarn will
> help
> >>> to distribute the Spark dependencies. The link I mentioned before is
> the one
> >>> you could follow, please read my previous mail.
> >>>
> >>> Thanks
> >>> Saisai
> >>>
> >>>
> >>>
> >>> On Thu, Oct 22, 2015 at 1:56 AM, Ajay Chander 
> >>> wrote:
> 
>  Thanks for your kind inputs. Right now I am running spark-1.3.1 on
>  YARN(4 node cluster) on a HortonWorks distribution. Now I want to
> upgrade
>  spark-1.3.1 to spark-1.5.1. So at this point of time, do I have to
> manually
>  go and copy spark-1.5.1 tarbal to all the nodes or is there any
> alternative
>  so that I can get it upgraded through Ambari UI ? If possible can
> anyone
>  point me to a documentation online? Thank you.
> 
>  Reg

Re: Getting info from DecisionTreeClassificationModel

2015-10-21 Thread sethah
I believe this question will give you the answer your looking for:  Decision
Tree Accuracy

  

Basically, you can traverse the tree from the root node.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Getting-info-from-DecisionTreeClassificationModel-tp25152p25159.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Kafka Streaming and Filtering > 3000 partitons

2015-10-21 Thread varun sharma
You can try something like this to filter by topic:

val kafkaStringStream = KafkaUtils.createDirectStream[...]

//you might want to create Stream by fetching offsets from zk

kafkaStringStream.foreachRDD{rdd =>
  val topics = rdd.map(_._1).distinct().collect()
  if (topics.length > 0) {
val rdd_value = rdd.take(10).mkString("\n.\n")
Logger.log(this.getClass, INFO, BaseSLog(s"Printing all feeds\n$rdd_value"))

topics.foreach { topic =>
  val filteredRdd = rdd.collect { case (t, data) if t == topic => data }
  //do anything with this filteredRdd, like saving to data store
}
//update the offsets
ZookeeperManaager.updateOffsetsinZk(rdd)
  }
}

Regards

Varun


On Thu, Oct 22, 2015 at 2:44 AM, Cody Koeninger  wrote:

> Yeah, that's the general idea.
>
> Regarding the question in your code comments ... The code inside of
> foreachPartition is what's running on the executor.  It wouldn't make any
> sense to try to get a partition ID before that block.
>
> On Wed, Oct 21, 2015 at 4:07 PM, Dave Ariens 
> wrote:
>
>> Cody,
>>
>>
>>
>> First off--thanks for your contributions and blog post, I actually linked
>> to in my original question. You'll have to forgive me as I've only been
>> using Spark and writing Scala for a few days. I'm aware that the RDD
>> partitions are 1:1 with Kafka topic partitions and you can get the offset
>> ranges.  But my understand is that the below code would need to be executed
>> after the stream has been processed.
>>
>>
>>
>> Let's say we're storing our filters in a key value map where the key is
>> the topic name, and the value is a string that a message within a partition
>> of that topic must contain to match.
>>
>>
>>
>> Is this the approach you're suggesting (using your example code)?
>>
>>
>>
>> // This would get built up on the driver, likely fetching the topic and
>> filters from ZK
>>
>> val topicFilters = Map("topic1" -> "this text must match", "topic2" ->
>> "this other text must match")
>>
>>
>>
>>
>>
>> val stream = KafkaUtils.createDirectStream(...)
>>
>>   ...
>>
>>   stream.foreachRDD { rdd =>
>>
>> // Cast the rdd to an interface that lets us get an array of
>> OffsetRange
>>
>> val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>>
>>
>>
>> rdd.foreachPartition { iter =>
>>
>>   // index to get the correct offset range for the rdd partition
>> we're working on
>>
>>   val osr: OffsetRange = offsetRanges(TaskContext.get.partitionId)
>>
>>
>>
>>   // get any needed data from the offset range
>>
>>   val topic = osr.topic
>>
>>   val kafkaPartitionId = osr.partition
>>
>>   val begin = osr.fromOffset
>>
>>   val end = osr.untilOffset
>>
>>
>>
>>   // Now we know the topic name, we can filter something
>>
>>   // Or could we have referenced the topic name from
>>
>>   // offsetRanges(TaskContext.get.partitionId) earlier
>>
>>   // before we entered into stream.foreachRDD...?
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> *From:* Cody Koeninger [mailto:c...@koeninger.org]
>> *Sent:* Wednesday, October 21, 2015 3:01 PM
>> *To:* Dave Ariens
>> *Cc:* user@spark.apache.org
>> *Subject:* Re: Kafka Streaming and Filtering > 3000 partitons
>>
>>
>>
>> The rdd partitions are 1:1 with kafka topicpartitions, so you can use
>> offsets ranges to figure out which topic a given rdd partition is for and
>> proceed accordingly.  See the kafka integration guide in the spark
>> streaming docs for more details, or
>> https://github.com/koeninger/kafka-exactly-once
>>
>>
>>
>> As far as setting offsets in ZK, there's a private interface in the spark
>> codebase that would make it a little easier for you to do that.  You can
>> see that code for reference, or there's an outstanding ticket for making it
>> public https://issues.apache.org/jira/browse/SPARK-10963
>>
>>
>>
>> On Wed, Oct 21, 2015 at 1:50 PM, Dave Ariens 
>> wrote:
>>
>> Hey folks,
>>
>>
>>
>> I have a very large number of Kafka topics (many thousands of partitions)
>> that I want to consume, filter based on topic-specific filters, then
>> produce back to filtered topics in Kafka.
>>
>>
>>
>> Using the receiver-less based approach with Spark 1.4.1 (described here
>> )
>> I am able to use either KafkaUtils.createDirectStream or
>> KafkaUtils.createRDD, consume from many topics, and filter them with the
>> same filters but I can't seem to wrap my head around how to apply
>> topic-specific filters, or to finally produce to topic-specific destination
>> topics.
>>
>>
>>
>> Another point would be that I will need to checkpoint the metadata after
>> each successful batch and set starting offsets per partition back to ZK.  I
>> expect I can do that on the final RDDs after casting them accordingly, but
>> if anyone has any expertise/guidance doing that and is willing to share,
>> I'd be pretty grateful.
>>
>>
>>
>
>


-- 
*VARUN SHARMA*
*Flipkart*
*Bangalore*


Analyzing consecutive elements

2015-10-21 Thread Sampo Niskanen
Hi,

I have analytics data with timestamps on each element.  I'd like to analyze
consecutive elements using Spark, but haven't figured out how to do this.

Essentially what I'd want is a transform from a sorted RDD [A, B, C, D, E]
to an RDD [(A,B), (B,C), (C,D), (D,E)].  (Or some other way to analyze
time-related elements.)

How can this be achieved?


*Sampo Niskanen*

*Lead developer / Wellmo*
sampo.niska...@wellmo.com
+358 40 820 5291


Request for submitting Spark jobs in code purely, without jar

2015-10-21 Thread ??????
Hi developers, I've encountered some problem with Spark, and before opening an 
issue, I'd like to hear your thoughts.


Currently, if you want to submit a Spark job, you'll need to write the code, 
make a jar, and then submit it with spark-submit or 
org.apache.spark.launcher.SparkLauncher. 


But sometimes, the RDD operation chain is transferred dynamically in code, from 
SQL or even GUI. thus it seems either inconvenient or not possible to make a 
separated jar. Then I tried something like below:
val conf = new SparkConf().setAppName("Demo").setMaster("yarn-client")val sc = 
new SparkContext(conf)sc.textFile("README.md").flatMap(_.split(" ")).map((_, 
1)).reduceByKey(_+_).foreach(println) // A simple word countWhen they are 
executed, a Spark job is submitted. However, there are some remaining problems:
1. It doesn't support all deploy modes, such as yarn-cluster.
2. With the "Only 1 SparkContext in 1 JVM" limit, I can not run this twice.
3. It runs within the same process with my code, no child process is created.



Thus, what I wish for is that the problems can be handle by Spark itself, and 
my request can be simply described as a "adding submit() method for 
SparkContext / StreamingContext / SQLContext". I hope if I added a line after 
the code above like this:
sc.submit()then Spark can handle all background submitting processing for me.

I already opened an issue before for this demand, but I couldn't make myself 
clear back then. So I wrote this email and try to talk to you guys. Please 
reply if you need further descriptions, and I'll open a issue for this if you 
understand my demand and believe that it's something worth doing.


Thanks a lot.


Yuhang Chen.

yuhang.c...@foxmail.com