Re: Serialization issue when using HBase with Spark
In option 1, class HTable is not serializable. You also need to check your self-defined function getUserActions and make sure it is a member function of a class that implements the Serializable interface.

Sent from my iPad

On Dec 12, 2014, at 4:35 PM, yangliuyu yangli...@163.com wrote:

The scenario is using an HTable instance to scan multiple rowkey ranges in Spark tasks, which looks like below:

Option 1:

val users = input
  .map { case (deviceId, uid) => uid }
  .distinct()
  .sortBy(x => x)
  .mapPartitions(iterator => {
    val conf = HBaseConfiguration.create()
    val table = new HTable(conf, "actions")
    val result = iterator.map { userId =>
      (userId, getUserActions(table, userId, timeStart, timeStop))
    }
    table.close()
    result
  })

But I got the exception:

org.apache.spark.SparkException: Task not serializable
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
  at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
  at org.apache.spark.SparkContext.clean(SparkContext.scala:1264)
  at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:597)
  at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:60)
  ...
Caused by: java.io.NotSerializableException: org.apache.hadoop.conf.Configuration
  at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
  at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
  at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)

The reason for not using sc.newAPIHadoopRDD is that it only supports one scan at a time:

val hbaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
  classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
  classOf[org.apache.hadoop.hbase.client.Result])

And if using MultiTableInputFormat, it is not possible for the driver to put all rowkeys into the HBaseConfiguration.

Option 2:

sc.newAPIHadoopRDD(conf, classOf[MultiTableInputFormat],
  classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
  classOf[org.apache.hadoop.hbase.client.Result])

It may work to divide all rowkey ranges into several parts and then use option 2, but I prefer option 1. So is there any solution for option 1?
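As a rough sketch of one way to restructure option 1 (assuming an HBase 0.94/0.98-era client API; getUserActions below is only a placeholder for the poster's own lookup logic, and timeStart/timeStop are assumed to be simple serializable values in scope): create the Configuration and HTable inside mapPartitions so nothing non-serializable is captured by the closure, keep helpers on a serializable or standalone object, and do not close the table before the lazy iterator is consumed:

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Get, HTable, Result}

object UserActions extends Serializable {
  // Placeholder for the poster's own lookup logic; only the shape matters here.
  def getUserActions(table: HTable, userId: String, start: Long, stop: Long): Result =
    table.get(new Get(userId.getBytes("UTF-8")))
}

val users = input
  .map { case (deviceId, uid) => uid }
  .distinct()
  .mapPartitions { iterator =>
    val conf = HBaseConfiguration.create()     // created on the executor, never serialized
    val table = new HTable(conf, "actions")
    val actions = iterator.map { userId =>
      (userId, UserActions.getUserActions(table, userId, timeStart, timeStop))
    }.toList                                   // materialize before closing the table,
    table.close()                              // since iterator.map is lazy
    actions.iterator
  }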
Re: JSON Input files
Hi Helena and All,

I have the below example JSON file format. My use case is to read the NAME variable. When I execute it I get the following exception:

Exception in thread "main" org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Unresolved attributes: 'NAME, tree:
Project ['NAME]
 Subquery device

Please let me know how to read values from JSON using Spark SQL.

CODE BLOCK:

val device = sqlContext.jsonFile("hdfs://localhost:9000/user/rajesh/json/test.json")
device.registerAsTable("device")
device.printSchema
val results = sqlContext.sql("SELECT NAME FROM device").collect.foreach(println)

JSON format:

{
  "Device 1" : {"NAME" : "Device 1", "GROUP" : "1", "SITE" : "qqq", "DIRECTION" : "East"},
  "Device 2" : {"NAME" : "Device 2", "GROUP" : "2", "SITE" : "sss", "DIRECTION" : "North"}
}

On Sat, Dec 13, 2014 at 10:13 PM, Helena Edelson helena.edel...@datastax.com wrote:

One solution can be found here: https://spark.apache.org/docs/1.1.0/sql-programming-guide.html#json-datasets

- Helena @helenaedelson

On Dec 13, 2014, at 11:18 AM, Madabhattula Rajesh Kumar mrajaf...@gmail.com wrote:

Hi Team, I have a large JSON file in Hadoop. Could you please let me know 1. How to read the JSON file 2. How to parse the JSON file. Please share any example program based on Scala. Regards, Rajesh
Re: Spark-SQL JDBC driver
I'll add that there is an experimental method that allows you to start the JDBC server with an existing HiveContext (which might have registered temporary tables).
https://github.com/apache/spark/blob/master/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala#L42

On Thu, Dec 11, 2014 at 6:52 AM, Denny Lee denny.g@gmail.com wrote:

Yes, that is correct. A quick reference on this is the post https://www.linkedin.com/pulse/20141007143323-732459-an-absolutely-unofficial-way-to-connect-tableau-to-sparksql-spark-1-1?_mSplash=1 with the pertinent section being:

It is important to note that when you create Spark tables (for example, via .registerTempTable) these are operating within the Spark environment, which resides in a separate process from the Hive metastore. This means that currently, tables that are created within the Spark context are not available through the Thrift server. To achieve this, within the Spark context save your temporary table into Hive - then the Spark Thrift Server will be able to see the table.

HTH!

On Thu, Dec 11, 2014 at 04:09 Anas Mosaad anas.mos...@incorta.com wrote:

Actually I came to the conclusion that RDDs have to be persisted in Hive in order to be accessible through Thrift. I hope I didn't end up with an incorrect conclusion. Please, someone correct me if I am wrong.

On Dec 11, 2014 8:53 AM, Judy Nash judyn...@exchange.microsoft.com wrote:

Looks like you are wondering why you cannot see the RDD table you have created via Thrift? Based on my own experience with Spark 1.1, RDDs created directly via Spark SQL (i.e. Spark Shell or Spark-SQL.sh) are not visible through Thrift, since Thrift has its own session containing its own RDDs. Spark SQL experts on the forum can confirm this though.

From: Cheng Lian [mailto:lian.cs@gmail.com]
Sent: Tuesday, December 9, 2014 6:42 AM
To: Anas Mosaad
Cc: Judy Nash; user@spark.apache.org
Subject: Re: Spark-SQL JDBC driver

According to the stacktrace, you were still using SQLContext rather than HiveContext. To interact with Hive, HiveContext *must* be used. Please refer to this page: http://spark.apache.org/docs/latest/sql-programming-guide.html#hive-tables

On 12/9/14 6:26 PM, Anas Mosaad wrote:

Back to the first question: this will mandate that Hive is up and running? When I try it, I get the following exception. The documentation says that this method works only on a SchemaRDD. I thought that was the reason countries.saveAsTable did not work, so I created a tmp table that contains the results from the registered temp table, which I could validate is a SchemaRDD as shown below.

@Judy, I do really appreciate your kind support and I want to understand; of course I don't want to waste your time. If you can direct me to the documentation describing these details, that would be great.
scala> val tmp = sqlContext.sql("select * from countries")
tmp: org.apache.spark.sql.SchemaRDD =
SchemaRDD[12] at RDD at SchemaRDD.scala:108
== Query Plan ==
== Physical Plan ==
PhysicalRDD [COUNTRY_ID#20,COUNTRY_ISO_CODE#21,COUNTRY_NAME#22,COUNTRY_SUBREGION#23,COUNTRY_SUBREGION_ID#24,COUNTRY_REGION#25,COUNTRY_REGION_ID#26,COUNTRY_TOTAL#27,COUNTRY_TOTAL_ID#28,COUNTRY_NAME_HIST#29], MapPartitionsRDD[9] at mapPartitions at ExistingRDD.scala:36

scala> tmp.saveAsTable("Countries")
org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Unresolved plan found, tree:
'CreateTableAsSelect None, Countries, false, None
 Project [COUNTRY_ID#20,COUNTRY_ISO_CODE#21,COUNTRY_NAME#22,COUNTRY_SUBREGION#23,COUNTRY_SUBREGION_ID#24,COUNTRY_REGION#25,COUNTRY_REGION_ID#26,COUNTRY_TOTAL#27,COUNTRY_TOTAL_ID#28,COUNTRY_NAME_HIST#29]
  Subquery countries
   LogicalRDD [COUNTRY_ID#20,COUNTRY_ISO_CODE#21,COUNTRY_NAME#22,COUNTRY_SUBREGION#23,COUNTRY_SUBREGION_ID#24,COUNTRY_REGION#25,COUNTRY_REGION_ID#26,COUNTRY_TOTAL#27,COUNTRY_TOTAL_ID#28,COUNTRY_NAME_HIST#29], MapPartitionsRDD[9] at mapPartitions at ExistingRDD.scala:36
  at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$$anonfun$1.applyOrElse(Analyzer.scala:83)
  at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$$anonfun$1.applyOrElse(Analyzer.scala:78)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:144)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:135)
  at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$.apply(Analyzer.scala:78)
  at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$.apply(Analyzer.scala:76)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:61)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:59)
  at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
  at ...
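To make the "save your temporary table into Hive" advice concrete, here is a minimal sketch, assuming a HiveContext backed by the same metastore the Thrift server reads (the table names come from the thread; everything else is illustrative):

import org.apache.spark.sql.hive.HiveContext

val hiveContext = new HiveContext(sc)
// Register the data as a temp table in the same HiveContext...
val countries = hiveContext.sql("select * from countries")
countries.registerTempTable("countries_tmp")
// ...then persist it as a real Hive table, which the Thrift server can see.
hiveContext.sql("CREATE TABLE countries_persisted AS SELECT * FROM countries_tmp")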
Re: SchemaRDD partition on specific column values?
I'm happy to discuss what it would take to make sure we can propagate this information correctly. Please open a JIRA (and mention me in it). Regarding including it in 1.2.1, it depends on how invasive the change ends up being, but it is certainly possible.

On Thu, Dec 11, 2014 at 3:55 AM, nitin nitin2go...@gmail.com wrote:

Can we take this as a performance improvement task in Spark-1.2.1? I can help contribute for this.
Re: JSON Input files
Pay attention to your JSON file; try to change it like the following. Each record should be represented as a single-line JSON string:

{"NAME" : "Device 1", "GROUP" : "1", "SITE" : "qqq", "DIRECTION" : "East"}
{"NAME" : "Device 2", "GROUP" : "2", "SITE" : "sss", "DIRECTION" : "North"}

On Dec 14, 2014, at 5:01 PM, Madabhattula Rajesh Kumar mrajaf...@gmail.com wrote:

{
  "Device 1" : {"NAME" : "Device 1", "GROUP" : "1", "SITE" : "qqq", "DIRECTION" : "East"},
  "Device 2" : {"NAME" : "Device 2", "GROUP" : "2", "SITE" : "sss", "DIRECTION" : "North"}
}
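Once the file is one JSON object per line, the code from the original question works essentially as-is; a short sketch (the HDFS path is the one from the thread, and this assumes the Spark 1.1-era SchemaRDD API):

val device = sqlContext.jsonFile("hdfs://localhost:9000/user/rajesh/json/test.json")
device.registerTempTable("device")   // registerAsTable is the older, deprecated spelling
device.printSchema()
sqlContext.sql("SELECT NAME FROM device").collect().foreach(println)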
Re: Read data from SparkStreaming from Java socket.
Why doesn't it work?? I guess that it's the same with \n.

2014-12-13 12:56 GMT+01:00 Guillermo Ortiz konstt2...@gmail.com:

I got it, thanks. A silly question: why, if I do

out.write("hello " + System.currentTimeMillis() + "\n");

does it not detect anything, but if I do

out.println("hello " + System.currentTimeMillis());

it works?? I'm doing this with Spark:

val errorLines = lines.filter(_.contains("hello"))

2014-12-13 8:12 GMT+01:00 Tathagata Das tathagata.das1...@gmail.com:

Yes, socketTextStream starts a TCP client that tries to connect to a TCP server (localhost:<port>, in your case). If there is a server running on that port that can send data to connected TCP connections, then you will receive data in the stream. Did you check out the quick example in the streaming programming guide? http://spark.apache.org/docs/latest/streaming-programming-guide.html That has instructions to start a netcat server on a port and send data to Spark Streaming through that. TD

On Fri, Dec 12, 2014 at 9:54 PM, Akhil Das ak...@sigmoidanalytics.com wrote:

socketTextStream is a socket client which will read from a TCP ServerSocket. Thanks. Best Regards

On Fri, Dec 12, 2014 at 7:21 PM, Guillermo Ortiz konstt2...@gmail.com wrote:

I don't understand what spark streaming socketTextStream is waiting for... is it like a server, so you just have to send data from a client?? Or what is it expecting?

2014-12-12 14:19 GMT+01:00 Akhil Das ak...@sigmoidanalytics.com:

I have created a ServerSocket program which you can find over here: https://gist.github.com/akhld/4286df9ab0677a555087 It simply listens to the given port, and when the client connects, it will send the contents of the given file. I'm attaching the executable jar also; you can run the jar as:

java -jar SocketBenchmark.jar student 12345 io

Here "student" is the file which will be sent to whichever client connects on 12345. I have tested it and it is working with Spark Streaming (socketTextStream). Thanks. Best Regards

On Fri, Dec 12, 2014 at 6:25 PM, Guillermo Ortiz konstt2...@gmail.com wrote:

Hi, I'm a newbie with Spark. I'm just trying to use Spark Streaming and filter some data sent with a Java socket, but it's not working... it works when I use ncat. Why is it not working?? My Spark code is just this:

val sparkConf = new SparkConf().setMaster("local[2]").setAppName("Test")
val ssc = new StreamingContext(sparkConf, Seconds(5))
val lines = ssc.socketTextStream("localhost", <port>)
val errorLines = lines.filter(_.contains("hello"))
errorLines.print()

I created a client socket which sends data to that port, but it could connect to any address. I guess that Spark doesn't work like a ServerSocket... what's the way to send data from a socket with Java to be able to read it from socketTextStream??
Re: Read data from SparkStreaming from Java socket.
Are you using a bufferedPrintWriter? that's probably a different flushing behaviour. Try doing out.flush() after out.write(...) and you will have the same result. This is Spark unrelated btw. -kr, Gerard.
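For reference, a minimal sketch of a line-oriented socket server that socketTextStream can read from, showing the flush behaviour Gerard describes (the port 9999 and the one-second interval are illustrative):

import java.io.PrintWriter
import java.net.ServerSocket

object LineServer {
  def main(args: Array[String]): Unit = {
    val server = new ServerSocket(9999)
    val socket = server.accept()                      // wait for the Spark receiver to connect
    val out = new PrintWriter(socket.getOutputStream)
    while (true) {
      out.write("hello " + System.currentTimeMillis() + "\n")
      out.flush()                                     // without this, buffered lines may never reach Spark
      Thread.sleep(1000)
    }
  }
}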
Re: MLLIB model export: PMML vs MLLIB serialization
Hi Sourabh, have a look at https://issues.apache.org/jira/browse/SPARK-1406, I am looking into exporting models in PMML using JPMML. Regards, Vincenzo
Re: Read data from SparkStreaming from Java socket.
Thanks.

2014-12-14 12:20 GMT+01:00 Gerard Maas gerard.m...@gmail.com:
[...]
Re: Having problem with Spark streaming with Kinesis
The reason is because of the following code:

val numStreams = numShards
val kinesisStreams = (0 until numStreams).map { i =>
  KinesisUtils.createStream(ssc, streamName, endpointUrl, kinesisCheckpointInterval,
    InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2)
}

In the above code, numStreams is set to numShards. This enforces the need to have #shards + 1 workers. If you set numStreams to Math.min(numShards, numAvailableWorkers - 1), you can have fewer workers than shards. Makes sense?

On Sun Dec 14 2014 at 10:06:36 A.K.M. Ashrafuzzaman ashrafuzzaman...@gmail.com wrote:

Thanks Aniket, the trick is to have #workers = #shards + 1. But I don't know why that is. http://spark.apache.org/docs/latest/streaming-kinesis-integration.html Here in the figure [spark streaming kinesis architecture], it seems like one node should be able to take on more than one shard.

A.K.M. Ashrafuzzaman
Lead Software Engineer, NewsCred http://www.newscred.com/
(M) 880-175-5592433
Twitter https://twitter.com/ashrafuzzaman | Blog http://jitu-blog.blogspot.com/ | Facebook https://www.facebook.com/ashrafuzzaman.jitu
Check out The Academy http://newscred.com/theacademy, your #1 source for free content marketing resources

On Nov 26, 2014, at 6:23 PM, A.K.M. Ashrafuzzaman ashrafuzzaman...@gmail.com wrote:

Hi guys, when we are using Kinesis with 1 shard then it works fine. But when we use more than 1, it falls into an infinite loop and no data is processed by Spark Streaming. In the Kinesis DynamoDB I can see that it keeps increasing the leaseCounter, but it does not start processing.

I am using:
scala: 2.10.4
java version: 1.8.0_25
Spark: 1.1.0
spark-streaming-kinesis-asl: 1.1.0
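To make the suggestion concrete, a sketch of the capped-receiver variant (numAvailableWorkers is an assumed variable holding the number of cores/workers you can dedicate; the rest comes from the thread):

val numStreams = math.min(numShards, numAvailableWorkers - 1)  // leave at least one core for processing
val kinesisStreams = (0 until numStreams).map { i =>
  KinesisUtils.createStream(ssc, streamName, endpointUrl, kinesisCheckpointInterval,
    InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2)
}
// Union the per-receiver streams back into one DStream before processing.
val unioned = ssc.union(kinesisStreams)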
Re: JSON Input files
Thank you Yanbo. Regards, Rajesh

On Sun, Dec 14, 2014 at 3:15 PM, Yanbo yanboha...@gmail.com wrote:

Pay attention to your JSON file; try to change it like the following. Each record should be represented as a single-line JSON string:

{"NAME" : "Device 1", "GROUP" : "1", "SITE" : "qqq", "DIRECTION" : "East"}
{"NAME" : "Device 2", "GROUP" : "2", "SITE" : "sss", "DIRECTION" : "North"}

On Dec 14, 2014, at 5:01 PM, Madabhattula Rajesh Kumar mrajaf...@gmail.com wrote:

{
  "Device 1" : {"NAME" : "Device 1", "GROUP" : "1", "SITE" : "qqq", "DIRECTION" : "East"},
  "Device 2" : {"NAME" : "Device 2", "GROUP" : "2", "SITE" : "sss", "DIRECTION" : "North"}
}
pyspark is crashing in this case. why?
Hi,

My environment is: standalone Spark 1.1.1 on Windows 8.1 Pro.

The following case works fine:

>>> a = [1,2,3,4,5,6,7,8,9]
>>> b = []
>>> for x in range(100000):
...     b.append(a)
...
>>> rdd1 = sc.parallelize(b)
>>> rdd1.first()
[1, 2, 3, 4, 5, 6, 7, 8, 9]

The following case does not work. The only difference is the size of the array. Note the loop range: 100K vs. 1M.

>>> a = [1,2,3,4,5,6,7,8,9]
>>> b = []
>>> for x in range(1000000):
...     b.append(a)
...
>>> rdd1 = sc.parallelize(b)
>>> rdd1.first()
14/12/14 07:52:19 ERROR PythonRDD: Python worker exited unexpectedly (crashed)
java.net.SocketException: Connection reset by peer: socket write error
  at java.net.SocketOutputStream.socketWrite0(Native Method)
  at java.net.SocketOutputStream.socketWrite(Unknown Source)
  at java.net.SocketOutputStream.write(Unknown Source)
  at java.io.BufferedOutputStream.flushBuffer(Unknown Source)
  at java.io.BufferedOutputStream.write(Unknown Source)
  at java.io.DataOutputStream.write(Unknown Source)
  at java.io.FilterOutputStream.write(Unknown Source)
  at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:341)
  at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:339)
  at scala.collection.Iterator$class.foreach(Iterator.scala:727)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
  at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:339)
  at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:209)
  at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:184)
  at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:184)
  at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1364)
  at org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:183)

What I have tried:
1. Replaced the 32-bit JRE with a 64-bit JRE
2. Multiple configurations when I start pyspark: --driver-memory, --executor-memory
3. Tried to set the SparkConf with different settings
4. Tried also with Spark 1.1.0

Being new to Spark, I am sure that it is something simple that I am missing and would appreciate any thoughts.
MLlib vs Madlib
Can somebody throw light on MLlib vs Madlib? Which is better for machine learning? and are there any specific use case scenarios MLlib or Madlib will shine in? Regards, Venkat Ankam
Limit the # of columns in Spark Scala
I have a large number of files within HDFS that I would like to do a group by statement, a la:

val table = sc.textFile("hdfs://...")
val tabs = table.map(_.split("\t"))

I'm trying to do something similar to

tabs.map(c => (c._(167), c._(110), c._(200)))

where I create a new RDD that only has those three columns, but that isn't quite right because I'm not really manipulating sequences. BTW, I cannot use SparkSQL / case classes right now because my table has 200 columns (and I'm on Scala 2.10.3).

Thanks!
Denny
Re: Limit the # of columns in Spark Scala
Hi, I don't get what the problem is. That map to selected columns looks like the way to go given the context. What's not working? Kr, Gerard

On Dec 14, 2014 5:17 PM, Denny Lee denny.g@gmail.com wrote:
[...]
Re: Limit the # of columns in Spark Scala
Getting a bunch of syntax errors. Let me get back with the full statement and error later today. Thanks for verifying my thinking wasn't out in left field.

On Sun, Dec 14, 2014 at 08:56 Gerard Maas gerard.m...@gmail.com wrote:
[...]
Re: MLlib vs Madlib
MADlib (http://madlib.net/) was designed to bring large-scale ML techniques to a relational database, primarily PostgreSQL. MLlib assumes the data exists in some Spark-compatible data format. I would suggest you pick the library that matches your data platform first.

DISCLAIMER: I am the original author of MADlib, though EMC/Pivotal assumed ownership rather quickly.

~~ May All Your Sequences Converge

On Dec 14, 2014, at 6:26 AM, Venkat, Ankam ankam.ven...@centurylink.com wrote:
[...]
DStream demultiplexer based on a key
Hey,

I am doing an experiment with Spark Streaming consisting of moving data from Kafka to S3 locations while partitioning by date. I have already looked into LinkedIn Camus and Pinterest Secor, and while both are workable solutions, it just feels that Spark Streaming should be able to be on par with those without having to manage yet another application in our stack, since we already have a Spark Streaming cluster in production.

So what I am trying to do is very simple really. Each message in Kafka is thrift-serialized, and the corresponding thrift objects have a timestamp field. What I'd like to do is something like this:

JavaPairDStream<Void, Log> stream = KafkaUtils.createRawStream(...)
JavaPairDStream<String, Log> byDate = stream.mapToPair(
  new PairFunction<Tuple2<Void, Log>, String, Log>() {
    public Tuple2<String, Log> call(Tuple2<Void, Log> tuple) {
      return new Tuple2<String, Log>(tuple._2().getDate(), tuple._2());
    }
  })

At this point, I'd like to do some partitioning on the resulting DStream to have multiple DStreams, each with a single common string date... So for instance in one DStream I would have all the entries from 12/01 and in another the entries from 12/02. Once I have this list of DStreams, for each of them I would call saveAsObjectFiles() basically.

I unfortunately did not find a way to demultiplex a DStream based on a key. Obviously the reduce family of operations does some of that, but the result is still a single DStream. An alternative approach would be to call forEachRDD() on the DStream and demultiplex the entries into multiple new RDDs based on the timestamp, to bucketize the entries with the same day date in the same RDD, and finally call saveAsObjectFiles(). I am not sure if I can use parallelize() to create those RDDs?

Another thing that I am gonna be experimenting with is to use a much longer batching interval. I am talking minutes, because I don't want to have a bunch of tiny files. I might simply use a bigger Duration or use one of the window operations. Not sure if anybody has tried running Spark Streaming in that way.

Any thoughts on that would be much appreciated. Thanks!
Re: pyspark is crashing in this case. why?
How much executor memory are you setting for the JVM? What about the driver JVM memory? Also check the Windows Event Log for out-of-memory errors for one of the two JVMs above.

On Dec 14, 2014 6:04 AM, genesis fatum genesis.fa...@gmail.com wrote:

Hi, my environment is: standalone Spark 1.1.1 on Windows 8.1 Pro. The following case works fine with range(100000), but the same code with range(1000000) does not:

14/12/14 07:52:19 ERROR PythonRDD: Python worker exited unexpectedly (crashed)
java.net.SocketException: Connection reset by peer: socket write error
[stack trace identical to the original message above]

What I have tried:
1. Replaced the 32-bit JRE with a 64-bit JRE
2. Multiple configurations when I start pyspark: --driver-memory, --executor-memory
3. Tried to set the SparkConf with different settings
4. Tried also with Spark 1.1.0

Being new to Spark, I am sure that it is something simple that I am missing and would appreciate any thoughts.
Re: DStream demultiplexer based on a key
Hi Jean-Pascal,

At Virdata we do a similar thing to 'bucketize' our data to different keyspaces in Cassandra. The basic construction would be to filter the DStream (or the underlying RDD) for each key and then apply the usual storage operations on that new data set. Given that, in your case, you need the data within the stream to apply the filter, you will first need to collect those keys in order to create the buckets. Something like this:

val kafkaStream = ???
kafkaStream.foreachRDD { rdd =>
  rdd.cache() // very important!
  val keys = rdd.map(elem => key(elem)).distinct.collect // where key(...) is a function to get the desired key from each record
  keys.foreach { k =>
    rdd.filter(elem => key(elem) == k).saveAsObjectFile(...)
  }
  rdd.unpersist()
}

-kr, Gerard.

On Sun, Dec 14, 2014 at 7:50 PM, Jean-Pascal Billaud j...@tellapart.com wrote:
[...]
Re: DStream demultiplexer based on a key
Ah! That sounds very much like what I need. A very basic question (most likely): why is rdd.cache() critical? Isn't it already true that in Spark Streaming DStreams are cached in memory anyway? Also, any experience with minutes-long batch intervals? Thanks for the quick answer!

On Sun, Dec 14, 2014 at 11:17 AM, Gerard Maas gerard.m...@gmail.com wrote:
[...]
HTTP 500 Error for SparkUI in YARN Cluster mode
I got this error when I click the "ApplicationMaster" tracking URL when I run a Spark job in YARN cluster mode. I found this JIRA https://issues.apache.org/jira/browse/YARN-800, but I could not get this problem fixed. I'm running CDH 5.1.0 with both HDFS and RM HA enabled. Does anybody have a similar issue? How do you fix this?

HTTP ERROR 500
Problem accessing /proxy/application_1418016558670_0193/. Reason:
  Connection refused
Caused by: java.net.ConnectException: Connection refused
  at java.net.PlainSocketImpl.socketConnect(Native Method)
  at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339)
  at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200)
  at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182)
  at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
Re: DStream demultiplexer based on a key
I haven't done anything other than performance tuning on Spark Streaming for the past weeks. rdd.cache() makes a huge difference, and is a must in this case, where you want to iterate over the same RDD several times. Intuitively, I also thought that all the data was in memory already, so it wouldn't make a difference, and I was very surprised to see stage times drop from seconds to ms when cache() was present.

Our intervals are 10-12 seconds long. I've not tried batches of minutes yet. Probably the best way would be to use window functions for that, although something in the 1-5 minute range should be doable as well (see the sketch below).

-kr, Gerard.

On Sun, Dec 14, 2014 at 8:25 PM, Jean-Pascal Billaud j...@tellapart.com wrote:
[...]
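A sketch of the window-based variant for minutes-long output batches (durations are illustrative; this assumes a 10-second base batch interval as above, and the output path is a placeholder):

import org.apache.spark.streaming.Minutes

// Collect two minutes of data per write instead of one small file per 10s batch.
val windowed = kafkaStream.window(Minutes(2), Minutes(2))
windowed.foreachRDD { rdd =>
  rdd.saveAsObjectFile("s3n://bucket/path")  // placeholder path
}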
Re: Limit the # of columns in Spark Scala
"BTW, I cannot use SparkSQL / case classes right now because my table has 200 columns (and I'm on Scala 2.10.3)"

You can still apply the schema programmatically:
http://spark.apache.org/docs/latest/sql-programming-guide.html#programmatically-specifying-the-schema
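A short sketch of the programmatic-schema approach applied to the three columns from this thread (Spark 1.1-era API; tabs is the split-line RDD from the original post, the column names are made up, and everything is typed as a string):

import org.apache.spark.sql._

val schema = StructType(Seq(
  StructField("col167", StringType, nullable = true),
  StructField("col110", StringType, nullable = true),
  StructField("col200", StringType, nullable = true)))
// Build Rows from the split lines, then attach the schema.
val rowRDD = tabs.map(c => Row(c(167), c(110), c(200)))
val schemaRDD = sqlContext.applySchema(rowRDD, schema)
schemaRDD.registerTempTable("mytable")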
Re: Trouble with cache() and parquet
For many operations, Spark SQL will just pass the data through without looking at it. Caching, in contrast, has to process the data so that we can build up compressed column buffers. So the schema is mismatched in both cases, but only the caching case shows it. Based on the exception, it looks more like there is a type mismatch (the metastore is reporting an Integer, but the parquet data is actually producing a String).

On Thu, Dec 11, 2014 at 6:38 AM, Yana Kadiyska yana.kadiy...@gmail.com wrote:

I see -- they are the same in design, but the difference comes from partitioned Hive tables: when the RDD is generated by querying an external Hive metastore, the partition is appended as part of the row, and shows up as part of the schema. Can you shed some light on why this is a problem:

last2HourRdd.first  // works OK
last2HourRdd.cache()
last2HourRdd.first  // does not work

The first call shows K+1 columns (and so does printSchema), where K columns are from the backing parquet files and the (K+1)st is the inlined partition. My impression is that the second call to .first would just force the cache() call, dump that RDD out to disk (with all of its K+1 columns, storing the schema info, again with K+1 columns), and then just return a single entry. I am not sure why the fact that the Hive metastore exposes an extra column over the raw parquet file is a problem, since it does so both in the schema and in the data: last2HourRdd.schema.fields.length reports K+1, and so does last2HourRdd.first.length. I also tried calling sqlContext.applySchema(last2HourRdd, parquetFile.schema) before caching, but it does not fix the issue. The only workaround I've come up with so far is to replace "select *" with a "select <list of columns>". But I'd love to understand a little better why the cache call trips up this scenario.

On Wed, Dec 10, 2014 at 3:50 PM, Michael Armbrust mich...@databricks.com wrote:

Have you checked to make sure the schema in the metastore matches the schema in the parquet file? One way to test would be to just use sqlContext.parquetFile(...), which infers the schema from the file instead of using the metastore.

On Wed, Dec 10, 2014 at 12:46 PM, Yana Kadiyska yana.kadiy...@gmail.com wrote:

Hi folks, wondering if anyone has thoughts. Trying to create something akin to a materialized view (sqlContext is a HiveContext connected to an external metastore):

val last2HourRdd = sqlContext.sql(s"select * from mytable")
// last2HourRdd.first prints out an org.apache.spark.sql.Row = [...] with valid data
last2HourRdd.cache()
// last2HourRdd.first now fails in an executor with the following:

In the driver:

14/12/10 20:24:01 INFO TaskSetManager: Starting task 0.1 in stage 25.0 (TID 35, iphere, NODE_LOCAL, 2170 bytes)
14/12/10 20:24:01 INFO TaskSetManager: Lost task 0.1 in stage 25.0 (TID 35) on executor iphere: java.lang.ClassCastException (null) [duplicate 1]

And in the executor:

14/12/10 19:56:57 ERROR Executor: Exception in task 0.1 in stage 20.0 (TID 27)
java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Integer
  at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:106)
  at org.apache.spark.sql.catalyst.expressions.MutableInt.update(SpecificMutableRow.scala:73)
  at org.apache.spark.sql.catalyst.expressions.SpecificMutableRow.update(SpecificMutableRow.scala:231)
  at org.apache.spark.sql.catalyst.expressions.SpecificMutableRow.setString(SpecificMutableRow.scala:236)
  at org.apache.spark.sql.columnar.STRING$.setField(ColumnType.scala:328)
  at org.apache.spark.sql.columnar.STRING$.setField(ColumnType.scala:310)
  at org.apache.spark.sql.columnar.compression.RunLengthEncoding$Decoder.next(compressionSchemes.scala:168)
  at org.apache.spark.sql.columnar.compression.CompressibleColumnAccessor$class.extractSingle(CompressibleColumnAccessor.scala:37)
  at org.apache.spark.sql.columnar.NativeColumnAccessor.extractSingle(ColumnAccessor.scala:64)
  at org.apache.spark.sql.columnar.BasicColumnAccessor.extractTo(ColumnAccessor.scala:54)
  at org.apache.spark.sql.columnar.NativeColumnAccessor.org$apache$spark$sql$columnar$NullableColumnAccessor$$super$extractTo(ColumnAccessor.scala:64)
  at org.apache.spark.sql.columnar.NullableColumnAccessor$class.extractTo(NullableColumnAccessor.scala:52)
  at org.apache.spark.sql.columnar.NativeColumnAccessor.extractTo(ColumnAccessor.scala:64)
  at org.apache.spark.sql.columnar.InMemoryColumnarTableScan$$anonfun$9$$anonfun$14$$anon$2.next(InMemoryColumnarTableScan.scala:279)
  at org.apache.spark.sql.columnar.InMemoryColumnarTableScan$$anonfun$9$$anonfun$14$$anon$2.next(InMemoryColumnarTableScan.scala:275)
  at scala.collection.Iterator$$anon$13.next(Iterator.scala:372)
  at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
  at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
  at ...
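Along the lines of Michael's suggestion, a quick sketch of how to diff the two schemas, assuming the 1.1-era SchemaRDD.schema API and ignoring the extra inlined partition column (the path is a placeholder):

// Schema as the Hive metastore reports it vs. as inferred from the parquet files.
val fromMetastore = sqlContext.sql("select * from mytable limit 0").schema
val fromParquet = sqlContext.parquetFile("/path/to/mytable").schema
fromMetastore.fields.zip(fromParquet.fields).foreach { case (m, p) =>
  if (m.dataType != p.dataType)
    println(s"mismatch on ${m.name}: metastore=${m.dataType} parquet=${p.dataType}")
}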
Re: Limit the # of columns in Spark Scala
Yes - that works great! Sorry for implying I couldn't; I was just more flummoxed that I couldn't make the Scala call work on its own. Will continue to debug ;-)

On Sun, Dec 14, 2014 at 11:39 Michael Armbrust mich...@databricks.com wrote:
[...]
spark kafka batch integration
hello all,

we at tresata wrote a library to provide for batch integration between spark and kafka (distributed write of an rdd to kafka, and distributed read of an rdd from kafka). our main use cases are (in lambda architecture jargon):
* periodic appends to the immutable master dataset on hdfs from kafka using spark
* making non-streaming data available in kafka with periodic data drops from hdfs using spark. this is to facilitate merging the speed and batch layers in spark-streaming.

see here: https://github.com/tresata/spark-kafka

best, koert
Re: Limit the # of columns in Spark Scala
Denny, I am not sure what exception you're observing, but I've had luck with two things:

val table = sc.textFile("hdfs://...")

You can try calling table.first here and you'll see the first line of the file. You can also do

val debug = table.first.split("\t")

which would give you an array, and you can indeed verify that the array contains what you want in positions 167, 110 and 200. In the case of large files with a random bad line, I find wrapping the call within the map in try/catch very valuable -- you can dump out the whole line in the catch statement.

Lastly, I would guess that you're getting a compile error and not a runtime error -- I believe c is an array of values, so I think you want

tabs.map(c => (c(167), c(110), c(200)))

instead of

tabs.map(c => (c._(167), c._(110), c._(200)))

On Sun, Dec 14, 2014 at 3:12 PM, Denny Lee denny.g@gmail.com wrote:
[...]
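A sketch of the try/catch idea for flushing out bad lines (using flatMap plus Option so malformed lines are skipped instead of failing the job; the column positions are the ones from the thread):

val tabs = table.flatMap { line =>
  try {
    val c = line.split("\t")
    Some((c(167), c(110), c(200)))            // keep only the three columns of interest
  } catch {
    case e: Exception =>
      System.err.println(s"bad line: $line")  // dump the offending line for inspection
      None
  }
}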
RE: MLlib vs Madlib
Thanks for the info Brian.

I am trying to compare the performance difference between "Pivotal HAWQ/Greenplum with MADlib" vs "HDFS with MLlib". Do you think Spark MLlib will perform better because of its in-memory, caching and iterative processing capabilities? I need to perform large-scale text analytics, and I can store the data on HDFS or on Pivotal Greenplum/HAWQ.

Regards,
Venkat Ankam

From: Brian Dolan [mailto:buddha_...@yahoo.com]
Sent: Sunday, December 14, 2014 10:02 AM
To: Venkat, Ankam
Cc: 'user@spark.apache.org'
Subject: Re: MLlib vs Madlib
[...]
Re: MLlib vs Madlib
I don't have any solid performance numbers, no. Let's start with some questions:

* Do you have to do any feature extraction before you start the routine? E.g. NLP, NER or tokenization? Have you already vectorized?
* Which routine(s) do you wish to use? Things like k-means do very well in a relational setting, neural networks not as much.
* Where does the data live now? How often will you have to re-load the data and re-run the pipeline?
* The ML portion is probably the most expensive part of the pipeline, so it may justify moving the data in/out of HDFS or Greenplum for just the ML.

For processing speed, my guess is Greenplum will be fastest, then Spark + HDFS, then Greenplum + HAWQ. I've done quite a bit of large-scale text analysis, and the process is typically:

1. Source the data. Either in Solr or HDFS or a drive somewhere.
2. Annotation / feature extraction (just get the bits you need from the data).
3. Create vectors from the data. Tf-idf is the most popular format.
4. Run the routine.
5. Shout "Damn!" when you realize you did it wrong.
6. Do 1-5 again. And again.
7. Create a report of some sort.
8. Visualize.

When asking about performance, most people focus on (4). When focused on production, you need to consider the total cost of the pipeline. So my basic recommendation is to do the whole thing on a small scale first. If you end up with very relational questions, put everything in Greenplum. If it all comes down to a query on a single table, use Spark RDDs and maybe Spark SQL.

Just as an example, I've seen standard Postgres run extremely fast on weighted dictionaries. This demands just two tables: the weighted dictionary and a table with your documents. Though it's possible (and I've been foolish enough to do it), you don't want to spend the time embedding Stanford NLP into Postgres; the performance is awful.

Let me know how it goes!
b
https://twitter.com/buddha_314

~~ May All Your Sequences Converge

On Dec 14, 2014, at 4:07 PM, Venkat, Ankam ankam.ven...@centurylink.com wrote:
[...]
Re: Running spark-submit from a remote machine using a YARN application
Hi,

On Fri, Dec 12, 2014 at 7:01 AM, ryaminal tacmot...@gmail.com wrote:

Now our solution is to make a very simple YARN application which executes, as its command, "spark-submit --master yarn-cluster s3n://application/jar.jar". This seemed so simple and elegant, but it has some weird issues. We get NoClassDefFoundErrors. When we ssh to the box and run the same spark-submit command it works, but doing this through YARN leads to the NoClassDefFoundErrors mentioned.

I do something similar: I start Spark using spark-submit from a non-Spark server application. Make sure that HADOOP_CONF_DIR is set correctly when running spark-submit from your program so that the YARN configuration can be found. Also, keep in mind that some parameters to spark-submit behave differently with a yarn-cluster master than with a local[*] master. For example, system properties set using `--conf` will be available in your Spark application only in local[*] mode; for YARN you need to wrap them with `--conf spark.executor.extraJavaOptions=...`.

Tobias
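As an illustration of the point above, a sketch of launching spark-submit from a JVM server process with the environment set explicitly (the config path, property name and jar location are placeholders):

import scala.sys.process._

val cmd = Seq("spark-submit",
  "--master", "yarn-cluster",
  "--conf", "spark.executor.extraJavaOptions=-Dmy.prop=value",  // survives the trip to YARN executors
  "s3n://application/jar.jar")
// Pass HADOOP_CONF_DIR so spark-submit can locate the YARN ResourceManager.
val exitCode = Process(cmd, cwd = None, "HADOOP_CONF_DIR" -> "/etc/hadoop/conf").!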
Re: Adding a column to a SchemaRDD
Nathan,

On Fri, Dec 12, 2014 at 3:11 PM, Nathan Kronenfeld nkronenf...@oculusinfo.com wrote:

I can see how to do it if I can express the added values in SQL - just run "SELECT *, valueCalculation AS newColumnName FROM table". I've been searching all over for how to do this if my added value is a Scala function, with no luck. Let's say I have a SchemaRDD with columns A, B, and C, and I want to add a new column, D, calculated using Utility.process(b, c), and I want (of course) to pass in the values B and C from each row, ending up with a new SchemaRDD with columns A, B, C, and D.

I guess you would have to do two things:
- schemardd.map(row => { extend the row here }), which will give you a plain RDD[Row] without a schema
- take the schema from the schemardd and extend it manually by the name and type of the newly added column
- create a new SchemaRDD from your mapped RDD and the manually extended schema.

Does that make sense?

Tobias
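A rough sketch of those three steps (assuming Utility.process takes and returns Strings, and a Spark version where Row.fromSeq/toSeq and applySchema are available; adjust the Row construction to your version):

import org.apache.spark.sql._

// 1. Extend each row with the computed value of D.
val extendedRows = schemardd.map { row =>
  Row.fromSeq(row.toSeq :+ Utility.process(row.getString(1), row.getString(2)))
}
// 2. Extend the schema with the new column.
val extendedSchema = StructType(schemardd.schema.fields :+
  StructField("D", StringType, nullable = true))
// 3. Re-attach the schema.
val extended = sqlContext.applySchema(extendedRows, extendedSchema)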
Run Spark job on Playframework + Spark Master/Worker in one Mac
Hi all,

I am trying to run a Spark job on Play Framework plus a Spark master/worker on one Mac. When the job ran, I encountered java.lang.ClassNotFoundException. Would you teach me how to solve it?

Here is my code on GitHub: https://github.com/TomoyaIgarashi/spark_cluster_sample

* Environments:
Mac 10.9.5
Java 1.7.0_71
Playframework 2.2.3
Spark 1.1.1

* Setup history:
cd ~
git clone g...@github.com:apache/spark.git
cd spark
git checkout -b v1.1.1 v1.1.1
sbt/sbt assembly
vi ~/.bashrc
export SPARK_HOME=/Users/tomoya/spark
. ~/.bashrc
hostname
Tomoya-Igarashis-MacBook-Air.local
vi $SPARK_HOME/conf/slaves
Tomoya-Igarashis-MacBook-Air.local
play new spark_cluster_sample
default name
type -> scala

* Run history:
$SPARK_HOME/sbin/start-all.sh
jps
which play
/Users/tomoya/play/play
git clone https://github.com/TomoyaIgarashi/spark_cluster_sample
cd spark_cluster_sample
play run

* Error trace:
Here is the error trace in a Gist: https://gist.github.com/TomoyaIgarashi/4bd45ab3685a532f5511

Regards
Re: what is the best way to implement mini batches?
I think it could be done like this (a rough sketch of the fixed-size variant follows below):

1. use mapPartitions to randomly drop some partitions
2. drop some elements randomly (for each selected partition)
3. calculate the gradient step for the selected elements

I don't think a fixed batch size is needed, but fixed-size batches could be done as:

1. zipWithIndex
2. create a ShuffledRDD based on the index (e.g. using index/10 as the key)
3. use mapPartitions to calculate each batch

I also have a question: can mini batches run in parallel? I think running all batches in parallel would just be like full-batch GD in some cases.
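A sketch of steps 1-2 of the fixed-size variant above (batchSize = 10 matches the index/10 example; groupByKey stands in for building the ShuffledRDD by hand):

val batchSize = 10
val batches = rdd.zipWithIndex()
  .map { case (x, i) => (i / batchSize, x) }  // key = mini-batch id
  .groupByKey()                               // each value is one mini batch of ~batchSize elements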
Spark Streaming Python APIs?
Hi Spark experts, Are there any Python APIs for Spark Streaming? I didn't find any Python APIs in the Spark Streaming programming guide: http://spark.apache.org/docs/latest/streaming-programming-guide.html Xiaoyong
RE: Spark Streaming Python APIs?
AFAIK, this will be a new feature in version 1.2; you can check out the master branch or the 1.2 branch to give it a try. Thanks Jerry
RE: Spark Streaming Python APIs?
Cool thanks! Xiaoyong
RE: Spark Streaming Python APIs?
Btw, I have seen the Python-related docs in the 1.2 documentation here: http://people.apache.org/~pwendell/spark-1.2.0-rc2-docs/streaming-programming-guide.html Xiaoyong
Re: ALS failure with size Integer.MAX_VALUE
Hi Xiangrui, The block size limit was encountered even with a reduced number of item blocks, as you had expected. I'm wondering if I could try the new implementation as a standalone library against a 1.1 deployment. Does it have dependencies on any core APIs in the current master? Thanks, Bharath On Wed, Dec 3, 2014 at 10:10 PM, Bharath Ravi Kumar reachb...@gmail.com wrote: Thanks Xiangrui. I'll try out setting a smaller number of item blocks. And yes, I've been following the JIRA for the new ALS implementation. I'll try it out when it's ready for testing. On Wed, Dec 3, 2014 at 4:24 AM, Xiangrui Meng men...@gmail.com wrote: Hi Bharath, You can try setting a small number of item blocks in this case. 1200 is definitely too large for ALS. Please try 30 or even smaller. I'm not sure whether this could solve the problem, because you have 100 items connected with 10^8 users. There is a JIRA for this issue: https://issues.apache.org/jira/browse/SPARK-3735 which I will try to implement in 1.3. I'll ping you when it is ready. Best, Xiangrui On Tue, Dec 2, 2014 at 10:40 AM, Bharath Ravi Kumar reachb...@gmail.com wrote: Yes, the issue appears to be due to the 2GB block size limitation. I am hence looking for (user, product) block sizing suggestions to work around the block size limitation. On Sun, Nov 30, 2014 at 3:01 PM, Sean Owen so...@cloudera.com wrote: (It won't be that, since you see that the error occurs when reading a block from disk. I think this is an instance of the 2GB block size limitation.) On Sun, Nov 30, 2014 at 4:36 AM, Ganelin, Ilya ilya.gane...@capitalone.com wrote: Hi Bharath – I'm unsure if this is your problem, but MatrixFactorizationModel in MLlib, which is the underlying component for ALS, expects your user/product fields to be integers. Specifically, the input to ALS is an RDD[Rating], and Rating is an (Int, Int, Double). I am wondering if perhaps one of your identifiers exceeds MAX_INT; could you write a quick check for that? I have been running a very similar use case to yours (with more constrained hardware resources) and I haven't seen this exact problem, but I'm sure we've seen similar issues. Please let me know if you have other questions. From: Bharath Ravi Kumar reachb...@gmail.com Date: Thursday, November 27, 2014 at 1:30 PM To: user@spark.apache.org user@spark.apache.org Subject: ALS failure with size Integer.MAX_VALUE We're training a recommender with ALS in mllib 1.1 against a dataset of 150M users and 4.5K items, with the total number of training records being 1.2 billion (~30GB data). The input data is spread across 1200 partitions on HDFS. For the training, rank=10, and we've configured {number of user data blocks = number of item data blocks}. The number of user/item blocks was varied between 50 and 1200. Irrespective of the block size (e.g. at 1200 blocks each), there are at least a couple of tasks that end up shuffle-reading 9.7G each in the aggregate stage (ALS.scala:337) and failing with the following exception: java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:745) at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:108) at org.apache.spark.storage.DiskStore.getValues(DiskStore.scala:124) at org.apache.spark.storage.BlockManager.getLocalFromDisk(BlockManager.scala:332) at org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator$$anonfun$getLocalBlocks$1.apply(BlockFetcherIterator.scala:204)
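For reference, a minimal sketch of how the block count is passed to the MLlib 1.1 ALS API (untested; the input path, delimiter, and hyperparameter values are illustrative only):

  import org.apache.spark.mllib.recommendation.{ALS, Rating}

  // ALS requires Int user/product ids, per Ilya's note above.
  val ratings = sc.textFile("hdfs:///path/to/ratings").map { line =>
    val Array(user, item, rating) = line.split(',')
    Rating(user.toInt, item.toInt, rating.toDouble)
  }
  // The last argument is the number of user/item blocks; per Xiangrui,
  // something near 30 rather than 1200 keeps each shuffled block well
  // under the 2GB limit.
  val model = ALS.train(ratings, 10 /* rank */, 10 /* iterations */,
    0.01 /* lambda */, 30 /* blocks */)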
Re: Limit the # of columns in Spark Scala
Oh, just figured it out: tabs.map(c => Array(c(167), c(110), c(200))) Thanks for all of the advice, eh?! On Sun Dec 14 2014 at 1:14:00 PM Yana Kadiyska yana.kadiy...@gmail.com wrote: Denny, I am not sure what exception you're observing, but I've had luck with 2 things: val table = sc.textFile(hdfs://) You can try calling table.first here and you'll see the first line of the file. You can also do val debug = table.first.split("\t") which would give you an array, and you can indeed verify that the array contains what you want in positions 167, 110 and 200. In the case of large files with a random bad line, I find wrapping the call within the map in try/catch very valuable -- you can dump out the whole line in the catch statement. Lastly, I would guess that you're getting a compile error and not a runtime error -- I believe c is an array of values, so I think you want tabs.map(c => (c(167), c(110), c(200))) instead of tabs.map(c => (c._(167), c._(110), c._(200))) On Sun, Dec 14, 2014 at 3:12 PM, Denny Lee denny.g@gmail.com wrote: Yes - that works great! Sorry for implying I couldn't. Was just more flummoxed that I couldn't make the Scala call work on its own. Will continue to debug ;-) On Sun, Dec 14, 2014 at 11:39 Michael Armbrust mich...@databricks.com wrote: BTW, I cannot use SparkSQL / case right now because my table has 200 columns (and I'm on Scala 2.10.3) You can still apply the schema programmatically: http://spark.apache.org/docs/latest/sql-programming-guide.html#programmatically-specifying-the-schema
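A small sketch of Yana's try/catch suggestion (untested; the file path is a placeholder), dropping and logging malformed lines instead of failing the whole job:

  val tabs = sc.textFile("hdfs:///path/to/file").flatMap { line =>
    try {
      val c = line.split("\t")
      Some(Array(c(167), c(110), c(200)))
    } catch {
      case e: Exception =>
        // Dump the whole offending line; this ends up in the executor logs.
        System.err.println(s"bad line: $line")
        None
    }
  }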
Re: KafkaUtils explicit acks
Thanks TD and Francois for the explanation and documentation. I'm curious whether we have any performance benchmarks with and without the WAL for spark-streaming-kafka. Also, in spark-streaming-kafka (as Kafka provides a way to acknowledge logs), on top of the WAL can we modify KafkaUtils to acknowledge the offsets only when the RDDs are fully processed and are getting evicted out of Spark memory? That way we can be one hundred percent sure that all the records are processed by the system. I was thinking it might be good to have the Kafka offset information of each batch as part of the RDD's metadata and commit the offsets once the RDD's lineage is complete. On Thu, Dec 11, 2014 at 6:26 PM, Tathagata Das tathagata.das1...@gmail.com wrote: I am updating the docs right now. Here is a staged copy that you can have a sneak peek of. This will be part of Spark 1.2. http://people.apache.org/~tdas/spark-1.2-temp/streaming-programming-guide.html The updated fault-tolerance section tries to simplify the explanation of when and what data can be lost, and how to prevent that using the new experimental feature of write ahead logs. Any feedback will be much appreciated. TD On Wed, Dec 10, 2014 at 2:42 AM, francois.garil...@typesafe.com wrote: [sorry for the botched half-message] Hi Mukesh, There's been some great work on Spark Streaming reliability lately. https://www.youtube.com/watch?v=jcJq3ZalXD8 Look at the links from: https://issues.apache.org/jira/browse/SPARK-3129 I'm not aware of any doc yet (did I miss something?) but you can look at the ReliableKafkaReceiver's test suite: external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala -- FG On Wed, Dec 10, 2014 at 11:17 AM, Mukesh Jha me.mukesh@gmail.com wrote: Hello Guys, Any insights on this?? If I'm not clear enough, my question is how can I use a kafka consumer and not lose any data in cases of failures with spark-streaming. On Tue, Dec 9, 2014 at 2:53 PM, Mukesh Jha me.mukesh@gmail.com wrote: Hello Experts, I'm working on a spark app which reads data from kafka and persists it in hbase. The Spark documentation states the below [1], that in case of worker failure we can lose some data. If so, how can I make my kafka stream more reliable? I have seen there is a simple consumer [2], but I'm not sure if it has been used/tested extensively. I was wondering if there is a way to explicitly acknowledge the kafka offsets once they are replicated in memory of other worker nodes (if it's not already done) to tackle this issue. Any help is appreciated in advance. [1] Using any input source that receives data through a network - For network-based data sources like Kafka and Flume, the received input data is replicated in memory between nodes of the cluster (default replication factor is 2). So if a worker node fails, then the system can recompute the lost data from the leftover copy of the input data. However, if the worker node where a network receiver was running fails, then a tiny bit of data may be lost, that is, the data received by the system but not yet replicated to other node(s). The receiver will be started on a different node and it will continue to receive data. [2] https://github.com/dibbhatt/kafka-spark-consumer Txz, Mukesh Jha -- Thanks Regards, Mukesh Jha -- Thanks Regards, Mukesh Jha me.mukesh@gmail.com
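As a sketch of what enabling the new 1.2 write ahead log looks like, per TD's staged docs above (untested; the app name, ZooKeeper address, group id, topic map, and checkpoint directory are placeholders):

  import org.apache.spark.SparkConf
  import org.apache.spark.storage.StorageLevel
  import org.apache.spark.streaming.{Seconds, StreamingContext}
  import org.apache.spark.streaming.kafka.KafkaUtils

  val conf = new SparkConf().setAppName("kafka-wal-example")
    .set("spark.streaming.receiver.writeAheadLog.enable", "true")
  val ssc = new StreamingContext(conf, Seconds(10))
  // The WAL is written under the checkpoint directory, so one must be set.
  ssc.checkpoint("hdfs:///checkpoints/kafka-wal-example")
  // With the WAL on, in-memory replication becomes redundant, so a
  // non-replicated storage level is commonly used.
  val stream = KafkaUtils.createStream(ssc, "zkhost:2181", "my-group",
    Map("my-topic" -> 1), StorageLevel.MEMORY_AND_DISK_SER)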
Q about Spark MLlib- Decision tree - scala.MatchError: 2.0 (of class java.lang.Double)
I am working on some Spark MLlib testing (decision trees) and I used the IRIS data from the CRAN R package. The original IRIS data is not in a good format for Spark MLlib, so I changed the data format (the layout and the features' positions). When I ran the sample Spark MLlib code for DT, I got the error below. How can I solve this error? == 14/12/15 14:27:30 ERROR TaskSetManager: Task 21.0:0 failed 4 times; aborting job 14/12/15 14:27:30 INFO TaskSchedulerImpl: Cancelling stage 21 14/12/15 14:27:30 INFO DAGScheduler: Failed to run aggregate at DecisionTree.scala:657 14/12/15 14:27:30 INFO TaskSchedulerImpl: Stage 21 was cancelled 14/12/15 14:27:30 WARN TaskSetManager: Loss was due to org.apache.spark.TaskKilledException org.apache.spark.TaskKilledException at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) 14/12/15 14:27:30 INFO TaskSchedulerImpl: Removed TaskSet 21.0, whose tasks have all completed, from pool org.apache.spark.SparkException: Job aborted due to stage failure: Task 21.0:0 failed 4 times, most recent failure: Exception failure in TID 34 on host krbda1anode01.kr.test.com: scala.MatchError: 2.0 (of class java.lang.Double) org.apache.spark.mllib.tree.DecisionTree$.classificationBinSeqOp$1(DecisionTree.scala:568) org.apache.spark.mllib.tree.DecisionTree$.org$apache$spark$mllib$tree$DecisionTree$$binSeqOp$1(DecisionTree.scala:623) org.apache.spark.mllib.tree.DecisionTree$$anonfun$4.apply(DecisionTree.scala:657) org.apache.spark.mllib.tree.DecisionTree$$anonfun$4.apply(DecisionTree.scala:657) scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:144) scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:144) scala.collection.Iterator$class.foreach(Iterator.scala:727) scala.collection.AbstractIterator.foreach(Iterator.scala:1157) scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:144) scala.collection.AbstractIterator.foldLeft(Iterator.scala:1157) scala.collection.TraversableOnce$class.aggregate(TraversableOnce.scala:201) scala.collection.AbstractIterator.aggregate(Iterator.scala:1157) org.apache.spark.rdd.RDD$$anonfun$21.apply(RDD.scala:838) org.apache.spark.rdd.RDD$$anonfun$21.apply(RDD.scala:838) org.apache.spark.SparkContext$$anonfun$23.apply(SparkContext.scala:1116) org.apache.spark.SparkContext$$anonfun$23.apply(SparkContext.scala:1116) org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111) org.apache.spark.scheduler.Task.run(Task.scala:51) org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) java.lang.Thread.run(Thread.java:745) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015) at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:633) at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1207) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) at akka.actor.ActorCell.invoke(ActorCell.scala:456) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) at akka.dispatch.Mailbox.run(Mailbox.scala:219) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at
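Not the poster's actual code, but a hedged sketch of the input format DecisionTree expects (Spark 1.1+ API; the HDFS path and CSV layout are assumptions). Class labels must be 0.0, 1.0, ..., numClasses-1, and a scala.MatchError on 2.0 is consistent with the third iris species label not matching what the training was configured for:

  import org.apache.spark.mllib.linalg.Vectors
  import org.apache.spark.mllib.regression.LabeledPoint
  import org.apache.spark.mllib.tree.DecisionTree

  // Assumed layout: label first, then the four iris features, comma separated;
  // the three species encoded as labels 0.0, 1.0 and 2.0.
  val data = sc.textFile("hdfs:///path/to/iris.csv").map { line =>
    val parts = line.split(',')
    LabeledPoint(parts(0).toDouble, Vectors.dense(parts.tail.map(_.toDouble)))
  }
  val model = DecisionTree.trainClassifier(data, numClasses = 3,
    categoricalFeaturesInfo = Map[Int, Int](), impurity = "gini",
    maxDepth = 5, maxBins = 32)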
RE: KafkaUtils explicit acks
Hi, It is not trivial work to acknowledge the offsets only when the RDD is fully processed. From my understanding, modifying KafkaUtils alone is not enough to meet your requirement; you would need to add metadata management for each block/RDD, track them on both the executor and the driver side, and take care of many other things as well :). Thanks Jerry