Re: SparkSQL HiveContext No Suitable Driver / Cannot Find Driver
Oh, forgot to add the managed libraries and the Hive libraries to the CLASSPATH. As soon as I did that, we're good to go now. On August 29, 2014 at 22:55:47, Denny Lee (denny.g@gmail.com) wrote: My issue is similar to the one noted at http://mail-archives.apache.org/mod_mbox/incubator-spark-user/201406.mbox/%3ccadoad2ks9_qgeign5-w7xogmrotrlbchvfukctgstj5qp9q...@mail.gmail.com%3E. I'm currently using Spark 1.1 (grabbed from git two days ago) and Hive 0.12 with my metastore in MySQL. If I run any HiveContext statements, it results in a "cannot find the driver in CLASSPATH" error. If I include the driver via --jars, then it gives me a "no suitable driver" error. Any ideas on how to get the Hive context to work here? Thanks! Denny
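For reference, one way to make a JDBC driver visible to Spark's own JVM classpath (rather than only shipping it with --jars) is sketched below. The jar path is a hypothetical example; point it at your actual MySQL connector jar:

```shell
# Sketch: put the MySQL JDBC driver on Spark's classpath so the
# HiveContext's metastore connection can load it. The jar path is a
# hypothetical example.
export SPARK_CLASSPATH=/path/to/mysql-connector-java-5.1.32-bin.jar

# Or pass it explicitly at submit time:
spark-submit \
  --driver-class-path /path/to/mysql-connector-java-5.1.32-bin.jar \
  --class MyHiveApp myapp.jar
```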
spark-ec2 [Errno 110] Connection time out
I'm following the latest documentation on launching a cluster on EC2 (http://spark.apache.org/docs/latest/ec2-scripts.html). Running ./spark-ec2 -k Blah -i .ssh/Blah.pem -s 2 launch spark-ec2-test fails with a generic timeout error coming from: File "./spark_ec2.py", line 717, in real_main conn = ec2.connect_to_region(opts.region) Any suggestions on how to debug the cause of the timeout? Note: I replaced the name of my keypair with Blah. Thanks, David -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark-ec2-Errno-110-Connection-time-out-tp13171.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
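Errno 110 is a plain TCP connection timeout: boto never got a response from the EC2 API endpoint, which usually points at a network, proxy, or region problem rather than at Spark. A couple of checks worth trying (the region name below is just an example):

```shell
# 1. Pass the region explicitly -- spark-ec2 defaults to us-east-1,
#    and a wrong/unreachable region manifests as a timeout.
./spark-ec2 -k Blah -i ~/.ssh/Blah.pem -s 2 --region=us-west-2 launch spark-ec2-test

# 2. Verify outbound HTTPS connectivity to the EC2 endpoint; a proxy
#    or firewall that blocks it produces exactly this errno 110.
curl -sv https://ec2.us-east-1.amazonaws.com/ -o /dev/null
```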
spark on yarn with hive
I want to run Hive on Spark on a YARN cluster; the Hive metastore is stored in MySQL. I compiled Spark with: sh make-distribution.sh --hadoop 2.4.1 --with-yarn --skip-java-test --tgz --with-hive My HQL code: import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.sql._ import org.apache.spark.sql.hive.LocalHiveContext object HqlTest { case class Record(key: Int, value: String) def main(args: Array[String]) { val sparkConf = new SparkConf().setAppName("HiveFromSpark") val sc = new SparkContext(sparkConf) val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc) import hiveContext._ hiveContext.hql("FROM tmp_adclick_udc SELECT uid").collect().foreach(println) } } I submitted the job with: /usr/local/webserver/sparkhive/bin/spark-submit --class HqlTest --master yarn --deploy-mode cluster --queue sls_queue_1 --num-executors 5 --driver-memory 6g --executor-memory 20g --executor-cores 5 target/scala-2.10/simple-project_2.10-1.0.jar /user/www/udc/output/2014-08-10/* /user/www/udc/input/platformuvpv2/2014-08-10 I put hive-site.xml into spark/conf/. But I get the following error: 14/08/30 16:30:49 INFO Datastore: The class org.apache.hadoop.hive.metastore.model.MFieldSchema is tagged as embedded-only so does not have its own datastore table. 14/08/30 16:30:49 INFO Datastore: The class org.apache.hadoop.hive.metastore.model.MOrder is tagged as embedded-only so does not have its own datastore table. 
14/08/30 16:30:54 ERROR Hive: NoSuchObjectException(message:default.tmp_adclick_udc table not found) at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.get_table(HiveMetaStore.java:1373) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.hadoop.hive.metastore.RetryingHMSHandler.invoke(RetryingHMSHandler.java:103) at com.sun.proxy.$Proxy26.get_table(Unknown Source) at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.getTable(HiveMetaStoreClient.java:854) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) thanks -- cente...@gmail.com|齐忠
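One possible explanation (an assumption, not confirmed by the thread): "table not found" against a metastore that definitely has the table often means the driver never reached the MySQL metastore at all. In yarn-cluster mode the driver runs on a cluster node where the local spark/conf/hive-site.xml is not visible, so it falls back to an empty local metastore. A common workaround is to ship the file with the job:

```shell
# Sketch (assumption: in yarn-cluster mode the remote driver cannot see
# the local spark/conf/hive-site.xml). --files distributes the file into
# each container's working directory so the HiveContext can find it.
/usr/local/webserver/sparkhive/bin/spark-submit \
  --class HqlTest \
  --master yarn \
  --deploy-mode cluster \
  --files /usr/local/webserver/sparkhive/conf/hive-site.xml \
  target/scala-2.10/simple-project_2.10-1.0.jar
```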
Re: Low Level Kafka Consumer for Spark
I'm no expert, but as I understand it, yes, you create multiple streams to consume multiple partitions in parallel. If they're all in the same Kafka consumer group, you'll get exactly one copy of each message, so if you have 10 consumers and 3 Kafka partitions, I believe only 3 will be getting messages. The parallelism of Spark's processing of the RDDs of those messages is a separate matter: there could be 4 partitions in your RDDs doing the work. This is the kind of thing you can influence with repartition. That is, I believe you can get more tasks processing the messages even if you are only able to consume messages from the queue with 3-way parallelism, since the topic has 3 partitions. On Aug 30, 2014 12:56 AM, Tim Smith secs...@gmail.com wrote: Ok, so I did this: val kInStreams = (1 to 10).map { _ => KafkaUtils.createStream(ssc, "zkhost1:2181/zk_kafka", "testApp", Map("rawunstruct" -> 1)) } val kInMsg = ssc.union(kInStreams) val outdata = kInMsg.map(x => normalizeLog(x._2, configMap)) This has improved parallelism. Earlier I would only get a Stream 0; now I have Streams [0-9]. Of course, since the Kafka topic has only three partitions, only three of those streams are active, but I am seeing more blocks being pulled across the three streams in total than what one was doing earlier. Also, four nodes are actively processing tasks (vs only two earlier), which actually has me confused. If streams are active on only 3 nodes, then how/why did a 4th node get work? And if a 4th got work, why aren't more nodes getting work?
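The repartition suggestion above can be sketched against Tim's own snippet (untested, names reused from his code; the receiver count and partition count are illustrative):

```scala
// Sketch: decouple processing parallelism from the 3 Kafka partitions.
// Run one receiver per Kafka partition, then repartition the unioned
// stream so downstream map tasks spread across more cores/nodes.
val kInStreams = (1 to 3).map { _ =>
  KafkaUtils.createStream(ssc, "zkhost1:2181/zk_kafka", "testApp", Map("rawunstruct" -> 1))
}
val kInMsg = ssc.union(kInStreams).repartition(12) // e.g. 12 processing tasks per batch
val outdata = kInMsg.map(x => normalizeLog(x._2, configMap))
```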
Re: Spark Hive max key length is 767 bytes
Hi Michael, thank you so much! I have tried to change the following key lengths from 256 to 255 and from 767 to 766, but it still didn't work: alter table COLUMNS_V2 modify column COMMENT VARCHAR(255); alter table INDEX_PARAMS modify column PARAM_KEY VARCHAR(255); alter table SD_PARAMS modify column PARAM_KEY VARCHAR(255); alter table SERDE_PARAMS modify column PARAM_KEY VARCHAR(255); alter table TABLE_PARAMS modify column PARAM_KEY VARCHAR(255); alter table TBLS modify column OWNER VARCHAR(766); alter table PART_COL_STATS modify column PARTITION_NAME VARCHAR(766); alter table PARTITION_KEYS modify column PKEY_TYPE VARCHAR(766); alter table PARTITIONS modify column PART_NAME VARCHAR(766); I use Hadoop 2.4.1, HBase 0.98.5, Hive 0.13, trying Spark 1.0.2 and Shark 0.9.2, with JDK 1.6.0_45. Some questions: Which Hive version is shark-0.9.2 based on? Is HBase 0.98.x OK? Is Hive 0.13.1 OK? And which Java? (I use JDK 1.6 at the moment; it doesn't seem to work.) Which Hive version is spark-1.0.2 based on? Is HBase 0.98.x OK? Regards Arthur On 30 Aug, 2014, at 1:40 am, Michael Armbrust mich...@databricks.com wrote: Spark SQL is based on Hive 12. They must have changed the maximum key size between 12 and 13. On Fri, Aug 29, 2014 at 4:38 AM, arthur.hk.c...@gmail.com arthur.hk.c...@gmail.com wrote: Hi, Tried the same thing in Hive directly without issue: HIVE: hive> create table test_datatype2 (testbigint bigint); OK Time taken: 0.708 seconds hive> drop table test_datatype2; OK Time taken: 23.272 seconds Then tried again in Spark: scala> val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc) 14/08/29 19:33:52 INFO Configuration.deprecation: mapred.reduce.tasks.speculative.execution is deprecated. 
Instead, use mapreduce.reduce.speculative hiveContext: org.apache.spark.sql.hive.HiveContext = org.apache.spark.sql.hive.HiveContext@395c7b94 scala> hiveContext.hql("create table test_datatype3 (testbigint bigint)") res0: org.apache.spark.sql.SchemaRDD = SchemaRDD[0] at RDD at SchemaRDD.scala:104 == Query Plan == Native command: executed by Hive scala> hiveContext.hql("drop table test_datatype3") 14/08/29 19:34:14 ERROR DataNucleus.Datastore: An exception was thrown while adding/validating class(es) : Specified key was too long; max key length is 767 bytes com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: Specified key was too long; max key length is 767 bytes at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:39) 14/08/29 19:34:17 WARN DataNucleus.Query: Query for candidates of org.apache.hadoop.hive.metastore.model.MPartition and subclasses resulted in no possible candidates Error(s) were found while auto-creating/validating the datastore for classes. The errors are printed in the log, and are attached to this exception. org.datanucleus.exceptions.NucleusDataStoreException: Error(s) were found while auto-creating/validating the datastore for classes. The errors are printed in the log, and are attached to this exception. at org.datanucleus.store.rdbms.RDBMSStoreManager$ClassAdder.verifyErrors(RDBMSStoreManager.java:3609) Caused by: com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: Specified key was too long; max key length is 767 bytes at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:39) 14/08/29 19:34:17 INFO DataNucleus.Datastore: The class org.apache.hadoop.hive.metastore.model.MFieldSchema is tagged as embedded-only so does not have its own datastore table. 
14/08/29 19:34:17 INFO DataNucleus.Datastore: The class org.apache.hadoop.hive.metastore.model.MOrder is tagged as embedded-only so does not have its own datastore table. 14/08/29 19:34:17 INFO DataNucleus.Datastore: The class org.apache.hadoop.hive.metastore.model.MFieldSchema is tagged as embedded-only so does not have its own datastore table. 14/08/29 19:34:17 INFO DataNucleus.Datastore: The class org.apache.hadoop.hive.metastore.model.MOrder is tagged as embedded-only so does not have its own datastore table. 14/08/29 19:34:17 INFO DataNucleus.Datastore: The class org.apache.hadoop.hive.metastore.model.MFieldSchema is tagged as embedded-only so does not have its own datastore table. 14/08/29 19:34:17 INFO DataNucleus.Datastore: The class org.apache.hadoop.hive.metastore.model.MOrder is tagged as embedded-only so does not have its own datastore table. 14/08/29 19:34:25 ERROR DataNucleus.Datastore: An exception was thrown while adding/validating class(es) : Specified key was too long; max key length is 767 bytes com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: Specified key was too long; max key length is 767 bytes at
Re: org.apache.spark.examples.xxx
It bundles all of these sources https://github.com/apache/spark/tree/master/examples together, and it also uses the pom file to get the dependency list, if I'm not wrong. Thanks Best Regards On Fri, Aug 29, 2014 at 12:39 AM, filipus floe...@gmail.com wrote: hey guys, I'm still trying to get used to compiling and running the example code. Why does the run_example script submit the class with an org.apache.spark.examples prefix in front of the class itself? Probably a stupid question, but I would be glad if someone explained. By the way, how was the spark-examples jar file built? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/org-apache-spark-examples-xxx-tp13052.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: org.apache.spark.examples.xxx
bq. how was the spark-examples jar file built? You can use the following command to build against Hadoop 2.4: mvn -Phadoop-2.4,yarn -Dhadoop.version=2.4.1 -DskipTests clean package The examples jar can be found under examples/target. Cheers On Sat, Aug 30, 2014 at 6:54 AM, Akhil Das ak...@sigmoidanalytics.com wrote: It bundles all of these sources https://github.com/apache/spark/tree/master/examples together, and it also uses the pom file to get the dependency list, if I'm not wrong. Thanks Best Regards On Fri, Aug 29, 2014 at 12:39 AM, filipus floe...@gmail.com wrote: hey guys, I'm still trying to get used to compiling and running the example code. Why does the run_example script submit the class with an org.apache.spark.examples prefix in front of the class itself? Probably a stupid question, but I would be glad if someone explained. By the way, how was the spark-examples jar file built? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/org-apache-spark-examples-xxx-tp13052.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: org.apache.spark.examples.xxx
I'm trying to get used to sbt in order to build standalone applications myself. The SimpleApp example I managed to run; then I tried to copy an example Scala program like LinearRegression into a local directory: . ./build.sbt ./src ./src/main ./src/main/scala ./src/main/scala/LinearRegression.scala My build.sbt (even though I don't quite know what I'm doing) looks like: name := "Linear Regression" version := "1.0" scalaVersion := "2.10.4" libraryDependencies += "org.apache.spark" %% "spark-core" % "1.0.2" libraryDependencies += "org.apache.spark" %% "spark-mllib" % "1.0.2" libraryDependencies += "com.github.scopt" %% "scopt" % "3.2.0" resolvers += "Akka Repository" at "http://repo.akka.io/releases/" By the way, first I tried scalaVersion := "2.11.2", which is my installed version, but that failed. sbt package builds a jar file in target, but the command spark-submit --class LinearRegression --master local[2] target/scala-2.10/linear-regression_2.10-1.0.jar ~/git/spark/data/mllib/sample_linear_regression_data.txt didn't work. It tells me: Spark assembly has been built with Hive, including Datanucleus jars on classpath Exception in thread "main" java.lang.NoClassDefFoundError: scopt/OptionParser at java.lang.Class.getDeclaredMethods0(Native Method) AHHH: I commented out /*package org.apache.spark.examples.mllib*/ in LinearRegression.scala because otherwise it doesn't find the main class: Exception in thread "main" java.lang.ClassNotFoundException: LinearRegression When I do the same with the pre-built jar package of examples, everything works fine: spark-submit --class org.apache.spark.examples.mllib.LinearRegression --master local[2] lib/spark-examples-1.0.2-hadoop2.2.0.jar ~/git/spark/data/mllib/sample_linear_regression_data.txt works!!! 
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/org-apache-spark-examples-xxx-tp13052p13178.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Mapping Hadoop Reduce to Spark
When programming in Hadoop it is possible to guarantee: 1) All keys sent to a specific partition will be handled by the same machine (thread). 2) All keys received by a specific machine (thread) will be received in sorted order. 3) These conditions will hold even if the values associated with a specific key are too large to fit in memory. In my Hadoop code I use all of these conditions; specifically, with my larger data sets the size of the data I wish to group exceeds the available memory. I think I understand the operation of groupBy, but my understanding is that it requires that the results for a single key, and perhaps all keys, fit on a single machine. Is there a way to get the Hadoop-like behavior and not require that an entire group fit in memory?
Re: org.apache.spark.examples.xxx
Compilation works, but execution does not, at least with spark-submit as I described above. When I make a local copy of the training set, I can run it via sbt, which works: sbt run sample_linear_regression_data.txt When I do sbt run ~/git/spark/data/mllib/sample_linear_regression_data.txt the program fails because it doesn't find any training set: [error] (run-main-0) org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: file:/home/filip/spark-ex-regression/~/git/spark/data/mllib/sample_linear_regression_data.txt ps: does anybody know where in LinearRegression.scala the path is specified, or does it have to do with sbt? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/org-apache-spark-examples-xxx-tp13052p13180.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: org.apache.spark.examples.xxx
Did you run sbt under /home/filip/spark-ex-regression? '~/git/spark/data/mllib/sample_linear_regression_data.txt' was interpreted as rooted under /home/filip/spark-ex-regression: ~ is only expanded by the shell, so sbt received it literally and resolved it relative to the working directory. Cheers On Sat, Aug 30, 2014 at 9:28 AM, filipus floe...@gmail.com wrote: Compilation works, but execution does not, at least with spark-submit as I described above. When I make a local copy of the training set, I can run it via sbt, which works: sbt run sample_linear_regression_data.txt When I do sbt run ~/git/spark/data/mllib/sample_linear_regression_data.txt the program fails because it doesn't find any training set: [error] (run-main-0) org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: file:/home/filip/spark-ex-regression/~/git/spark/data/mllib/sample_linear_regression_data.txt ps: does anybody know where in LinearRegression.scala the path is specified, or does it have to do with sbt? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/org-apache-spark-examples-xxx-tp13052p13180.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: org.apache.spark.examples.xxx
ok I see :-) ... using the full path instead of ~ works fine. So do you know the reason why sbt run [options] works after sbt package, but spark-submit --class ClassName --master local[2] target/scala/JarPackage.jar [options] doesn't? It cannot resolve everything somehow. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/org-apache-spark-examples-xxx-tp13052p13182.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
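A likely explanation (an assumption from the NoClassDefFoundError above, not confirmed in the thread): `sbt run` puts all declared libraryDependencies, including scopt, on the runtime classpath, whereas `sbt package` produces a jar containing only your own classes, so spark-submit launches without scopt. Two common fixes: pass the dependency explicitly with `--jars /path/to/scopt_2.10-3.2.0.jar` (path hypothetical), or build a fat jar with the sbt-assembly plugin, e.g.:

```scala
// project/plugins.sbt -- a sketch for the sbt versions of that era, not
// verified against this exact setup: add sbt-assembly so `sbt assembly`
// builds a fat jar that bundles scopt. Spark itself is already provided
// by the cluster, so spark-core/spark-mllib can be marked "provided"
// in build.sbt to keep the jar small.
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.11.2")
```

Then run `sbt assembly` and point spark-submit at the resulting `*-assembly-*.jar` instead of the package jar.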
Re: Mapping Hadoop Reduce to Spark
In 1.1, you'll be able to get all of these properties using sortByKey, and then mapPartitions on top to iterate through the key-value pairs. Unfortunately sortByKey does not let you control the Partitioner, but it's fairly easy to write your own version that does if this is important. In previous versions, the values for each key had to fit in memory (though we could have data on disk across keys), and this is still true for groupByKey, cogroup and join. Those restrictions will hopefully go away in a later release. But sortByKey + mapPartitions lets you just iterate through the key-value pairs without worrying about this. Matei On August 30, 2014 at 9:04:37 AM, Steve Lewis (lordjoe2...@gmail.com) wrote: When programming in Hadoop it is possible to guarantee: 1) All keys sent to a specific partition will be handled by the same machine (thread). 2) All keys received by a specific machine (thread) will be received in sorted order. 3) These conditions will hold even if the values associated with a specific key are too large to fit in memory. In my Hadoop code I use all of these conditions; specifically, with my larger data sets the size of the data I wish to group exceeds the available memory. I think I understand the operation of groupBy, but my understanding is that it requires that the results for a single key, and perhaps all keys, fit on a single machine. Is there a way to get the Hadoop-like behavior and not require that an entire group fit in memory?
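The sortByKey + mapPartitions pattern Matei describes can be sketched as follows (untested; the pair type and aggregation are illustrative, not from the thread). Within each partition the pairs arrive in key order, so a Hadoop-style "reduce" can stream through consecutive runs of equal keys without ever materializing a whole group in memory:

```scala
import org.apache.spark.SparkContext._ // Spark 1.x implicit for sortByKey

val pairs: org.apache.spark.rdd.RDD[(String, Long)] = ??? // your keyed data
val perKeyTotals = pairs
  .sortByKey(ascending = true, numPartitions = 8)
  .mapPartitions { iter =>
    // Stream over sorted pairs: fold each run of equal keys into one
    // record, holding only the current key's accumulator in memory.
    new Iterator[(String, Long)] {
      private val it = iter.buffered
      def hasNext = it.hasNext
      def next() = {
        val key = it.head._1
        var sum = 0L
        while (it.hasNext && it.head._1 == key) sum += it.next()._2
        (key, sum)
      }
    }
  }
```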
Re: Spark Hive max key length is 767 bytes
Oh, you may be running into an issue with your MySQL setup, actually. Try running alter database metastore_db character set latin1 so that Hive (and the Spark HiveContext) can execute properly against the metastore. On August 29, 2014 at 04:39:01, arthur.hk.c...@gmail.com (arthur.hk.c...@gmail.com) wrote: Hi, Tried the same thing in Hive directly without issue: HIVE: hive> create table test_datatype2 (testbigint bigint); OK Time taken: 0.708 seconds hive> drop table test_datatype2; OK Time taken: 23.272 seconds Then tried again in Spark: scala> val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc) 14/08/29 19:33:52 INFO Configuration.deprecation: mapred.reduce.tasks.speculative.execution is deprecated. Instead, use mapreduce.reduce.speculative hiveContext: org.apache.spark.sql.hive.HiveContext = org.apache.spark.sql.hive.HiveContext@395c7b94 scala> hiveContext.hql("create table test_datatype3 (testbigint bigint)") res0: org.apache.spark.sql.SchemaRDD = SchemaRDD[0] at RDD at SchemaRDD.scala:104 == Query Plan == Native command: executed by Hive scala> hiveContext.hql("drop table test_datatype3") 14/08/29 19:34:14 ERROR DataNucleus.Datastore: An exception was thrown while adding/validating class(es) : Specified key was too long; max key length is 767 bytes com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: Specified key was too long; max key length is 767 bytes at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:39) 14/08/29 19:34:17 WARN DataNucleus.Query: Query for candidates of org.apache.hadoop.hive.metastore.model.MPartition and subclasses resulted in no possible candidates Error(s) were found while auto-creating/validating the datastore for classes. The errors are printed in the log, and are attached to this exception. 
org.datanucleus.exceptions.NucleusDataStoreException: Error(s) were found while auto-creating/validating the datastore for classes. The errors are printed in the log, and are attached to this exception. at org.datanucleus.store.rdbms.RDBMSStoreManager$ClassAdder.verifyErrors(RDBMSStoreManager.java:3609) Caused by: com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: Specified key was too long; max key length is 767 bytes at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:39) 14/08/29 19:34:17 INFO DataNucleus.Datastore: The class org.apache.hadoop.hive.metastore.model.MFieldSchema is tagged as embedded-only so does not have its own datastore table. 14/08/29 19:34:17 INFO DataNucleus.Datastore: The class org.apache.hadoop.hive.metastore.model.MOrder is tagged as embedded-only so does not have its own datastore table. 14/08/29 19:34:17 INFO DataNucleus.Datastore: The class org.apache.hadoop.hive.metastore.model.MFieldSchema is tagged as embedded-only so does not have its own datastore table. 14/08/29 19:34:17 INFO DataNucleus.Datastore: The class org.apache.hadoop.hive.metastore.model.MOrder is tagged as embedded-only so does not have its own datastore table. 14/08/29 19:34:17 INFO DataNucleus.Datastore: The class org.apache.hadoop.hive.metastore.model.MFieldSchema is tagged as embedded-only so does not have its own datastore table. 14/08/29 19:34:17 INFO DataNucleus.Datastore: The class org.apache.hadoop.hive.metastore.model.MOrder is tagged as embedded-only so does not have its own datastore table. 
14/08/29 19:34:25 ERROR DataNucleus.Datastore: An exception was thrown while adding/validating class(es) : Specified key was too long; max key length is 767 bytes com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: Specified key was too long; max key length is 767 bytes at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) Can anyone please help? Regards Arthur On 29 Aug, 2014, at 12:47 pm, arthur.hk.c...@gmail.com arthur.hk.c...@gmail.com wrote: (Please ignore if duplicated) Hi, I use Spark 1.0.2 with Hive 0.13.1. I have already set the Hive MySQL database to latin1; mysql: alter database hive character set latin1; Spark: scala> val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc) scala> hiveContext.hql("create table test_datatype1 (testbigint bigint)") scala> hiveContext.hql("drop table test_datatype1") 14/08/29 12:31:55 INFO DataNucleus.Datastore: The class org.apache.hadoop.hive.metastore.model.MFieldSchema is tagged as embedded-only so does not have its own datastore table. 14/08/29 12:31:55 INFO DataNucleus.Datastore: The class org.apache.hadoop.hive.metastore.model.MOrder is tagged as embedded-only so does not have its own datastore table. 14/08/29 12:31:55 INFO DataNucleus.Datastore: The class org.apache.hadoop.hive.metastore.model.MFieldSchema is tagged as embedded-only so does not have its own datastore table. 14/08/29 12:31:55 INFO
Re: Spark Hive max key length is 767 bytes
Hi, Already done, but I still get the same error. (I use Hive 0.13.1, Spark 1.0.2, Hadoop 2.4.1.) Steps: Step 1) mysql: alter database hive character set latin1; Step 2) HIVE: hive> create table test_datatype2 (testbigint bigint); OK Time taken: 0.708 seconds hive> drop table test_datatype2; OK Time taken: 23.272 seconds Step 3) scala> val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc) 14/08/29 19:33:52 INFO Configuration.deprecation: mapred.reduce.tasks.speculative.execution is deprecated. Instead, use mapreduce.reduce.speculative hiveContext: org.apache.spark.sql.hive.HiveContext = org.apache.spark.sql.hive.HiveContext@395c7b94 scala> hiveContext.hql("create table test_datatype3 (testbigint bigint)") res0: org.apache.spark.sql.SchemaRDD = SchemaRDD[0] at RDD at SchemaRDD.scala:104 == Query Plan == Native command: executed by Hive scala> hiveContext.hql("drop table test_datatype3") 14/08/29 19:34:14 ERROR DataNucleus.Datastore: An exception was thrown while adding/validating class(es) : Specified key was too long; max key length is 767 bytes com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: Specified key was too long; max key length is 767 bytes at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:39) 14/08/29 19:34:17 WARN DataNucleus.Query: Query for candidates of org.apache.hadoop.hive.metastore.model.MPartition and subclasses resulted in no possible candidates Error(s) were found while auto-creating/validating the datastore for classes. The errors are printed in the log, and are attached to this exception. org.datanucleus.exceptions.NucleusDataStoreException: Error(s) were found while auto-creating/validating the datastore for classes. The errors are printed in the log, and are attached to this exception. 
at org.datanucleus.store.rdbms.RDBMSStoreManager$ClassAdder.verifyErrors(RDBMSStoreManager.java:3609) Caused by: com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: Specified key was too long; max key length is 767 bytes at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) Should I use HIVE 0.12.0 instead of HIVE 0.13.1? Regards Arthur On 31 Aug, 2014, at 6:01 am, Denny Lee denny.g@gmail.com wrote: Oh, you may be running into an issue with your MySQL setup actually, try running alter database metastore_db character set latin1 so that way Hive (and the Spark HiveContext) can execute properly against the metastore. On August 29, 2014 at 04:39:01, arthur.hk.c...@gmail.com (arthur.hk.c...@gmail.com) wrote: Hi, Tried the same thing in HIVE directly without issue: HIVE: hive create table test_datatype2 (testbigint bigint ); OK Time taken: 0.708 seconds hive drop table test_datatype2; OK Time taken: 23.272 seconds Then tried again in SPARK: scala val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc) 14/08/29 19:33:52 INFO Configuration.deprecation: mapred.reduce.tasks.speculative.execution is deprecated. 
Instead, use mapreduce.reduce.speculative hiveContext: org.apache.spark.sql.hive.HiveContext = org.apache.spark.sql.hive.HiveContext@395c7b94 scala val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc) res0: org.apache.spark.sql.SchemaRDD = SchemaRDD[0] at RDD at SchemaRDD.scala:104 == Query Plan == Native command: executed by Hive scala hiveContext.hql(drop table test_datatype3) 14/08/29 19:34:14 ERROR DataNucleus.Datastore: An exception was thrown while adding/validating class(es) : Specified key was too long; max key length is 767 bytes com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: Specified key was too long; max key length is 767 bytes at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:39) 14/08/29 19:34:17 WARN DataNucleus.Query: Query for candidates of org.apache.hadoop.hive.metastore.model.MPartition and subclasses resulted in no possible candidates Error(s) were found while auto-creating/validating the datastore for classes. The errors are printed in the log, and are attached to this exception. org.datanucleus.exceptions.NucleusDataStoreException: Error(s) were found while auto-creating/validating the datastore for classes. The errors are printed in the log, and are attached to this exception. at org.datanucleus.store.rdbms.RDBMSStoreManager$ClassAdder.verifyErrors(RDBMSStoreManager.java:3609) Caused by: com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: Specified key was too long; max key length is 767 bytes at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:39) 14/08/29 19:34:17 INFO
Spark Master/Slave and HA
Hi, I have a few questions about Spark master and slave setup. I have 5 Hadoop nodes (n1, n2, n3, n4, and n5 respectively); at the moment I run Spark on these nodes: n1: Hadoop Active NameNode, Hadoop Slave, Spark Active Master n2: Hadoop Standby NameNode, Hadoop Slave, Spark Slave n3: Hadoop Slave, Spark Slave n4: Hadoop Slave, Spark Slave n5: Hadoop Slave, Spark Slave Questions: Q1: If I set n1 as both Spark Master and Spark Slave, I cannot start the Spark cluster. Does this mean that, unlike Hadoop, I cannot use the same machine as both MASTER and SLAVE in Spark? n1: Hadoop Active NameNode, Hadoop Slave, Spark Active Master, Spark Slave (failed to start Spark) n2: Hadoop Standby NameNode, Hadoop Slave, Spark Slave n3: Hadoop Slave, Spark Slave n4: Hadoop Slave, Spark Slave n5: Hadoop Slave, Spark Slave Q2: I am planning Spark HA; what if I use n2 as both Spark Standby Master and Spark Slave? Is Spark allowed to run a standby master and a slave on the same machine? n1: Hadoop Active NameNode, Hadoop Slave, Spark Active Master n2: Hadoop Standby NameNode, Hadoop Slave, Spark Standby Master, Spark Slave n3: Hadoop Slave, Spark Slave n4: Hadoop Slave, Spark Slave n5: Hadoop Slave, Spark Slave Q3: Does the Spark master node do actual computation work like a worker, or is it just a pure monitoring node? Regards Arthur
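For reference, standby-master HA in Spark standalone mode is configured through ZooKeeper-based recovery; the sketch below shows the relevant spark-env.sh setting on each master node (the ZooKeeper quorum address is a hypothetical example):

```shell
# spark-env.sh sketch for standby-master HA (standalone mode). Start a
# master on both n1 and n2 with this setting: whichever registers with
# ZooKeeper first becomes the leader, the other stands by. The quorum
# address below is a hypothetical example.
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER \
  -Dspark.deploy.zookeeper.url=zk1:2181,zk2:2181,zk3:2181 \
  -Dspark.deploy.zookeeper.dir=/spark"
```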
Spark and Shark Node: RAM Allocation
Hi, Is there any formula to calculate proper RAM allocation values for Spark and Shark based on physical RAM and the RAM usage of Hadoop and HBase? e.g. if a node has 32GB physical RAM: spark-defaults.conf spark.executor.memory ?g spark-env.sh export SPARK_WORKER_MEMORY=? export HADOOP_HEAPSIZE=? shark-env.sh export SPARK_MEM=?g export SHARK_MASTER_MEM=?g Regards Arthur
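There is no official formula; a common rule of thumb is to reserve memory for the OS and the co-located services first, then give Spark the remainder. The split below is purely illustrative, not a recommendation:

```shell
# Illustrative sketch only -- not an official formula. For a 32 GB node
# that also runs Hadoop and HBase daemons, one plausible split:
#   ~4 GB   OS and file-system cache
#   ~8 GB   Hadoop + HBase daemons
#   ~20 GB  left for the Spark worker to hand out to executors
export HADOOP_HEAPSIZE=2000          # MB, per Hadoop daemon
export SPARK_WORKER_MEMORY=20g       # total memory the worker may allocate
# spark-defaults.conf:
#   spark.executor.memory  18g       # keep a margin below worker memory
```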
Fwd: What does appMasterRpcPort: -1 indicate ?
I'm using CDH 5.1.0, which bundles Spark 1.0.0 with it. Following How-to: Run a Simple Apache Spark App in CDH 5 (http://blog.cloudera.com/blog/2014/04/how-to-run-a-simple-apache-spark-app-in-cdh-5/), I tried to submit my job in local mode, Spark Standalone mode, and YARN mode. I successfully submitted my job in local mode and Standalone mode; however, I noticed the following messages printed on the console when I submitted my job in YARN mode:

14/08/29 22:27:29 INFO Client: Submitting application to ASM
14/08/29 22:27:29 INFO YarnClientImpl: Submitted application application_1406949333981_0015
14/08/29 22:27:29 INFO YarnClientSchedulerBackend: Application report from ASM: appMasterRpcPort: -1 appStartTime: 1409365649836 yarnAppState: ACCEPTED
[... the identical ACCEPTED report repeats once per second through 22:27:38 ...]
14/08/29 22:27:39 INFO YarnClientSchedulerBackend: Application report from ASM: appMasterRpcPort: 0 appStartTime: 1409365649836 yarnAppState: RUNNING

The job finished successfully and produced correct results, but I'm not sure what those messages mean. Does appMasterRpcPort: -1 indicate an error or exception? Thanks
Re: Low Level Kafka Consumer for Spark
I have this same question. Isn't there somewhere that the Kafka range metadata can be saved? From my naive perspective, it seems like it should be very similar to HDFS lineage. The original HDFS blocks are kept somewhere (in the driver?) so that if an RDD partition is lost, it can be recomputed. In this case, all we need is the Kafka topic, partition, and offset range. Can someone enlighten us on why two copies of the RDD are needed (or some other mechanism like a WAL) for fault tolerance when using Kafka but not when reading from say HDFS? On Fri, Aug 29, 2014 at 8:58 AM, Jonathan Hodges hodg...@gmail.com wrote: 'this 2-node replication is mainly for failover in case the receiver dies while data is in flight. there's still chance for data loss as there's no write ahead log on the hot path, but this is being addressed.' Can you comment a little on how this will be addressed, will there be a durable WAL? Is there a JIRA for tracking this effort? I am curious without WAL if you can avoid this data loss with explicit management of Kafka offsets e.g. don't commit offset unless data is replicated to multiple nodes or maybe not until processed. The incoming data will always be durably stored to disk in Kafka so can be replayed in failure scenarios to avoid data loss if the offsets are managed properly. On Thu, Aug 28, 2014 at 12:02 PM, Chris Fregly ch...@fregly.com wrote: @bharat- overall, i've noticed a lot of confusion about how Spark Streaming scales - as well as how it handles failover and checkpointing, but we can discuss that separately. there's actually 2 dimensions to scaling here: receiving and processing. *Receiving* receiving can be scaled out by submitting new DStreams/Receivers to the cluster as i've done in the Kinesis example. in fact, i purposely chose to submit multiple receivers in my Kinesis example because i feel it should be the norm and not the exception - particularly for partitioned and checkpoint-capable streaming systems like Kafka and Kinesis. 
it's the only way to scale. a side note here is that each receiver running in the cluster will immediately replicate to 1 other node for fault-tolerance of that specific receiver. this is where the confusion lies. this 2-node replication is mainly for failover in case the receiver dies while data is in flight. there's still a chance for data loss as there's no write ahead log on the hot path, but this is being addressed. this is mentioned in the docs here: https://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-receiving *Processing* once data is received, tasks are scheduled across the Spark cluster just like any other non-streaming task where you can specify the number of partitions for reduces, etc. this is the part of scaling that is sometimes overlooked - probably because it works just like regular Spark, but it is worth highlighting. Here's a blurb in the docs: https://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-processing the other thing that's confusing with Spark Streaming is that in Scala, you need to explicitly import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions in order to pick up the implicits that allow DStream.reduceByKey and such (versus DStream.transform(rddBatch => rddBatch.reduceByKey(...))). in other words, DStreams appear to be relatively featureless until you discover this implicit. otherwise, you need to operate on the underlying RDDs explicitly which is not ideal. the Kinesis example referenced earlier in the thread uses the DStream implicits. side note to all of this - i've recently convinced my publisher for my upcoming book, Spark In Action, to let me jump ahead and write the Spark Streaming chapter ahead of other more well-understood libraries. early release is in a month or so. sign up @ http://sparkinaction.com if you wanna get notified. 
shameless plug that i wouldn't otherwise do, but i really think it will help clear a lot of confusion in this area as i hear these questions asked a lot in my talks and such. and i think a clear, crisp story on scaling and fault-tolerance will help Spark Streaming's adoption. hope that helps! -chris On Wed, Aug 27, 2014 at 6:32 PM, Dibyendu Bhattacharya dibyendu.bhattach...@gmail.com wrote: I agree. This issue should be fixed in Spark rather than rely on replay of Kafka messages. Dib On Aug 28, 2014 6:45 AM, RodrigoB rodrigo.boav...@aspect.com wrote: Dibyendu, Thanks for getting back. I believe you are absolutely right. We were under the assumption that the raw data was being computed again and that's not happening after further tests. This applies to Kafka as well. The issue is of major priority fortunately. Regarding your suggestion, I would maybe prefer to have the problem resolved within Spark's internals since once the data is replicated we
Powered By Spark
Hi, Could you please add Asiainfo to the Powered By Spark page? Thanks Asiainfo www.asiainfo.com Core, SQL, Streaming, MLlib, GraphX We leverage Spark and the Hadoop ecosystem to build cost-effective data center solutions for our customers in the telecom industry as well as other industrial sectors. Meanwhile, we also build innovative big data applications to help our customers with real-time marketing, cross-product selling, and customer behavior analysis, as well as other areas, by using Spark technology. Yi Tian
Re: saveAsSequenceFile for DStream
couple things to add here: 1) you can import the org.apache.spark.streaming.dstream.PairDStreamFunctions implicit which adds a whole ton of functionality to DStream itself. this lets you work at the DStream level versus digging into the underlying RDDs. 2) you can use ssc.fileStream(directory) to create an input stream made up of files in a given directory. new files will be added to the stream as they appear in that directory. note: files must be immutable. On Tue, Jul 22, 2014 at 8:39 AM, Barnaby Falls bfa...@outlook.com wrote: Thanks Sean! I got that working last night similar to how you solved it. Any ideas about how to monitor that same folder in another script by creating a stream? I can use sc.sequenceFile() to read in the RDD, but how do I get the name of the file that got added since there is no sequenceFileStream() method? Thanks again for your help. On Jul 22, 2014, at 1:57, Sean Owen so...@cloudera.com wrote: What about simply: dstream.foreachRDD(_.saveAsSequenceFile(...)) ? On Tue, Jul 22, 2014 at 2:06 AM, Barnaby bfa...@outlook.com wrote: First of all, I do not know Scala, but am learning. I'm doing a proof of concept by streaming content from a socket, counting the words, and writing it to a Tachyon disk. A different script will read the file stream and print out the results.

val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.saveAs???Files("tachyon://localhost:19998/files/WordCounts")
ssc.start()
ssc.awaitTermination()

I already did a proof of concept to write and read sequence files but there doesn't seem to be a saveAsSequenceFiles() method in DStream. What is the best way to write out an RDD to a stream so that the timestamps are in the filenames and so there is minimal overhead in reading the data back in as objects, see below. 
My simple successful proof was the following:

val rdd = sc.parallelize(Array(("a", 2), ("b", 3), ("c", 1)))
rdd.saveAsSequenceFile("tachyon://.../123.sf2")
val rdd2 = sc.sequenceFile[String, Int]("tachyon://.../123.sf2")

How can I do something similar with streaming? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/saveAsSequenceFile-for-DStream-tp10369.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
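[Editor's note] One way to get the batch timestamp into the filename is the foreachRDD variant that also passes the batch Time. A sketch building on the wordCounts stream from the original post (the Tachyon path is the one quoted above; this is an illustration, not tested against that cluster):

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.Time

// Save each batch as its own sequence file, named by batch time in ms.
wordCounts.foreachRDD { (rdd: RDD[(String, Int)], time: Time) =>
  rdd.saveAsSequenceFile(s"tachyon://localhost:19998/files/WordCounts-${time.milliseconds}")
}

// The reading side can then load a batch back by its timestamp:
// val counts = sc.sequenceFile[String, Int]("tachyon://localhost:19998/files/WordCounts-<ts>")
```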
How can a deserialized Java object be stored on disk?
Reading about RDD Persistence (https://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence), I learned that the storage level MEMORY_AND_DISK means "Store RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, store the partitions that don't fit on disk, and read them from there when they're needed." But how can a deserialized Java object be stored on disk? As far as I know, a Java object should be stored as an array of bytes on disk, which means that the Java object should first be converted into an array of bytes (a serialized object). Thanks.
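[Editor's note] For reference, any JVM object written to disk is indeed turned into bytes first; the "deserialized" in the storage level name describes only the in-memory representation. A minimal stand-alone Scala sketch of that conversion (plain Java serialization for illustration; Spark picks its own serializer internally):

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// Case classes are Serializable by default.
case class Point(x: Int, y: Int)

val bos = new ByteArrayOutputStream()
val oos = new ObjectOutputStream(bos)
oos.writeObject(Point(1, 2))
oos.close()

// These bytes are what would actually land on disk.
val bytes: Array[Byte] = bos.toByteArray
```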
Re: data locality
you can view the Locality Level of each task within a stage by using the Spark Web UI under the Stages tab. levels are as follows (in order of decreasing desirability):
1) PROCESS_LOCAL - data was found directly in the executor JVM
2) NODE_LOCAL - data was found on the same node as the executor JVM
3) RACK_LOCAL - data was found in the same rack
4) ANY - outside the rack
also, the Aggregated Metrics by Executor section of the Stage detail view shows how much data is being shuffled across the network (Shuffle Read/Write). 0 is where you wanna be with that metric. -chris On Fri, Jul 25, 2014 at 4:13 AM, Tsai Li Ming mailingl...@ltsai.com wrote: Hi, In the standalone mode, how can we check that data locality is working as expected when tasks are assigned? Thanks! On 23 Jul, 2014, at 12:49 am, Sandy Ryza sandy.r...@cloudera.com wrote: On standalone there is still special handling for assigning tasks within executors. There just isn't special handling for where to place executors, because standalone generally places an executor on every node. On Mon, Jul 21, 2014 at 7:42 PM, Haopu Wang hw...@qilinsoft.com wrote: Sandy, I just tried the standalone cluster and didn't have a chance to try Yarn yet. So if I understand correctly, there is **no** special handling of task assignment according to the HDFS block's location when Spark is running as a **standalone** cluster. Please correct me if I'm wrong. Thank you for your patience! -- From: Sandy Ryza [mailto:sandy.r...@cloudera.com] Sent: July 22, 2014 9:47 To: user@spark.apache.org Subject: Re: data locality This currently only works for YARN. The standalone default is to place an executor on every node for every job. The total number of executors is specified by the user. -Sandy On Fri, Jul 18, 2014 at 2:00 AM, Haopu Wang hw...@qilinsoft.com wrote: Sandy, Do you mean the "preferred location" is working for standalone cluster also? 
Because I checked the code of SparkContext and see comments as below:

// This is used only by YARN for now, but should be relevant to other cluster types (Mesos,
// etc) too. This is typically generated from InputFormatInfo.computePreferredLocations. It
// contains a map from hostname to a list of input format splits on the host.
private[spark] var preferredNodeLocationData: Map[String, Set[SplitInfo]] = Map()

BTW, even with the preferred hosts, how does Spark decide how many total executors to use for this application? Thanks again! -- From: Sandy Ryza [mailto:sandy.r...@cloudera.com] Sent: Friday, July 18, 2014 3:44 PM To: user@spark.apache.org Subject: Re: data locality Hi Haopu, Spark will ask HDFS for file block locations and try to assign tasks based on these. There is a snag. Spark schedules its tasks inside of executor processes that stick around for the lifetime of a Spark application. Spark requests executors before it runs any jobs, i.e. before it has any information about where the input data for the jobs is located. If the executors occupy significantly fewer nodes than exist in the cluster, it can be difficult for Spark to achieve data locality. The workaround for this is an API that allows passing in a set of preferred locations when instantiating a Spark context. This API is currently broken in Spark 1.0, and will likely be changed to something a little simpler in a future release.

val locData = InputFormatInfo.computePreferredLocations(
  Seq(new InputFormatInfo(conf, classOf[TextInputFormat], new Path("myfile.txt"))))
val sc = new SparkContext(conf, locData)

-Sandy On Fri, Jul 18, 2014 at 12:35 AM, Haopu Wang hw...@qilinsoft.com wrote: I have a standalone spark cluster and a HDFS cluster which share some of the nodes. When reading an HDFS file, how does spark assign tasks to nodes? Will it ask HDFS for the location of each file block in order to pick the right worker node? How about a spark cluster on Yarn? Thank you very much!
Re: Low Level Kafka Consumer for Spark
I'd be interested to understand this mechanism as well. But this is the error recovery part of the equation. Consuming from Kafka has two aspects - parallelism and error recovery - and I am not sure how either works. For error recovery, I would like to understand how:
- A failed receiver gets re-spawned. In 1.0.0, despite setting the failed tasks threshold to 64, my job aborts after 4 receiver task failures.
- Data loss is recovered after a failed receiver task/executor.
For parallelism, I would expect a single createStream() to intelligently map a receiver thread somewhere, one for each kafka partition, but in different JVMs. Also, repartition() does not seem to work as advertised. A repartition(512) should get nodes other than the receiver nodes to get some RDDs to process. No? On Sat, Aug 30, 2014 at 7:14 PM, Roger Hoover roger.hoo...@gmail.com wrote: I have this same question. Isn't there somewhere that the Kafka range metadata can be saved? From my naive perspective, it seems like it should be very similar to HDFS lineage. The original HDFS blocks are kept somewhere (in the driver?) so that if an RDD partition is lost, it can be recomputed. In this case, all we need is the Kafka topic, partition, and offset range. Can someone enlighten us on why two copies of the RDD are needed (or some other mechanism like a WAL) for fault tolerance when using Kafka but not when reading from say HDFS? On Fri, Aug 29, 2014 at 8:58 AM, Jonathan Hodges hodg...@gmail.com wrote: 'this 2-node replication is mainly for failover in case the receiver dies while data is in flight. there's still chance for data loss as there's no write ahead log on the hot path, but this is being addressed.' Can you comment a little on how this will be addressed, will there be a durable WAL? Is there a JIRA for tracking this effort? I am curious without WAL if you can avoid this data loss with explicit management of Kafka offsets e.g. 
don't commit offset unless data is replicated to multiple nodes or maybe not until processed. The incoming data will always be durably stored to disk in Kafka so can be replayed in failure scenarios to avoid data loss if the offsets are managed properly. On Thu, Aug 28, 2014 at 12:02 PM, Chris Fregly ch...@fregly.com wrote: @bharat- overall, i've noticed a lot of confusion about how Spark Streaming scales - as well as how it handles failover and checkpointing, but we can discuss that separately. there's actually 2 dimensions to scaling here: receiving and processing. *Receiving* receiving can be scaled out by submitting new DStreams/Receivers to the cluster as i've done in the Kinesis example. in fact, i purposely chose to submit multiple receivers in my Kinesis example because i feel it should be the norm and not the exception - particularly for partitioned and checkpoint-capable streaming systems like Kafka and Kinesis. it's the only way to scale. a side note here is that each receiver running in the cluster will immediately replicate to 1 other node for fault-tolerance of that specific receiver. this is where the confusion lies. this 2-node replication is mainly for failover in case the receiver dies while data is in flight. there's still a chance for data loss as there's no write ahead log on the hot path, but this is being addressed. this is mentioned in the docs here: https://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-receiving *Processing* once data is received, tasks are scheduled across the Spark cluster just like any other non-streaming task where you can specify the number of partitions for reduces, etc. this is the part of scaling that is sometimes overlooked - probably because it works just like regular Spark, but it is worth highlighting. 
Here's a blurb in the docs: https://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-processing the other thing that's confusing with Spark Streaming is that in Scala, you need to explicitly import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions in order to pick up the implicits that allow DStream.reduceByKey and such (versus DStream.transform(rddBatch => rddBatch.reduceByKey(...))). in other words, DStreams appear to be relatively featureless until you discover this implicit. otherwise, you need to operate on the underlying RDDs explicitly which is not ideal. the Kinesis example referenced earlier in the thread uses the DStream implicits. side note to all of this - i've recently convinced my publisher for my upcoming book, Spark In Action, to let me jump ahead and write the Spark Streaming chapter ahead of other more well-understood libraries. early release is in a month or so. sign up @ http://sparkinaction.com if you wanna get notified. shameless plug that i wouldn't otherwise do, but i really think it will help clear a lot of confusion in this area
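[Editor's note] On the parallelism question earlier in the thread, a common pattern with the high-level Kafka consumer of that era (a sketch under assumed names - ssc, the ZooKeeper address, group, and topic are placeholders) is to create one stream per Kafka partition and union them, so receivers land on multiple executors:

```scala
import org.apache.spark.streaming.kafka.KafkaUtils

// Sketch: one receiver per Kafka partition, then union into a single DStream.
val numPartitions = 8
val kafkaStreams = (1 to numPartitions).map { _ =>
  KafkaUtils.createStream(ssc, "zk1:2181", "my-consumer-group", Map("my-topic" -> 1))
}
val unified = ssc.union(kafkaStreams)

// repartition() then spreads the processing work beyond the receiver nodes.
val repartitioned = unified.repartition(16)
```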