global variable in spark streaming with no dependency on key
Hi Gurus, Please help. But please don't tell me to use updateStateByKey, because I need a global variable (something like the clock time) shared across the micro-batches but not tied to any key. In my case it is not acceptable to maintain state per key, since each key arrives at a different time. My global variable is related to time, but I cannot use the machine clock. Any hint? Or is the lack of a global variable by design? Thanks!
Difference btw MEMORY_ONLY and MEMORY_AND_DISK
Hello Sparkers, I would like to understand the difference between these storage levels for the portion of an RDD that doesn't fit in memory. It seems that in both storage levels, whatever portion doesn't fit in memory is spilled to disk. Is there any difference? Thanks, Harsha
Re: Difference btw MEMORY_ONLY and MEMORY_AND_DISK
MEMORY_ONLY does not spill: partitions that don't fit in memory are simply not cached and are recomputed from lineage whenever they are needed, whereas MEMORY_AND_DISK spills such partitions to disk and reads them back later. Regards Sab On Tue, Aug 18, 2015 at 12:45 PM, Harsha HN 99harsha.h@gmail.com wrote: Hello Sparkers, I would like to understand the difference between these storage levels for the portion of an RDD that doesn't fit in memory. It seems that in both storage levels, whatever portion doesn't fit in memory is spilled to disk. Is there any difference? Thanks, Harsha -- Architect - Big Data Ph: +91 99805 99458 Manthan Systems | Company of the year - Analytics (2014 Frost and Sullivan India ICT)
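A minimal sketch of the difference, assuming a SparkContext sc as in the shell (input path illustrative):

    import org.apache.spark.storage.StorageLevel

    // MEMORY_ONLY: partitions that don't fit are skipped and recomputed from lineage.
    val memOnly = sc.textFile("hdfs:///data/input").persist(StorageLevel.MEMORY_ONLY)

    // MEMORY_AND_DISK: partitions that don't fit are written to local disk and read back.
    val memAndDisk = sc.textFile("hdfs:///data/input").persist(StorageLevel.MEMORY_AND_DISK)

    memOnly.count()     // later actions may recompute the overflow partitions
    memAndDisk.count()  // later actions read the overflow partitions from disk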
Re: Regarding rdd.collect()
I think you are mixing the notion of a job from the Hadoop MapReduce world with Spark. In Spark, RDDs are immutable and transformations are lazy, so the first time an RDD actually fills up memory is when you run the first action. After that, a cached RDD stays in memory until either the application is stopped or newer RDDs push the old ones out. Remember that Spark does not provide fault tolerance through replication but through lineage, so it is important to keep old RDDs' lineage around in case a downstream transformation fails. On Tue, Aug 18, 2015 at 5:46 PM, Dawid Wysakowicz wysakowicz.da...@gmail.com wrote: No, the data is not stored between two jobs. But it is stored for the lifetime of a job. A job can have multiple actions run. For sharing an RDD between jobs you can have a look at Spark Job Server (https://github.com/ooyala/spark-jobserver) or some in-memory storages: Tachyon (http://tachyon-project.org/) or Ignite (https://ignite.incubator.apache.org/) -- Best Regards, Ayan Guha
Spark SQL Partition discovery - schema evolution
Hi all, I'm using Spark SQL on data stored in Openstack Swift. I'm trying to load parquet files with partition discovery, but it fails when the partition paths don't match between two objects. For example, a container which contains: /zone=0/object2 /zone=0/area=0/object1 won't load, and results in the error: java.lang.AssertionError: assertion failed: Conflicting partition column names detected: ArrayBuffer(zone, area) ArrayBuffer(zone) I know that there is support for schema evolution for the schemas inside the parquet files, but is there schema evolution for partition discovery as well? Guy Hadash IBM Research - Haifa 03-7689436
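For reference, a sketch of the distinction, under the assumption that partition discovery requires every file to sit under the same set of partition columns (paths illustrative; sqlContext.read is the 1.4-style API):

    // Fails: the files disagree on the partition columns (zone vs. zone+area):
    //   /data/zone=0/object2
    //   /data/zone=0/area=0/object1
    // Works: every leaf file carries the same partition columns:
    //   /data/zone=0/area=0/object1
    //   /data/zone=1/area=0/object2
    val df = sqlContext.read.parquet("swift://container.provider/data")
    df.printSchema()  // zone and area show up as partition columns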
Why is there overlap between tasks on the event timeline UI
Hi, The following is copied from the Spark event timeline UI. I don't understand why there is overlap between tasks; I would expect them to run sequentially, one by one, within an executor (there is one core per executor). The blue part of each task is the scheduler delay time. Does that mean it is the delay between the task being put into the thread pool and the task being picked up to run?
Re: Regarding rdd.collect()
On Tue, Aug 18, 2015 at 1:16 PM, Dawid Wysakowicz wysakowicz.da...@gmail.com wrote: No, the data is not stored between two jobs. But it is stored for the lifetime of a job. A job can have multiple actions run. I too thought so but wanted to confirm. Thanks.
Re: global variable in spark streaming with no dependency on key
See if SparkContext.accumulator helps. On Tue, Aug 18, 2015 at 2:27 PM, Joanne Contact joannenetw...@gmail.com wrote: Hi Gurus, Please help. But please don't tell me to use updateStateByKey, because I need a global variable (something like the clock time) shared across the micro-batches but not tied to any key. In my case it is not acceptable to maintain state per key, since each key arrives at a different time. My global variable is related to time, but I cannot use the machine clock. Any hint? Or is the lack of a global variable by design? Thanks!
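A sketch of that idea on the Spark 1.x accumulator API; stream and extractEventTime are placeholders, and a custom AccumulatorParam turns the add-only accumulator into a running max:

    import org.apache.spark.AccumulatorParam

    object MaxParam extends AccumulatorParam[Long] {
      def zero(initial: Long): Long = initial
      def addInPlace(a: Long, b: Long): Long = math.max(a, b)
    }

    // One driver-side value shared across all micro-batches, independent of any key.
    val maxEventTime = ssc.sparkContext.accumulator(0L, "maxEventTime")(MaxParam)

    stream.foreachRDD { rdd =>
      rdd.foreach { rec => maxEventTime += extractEventTime(rec) }  // hypothetical helper
      println(s"max event time so far: ${maxEventTime.value}")      // readable only on the driver
    }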
Why doesn't standalone mode allow setting num-executors?
num-executors only works in yarn mode. In standalone mode, I have to set --total-executor-cores and --executor-cores. Isn't this unintuitive? Is there any reason for that?
Re: Re: Regarding rdd.collect()
One Spark application can have many jobs, e.g. first call rdd.count and then call rdd.collect. At 2015-08-18 15:37:14, Hemant Bhanawat hemant9...@gmail.com wrote: "It is still in memory for future rdd transformations and actions." This is interesting. You mean Spark holds the data in memory between two job executions? How does the second job get a handle on the data in memory? I am interested in knowing more about it. Can you forward me a Spark article or JIRA that talks about it?
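To make the many-jobs-one-application point concrete, a small sketch (input path illustrative); each action launches its own job, and the second is served from the cache populated by the first:

    val rdd = sc.textFile("hdfs:///data/events").map(_.trim).cache()

    val total = rdd.count()    // job 1: materializes and caches the RDD
    val sample = rdd.take(10)  // job 2: reads the cached partitions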
Regarding rdd.collect()
When I do an rdd.collect(), does the data move back to the driver, or is it still held in memory across the executors?
Re: Changed Column order in DataFrame.Columns call and insertIntoJDBC
Take a look at the doc for the method: /** Applies a schema to an RDD of Java Beans. WARNING: Since there is no guaranteed ordering for fields in a Java Bean, SELECT * queries will return the columns in an undefined order. @group dataframes @since 1.3.0 */ At 2015-08-18 15:40:54, MooseSpark pandey.mayur...@gmail.com wrote: I have an RDD which I am using to create a DataFrame based on a POJO, but when the DataFrame is created, the column order gets changed. DataFrame df = sqlCtx.createDataFrame(rdd, Pojo.class); String[] columns = df.columns(); // the columns here are in a different order from the POJO // in the POJO the properties are p1, p2, p3 // but in columns it is p3, p1, p2, and the same order is saved into JDBC df.insertIntoJDBC("jdbc:sqlserver://xx.yyy.00.11:PORT;databaseName=spark_gpeh;user=saw;password=password@123;", "Test", false); Any idea?
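Given that, one workaround is to pin the column order explicitly before writing; a sketch in Scala using the property names from the post (url standing in for the connection string above):

    // Select the columns in the order the JDBC table should have.
    val ordered = df.select("p1", "p2", "p3")
    ordered.insertIntoJDBC(url, "Test", false)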
Re: Regarding rdd.collect()
No, the data is not stored between two jobs. But it is stored for the lifetime of a job. A job can have multiple actions run. For sharing an RDD between jobs you can have a look at Spark Job Server (https://github.com/ooyala/spark-jobserver) or some in-memory storages: Tachyon (http://tachyon-project.org/) or Ignite (https://ignite.incubator.apache.org/) 2015-08-18 9:37 GMT+02:00 Hemant Bhanawat hemant9...@gmail.com: "It is still in memory for future rdd transformations and actions." This is interesting. You mean Spark holds the data in memory between two job executions? How does the second job get a handle on the data in memory? I am interested in knowing more about it. Can you forward me a Spark article or JIRA that talks about it?
Re: Regarding rdd.collect()
"It is still in memory for future rdd transformations and actions." This is interesting. You mean Spark holds the data in memory between two job executions? How does the second job get a handle on the data in memory? I am interested in knowing more about it. Can you forward me a Spark article or JIRA that talks about it? On Tue, Aug 18, 2015 at 12:49 PM, Sabarish Sasidharan sabarish.sasidha...@manthan.com wrote: It is still in memory for future rdd transformations and actions. What you get in the driver is a copy of the data. Regards Sab
Re: Transform KafkaRDD to KafkaRDD, not plain RDD, or how to keep OffsetRanges after transformation
The solution for how to share offsetRanges after a DirectKafkaInputDStream is transformed is in:
https://github.com/apache/spark/blob/master/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
https://github.com/apache/spark/blob/master/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java

One thing I would like to understand is why the Scala version uses a normal variable while the Java version uses an AtomicReference. Another thing I don't get is about closure serialization: why doesn't the logger in the code below throw an NPE, even though its instance isn't copied, as in the case of offsetRanges? When val offsets = offsetRanges is removed from foreachRDD, mapPartitionsWithIndex throws on offsets(idx). I have something like this code:

object StreamOps {
  val logger = LoggerFactory.getLogger("StreamOps")
  var offsetRanges = Array[OffsetRange]()

  def transform[T](stream: InputDStream[Array[Byte]]): DStream[T] = {
    stream transform { rdd =>
      offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd flatMap { message =>
        Try(/* ... decode Array[Byte] to F ... */) match {
          case Success(fact) => Some(fact)
          case _ => None
        }
      }
    }
  }

  // Error handling removed for brevity
  def save[F](stream: DStream[F]): Unit = {
    stream foreachRDD { rdd =>
      // It has to be here, otherwise NullPointerException
      val offsets = offsetRanges
      rdd mapPartitionsWithIndex { (idx, facts) =>
        // Use offsets here
        val writer = new MyWriter[F](offsets(idx), ...)
        facts foreach { fact => writer.write(fact) }
        writer.close()
        // Why does logger work and not throw NullPointerException?
        logger.info(...)
        Iterator.empty
      } foreach { (_: Nothing) => }
    }
  }
}

Many thanks for any advice; I'm sure it's a noob question. Petr On Mon, Aug 17, 2015 at 1:12 PM, Petr Novak oss.mli...@gmail.com wrote: Or can I generally create a new RDD from a transformation and enrich its partitions with some metadata, so that I could copy OffsetRanges into my new RDD in the DStream? On Mon, Aug 17, 2015 at 1:08 PM, Petr Novak oss.mli...@gmail.com wrote: Hi all, I need to transform a KafkaRDD into a new stream of deserialized case classes. I want to use the new stream to save it to file and to perform additional transformations on it. To save it I want to use offsets in the filenames, hence I need OffsetRanges in the transformed RDD. But KafkaRDD is private, hence I don't know how to do it. Alternatively I could deserialize directly in messageHandler before the KafkaRDD, but that seems to be a 1:1 transformation, while I need to drop bad messages (KafkaRDD => RDD would be a flatMap). Is there a way to do it using messageHandler? Is there another approach? Many thanks for any help. Petr
Re: Regarding rdd.collect()
The RDD is still available on the executors for future transformations and actions (in memory if it was cached). What you get in the driver is a copy of the data. Regards Sab On Tue, Aug 18, 2015 at 12:02 PM, praveen S mylogi...@gmail.com wrote: When I do an rdd.collect(), does the data move back to the driver, or is it still held in memory across the executors? -- Architect - Big Data Ph: +91 99805 99458 Manthan Systems | Company of the year - Analytics (2014 Frost and Sullivan India ICT)
Re: issue Running Spark Job on Yarn Cluster
Please check the logs in your Hadoop YARN cluster; there you will get the precise error or exception.
Re: registering an empty RDD as a temp table in a PySpark SQL context
It is definitely not the case for Spark SQL. A temporary table (much like a DataFrame) is just a logical plan with a name, and it is not iterated unless a query is fired against it. I am not sure that using rdd.take in the Python code to verify the schema is the right approach, as it creates a Spark job. BTW, why would you want to update the Spark code? rdd.take in the Python code is the problem. All you want is to avoid the schema verification in createDataFrame. I do not see any issue on the Spark side in the way it handles an RDD that has no data. On Tue, Aug 18, 2015 at 1:23 AM, Eric Walker e...@node.io wrote: I have an RDD queried from a scan of a data source. Sometimes the RDD has rows, and at other times it has none. I would like to register this RDD as a temporary table in a SQL context. I suspect this will work in Scala, but in PySpark some code assumes that the RDD has rows in it, which are used to verify the schema: https://github.com/apache/spark/blob/branch-1.3/python/pyspark/sql/context.py#L299 Before I attempt to extend the Scala code to handle an empty RDD, or provide an empty DataFrame that can be registered, I was wondering what people recommend in this case. Perhaps there's a simple way of registering an empty RDD as a temporary table in a PySpark SQL context that I'm overlooking. An alternative is to add special-case logic in the client code to deal with an RDD backed by an empty table scan. But since the SQL will already handle that, I was hoping to avoid special-case logic. Eric
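If the schema is known up front, the row sampling can be sidestepped entirely by passing an explicit schema; a Scala sketch (PySpark's createDataFrame accepts a schema argument the same way; field names illustrative):

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types.{StructType, StructField, StringType, LongType}

    val schema = StructType(Seq(
      StructField("name", StringType, nullable = true),
      StructField("count", LongType, nullable = true)))

    // Works whether the scan returned rows or not; here the RDD is empty.
    val df = sqlContext.createDataFrame(sc.emptyRDD[Row], schema)
    df.registerTempTable("scan_results")  // queries simply return no rows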
Changed Column order in DataFrame.Columns call and insertIntoJDBC
I have an RDD which I am using to create a DataFrame based on a POJO, but when the DataFrame is created, the column order gets changed. DataFrame df = sqlCtx.createDataFrame(rdd, Pojo.class); String[] columns = df.columns(); // the columns here are in a different order from the POJO // in the POJO the properties are p1, p2, p3 // but in columns it is p3, p1, p2, and the same order is saved into JDBC df.insertIntoJDBC("jdbc:sqlserver://xx.yyy.00.11:PORT;databaseName=spark_gpeh;user=saw;password=password@123;", "Test", false); Any idea?
Re: Running Spark on user-provided Hadoop installation
Refer to: http://spark.apache.org/docs/latest/hadoop-provided.html Specifically, if you want to refer to s3a paths, edit spark-env.sh and add the following lines at the end:

export SPARK_DIST_CLASSPATH=$(/path/to/hadoop/hadoop-2.7.1/bin/hadoop classpath)
export SPARK_DIST_CLASSPATH=$SPARK_DIST_CLASSPATH:/path/to/hadoop/hadoop-2.7.1/share/hadoop/tools/lib/*
Re: how do I execute a job on a single worker node in standalone mode
By default, standalone mode creates one executor on every worker machine per application, and the overall number of cores is configured with --total-executor-cores. So in general, if you specify --total-executor-cores=1, there will be only one core on some executor and you'll get what you want. On the other hand, if your application needs all the cores of your cluster and only some specific job should run on a single executor, there are a few methods to achieve this, e.g. coalesce(1) or dummyRddWithOnePartitionOnly.foreachPartition. On 18 August 2015 at 01:36, Axel Dahl a...@whisperstream.com wrote: I have a 4 node cluster and have been playing around with the num-executors, executor-memory and executor-cores parameters. I set the following: --executor-memory=10G --num-executors=1 --executor-cores=8 But when I run the job, I see that each worker is running one executor which has 2 cores and 2.5G memory. What I'd like to do instead is have Spark allocate the job to a single worker node. Is that possible in standalone mode, or do I need a job/resource scheduler like YARN to do that? Thanks in advance, -Axel
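A sketch of the single-partition technique mentioned above (bigRdd and processRow are placeholders):

    // Collapse to one partition so the downstream stage runs as a single task
    // on a single executor, whatever the cluster size.
    bigRdd.coalesce(1).foreachPartition { rows =>
      rows.foreach(processRow)
    }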
Re: how to write any data (non RDD) to a file inside closure?
Still not sure what you are trying to achieve. If you could post some code that doesn't work, the community can help you understand where the error (syntactic or conceptual) is. On 17 Aug 2015, at 17:42, dianweih001 wrote: Hi Robin, I know how to write/read a file outside of RDDs and the executor closure. I'm just not sure how to write data to a file inside a closure, because within a closure we have to define RDDs, which sometimes introduces a SparkContext error. Thank you for your reply. Dianwei
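For what it's worth, plain JVM I/O works fine inside closures; what you cannot do there is touch RDDs or the SparkContext. A sketch that writes one local file per partition on each executor (paths illustrative):

    import java.io.PrintWriter

    rdd.foreachPartition { records =>
      // Runs on the executor: ordinary I/O only, no SparkContext access.
      val out = new PrintWriter(s"/tmp/part-${java.util.UUID.randomUUID}.txt")
      try records.foreach(r => out.println(r))
      finally out.close()
    }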
Evaluating spark + Cassandra for our use cases
My company is interested in building a real-time time-series querying solution using Spark and Cassandra. Specifically, we're interested in setting up a Spark system against Cassandra running a hive thrift server. We need to be able to perform real-time queries on time-series data - things like, how many accounts have spent in total more than $300 on product X in the past 3 months, and purchased product Y in the past month. These queries need to be fast - preferably sub-second but we can deal with a few seconds if absolutely necessary. The data sizes are in the millions of records when rolled up to be per-monthly records. Something on the order of 100M per customer. My question is, based on experience, how hard would it be to get Cassandra and Spark working together to give us sub-second response times in this use case? Note that we'll need to use DataStax enterprise (which is unappealing from a cost standpoint) because it's the only thing that provides the hive spark thrift server to Cassandra. The two top contenders for our solution are Spark+Cassandra and Druid. Neither of these solutions work perfectly out of the box: - Druid would need to be modified, possibly hacked, to support the queries we require. I'm also not clear how operationally ready it is. - Cassandra and Spark would require paying money for DataStax enterprise. It really feels like it's going to be tricky to configure Cassandra and Spark to be lightning fast for our use case. Finally, window functions (which we need - see above) are not supported unless we use a pre-release milestone of the datastax spark Cassandra connector. I was wondering if anyone had any thoughts. How easy is it to get Spark and Cassandra down to sub-second speeds in our use case? Thanks, Ben
Re: Spark executor lost because of GC overhead limit exceeded even though using 20 executors using 25GB each
Do you mind providing a bit more information: the release of Spark, a code snippet of your app, and the version of Java? Thanks On Tue, Aug 18, 2015 at 8:57 AM, unk1102 umesh.ka...@gmail.com wrote: Hi, this GC overhead limit error is making me crazy. I have 20 executors using 25 GB each, and I don't understand at all how it can throw GC overhead; I also don't have that big a dataset. Once this GC error occurs in an executor, it gets lost, and slowly other executors get lost because of IOException, Rpc client disassociated, shuffle not found, etc. Please help me solve this; I am going mad, as I am new to Spark. Thanks in advance.

WARN scheduler.TaskSetManager: Lost task 7.0 in stage 363.0 (TID 3373, myhost.com): java.lang.OutOfMemoryError: GC overhead limit exceeded
at org.apache.spark.sql.types.UTF8String.toString(UTF8String.scala:150)
at org.apache.spark.sql.catalyst.expressions.GenericRow.getString(rows.scala:120)
at org.apache.spark.sql.columnar.STRING$.actualSize(ColumnType.scala:312)
at org.apache.spark.sql.columnar.compression.DictionaryEncoding$Encoder.gatherCompressibilityStats(compressionSchemes.scala:224)
at org.apache.spark.sql.columnar.compression.CompressibleColumnBuilder$class.gatherCompressibilityStats(CompressibleColumnBuilder.scala:72)
at org.apache.spark.sql.columnar.compression.CompressibleColumnBuilder$class.appendFrom(CompressibleColumnBuilder.scala:80)
at org.apache.spark.sql.columnar.NativeColumnBuilder.appendFrom(ColumnBuilder.scala:87)
at org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1.next(InMemoryColumnarTableScan.scala:148)
at org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1.next(InMemoryColumnarTableScan.scala:124)
at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:277)
at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
Re: Spark + Jupyter (IPython Notebook)
Hi Guru, Thanks! Great to hear that someone tried it in production. How do you like it so far? Best Regards, Jerry On Tue, Aug 18, 2015 at 11:38 AM, Guru Medasani gdm...@gmail.com wrote: Hi Jerry, Yes. I’ve seen customers using this in production for data science work. I’m currently using this for one of my projects on a cluster as well. Also, here is a blog that describes how to configure this. http://blog.cloudera.com/blog/2014/08/how-to-use-ipython-notebook-with-apache-spark/ Guru Medasani gdm...@gmail.com On Aug 18, 2015, at 8:35 AM, Jerry Lam chiling...@gmail.com wrote: Hi spark users and developers, Did anyone have IPython Notebook (Jupyter) deployed in production that uses Spark as the computational engine? I know Databricks Cloud provides similar features with deeper integration with Spark. However, Databricks Cloud has to be hosted by Databricks so we cannot do this. Other solutions (e.g. Zeppelin) seem to reinvent the wheel that IPython has already offered years ago. It would be great if someone can educate me the reason behind this. Best Regards, Jerry
Re: spark streaming 1.3 doubts (force it to not consume anything)
But KafkaRDD[K, V, U, T, R] is not a subclass of RDD[R] from the Java side: Java generics are invariant, so a derived class cannot override a method with a differently parameterized generic return type. On Wed, Aug 19, 2015 at 12:02 AM, Cody Koeninger c...@koeninger.org wrote: Option is covariant and KafkaRDD is a subclass of RDD On Tue, Aug 18, 2015 at 1:12 PM, Shushant Arora shushantaror...@gmail.com wrote: Is it that in Scala a derived class is allowed to narrow the return type? And since the streaming jar is written in Scala, is DirectKafkaInputDStream allowed to return Option[KafkaRDD[K, V, U, T, R]] from its compute method? On Tue, Aug 18, 2015 at 8:36 PM, Shushant Arora shushantaror...@gmail.com wrote: Looking at the source code of org.apache.spark.streaming.kafka.DirectKafkaInputDStream:

override def compute(validTime: Time): Option[KafkaRDD[K, V, U, T, R]] = {
  val untilOffsets = clamp(latestLeaderOffsets(maxRetries))
  val rdd = KafkaRDD[K, V, U, T, R](
    context.sparkContext, kafkaParams, currentOffsets, untilOffsets, messageHandler)
  currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset)
  Some(rdd)
}

But in DStream it is def compute(validTime: Time): Option[RDD[T]]. So what should be the return type of a custom DStream that extends DirectKafkaInputDStream? I want the behaviour to be the same as DirectKafkaInputDStream in normal scenarios and to return None in a specific scenario. And why didn't the same error appear when DirectKafkaInputDStream extended InputDStream? Since the new return type Option[KafkaRDD[K, V, U, T, R]] is not (in Java terms) a subclass of Option[RDD[T]], shouldn't that have failed too? On Tue, Aug 18, 2015 at 7:28 PM, Cody Koeninger c...@koeninger.org wrote: The superclass method in DStream is defined as returning an Option[RDD[T]] On Tue, Aug 18, 2015 at 8:07 AM, Shushant Arora shushantaror...@gmail.com wrote: Getting a compilation error while overriding the compute method of DirectKafkaInputDStream:

[ERROR] CustomDirectKafkaInputDstream.java:[51,83] compute(org.apache.spark.streaming.Time) in CustomDirectKafkaInputDstream cannot override compute(org.apache.spark.streaming.Time) in org.apache.spark.streaming.dstream.DStream; attempting to use incompatible return type
[ERROR] found : scala.Option<org.apache.spark.streaming.kafka.KafkaRDD<byte[],byte[],kafka.serializer.DefaultDecoder,kafka.serializer.DefaultDecoder,byte[][]>>
[ERROR] required: scala.Option<org.apache.spark.rdd.RDD<byte[][]>>

The class:

public class CustomDirectKafkaInputDstream extends DirectKafkaInputDStream<byte[], byte[], kafka.serializer.DefaultDecoder, kafka.serializer.DefaultDecoder, byte[][]> {
  @Override
  public Option<KafkaRDD<byte[], byte[], DefaultDecoder, DefaultDecoder, byte[][]>> compute(Time validTime) {
    int processed = processedCounter.value();
    int failed = failedProcessingsCounter.value();
    if (processed == failed) {
      System.out.println("backing off since its 100% failure");
      return Option.empty();
    } else {
      System.out.println("starting the stream");
      return super.compute(validTime);
    }
  }
}

What should be the return type of the compute method? The superclass returns Option<KafkaRDD<byte[], byte[], DefaultDecoder, DefaultDecoder, byte[][]>>, but scala.Option<org.apache.spark.rdd.RDD<byte[][]>> is expected from the derived class. Is there something wrong with the code? On Mon, Aug 17, 2015 at 7:08 PM, Cody Koeninger c...@koeninger.org wrote: Look at the definitions of the java-specific KafkaUtils.createDirectStream methods (the ones that take a JavaStreamingContext) On Mon, Aug 17, 2015 at 5:13 AM, Shushant Arora shushantaror...@gmail.com wrote: How do I create a classtag in Java? Also, the constructor of DirectKafkaInputDStream takes a Function1, not a Function, but kafkautils.createDirectStream allows a function. I have the below as an overridden DirectKafkaInputDStream:

public class CustomDirectKafkaInputDstream extends DirectKafkaInputDStream<byte[], byte[], kafka.serializer.DefaultDecoder, kafka.serializer.DefaultDecoder, byte[][]> {
  public CustomDirectKafkaInputDstream(
      StreamingContext ssc_,
      Map<String, String> kafkaParams,
      Map<TopicAndPartition, Object> fromOffsets,
      Function1<MessageAndMetadata<byte[], byte[]>, byte[][]> messageHandler,
      ClassTag<byte[]> evidence$1,
      ClassTag<byte[]> evidence$2,
      ClassTag<DefaultDecoder> evidence$3,
      ClassTag<DefaultDecoder> evidence$4,
      ClassTag<byte[][]> evidence$5) {
    super(ssc_, kafkaParams, fromOffsets, messageHandler, evidence$1, evidence$2, evidence$3, evidence$4, evidence$5);
  }

  @Override
  public Option<KafkaRDD<byte[], byte[], DefaultDecoder, DefaultDecoder, byte[][]>> compute(Time validTime) {
    int processed = processedCounter.value();
    int failed = failedProcessingsCounter.value();
    if (processed == failed) {
      System.out.println("backing off since its 100% failure");
      return Option.empty();
    } else {
      System.out.println("starting the stream");
      return super.compute(validTime);
    }
  }
}

To create this stream I am using:

scala.collection.immutable.Map<String, String> scalakafkaParams =
    JavaConverters.mapAsScalaMapConverter(kafkaParams).asScala().toMap(Predef.<Tuple2<String, String>>conforms());
scala.collection.immutable.Map<TopicAndPartition, Long> scalaktopicOffsetMap=
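Cody's covariance point, as a standalone sketch with simplified types:

    class Animal
    class Dog extends Animal

    // Option[+A] is covariant, so an Option[Dog] is an Option[Animal] in Scala;
    // the Java analogue Option<Dog> vs. Option<Animal> has no such relationship.
    val d: Option[Dog] = Some(new Dog)
    val a: Option[Animal] = d  // compiles

    // The same reasoning lets the Scala override narrow compute's return type:
    // Option[KafkaRDD[...]] <: Option[RDD[...]] because KafkaRDD <: RDD.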
Re: Left outer joining big data set with small lookups
Nope. A count action did not help it choose a broadcast join. All of my tables are Hive external tables, so I tried to trigger compute statistics from sqlContext.sql, and it gives me an error saying "no such table". I am not sure whether that is due to the following bug in 1.4.1: https://issues.apache.org/jira/browse/SPARK-8105 I don't see a way to enable the broadcast hash join in my case :( On Aug 17, 2015, at 12:52 PM, Silvio Fiorito silvio.fior...@granturing.com wrote: Try doing a count on both lookups to force the caching to occur before the join. On 8/17/15, 12:39 PM, VIJAYAKUMAR JAWAHARLAL sparkh...@data2o.io wrote: Thanks for your help. I tried to cache the lookup tables and left outer join them with the big table (DF). The join does not seem to be using a broadcast join; it still goes with a hash-partitioned join, shuffling the big table. Here is the scenario: table1 as big_df left outer join table2 as lkup on big_df.lkupid = lkup.lkupid. table1 above is well distributed across all 40 partitions because of sqlContext.sql("SET spark.sql.shuffle.partitions=40"). table2 is small, using just 2 partitions. After the join stage, the Spark UI showed me that all activity ended up in just 2 executors. When I tried to dump the data to HDFS after the join stage, all data ended up in 2 partition files and the remaining 38 files were 0-sized. Since the above did not work, I tried to broadcast the DF and register it as a table before the join: val table2_df = sqlContext.sql("select * from table2") val broadcast_table2 = sc.broadcast(table2_df) broadcast_table2.value.registerTempTable("table2") The broadcast has the same issue as explained above: all data was processed by just 2 executors due to the lookup skew. Any more ideas to tackle this issue with Spark DataFrames? Thanks Vijay On Aug 14, 2015, at 10:27 AM, Silvio Fiorito silvio.fior...@granturing.com wrote: You could cache the lookup DataFrames; it'll then do a broadcast join. On 8/14/15, 9:39 AM, VIJAYAKUMAR JAWAHARLAL sparkh...@data2o.io wrote: Hi, I am facing a huge performance problem when I try to left outer join a very big data set (~140GB) with a bunch of small lookups [star schema type]. I am using DataFrames in Spark SQL. It looks like the data is shuffled and skewed when that join happens. Is there any way to improve the performance of this type of join in Spark? How can I hint the optimizer to use a replicated join, etc., to avoid the shuffle? Would it help to create broadcast variables on the small lookups? If I create broadcast variables, how can I convert them into DataFrames and use them in a Spark SQL type of join? Thanks Vijay
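For reference, a sketch of the knobs that usually control this on 1.4 (threshold value illustrative): Spark only broadcasts a Hive table when it knows the table is smaller than the threshold, which is where the statistics come in.

    // Size (bytes) below which a table qualifies for a broadcast hash join.
    sqlContext.sql("SET spark.sql.autoBroadcastJoinThreshold=104857600")

    // If ANALYZE TABLE from Spark hits SPARK-8105, running it in Hive itself
    // also populates the stats that Spark reads:
    //   ANALYZE TABLE lkup COMPUTE STATISTICS NOSCAN;

    val joined = sqlContext.sql(
      "SELECT * FROM big_df b LEFT OUTER JOIN lkup l ON b.lkupid = l.lkupid")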
Re: spark streaming 1.3 doubts (force it to not consume anything)
Option is covariant and KafkaRDD is a subclass of RDD On Tue, Aug 18, 2015 at 1:12 PM, Shushant Arora shushantaror...@gmail.com wrote: Is it that in Scala a derived class is allowed to narrow the return type? And since the streaming jar is written in Scala, is DirectKafkaInputDStream allowed to return Option[KafkaRDD[K, V, U, T, R]] from its compute method?
Re: Spark + Jupyter (IPython Notebook)
Refer to this post, which covers Spark + Jupyter + Docker: http://blog.prabeeshk.com/blog/2015/06/19/pyspark-notebook-with-docker/ On 18 August 2015 at 21:29, Jerry Lam chiling...@gmail.com wrote: Hi Guru, Thanks! Great to hear that someone tried it in production. How do you like it so far? Best Regards, Jerry
Re: What am I missing that's preventing javac from finding the libraries (CLASSPATH is setup...)?
Normally people would set up a Maven project with the Spark dependencies, or use sbt. Can you go with either approach? Cheers On Tue, Aug 18, 2015 at 10:28 AM, Jerry jerry.c...@gmail.com wrote: Hello, So I set up Spark to run on my local machine to see if I can reproduce the issue I'm having with data frames, but I'm running into issues with the compiler. Here's what I've got: $ echo $CLASSPATH /usr/lib/jvm/java-6-oracle/lib:/home/adminz/dev/spark/spark-1.4.1/lib/spark-assembly-1.4.1-hadoop2.6.0.jar $ javac Test.java Test.java:1: package org.apache.spark.sql.api.java does not exist import org.apache.spark.sql.api.java.*; ^ Test.java:6: package org.apache.spark.sql does not exist import org.apache.spark.sql.*; ^ Test.java:7: package org.apache.spark.sql.hive does not exist import org.apache.spark.sql.hive.*; Let me know what I'm doing wrong. Thanks, Jerry
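A minimal sbt sketch of that setup, with the Spark version from the thread (everything else illustrative); the build tool pulls in the transitive dependencies that a bare CLASSPATH misses:

    // build.sbt
    name := "spark-test"
    scalaVersion := "2.10.4"
    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-core" % "1.4.1" % "provided",
      "org.apache.spark" %% "spark-sql"  % "1.4.1" % "provided",
      "org.apache.spark" %% "spark-hive" % "1.4.1" % "provided")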
Re: Too many files/dirs in hdfs
Is there a way to store all the results in one file and keep the file roll-over separate from the Spark Streaming batch interval? On Mon, Aug 17, 2015 at 2:39 AM, UMESH CHAUDHARY umesh9...@gmail.com wrote: In Spark Streaming you can simply check whether your RDD contains any records, and if records are there you can save them using a FileOutputStream:

dstream.foreachRDD { rdd =>
  val count = rdd.count()
  if (count > 0) {
    // SAVE YOUR STUFF
  }
}

This will not create unnecessary 0-byte files. On Mon, Aug 17, 2015 at 2:51 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Currently, Spark Streaming creates a new directory for every batch and stores the data in it (whether it has anything or not). There is no direct append call as of now, but you can achieve this either with FileUtil.copyMerge (http://apache-spark-user-list.1001560.n3.nabble.com/save-spark-streaming-output-to-single-file-on-hdfs-td21124.html#a21167) or have a separate program which does the clean-up for you. Thanks Best Regards On Sat, Aug 15, 2015 at 5:20 AM, Mohit Anchlia mohitanch...@gmail.com wrote: Spark Streaming seems to be creating 0-byte files even when there is no data. Also, I have 2 concerns here: 1) Extra unnecessary files are being created in the output 2) Hadoop doesn't work really well with too many files, and I see that it is creating a directory with a timestamp every 1 second. Is there a better way of writing a file, maybe using some kind of append mechanism where one doesn't have to change the batch interval?
COMPUTE STATS on hive table - NoSuchTableException
Hi, I am trying to compute stats from Spark on a lookup table which resides in Hive. I am invoking the Spark API as follows, and it gives me a NoSuchTableException. The table is double-verified, and the subsequent statement sqlContext.sql("select * from cpatext.lkup") picks up the table correctly. I am wondering whether it is related to https://issues.apache.org/jira/browse/SPARK-8105. I am using Spark 1.4.1. Please let me know.

scala> sqlContext.sql("ANALYZE TABLE cpatext.lkup COMPUTE STATISTICS NOSCAN")
2015-08-18 18:12:19,299 INFO [main] parse.ParseDriver (ParseDriver.java:parse(185)) - Parsing command: ANALYZE TABLE cpatext.lkup COMPUTE STATISTICS NOSCAN
2015-08-18 18:12:19,299 INFO [main] parse.ParseDriver (ParseDriver.java:parse(206)) - Parse Completed
org.apache.spark.sql.catalyst.analysis.NoSuchTableException
at org.apache.spark.sql.hive.client.ClientInterface$$anonfun$getTable$1.apply(ClientInterface.scala:112)
at org.apache.spark.sql.hive.client.ClientInterface$$anonfun$getTable$1.apply(ClientInterface.scala:112)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.sql.hive.client.ClientInterface$class.getTable(ClientInterface.scala:112)
at org.apache.spark.sql.hive.client.ClientWrapper.getTable(ClientWrapper.scala:60)
at org.apache.spark.sql.hive.HiveMetastoreCatalog.lookupRelation(HiveMetastoreCatalog.scala:227)
at org.apache.spark.sql.hive.HiveContext$$anon$2.org$apache$spark$sql$catalyst$analysis$OverrideCatalog$$super$lookupRelation(HiveContext.scala:371)
at org.apache.spark.sql.catalyst.analysis.OverrideCatalog$$anonfun$lookupRelation$3.apply(Catalog.scala:165)
at org.apache.spark.sql.catalyst.analysis.OverrideCatalog$$anonfun$lookupRelation$3.apply(Catalog.scala:165)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.sql.catalyst.analysis.OverrideCatalog$class.lookupRelation(Catalog.scala:165)
at org.apache.spark.sql.hive.HiveContext$$anon$2.lookupRelation(HiveContext.scala:371)
at org.apache.spark.sql.hive.HiveContext.analyze(HiveContext.scala:293)
at org.apache.spark.sql.hive.execution.AnalyzeTable.run(commands.scala:43)
at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:57)
at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:57)

Thanks Vijay
Re: Spark Job Hangs on our production cluster
Just looking at the thread dump from your original email, the 3 executor threads are all trying to load classes. (One thread is actually loading some class, and the others are blocked waiting to load a class, most likely trying to load the same thing.) That is really weird, and definitely not something which should keep things blocked for 30 minutes. It suggests something wrong with the JVM, or the classpath configuration, or a combination. It looks like you are trying to run in the REPL, and for whatever reason the HTTP server the REPL uses to serve classes is not responsive. I'd try running outside of the REPL and see if that works. Sorry, not a full diagnosis, but maybe this'll help a bit. On Tue, Aug 11, 2015 at 3:19 PM, java8964 java8...@hotmail.com wrote: Currently we have an IBM BigInsights cluster with 1 namenode + 1 JobTracker + 42 data/task nodes, which runs BigInsights V3.0.0.2, corresponding to Hadoop 2.2.0 with MR1. Since IBM BigInsights doesn't come with Spark, we built Spark 1.2.2 with Hadoop 2.2.0 + Hive 0.12 ourselves and deployed it on the same cluster. IBM BigInsights comes with the IBM JDK 1.7, but in our experience on the staging environment we found that Spark works better with the Oracle JVM, so we run Spark under Oracle JDK 1.7.0_79. Now on production we are facing an issue we have never faced before, nor can we reproduce it on our staging cluster. We are using a Spark standalone cluster, and here is the basic configuration:

more spark-env.sh
export JAVA_HOME=/opt/java
export PATH=$JAVA_HOME/bin:$PATH
export HADOOP_CONF_DIR=/opt/ibm/biginsights/hadoop-conf/
export SPARK_CLASSPATH=/opt/ibm/biginsights/IHC/lib/ibm-compression.jar:/opt/ibm/biginsights/hive/lib/db2jcc4-10.6.jar
export SPARK_LOCAL_DIRS=/data1/spark/local,/data2/spark/local,/data3/spark/local
export SPARK_MASTER_WEBUI_PORT=8081
export SPARK_MASTER_IP=host1
export SPARK_MASTER_OPTS=-Dspark.deploy.defaultCores=42
export SPARK_WORKER_MEMORY=24g
export SPARK_WORKER_CORES=6
export SPARK_WORKER_DIR=/tmp/spark/work
export SPARK_DRIVER_MEMORY=2g
export SPARK_EXECUTOR_MEMORY=2g

more spark-defaults.conf
spark.master spark://host1:7077
spark.eventLog.enabled true
spark.eventLog.dir hdfs://host1:9000/spark/eventLog
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.executor.extraJavaOptions -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps

We use the AVRO file format a lot, and we have these 2 datasets: one is about 96G, and the other is a little over 1T. Since we are using AVRO, we also built spark-avro at commit a788c9fce51b0ec1bb4ce88dc65c1d55aaa675b8 (https://github.com/databricks/spark-avro/tree/a788c9fce51b0ec1bb4ce88dc65c1d55aaa675b8), which is the latest version supporting Spark 1.2.x. Here is the problem we are facing on our production cluster: even the following simple spark-shell commands will hang:

import org.apache.spark.sql.SQLContext
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
import com.databricks.spark.avro._
val bigData = sqlContext.avroFile("hdfs://namenode:9000/bigData/")
bigData.registerTempTable("bigData")
bigData.count()

From the console, we saw the following, with no update for more than 30 minutes:

[Stage 0: (44 + 42) / 7800]

The big 1T dataset should generate 7800 HDFS blocks, but Spark's HDFS client looks like it has a problem reading them. Since we are running Spark on the data nodes, all the Spark tasks run at the NODE_LOCAL locality level. If I go to a data/task node where the Spark tasks hang and use jstack to dump the threads, I get the following at the top:

2015-08-11 15:38:38
Full thread dump Java HotSpot(TM) 64-Bit Server VM (24.79-b02 mixed mode):

"Attach Listener" daemon prio=10 tid=0x7f0660589000 nid=0x1584d waiting on condition [0x]
   java.lang.Thread.State: RUNNABLE

"org.apache.hadoop.hdfs.PeerCache@4a88ec00" daemon prio=10 tid=0x7f06508b7800 nid=0x13302 waiting on condition [0x7f060be94000]
   java.lang.Thread.State: TIMED_WAITING (sleeping)
    at java.lang.Thread.sleep(Native Method)
    at org.apache.hadoop.hdfs.PeerCache.run(PeerCache.java:252)
    at org.apache.hadoop.hdfs.PeerCache.access$000(PeerCache.java:39)
    at org.apache.hadoop.hdfs.PeerCache$1.run(PeerCache.java:135)
    at java.lang.Thread.run(Thread.java:745)

"shuffle-client-1" daemon prio=10 tid=0x7f0650687000 nid=0x132fc runnable [0x7f060d198000]
   java.lang.Thread.State: RUNNABLE
    at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
    at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
    at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:79)
    at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:87)
    - locked 0x00067bf47710 (a io.netty.channel.nio.SelectedSelectionKeySet)
    - locked 0x00067bf374e8 (a java.util.Collections$UnmodifiableSet)
What am I missing that's preventing javac from finding the libraries (CLASSPATH is setup...)?
Hello, So I set up Spark to run on my local machine to see if I can reproduce the issue I'm having with data frames, but I'm running into issues with the compiler. Here's what I've got:

$ echo $CLASSPATH
/usr/lib/jvm/java-6-oracle/lib:/home/adminz/dev/spark/spark-1.4.1/lib/spark-assembly-1.4.1-hadoop2.6.0.jar

$ javac Test.java
Test.java:1: package org.apache.spark.sql.api.java does not exist
import org.apache.spark.sql.api.java.*;
^
Test.java:6: package org.apache.spark.sql does not exist
import org.apache.spark.sql.*;
^
Test.java:7: package org.apache.spark.sql.hive does not exist
import org.apache.spark.sql.hive.*;

Let me know what I'm doing wrong. Thanks, Jerry
RE: Spark Job Hangs on our production cluster
Hi, Imran: Thanks for your reply. I am not sure what you mean by the repl. Can you give more detail about that? This only happens when Spark 1.2.2 tries to scan the big dataset, and I cannot reproduce it when it scans the smaller dataset. FYI, I have since built and deployed Spark 1.3.1 on our production cluster, and right now I cannot reproduce this hang problem on the same cluster with the same big dataset. On this point, we will continue trying Spark 1.3.1 and hope to have a more positive experience with it. But just out of curiosity, what class would Spark need to load at this point? From my understanding, the executor has already scanned the first block of data from HDFS and hangs while starting the 2nd block; all the classes should already be loaded in the JVM in this case. Thanks Yong From: iras...@cloudera.com Date: Tue, 18 Aug 2015 12:17:56 -0500 Subject: Re: Spark Job Hangs on our production cluster To: java8...@hotmail.com CC: user@spark.apache.org Just looking at the thread dump from your original email, the 3 executor threads are all trying to load classes. (One thread is actually loading some class, and the others are blocked waiting to load a class, most likely trying to load the same thing.) That is really weird, and definitely not something which should keep things blocked for 30 minutes. It suggests something wrong with the JVM, or the classpath configuration, or a combination. It looks like you are trying to run in the REPL, and for whatever reason the HTTP server the REPL uses to serve classes is not responsive. I'd try running outside of the REPL and see if that works. Sorry, not a full diagnosis, but maybe this'll help a bit.
RE: Scala: How to match a java object????
Hi, thank you for further assistance. You can reproduce this by simply running 5 match { case java.math.BigDecimal => 2 } In my personal case, I am applying a map action to a Seq[Any], so the elements inside are of type Any, to which I need to apply a proper .asInstanceOf[WhoYouShouldBe]. Saif From: William Briggs [mailto:wrbri...@gmail.com] Sent: Tuesday, August 18, 2015 4:46 PM To: Ellafi, Saif A.; user@spark.apache.org Subject: Re: Scala: How to match a java object Could you share your pattern matching expression that is failing? On Tue, Aug 18, 2015, 3:38 PM saif.a.ell...@wellsfargo.com wrote: Hi all, I am trying to run a spark job, in which I receive java.math.BigDecimal objects, instead of the scala equivalents, and I am trying to convert them into Doubles. If I try to match-case this object class, I get: “error: object java.math.BigDecimal is not a value” How could I get around matching java objects? I would like to avoid a multiple try-catch on ClassCastExceptions for all my checks. Thank you, Saif
Re: Why standalone mode don't allow to set num-executor ?
Hi Canan, This is mainly for legacy reasons. The default behavior in standalone mode is that the application grabs all available resources in the cluster. This effectively means we want one executor per worker, where each executor grabs all the available cores and memory on that worker. In this model, it doesn't really make sense to express the number of executors, because that's equivalent to the number of workers. In 1.4+, however, we do support multiple executors per worker, but that's not the default, so we decided not to add support for the --num-executors setting to avoid potential confusion. -Andrew 2015-08-18 2:35 GMT-07:00 canan chen ccn...@gmail.com: num-executors only works for yarn mode. In standalone mode, I have to set the --total-executor-cores and --executor-cores. Isn't this way unintuitive? Any reason for that?
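For example, the two standalone flags Canan mentions can be combined to pin down the shape of an application anyway; a sketch (the master URL and jar name are placeholders):

$ bin/spark-submit --master spark://master:7077 \
    --total-executor-cores 24 \
    --executor-cores 6 \
    app.jar

This caps the application at 24 cores overall with 6 cores per executor, so at most 4 executors can be granted, which expresses an executor count indirectly even without a --num-executors flag.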
Re: Evaluating spark + Cassandra for our use cases
Hi, First you need to make your SLA clear. It does not sound to me like they are defined very well, or that your solution is necessary for the scenario. I also find it hard to believe that 1 customer has 100 million transactions per month. Time series data is easy to precalculate - you do not necessarily need in-memory technology here. I recommend your company do a Proof of Concept and get more details/clarification on the requirements before risking millions of dollars of investment. On Tue, Aug 18, 2015 at 21:18, Benjamin Ross br...@lattice-engines.com wrote: My company is interested in building a real-time time-series querying solution using Spark and Cassandra. Specifically, we’re interested in setting up a Spark system against Cassandra running a hive thrift server. We need to be able to perform real-time queries on time-series data – things like, how many accounts have spent in total more than $300 on product X in the past 3 months, and purchased product Y in the past month. These queries need to be fast – preferably sub-second but we can deal with a few seconds if absolutely necessary. The data sizes are in the millions of records when rolled up to be per-monthly records. Something on the order of 100M per customer. My question is, based on experience, how hard would it be to get Cassandra and Spark working together to give us sub-second response times in this use case? Note that we’ll need to use DataStax enterprise (which is unappealing from a cost standpoint) because it’s the only thing that provides the hive spark thrift server to Cassandra. The two top contenders for our solution are Spark+Cassandra and Druid. Neither of these solutions work perfectly out of the box: - Druid would need to be modified, possibly hacked, to support the queries we require. I’m also not clear how operationally ready it is. - Cassandra and Spark would require paying money for DataStax enterprise. It really feels like it’s going to be tricky to configure Cassandra and Spark to be lightning fast for our use case. Finally, window functions (which we need – see above) are not supported unless we use a pre-release milestone of the datastax spark Cassandra connector. I was wondering if anyone had any thoughts. How easy is it to get Spark and Cassandra down to sub-second speeds in our use case? Thanks, Ben
RE: Evaluating spark + Cassandra for our use cases
Hi Jorn, Of course we're planning on doing a proof of concept here - the difficulty is that our timeline is short, so we cannot afford too many PoCs before we have to make a decision. We also need to figure out *which* databases to proof of concept. Note that one tricky aspect of our problem is that we need to support window functions partitioned on a per-account basis. I've found that support for window functions is very limited in most databases, and they're also generally slow when available. Also, 1 customer certainly does not have 100M transactions per month. There are 100M transactions total for a given customer when we roll everything up to be per-month. We do not care about granularity smaller than a month. There are also many columns that we care about - on the order of many thousands. What makes you suggest that we do not need in-memory technology? Ben From: Jörn Franke [jornfra...@gmail.com] Sent: Tuesday, August 18, 2015 4:14 PM To: Benjamin Ross; user@spark.apache.org Cc: Ron Gonzalez Subject: Re: Evaluating spark + Cassandra for our use cases Hi, First you need to make your SLA clear. It does not sound to me like they are defined very well, or that your solution is necessary for the scenario. I also find it hard to believe that 1 customer has 100 million transactions per month. Time series data is easy to precalculate - you do not necessarily need in-memory technology here. I recommend your company do a Proof of Concept and get more details/clarification on the requirements before risking millions of dollars of investment. On Tue, Aug 18, 2015 at 21:18, Benjamin Ross br...@lattice-engines.com wrote: My company is interested in building a real-time time-series querying solution using Spark and Cassandra. Specifically, we’re interested in setting up a Spark system against Cassandra running a hive thrift server. We need to be able to perform real-time queries on time-series data – things like, how many accounts have spent in total more than $300 on product X in the past 3 months, and purchased product Y in the past month. These queries need to be fast – preferably sub-second but we can deal with a few seconds if absolutely necessary. The data sizes are in the millions of records when rolled up to be per-monthly records. Something on the order of 100M per customer. My question is, based on experience, how hard would it be to get Cassandra and Spark working together to give us sub-second response times in this use case? Note that we’ll need to use DataStax enterprise (which is unappealing from a cost standpoint) because it’s the only thing that provides the hive spark thrift server to Cassandra. The two top contenders for our solution are Spark+Cassandra and Druid. Neither of these solutions work perfectly out of the box: - Druid would need to be modified, possibly hacked, to support the queries we require. I’m also not clear how operationally ready it is. - Cassandra and Spark would require paying money for DataStax enterprise. It really feels like it’s going to be tricky to configure Cassandra and Spark to be lightning fast for our use case. Finally, window functions (which we need – see above) are not supported unless we use a pre-release milestone of the datastax spark Cassandra connector. I was wondering if anyone had any thoughts. How easy is it to get Spark and Cassandra down to sub-second speeds in our use case? Thanks, Ben
Re: Json Serde used by Spark Sql
Under the covers we use Jackson's Streaming API as of Spark 1.4. On Tue, Aug 18, 2015 at 1:12 PM, Udit Mehta ume...@groupon.com wrote: Hi, I was wondering what JSON serde Spark SQL uses. I created a JsonRDD out of a json file and then registered it as a temp table to query. I can then query the table using dot notation for nested structs/arrays. I was wondering how Spark SQL deserializes the json data based on the query. Thanks in advance, Udit
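For reference, a minimal sketch of the nested-field querying Udit describes, against the 1.4+ reader API (the file path and field names here are made up):

// in spark-shell, where sqlContext is predefined
val events = sqlContext.read.json("hdfs:///data/events.json") // one JSON object per line
events.registerTempTable("events")
// dot notation reaches into nested structs
sqlContext.sql("SELECT payload.user.id FROM events WHERE payload.kind = 'click'").show()

The schema is inferred up front by scanning the JSON once; the parsing itself goes through Jackson's streaming API, as Michael notes.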
Re: Difference between Sort based and Hash based shuffle
Hi Muhammad, On a high level, in hash-based shuffle each mapper M writes R shuffle files, one for each reducer, where R is the number of reduce partitions. This results in M * R shuffle files. Since it is not uncommon for M and R to be O(1000), this quickly becomes expensive. An optimization for hash-based shuffle is consolidation, where all mappers that run on the same core share one file per reducer, so with C cores this results in C * R files. This is a strict improvement, but it is still relatively expensive. Instead, in sort-based shuffle each mapper writes a single partitioned file. This allows a particular reducer to request a specific portion of each mapper's single output file. In more detail, the mapper first fills up an internal buffer in memory and continually spills the contents of the buffer to disk, then finally merges all the spilled files together to form one final output file. This places much less stress on the file system and requires far fewer I/O operations, especially on the read side. -Andrew 2015-08-16 11:08 GMT-07:00 Muhammad Haseeb Javed 11besemja...@seecs.edu.pk : I did check it out, and although I did get a general understanding of the various classes used to implement Sort and Hash shuffles, these slides lack details as to how they are implemented and why sort generally has better performance than hash. On Sun, Aug 16, 2015 at 4:31 AM, Ravi Kiran ravikiranmag...@gmail.com wrote: Have a look at this presentation. http://www.slideshare.net/colorant/spark-shuffle-introduction . Can be of help to you. On Sat, Aug 15, 2015 at 1:42 PM, Muhammad Haseeb Javed 11besemja...@seecs.edu.pk wrote: What are the major differences between how Sort based and Hash based shuffle operate, and what is it that causes Sort Shuffle to perform better than Hash? Any talks that discuss both shuffles in detail, how they are implemented and the performance gains?
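For what it's worth, the shuffle implementation is selectable on the 1.x line, so both paths can be compared directly; a sketch of the relevant spark-defaults.conf entries (sort has been the default since 1.2, and consolidation only applies to the hash manager):

spark.shuffle.manager hash
spark.shuffle.consolidateFiles true

Leaving spark.shuffle.manager at its default of sort gives the single-partitioned-file behavior Andrew describes.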
Json Serde used by Spark Sql
Hi, I was wondering what json serde does spark sql use. I created a JsonRDD out of a json file and then registered it as a temp table to query. I can then query the table using dot notation for nested structs/arrays. I was wondering how does spark sql deserialize the json data based on the query. Thanks in advance, Udit
Re: how do I execute a job on a single worker node in standalone mode
Hi Axel, You can try setting `spark.deploy.spreadOut` to false (through your conf/spark-defaults.conf file). What this does is essentially try to schedule as many cores on one worker as possible before spilling over to other workers. Note that you *must* restart the cluster through the sbin scripts. For more information see: http://spark.apache.org/docs/latest/spark-standalone.html. Feel free to let me know whether it works, -Andrew 2015-08-18 4:49 GMT-07:00 Igor Berman igor.ber...@gmail.com: by default standalone creates 1 executor on every worker machine per application. The number of overall cores is configured with --total-executor-cores, so in general if you specify --total-executor-cores=1 then there would be only 1 core on some executor and you'll get what you want. On the other hand, if your application needs all cores of your cluster and only some specific job should run on a single executor, there are a few methods to achieve this, e.g. coalesce(1) or dummyRddWithOnePartitionOnly.foreachPartition. On 18 August 2015 at 01:36, Axel Dahl a...@whisperstream.com wrote: I have a 4 node cluster and have been playing around with the num-executors parameter, executor-memory and executor-cores. I set the following: --executor-memory=10G --num-executors=1 --executor-cores=8 But when I run the job, I see that each worker is running one executor which has 2 cores and 2.5G memory. What I'd like to do instead is have Spark just allocate the job to a single worker node. Is that possible in standalone mode or do I need a job/resource scheduler like Yarn to do that? Thanks in advance, -Axel
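Concretely, the change is one line, after which the sbin restart Andrew mentions is required; a sketch:

# conf/spark-defaults.conf
spark.deploy.spreadOut false

With spreading disabled, a submission such as --total-executor-cores 8 against 8-core workers should land all 8 cores on a single worker instead of one core on each of 8 workers.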
Re: Scala: How to match a java object????
On Tue, Aug 18, 2015 at 12:59 PM, saif.a.ell...@wellsfargo.com wrote: 5 match { case java.math.BigDecimal => 2 } 5 match { case _: java.math.BigDecimal => 2 } -- Marcelo
Re: Scala: How to match a java object????
On Tue, Aug 18, 2015 at 1:19 PM, saif.a.ell...@wellsfargo.com wrote: Hi, Can you please elaborate? I am confused :-) You did note that the two pieces of code are different, right? See http://docs.scala-lang.org/tutorials/tour/pattern-matching.html for how to match things in Scala, especially the typed pattern example. -Original Message- From: Marcelo Vanzin [mailto:van...@cloudera.com] Sent: Tuesday, August 18, 2015 5:15 PM To: Ellafi, Saif A. Cc: wrbri...@gmail.com; user@spark.apache.org Subject: Re: Scala: How to match a java object On Tue, Aug 18, 2015 at 12:59 PM, saif.a.ell...@wellsfargo.com wrote: 5 match { case java.math.BigDecimal => 2 } 5 match { case _: java.math.BigDecimal => 2 } -- Marcelo -- Marcelo
Re: dse spark-submit multiple jars issue
Hi Satish, The problem is that `--jars` accepts a comma-delimited list of jars! E.g. spark-submit ... --jars lib1.jar,lib2.jar,lib3.jar main.jar where main.jar is your main application jar (the one that starts a SparkContext), and lib*.jar refer to additional libraries that your main application jar uses. -Andrew 2015-08-13 3:22 GMT-07:00 Javier Domingo Cansino javier.domi...@fon.com: Please notice the 'jars: null'. I don't know why you put '///', but I would propose you just use normal absolute paths: dse spark-submit --master spark://10.246.43.15:7077 --class HelloWorld --jars /home/missingmerch/postgresql-9.4-1201.jdbc41.jar /home/missingmerch/dse.jar /home/missingmerch/spark-cassandra-connector-java_2.10-1.1.1.jar /home/missingmerch/etl-0.0.1-SNAPSHOT.jar Hope this is helpful! Javier Domingo Cansino, Research & Development Engineer, Fon (http://www.fon.com), +34 946545847, Skype: javier.domingo.fon On Tue, Aug 11, 2015 at 3:42 PM, satish chandra j jsatishchan...@gmail.com wrote: HI, Please find the log details below:

dse spark-submit --verbose --master local --class HelloWorld etl-0.0.1-SNAPSHOT.jar --jars file:/home/missingmerch/postgresql-9.4-1201.jdbc41.jar file:/home/missingmerch/dse.jar file:/home/missingmerch/postgresql-9.4-1201.jdbc41.jar

Using properties file: /etc/dse/spark/spark-defaults.conf
Adding default property: spark.cassandra.connection.factory=com.datastax.bdp.spark.DseCassandraConnectionFactory
Adding default property: spark.ssl.keyStore=.keystore
Adding default property: spark.ssl.enabled=false
Adding default property: spark.ssl.trustStore=.truststore
Adding default property: spark.cassandra.auth.conf.factory=com.datastax.bdp.spark.DseAuthConfFactory
Adding default property: spark.ssl.keyPassword=cassandra
Adding default property: spark.ssl.keyStorePassword=cassandra
Adding default property: spark.ssl.protocol=TLS
Adding default property: spark.ssl.useNodeLocalConf=true
Adding default property: spark.ssl.trustStorePassword=cassandra
Adding default property: spark.ssl.enabledAlgorithms=TLS_RSA_WITH_AES_128_CBC_SHA,TLS_RSA_WITH_AES_256_CBC_SHA

Parsed arguments:
  master                  local
  deployMode              null
  executorMemory          null
  executorCores           null
  totalExecutorCores      null
  propertiesFile          /etc/dse/spark/spark-defaults.conf
  driverMemory            512M
  driverCores             null
  driverExtraClassPath    null
  driverExtraLibraryPath  null
  driverExtraJavaOptions  -Dcassandra.username=missingmerch -Dcassandra.password=STMbrjrlb -XX:MaxPermSize=256M
  supervise               false
  queue                   null
  numExecutors            null
  files                   null
  pyFiles                 null
  archives                null
  mainClass               HelloWorld
  primaryResource         file:/home/missingmerch/etl-0.0.1-SNAPSHOT.jar
  name                    HelloWorld
  childArgs               [--jars file:/home/missingmerch/postgresql-9.4-1201.jdbc41.jar file:/home/missingmerch/dse.jar file:/home/missingmerch/postgresql-9.4-1201.jdbc41.jar]
  jars                    null
  verbose                 true

Spark properties used, including those specified through --conf and those from the properties file /etc/dse/spark/spark-defaults.conf:
  spark.cassandra.connection.factory - com.datastax.bdp.spark.DseCassandraConnectionFactory
  spark.ssl.useNodeLocalConf - true
  spark.ssl.enabled - false
  spark.executor.extraJavaOptions - -XX:MaxPermSize=256M
  spark.ssl.keyStore - .keystore
  spark.ssl.trustStore - .truststore
  spark.ssl.trustStorePassword - cassandra
  spark.ssl.enabledAlgorithms - TLS_RSA_WITH_AES_128_CBC_SHA,TLS_RSA_WITH_AES_256_CBC_SHA
  spark.cassandra.auth.conf.factory - com.datastax.bdp.spark.DseAuthConfFactory
  spark.ssl.protocol - TLS
  spark.ssl.keyPassword - cassandra
  spark.ssl.keyStorePassword - cassandra

Main class: HelloWorld
Arguments: --jars file:/home/missingmerch/postgresql-9.4-1201.jdbc41.jar file:/home/missingmerch/dse.jar file:/home/missingmerch/postgresql-9.4-1201.jdbc41.jar

System properties:
  spark.cassandra.connection.factory - com.datastax.bdp.spark.DseCassandraConnectionFactory
  spark.driver.memory - 512M
  spark.ssl.useNodeLocalConf - true
  spark.ssl.enabled - false
  SPARK_SUBMIT - true
  spark.executor.extraJavaOptions - -XX:MaxPermSize=256M
  spark.app.name - HelloWorld
  spark.ssl.enabledAlgorithms - TLS_RSA_WITH_AES_128_CBC_SHA,TLS_RSA_WITH_AES_256_CBC_SHA
  spark.ssl.trustStorePassword - cassandra
  spark.driver.extraJavaOptions - -Dcassandra.username=missingmerch -Dcassandra.password=STMbrjrlb -XX:MaxPermSize=256M
  spark.ssl.keyStore - .keystore
  spark.ssl.trustStore - .truststore
  spark.jars -
Re: Is it this a BUG?: Why Spark Flume Streaming job is not deploying the Receiver to the specified host?
Are you using the Flume polling stream or the older stream? Such problems of binding used to occur in the older push-based approach, hence we built the polling stream (pull-based). On Tue, Aug 18, 2015 at 4:45 AM, diplomatic Guru diplomaticg...@gmail.com wrote: I'm testing the Flume + Spark integration example (flume count). I'm deploying the job using yarn cluster mode. I first logged into the Yarn cluster, then submitted the job and passed in a specific worker node's IP to deploy the job. But when I checked the WebUI, it failed to bind to the specified IP because the receiver was deployed to a different host, not the one I asked for. Do you know why? For your information, I've also tried passing the IP address used by the resource manager to find resources, but no joy. But when I set the host to 'localhost' and deploy it to the cluster, it binds to a worker node that is selected by the resource manager.
Re: Spark Job Hangs on our production cluster
sorry, by repl I mean spark-shell, I guess I'm used to them being used interchangeably. From that thread dump, the one thread that isn't stuck is trying to get classes specifically related to the shell / repl:

java.lang.Thread.State: RUNNABLE
at java.net.SocketInputStream.socketRead0(Native Method)
at java.net.SocketInputStream.read(SocketInputStream.java:152)
at java.net.SocketInputStream.read(SocketInputStream.java:122)
at java.io.BufferedInputStream.fill(BufferedInputStream.java:235)
at java.io.BufferedInputStream.read1(BufferedInputStream.java:275)
at java.io.BufferedInputStream.read(BufferedInputStream.java:334)
- locked 0x00072477d530 (a java.io.BufferedInputStream)
at sun.net.www.http.HttpClient.parseHTTPHeader(HttpClient.java:689)
at sun.net.www.http.HttpClient.parseHTTP(HttpClient.java:633)
at sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1324)
- locked 0x000724772bf8 (a sun.net.www.protocol.http.HttpURLConnection)
at java.net.URL.openStream(URL.java:1037)
at org.apache.spark.repl.ExecutorClassLoader.findClassLocally(ExecutorClassLoader.scala:86)
at org.apache.spark.repl.ExecutorClassLoader.findClass(ExecutorClassLoader.scala:63)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
...

that's because the repl needs to package up the code for every single line, and it serves those compiled classes to each executor over http. This particular executor seems to be stuck pulling one of those lines compiled in the repl. (This is all assuming that the thread dump is the same over the entire 30 minutes that spark seems to be stuck.) Yes, the classes should be loaded for the first partition that is processed. (There certainly could be cases where different classes are needed for each partition, but it doesn't sound like you are doing anything that would trigger this.) But to be clear, in repl mode, there will be additional classes to be sent with every single job. Hope that helps a little more ... maybe there was some issue w/ 1.2.2, though I didn't see anything with a quick search, hopefully you'll have more luck w/ 1.3.1 On Tue, Aug 18, 2015 at 2:23 PM, java8964 java8...@hotmail.com wrote: Hi, Imran: Thanks for your reply. I am not sure what you mean by repl. Can you give more detail about that? This only happens when Spark 1.2.2 tries to scan a big data set, and it cannot be reproduced when it scans a smaller dataset. FYI, I have now built and deployed Spark 1.3.1 on our production cluster. Right now, I cannot reproduce this hang problem on the same cluster for the same big dataset. On this point, we will continue trying Spark 1.3.1 and hope we will have a more positive experience with it. But just out of curiosity, what classes would Spark need to load at this time? From my understanding, the executor has already scanned the first block of data from HDFS, and hangs while starting the 2nd block. All the classes should already be loaded in the JVM in this case. Thanks Yong -- From: iras...@cloudera.com Date: Tue, 18 Aug 2015 12:17:56 -0500 Subject: Re: Spark Job Hangs on our production cluster To: java8...@hotmail.com CC: user@spark.apache.org just looking at the thread dump from your original email, the 3 executor threads are all trying to load classes. (One thread is actually loading some class, and the others are blocked waiting to load a class, most likely trying to load the same thing.) That is really weird, definitely not something which should keep things blocked for 30 min. It suggests something wrong w/ the jvm, or classpath configuration, or a combination.
Looks like you are trying to run in the repl, and for whatever reason the http server for the repl to serve classes is not responsive. I'd try running outside of the repl and see if that works. sorry not a full diagnosis but maybe this'll help a bit. On Tue, Aug 11, 2015 at 3:19 PM, java8964 java8...@hotmail.com wrote: Currently we have a IBM BigInsight cluster with 1 namenode + 1 JobTracker + 42 data/task nodes, which runs with BigInsight V3.0.0.2, corresponding with Hadoop 2.2.0 with MR1. Since IBM BigInsight doesn't come with Spark, so we build Spark 1.2.2 with Hadoop 2.2.0 + Hive 0.12 by ourselves, and deploy it on the same cluster. The IBM Biginsight comes with IBM jdk 1.7, but during our experience on stage environment, we found out Spark works better with Oracle JVM. So we run spark under Oracle JDK 1.7.0_79. Now on production, we are facing a issue we never faced, nor can reproduce on our staging cluster. We are using Spark Standalone cluster, and here is the basic configurations: more spark-env.sh export JAVA_HOME=/opt/java export PATH=$JAVA_HOME/bin:$PATH export HADOOP_CONF_DIR=/opt/ibm/biginsights/hadoop-conf/ export
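To act on the advice of running outside the repl, here is a sketch of the same count packaged as a standalone application (the object name is made up; the avroFile call mirrors the shell session in the original report):

import org.apache.spark.{SparkConf, SparkContext}
import com.databricks.spark.avro._

object BigDataCount {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("BigDataCount"))
    val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
    // same avroFile call as in the spark-shell session above
    val bigData = sqlContext.avroFile("hdfs://namenode:9000/bigData/")
    println("count = " + bigData.count())
    sc.stop()
  }
}

It would then be launched with something like bin/spark-submit --master spark://host1:7077 --class BigDataCount etl.jar (jar name is a placeholder), which sidesteps the repl's per-line class server entirely.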
Re: Scala: How to match a java object????
Could you share your pattern matching expression that is failing? On Tue, Aug 18, 2015, 3:38 PM saif.a.ell...@wellsfargo.com wrote: Hi all, I am trying to run a spark job, in which I receive java.math.BigDecimal objects, instead of the scala equivalents, and I am trying to convert them into Doubles. If I try to match-case this object class, I get: “error: object java.math.BigDecimal is not a value” How could I get around matching java objects? I would like to avoid a multiple try-catch on ClassCastExceptions for all my checks. Thank you, Saif
to retrieve full stack trace
Hi All, Please let me know if any arguments need to be passed on the CLI to retrieve the FULL STACK TRACE in Apache Spark. I am stuck on an issue for which it would be helpful to analyze the full stack trace. Regards, Satish Chandra
Re: to retrieve full stack trace
if your error is on the executors, you need to check the executor logs for the full stacktrace On Tue, Aug 18, 2015 at 10:01 PM, satish chandra j jsatishchan...@gmail.com wrote: Hi All, Please let me know if any arguments need to be passed on the CLI to retrieve the FULL STACK TRACE in Apache Spark. I am stuck on an issue for which it would be helpful to analyze the full stack trace. Regards, Satish Chandra
RE: Scala: How to match a java object????
Hi, Can you please elaborate? I am confused :-) Saif -Original Message- From: Marcelo Vanzin [mailto:van...@cloudera.com] Sent: Tuesday, August 18, 2015 5:15 PM To: Ellafi, Saif A. Cc: wrbri...@gmail.com; user@spark.apache.org Subject: Re: Scala: How to match a java object On Tue, Aug 18, 2015 at 12:59 PM, saif.a.ell...@wellsfargo.com wrote: 5 match { case java.math.BigDecimal => 2 } 5 match { case _: java.math.BigDecimal => 2 } -- Marcelo
NaN in GraphX PageRank answer
Hi all, I was trying to use GraphX to compute pagerank and found that pagerank value for several vertices is NaN. I am using Spark 1.3. Any idea how to fix that? -- Thanks, -Khaled
Re: Programmatically create SparkContext on YARN
Hi Andreas, I believe the distinction is not between standalone and YARN mode, but between client and cluster mode. In client mode, your Spark submit JVM runs your driver code. In cluster mode, one of the workers (or NodeManagers if you're using YARN) in the cluster runs your driver code. In the latter case, it doesn't really make sense to call `setMaster` in your driver because Spark needs to know which cluster you're submitting the application to. Instead, the recommended way is to set the master through the `--master` flag in the command line, e.g. $ bin/spark-submit --master spark://1.2.3.4:7077 --class some.user.Clazz --name "My app name" --jars lib1.jar,lib2.jar --deploy-mode cluster app.jar Both YARN and standalone modes support client and cluster modes, and the spark-submit script is the common interface through which you can launch your application. In other words, you shouldn't have to do anything more than providing a different value to `--master` to use YARN. -Andrew 2015-08-17 0:34 GMT-07:00 Andreas Fritzler andreas.fritz...@gmail.com: Hi all, when running the Spark cluster in standalone mode I am able to create the Spark context from Java via the following code snippet: SparkConf conf = new SparkConf() .setAppName("MySparkApp") .setMaster("spark://SPARK_MASTER:7077") .setJars(jars); JavaSparkContext sc = new JavaSparkContext(conf); As soon as I'm done with my processing, I can just close it via sc.stop(); Now my question: Is the same also possible when running Spark on YARN? I currently don't see how this should be possible without submitting your application as a packaged jar file. Is there a way to get this kind of interactivity from within your Scala/Java code? Regards, Andreas
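For instance, the same application submitted to YARN in cluster mode would look something like this (a sketch; the jar and class names are placeholders, and on the 1.x line the YARN cluster master was spelled yarn-cluster):

$ bin/spark-submit \
    --master yarn-cluster \
    --class some.user.Clazz \
    --name "My app name" \
    app.jar

Here Spark locates the cluster through HADOOP_CONF_DIR rather than through a master URL, which is why calling setMaster in code isn't needed.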
Spark scala addFile retrieving file with incorrect size
Hi all, I'm trying to run a spark job (written in scala) that uses addFile to download some small files to each node. However, one of the downloaded files has an incorrect size (the other ones are ok), which causes an error when using it in the code. I have looked more into the issue and hexdump'ed both the original and the spark-retrieved files. The beginning of the files are exactly equal, but the spark-retrieved one just gets truncated at a random position. This position appears random, however I noticed that it is exactly half the size of the original file. Not sure if a coincidence or not. The original file has a size of 296 bytes (the others are a little bit bigger, around 13 kbytes). I'm kinda new to spark, so I'm stuck at this point trying to figure out what is the problem. Does anyone have any idea of what might be the problem here? Thank you, Bernardo
Re: Scala: How to match a java object????
Hi Saif, Would this work? import scala.collection.JavaConversions._ new java.math.BigDecimal(5) match { case x: java.math.BigDecimal => x.doubleValue } It gives me this on the scala console: res9: Double = 5.0 Assuming you had a stream of BigDecimals, you could just call map on it: myBigDecimals.map(_.doubleValue) to get your Seq of Doubles. You will need the JavaConversions._ import to allow Java Doubles to be treated by Scala as Scala Doubles. -sujit On Tue, Aug 18, 2015 at 12:59 PM, saif.a.ell...@wellsfargo.com wrote: Hi, thank you for further assistance. You can reproduce this by simply running 5 match { case java.math.BigDecimal => 2 } In my personal case, I am applying a map action to a Seq[Any], so the elements inside are of type Any, to which I need to apply a proper .asInstanceOf[WhoYouShouldBe]. Saif From: William Briggs [mailto:wrbri...@gmail.com] Sent: Tuesday, August 18, 2015 4:46 PM To: Ellafi, Saif A.; user@spark.apache.org Subject: Re: Scala: How to match a java object Could you share your pattern matching expression that is failing? On Tue, Aug 18, 2015, 3:38 PM saif.a.ell...@wellsfargo.com wrote: Hi all, I am trying to run a spark job, in which I receive java.math.BigDecimal objects, instead of the scala equivalents, and I am trying to convert them into Doubles. If I try to match-case this object class, I get: “error: object java.math.BigDecimal is not a value” How could I get around matching java objects? I would like to avoid a multiple try-catch on ClassCastExceptions for all my checks. Thank you, Saif
What is the reason for ExecutorLostFailure?
Hi All, Why am I getting ExecutorLostFailure, after which executors are completely lost for the rest of the processing? Eventually it makes the job fail. One thing is for sure: a lot of shuffling happens across executors in my program. Is there a way to understand and debug ExecutorLostFailure? Any pointers regarding “ExecutorLostFailure” would help me a lot. Thanks Vijay
Re: Why there are overlapping tasks on the EventTimeline UI
I think I found the answer. On the UI, the recorded time of each task is when it is put into the thread pool; then the UI makes sense. At 2015-08-18 17:40:07, Todd bit1...@163.com wrote: Hi, the following is copied from the Spark EventTimeline UI. I don't understand why there is overlapping between tasks. I think they should run sequentially, one by one, in one executor (there is one core per executor). The blue part of each task is the scheduler delay time. Does it mean it is the delay between when the task is put into the thread pool and when the task is picked to run?
Re: global variable in spark streaming with no dependency on key
Thanks. I tried. The problem is I have to use updateStateByKey to maintain other state related to keys, and I'm not sure where to pass this accumulator variable into updateStateByKey. On Tue, Aug 18, 2015 at 2:17 AM, Hemant Bhanawat hemant9...@gmail.com wrote: See if SparkContext.accumulator helps. On Tue, Aug 18, 2015 at 2:27 PM, Joanne Contact joannenetw...@gmail.com wrote: Hi Gurus, Please help. But please don't tell me to use updateStateByKey because I need a global variable (something like the clock time) across the micro batches but not depending on key. For my case, it is not acceptable to maintain a state for each key since each key comes in different times. Yes my global variable is related to time but cannot use machine clock. Any hint? Or is this lack of global variable by design? Thanks!
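One pattern that may help, sketched under the assumption that the global value only needs to advance once per micro batch: the function given to transform runs on the driver for every batch, so a snapshot of a driver-side variable taken there rides into the data that updateStateByKey sees for that batch. All names below are made up, and this driver-side state is not fault tolerant across driver restarts:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._

object GlobalClockSketch {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(new SparkConf().setAppName("GlobalClockSketch"), Seconds(5))
    ssc.checkpoint("/tmp/checkpoint") // updateStateByKey requires checkpointing

    val pairs = ssc.socketTextStream("localhost", 9999).map(line => (line, 1))

    var logicalClock = 0L // driver-side global, advanced once per micro batch

    // transform's function executes on the driver each batch, so the snapshot
    // is attached to every record without being stored per key
    val stamped = pairs.transform { rdd =>
      logicalClock += 1
      val now = logicalClock
      rdd.map { case (k, v) => (k, (v, now)) }
    }

    // per-key state is still maintained, but the global value arrives with the data
    val state = stamped.updateStateByKey[(Int, Long)] {
      (values: Seq[(Int, Long)], old: Option[(Int, Long)]) =>
        val (sum, stamp) = old.getOrElse((0, 0L))
        val newSum = sum + values.map(_._1).sum
        val newStamp = values.foldLeft(stamp)((s, v) => math.max(s, v._2))
        Some((newSum, newStamp))
    }

    state.print()
    ssc.start()
    ssc.awaitTermination()
  }
}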
[mllib] Random forest maxBins and confidence in training points
Hi everyone, I have two questions regarding the random forest implementation in mllib. 1- maxBins: Say the value of a feature is between [0,100]. In my dataset there are a lot of data points between [0,10] and one datapoint at 100, and nothing between (10, 100). I am wondering how the binning works in this case. I obviously don't want all my points that are in between [0,10] to fall into the same bin and other bins to be empty. Would mllib do any smart reallocation of bins such that each bin gets some datapoints and one bin does not get all the datapoints? 2- Is there any way to do this in Spark? http://stats.stackexchange.com/questions/165062/incorporating-the-confidence-in-the-training-data-into-the-ml-model Thanks a lot, Mark
Re: broadcast variable of Kafka producer throws ConcurrentModificationException
All of you are right. I was trying to create too many producers. My idea was to create a pool (for now the pool contains only one producer) shared by all the executors. After I realized it was related to the serializable issues (though I did not find clear clues in the source code to indicate that the broadcast template type parameter must implement Serializable), I followed the spark-cassandra-connector design and created a singleton of Kafka producer pools. There are no exceptions noticed now. Thanks for all your comments. On Tue, Aug 18, 2015 at 4:28 PM, Tathagata Das t...@databricks.com wrote: Why are you even trying to broadcast a producer? A broadcast variable is some immutable piece of serializable DATA that can be used for processing on the executors. A Kafka producer is neither DATA nor immutable, and definitely not serializable. The right way to do this is to create the producer in the executors. Please see the discussion in the programming guide http://spark.apache.org/docs/latest/streaming-programming-guide.html#output-operations-on-dstreams On Tue, Aug 18, 2015 at 3:08 PM, Cody Koeninger c...@koeninger.org wrote: I wouldn't expect a kafka producer to be serializable at all... among other things, it has a background thread On Tue, Aug 18, 2015 at 4:55 PM, Shenghua(Daniel) Wan wansheng...@gmail.com wrote: Hi, Did anyone see java.util.ConcurrentModificationException when using broadcast variables? I encountered this exception when wrapping a Kafka producer like this in the spark streaming driver. Here is what I did. KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties); final Broadcast<KafkaDataProducer> bCastProducer = streamingContext.sparkContext().broadcast(producer); Then within a closure called by a foreachRDD, I was trying to get the wrapped producer, i.e.
KafkaProducer<String, String> p = bCastProducer.value(); After rebuilding and rerunning, I got a stack trace like this:

Exception in thread "main" com.esotericsoftware.kryo.KryoException: java.util.ConcurrentModificationException
Serialization trace:
classes (sun.misc.Launcher$AppClassLoader)
classloader (java.security.ProtectionDomain)
context (java.security.AccessControlContext)
acc (org.apache.spark.util.MutableURLClassLoader)
contextClassLoader (org.apache.kafka.common.utils.KafkaThread)
ioThread (org.apache.kafka.clients.producer.KafkaProducer)
producer (my driver)
at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:585)
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:318)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:549)
at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:570)
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:148)
at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:203)
at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:102)
at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:85)
at
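For readers landing on this thread later, a minimal sketch of the per-executor singleton Daniel describes, following the create-in-the-executors advice from the programming guide (broker address, topic, and object names are made up):

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

// Initialized lazily, once per executor JVM; never serialized into a closure.
object ProducerSingleton {
  lazy val producer: KafkaProducer[String, String] = {
    val props = new Properties()
    props.put("bootstrap.servers", "broker1:9092") // placeholder broker
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    new KafkaProducer[String, String](props)
  }

  def sendPartition(partition: Iterator[String]): Unit =
    partition.foreach { msg =>
      producer.send(new ProducerRecord[String, String]("events", msg)) // placeholder topic
    }
}

// usage inside the streaming job:
//   dstream.foreachRDD(rdd => rdd.foreachPartition(ProducerSingleton.sendPartition))

Because only the object's name crosses the wire, nothing about the producer (including its background I/O thread) ever needs to be serialized.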
Re: Spark + Jupyter (IPython Notebook)
Hi Prabeesh, That's even better! Thanks for sharing. Jerry On Tue, Aug 18, 2015 at 1:31 PM, Prabeesh K. prabsma...@gmail.com wrote: Refer this post http://blog.prabeeshk.com/blog/2015/06/19/pyspark-notebook-with-docker/ Spark + Jupyter + Docker On 18 August 2015 at 21:29, Jerry Lam chiling...@gmail.com wrote: Hi Guru, Thanks! Great to hear that someone tried it in production. How do you like it so far? Best Regards, Jerry On Tue, Aug 18, 2015 at 11:38 AM, Guru Medasani gdm...@gmail.com wrote: Hi Jerry, Yes. I’ve seen customers using this in production for data science work. I’m currently using this for one of my projects on a cluster as well. Also, here is a blog that describes how to configure this. http://blog.cloudera.com/blog/2014/08/how-to-use-ipython-notebook-with-apache-spark/ Guru Medasani gdm...@gmail.com On Aug 18, 2015, at 8:35 AM, Jerry Lam chiling...@gmail.com wrote: Hi spark users and developers, Did anyone have IPython Notebook (Jupyter) deployed in production that uses Spark as the computational engine? I know Databricks Cloud provides similar features with deeper integration with Spark. However, Databricks Cloud has to be hosted by Databricks so we cannot do this. Other solutions (e.g. Zeppelin) seem to reinvent the wheel that IPython has already offered years ago. It would be great if someone can educate me the reason behind this. Best Regards, Jerry
Re: Is it this a BUG?: Why Spark Flume Streaming job is not deploying the Receiver to the specified host?
Thank you Tathagata for your response. Yes, I'm using the push model on Spark 1.2. For my scenario I do prefer the push model. Is this the case on the later version 1.4 too? I think I can find a workaround for this issue, but only if I know how to obtain the worker (executor) ID. I can get the detail of the driver like this: ss.ssc().env().blockManager().blockManagerId().host() But I'm not sure how I could get the executor ID from the driver. When the job is submitted, I can see the block manager being registered with the driver and executor IP address:

15/08/18 23:31:40 INFO YarnClientSchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@05151113997207:41630/user/Executor#1210147506] with ID 1
15/08/18 23:31:40 INFO RackResolver: Resolved 05151113997207 to /0513_R-0050/RJ05
15/08/18 23:31:41 INFO BlockManagerMasterActor: Registering block manager 05151113997207:56921 with 530.3 MB RAM, BlockManagerId(1, 05151113997207, 56921)

The BlockManagerMasterActor appears to be doing the registering. Is there any way I can access this from the SparkContext? Thanks. On 18 August 2015 at 22:40, Tathagata Das t...@databricks.com wrote: Are you using the Flume polling stream or the older stream? Such problems of binding used to occur in the older push-based approach, hence we built the polling stream (pull-based). On Tue, Aug 18, 2015 at 4:45 AM, diplomatic Guru diplomaticg...@gmail.com wrote: I'm testing the Flume + Spark integration example (flume count). I'm deploying the job using yarn cluster mode. I first logged into the Yarn cluster, then submitted the job and passed in a specific worker node's IP to deploy the job. But when I checked the WebUI, it failed to bind to the specified IP because the receiver was deployed to a different host, not the one I asked for. Do you know why? For your information, I've also tried passing the IP address used by the resource manager to find resources, but no joy. But when I set the host to 'localhost' and deploy it to the cluster, it binds to a worker node that is selected by the resource manager.
Re: What is the reason for ExecutorLostFailure?
Usually more information as to the cause of this will be found down in your logs. I generally see this happen when an out of memory exception has occurred for one reason or another on an executor. It's possible your memory settings are too small per executor, or the concurrent number of tasks you are running is too large for some of the executors. Other times, using RDD functions like groupBy() that collect an unbounded number of items into memory could be causing it. Either way, the logs for the executors should be able to give you some insight. Have you looked at those yet? On Tue, Aug 18, 2015 at 6:26 PM, VIJAYAKUMAR JAWAHARLAL sparkh...@data2o.io wrote: Hi All, Why am I getting ExecutorLostFailure, after which executors are completely lost for the rest of the processing? Eventually it makes the job fail. One thing is for sure: a lot of shuffling happens across executors in my program. Is there a way to understand and debug ExecutorLostFailure? Any pointers regarding “ExecutorLostFailure” would help me a lot. Thanks Vijay
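The groupBy() point above is worth illustrating: in a shuffle-heavy job, preferring combining operators over grouping ones bounds what any one task must hold in memory. A small self-contained sketch (names and data are made up):

import org.apache.spark.{SparkConf, SparkContext}

object ShuffleSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("ShuffleSketch"))
    val pairs = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3)))

    // groupByKey materializes every value of a key in a single task's memory,
    // which can take down an executor when one key is heavy
    val grouped = pairs.groupByKey().mapValues(_.sum)

    // reduceByKey combines on the map side first, keeping per-task memory bounded
    val reduced = pairs.reduceByKey(_ + _)

    grouped.collect().foreach(println)
    reduced.collect().foreach(println)
    sc.stop()
  }
}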
Re: Is it this a BUG?: Why Spark Flume Streaming job is not deploying the Receiver to the specified host?
I don't think there is a super clean way of doing this. Here is an idea. Run a dummy job with a large number of partitions/tasks, which will access SparkEnv.get.blockManager().blockManagerId().host() and return it. sc.makeRDD(1 to 100, 100).map { _ => SparkEnv.get.blockManager().blockManagerId().host() }.collect().distinct() But that said, I do recommend using the pull based model. With the push based model it becomes really hard to deal with scenarios where the whole node (where the receiver is supposed to run) goes down and the receiver cannot run anywhere else. On Tue, Aug 18, 2015 at 5:25 PM, diplomatic Guru diplomaticg...@gmail.com wrote: Thank you Tathagata for your response. Yes, I'm using the push model on Spark 1.2. For my scenario I do prefer the push model. Is this the case on the later version 1.4 too? I think I can find a workaround for this issue, but only if I know how to obtain the worker (executor) ID. I can get the detail of the driver like this: ss.ssc().env().blockManager().blockManagerId().host() But I'm not sure how I could get the executor ID from the driver. When the job is submitted, I can see the block manager being registered with the driver and executor IP address: 15/08/18 23:31:40 INFO YarnClientSchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@05151113997207:41630/user/Executor#1210147506] with ID 1 15/08/18 23:31:40 INFO RackResolver: Resolved 05151113997207 to /0513_R-0050/RJ05 15/08/18 23:31:41 INFO BlockManagerMasterActor: Registering block manager 05151113997207:56921 with 530.3 MB RAM, BlockManagerId(1, 05151113997207, 56921) The BlockManagerMasterActor appears to be doing the registering. Is there any way I can access this from the SparkContext? Thanks. On 18 August 2015 at 22:40, Tathagata Das t...@databricks.com wrote: Are you using the Flume polling stream or the older stream? Such problems of binding used to occur in the older push-based approach, hence we built the polling stream (pull-based). On Tue, Aug 18, 2015 at 4:45 AM, diplomatic Guru diplomaticg...@gmail.com wrote: I'm testing the Flume + Spark integration example (flume count). I'm deploying the job using yarn cluster mode. I first logged into the Yarn cluster, then submitted the job and passed in a specific worker node's IP to deploy the job. But when I checked the WebUI, it failed to bind to the specified IP because the receiver was deployed to a different host, not the one I asked for. Do you know why? For your information, I've also tried passing the IP address used by the resource manager to find resources, but no joy. But when I set the host to 'localhost' and deploy it to the cluster, it binds to a worker node that is selected by the resource manager.
Re: broadcast variable of Kafka producer throws ConcurrentModificationException
Why are you even trying to broadcast a producer? A broadcast variable is some immutable piece of serializable DATA that can be used for processing on the executors. A Kafka producer is neither DATA nor immutable, and definitely not serializable. The right way to do this is to create the producer in the executors. Please see the discussion in the programming guide http://spark.apache.org/docs/latest/streaming-programming-guide.html#output-operations-on-dstreams On Tue, Aug 18, 2015 at 3:08 PM, Cody Koeninger c...@koeninger.org wrote: I wouldn't expect a kafka producer to be serializable at all... among other things, it has a background thread On Tue, Aug 18, 2015 at 4:55 PM, Shenghua(Daniel) Wan wansheng...@gmail.com wrote: Hi, Did anyone see java.util.ConcurrentModificationException when using broadcast variables? I encountered this exception when wrapping a Kafka producer like this in the spark streaming driver. Here is what I did. KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties); final Broadcast<KafkaDataProducer> bCastProducer = streamingContext.sparkContext().broadcast(producer); Then within a closure called by a foreachRDD, I was trying to get the wrapped producer, i.e. KafkaProducer<String, String> p = bCastProducer.value(); After rebuilding and rerunning, I got a stack trace like this:

Exception in thread "main" com.esotericsoftware.kryo.KryoException: java.util.ConcurrentModificationException
Serialization trace:
classes (sun.misc.Launcher$AppClassLoader)
classloader (java.security.ProtectionDomain)
context (java.security.AccessControlContext)
acc (org.apache.spark.util.MutableURLClassLoader)
contextClassLoader (org.apache.kafka.common.utils.KafkaThread)
ioThread (org.apache.kafka.clients.producer.KafkaProducer)
producer (my driver)
at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:585)
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:318)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:549)
at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:570)
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:148)
at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:203)
at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:102)
at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:85)
at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1289)
at org.apache.spark.api.java.JavaSparkContext.broadcast(JavaSparkContext.scala:648)
at my driver
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
Re: Spark + Jupyter (IPython Notebook)
For python it is really great. There is some work in progress in bringing Scala support to Jupyter as well. https://github.com/hohonuuli/sparknotebook https://github.com/alexarchambault/jupyter-scala Guru Medasani gdm...@gmail.com On Aug 18, 2015, at 12:29 PM, Jerry Lam chiling...@gmail.com wrote: Hi Guru, Thanks! Great to hear that someone tried it in production. How do you like it so far? Best Regards, Jerry On Tue, Aug 18, 2015 at 11:38 AM, Guru Medasani gdm...@gmail.com wrote: Hi Jerry, Yes. I’ve seen customers using this in production for data science work. I’m currently using this for one of my projects on a cluster as well. Also, here is a blog that describes how to configure this. http://blog.cloudera.com/blog/2014/08/how-to-use-ipython-notebook-with-apache-spark/ Guru Medasani gdm...@gmail.com On Aug 18, 2015, at 8:35 AM, Jerry Lam chiling...@gmail.com wrote: Hi spark users and developers, Did anyone have IPython Notebook (Jupyter) deployed in production that uses Spark as the computational engine? I know Databricks Cloud provides similar features with deeper integration with Spark. However, Databricks Cloud has to be hosted by Databricks so we cannot do this. Other solutions (e.g. Zeppelin) seem to reinvent the wheel that IPython has already offered years ago. It would be great if someone can educate me the reason behind this. Best Regards, Jerry
Re: Spark + Jupyter (IPython Notebook)
Hey, Actually, for Scala, I'd rather recommend using https://github.com/andypetrella/spark-notebook/ It's deployed at several places like Alibaba, EBI, Cray and is supported by both the Scala community and the company Data Fellas. For instance, it was part of the Big Scala Pipeline training given this 16th August at Galvanize in San Francisco with the collaboration of Datastax, Mesosphere, Databricks, Confluent and Typesafe: http://scala.bythebay.io/pipeline.html. It was a successful training day with 100+ attendees. Also, it's the only one that is fully reactive, including a reactive plotting library in Scala, allowing you to creatively plot a moving average computed in a DStream, a D3 graph layout dynamically updated, or even a dynamic map of received tweets with geolocation set. Of course, you can plot lines, pies, bars, histograms, boxplots for any kind of data, be it DataFrames, SQL results, Seqs, Lists, Maps, or collections of tuples or classes. Check out http://spark-notebook.io/ for your specific distro. Note that you can also use it directly on DCOS. For any question, I'll be glad to help on the crowded (~200 members) gitter chatroom: https://gitter.im/andypetrella/spark-notebook cheers and have fun :-) On Tue, Aug 18, 2015 at 10:24 PM Guru Medasani gdm...@gmail.com wrote: For python it is really great. There is some work in progress in bringing Scala support to Jupyter as well. https://github.com/hohonuuli/sparknotebook https://github.com/alexarchambault/jupyter-scala Guru Medasani gdm...@gmail.com On Aug 18, 2015, at 12:29 PM, Jerry Lam chiling...@gmail.com wrote: Hi Guru, Thanks! Great to hear that someone tried it in production. How do you like it so far? Best Regards, Jerry On Tue, Aug 18, 2015 at 11:38 AM, Guru Medasani gdm...@gmail.com wrote: Hi Jerry, Yes. I’ve seen customers using this in production for data science work. I’m currently using this for one of my projects on a cluster as well. Also, here is a blog that describes how to configure this. http://blog.cloudera.com/blog/2014/08/how-to-use-ipython-notebook-with-apache-spark/ Guru Medasani gdm...@gmail.com On Aug 18, 2015, at 8:35 AM, Jerry Lam chiling...@gmail.com wrote: Hi spark users and developers, Did anyone have IPython Notebook (Jupyter) deployed in production that uses Spark as the computational engine? I know Databricks Cloud provides similar features with deeper integration with Spark. However, Databricks Cloud has to be hosted by Databricks so we cannot do this. Other solutions (e.g. Zeppelin) seem to reinvent the wheel that IPython has already offered years ago. It would be great if someone can educate me the reason behind this. Best Regards, Jerry -- andy
Failed to fetch block error
Hi, I see the following error in my Spark job even after using like 100 cores and 16G memory. Did any of you experience the same problem earlier?

15/08/18 21:51:23 ERROR shuffle.RetryingBlockFetcher: Failed to fetch block input-0-1439959114400, and will not retry (0 retries)
java.lang.RuntimeException: java.io.FileNotFoundException: /data1/spark/spark-aed30958-2ee1-4eb7-984e-6402fb0a0503/blockmgr-ded36b52-ccc7-48dc-ba05-65bb21fc4136/34/input-0-1439959114400 (Too many open files)
at java.io.RandomAccessFile.open(Native Method)
at java.io.RandomAccessFile.<init>(RandomAccessFile.java:241)
at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:110)
at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:134)
at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:511)
at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:302)
at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
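Note the "(Too many open files)" in that trace: the root cause is the OS-level file descriptor limit, not Spark memory. A sketch of the usual remedy on Linux (the user name is a placeholder, exact steps vary by distro, and workers need a restart to pick it up):

# check the limit for the user that runs the executors
ulimit -n

# raise it persistently in /etc/security/limits.conf
sparkuser soft nofile 65536
sparkuser hard nofile 65536

On the Spark side, reducing the number of shuffle files also relieves the pressure, e.g. fewer partitions, or spark.shuffle.consolidateFiles true when on the 1.x hash shuffle.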
Re: Too many files/dirs in hdfs
Of course, Java or Scala can do that (see the sketch at the end of this thread): 1) Create a FileWriter with an append or roll-over option 2) For each RDD create a StringBuilder after applying your filters 3) Write this StringBuilder to the file whenever you want to write (the duration can be defined as a condition) On Tue, Aug 18, 2015 at 11:05 PM, Mohit Anchlia mohitanch...@gmail.com wrote: Is there a way to store all the results in one file and keep the file roll over separate from the spark streaming batch interval? On Mon, Aug 17, 2015 at 2:39 AM, UMESH CHAUDHARY umesh9...@gmail.com wrote: In Spark Streaming you can simply check whether your RDD contains any records or not, and if records are there you can save them using FileOutputStream: dstream.foreachRDD(t => { val count = t.count(); if (count > 0) { // SAVE YOUR STUFF } }) This will not create unnecessary files of 0 bytes. On Mon, Aug 17, 2015 at 2:51 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Currently, spark streaming would create a new directory for every batch and store the data in it (whether it has anything or not). There is no direct append call as of now, but you can achieve this either with FileUtil.copyMerge http://apache-spark-user-list.1001560.n3.nabble.com/save-spark-streaming-output-to-single-file-on-hdfs-td21124.html#a21167 or have a separate program which will do the clean up for you. Thanks Best Regards On Sat, Aug 15, 2015 at 5:20 AM, Mohit Anchlia mohitanch...@gmail.com wrote: Spark stream seems to be creating 0 byte files even when there is no data. Also, I have 2 concerns here: 1) Extra unnecessary files are being created in the output 2) Hadoop doesn't work really well with too many files, and I see that it is creating a directory with a timestamp every 1 second. Is there a better way of writing a file, maybe using some kind of append mechanism where one doesn't have to change the batch interval?
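Putting the two suggestions together, here is a minimal driver-side sketch of the append approach. It assumes each batch is small enough to collect to the driver; the source, output path, and batch interval are placeholders:

import java.io.FileWriter
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object AppendSink {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(new SparkConf().setAppName("AppendSink"), Seconds(10))
    val lines = ssc.socketTextStream("localhost", 9999) // placeholder source
    lines.foreachRDD { rdd =>
      val records = rdd.collect() // assumes batches fit on the driver
      if (records.nonEmpty) {
        val writer = new FileWriter("/tmp/stream-output.log", true) // true = append to one file
        try records.foreach(r => writer.write(r + "\n"))
        finally writer.close()
      }
    }
    ssc.start()
    ssc.awaitTermination()
  }
}

Rolling the file over then becomes an ordinary file-system decision (e.g. rename it when it crosses a size or age threshold), fully decoupled from the batch interval.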
Re: broadcast variable of Kafka producer throws ConcurrentModificationException
As long as the Kafka producer is thread-safe you don't need any pool at all. Just share a single producer on every executor. Please look at my blog post for more details. http://allegro.tech/spark-kafka-integration.html On 19 Aug 2015 at 2:00 AM, Shenghua(Daniel) Wan wansheng...@gmail.com wrote: All of you are right. I was trying to create too many producers. My idea was to create a pool (for now the pool contains only one producer) shared by all the executors. After I realized it was related to the serialization issues (though I did not find clear clues in the source code to indicate that the broadcast template type parameter must implement Serializable), I followed the spark cassandra connector design and created a singleton of Kafka producer pools. No exception is observed now. Thanks for all your comments. On Tue, Aug 18, 2015 at 4:28 PM, Tathagata Das t...@databricks.com wrote: Why are you even trying to broadcast a producer? A broadcast variable is some immutable piece of serializable DATA that can be used for processing on the executors. A Kafka producer is neither DATA nor immutable, and definitely not serializable. The right way to do this is to create the producer in the executors. Please see the discussion in the programming guide http://spark.apache.org/docs/latest/streaming-programming-guide.html#output-operations-on-dstreams On Tue, Aug 18, 2015 at 3:08 PM, Cody Koeninger c...@koeninger.org wrote: I wouldn't expect a kafka producer to be serializable at all... among other things, it has a background thread On Tue, Aug 18, 2015 at 4:55 PM, Shenghua(Daniel) Wan wansheng...@gmail.com wrote: Hi, Did anyone see java.util.ConcurrentModificationException when using broadcast variables? I encountered this exception when wrapping a Kafka producer like this in the spark streaming driver. Here is what I did. KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties); final Broadcast<KafkaDataProducer> bCastProducer = streamingContext.sparkContext().broadcast(producer); Then within a closure called by a foreachRDD, I was trying to get the wrapped producer, i.e.
KafkaProducer<String, String> p = bCastProducer.value(); after rebuilding and rerunning, I got the same Kryo ConcurrentModificationException stack trace quoted in full in the original post below.
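A Scala sketch of the pattern Tathagata and Marcin point at: instead of broadcasting the producer itself, each executor builds its own on first use. The names, broker list, topic and the DStream `dstream` below are assumptions for illustration, not the poster's code:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

// One producer per executor JVM, created lazily on first access and left
// open for the life of the executor. Nothing here is serialized.
object ProducerHolder {
  lazy val producer: KafkaProducer[String, String] = {
    val props = new Properties()
    props.put("bootstrap.servers", "broker1:9092") // assumption: your broker list
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    new KafkaProducer[String, String](props)
  }
}

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { records =>
    // Runs on the executor; ProducerHolder initializes once per JVM there.
    records.foreach { r =>
      ProducerHolder.producer.send(new ProducerRecord[String, String]("myTopic", r))
    }
  }
}

The key design point is that the singleton object is referenced, not captured: only the closure's bytecode ships to the executors, and the producer is constructed where it is used.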
SparkR csv without headers
Hi, Does anyone have an example of how to create a DataFrame in SparkR which specifies the column names - the csv files I have do not have column names in the first row. I can read a csv nicely with com.databricks:spark-csv_2.10:1.0.3, but I end up with column names C1, C2, C3 etc. thanks -- Franc Carter
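For reference, the Scala equivalent of what spark-csv does here (the same load options are exposed through SparkR's read.df): load the headerless files and rename the generated columns in one go. The path and column names below are made up:

import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc) // assumes an existing SparkContext `sc`
val df = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "false")      // the files carry no header row
  .load("data/*.csv")             // placeholder input path
  .toDF("id", "name", "amount")   // replaces the generated C1, C2, C3 names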
Repartitioning external table in Spark sql
I am using Spark 1.4.1, in stand-alone mode, on a cluster of 3 nodes. Using Spark SQL and HiveContext, I am trying to run a simple scan query on an existing Hive table (which is an external table consisting of rows in text files stored in HDFS - it is NOT parquet, ORC or any other richer format). DataFrame res = hiveCtx.sql("SELECT * FROM lineitem WHERE L_LINENUMBER > 0"); What I observe is that the performance of this full scan in Spark is not comparable with Hive (it is almost 4 times slower). Checking the resource usage, what I see is that workers/executors do not do parallel scans; they scan on a per-node basis (first the executors from the worker(s) on node 1 read from disk, while the other two nodes do no I/O and just receive data from the first node over the network, then the 2nd node does its scan, and then the third one). I also realized that if I load this data file directly from my spark context (using textFile()) and run count() on that (not using Spark SQL), then I can get better performance by increasing the number of partitions. I am just trying to do the same thing (increasing the number of partitions in the beginning) in Spark SQL: var tab = sqlContext.read.table("lineitem"); tab.repartition(1000); OR tab.coalesce(1000); but neither repartition() nor coalesce() actually works - they do not return an error, but if I check var p = tab.rdd.partitions.size; before and after calling either of them, it returns the same number of partitions. I am just wondering how I can change the number of partitions for a Hive external table in Spark SQL. Any help/suggestion would be appreciated.
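One detail worth checking before anything else: DataFrames are immutable, so repartition() and coalesce() return a new DataFrame rather than modifying the receiver. Calling them without capturing the result is a no-op, which would explain the unchanged partition count. A sketch:

val tab = sqlContext.read.table("lineitem")
val repartitioned = tab.repartition(1000) // returns a NEW DataFrame
println(tab.rdd.partitions.size)           // unchanged
println(repartitioned.rdd.partitions.size) // 1000; run the query on this one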
Re: broadcast variable of Kafka producer throws ConcurrentModificationException
It's a cool blog post! Tweeted it! Broadcasting the configuration necessary for lazily instantiating the producer is a good idea. Nitpick: The first code example has an extra `}` ;) On Tue, Aug 18, 2015 at 10:49 PM, Marcin Kuthan marcin.kut...@gmail.com wrote: As long as the Kafka producer is thread-safe you don't need any pool at all. Just share a single producer on every executor. Please look at my blog post for more details. http://allegro.tech/spark-kafka-integration.html
Re: spark streaming 1.3 doubts(force it to not consume anything)
The superclass method in DStream is defined as returning an Option[RDD[T]] On Tue, Aug 18, 2015 at 8:07 AM, Shushant Arora shushantaror...@gmail.com wrote: Getting a compilation error while overriding the compute method of DirectKafkaInputDStream. [ERROR] CustomDirectKafkaInputDstream.java:[51,83] compute(org.apache.spark.streaming.Time) in CustomDirectKafkaInputDstream cannot override compute(org.apache.spark.streaming.Time) in org.apache.spark.streaming.dstream.DStream; attempting to use incompatible return type [ERROR] found : scala.Option<org.apache.spark.streaming.kafka.KafkaRDD<byte[],byte[],kafka.serializer.DefaultDecoder,kafka.serializer.DefaultDecoder,byte[][]>> [ERROR] required: scala.Option<org.apache.spark.rdd.RDD<byte[][]>> class : public class CustomDirectKafkaInputDstream extends DirectKafkaInputDStream<byte[], byte[], kafka.serializer.DefaultDecoder, kafka.serializer.DefaultDecoder, byte[][]> { @Override public Option<KafkaRDD<byte[], byte[], DefaultDecoder, DefaultDecoder, byte[][]>> compute(Time validTime) { int processed = processedCounter.value(); int failed = failedProcessingsCounter.value(); if (processed == failed) { System.out.println("backing off since its 100% failure"); return Option.empty(); } else { System.out.println("starting the stream"); return super.compute(validTime); } } } What should the return type of the compute method be? The super class returns Option<KafkaRDD<byte[], byte[], DefaultDecoder, DefaultDecoder, byte[][]>> but it expects scala.Option<org.apache.spark.rdd.RDD<byte[][]>> from the derived class. Is there something wrong with the code? On Mon, Aug 17, 2015 at 7:08 PM, Cody Koeninger c...@koeninger.org wrote: Look at the definitions of the java-specific KafkaUtils.createDirectStream methods (the ones that take a JavaStreamingContext) On Mon, Aug 17, 2015 at 5:13 AM, Shushant Arora shushantaror...@gmail.com wrote: How to create a ClassTag in Java? Also, the constructor of DirectKafkaInputDStream takes Function1, not Function, but KafkaUtils.createDirectStream allows a Function. I have the below as an overridden DirectKafkaInputDStream.
public class CustomDirectKafkaInputDstream extends DirectKafkaInputDStream<byte[], byte[], kafka.serializer.DefaultDecoder, kafka.serializer.DefaultDecoder, byte[][]> { public CustomDirectKafkaInputDstream( StreamingContext ssc_, Map<String, String> kafkaParams, Map<TopicAndPartition, Object> fromOffsets, Function1<MessageAndMetadata<byte[], byte[]>, byte[][]> messageHandler, ClassTag<byte[]> evidence$1, ClassTag<byte[]> evidence$2, ClassTag<DefaultDecoder> evidence$3, ClassTag<DefaultDecoder> evidence$4, ClassTag<byte[][]> evidence$5) { super(ssc_, kafkaParams, fromOffsets, messageHandler, evidence$1, evidence$2, evidence$3, evidence$4, evidence$5); } @Override public Option<KafkaRDD<byte[], byte[], DefaultDecoder, DefaultDecoder, byte[][]>> compute(Time validTime) { int processed = processedCounter.value(); int failed = failedProcessingsCounter.value(); if (processed == failed) { System.out.println("backing off since its 100% failure"); return Option.empty(); } else { System.out.println("starting the stream"); return super.compute(validTime); } } To create this stream I am using scala.collection.immutable.Map<String, String> scalakafkaParams = JavaConverters.mapAsScalaMapConverter(kafkaParams).asScala().toMap(Predef.<Tuple2<String, String>>conforms()); scala.collection.immutable.Map<TopicAndPartition, Long> scalaktopicOffsetMap = JavaConverters.mapAsScalaMapConverter(topicOffsetMap).asScala().toMap(Predef.<Tuple2<TopicAndPartition, Long>>conforms()); scala.Function1<MessageAndMetadata<byte[], byte[]>, byte[][]> handler = new Function<MessageAndMetadata<byte[], byte[]>, byte[][]>() { .. }; JavaDStream<byte[][]> directKafkaStream = new CustomDirectKafkaInputDstream(jssc, scalakafkaParams, scalaktopicOffsetMap, handler, byte[].class, byte[].class, kafka.serializer.DefaultDecoder.class, kafka.serializer.DefaultDecoder.class, byte[][].class); How to pass a ClassTag to the constructor in CustomDirectKafkaInputDstream? And how to use Function instead of Function1? On Thu, Aug 13, 2015 at 12:16 AM, Cody Koeninger c...@koeninger.org wrote: I'm not aware of an existing api per se, but you could create your own subclass of the DStream that returns None for compute() under certain conditions. On Wed, Aug 12, 2015 at 1:03 PM, Shushant Arora shushantaror...@gmail.com wrote: Hi Cody Can you help here if streaming 1.3 has any api for not consuming any message in next few runs? Thanks -- Forwarded message -- From: Shushant Arora shushantaror...@gmail.com Date: Wed, Aug 12, 2015 at 11:23 PM Subject: spark streaming 1.3 doubts(force it to not consume anything) To: user user@spark.apache.org I can't make my stream application's batch interval change at run time. It is always fixed, and it always creates jobs at the specified batch interval and enqueues them if the earlier batch is not finished.
Re: Spark 1.4.1 - Mac OSX Yosemite
Looks like Scala 2.11.6 and Java 1.7.0_79. ✔ ~ 09:17 $ scala Welcome to Scala version 2.11.6 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_79). Type in expressions to have them evaluated. Type :help for more information. scala> ✔ ~ 09:26 $ echo $JAVA_HOME /Library/Java/JavaVirtualMachines/jdk1.7.0_79.jdk/Contents/Home On Mon, Aug 17, 2015 at 11:11 PM, Alun Champion a...@achampion.net wrote: Yes, they both are set. Just recompiled and still no success, silent failure. Which versions of java and scala are you using? On 17 August 2015 at 19:59, Charlie Hack charles.t.h...@gmail.com wrote: I had success earlier today on OSX Yosemite 10.10.4 building Spark 1.4.1 using these instructions http://genomegeek.blogspot.com/2014/11/how-to-install-apache-spark-on-mac-os-x.html (using `$ sbt/sbt clean assembly`, with the additional step of downloading the proper sbt-launch.jar (0.13.7) from here http://dl.bintray.com/typesafe/ivy-releases/org.scala-sbt/sbt-launch/0.13.7/ and replacing the one that is in build/ as you noted). You've set SCALA_HOME and JAVA_HOME environment variables? On Mon, Aug 17, 2015 at 8:36 PM, Alun Champion a...@achampion.net wrote: Has anyone experienced issues running Spark 1.4.1 on Mac OSX Yosemite? I've been running a standalone 1.3.1 fine but it failed when trying to run 1.4.1. (I also tried 1.4.0.) I've tried both the pre-built packages as well as compiling from source, both with the same results. (I can successfully compile with both mvn and sbt, after fixing the sbt.jar, which was corrupt.) After downloading/building spark and running ./bin/pyspark or ./bin/spark-shell it silently exits with code 1. Creating a context in python I get: Exception: Java gateway process exited before sending the driver its port number I couldn't find any specific resolutions on the web. I did add 'pyspark-shell' to the PYSPARK_SUBMIT_ARGS but to no effect. Anyone have any further ideas I can explore? Cheers -Alun.
Re: Transform KafkaRDD to KafkaRDD, not plain RDD, or how to keep OffsetRanges after transformation
Python version has been available since 1.4. It should be close to feature parity with the jvm version in 1.5 On Tue, Aug 18, 2015 at 9:36 AM, ayan guha guha.a...@gmail.com wrote: Hi Cody A non-related question. Any idea when the Python version of the direct receiver is expected? Me personally looking forward to it :) On Wed, Aug 19, 2015 at 12:02 AM, Cody Koeninger c...@koeninger.org wrote: The solution you found is also in the docs: http://spark.apache.org/docs/latest/streaming-kafka-integration.html Java uses an atomic reference because Java doesn't allow you to close over non-final references. I'm not clear on your other question. On Tue, Aug 18, 2015 at 3:43 AM, Petr Novak oss.mli...@gmail.com wrote: The solution for how to share offsetRanges after DirectKafkaInputDStream is transformed is in: https://github.com/apache/spark/blob/master/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala https://github.com/apache/spark/blob/master/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java One thing I would like to understand is why the Scala version uses a normal variable while the Java version uses an AtomicReference. Another thing I don't get is about closure serialization. The question is why the logger in the code below doesn't throw an NPE even though its instance isn't copied the way offsetRanges is: when val offsets = offsetRanges is removed from foreachRDD, mapPartitionsWithIndex throws on offsets(idx). I have something like this code: object StreamOps { val logger = LoggerFactory.getLogger("StreamOps") var offsetRanges = Array[OffsetRange]() def transform[T](stream: InputDStream[Array[Byte]]): DStream[T] = { stream transform { rdd => offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges; rdd flatMap { message => Try(... decode Array[Byte] to F ...) match { case Success(fact) => Some(fact) case _ => None } } } } // Error handling removed for brevity def save[F](stream: DStream[F]): Unit = { stream foreachRDD { rdd => // It has to be here, otherwise NullPointerException val offsets = offsetRanges; rdd mapPartitionsWithIndex { (idx, facts) => // Use offsets here val writer = new MyWriter[F](offsets(idx), ...); facts foreach { fact => writer.write(fact) }; writer.close() // Why does logger work and not throw a NullPointerException? logger.info(...); Iterator.empty } foreach { (_: Nothing) => } } } } Many thanks for any advice, I'm sure it's a noob question. Petr On Mon, Aug 17, 2015 at 1:12 PM, Petr Novak oss.mli...@gmail.com wrote: Or can I generally create a new RDD from a transformation and enrich its partitions with some metadata, so that I would copy OffsetRanges into my new RDD in the DStream? On Mon, Aug 17, 2015 at 1:08 PM, Petr Novak oss.mli...@gmail.com wrote: Hi all, I need to transform KafkaRDD into a new stream of deserialized case classes. I want to use the new stream to save it to file and to perform additional transformations on it. To save it I want to use offsets in filenames, hence I need OffsetRanges in the transformed RDD. But KafkaRDD is private, hence I don't know how to do it. Alternatively I could deserialize directly in messageHandler before KafkaRDD, but that seems to be a 1:1 transformation while I need to drop bad messages (KafkaRDD => RDD would be a flatMap). Is there a way to do it using messageHandler, or is there another approach? Many thanks for any help. Petr -- Best Regards, Ayan Guha
Re: spark streaming 1.3 doubts(force it to not consume anything)
Looking at the source code of org.apache.spark.streaming.kafka.DirectKafkaInputDStream: override def compute(validTime: Time): Option[KafkaRDD[K, V, U, T, R]] = { val untilOffsets = clamp(latestLeaderOffsets(maxRetries)) val rdd = KafkaRDD[K, V, U, T, R](context.sparkContext, kafkaParams, currentOffsets, untilOffsets, messageHandler) currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset) Some(rdd) } But in DStream it is def compute(validTime: Time): Option[RDD[T]]. So what should the return type of a custom DStream extending DirectKafkaInputDStream be? I want the behaviour to be the same as DirectKafkaInputDStream in normal scenarios and to return None in a specific scenario. And why did the same error not come up when DirectKafkaInputDStream extends InputDStream? Since the new return type Option[KafkaRDD[K, V, U, T, R]] is not a subclass of Option[RDD[T]], shouldn't that have failed too? On Tue, Aug 18, 2015 at 7:28 PM, Cody Koeninger c...@koeninger.org wrote: The superclass method in DStream is defined as returning an Option[RDD[T]]
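Cody's one-line answer and Shushant's last question both come down to variance. Scala's Option[+A] is covariant, so an override returning Option[KafkaRDD[...]] conforms to the inherited Option[RDD[T]]; Java generics are invariant, so the same signature is rejected and the Java override must be declared as Option<RDD<byte[][]>>. A self-contained Scala illustration with stand-in classes (not the actual Kafka types):

class Animal
class Dog extends Animal

object VarianceDemo {
  def main(args: Array[String]): Unit = {
    val dog: Option[Dog] = Some(new Dog)
    // Compiles because Option[Dog] <: Option[Animal] in Scala; the
    // equivalent assignment between generic types fails in Java.
    val animal: Option[Animal] = dog
    println(animal.isDefined)
  }
}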
Spark + Jupyter (IPython Notebook)
Hi spark users and developers, Does anyone have IPython Notebook (Jupyter) deployed in production, using Spark as the computational engine? I know Databricks Cloud provides similar features with deeper integration with Spark. However, Databricks Cloud has to be hosted by Databricks, so we cannot do this. Other solutions (e.g. Zeppelin) seem to reinvent the wheel that IPython already offered years ago. It would be great if someone could educate me on the reasons behind this. Best Regards, Jerry
Re: Transform KafkaRDD to KafkaRDD, not plain RDD, or how to keep OffsetRanges after transformation
The solution you found is also in the docs: http://spark.apache.org/docs/latest/streaming-kafka-integration.html Java uses an atomic reference because Java doesn't allow you to close over non-final references. I'm not clear on your other question. On Tue, Aug 18, 2015 at 3:43 AM, Petr Novak oss.mli...@gmail.com wrote: The solution how to share offsetRanges after DirectKafkaInputDStream is transformed is in: https://github.com/apache/spark/blob/master/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala https://github.com/apache/spark/blob/master/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java One thing I would like to understand is why Scala version is using normal variable while Java version uses AtomicReference.
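The documented pattern being referred to, sketched under the assumption of an existing direct stream named `directKafkaStream`: capture the ranges in the first transformation on the stream, while the batch RDD is still the KafkaRDD, then use them downstream.

import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

var offsetRanges = Array.empty[OffsetRange]

directKafkaStream.transform { rdd =>
  // Must be the first operation on the stream: only the KafkaRDD
  // itself implements HasOffsetRanges.
  offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd // pass the RDD through unchanged
}.foreachRDD { rdd =>
  for (o <- offsetRanges) {
    println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
  }
}

This is safe without an AtomicReference in Scala because transform and foreachRDD both run on the driver, in batch order, over the same captured variable.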
Java 8 lambdas
Hi Is there a way to execute spark jobs with Java 8 lambdas instead of using anonymous inner classes as seen in the examples? I think I remember seeing real lambdas in the examples before and in articles [1]? Cheers, -Kristoffer [1] http://blog.cloudera.com/blog/2014/04/making-apache-spark-easier-to-use-in-java-with-java-8
Spark works with the data in another cluster(Elasticsearch)
Hi, Currently I have my data in an Elasticsearch cluster and I am trying to use Spark to analyse it. The Elasticsearch cluster and the Spark cluster are two different clusters, and I use the Hadoop input format (es-hadoop) to read the data in ES. I am wondering how this setup affects the speed of the analysis. If I understand correctly, Spark will read data from the ES cluster and do the computation on its own cluster (including writing shuffle results to its own machines). Is this right? If so, I would expect the performance to be only a little slower than if the data were stored on the same cluster. I would appreciate it if someone could share his/her experience of using Spark with Elasticsearch. Thanks a lot in advance for your help. Cheers Gen
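A minimal read sketch using the elasticsearch-spark API from es-hadoop (the index name and node address are placeholders): each partition of the resulting RDD corresponds to one ES shard, so with separate clusters the initial scan pulls documents over the network and loses locality, while all later stages, shuffles and caching run on the Spark side as usual.

import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._ // adds esRDD to SparkContext

val conf = new SparkConf()
  .setAppName("es-read-sketch")
  .set("es.nodes", "es-host:9200") // placeholder: the remote ES cluster
val sc = new SparkContext(conf)

// One RDD partition per ES shard; documents stream over the network.
val docs = sc.esRDD("myindex/mytype") // RDD[(String, Map[String, AnyRef])]
println(docs.count())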
Re: Java 8 lambdas
Yes, it should Just Work. lambdas can be used for any method that takes an instance of an interface with one method, and that describes Function, PairFunction, etc. On Tue, Aug 18, 2015 at 3:23 PM, Kristoffer Sjögren sto...@gmail.com wrote: Hi Is there a way to execute spark jobs with Java 8 lambdas instead of using anonymous inner classes as seen in the examples?
Scala: How to match a java object????
Hi all, I am trying to run a spark job in which I receive java.math.BigDecimal objects instead of the scala equivalents, and I am trying to convert them into Doubles. If I try to match-case on this object's class, I get: error: object java.math.BigDecimal is not a value How could I get around matching java objects? I would like to avoid multiple try-catches on ClassCastException for all my checks. Thank you, Saif
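The error means the class name was used where a value pattern is expected; matching on the type with a typed pattern works and needs no try-catch. A small sketch (the surrounding conversion function is illustrative, not the poster's code):

def toDouble(x: Any): Option[Double] = x match {
  // typed pattern `b: java.math.BigDecimal`, not `case java.math.BigDecimal`
  case b: java.math.BigDecimal  => Some(b.doubleValue())
  case b: scala.math.BigDecimal => Some(b.toDouble)
  case d: Double                => Some(d)
  case _                        => None // no ClassCastException to catch
}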
broadcast variable of Kafka producer throws ConcurrentModificationException
Hi, Did anyone see java.util.ConcurrentModificationException when using broadcast variables? I encountered this exception when wrapping a Kafka producer like this in the spark streaming driver. Here is what I did. KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties); final Broadcast<KafkaDataProducer> bCastProducer = streamingContext.sparkContext().broadcast(producer); Then within a closure called by a foreachRDD, I was trying to get the wrapped producer, i.e. KafkaProducer<String, String> p = bCastProducer.value(); after rebuilding and rerunning, I got the stack trace like this Exception in thread "main" com.esotericsoftware.kryo.KryoException: java.util.ConcurrentModificationException Serialization trace: classes (sun.misc.Launcher$AppClassLoader) classloader (java.security.ProtectionDomain) context (java.security.AccessControlContext) acc (org.apache.spark.util.MutableURLClassLoader) contextClassLoader (org.apache.kafka.common.utils.KafkaThread) ioThread (org.apache.kafka.clients.producer.KafkaProducer) producer (my driver) at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:585) at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213) at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501) at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564) at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213) at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:318) at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293) at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501) at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564) at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213) at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:549) at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:570) at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213) at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501) at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564) at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213) at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501) at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564) at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213) at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501) at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564) at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213) at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:148) at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:203) at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:102) at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:85) at
org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34) at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62) at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1289) at org.apache.spark.api.java.JavaSparkContext.broadcast(JavaSparkContext.scala:648) at my driver at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:664) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:169) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:192) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:111) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Caused by: java.util.ConcurrentModificationException at java.util.Vector$Itr.checkForComodification(Vector.java:1156) at java.util.Vector$Itr.next(Vector.java:1133) at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:67) at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:18) at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501) at
Re: broadcast variable of Kafka producer throws ConcurrentModificationException
I wouldn't expect a kafka producer to be serializable at all... among other things, it has a background thread On Tue, Aug 18, 2015 at 4:55 PM, Shenghua(Daniel) Wan wansheng...@gmail.com wrote: Hi, Did anyone see java.util.ConcurrentModificationException when using broadcast variables? I encountered this exception when wrapping a Kafka producer like this in the spark streaming driver.
Spark and ActorSystem
Hi, I'd like to know where I could find more information related to the deprecation of the actor system in Spark (from 1.4.x). I'm interested in the reasons for this decision. Cheers
Re: Spark + Jupyter (IPython Notebook)
Hi Jerry, Yes. I've seen customers using this in production for data science work. I'm currently using this for one of my projects on a cluster as well. Also, here is a blog that describes how to configure this: http://blog.cloudera.com/blog/2014/08/how-to-use-ipython-notebook-with-apache-spark/ Guru Medasani gdm...@gmail.com On Aug 18, 2015, at 8:35 AM, Jerry Lam chiling...@gmail.com wrote: Hi spark users and developers, Did anyone have IPython Notebook (Jupyter) deployed in production that uses Spark as the computational engine?
Spark executor lost because of GC overhead limit exceeded even though using 20 executors using 25GB each
Hi, this GC overhead limit error is making me crazy. I have 20 executors using 25 GB each. I don't understand at all how it can throw a GC overhead error; I also don't have that big a dataset. Once this GC error occurs in an executor, that executor gets lost, and slowly the other executors get lost too because of IOException, RPC client disassociated, shuffle not found, etc. Please help me solve this; I am getting mad as I am new to Spark. Thanks in advance. WARN scheduler.TaskSetManager: Lost task 7.0 in stage 363.0 (TID 3373, myhost.com): java.lang.OutOfMemoryError: GC overhead limit exceeded at org.apache.spark.sql.types.UTF8String.toString(UTF8String.scala:150) at org.apache.spark.sql.catalyst.expressions.GenericRow.getString(rows.scala:120) at org.apache.spark.sql.columnar.STRING$.actualSize(ColumnType.scala:312) at org.apache.spark.sql.columnar.compression.DictionaryEncoding$Encoder.gatherCompressibilityStats(compressionSchemes.scala:224) at org.apache.spark.sql.columnar.compression.CompressibleColumnBuilder$class.gatherCompressibilityStats(CompressibleColumnBuilder.scala:72) at org.apache.spark.sql.columnar.compression.CompressibleColumnBuilder$class.appendFrom(CompressibleColumnBuilder.scala:80) at org.apache.spark.sql.columnar.NativeColumnBuilder.appendFrom(ColumnBuilder.scala:87) at org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1.next(InMemoryColumnarTableScan.scala:148) at org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1.next(InMemoryColumnarTableScan.scala:124) at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:277) at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171) at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78) at org.apache.spark.rdd.RDD.iterator(RDD.scala:242) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) at org.apache.spark.scheduler.Task.run(Task.scala:70) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
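The trace shows the error arising during in-memory columnar caching, so the usual first steps are to cache in serialized form with spill allowed and to shrink the cache's share of the heap. A sketch of those knobs on Spark 1.x (assumptions about the workload, not a diagnosis of this particular job):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

val conf = new SparkConf()
  .setAppName("gc-tuning-sketch")
  .set("spark.storage.memoryFraction", "0.4") // default is 0.6 on Spark 1.x
val sc = new SparkContext(conf)

// Serialized blocks are much smaller and create far less GC pressure
// than the deserialized MEMORY_ONLY default of RDD.cache().
val data = sc.textFile("hdfs:///input/path") // placeholder input
data.persist(StorageLevel.MEMORY_AND_DISK_SER)
println(data.count())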