Spark SQL with Hive error: "Conf non-local session path expected to be non-null;"

2015-10-04 Thread YaoPau
I've been experimenting with using PySpark SQL to query Hive tables for the
last week and all has been smooth, but on a command I've run hundreds of
times successfully (a basic SELECT * ...), this error suddenly started
popping up on every sqlCtx command until I restarted my session.
It's now working again after the restart.

Any idea what this could mean? I'd like to start rolling this out to other
users and would like to be able to help if they run into the same issue.

An error occurred while calling o33.sql.
: org.apache.spark.sql.AnalysisException: Conf non-local session path expected to be non-null;
at org.apache.spark.sql.hive.HiveQl$.createPlan(HiveQl.scala:260)
at org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:41)
at org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:40)
at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136)
at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
at scala.util.parsing.combinator.Parsers$Failure.append(Parsers.scala:202)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
at scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
at scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at scala.util.parsing.combinator.Parsers$$anon$2.apply(Parsers.scala:890)
at scala.util.parsing.combinator.PackratParsers$$anon$1.apply(PackratParsers.scala:110)
at org.apache.spark.sql.catalyst.AbstractSparkSQLParser.apply(AbstractSparkSQLParser.scala:38)
at org.apache.spark.sql.hive.HiveQl$$anonfun$3.apply(HiveQl.scala:139)
at org.apache.spark.sql.hive.HiveQl$$anonfun$3.apply(HiveQl.scala:139)
at org.apache.spark.sql.SparkSQLParser$$anonfun$org$apache$spark$sql$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:96)
at org.apache.spark.sql.SparkSQLParser$$anonfun$org$apache$spark$sql$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:95)
at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136)
at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
at scala.util.parsing.combinator.Parsers$Failure.append(Parsers.scala:202)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
at scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
at scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at scala.util.parsing.combinator.Parsers$$anon$2.apply(Parsers.scala:890)
at scala.util.parsing.combinator.PackratParsers$$anon$1.apply(PackratParsers.scala:110)
at org.apache.spark.sql.catalyst.AbstractSparkSQLParser.apply(AbstractSparkSQLParser.scala:38)
at org.apache.spark.sql.hive.HiveQl$.parseSql(HiveQl.scala:235)
at org.apache.spark.sql.hive.HiveContext$$anonfun$sql$1.apply(HiveContext.scala:92)
at org.apache.spark.sql.hive.HiveContext$$anonfun$sql$1.apply(HiveContext.scala:92)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.sql.hive.HiveContext.sql(HiveContext.scala:92)
at sun.reflect.GeneratedMethodAccessor40.invoke(Unknown Source)
...

Re: How to install a Spark Package?

2015-10-04 Thread Ted Yu
Are you talking about a package listed on
http://spark-packages.org ?

The package should come with installation instructions, right?
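
For what it's worth, packages from that index can usually be pulled in with the
--packages flag (the spark-csv coordinates below are only an example); the
resolved jars are cached locally under ~/.ivy2, so they stay available offline
after the first fetch:

spark-shell --packages com.databricks:spark-csv_2.10:1.2.0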

> On Oct 4, 2015, at 8:55 PM, jeff saremi  wrote:
> 
> So that it is available even in offline mode? I can't seem to find
> any notes on that
> thanks
> jeff


Re: Secondary Sorting in Spark

2015-10-04 Thread Koert Kuipers
See also https://github.com/tresata/spark-sorted
On Oct 5, 2015 3:41 AM, "Bill Bejeck"  wrote:

> I've written a blog post on secondary sorting in Spark and I thought I'd
> share it with the group
>
> http://codingjunkie.net/spark-secondary-sort/
>
> Thanks,
> Bill
>


Re: How to optimize group by query fired using hiveContext.sql?

2015-10-04 Thread Alex Rovner
Can you at least copy paste the error(s) you are seeing when the job fails?
Without the error message(s), it's hard to even suggest anything.

*Alex Rovner*
*Director, Data Engineering *
*o:* 646.759.0052


On Sat, Oct 3, 2015 at 9:50 AM, Umesh Kacha  wrote:

> Hi, thanks. I can't share the YARN logs because of privacy at my company, but I
> can tell you I have looked through the YARN logs and found nothing except
> YARN killing containers because they exceed physical memory capacity.
>
> I am using the following command line script. The above job launches around
> 1500 ExecutorService threads from a driver with a thread pool of 15, so at a
> time 15 jobs will be running, as shown in the UI.
>
> ./spark-submit --class com.xyz.abc.MySparkJob \
>   --conf "spark.executor.extraJavaOptions=-XX:MaxPermSize=512M" \
>   --driver-java-options -XX:MaxPermSize=512m \
>   --driver-memory 4g --master yarn-client \
>   --executor-memory 27G --executor-cores 2 \
>   --num-executors 40 \
>   --jars /path/to/others-jars \
>   /path/to/spark-job.jar
>
>
> On Sat, Oct 3, 2015 at 7:11 PM, Alex Rovner 
> wrote:
>
>> Can you send over your yarn logs along with the command you are using to
>> submit your job?
>>
>> *Alex Rovner*
>> *Director, Data Engineering *
>> *o:* 646.759.0052
>>
>>
>> On Sat, Oct 3, 2015 at 9:07 AM, Umesh Kacha 
>> wrote:
>>
>>> Hi Alex, thanks much for the reply. Please read the following for more
>>> details about my problem.
>>>
>>>
>>> http://stackoverflow.com/questions/32317285/spark-executor-oom-issue-on-yarn
>>>
>>> Each of my containers has 8 cores and 30 GB max memory, so I am using
>>> yarn-client mode with 40 executors of 27 GB / 2 cores each. If I use more cores
>>> then my job starts losing more executors. I tried setting
>>> spark.yarn.executor.memoryOverhead to around 2 GB, even 8 GB, but it does
>>> not help; I lose executors no matter what. The reason is that my jobs shuffle
>>> lots of data, as much as 20 GB per job, as I have seen in the UI. The shuffle
>>> happens because of the group by, and I can't avoid it in my case.
>>>
>>>
>>>
>>> On Sat, Oct 3, 2015 at 6:27 PM, Alex Rovner 
>>> wrote:
>>>
 This sounds like you need to increase YARN overhead settings with the 
 "spark.yarn.executor.memoryOverhead"
 parameter. See http://spark.apache.org/docs/latest/running-on-yarn.html
 for more information on the setting.

 If that does not work for you, please provide the error messages and
 the command line you are using to submit your jobs for further
 troubleshooting.


 *Alex Rovner*
 *Director, Data Engineering *
 *o:* 646.759.0052


 On Sat, Oct 3, 2015 at 6:19 AM, unk1102  wrote:

> Hi, I have a couple of Spark jobs which use a group by query that gets
> fired from hiveContext.sql(). Now, I know group by is evil, but in my use
> case I can't avoid it; I have around 7-8 fields on which I need to
> group by.
> Also I am using df1.except(df2), which also seems a heavy operation and
> does lots of shuffling; please see my UI snap
> <
> http://apache-spark-user-list.1001560.n3.nabble.com/file/n24914/IMG_20151003_151830218.jpg
> >
>
> I have tried almost all optimisations, including Spark 1.5, but nothing
> seems to be working, and my job fails or hangs because an executor will
> reach its physical memory limit and YARN will kill it. I have around 1TB
> of data to process and it is skewed. Please guide.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-optimize-group-by-query-fired-using-hiveContext-sql-tp24914.html
> Sent from the Apache Spark User List mailing list archive at
> Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>

>>>
>>
>
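
For reference, the overhead setting discussed in this thread is a plain integer
number of megabytes passed at submit time; the 4096 below is illustrative, not a
recommendation:

./spark-submit --conf "spark.yarn.executor.memoryOverhead=4096" --class com.xyz.abc.MySparkJob ...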


Spark 1.5.0 Error on startup

2015-10-04 Thread Julius Fernandes
I have Spark 1.5.0 (prebuilt for Hadoop 2.6) with JDK 1.7.

NOTE: I do not have a Hadoop installation.

Whenever I start spark-shell, I get the following error:

Caused by: java.lang.NullPointerException
at java.lang.ProcessBuilder.start(ProcessBuilder.java:1010)
at org.apache.hadoop.util.Shell.runCommand(Shell.java:482)
at org.apache.hadoop.util.Shell.run(Shell.java:455)
at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:715)
at org.apache.hadoop.util.Shell.execCommand(Shell.java:808)
at org.apache.hadoop.util.Shell.execCommand(Shell.java:791)
at org.apache.hadoop.fs.FileUtil.execCommand(FileUtil.java:1097)
at org.apache.hadoop.fs.RawLocalFileSystem$DeprecatedRawLocalFileStatus.loadPermissionInfo(RawLocalFileSystem.java:582)
at org.apache.hadoop.fs.RawLocalFileSystem$DeprecatedRawLocalFileStatus.getPermission(RawLocalFileSystem.java:557)
at org.apache.hadoop.hive.ql.session.SessionState.createRootHDFSDir(SessionState.java:599)
at org.apache.hadoop.hive.ql.session.SessionState.createSessionDirs(SessionState.java:554)
at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:508)
... 56 more
:10: error: not found: value sqlContext
   import sqlContext.implicits._
  ^
:10: error: not found: value sqlContext
   import sqlContext.sql
  ^
scala>

Can someone point out how we could resolve the issue here?


How to install a Spark Package?

2015-10-04 Thread jeff saremi
So that it is available even in offline mode? I can't seem to find
any notes on that
thanks
jeff

ml.Pipeline without train step

2015-10-04 Thread Jaonary Rabarisoa
Hi there,

The Pipeline in the ml package is really a great feature and we use it in our
everyday tasks. But we have some use cases where we need a Pipeline of
Transformers only, and the problem is that there's no train phase in that
case. For example, we have an image-analytics pipeline with the following
steps: ImageDecoder -> FeatureExtraction -> Machine Learning algo. Sometimes
the machine learning algorithm is not part of the pipeline, but we
still need the ImageDecoder + FeatureExtraction steps.
The FeatureExtraction step has no train phase (FFT, SIFT, pre-trained CNN,
...), and in this case calling the .fit method in order to get the pipeline
model is not meaningful.
How can we handle this case correctly?

Cheers,

Jao
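
A minimal sketch of one way to handle this with the current API, assuming
Spark 1.5's ml package and a DataFrame df with a string "text" column: when
every stage is a Transformer, fit() has nothing to estimate and effectively
just wraps the stages into a PipelineModel, so calling it is cheap even if it
reads oddly:

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}

// transformer-only stages: nothing is trained here
val tokenizer = new Tokenizer().setInputCol("text").setOutputCol("words")
val hashingTF = new HashingTF().setInputCol("words").setOutputCol("features")
val pipeline  = new Pipeline().setStages(Array(tokenizer, hashingTF))

val model = pipeline.fit(df)    // no training happens, returns a PipelineModel
val out   = model.transform(df) // the pure transformation chain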


Re: preferredNodeLocationData, SPARK-8949, and SparkContext - a leftover?

2015-10-04 Thread Sean Owen
I think it's unused as the JIRA says, but removing it from the
constructors would change the API, so that's why it stays in the
signature. Removing the internal field and one usage of it seems OK,
though I don't think it would help much of anything.

On Sun, Oct 4, 2015 at 4:36 AM, Jacek Laskowski  wrote:
> Hi,
>
> I've been reviewing SparkContext and found preferredNodeLocationData
> that was made obsoleted by SPARK-8949 [1].
>
> When you search where SparkContext.preferredNodeLocationData is used,
> you find 3 places - one constructor marked @deprecated, the other with
> logWarning telling us that "Passing in preferred locations has no
> effect at all, see SPARK-8949", and in
> org.apache.spark.deploy.yarn.ApplicationMaster.registerAM method.
>
> org.apache.spark.deploy.yarn.ApplicationMaster.registerAM method
> caught my eye and I found that it does the following in
> client.register:
>
> if (sc != null) sc.preferredNodeLocationData else Map()
>
> However, AFAIU client.register ignores the input parameter completely
> (!) It's not used in the body of the method and seems a leftover. The
> input parameter should be removed and so should the above line.
>
> What do you think? Should I report an issue and clean it up via a pull req?
>
> BTW, What do you think about removing
> SparkContext.preferredNodeLocationData as part of the cleanup?
>
> [1] https://issues.apache.org/jira/browse/SPARK-8949
>
> Pozdrawiam,
> Jacek
>
> --
> Jacek Laskowski | http://blog.japila.pl | http://blog.jaceklaskowski.pl
> Follow me at https://twitter.com/jaceklaskowski
> Upvote at http://stackoverflow.com/users/1305344/jacek-laskowski
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



java.lang.OutOfMemoryError: GC overhead limit exceeded

2015-10-04 Thread t_ras
I get java.lang.OutOfMemoryError: GC overhead limit exceeded when trying a
count action on a file.

The file is a CSV file, 217 GB in size.

I'm using 10 r3.8xlarge (Ubuntu) machines, CDH 5.3.6 and Spark 1.2.0.

configuration:

spark.app.id:local-1443956477103

spark.app.name:Spark shell

spark.cores.max:100

spark.driver.cores:24

spark.driver.extraLibraryPath:/opt/cloudera/parcels/CDH-5.3.6-1.cdh5.3.6.p0.11/lib/hadoop/lib/native
spark.driver.host:ip-172-31-34-242.us-west-2.compute.internal

spark.driver.maxResultSize:300g

spark.driver.port:55123

spark.eventLog.dir:hdfs://ip-172-31-34-242.us-west-2.compute.internal:8020/user/spark/applicationHistory
spark.eventLog.enabled:true

spark.executor.extraLibraryPath:/opt/cloudera/parcels/CDH-5.3.6-1.cdh5.3.6.p0.11/lib/hadoop/lib/native

spark.executor.id:driver spark.executor.memory:200g

spark.fileserver.uri:http://172.31.34.242:51424

spark.jars: spark.master:local[*]

spark.repl.class.uri:http://172.31.34.242:58244

spark.scheduler.mode:FIFO

spark.serializer:org.apache.spark.serializer.KryoSerializer

spark.storage.memoryFraction:0.9

spark.tachyonStore.folderName:spark-88bd9c44-d626-4ad2-8df3-f89df4cb30de

spark.yarn.historyServer.address:http://ip-172-31-34-242.us-west-2.compute.internal:18088

here is what I ran:

val testrdd =
sc.textFile("hdfs://ip-172-31-34-242.us-west-2.compute.internal:8020/user/jethro/tables/edw_fact_lsx_detail/edw_fact_lsx_detail.csv")

testrdd.persist(org.apache.spark.storage.StorageLevel.MEMORY_ONLY_SER)

testrdd.count()

If I don't force it into memory it works fine, but considering the cluster I'm
running on, it should fit in memory properly.

Any ideas?
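
One hedged alternative, in case the serialized data does not actually fit:
persist with a storage level that can spill, so partitions that don't fit go to
disk instead of driving the GC into the ground:

testrdd.persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_SER)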



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/java-lang-OutOfMemoryError-GC-overhead-limit-exceeded-tp24918.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Examples module not building in intellij

2015-10-04 Thread Sean Owen
It builds for me. That message usually really means you can't resolve
or download from a repo. It's just the last thing that happens to
fail.

On Sun, Oct 4, 2015 at 7:06 AM, Stephen Boesch  wrote:
>
> For a week or two the trunk has not been building for the examples module
> within intellij. The other modules - including core, sql, mllib, etc are
> working.
>
> A portion of the error message is
>
> "Unable to get dependency information: Unable to read the metadata file for
> artifact 'org.eclipse.paho:org.eclipse.paho.client.mqttv3.jar"
>
> Any tips appreciated.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: java.lang.OutOfMemoryError: GC overhead limit exceeded

2015-10-04 Thread Ted Yu
1.2.0 is quite old. 

You may want to try 1.5.1 which was released in the past week. 

Cheers

> On Oct 4, 2015, at 4:26 AM, t_ras  wrote:
> 
> I get java.lang.OutOfMemoryError: GC overhead limit exceeded when trying a
> count action on a file.
> 
> The file is a CSV file, 217 GB in size.
> 
> I'm using 10 r3.8xlarge (Ubuntu) machines, CDH 5.3.6 and Spark 1.2.0.
> 
> configuration:
> 
> spark.app.id:local-1443956477103
> 
> spark.app.name:Spark shell
> 
> spark.cores.max:100
> 
> spark.driver.cores:24
> 
> spark.driver.extraLibraryPath:/opt/cloudera/parcels/CDH-5.3.6-1.cdh5.3.6.p0.11/lib/hadoop/lib/native
> spark.driver.host:ip-172-31-34-242.us-west-2.compute.internal
> 
> spark.driver.maxResultSize:300g
> 
> spark.driver.port:55123
> 
> spark.eventLog.dir:hdfs://ip-172-31-34-242.us-west-2.compute.internal:8020/user/spark/applicationHistory
> spark.eventLog.enabled:true
> 
> spark.executor.extraLibraryPath:/opt/cloudera/parcels/CDH-5.3.6-1.cdh5.3.6.p0.11/lib/hadoop/lib/native
> 
> spark.executor.id:driver spark.executor.memory:200g
> 
> spark.fileserver.uri:http://172.31.34.242:51424
> 
> spark.jars: spark.master:local[*]
> 
> spark.repl.class.uri:http://172.31.34.242:58244
> 
> spark.scheduler.mode:FIFO
> 
> spark.serializer:org.apache.spark.serializer.KryoSerializer
> 
> spark.storage.memoryFraction:0.9
> 
> spark.tachyonStore.folderName:spark-88bd9c44-d626-4ad2-8df3-f89df4cb30de
> 
> spark.yarn.historyServer.address:http://ip-172-31-34-242.us-west-2.compute.internal:18088
> 
> here is what I ran:
> 
> val testrdd =
> sc.textFile("hdfs://ip-172-31-34-242.us-west-2.compute.internal:8020/user/jethro/tables/edw_fact_lsx_detail/edw_fact_lsx_detail.csv")
> 
> testrdd.persist(org.apache.spark.storage.StorageLevel.MEMORY_ONLY_SER)
> 
> testrdd.count()
> 
> If I don't force it into memory it works fine, but considering the cluster I'm
> running on, it should fit in memory properly.
> 
> Any ideas?
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/java-lang-OutOfMemoryError-GC-overhead-limit-exceeded-tp24918.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
> 

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Kafka Direct Stream

2015-10-04 Thread varun sharma
I went through the story and, as I understood it, it is for saving data to
multiple keyspaces at once.
How will it work for saving data to multiple tables in the same keyspace?
I think tableName: String should also be tableName: T => String.
Let me know if I've understood incorrectly.
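
To make the collect-vs-filter equivalence discussed in this thread concrete, a
minimal sketch; both lines produce the same RDD of payloads for one topic:

// filter + map spelled out
val viaFilter  = rdd.filter { case (t, _) => t == topic }.map(_._2)
// collect with a partial function: defined only where t == topic
val viaCollect = rdd.collect { case (t, data) if t == topic => data }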


On Sat, Oct 3, 2015 at 9:55 PM, Gerard Maas  wrote:

> Hi,
>
> collect(partialFunction) is equivalent to filter(x=>
> partialFunction.isDefinedAt(x)).map(partialFunction)  so it's functionally
> equivalent to your expression. I favor collect for its more compact form
> but that's a personal preference. Use what you feel reads best.
>
> Regarding performance, there will be some overhead of submitting many a
> task for every filtered RDD that gets materialized to Cassandra. That's the
> reason I proposed the ticket linked above. Have a look whether that would
> improve your particular usecase and vote for it if so :-)
>
> -kr, Gerard.
>
> On Sat, Oct 3, 2015 at 3:53 PM, varun sharma 
> wrote:
>
>> Thanks Gerard, the code snippet you shared worked, but can you please
>> explain/point me to the usage of *collect* here. How is it
>> different (performance/readability) from *filter*?
>>
>>> *val filteredRdd = rdd.filter(x => x._1 == topic).map(_._2)*
>>
>>
>> I am doing something like this. Please tell me if I can improve the *processing
>> time* of this particular code:
>>
>> kafkaStringStream.foreachRDD { rdd =>
>>   val topics = rdd.map(_._1).distinct().collect()
>>   if (topics.length > 0) {
>>     val rdd_value = rdd.take(10).mkString("\n.\n")
>>     Log.slogger(Log.FILE.DEFAULT, INFO, BaseSLog(s"Printing all feeds\n$rdd_value"))
>>
>>     topics.foreach { topic =>
>>       //rdd.filter(x => x._1 == topic).map(_._2)
>>       val filteredRdd = rdd.collect { case (t, data) if t == topic => data }
>>       CassandraHelper.saveDataToCassandra(topic, filteredRdd)
>>     }
>>     updateOffsetsinZk(rdd)
>>   }
>> }
>>
>> On Fri, Oct 2, 2015 at 11:58 PM, Gerard Maas 
>> wrote:
>>
>>> Something like this?
>>>
>>> I'm making the assumption that your topic name equals your keyspace for
>>> this filtering example.
>>>
>>> dstream.foreachRDD { rdd =>
>>>   val topics = rdd.map(_._1).distinct.collect
>>>   topics.foreach { topic =>
>>>     val filteredRdd = rdd.collect { case (t, data) if t == topic => data }
>>>     filteredRdd.saveToCassandra(topic, "table")  // do not confuse this
>>>     // collect with rdd.collect(), which brings data to the driver
>>>   }
>>> }
>>>
>>>
>>> I'm wondering: would something like this (
>>> https://datastax-oss.atlassian.net/browse/SPARKC-257) better fit your
>>> purposes?
>>>
>>> -kr, Gerard.
>>>
>>> On Fri, Oct 2, 2015 at 8:12 PM, varun sharma 
>>> wrote:
>>>
 Hi Adrian,

 Can you please give an example of how to achieve this:

> *I would also look at filtering by topic and saving as different
> Dstreams in your code*

 I have managed to get a DStream[(String, String)], which is a
 *(topic, my_data)* tuple. Let's call it kafkaStringStream.
 Now if I do kafkaStringStream.groupByKey() then I would get a
 DStream[(String, Iterable[String])].
 But I want a DStream instead of an Iterable in order to apply
 saveToCassandra for storing it.

 Please help with how to transform an Iterable into a DStream, or any other
 workaround for achieving the same.


 On Thu, Oct 1, 2015 at 8:17 PM, Adrian Tanase 
 wrote:

> On top of that you could make the topic part of the key (e.g. keyBy in
> .transform or manually emitting a tuple) and use one of the .xxxByKey
> operators for the processing.
>
> If you have a stable, domain specific list of topics (e.g. 3-5 named
> topics) and the processing is *really* different, I would also look
> at filtering by topic and saving as different Dstreams in your code.
>
> Either way you need to start with Cody’s tip in order to extract the
> topic name.
>
> -adrian
>
> From: Cody Koeninger
> Date: Thursday, October 1, 2015 at 5:06 PM
> To: Udit Mehta
> Cc: user
> Subject: Re: Kafka Direct Stream
>
> You can get the topic for a given partition from the offset range.
> You can either filter using that; or just have a single rdd and match on
> topic when doing mapPartitions or foreachPartition (which I think is a
> better idea)
>
>
> http://spark.apache.org/docs/latest/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers
>
> On Wed, Sep 30, 2015 at 5:02 PM, Udit Mehta 
> wrote:
>
>> Hi,
>>
>> I am using spark direct stream to consume from multiple topics in
>> Kafka. I am able to consume fine but I am stuck at how to separate the 
>> data
>> for each topic since I need to process data differently depending on the
>> topic.
>> I basically want to 

Re: Examples module not building in intellij

2015-10-04 Thread Stephen Boesch
Thanks Sean.  Why would a repo not be resolvable from IJ even though all
modules build properly on the command line?

2015-10-04 2:47 GMT-07:00 Sean Owen :

> It builds for me. That message usually really means you can't resolve
> or download from a repo. It's just the last thing that happens to
> fail.
>
> On Sun, Oct 4, 2015 at 7:06 AM, Stephen Boesch  wrote:
> >
> > For a week or two the trunk has not been building for the examples module
> > within intellij. The other modules - including core, sql, mllib, etc are
> > working.
> >
> > A portion of the error message is
> >
> > "Unable to get dependency information: Unable to read the metadata file
> for
> > artifact 'org.eclipse.paho:org.eclipse.paho.client.mqttv3.jar"
> >
> > Any tips appreciated.
>


Examples module not building in intellij

2015-10-04 Thread Stephen Boesch
For a week or two the trunk has not been building for the examples module
within intellij. The other modules - including core, sql, mllib, etc *are*
working.

A portion of the error message is

"Unable to get dependency information: Unable to read the metadata file for
artifact 'org.eclipse.paho:org.eclipse.paho.client.mqttv3.jar"

Any tips appreciated.


Re: Hive ORC Malformed while loading into spark data frame

2015-10-04 Thread Umesh Kacha
Thanks much Zhan Zhang. I will open a JIRA saying ORC files created using
hiveContext.sql can't be read by the DataFrame reader.

Regards,
Umesh
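
A minimal sketch of the dataframe.write / dataframe.read pairing suggested
below, keeping both sides on the DataFrame path (paths are illustrative):

df.write.format("orc").save("/hdfs/path/to/orc/out")
val back = hiveContext.read.format("orc").load("/hdfs/path/to/orc/out")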
On Oct 4, 2015 10:14, "Zhan Zhang"  wrote:

> Hi Umesh,
>
> It depends on how you create and read the ORC file, although everything
> happens inside of Spark. There are two paths in Spark to create a
> table: one through Hive, and the other through the DataFrame API. Due to
> version compatibility issues,
> there may be conflicts between these two paths. You have to use
> dataframe.write and dataframe.read to avoid such issues. The ORC path has to
> be upgraded to the same version as Hive to solve this issue.
>
> ORC has become an independent project now, and we are waiting for it
> to be totally isolated from Hive. Then we can upgrade ORC to the latest
> version and put it into SQLContext. I think you can open a JIRA to track
> this upgrade.
>
> BTW, my name is Zhan Zhang instead of Zang.
>
> Thanks.
>
> Zhan Zhang
>
> On Oct 3, 2015, at 2:18 AM, Umesh Kacha  wrote:
>
> Hi Zang, any idea why this is happening? I can load ORC files created by a
> Hive table but I can't load ORC files created by Spark itself. It looks like
> a bug.
>
> On Wed, Sep 30, 2015 at 12:03 PM, Umesh Kacha 
> wrote:
>
>> Hi Zang, thanks much. Please find the code below.
>>
>> Working: loading data from a path created by a Hive table using the hive
>> console outside of Spark:
>>
>> DataFrame df =
>> hiveContext.read().format("orc").load("/hdfs/path/to/hive/table/partition")
>>
>> Not working: loading from Hive tables created inside Spark using
>> hiveContext.sql insert-into-partition queries:
>>
>> DataFrame df =
>> hiveContext.read().format("orc").load("/hdfs/path/to/hive/table/partition/created/by/spark")
>>
>> As you can see, it is the same in both cases; the second one is just trying to load
>> ORC data created by Spark.
>> On Sep 30, 2015 11:22 AM, "Zhan Zhang"  wrote:
>>
>>> Hi Umesh,
>>>
>>> The potential reason is that Hive and Spark do not use the same
>>> OrcInputFormat. Newer Hive versions have NewOrcInputFormat, but it is
>>> not in Spark because of backward compatibility (it is not available in
>>> hive-0.12).
>>> Do you mind posting the code that works and does not work for you?
>>>
>>> Thanks.
>>>
>>> Zhan Zhang
>>>
>>> On Sep 29, 2015, at 10:05 PM, Umesh Kacha  wrote:
>>>
>>> Hi, I can read/load ORC data created by a Hive table into a DataFrame. Why is
>>> it throwing a Malformed ORC exception when I try to load data created by
>>> hiveContext.sql into a DataFrame?
>>> On Sep 30, 2015 2:37 AM, "Hortonworks"  wrote:
>>>
 You can try to use data frame for both read and write

 Thanks

 Zhan Zhang


 Sent from my iPhone

 On Sep 29, 2015, at 1:56 PM, Umesh Kacha  wrote:

 Hi Zang, thanks for the response. The table is created using Spark
 hiveContext.sql, and data is inserted into the table also using hiveContext.sql
 (insert into a partitioned table). When I try to load the ORC data into a
 DataFrame, I am loading particular partition data stored in a path such as
 /user/xyz/Hive/xyz.db/sparktable/partition1=abc

 Regards,
 Umesh
 On Sep 30, 2015 02:21, "Hortonworks"  wrote:

> How was the table generated, by Hive or by Spark?
>
> If you generate the table using Hive but read it via the DataFrame API, it may
> have some compatibility issues.
>
> Thanks
>
> Zhan Zhang
>
>
> Sent from my iPhone
>
> > On Sep 29, 2015, at 1:47 PM, unk1102  wrote:
> >
> > Hi, I have a Spark job which creates Hive tables in ORC format with
> > partitions. It works well; I can read the data back into the Hive table using
> > the hive console. But if I try to further process the ORC files generated by
> > the Spark job by loading them into a DataFrame, then I get the following
> > exception:
> > Caused by: java.io.IOException: Malformed ORC file
> > hdfs://localhost:9000/user/hive/warehouse/partorc/part_tiny.txt.
> Invalid
> > postscript.
> >
> > Dataframe df = hiveContext.read().format("orc").load(to/path);
> >
> > Please guide.
> >
> >
> >
> > --
> > View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Hive-ORC-Malformed-while-loading-into-spark-data-frame-tp24876.html
> > Sent from the Apache Spark User List mailing list archive at
> Nabble.com .
> >
> > -
> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> > For additional commands, e-mail: user-h...@spark.apache.org
> >
> >
>

Re: preferredNodeLocationData, SPARK-8949, and SparkContext - a leftover?

2015-10-04 Thread Jacek Laskowski
Hi,

You're right - it is unused, but the code does some (very little)
initialization as if it were really needed. Confusion is seeded.

I filed https://issues.apache.org/jira/browse/SPARK-10921 to track it.

The other reason I brought it up was to help myself (and hopefully
others) who read the code and are constantly distracted by important-looking
things that…turn out not to be so at all. I spent a couple
of hours yesterday reading the sources for its uses, as I
initially thought this YARN-specific feature in Spark was really
important (it caught my attention and I kept digging
deeper), until I found it is a leftover.

Read the comment in
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SparkContext.scala#L93-L96:

  // This is used only by YARN for now, but should be relevant to
other cluster types (Mesos,
  // etc) too. This is typically generated from
InputFormatInfo.computePreferredLocations. It
  // contains a map from hostname to a list of input format splits on the host.
  private[spark] var preferredNodeLocationData: Map[String,
Set[SplitInfo]] = Map()

What would you think about the var? I was convinced it's important for
Spark on YARN. Would "Removing the internal field and one usage of it
seems OK, though I don't think it would help much of anything." still
hold? I don't think so and hence the issue reported.

Pozdrawiam,
Jacek

--
Jacek Laskowski | http://blog.japila.pl | http://blog.jaceklaskowski.pl
Follow me at https://twitter.com/jaceklaskowski
Upvote at http://stackoverflow.com/users/1305344/jacek-laskowski


On Sun, Oct 4, 2015 at 5:50 AM, Sean Owen  wrote:
> I think it's unused as the JIRA says, but removing it from the
> constructors would change the API, so that's why it stays in the
> signature. Removing the internal field and one usage of it seems OK,
> though I don't think it would help much of anything.
>
> On Sun, Oct 4, 2015 at 4:36 AM, Jacek Laskowski  wrote:
>> Hi,
>>
>> I've been reviewing SparkContext and found preferredNodeLocationData
>> that was made obsoleted by SPARK-8949 [1].
>>
>> When you search where SparkContext.preferredNodeLocationData is used,
>> you find 3 places - one constructor marked @deprecated, the other with
>> logWarning telling us that "Passing in preferred locations has no
>> effect at all, see SPARK-8949", and in
>> org.apache.spark.deploy.yarn.ApplicationMaster.registerAM method.
>>
>> org.apache.spark.deploy.yarn.ApplicationMaster.registerAM method
>> caught my eye and I found that it does the following in
>> client.register:
>>
>> if (sc != null) sc.preferredNodeLocationData else Map()
>>
>> However, AFAIU client.register ignores the input parameter completely
>> (!) It's not used in the body of the method and seems a leftover. The
>> input parameter should be removed and so should the above line.
>>
>> What do you think? Should I report an issue and clean it up via a pull req?
>>
>> BTW, What do you think about removing
>> SparkContext.preferredNodeLocationData as part of the cleanup?
>>
>> [1] https://issues.apache.org/jira/browse/SPARK-8949
>>
>> Pozdrawiam,
>> Jacek
>>
>> --
>> Jacek Laskowski | http://blog.japila.pl | http://blog.jaceklaskowski.pl
>> Follow me at https://twitter.com/jaceklaskowski
>> Upvote at http://stackoverflow.com/users/1305344/jacek-laskowski
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Mini projects for spark novice

2015-10-04 Thread Rahul Jeevanandam
I am currently learning Spark and I wanna solidify my knowledge of Spark,
hence I wanna do some projects on it. Can you suggest some nice project
ideas to work on in Spark?

-- 
Regards,

*Rahul*


Re: Spark Streaming over YARN

2015-10-04 Thread nibiau
4 partitions.

- Mail original -
De: "Dibyendu Bhattacharya" 
À: "Nicolas Biau" 
Cc: "Cody Koeninger" , "user" 
Envoyé: Dimanche 4 Octobre 2015 16:51:38
Objet: Re: Spark Streaming over YARN


How many partitions are there in your Kafka topic ? 


Regards, 
Dibyendu 


On Sun, Oct 4, 2015 at 8:19 PM, < nib...@free.fr > wrote: 


Hello, 
I am using https://github.com/dibbhatt/kafka-spark-consumer 
I specify 4 receivers in the ReceiverLauncher , but in YARN console I can see 
one node receiving the kafka flow. 
(I use spark 1.3.1) 

Tks 
Nicolas 


- Mail original - 
De: "Dibyendu Bhattacharya" < dibyendu.bhattach...@gmail.com > 
À: nib...@free.fr 
Cc: "Cody Koeninger" < c...@koeninger.org >, "user" < user@spark.apache.org > 
Envoyé: Vendredi 2 Octobre 2015 18:21:35 


Objet: Re: Spark Streaming over YARN 


If your Kafka topic has 4 partitions , and if you specify 4 Receivers, messages 
from each partitions are received by a dedicated receiver. so your receiving 
parallelism is defined by your number of partitions of your topic . Every 
receiver task will be scheduled evenly among nodes in your cluster. There was a 
JIRA fixed in spark 1.5 which does even distribution of receivers. 





Now for RDD parallelism ( i.e parallelism while processing your RDD ) is 
controlled by your Block Interval and Batch Interval. 


If your block interval is 200 ms, there will be 5 blocks per second. If your 
batch interval is 3 seconds, there will be 15 blocks per batch. And every batch is 
one RDD, thus your RDD will have 15 partitions, which will be honored during 
processing of the RDD.
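
A small sketch of the arithmetic above, assuming the standard
spark.streaming.blockInterval setting (on older versions it is a plain
millisecond value, e.g. "200"):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("sketch").set("spark.streaming.blockInterval", "200ms")
val ssc  = new StreamingContext(conf, Seconds(3))
// one block every 200 ms per receiver => 3000 ms / 200 ms = 15 blocks,
// i.e. 15 partitions in each batch's RDD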




Regards, 
Dibyendu 




On Fri, Oct 2, 2015 at 9:40 PM, < nib...@free.fr > wrote: 


Ok, so if I set for example 4 receivers (the number of nodes), how will the RDD be 
distributed over the nodes/cores? 
For example, in my case I have 4 nodes (with 2 cores).

Tks 
Nicolas 


- Mail original - 
De: "Dibyendu Bhattacharya" < dibyendu.bhattach...@gmail.com > 
À: nib...@free.fr 
Cc: "Cody Koeninger" < c...@koeninger.org >, "user" < user@spark.apache.org > 
Envoyé: Vendredi 2 Octobre 2015 18:01:59 


Objet: Re: Spark Streaming over YARN 


Hi, 


If you need to use Receiver based approach , you can try this one : 
https://github.com/dibbhatt/kafka-spark-consumer 


This is also part of Spark packages : 
http://spark-packages.org/package/dibbhatt/kafka-spark-consumer 


You just need to specify the number of Receivers you want for desired 
parallelism while receiving , and rest of the thing will be taken care by 
ReceiverLauncher. 


This Low level Receiver will give better parallelism both on receiving , and on 
processing the RDD. 


The default Receiver-based API (KafkaUtils.createStream) uses the Kafka high-level 
API, and the Kafka high-level API has serious issues for production use.




Regards, 

Dibyendu 










On Fri, Oct 2, 2015 at 9:22 PM, < nib...@free.fr > wrote: 


From my understanding, as soon as I use YARN I don't need to use parallelism 
(at least for RDD treatment).
I don't want to use the direct stream as I have to manage the offset positioning 
(in order to be able to start from the last offset treated after a Spark job 
failure).


- Mail original - 
De: "Cody Koeninger" < c...@koeninger.org > 
À: "Nicolas Biau" < nib...@free.fr > 
Cc: "user" < user@spark.apache.org > 
Envoyé: Vendredi 2 Octobre 2015 17:43:41 
Objet: Re: Spark Streaming over YARN 




If you're using the receiver based implementation, and want more parallelism, 
you have to create multiple streams and union them together. 


Or use the direct stream. 


On Fri, Oct 2, 2015 at 10:40 AM, < nib...@free.fr > wrote: 


Hello, 
I have a job receiving data from kafka (4 partitions) and persisting data 
inside MongoDB. 
It works fine, but when I deploy it inside a YARN cluster (4 nodes with 2 cores) 
only one node is receiving all the kafka partitions and only one node is 
processing my RDD treatment (foreach function). 
How can I force YARN to use all the nodes and cores to process the 
data (receiver & RDD treatment)?

Tks a lot 
Nicolas 

- 
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
For additional commands, e-mail: user-h...@spark.apache.org 



- 
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
For additional commands, e-mail: user-h...@spark.apache.org 




-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How to use registered Hive UDF in Spark DataFrame?

2015-10-04 Thread Umesh Kacha
Hi, I tried to use callUDF in the following way; it throws an exception saying
it can't recognise MyUDF even though I registered it.

List<Column> colList = new ArrayList<Column>();
colList.add(col("myColumn").as("modifiedColumn"));
Seq<Column> colSeq = JavaConversions.asScalaBuffer(colList); // I need to do
// this because the following call won't accept just one col(); it needs a
// Seq<Column>
DataFrame resultFrame =
sourceFrame.select(callUDF("MyUDF").toString(), colSeq);

The above call fails saying it can't recognise 'MyUDF myColumn as modifiedColumn'
in the given columns, bla bla...
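
For comparison, a hedged sketch of the shape that is expected to work
(assuming Spark 1.5), passing the column as an argument to callUDF and
aliasing the result of the call rather than the input column; shown in Scala
for brevity:

import org.apache.spark.sql.functions.{callUDF, col}

val resultFrame = sourceFrame.select(callUDF("MyUDF", col("myColumn")).as("modifiedColumn"))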

On Sat, Oct 3, 2015 at 2:36 AM, Michael Armbrust 
wrote:

> callUDF("MyUDF", col("col1").as("name")
>
> or
>
> callUDF("MyUDF", col("col1").alias("name")
>
> On Fri, Oct 2, 2015 at 3:29 PM, Umesh Kacha  wrote:
>
>> Hi Michael,
>>
>> Thanks much. How do we give alias name for resultant columns? For e.g.
>> when using
>>
>> hiveContext.sql("select MyUDF("test") as mytest from myTable");
>>
>> how do we do that in DataFrame callUDF
>>
>> callUDF("MyUDF", col("col1"))???
>>
>> On Fri, Oct 2, 2015 at 8:23 PM, Michael Armbrust 
>> wrote:
>>
>>> import org.apache.spark.sql.functions.*
>>>
>>> callUDF("MyUDF", col("col1"), col("col2"))
>>>
>>> On Fri, Oct 2, 2015 at 6:25 AM, unk1102  wrote:
>>>
 Hi, I have registered my Hive UDF using the following code:

 hiveContext.udf().register("MyUDF", new UDF1<String, String>() {
   public String call(String o) throws Exception {
     // bla bla
   }
 }, DataTypes.StringType);

 Now I want to use the above MyUDF in a DataFrame. How do we use it? I know
 how to use it in SQL and it works fine:

 hiveContext.sql("select MyUDF('test') from myTable");

 My hiveContext.sql() query involves a group by on multiple columns, so for
 scaling purposes I am trying to convert this query to the DataFrame API:


 dataframe.select("col1","col2","coln").groupBy("col1","col2","coln").count();

 Can we do the following: dataframe.select(MyUDF("col1"))??? Please guide.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/How-to-use-registered-Hive-UDF-in-Spark-DataFrame-tp24907.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


>>>
>>
>


Re: Limiting number of cores per job in multi-threaded driver.

2015-10-04 Thread Philip Weaver
Yes, I am sharing the cluster across many jobs, and each job only needs 8
cores (in fact, because the jobs are so small and are counting uniques, it
only gets slower as you add more cores). My question is how to limit each
job to only use 8 cores, but have the entire cluster available for that
SparkContext; e.g. if I have a cluster of 128 cores, and I want to limit
the SparkContext to 64 cores, and each job to 8 cores, so I can run up to 8
jobs at once.
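
One hedged sketch of the standard mechanism for this, assuming the FAIR
scheduler with pools; the pool name and a conf/fairscheduler.xml entry defining
its share are assumptions here, and pools cap a job group's scheduling share
rather than pinning an exact core count:

// in the thread handling one request:
sc.setLocalProperty("spark.scheduler.pool", "request-pool")
// ... submit the job; it is scheduled within request-pool's share ...
sc.setLocalProperty("spark.scheduler.pool", null) // reset for this thread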

On Sun, Oct 4, 2015 at 9:38 AM, Adrian Tanase  wrote:

> You are absolutely correct, I apologize.
>
> My understanding was that you are sharing the machine across many jobs.
> That was the context in which I was making that comment.
>
> -adrian
>
> Sent from my iPhone
>
> On 03 Oct 2015, at 07:03, Philip Weaver  wrote:
>
> You can't really say 8 cores is not much horsepower when you have no idea
> what my use case is. That's silly.
>
> On Fri, Sep 18, 2015 at 10:33 PM, Adrian Tanase  wrote:
>
>> Forgot to mention that you could also restrict the parallelism to 4,
>> essentially using only 4 cores at any given time, however if your job is
>> complex, a stage might be broken into more than 1 task...
>>
>> Sent from my iPhone
>>
>> On 19 Sep 2015, at 08:30, Adrian Tanase  wrote:
>>
>> Reading through the docs it seems that with a combination of FAIR
>> scheduler and maybe pools you can get pretty far.
>>
>> However the smallest unit of scheduled work is the task so probably you
>> need to think about the parallelism of each transformation.
>>
>> I'm guessing that by increasing the level of parallelism you get many
>> smaller tasks that the scheduler can then run across the many jobs you
>> might have - as opposed to fewer, longer tasks...
>>
>> Lastly, 8 cores is not that much horsepower :)
>> You may consider running with beefier machines or a larger cluster, to
>> get at least tens of cores.
>>
>> Hope this helps,
>> -adrian
>>
>> Sent from my iPhone
>>
>> On 18 Sep 2015, at 18:37, Philip Weaver  wrote:
>>
>> Here's a specific example of what I want to do. My Spark application is
>> running with total-executor-cores=8. A request comes in, it spawns a thread
>> to handle that request, and starts a job. That job should use only 4 cores,
> not all 8 of the cores available to the cluster. When the first job is
>> scheduled, it should take only 4 cores, not all 8 of the cores that are
>> available to the driver.
>>
>> Is there any way to accomplish this? This is on mesos.
>>
>> In order to support the use cases described in
>> https://spark.apache.org/docs/latest/job-scheduling.html, where a spark
>> application runs for a long time and handles requests from multiple users,
>> I believe what I'm asking about is a very important feature. One of the
>> goals is to get lower latency for each request, but if the first request
>> takes all resources and we can't guarantee any free resources for the
>> second request, then that defeats the purpose. Does that make sense?
>>
>> Thanks in advance for any advice you can provide!
>>
>> - Philip
>>
>> On Sat, Sep 12, 2015 at 10:40 PM, Philip Weaver 
>> wrote:
>>
>>> I'm playing around with dynamic allocation in spark-1.5.0, with the FAIR
>>> scheduler, so I can define a long-running application capable of executing
>>> multiple simultaneous spark jobs.
>>>
>>> The kind of jobs that I'm running do not benefit from more than 4 cores,
>>> but I want my application to be able to take several times that in order to
>>> run multiple jobs at the same time.
>>>
>>> I suppose my question is more basic: How can I limit the number of cores
>>> used to load an RDD or DataFrame? I can immediately repartition or coalesce
>>> my RDD or DataFrame to 4 partitions after I load it, but that doesn't stop
>>> Spark from using more cores to load it.
>>>
>>> Does it make sense what I am trying to accomplish, and is there any way
>>> to do it?
>>>
>>> - Philip
>>>
>>>
>>
>


Enriching df.write.jdbc

2015-10-04 Thread Kapil Raaj
Hello folks,

I would like to contribute code to enrich the DataFrame writer API for JDBC to
cover an "Update table" feature, based on field names/keys passed as a LIST of
Strings.

Use Case:
1. df.write.mode(*"Update"*).jdbc(connectionString, "table_name"
,connectionProperties, *keys*)
Or
2. df.write.mode(SaveMode.Append).jdbc(connectionString, "table_name"
,connectionProperties, *keys*)

For the second implementation, if "keys" is an empty list it'll work as it
does now; if "keys" contains fields, it'll update those entries (a rough sketch
of the generated statement follows).
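
A rough sketch of the per-row statement the update path would have to generate
(illustrative only; keys is the proposed parameter, not an existing API):

// for keys = Seq("id"): non-key columns form the SET list, keys the WHERE clause
val setClause   = df.columns.filterNot(keys.contains).map(c => s"$c = ?").mkString(", ")
val whereClause = keys.map(k => s"$k = ?").mkString(" AND ")
val updateSql   = s"UPDATE table_name SET $setClause WHERE $whereClause"
// i.e. UPDATE table_name SET col1 = ?, col2 = ? WHERE id = ?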

Let me know which (1 or 2) is better. I think 2 looks better, as I don't
want to introduce a new ENUM value for SaveMode; moreover, "update" looks
irrelevant in the context of big data transformations.

If this use case is useful, let me know I'll go ahead and send a PR.
Any other tips will highly be appreciated.

thanks,

-- 
kapil


Re: Mini projects for spark novice

2015-10-04 Thread Ted Yu
See https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark

FYI

On Sun, Oct 4, 2015 at 7:06 AM, Rahul Jeevanandam 
wrote:

> I am currently learning Spark and I wanna solidify my knowledge on Spark,
> hence I wanna do some projects on it. Can you suggest me some nice project
> ideas to work on Spark?
>
> --
> Regards,
>
> *Rahul*
>


Re: Spark Streaming over YARN

2015-10-04 Thread nibiau
Hello,
I am using  https://github.com/dibbhatt/kafka-spark-consumer 
I specify 4 receivers in the ReceiverLauncher , but in YARN console I can see 
one node receiving the kafka flow.
(I use spark 1.3.1)

Tks
Nicolas


- Mail original -
De: "Dibyendu Bhattacharya" 
À: nib...@free.fr
Cc: "Cody Koeninger" , "user" 
Envoyé: Vendredi 2 Octobre 2015 18:21:35
Objet: Re: Spark Streaming over YARN


If your Kafka topic has 4 partitions , and if you specify 4 Receivers, messages 
from each partitions are received by a dedicated receiver. so your receiving 
parallelism is defined by your number of partitions of your topic . Every 
receiver task will be scheduled evenly among nodes in your cluster. There was a 
JIRA fixed in spark 1.5 which does even distribution of receivers. 





Now for RDD parallelism ( i.e parallelism while processing your RDD ) is 
controlled by your Block Interval and Batch Interval. 


If your block interval is 200 ms, there will be 5 blocks per second. If your 
batch interval is 3 seconds, there will be 15 blocks per batch. And every batch is 
one RDD, thus your RDD will have 15 partitions, which will be honored during 
processing of the RDD.




Regards, 
Dibyendu 




On Fri, Oct 2, 2015 at 9:40 PM, < nib...@free.fr > wrote: 


Ok, so if I set for example 4 receivers (the number of nodes), how will the RDD be 
distributed over the nodes/cores? 
For example, in my case I have 4 nodes (with 2 cores).

Tks 
Nicolas 


- Mail original - 
De: "Dibyendu Bhattacharya" < dibyendu.bhattach...@gmail.com > 
À: nib...@free.fr 
Cc: "Cody Koeninger" < c...@koeninger.org >, "user" < user@spark.apache.org > 
Envoyé: Vendredi 2 Octobre 2015 18:01:59 


Objet: Re: Spark Streaming over YARN 


Hi, 


If you need to use Receiver based approach , you can try this one : 
https://github.com/dibbhatt/kafka-spark-consumer 


This is also part of Spark packages : 
http://spark-packages.org/package/dibbhatt/kafka-spark-consumer 


You just need to specify the number of Receivers you want for desired 
parallelism while receiving , and rest of the thing will be taken care by 
ReceiverLauncher. 


This Low level Receiver will give better parallelism both on receiving , and on 
processing the RDD. 


The default Receiver-based API (KafkaUtils.createStream) uses the Kafka high-level 
API, and the Kafka high-level API has serious issues for production use.




Regards, 

Dibyendu 










On Fri, Oct 2, 2015 at 9:22 PM, < nib...@free.fr > wrote: 


From my understanding, as soon as I use YARN I don't need to use parallelism 
(at least for RDD treatment).
I don't want to use the direct stream as I have to manage the offset positioning 
(in order to be able to start from the last offset treated after a Spark job 
failure).


- Mail original - 
De: "Cody Koeninger" < c...@koeninger.org > 
À: "Nicolas Biau" < nib...@free.fr > 
Cc: "user" < user@spark.apache.org > 
Envoyé: Vendredi 2 Octobre 2015 17:43:41 
Objet: Re: Spark Streaming over YARN 




If you're using the receiver based implementation, and want more parallelism, 
you have to create multiple streams and union them together. 


Or use the direct stream. 


On Fri, Oct 2, 2015 at 10:40 AM, < nib...@free.fr > wrote: 


Hello, 
I have a job receiving data from kafka (4 partitions) and persisting data 
inside MongoDB. 
It works fine, but when I deploy it inside a YARN cluster (4 nodes with 2 cores) 
only one node is receiving all the kafka partitions and only one node is 
processing my RDD treatment (foreach function). 
How can I force YARN to use all the nodes and cores to process the 
data (receiver & RDD treatment)?

Tks a lot 
Nicolas 

- 
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
For additional commands, e-mail: user-h...@spark.apache.org 



- 
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
For additional commands, e-mail: user-h...@spark.apache.org 



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Limiting number of cores per job in multi-threaded driver.

2015-10-04 Thread Adrian Tanase
You are absolutely correct, I apologize.

My understanding was that you are sharing the machine across many jobs. That 
was the context in which I was making that comment.

-adrian

Sent from my iPhone

On 03 Oct 2015, at 07:03, Philip Weaver 
> wrote:

You can't really say 8 cores is not much horsepower when you have no idea what 
my use case is. That's silly.

On Fri, Sep 18, 2015 at 10:33 PM, Adrian Tanase 
> wrote:
Forgot to mention that you could also restrict the parallelism to 4, 
essentially using only 4 cores at any given time, however if your job is 
complex, a stage might be broken into more than 1 task...

Sent from my iPhone

On 19 Sep 2015, at 08:30, Adrian Tanase 
> wrote:

Reading through the docs it seems that with a combination of FAIR scheduler and 
maybe pools you can get pretty far.

However the smallest unit of scheduled work is the task so probably you need to 
think about the parallelism of each transformation.

I'm guessing that by increasing the level of parallelism you get many smaller 
tasks that the scheduler can then run across the many jobs you might have - as 
opposed to fewer, longer tasks...

Lastly, 8 cores is not that much horsepower :)
You may consider running with beefier machines or a larger cluster, to get at 
least tens of cores.

Hope this helps,
-adrian

Sent from my iPhone

On 18 Sep 2015, at 18:37, Philip Weaver 
> wrote:

Here's a specific example of what I want to do. My Spark application is running 
with total-executor-cores=8. A request comes in, it spawns a thread to handle 
that request, and starts a job. That job should use only 4 cores, not all 8 of 
the cores available to the cluster. When the first job is scheduled, it should 
take only 4 cores, not all 8 of the cores that are available to the driver.

Is there any way to accomplish this? This is on mesos.

In order to support the use cases described in 
https://spark.apache.org/docs/latest/job-scheduling.html, where a spark 
application runs for a long time and handles requests from multiple users, I 
believe what I'm asking about is a very important feature. One of the goals is 
to get lower latency for each request, but if the first request takes all 
resources and we can't guarantee any free resources for the second request, 
then that defeats the purpose. Does that make sense?

Thanks in advance for any advice you can provide!

- Philip

On Sat, Sep 12, 2015 at 10:40 PM, Philip Weaver 
> wrote:
I'm playing around with dynamic allocation in spark-1.5.0, with the FAIR 
scheduler, so I can define a long-running application capable of executing 
multiple simultaneous spark jobs.

The kind of jobs that I'm running do not benefit from more than 4 cores, but I 
want my application to be able to take several times that in order to run 
multiple jobs at the same time.

I suppose my question is more basic: How can I limit the number of cores used 
to load an RDD or DataFrame? I can immediately repartition or coalesce my RDD 
or DataFrame to 4 partitions after I load it, but that doesn't stop Spark from 
using more cores to load it.

Does it make sense what I am trying to accomplish, and is there any way to do 
it?

- Philip





Re: how to broadcast huge lookup table?

2015-10-04 Thread Adrian Tanase
Have a look at .transformWith - it lets you bring in another RDD.

Sent from my iPhone

On 02 Oct 2015, at 21:50, 
"saif.a.ell...@wellsfargo.com" 
> wrote:

I tried broadcasting a key-value RDD, but then I cannot perform any RDD actions 
inside a map/foreach function of another RDD.

Any tips? If going with Scala collections I end up with huge memory bottlenecks.

Saif
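
A hedged sketch of the usual batch alternative: rather than running RDD
actions inside another RDD's map/foreach, join the two RDDs by key (events and
lookup are illustrative names, both of type RDD[(K, V)]):

val enriched = events.join(lookup) // RDD[(K, (eventValue, lookupValue))]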



Re: Spark Streaming over YARN

2015-10-04 Thread Dibyendu Bhattacharya
How many partitions are there in your Kafka topic ?

Regards,
Dibyendu

On Sun, Oct 4, 2015 at 8:19 PM,  wrote:

> Hello,
> I am using  https://github.com/dibbhatt/kafka-spark-consumer
> I specify 4 receivers in the ReceiverLauncher , but in YARN console I can
> see one node receiving the kafka flow.
> (I use spark 1.3.1)
>
> Tks
> Nicolas
>
>
> - Mail original -
> De: "Dibyendu Bhattacharya" 
> À: nib...@free.fr
> Cc: "Cody Koeninger" , "user" 
> Envoyé: Vendredi 2 Octobre 2015 18:21:35
> Objet: Re: Spark Streaming over YARN
>
>
> If your Kafka topic has 4 partitions , and if you specify 4 Receivers,
> messages from each partitions are received by a dedicated receiver. so your
> receiving parallelism is defined by your number of partitions of your topic
> . Every receiver task will be scheduled evenly among nodes in your cluster.
> There was a JIRA fixed in spark 1.5 which does even distribution of
> receivers.
>
>
>
>
>
> Now for RDD parallelism ( i.e parallelism while processing your RDD ) is
> controlled by your Block Interval and Batch Interval.
>
>
> If your block interval is 200 ms, there will be 5 blocks per second. If
> your batch interval is 3 seconds, there will be 15 blocks per batch. And every
> batch is one RDD, thus your RDD will have 15 partitions, which will be
> honored during processing of the RDD.
>
>
>
>
> Regards,
> Dibyendu
>
>
>
>
> On Fri, Oct 2, 2015 at 9:40 PM, < nib...@free.fr > wrote:
>
>
> Ok, so if I set for example 4 receivers (the number of nodes), how will the RDD be
> distributed over the nodes/cores?
> For example, in my case I have 4 nodes (with 2 cores).
>
> Tks
> Nicolas
>
>
> - Mail original -
> De: "Dibyendu Bhattacharya" < dibyendu.bhattach...@gmail.com >
> À: nib...@free.fr
> Cc: "Cody Koeninger" < c...@koeninger.org >, "user" <
> user@spark.apache.org >
> Envoyé: Vendredi 2 Octobre 2015 18:01:59
>
>
> Objet: Re: Spark Streaming over YARN
>
>
> Hi,
>
>
> If you need to use Receiver based approach , you can try this one :
> https://github.com/dibbhatt/kafka-spark-consumer
>
>
> This is also part of Spark packages :
> http://spark-packages.org/package/dibbhatt/kafka-spark-consumer
>
>
> You just need to specify the number of Receivers you want for desired
> parallelism while receiving , and rest of the thing will be taken care by
> ReceiverLauncher.
>
>
> This Low level Receiver will give better parallelism both on receiving ,
> and on processing the RDD.
>
>
> The default Receiver-based API (KafkaUtils.createStream) uses the Kafka
> high-level API, and the Kafka high-level API has serious issues for
> production use.
>
>
>
>
> Regards,
>
> Dibyendu
>
>
>
>
>
>
>
>
>
>
> On Fri, Oct 2, 2015 at 9:22 PM, < nib...@free.fr > wrote:
>
>
> From my understanding, as soon as I use YARN I don't need to use
> parallelism (at least for RDD treatment).
> I don't want to use the direct stream as I have to manage the offset
> positioning (in order to be able to start from the last offset treated
> after a Spark job failure).
>
>
> - Mail original -
> De: "Cody Koeninger" < c...@koeninger.org >
> À: "Nicolas Biau" < nib...@free.fr >
> Cc: "user" < user@spark.apache.org >
> Envoyé: Vendredi 2 Octobre 2015 17:43:41
> Objet: Re: Spark Streaming over YARN
>
>
>
>
> If you're using the receiver based implementation, and want more
> parallelism, you have to create multiple streams and union them together.
>
>
> Or use the direct stream.
>
>
> On Fri, Oct 2, 2015 at 10:40 AM, < nib...@free.fr > wrote:
>
>
> Hello,
> I have a job receiving data from Kafka (4 partitions) and persisting the
> data into MongoDB.
> It works fine, but when I deploy it on a YARN cluster (4 nodes with 2
> cores each), only one node receives all the Kafka partitions and only one
> node runs my RDD processing (the foreach function).
> How can I force YARN to use all the nodes and cores to process the data
> (both receiving and RDD processing)?
>
> Tks a lot
> Nicolas
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>
>


Re: Limiting number of cores per job in multi-threaded driver.

2015-10-04 Thread Jerry Lam
Philip, the guy is trying to help you; calling him silly is a bit too far. He
might have assumed your problem is IO-bound, which might not be the case. If
you need only 4 cores per job no matter what, there is little advantage to
using Spark, in my opinion, because you could easily do this with just a
worker farm that takes a job and processes it on a single machine. Let the
scheduler figure out which nodes in the farm are idle and spawn jobs on them
until all are saturated. Call me silly, but this seems much simpler.

Sent from my iPhone

> On 3 Oct, 2015, at 12:02 am, Philip Weaver  wrote:
> 
> You can't really say 8 cores is not much horsepower when you have no idea 
> what my use case is. That's silly.
> 
>> On Fri, Sep 18, 2015 at 10:33 PM, Adrian Tanase  wrote:
>> Forgot to mention that you could also restrict the parallelism to 4, 
>> essentially using only 4 cores at any given time, however if your job is 
>> complex, a stage might be broken into more than 1 task...
>> 
>> Sent from my iPhone
>> 
>> On 19 Sep 2015, at 08:30, Adrian Tanase  wrote:
>> 
>>> Reading through the docs it seems that with a combination of FAIR scheduler 
>>> and maybe pools you can get pretty far.
>>> 
>>> However the smallest unit of scheduled work is the task so probably you 
>>> need to think about the parallelism of each transformation.
>>> 
>>> I'm guessing that by increasing the level of parallelism you get many 
>>> smaller tasks that the scheduler can then run across the many jobs you 
>>> might have - as opposed to fewer, longer tasks...
>>> 
>>> Lastly, 8 cores is not that much horsepower :) 
>>> You may consider running with beefier machines or a larger cluster, to get 
>>> at least tens of cores.
>>> 
>>> Hope this helps,
>>> -adrian
>>> 
>>> Sent from my iPhone
>>> 
>>> On 18 Sep 2015, at 18:37, Philip Weaver  wrote:
>>> 
 Here's a specific example of what I want to do. My Spark application is 
 running with total-executor-cores=8. A request comes in, it spawns a 
 thread to handle that request, and starts a job. That job should use only 
 4 cores, not all 8 of the cores available to the cluster. When the first 
 job is scheduled, it should take only 4 cores, not all 8 of the cores that 
 are available to the driver.
 
 Is there any way to accomplish this? This is on mesos.
 
 In order to support the use cases described in 
 https://spark.apache.org/docs/latest/job-scheduling.html, where a spark 
 application runs for a long time and handles requests from multiple users, 
 I believe what I'm asking about is a very important feature. One of the 
 goals is to get lower latency for each request, but if the first request 
 takes all resources and we can't guarantee any free resources for the 
 second request, then that defeats the purpose. Does that make sense?
 
 Thanks in advance for any advice you can provide!
 
 - Philip
 
> On Sat, Sep 12, 2015 at 10:40 PM, Philip Weaver  
> wrote:
> I'm playing around with dynamic allocation in spark-1.5.0, with the FAIR 
> scheduler, so I can define a long-running application capable of 
> executing multiple simultaneous spark jobs.
> 
> The kind of jobs that I'm running do not benefit from more than 4 cores, 
> but I want my application to be able to take several times that in order 
> to run multiple jobs at the same time.
> 
> I suppose my question is more basic: How can I limit the number of cores 
> used to load an RDD or DataFrame? I can immediately repartition or 
> coalesce my RDD or DataFrame to 4 partitions after I load it, but that 
> doesn't stop Spark from using more cores to load it.
> 
> Does it make sense what I am trying to accomplish, and is there any way 
> to do it?
> 
> - Philip
> 


Secondary Sorting in Spark

2015-10-04 Thread Bill Bejeck
I've written a blog post on secondary sorting in Spark and thought I'd
share it with the group:

http://codingjunkie.net/spark-secondary-sort/
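
For anyone who wants the gist before clicking through: the core trick is a
composite key, a partitioner that hashes only the grouping field, and
repartitionAndSortWithinPartitions. A minimal sketch (the key and field
names here are made up for illustration, not taken from the post):

import org.apache.spark.{Partitioner, SparkConf, SparkContext}

// Composite key: group by user, sort by timestamp within each user.
case class SessionKey(user: String, timestamp: Long)

// Hash only the grouping field so all of a user's records land in the
// same partition.
class UserPartitioner(partitions: Int) extends Partitioner {
  override def numPartitions: Int = partitions
  override def getPartition(key: Any): Int = {
    val k = key.asInstanceOf[SessionKey]
    math.abs(k.user.hashCode % partitions)
  }
}

object SecondarySortSketch {
  // Ordering picked up by repartitionAndSortWithinPartitions.
  implicit val keyOrdering: Ordering[SessionKey] =
    Ordering.by(k => (k.user, k.timestamp))

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("secondary-sort").setMaster("local[2]"))
    val events = sc.parallelize(Seq(
      (SessionKey("alice", 3L), "pageA"),
      (SessionKey("bob", 1L), "pageB"),
      (SessionKey("alice", 1L), "pageC")))
    // Within each partition, records come out sorted by (user, timestamp).
    val sorted =
      events.repartitionAndSortWithinPartitions(new UserPartitioner(2))
    sorted.collect().foreach(println)
    sc.stop()
  }
}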

Thanks,
Bill


Re: Limiting number of cores per job in multi-threaded driver.

2015-10-04 Thread Philip Weaver
I believe I've described my use case clearly, and its legitimacy is being
questioned. I will assert again that if you don't understand my use case,
it really doesn't make sense to make any statement about how many
resources I should need.

And I'm sorry, but I completely disagree with your logic. Your suggestion
is not simpler. The development effort that Spark saves is exactly what
you would otherwise spend parallelizing an algorithm from single-threaded
to 4 cores. So the big win comes from getting to 4 cores, not from going
from 4 to 128 (though that is also nice). Everything that Spark does I
could do myself, but it would take much longer. Keep in mind I'm not just
trying to reduce the effort of scheduling jobs, but also of scheduling the
tasks within each job, and both are things that Spark does really well.

- Philip


On Sun, Oct 4, 2015 at 10:57 AM, Jerry Lam  wrote:

> Philip, the guy is trying to help you. Calling him silly is a bit too far.
> He might assume your problem is IO bound which might not be the case. If
> you need only 4 cores per job no matter what there is little advantage to
> use spark in my opinion because you can easily do this with just a worker
> farm that take the job and process it in a single machine. let the
> scheduler figures out which node in the farm is idled and spawns jobs on
> those until all of them are saturated. Call me silly but this seems much
> simpler.
>
> Sent from my iPhone
>
> On 3 Oct, 2015, at 12:02 am, Philip Weaver 
> wrote:
>
> You can't really say 8 cores is not much horsepower when you have no idea
> what my use case is. That's silly.
>
> On Fri, Sep 18, 2015 at 10:33 PM, Adrian Tanase  wrote:
>
>> Forgot to mention that you could also restrict the parallelism to 4,
>> essentially using only 4 cores at any given time, however if your job is
>> complex, a stage might be broken into more than 1 task...
>>
>> Sent from my iPhone
>>
>> On 19 Sep 2015, at 08:30, Adrian Tanase  wrote:
>>
>> Reading through the docs it seems that with a combination of FAIR
>> scheduler and maybe pools you can get pretty far.
>>
>> However the smallest unit of scheduled work is the task so probably you
>> need to think about the parallelism of each transformation.
>>
>> I'm guessing that by increasing the level of parallelism you get many
>> smaller tasks that the scheduler can then run across the many jobs you
>> might have - as opposed to fewer, longer tasks...
>>
>> Lastly, 8 cores is not that much horsepower :)
>> You may consider running with beefier machines or a larger cluster, to
>> get at least tens of cores.
>>
>> Hope this helps,
>> -adrian
>>
>> Sent from my iPhone
>>
>> On 18 Sep 2015, at 18:37, Philip Weaver  wrote:
>>
>> Here's a specific example of what I want to do. My Spark application is
>> running with total-executor-cores=8. A request comes in, it spawns a thread
>> to handle that request, and starts a job. That job should use only 4 cores,
>> not all 8 of the cores available to the cluster. When the first job is
>> scheduled, it should take only 4 cores, not all 8 of the cores that are
>> available to the driver.
>>
>> Is there any way to accomplish this? This is on mesos.
>>
>> In order to support the use cases described in
>> https://spark.apache.org/docs/latest/job-scheduling.html, where a spark
>> application runs for a long time and handles requests from multiple users,
>> I believe what I'm asking about is a very important feature. One of the
>> goals is to get lower latency for each request, but if the first request
>> takes all resources and we can't guarantee any free resources for the
>> second request, then that defeats the purpose. Does that make sense?
>>
>> Thanks in advance for any advice you can provide!
>>
>> - Philip
>>
>> On Sat, Sep 12, 2015 at 10:40 PM, Philip Weaver 
>> wrote:
>>
>>> I'm playing around with dynamic allocation in spark-1.5.0, with the FAIR
>>> scheduler, so I can define a long-running application capable of executing
>>> multiple simultaneous spark jobs.
>>>
>>> The kind of jobs that I'm running do not benefit from more than 4 cores,
>>> but I want my application to be able to take several times that in order to
>>> run multiple jobs at the same time.
>>>
>>> I suppose my question is more basic: How can I limit the number of cores
>>> used to load an RDD or DataFrame? I can immediately repartition or coalesce
>>> my RDD or DataFrame to 4 partitions after I load it, but that doesn't stop
>>> Spark from using more cores to load it.
>>>
>>> Does it make sense what I am trying to accomplish, and is there any way
>>> to do it?
>>>
>>> - Philip
>>>
>>>
>>
>


Re: Limiting number of cores per job in multi-threaded driver.

2015-10-04 Thread Philip Weaver
Since I'm running Spark on Mesos, to be fair I should give Mesos credit,
too! And I should also put some effort into describing more clearly what
I'm trying to accomplish. There are really three levels of scheduling
that I'm hoping to exploit:

- Scheduling in Mesos across all frameworks, where the particular type of
Spark job that I previously described is only one of many types of
frameworks.
- Scheduling of multiple jobs (maybe that's not the right terminology?)
within the same SparkContext; this SparkContext would run in a persistent
application with an API for users to submit jobs.
- Scheduling of individual tasks within a single user-submitted Spark job.

Through some brief testing, I've found that the performance of my jobs
scales almost linearly up to about 8 cores; after that the gains are very
small or sometimes even negative. These are small jobs that typically
count uniques across only about 40M rows.

This setup works very well, with one exception: when a user submits a job,
if there are no others running, then that job will take all of the cores
that the SparkContext has available to it. This is undesirable for two
reasons:
1.) As I mentioned above, the jobs don't scale beyond about 8 cores.
2.) The next submitted job will have to wait for resources to become
available.
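
The closest concrete mechanism I've seen for this is Adrian's earlier
suggestion of the FAIR scheduler with pools. A minimal sketch of that setup
(the pool name and file path are illustrative; note that pools balance
resources between concurrent jobs rather than capping what a lone job can
grab, so this only partially addresses reason 1):

import org.apache.spark.{SparkConf, SparkContext}

// conf/fairscheduler.xml (illustrative):
//   <allocations>
//     <pool name="requests">
//       <schedulingMode>FAIR</schedulingMode>
//       <weight>1</weight>
//       <minShare>4</minShare>
//     </pool>
//   </allocations>
val conf = new SparkConf()
  .set("spark.scheduler.mode", "FAIR")
  .set("spark.scheduler.allocation.file", "/path/to/fairscheduler.xml")
val sc = new SparkContext(conf)

// In each request-handling thread, before kicking off that request's job:
sc.setLocalProperty("spark.scheduler.pool", "requests")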

- Philip


On Sun, Oct 4, 2015 at 2:33 PM, Philip Weaver 
wrote:

> I believe I've described my use case clearly, and I'm being questioned
> that it's legitimate. I will assert again that if you don't understand my
> use case, it really doesn't make sense to make any statement about how many
> resources I should need.
>
> And I'm sorry, but I completely disagree with your logic. Your suggestion
> is not simpler. The development effort that Spark saves is what you would
> have to do to parallelize an algorithm from single-threaded to 4 cores. So
> the big win comes from getting to 4 cores, not taking it from 4 to 128
> (though that also is nice). Everything that Spark does I could do myself,
> but it would take much longer. Keep in mind I'm not just trying to reduce
> the level of effort for scheduling jobs, but also scheduling the tasks
> within each job, and those are both something that Spark does really well.
>
> - Philip
>
>
> On Sun, Oct 4, 2015 at 10:57 AM, Jerry Lam  wrote:
>
>> Philip, the guy is trying to help you. Calling him silly is a bit too
>> far. He might assume your problem is IO bound which might not be the case.
>> If you need only 4 cores per job no matter what there is little advantage
>> to use spark in my opinion because you can easily do this with just a
>> worker farm that take the job and process it in a single machine. let the
>> scheduler figures out which node in the farm is idled and spawns jobs on
>> those until all of them are saturated. Call me silly but this seems much
>> simpler.
>>
>> Sent from my iPhone
>>
>> On 3 Oct, 2015, at 12:02 am, Philip Weaver 
>> wrote:
>>
>> You can't really say 8 cores is not much horsepower when you have no idea
>> what my use case is. That's silly.
>>
>> On Fri, Sep 18, 2015 at 10:33 PM, Adrian Tanase 
>> wrote:
>>
>>> Forgot to mention that you could also restrict the parallelism to 4,
>>> essentially using only 4 cores at any given time, however if your job is
>>> complex, a stage might be broken into more than 1 task...
>>>
>>> Sent from my iPhone
>>>
>>> On 19 Sep 2015, at 08:30, Adrian Tanase  wrote:
>>>
>>> Reading through the docs it seems that with a combination of FAIR
>>> scheduler and maybe pools you can get pretty far.
>>>
>>> However the smallest unit of scheduled work is the task so probably you
>>> need to think about the parallelism of each transformation.
>>>
>>> I'm guessing that by increasing the level of parallelism you get many
>>> smaller tasks that the scheduler can then run across the many jobs you
>>> might have - as opposed to fewer, longer tasks...
>>>
>>> Lastly, 8 cores is not that much horsepower :)
>>> You may consider running with beefier machines or a larger cluster, to
>>> get at least tens of cores.
>>>
>>> Hope this helps,
>>> -adrian
>>>
>>> Sent from my iPhone
>>>
>>> On 18 Sep 2015, at 18:37, Philip Weaver  wrote:
>>>
>>> Here's a specific example of what I want to do. My Spark application is
>>> running with total-executor-cores=8. A request comes in, it spawns a thread
>>> to handle that request, and starts a job. That job should use only 4 cores,
>>> not all 8 of the cores available to the cluster. When the first job is
>>> scheduled, it should take only 4 cores, not all 8 of the cores that are
>>> available to the driver.
>>>
>>> Is there any way to accomplish this? This is on mesos.
>>>
>>> In order to support the use cases described in
>>> https://spark.apache.org/docs/latest/job-scheduling.html, where a spark
>>> application runs for a long time and handles requests from 

spark-ec2 config files.

2015-10-04 Thread Renato Perini

Can someone provide the relevant config files generated by the spark-ec2
script? I'm configuring a Spark cluster on EC2 manually, and I would like
to compare my config files (spark-defaults.conf, spark-env.sh) with those
generated by the spark-ec2 script.

Of course, hide your sensitive information.

Thank you.


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Usage of transform for code reuse between Streaming and Batch job affects the performance ?

2015-10-04 Thread swetha
Hi,

I have the following code for code reuse between the batch and the streaming
job:

  val groupedAndSortedSessions =
    sessions.transform(rdd => JobCommon.getGroupedAndSortedSessions(rdd))

Without the code reuse, the streaming job has the following instead:

  val groupedSessions = sessions.groupByKey()

  val sortedSessions = groupedSessions.mapValues[List[(Long, String)]](
    iter => iter.toList.sortBy(_._1))

Does use of transform for code reuse affect groupByKey performance?
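
For reference, transform just applies the given function to each batch's
RDD, so the groupByKey inside the shared helper should run exactly as it
would when called directly; the wrapper itself adds negligible overhead. A
sketch of what the shared helper might look like (the real JobCommon isn't
shown in this thread, and the String key type is an assumption):

import org.apache.spark.rdd.RDD

object JobCommon {
  // Hypothetical shared helper: the same RDD-level logic can be called
  // from a batch job directly, or per batch from a streaming job via
  // transform.
  def getGroupedAndSortedSessions(
      rdd: RDD[(String, (Long, String))]): RDD[(String, List[(Long, String)])] =
    rdd.groupByKey().mapValues(_.toList.sortBy(_._1))
}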


Thanks,
Swetha



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Usage-of-transform-for-code-reuse-between-Streaming-and-Batch-job-affects-the-performance-tp24920.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Scala Limitation - Case Class definition with more than 22 arguments

2015-10-04 Thread satish chandra j
Hi Petr,
Could you please let me know if I am missing anything in my code? It is the
same as the snippet you shared, but I am still getting the below error:

error: type mismatch: found String, required: Serializable

Please let me know if there is a fix to be applied for this.

Regards,
Satish Chandra

On Sat, Oct 3, 2015 at 9:31 AM, satish chandra j 
wrote:

> Hi,
> I am getting the below error while implementing the custom class code you
> provided:
>
> error: type mismatch: found String, required: Serializable
>
> Please let me know if i am missing anything here
>
> Regards,
> Satish Chandra
>
> On Wed, Sep 23, 2015 at 12:34 PM, Petr Novak  wrote:
>
>> You can implement your own case class supporting more than 22 fields. It
>> is something like:
>>
>> class MyRecord(val val1: String, val val2: String
>>     /* ... more than 22 fields, e.g. 26 */)
>>   extends Product with Serializable {
>>
>>   def canEqual(that: Any): Boolean = that.isInstanceOf[MyRecord]
>>
>>   def productArity: Int = 26 // example value: the number of fields
>>
>>   // Note: productElement is zero-based and its return type in Product is
>>   // Any. Declaring it to return Serializable is what triggers
>>   // "type mismatch: found String, required: Serializable", because
>>   // String does not conform to scala.Serializable.
>>   def productElement(n: Int): Any = n match {
>>     case 0 => val1
>>     case 1 => val2
>>     // ... cases up to 25
>>     case _ => throw new IndexOutOfBoundsException(n.toString)
>>   }
>> }
>>
>> You can google it for more details.
>>
>> Petr
>>
>
>