Re: SparkSQL HiveContext No Suitable Driver / Cannot Find Driver

2014-08-30 Thread Denny Lee
Oh, I forgot to add the managed libraries and the Hive libraries to the 
CLASSPATH.  As soon as I did that, we were good to go.
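
For anyone else hitting this, concretely that meant getting the MySQL JDBC driver 
and the Hive jars onto the classpath at launch.  A rough sketch of what I did (jar 
paths and versions below are from my environment, adjust to yours):

  export SPARK_CLASSPATH=/path/to/mysql-connector-java-5.1.31-bin.jar:$HIVE_HOME/lib/*:$SPARK_CLASSPATH

or, equivalently, pass the connector to spark-submit / spark-shell with
--driver-class-path /path/to/mysql-connector-java-5.1.31-bin.jar.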



On August 29, 2014 at 22:55:47, Denny Lee (denny.g@gmail.com) wrote:

My issue is similar to the issue as noted 
http://mail-archives.apache.org/mod_mbox/incubator-spark-user/201406.mbox/%3ccadoad2ks9_qgeign5-w7xogmrotrlbchvfukctgstj5qp9q...@mail.gmail.com%3E.

Currently using Spark 1.1 (grabbed from git two days ago) with Hive 0.12 and 
my metastore in MySQL.  If I run any HiveContext statements, it results in a 
"cannot find the driver in CLASSPATH" error.  If I include the driver via --jars, 
it gives me the error "no suitable driver".

Any ideas on how to get the Hive context to work here?

Thanks!
Denny



spark-ec2 [Errno 110] Connection time out

2014-08-30 Thread David Matheson
I'm following the latest documentation on configuring a cluster on ec2
(http://spark.apache.org/docs/latest/ec2-scripts.html).  Running 
 ./spark-ec2 -k Blah -i .ssh/Blah.pem -s 2 launch spark-ec2-test
gets a generic timeout error that's coming from
  File "./spark_ec2.py", line 717, in real_main
conn = ec2.connect_to_region(opts.region)

Any suggestions on how to debug the cause of the timeout? 

Note: I replaced the name of my keypair with Blah.

Thanks,
David




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/spark-ec2-Errno-110-Connection-time-out-tp13171.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



spark on yarn with hive

2014-08-30 Thread centerqi hu
I want to run Hive on Spark and YARN clusters; the Hive metastore is stored
in MySQL.

I compiled spark code:
sh make-distribution.sh --hadoop 2.4.1 --with-yarn --skip-java-test --tgz
--with-hive

My HQL code:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql._
import org.apache.spark.sql.hive.LocalHiveContext

object HqlTest {
  case class Record(key: Int, value: String)

  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("HiveFromSpark")
    val sc = new SparkContext(sparkConf)

    val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
    import hiveContext._
    hiveContext.hql("FROM tmp_adclick_udc select uid").collect().foreach(println)
  }
}

I submitted the job code:
/usr/local/webserver/sparkhive/bin/spark-submit --class HqlTest --master
yarn --deploy-mode cluster --queue sls_queue_1 --num-executors 5
--driver-memory 6g --executor-memory 20g --executor-cores 5
target/scala-2.10/simple-project_2.10-1.0.jar
/user/www/udc/output/2014-08-10/*
/user/www/udc/input/platformuvpv2/2014-08-10

I put hive-site.xml into spark/conf/.
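That hive-site.xml points the metastore at MySQL; a minimal sketch of its
contents (host, user and password below are placeholders):

<configuration>
  <property>
    <name>javax.jdo.option.ConnectionURL</name>
    <value>jdbc:mysql://mysql-host:3306/hive?createDatabaseIfNotExist=true</value>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionDriverName</name>
    <value>com.mysql.jdbc.Driver</value>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionUserName</name>
    <value>hive</value>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionPassword</name>
    <value>password</value>
  </property>
</configuration>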

But, I get the following error:

14/08/30 16:30:49 INFO Datastore: The class
org.apache.hadoop.hive.metastore.model.MFieldSchema is tagged as
embedded-only so does not have its own datastore table.
14/08/30 16:30:49 INFO Datastore: The class
org.apache.hadoop.hive.metastore.model.MOrder is tagged as
embedded-only so does not have its own datastore table.
14/08/30 16:30:54 ERROR Hive:
NoSuchObjectException(message:default.tmp_adclick_udc table not found)
at
org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.get_table(HiveMetaStore.java:1373)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
org.apache.hadoop.hive.metastore.RetryingHMSHandler.invoke(RetryingHMSHandler.java:103)
at com.sun.proxy.$Proxy26.get_table(Unknown Source)
at
org.apache.hadoop.hive.metastore.HiveMetaStoreClient.getTable(HiveMetaStoreClient.java:854)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)

thanks
-- 
cente...@gmail.com|齐忠


Re: Low Level Kafka Consumer for Spark

2014-08-30 Thread Sean Owen
I'm no expert. But as I understand it, yes, you create multiple streams to
consume multiple partitions in parallel. If they're all in the same
Kafka consumer group, you'll get exactly one copy of each message, so yes,
if you have 10 consumers and 3 Kafka partitions I believe only 3 of them
will be getting messages.

The parallelism of Spark's processing of the RDDs of those messages is
different. There could be 4 partitions in your RDDs doing the work.
This is the kind of thing you potentially influence with repartition.
That is, I believe you can get more tasks processing the messages even
if you are only able to consume messages from the queue with 3-way
parallelism, since the queue has 3 partitions.
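
As a rough, untested sketch (the quorum, group and topic names are made up),
that would look something like:

  val streams = (1 to 3).map { _ =>
    KafkaUtils.createStream(ssc, "zkhost:2181", "myGroup", Map("mytopic" -> 1))
  }
  // union the 3 receiving streams, then repartition so more tasks than the
  // 3 consuming partitions do the downstream processing
  val unified = ssc.union(streams).repartition(12)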

On Aug 30, 2014 12:56 AM, Tim Smith secs...@gmail.com wrote:

 Ok, so I did this:
 val kInStreams = (1 to 10).map { _ =>
   KafkaUtils.createStream(ssc, "zkhost1:2181/zk_kafka", "testApp", Map("rawunstruct" -> 1)) }
 val kInMsg = ssc.union(kInStreams)
 val outdata = kInMsg.map(x => normalizeLog(x._2, configMap))

 This has improved parallelism. Earlier I would only get a Stream 0. Now I 
 have Streams [0-9]. Of course, since the kafka topic has only three 
 partitions, only three of those streams are active but I am seeing more 
 blocks being pulled across the three streams in total than what one was doing 
 earlier. Also, four nodes are actively processing tasks (vs only two earlier) 
 now which actually has me confused. If Streams are active only on 3 nodes 
 then how/why did a 4th node get work? If a 4th got work why aren't more nodes 
 getting work?


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark Hive max key length is 767 bytes

2014-08-30 Thread arthur.hk.c...@gmail.com
Hi Michael,

Thank you so much!!

I have tried changing the following key lengths from 256 to 255 and from 767 to 
766, but it still didn't work:
alter table COLUMNS_V2 modify column COMMENT VARCHAR(255);
alter table INDEX_PARAMS modify column PARAM_KEY VARCHAR(255);
alter table SD_PARAMS modify column PARAM_KEY VARCHAR(255);
alter table SERDE_PARAMS modify column PARAM_KEY VARCHAR(255);
alter table TABLE_PARAMS modify column PARAM_KEY VARCHAR(255);
alter table TBLS modify column OWNER VARCHAR(766);
alter table PART_COL_STATS modify column PARTITION_NAME VARCHAR(766);
alter table PARTITION_KEYS modify column PKEY_TYPE VARCHAR(766);
alter table PARTITIONS modify column PART_NAME VARCHAR(766);

I use Hadoop 2.4.1, HBase 0.98.5 and Hive 0.13, trying Spark 1.0.2 and Shark 0.9.2, 
with JDK 1.6.0_45.

Some questions:
Which Hive version is Shark 0.9.2 based on?  Is HBase 0.98.x OK? Is Hive 0.13.1 
OK? And which Java?  (I use JDK 1.6 at the moment; it does not seem to work.)
Which Hive version is Spark 1.0.2 based on?  Is HBase 0.98.x OK?

Regards
Arthur 


On 30 Aug, 2014, at 1:40 am, Michael Armbrust mich...@databricks.com wrote:

 Spark SQL is based on Hive 12.  They must have changed the maximum key size 
 between 12 and 13.
 
 
 On Fri, Aug 29, 2014 at 4:38 AM, arthur.hk.c...@gmail.com 
 arthur.hk.c...@gmail.com wrote:
 Hi,
 
 
 Tried the same thing in HIVE directly without issue:
 
 HIVE:
 hive> create table test_datatype2 (testbigint bigint);
 OK
 Time taken: 0.708 seconds
 
 hive> drop table test_datatype2;
 OK
 Time taken: 23.272 seconds
 
 
 
 Then tried again in SPARK:
 scala> val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
 14/08/29 19:33:52 INFO Configuration.deprecation: 
 mapred.reduce.tasks.speculative.execution is deprecated. Instead, use 
 mapreduce.reduce.speculative
 hiveContext: org.apache.spark.sql.hive.HiveContext = 
 org.apache.spark.sql.hive.HiveContext@395c7b94
 
 scala> hiveContext.hql("create table test_datatype3 (testbigint bigint)")
 res0: org.apache.spark.sql.SchemaRDD = 
 SchemaRDD[0] at RDD at SchemaRDD.scala:104
 == Query Plan ==
 Native command: executed by Hive
 
 scala> hiveContext.hql("drop table test_datatype3")
 
 14/08/29 19:34:14 ERROR DataNucleus.Datastore: An exception was thrown while 
 adding/validating class(es) : Specified key was too long; max key length is 
 767 bytes
 com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: Specified key was 
 too long; max key length is 767 bytes
   at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
   at 
 sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:39)
 
 14/08/29 19:34:17 WARN DataNucleus.Query: Query for candidates of 
 org.apache.hadoop.hive.metastore.model.MPartition and subclasses resulted in 
 no possible candidates
 Error(s) were found while auto-creating/validating the datastore for classes. 
 The errors are printed in the log, and are attached to this exception.
 org.datanucleus.exceptions.NucleusDataStoreException: Error(s) were found 
 while auto-creating/validating the datastore for classes. The errors are 
 printed in the log, and are attached to this exception.
   at 
 org.datanucleus.store.rdbms.RDBMSStoreManager$ClassAdder.verifyErrors(RDBMSStoreManager.java:3609)
 
 
 Caused by: com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: 
 Specified key was too long; max key length is 767 bytes
   at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
   at 
 sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:39)
 
 14/08/29 19:34:17 INFO DataNucleus.Datastore: The class 
 org.apache.hadoop.hive.metastore.model.MFieldSchema is tagged as 
 embedded-only so does not have its own datastore table.
 14/08/29 19:34:17 INFO DataNucleus.Datastore: The class 
 org.apache.hadoop.hive.metastore.model.MOrder is tagged as embedded-only 
 so does not have its own datastore table.
 14/08/29 19:34:17 INFO DataNucleus.Datastore: The class 
 org.apache.hadoop.hive.metastore.model.MFieldSchema is tagged as 
 embedded-only so does not have its own datastore table.
 14/08/29 19:34:17 INFO DataNucleus.Datastore: The class 
 org.apache.hadoop.hive.metastore.model.MOrder is tagged as embedded-only 
 so does not have its own datastore table.
 14/08/29 19:34:17 INFO DataNucleus.Datastore: The class 
 org.apache.hadoop.hive.metastore.model.MFieldSchema is tagged as 
 embedded-only so does not have its own datastore table.
 14/08/29 19:34:17 INFO DataNucleus.Datastore: The class 
 org.apache.hadoop.hive.metastore.model.MOrder is tagged as embedded-only 
 so does not have its own datastore table.
 14/08/29 19:34:25 ERROR DataNucleus.Datastore: An exception was thrown while 
 adding/validating class(es) : Specified key was too long; max key length is 
 767 bytes
 com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: Specified key was 
 too long; max key length is 767 bytes
   at 

Re: org.apache.spark.examples.xxx

2014-08-30 Thread Akhil Das
It bundles all of these sources
(https://github.com/apache/spark/tree/master/examples) together, and it also
uses the pom file to get the dependency list, if I'm not wrong.

Thanks
Best Regards


On Fri, Aug 29, 2014 at 12:39 AM, filipus floe...@gmail.com wrote:

 hey guys

 i still try to get used to compile and run the example code

 why does the run_example code submit the class with an
 org.apache.spark.examples in front of the class itself?

 probably a stupid question but i would be glad some one of you explains

 by the way.. how was the spark...example...jar file build?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/org-apache-spark-examples-xxx-tp13052.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: org.apache.spark.examples.xxx

2014-08-30 Thread Ted Yu
bq. how was the spark...example...jar file build?

You can use the following command to build against hadoop 2.4:

mvn -Phadoop-2.4,yarn -Dhadoop.version=2.4.1 -DskipTests clean package

examples jar can be found under examples/target
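
Once built, a bundled example class can be run with spark-submit using the full
package name, e.g. (the exact jar name and version under examples/target depend
on your build):

mvn -Phadoop-2.4,yarn -Dhadoop.version=2.4.1 -DskipTests clean package
bin/spark-submit --class org.apache.spark.examples.SparkPi \
  --master local[2] examples/target/spark-examples_2.10-1.0.2.jar 10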

Cheers


On Sat, Aug 30, 2014 at 6:54 AM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 It bundles all these src's
 https://github.com/apache/spark/tree/master/examples together and also it
 uses the pom file to get the dependencies list if I'm not wrong.

 Thanks
 Best Regards


 On Fri, Aug 29, 2014 at 12:39 AM, filipus floe...@gmail.com wrote:

 hey guys

 i still try to get used to compile and run the example code

 why does the run_example code submit the class with an
 org.apache.spark.examples in front of the class itself?

 probably a stupid question but i would be glad some one of you explains

 by the way.. how was the spark...example...jar file build?



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/org-apache-spark-examples-xxx-tp13052.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Re: org.apache.spark.examples.xxx

2014-08-30 Thread filipus
I'm trying to get used to sbt in order to build standalone applications by myself.

The SimpleApp example I managed to run.

Then I tried to copy an example Scala program like LinearRegression into a
local directory:

.
./build.sbt
./src
./src/main
./src/main/scala
./src/main/scala/LinearRegression.scala

my build.sbt, even though I don't really know what I'm doing, looks like:

name := "Linear Regression"

version := "1.0"

scalaVersion := "2.10.4"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.0.2"

libraryDependencies += "org.apache.spark" %% "spark-mllib" % "1.0.2"

libraryDependencies += "com.github.scopt" %% "scopt" % "3.2.0"

resolvers += "Akka Repository" at "http://repo.akka.io/releases/"

By the way... first I tried scalaVersion := "2.11.2", which is my installed
version, but this failed.

...

sbt package builds a jar file in target but the command

spark-submit --class LinearRegression --master local[2]
target/scala-2.10/linear-regression_2.10-1.0.jar
~/git/spark/data/mllib/sample_linear_regression_data.txt

didn't work. It tells me:

Spark assembly has been built with Hive, including Datanucleus jars on
classpath
Exception in thread "main" java.lang.NoClassDefFoundError:
scopt/OptionParser
at java.lang.Class.getDeclaredMethods0(Native Method)

AHHH: I commented out /*package org.apache.spark.examples.mllib*/ in
LinearRegression.scala because otherwise it doesn't find the main class:
Exception in thread "main" java.lang.ClassNotFoundException:
LinearRegression

When I do the same with the pre-built examples jar package, everything
works fine:

spark-submit --class  org.apache.spark.examples.mllib.LinearRegression
--master local[2] lib/spark-examples-1.0.2-hadoop2.2.0.jar
~/git/spark/data/mllib/sample_linear_regression_data.txt

works !!! 




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/org-apache-spark-examples-xxx-tp13052p13178.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Mapping Hadoop Reduce to Spark

2014-08-30 Thread Steve Lewis
When programming in Hadoop it is possible to guarantee
1) All keys sent to a specific partition will be handled by the same
machine (thread)
2) All keys received by a specific machine (thread) will be received in
sorted order
3) These conditions will hold even if the values associated with a specific
key are too large to fit in memory.

In my Hadoop code I use all of these conditions - specifically with my
larger data sets the size of data I wish to group exceeds the available
memory.

I think I understand the operation of groupBy, but my understanding is that
it requires that the results for a single key, and perhaps all keys, fit
on a single machine.

Is there a way to perform grouping as in Hadoop and not require that an
entire group fit in memory?


Re: org.apache.spark.examples.xxx

2014-08-30 Thread filipus
Compilation works, but execution does not, at least with spark-submit, as I
described above.

When I make a local copy of the training set I can execute sbt run <file>,
which works:

sbt run sample_linear_regression_data.txt

when I do

sbt run ~/git/spark/data/mllib/sample_linear_regression_data.txt

the program fails because it doesn't find any training set at

[error] (run-main-0) org.apache.hadoop.mapred.InvalidInputException: Input
path does not exist:
file:*/home/filip/spark-ex-regression/*~/git/spark/data/mllib/sample_linear_regression_data.txt
org.apache.hadoop.mapred.InvalidInputException: Input path does not exist:
file:/home/filip/spark-ex-regression/~/git/spark/data/mllib/sample_linear_regression_data.txt

PS: does anybody know where in the program LinearRegression.scala the path is
specified, or does it have to do with sbt???



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/org-apache-spark-examples-xxx-tp13052p13180.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: org.apache.spark.examples.xxx

2014-08-30 Thread Ted Yu
Did you run sbt under /home/filip/spark-ex-regression ?

'~/git/spark/data/mllib/sample_linear_regression_data.txt' was interpreted
as rooted under /home/filip/spark-ex-regression
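
In other words, pass an absolute path (or one relative to the project
directory) instead, e.g.:

  sbt "run /home/filip/git/spark/data/mllib/sample_linear_regression_data.txt"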

Cheers


On Sat, Aug 30, 2014 at 9:28 AM, filipus floe...@gmail.com wrote:

 compilation works but execution not at least with spark-submit as I
 described
 above

 when I make a local copy of the training set I can execute sbt run file
 which works

 sbt run sample_linear_regression_data.txt

 when I do

 sbt run ~/git/spark/data/mllib/sample_linear_regression_data.txt

 the program fails because it doesnt find any traning set at

 [error] (run-main-0) org.apache.hadoop.mapred.InvalidInputException: Input
 path does not exist:

 file:*/home/filip/spark-ex-regression/*~/git/spark/data/mllib/sample_linear_regression_data.txt
 org.apache.hadoop.mapred.InvalidInputException: Input path does not exist:

 file:/home/filip/spark-ex-regression/~/git/spark/data/mllib/sample_linear_regression_data.txt

 ps: does anybody knows where in the program LinearRegression.scala it
 specifies the PATH or has it to do with sbt???



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/org-apache-spark-examples-xxx-tp13052p13180.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




Re: org.apache.spark.examples.xxx

2014-08-30 Thread filipus
ok I see :-)

'..' instead of '~' works fine, so

do you know the reason

sbt run [options] works 

after sbt package 

but 

spark-submit --class ClassName --master local[2]
target/scala/JarPackage.jar [options]

doesn't?

it cannot resolve everything somehow



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/org-apache-spark-examples-xxx-tp13052p13182.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Mapping Hadoop Reduce to Spark

2014-08-30 Thread Matei Zaharia
In 1.1, you'll be able to get all of these properties using sortByKey, and then 
mapPartitions on top to iterate through the key-value pairs. Unfortunately 
sortByKey does not let you control the Partitioner, but it's fairly easy to 
write your own version that does if this is important.

In previous versions, the values for each key had to fit in memory (though we 
could have data on disk across keys), and this is still true for groupByKey, 
cogroup and join. Those restrictions will hopefully go away in a later release. 
But sortByKey + mapPartitions lets you just iterate through the key-value pairs 
without worrying about this.
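
A rough sketch of that pattern (the names and types here are just illustrative):

  // assuming pairs: RDD[(String, MyValue)]
  pairs.sortByKey().mapPartitions { iter =>
    // within a partition, iter streams (key, value) pairs in sorted key order,
    // so consecutive pairs with the same key can be handled as a group
    // without materializing the whole group in memory
    iter.map { case (k, v) => process(k, v) }   // process() is a placeholder
  }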

Matei

On August 30, 2014 at 9:04:37 AM, Steve Lewis (lordjoe2...@gmail.com) wrote:

When programming in Hadoop it is possible to guarantee
1) All keys sent to a specific partition will be handled by the same machine 
(thread)
2) All keys received by a specific machine (thread) will be received in sorted 
order
3) These conditions will hold even if the values associated with a specific key 
are too large to fit in memory.

In my Hadoop code I use all of these conditions - specifically with my larger 
data sets the size of data I wish to group exceeds the available memory.

I think I understand the operation of groupby but my understanding is that this 
requires that the results for a single key, and perhaps all keys fit on a 
single machine.

Is there a way to perform grouping as in Hadoop and not require that an entire group 
fit in memory?



Re: Spark Hive max key length is 767 bytes

2014-08-30 Thread Denny Lee
Oh, you may be running into an issue with your MySQL setup actually, try running

alter database metastore_db character set latin1

so that way Hive (and the Spark HiveContext) can execute properly against the 
metastore.


On August 29, 2014 at 04:39:01, arthur.hk.c...@gmail.com 
(arthur.hk.c...@gmail.com) wrote:

Hi,


Tried the same thing in HIVE directly without issue:

HIVE:
hive create table test_datatype2 (testbigint bigint );
OK
Time taken: 0.708 seconds

hive drop table test_datatype2;
OK
Time taken: 23.272 seconds



Then tried again in SPARK:
scala val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
14/08/29 19:33:52 INFO Configuration.deprecation: 
mapred.reduce.tasks.speculative.execution is deprecated. Instead, use 
mapreduce.reduce.speculative
hiveContext: org.apache.spark.sql.hive.HiveContext = 
org.apache.spark.sql.hive.HiveContext@395c7b94

scala val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
res0: org.apache.spark.sql.SchemaRDD = 
SchemaRDD[0] at RDD at SchemaRDD.scala:104
== Query Plan ==
Native command: executed by Hive

scala hiveContext.hql(drop table test_datatype3)

14/08/29 19:34:14 ERROR DataNucleus.Datastore: An exception was thrown while 
adding/validating class(es) : Specified key was too long; max key length is 767 
bytes
com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: Specified key was 
too long; max key length is 767 bytes
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:39)

14/08/29 19:34:17 WARN DataNucleus.Query: Query for candidates of 
org.apache.hadoop.hive.metastore.model.MPartition and subclasses resulted in no 
possible candidates
Error(s) were found while auto-creating/validating the datastore for classes. 
The errors are printed in the log, and are attached to this exception.
org.datanucleus.exceptions.NucleusDataStoreException: Error(s) were found while 
auto-creating/validating the datastore for classes. The errors are printed in 
the log, and are attached to this exception.
at 
org.datanucleus.store.rdbms.RDBMSStoreManager$ClassAdder.verifyErrors(RDBMSStoreManager.java:3609)


Caused by: com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: Specified 
key was too long; max key length is 767 bytes
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:39)

14/08/29 19:34:17 INFO DataNucleus.Datastore: The class 
org.apache.hadoop.hive.metastore.model.MFieldSchema is tagged as 
embedded-only so does not have its own datastore table.
14/08/29 19:34:17 INFO DataNucleus.Datastore: The class 
org.apache.hadoop.hive.metastore.model.MOrder is tagged as embedded-only so 
does not have its own datastore table.
14/08/29 19:34:17 INFO DataNucleus.Datastore: The class 
org.apache.hadoop.hive.metastore.model.MFieldSchema is tagged as 
embedded-only so does not have its own datastore table.
14/08/29 19:34:17 INFO DataNucleus.Datastore: The class 
org.apache.hadoop.hive.metastore.model.MOrder is tagged as embedded-only so 
does not have its own datastore table.
14/08/29 19:34:17 INFO DataNucleus.Datastore: The class 
org.apache.hadoop.hive.metastore.model.MFieldSchema is tagged as 
embedded-only so does not have its own datastore table.
14/08/29 19:34:17 INFO DataNucleus.Datastore: The class 
org.apache.hadoop.hive.metastore.model.MOrder is tagged as embedded-only so 
does not have its own datastore table.
14/08/29 19:34:25 ERROR DataNucleus.Datastore: An exception was thrown while 
adding/validating class(es) : Specified key was too long; max key length is 767 
bytes
com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: Specified key was 
too long; max key length is 767 bytes
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)


Can anyone please help?

Regards
Arthur


On 29 Aug, 2014, at 12:47 pm, arthur.hk.c...@gmail.com 
arthur.hk.c...@gmail.com wrote:

(Please ignore if duplicated) 


Hi,

I use Spark 1.0.2 with Hive 0.13.1

I have already set the hive mysql database to latin1:

mysql:
alter database hive character set latin1;

Spark:
scala> val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
scala> hiveContext.hql("create table test_datatype1 (testbigint bigint)")
scala> hiveContext.hql("drop table test_datatype1")


14/08/29 12:31:55 INFO DataNucleus.Datastore: The class 
org.apache.hadoop.hive.metastore.model.MFieldSchema is tagged as 
embedded-only so does not have its own datastore table.
14/08/29 12:31:55 INFO DataNucleus.Datastore: The class 
org.apache.hadoop.hive.metastore.model.MOrder is tagged as embedded-only so 
does not have its own datastore table.
14/08/29 12:31:55 INFO DataNucleus.Datastore: The class 
org.apache.hadoop.hive.metastore.model.MFieldSchema is tagged as 
embedded-only so does not have its own datastore table.
14/08/29 12:31:55 INFO 

Re: Spark Hive max key length is 767 bytes

2014-08-30 Thread arthur.hk.c...@gmail.com
Hi,

Already done but still get the same error:

(I use HIVE 0.13.1 Spark 1.0.2, Hadoop 2.4.1)

Steps:
Step 1) mysql:
 
 alter database hive character set latin1;
Step 2) HIVE:
 hive> create table test_datatype2 (testbigint bigint);
 OK
 Time taken: 0.708 seconds
 
 hive> drop table test_datatype2;
 OK
 Time taken: 23.272 seconds
Step 3) scala> val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
 14/08/29 19:33:52 INFO Configuration.deprecation: 
 mapred.reduce.tasks.speculative.execution is deprecated. Instead, use 
 mapreduce.reduce.speculative
 hiveContext: org.apache.spark.sql.hive.HiveContext = 
 org.apache.spark.sql.hive.HiveContext@395c7b94
 scala> hiveContext.hql("create table test_datatype3 (testbigint bigint)")
 res0: org.apache.spark.sql.SchemaRDD = 
 SchemaRDD[0] at RDD at SchemaRDD.scala:104
 == Query Plan ==
 Native command: executed by Hive
 scala> hiveContext.hql("drop table test_datatype3")
 
 14/08/29 19:34:14 ERROR DataNucleus.Datastore: An exception was thrown while 
 adding/validating class(es) : Specified key was too long; max key length is 
 767 bytes
 com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: Specified key was 
 too long; max key length is 767 bytes
 at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
 at 
 sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:39)
 
 14/08/29 19:34:17 WARN DataNucleus.Query: Query for candidates of 
 org.apache.hadoop.hive.metastore.model.MPartition and subclasses resulted in 
 no possible candidates
 Error(s) were found while auto-creating/validating the datastore for 
 classes. The errors are printed in the log, and are attached to this 
 exception.
 org.datanucleus.exceptions.NucleusDataStoreException: Error(s) were found 
 while auto-creating/validating the datastore for classes. The errors are 
 printed in the log, and are attached to this exception.
 at 
 org.datanucleus.store.rdbms.RDBMSStoreManager$ClassAdder.verifyErrors(RDBMSStoreManager.java:3609)
 
 
 Caused by: com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: 
 Specified key was too long; max key length is 767 bytes
 at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)



Should I use HIVE 0.12.0 instead of HIVE 0.13.1?

Regards
Arthur

On 31 Aug, 2014, at 6:01 am, Denny Lee denny.g@gmail.com wrote:

 Oh, you may be running into an issue with your MySQL setup actually, try 
 running
 
 alter database metastore_db character set latin1
 
 so that way Hive (and the Spark HiveContext) can execute properly against the 
 metastore.
 
 
 On August 29, 2014 at 04:39:01, arthur.hk.c...@gmail.com 
 (arthur.hk.c...@gmail.com) wrote:
 
 Hi,
 
 
 Tried the same thing in HIVE directly without issue:
 
 HIVE:
 hive create table test_datatype2 (testbigint bigint );
 OK
 Time taken: 0.708 seconds
 
 hive drop table test_datatype2;
 OK
 Time taken: 23.272 seconds
 
 
 
 Then tried again in SPARK:
 scala val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
 14/08/29 19:33:52 INFO Configuration.deprecation: 
 mapred.reduce.tasks.speculative.execution is deprecated. Instead, use 
 mapreduce.reduce.speculative
 hiveContext: org.apache.spark.sql.hive.HiveContext = 
 org.apache.spark.sql.hive.HiveContext@395c7b94
 
 scala val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
 res0: org.apache.spark.sql.SchemaRDD = 
 SchemaRDD[0] at RDD at SchemaRDD.scala:104
 == Query Plan ==
 Native command: executed by Hive
 
 scala hiveContext.hql(drop table test_datatype3)
 
 14/08/29 19:34:14 ERROR DataNucleus.Datastore: An exception was thrown while 
 adding/validating class(es) : Specified key was too long; max key length is 
 767 bytes
 com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: Specified key was 
 too long; max key length is 767 bytes
 at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
 at 
 sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:39)
 
 14/08/29 19:34:17 WARN DataNucleus.Query: Query for candidates of 
 org.apache.hadoop.hive.metastore.model.MPartition and subclasses resulted in 
 no possible candidates
 Error(s) were found while auto-creating/validating the datastore for 
 classes. The errors are printed in the log, and are attached to this 
 exception.
 org.datanucleus.exceptions.NucleusDataStoreException: Error(s) were found 
 while auto-creating/validating the datastore for classes. The errors are 
 printed in the log, and are attached to this exception.
 at 
 org.datanucleus.store.rdbms.RDBMSStoreManager$ClassAdder.verifyErrors(RDBMSStoreManager.java:3609)
 
 
 Caused by: com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: 
 Specified key was too long; max key length is 767 bytes
 at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
 at 
 sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:39)
 
 14/08/29 19:34:17 INFO 

Spark Master/Slave and HA

2014-08-30 Thread arthur.hk.c...@gmail.com
Hi,

I have few questions about Spark Master and Slave setup:

Here I have 5 Hadoop nodes (n1, n2, n3, n4 and n5 respectively); at the 
moment I run Spark on these nodes:
n1: Hadoop Active Name Node, Hadoop Slave, Spark Active Master
n2: Hadoop Standby Name Node, Hadoop Slave, Spark Slave
n3: Hadoop Slave, Spark Slave
n4: Hadoop Slave, Spark Slave
n5: Hadoop Slave, Spark Slave

Questions:
Q1: If I set n1 as both Spark Master and Spark Slave, I cannot start the Spark 
cluster. Does it mean that, unlike Hadoop, I cannot use the same machine as 
both MASTER and SLAVE in Spark?
n1: Hadoop Active Name Node, Hadoop Slave, Spark Active Master, Spark Slave (failed to start Spark)
n2: Hadoop Standby Name Node, Hadoop Slave, Spark Slave
n3: Hadoop Slave, Spark Slave
n4: Hadoop Slave, Spark Slave
n5: Hadoop Slave, Spark Slave

Q2: I am planning Spark HA. What if I use n2 as both Spark Standby Master and 
Spark Slave? Is Spark allowed to run a Standby Master and a Slave on the same 
machine?
n1: Hadoop Active Name Node, Hadoop Slave, Spark Active Master
n2: Hadoop Standby Name Node, Hadoop Slave, Spark Standby Master, Spark Slave
n3: Hadoop Slave, Spark Slave
n4: Hadoop Slave, Spark Slave
n5: Hadoop Slave, Spark Slave

Q3: Does the Spark Master node do actual computation work like a worker, or is 
it just a pure monitoring node?

Regards
Arthur

Spark and Shark Node: RAM Allocation

2014-08-30 Thread arthur.hk.c...@gmail.com
Hi,

Is there any formula to calculate proper RAM allocation values for Spark and 
Shark based on Physical RAM, HADOOP and HBASE RAM usage?
e.g. if a node has 32GB physical RAM


spark-defaults.conf
spark.executor.memory   ?g

spark-env.sh
export SPARK_WORKER_MEMORY=?
export HADOOP_HEAPSIZE=?


shark-env.sh
export SPARK_MEM=?g
export SHARK_MASTER_MEM=?g



Regards
Arthur




Fwd: What does appMasterRpcPort: -1 indicate ?

2014-08-30 Thread Tao Xiao
I'm using CDH 5.1.0, which bundles Spark 1.0.0 with it.

Following "How-to: Run a Simple Apache Spark App in CDH 5"
(http://blog.cloudera.com/blog/2014/04/how-to-run-a-simple-apache-spark-app-in-cdh-5/),
I tried to submit my job in local mode, Spark Standalone mode and YARN
mode. I successfully submitted my job in local mode and Standalone mode;
however, I noticed the following messages printed on the console when I
submitted my job in YARN mode:

 14/08/29 22:27:29 INFO Client: Submitting application to ASM
14/08/29 22:27:29 INFO YarnClientImpl: Submitted application
application_1406949333981_0015
14/08/29 22:27:29 INFO YarnClientSchedulerBackend: Application report from
ASM:
 appMasterRpcPort: -1
 appStartTime: 1409365649836
 yarnAppState: ACCEPTED
 14/08/29 22:27:30 INFO YarnClientSchedulerBackend: Application report from
ASM:
 appMasterRpcPort: -1
 appStartTime: 1409365649836
 yarnAppState: ACCEPTED
 14/08/29 22:27:31 INFO YarnClientSchedulerBackend: Application report from
ASM:
 appMasterRpcPort: -1
 appStartTime: 1409365649836
 yarnAppState: ACCEPTED
 14/08/29 22:27:32 INFO YarnClientSchedulerBackend: Application report from
ASM:
 appMasterRpcPort: -1
 appStartTime: 1409365649836
 yarnAppState: ACCEPTED
 14/08/29 22:27:33 INFO YarnClientSchedulerBackend: Application report from
ASM:
 appMasterRpcPort: -1
 appStartTime: 1409365649836
 yarnAppState: ACCEPTED
 14/08/29 22:27:34 INFO YarnClientSchedulerBackend: Application report from
ASM:
 appMasterRpcPort: -1
 appStartTime: 1409365649836
 yarnAppState: ACCEPTED
 14/08/29 22:27:35 INFO YarnClientSchedulerBackend: Application report from
ASM:
 appMasterRpcPort: -1
 appStartTime: 1409365649836
 yarnAppState: ACCEPTED
 14/08/29 22:27:36 INFO YarnClientSchedulerBackend: Application report from
ASM:
 appMasterRpcPort: -1
 appStartTime: 1409365649836
 yarnAppState: ACCEPTED
 14/08/29 22:27:37 INFO YarnClientSchedulerBackend: Application report from
ASM:
 appMasterRpcPort: -1
 appStartTime: 1409365649836
 yarnAppState: ACCEPTED
 14/08/29 22:27:38 INFO YarnClientSchedulerBackend: Application report from
ASM:
 appMasterRpcPort: -1
 appStartTime: 1409365649836
 yarnAppState: ACCEPTED
 14/08/29 22:27:39 INFO YarnClientSchedulerBackend: Application report from
ASM:
 appMasterRpcPort: 0
 appStartTime: 1409365649836
 yarnAppState: RUNNING

The job finished successfully and produced correct results.
But I'm not sure what those messages mean. Does appMasterRpcPort: -1 indicate
an error or exception?

Thanks


Re: Low Level Kafka Consumer for Spark

2014-08-30 Thread Roger Hoover
I have this same question.  Isn't there somewhere that the Kafka range
metadata can be saved?  From my naive perspective, it seems like it should
be very similar to HDFS lineage.  The original HDFS blocks are kept
somewhere (in the driver?) so that if an RDD partition is lost, it can be
recomputed.  In this case, all we need is the Kafka topic, partition, and
offset range.

Can someone enlighten us on why two copies of the RDD are needed (or some
other mechanism like a WAL) for fault tolerance when using Kafka but not
when reading from say HDFS?


On Fri, Aug 29, 2014 at 8:58 AM, Jonathan Hodges hodg...@gmail.com wrote:

 'this 2-node replication is mainly for failover in case the receiver dies
 while data is in flight.  there's still chance for data loss as there's no
 write ahead log on the hot path, but this is being addressed.'

 Can you comment a little on how this will be addressed, will there be a
 durable WAL?  Is there a JIRA for tracking this effort?

 I am curious without WAL if you can avoid this data loss with explicit
 management of Kafka offsets e.g. don't commit offset unless data is
 replicated to multiple nodes or maybe not until processed.  The incoming
 data will always be durably stored to disk in Kafka so can be replayed in
 failure scenarios to avoid data loss if the offsets are managed properly.




 On Thu, Aug 28, 2014 at 12:02 PM, Chris Fregly ch...@fregly.com wrote:

 @bharat-

 overall, i've noticed a lot of confusion about how Spark Streaming scales
 - as well as how it handles failover and checkpointing, but we can discuss
 that separately.

 there's actually 2 dimensions to scaling here:  receiving and processing.

 *Receiving*
 receiving can be scaled out by submitting new DStreams/Receivers to the
 cluster as i've done in the Kinesis example.  in fact, i purposely chose to
 submit multiple receivers in my Kinesis example because i feel it should be
 the norm and not the exception - particularly for partitioned and
 checkpoint-capable streaming systems like Kafka and Kinesis.   it's the
 only way to scale.

 a side note here is that each receiver running in the cluster will
 immediately replicates to 1 other node for fault-tolerance of that specific
 receiver.  this is where the confusion lies.  this 2-node replication is
 mainly for failover in case the receiver dies while data is in flight.
  there's still chance for data loss as there's no write ahead log on the
 hot path, but this is being addressed.

 this in mentioned in the docs here:
 https://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-receiving

 *Processing*
 once data is received, tasks are scheduled across the Spark cluster just
 like any other non-streaming task where you can specify the number of
 partitions for reduces, etc.  this is the part of scaling that is sometimes
 overlooked - probably because it works just like regular Spark, but it is
 worth highlighting.

 Here's a blurb in the docs:
 https://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-processing

 the other thing that's confusing with Spark Streaming is that in Scala,
 you need to explicitly

 import
 org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions

 in order to pick up the implicits that allow DStream.reduceByKey and such
 (versus DStream.transform(rddBatch => rddBatch.reduceByKey(...)))

 in other words, DStreams appear to be relatively featureless until you
 discover this implicit.  otherwise, you need to operate on the underlying
 RDD's explicitly which is not ideal.

 the Kinesis example referenced earlier in the thread uses the DStream
 implicits.


 side note to all of this - i've recently convinced my publisher for my
 upcoming book, Spark In Action, to let me jump ahead and write the Spark
 Streaming chapter ahead of other more well-understood libraries.  early
 release is in a month or so.  sign up  @ http://sparkinaction.com if you
 wanna get notified.

 shameless plug that i wouldn't otherwise do, but i really think it will
 help clear a lot of confusion in this area as i hear these questions asked
 a lot in my talks and such.  and i think a clear, crisp story on scaling
 and fault-tolerance will help Spark Streaming's adoption.

 hope that helps!

 -chris




 On Wed, Aug 27, 2014 at 6:32 PM, Dibyendu Bhattacharya 
 dibyendu.bhattach...@gmail.com wrote:

 I agree. This issue should be fixed in Spark rather rely on replay of
 Kafka messages.

 Dib
 On Aug 28, 2014 6:45 AM, RodrigoB rodrigo.boav...@aspect.com wrote:

 Dibyendu,

 Tnks for getting back.

 I believe you are absolutely right. We were under the assumption that
 the
 raw data was being computed again and that's not happening after further
 tests. This applies to Kafka as well.

 The issue is of major priority fortunately.

 Regarding your suggestion, I would maybe prefer to have the problem
 resolved
 within Spark's internals since once the data is replicated we 

Powered By Spark

2014-08-30 Thread Yi Tian
Hi, 

Could you please add Asiainfo to the Powered By Spark page?

Thanks

Asiainfo
www.asiainfo.com
Core, SQL, Streaming, MLlib, GraphX
 
We leverage the Spark and Hadoop ecosystem to build cost-effective data center 
solutions for our customers in the telecom industry as well as other industrial 
sectors. Meanwhile, we also build innovative big data applications using Spark 
to help our customers with real-time marketing, cross-product selling, customer 
behavior analysis and other areas.


Yi Tian

Re: saveAsSequenceFile for DStream

2014-08-30 Thread Chris Fregly
couple things to add here:

1) you can import the
org.apache.spark.streaming.dstream.PairDStreamFunctions implicit which adds
a whole ton of functionality to DStream itself.  this lets you work at the
DStream level versus digging into the underlying RDDs.

2) you can use ssc.fileStream(directory) to create an input stream made up
of files in a given directory.  new files will be added to the stream as
they appear in that directory.  note:  files must be immutable.
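
building on Sean's suggestion below, a quick sketch that also puts the batch
time into the filename, which was the original ask (the tachyon path here is
just an example):

  wordCounts.foreachRDD { (rdd, time) =>
    // one sequence file directory per batch, keyed by the batch timestamp
    rdd.saveAsSequenceFile("tachyon://localhost:19998/files/WordCounts-" + time.milliseconds)
  }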


On Tue, Jul 22, 2014 at 8:39 AM, Barnaby Falls bfa...@outlook.com wrote:

 Thanks Sean! I got that working last night similar to how you solved it.
 Any ideas about how to monitor that same folder in another script by
 creating a stream? I can use sc.sequenceFile() to read in the RDD, but how
 do I get the name of the file that got added since there is no
 sequenceFileStream() method? Thanks again for your help.

  On Jul 22, 2014, at 1:57, Sean Owen so...@cloudera.com wrote:
 
  What about simply:
 
  dstream.foreachRDD(_.saveAsSequenceFile(...))
 
  ?
 
  On Tue, Jul 22, 2014 at 2:06 AM, Barnaby bfa...@outlook.com wrote:
  First of all, I do not know Scala, but learning.
 
  I'm doing a proof of concept by streaming content from a socket,
 counting
  the words and write it to a Tachyon disk. A different script will read
 the
  file stream and print out the results.
 
   val lines = ssc.socketTextStream(args(0), args(1).toInt,
   StorageLevel.MEMORY_AND_DISK_SER)
   val words = lines.flatMap(_.split(" "))
   val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
   wordCounts.saveAs???Files("tachyon://localhost:19998/files/WordCounts")
   ssc.start()
   ssc.awaitTermination()
 
  I already did a proof of concept to write and read sequence files but
 there
  doesn't seem to be a saveAsSequenceFiles() method in DStream. What is
 the
  best way to write out an RDD to a stream so that the timestamps are in
 the
  filenames and so there is minimal overhead in reading the data back in
 as
  objects, see below.
 
  My simple successful proof was the following:
   val rdd = sc.parallelize(Array(("a", 2), ("b", 3), ("c", 1)))
   rdd.saveAsSequenceFile("tachyon://.../123.sf2")
   val rdd2 = sc.sequenceFile[String, Int]("tachyon://.../123.sf2")
 
  How can I do something similar with streaming?
 
 
 
 
  --
  View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/saveAsSequenceFile-for-DStream-tp10369.html
  Sent from the Apache Spark User List mailing list archive at Nabble.com.



How can a deserialized Java object be stored on disk?

2014-08-30 Thread Tao Xiao
Reading about RDD Persistence
(https://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence),
I learned that the storage level MEMORY_AND_DISK means "Store RDD as
deserialized Java objects in the JVM. If the RDD does not fit in memory,
store the partitions that don't fit on disk, and read them from there when
they're needed."

But how can a deserialized Java object be stored on disk? As far as I
know, a Java object should be stored as an array of bytes on disk, which
means that the Java object should first be converted into an array of bytes (a
serialized object).

Thanks .


Re: data locality

2014-08-30 Thread Chris Fregly
you can view the Locality Level of each task within a stage by using the
Spark Web UI under the Stages tab.

levels are as follows (in order of decreasing desirability):
1) PROCESS_LOCAL - data was found directly in the executor JVM
2) NODE_LOCAL - data was found on the same node as the executor JVM
3) RACK_LOCAL - data was found in the same rack
4) ANY - outside the rack

also, the Aggregated Metrics by Executor section of the Stage detail view
shows how much data is being shuffled across the network (Shuffle
Read/Write).  0 is where you wanna be with that metric.

-chris


On Fri, Jul 25, 2014 at 4:13 AM, Tsai Li Ming mailingl...@ltsai.com wrote:

 Hi,

 In the standalone mode, how can we check data locality is working as
 expected when tasks are assigned?

 Thanks!


 On 23 Jul, 2014, at 12:49 am, Sandy Ryza sandy.r...@cloudera.com wrote:

 On standalone there is still special handling for assigning tasks within
 executors.  There just isn't special handling for where to place executors,
 because standalone generally places an executor on every node.


 On Mon, Jul 21, 2014 at 7:42 PM, Haopu Wang hw...@qilinsoft.com wrote:

   Sandy,



 I just tried the standalone cluster and didn't have chance to try Yarn
 yet.

 So if I understand correctly, there are **no** special handling of task
 assignment according to the HDFS block's location when Spark is running as
 a **standalone** cluster.

 Please correct me if I'm wrong. Thank you for your patience!


  --

 *From:* Sandy Ryza [mailto:sandy.r...@cloudera.com]
 *Sent:* 2014年7月22日 9:47

 *To:* user@spark.apache.org
 *Subject:* Re: data locality



 This currently only works for YARN.  The standalone default is to place
 an executor on every node for every job.



 The total number of executors is specified by the user.



 -Sandy



 On Fri, Jul 18, 2014 at 2:00 AM, Haopu Wang hw...@qilinsoft.com wrote:

 Sandy,



 Do you mean the “preferred location” is working for standalone cluster
 also? Because I check the code of SparkContext and see comments as below:



   // This is used only by YARN for now, but should be relevant to other
   // cluster types (Mesos, etc) too. This is typically generated from
   // InputFormatInfo.computePreferredLocations. It contains a map from
   // hostname to a list of input format splits on the host.
   private[spark] var preferredNodeLocationData: Map[String, Set[SplitInfo]] = Map()



 BTW, even with the preferred hosts, how does Spark decide how many total
 executors to use for this application?



 Thanks again!


  --

 *From:* Sandy Ryza [mailto:sandy.r...@cloudera.com]
 *Sent:* Friday, July 18, 2014 3:44 PM
 *To:* user@spark.apache.org
 *Subject:* Re: data locality



 Hi Haopu,



 Spark will ask HDFS for file block locations and try to assign tasks
 based on these.



 There is a snag.  Spark schedules its tasks inside of executor
 processes that stick around for the lifetime of a Spark application.  Spark
 requests executors before it runs any jobs, i.e. before it has any
 information about where the input data for the jobs is located.  If the
 executors occupy significantly fewer nodes than exist in the cluster, it
 can be difficult for Spark to achieve data locality.  The workaround for
 this is an API that allows passing in a set of preferred locations when
 instantiating a Spark context.  This API is currently broken in Spark 1.0,
 and will likely changed to be something a little simpler in a future
 release.



  val locData = InputFormatInfo.computePreferredLocations(
    Seq(new InputFormatInfo(conf, classOf[TextInputFormat], new Path("myfile.txt"))))

  val sc = new SparkContext(conf, locData)



 -Sandy





 On Fri, Jul 18, 2014 at 12:35 AM, Haopu Wang hw...@qilinsoft.com wrote:

 I have a standalone spark cluster and a HDFS cluster which share some of
 nodes.



 When reading HDFS file, how does spark assign tasks to nodes? Will it ask
 HDFS the location for each file block in order to get a right worker node?



 How about a spark cluster on Yarn?



 Thank you very much!












Re: Low Level Kafka Consumer for Spark

2014-08-30 Thread Tim Smith
I'd be interested to understand this mechanism as well. But this is the
error recovery part of the equation. Consuming from Kafka has two aspects -
parallelism and error recovery and I am not sure how either works. For
error recovery, I would like to understand how:
- A failed receiver gets re-spawned. In 1.0.0, despite setting the failed
tasks threshold to 64, my job aborts after 4 receiver task failures.
- Data loss recovery due to a failed receiver task/executor.


 For parallelism, I would expect a single createStream() to intelligently
map a receiver thread somewhere, one for each kafka partition, but in
different JVMs. Also, repartition() does not seem to work as advertised. A
repartition(512) should get nodes other than the receiver nodes to get some
RDDs to process. No?


On Sat, Aug 30, 2014 at 7:14 PM, Roger Hoover roger.hoo...@gmail.com
wrote:

 I have this same question.  Isn't there somewhere that the Kafka range
 metadata can be saved?  From my naive perspective, it seems like it should
 be very similar to HDFS lineage.  The original HDFS blocks are kept
 somewhere (in the driver?) so that if an RDD partition is lost, it can be
 recomputed.  In this case, all we need is the Kafka topic, partition, and
 offset range.

 Can someone enlighten us on why two copies of the RDD are needed (or some
 other mechanism like a WAL) for fault tolerance when using Kafka but not
 when reading from say HDFS?



 On Fri, Aug 29, 2014 at 8:58 AM, Jonathan Hodges hodg...@gmail.com
 wrote:

 'this 2-node replication is mainly for failover in case the receiver
 dies while data is in flight.  there's still chance for data loss as
 there's no write ahead log on the hot path, but this is being addressed.'

 Can you comment a little on how this will be addressed, will there be a
 durable WAL?  Is there a JIRA for tracking this effort?

 I am curious without WAL if you can avoid this data loss with explicit
 management of Kafka offsets e.g. don't commit offset unless data is
 replicated to multiple nodes or maybe not until processed.  The incoming
 data will always be durably stored to disk in Kafka so can be replayed in
 failure scenarios to avoid data loss if the offsets are managed properly.




 On Thu, Aug 28, 2014 at 12:02 PM, Chris Fregly ch...@fregly.com wrote:

 @bharat-

 overall, i've noticed a lot of confusion about how Spark Streaming
 scales - as well as how it handles failover and checkpointing, but we can
 discuss that separately.

 there's actually 2 dimensions to scaling here:  receiving and processing.

 *Receiving*
 receiving can be scaled out by submitting new DStreams/Receivers to the
 cluster as i've done in the Kinesis example.  in fact, i purposely chose to
 submit multiple receivers in my Kinesis example because i feel it should be
 the norm and not the exception - particularly for partitioned and
 checkpoint-capable streaming systems like Kafka and Kinesis.   it's the
 only way to scale.

 a side note here is that each receiver running in the cluster will
 immediately replicates to 1 other node for fault-tolerance of that specific
 receiver.  this is where the confusion lies.  this 2-node replication is
 mainly for failover in case the receiver dies while data is in flight.
  there's still chance for data loss as there's no write ahead log on the
 hot path, but this is being addressed.

 this in mentioned in the docs here:
 https://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-receiving

 *Processing*
 once data is received, tasks are scheduled across the Spark cluster just
 like any other non-streaming task where you can specify the number of
 partitions for reduces, etc.  this is the part of scaling that is sometimes
 overlooked - probably because it works just like regular Spark, but it is
 worth highlighting.

 Here's a blurb in the docs:
 https://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-processing

 the other thing that's confusing with Spark Streaming is that in Scala,
 you need to explicitly

 import
 org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions

 in order to pick up the implicits that allow DStream.reduceByKey and
 such (versus DStream.transform(rddBatch => rddBatch.reduceByKey(...)))

 in other words, DStreams appear to be relatively featureless until you
 discover this implicit.  otherwise, you need to operate on the underlying
 RDD's explicitly which is not ideal.

 the Kinesis example referenced earlier in the thread uses the DStream
 implicits.


 side note to all of this - i've recently convinced my publisher for my
 upcoming book, Spark In Action, to let me jump ahead and write the Spark
 Streaming chapter ahead of other more well-understood libraries.  early
 release is in a month or so.  sign up  @ http://sparkinaction.com if
 you wanna get notified.

 shameless plug that i wouldn't otherwise do, but i really think it will
 help clear a lot of confusion in this area