Re: spark streaming job to hbase write
Is this map creation happening on the client side? But how does it know which RS will contain that row key in a put operation without asking the .META. table? Does the HBase client first get the ranges of keys of each RegionServer and then group Put objects based on region servers? On Fri, Jul 17, 2015 at 7:48 PM, Ted Yu yuzhih...@gmail.com wrote: Internally AsyncProcess uses a Map which is keyed by server name: Map<ServerName, MultiAction<Row>> actionsByServer = new HashMap<ServerName, MultiAction<Row>>(); Here MultiAction would group the Puts in your example which are destined for the same server. Cheers On Fri, Jul 17, 2015 at 5:15 AM, Shushant Arora shushantaror...@gmail.com wrote: Thanks! My key is random (hexadecimal), so a hot spot should not be created. Is there any concept of a bulk put? Say I want to issue one put request for a batch of 1000 which will hit a region server, instead of an individual put for each key: HTable.put(List<Put>). Does this handle batching of puts based on the region server they will finally land on? Say in my batch there are 10 puts - 5 for RS1, 3 for RS2 and 2 for RS3. Does this handle that? On Thu, Jul 16, 2015 at 8:31 PM, Michael Segel michael_se...@hotmail.com wrote: You ask an interesting question… Let's set aside Spark and look at the overall ingestion pattern. It's really an ingestion pattern where your input into the system is from a queue. Are the events discrete or continuous? (This is kinda important.) If the events are continuous then more than likely you're going to be ingesting data where the key is somewhat sequential. If you use put(), you end up with hot spotting. And you'll end up with regions half full. So you would be better off batching up the data and doing bulk imports. If the events are discrete, then you'll want to use put() because the odds are you will not be using a sequential key. (You could, but I'd suggest that you rethink your primary key.) Depending on the rate of ingestion, you may want to do a manual flush. (It depends on the velocity of data to be ingested and your use case.) (Remember what caching occurs and where when dealing with HBase.) A third option… Depending on how you use the data, you may want to avoid storing the data in HBase, and only use HBase as an index to where you store the data files for quick access. Again it depends on your data ingestion flow and how you intend to use the data. So really this is less a Spark issue than an HBase issue when it comes to design. HTH -Mike On Jul 15, 2015, at 11:46 AM, Shushant Arora shushantaror...@gmail.com wrote: Hi, I have a requirement of writing to an HBase table from a Spark streaming app after some processing. Is the HBase put operation the only way of writing to HBase, or is there any specialised connector or RDD of Spark for HBase writes? Should bulk load to HBase from a streaming app be avoided if the output of each batch interval is just a few MBs? Thanks The opinions expressed here are mine, while they may reflect a cognitive thought, that is purely accidental. Use at your own risk. Michael Segel michael_segel (AT) hotmail.com
RE: java.lang.NoClassDefFoundError: Could not initialize class org.fusesource.jansi.internal.Kernel32
Does anybody have any idea what causes this problem? Thanks. Ningjun From: Wang, Ningjun (LNG-NPV) Sent: Wednesday, July 15, 2015 11:09 AM To: user@spark.apache.org Subject: java.lang.NoClassDefFoundError: Could not initialize class org.fusesource.jansi.internal.Kernel32 I just installed Spark 1.3.1 on Windows 2008 Server. When I start spark-shell, I get the following error: Failed to created SparkJLineReader: java.lang.NoClassDefFoundError: Could not initialize class org.fusesource.jansi.internal.Kernel32 Please advise. Thanks. Ningjun
Re: spark streaming job to hbase write
It resorts to the following method for finding the region location: private RegionLocations locateRegionInMeta(TableName tableName, byte[] row, boolean useCache, boolean retry, int replicaId) throws IOException { Note: the useCache value is true in this call path, meaning the client-side cache would be consulted to reduce RPCs to the server hosting hbase:meta. Cheers On Fri, Jul 17, 2015 at 7:41 AM, Shushant Arora shushantaror...@gmail.com wrote: Is this map creation happening on the client side? But how does it know which RS will contain that row key in a put operation without asking the .META. table? Does the HBase client first get the ranges of keys of each RegionServer and then group Put objects based on region servers? On Fri, Jul 17, 2015 at 7:48 PM, Ted Yu yuzhih...@gmail.com wrote: Internally AsyncProcess uses a Map which is keyed by server name: Map<ServerName, MultiAction<Row>> actionsByServer = new HashMap<ServerName, MultiAction<Row>>(); Here MultiAction would group the Puts in your example which are destined for the same server. Cheers On Fri, Jul 17, 2015 at 5:15 AM, Shushant Arora shushantaror...@gmail.com wrote: Thanks! My key is random (hexadecimal), so a hot spot should not be created. Is there any concept of a bulk put? Say I want to issue one put request for a batch of 1000 which will hit a region server, instead of an individual put for each key: HTable.put(List<Put>). Does this handle batching of puts based on the region server they will finally land on? Say in my batch there are 10 puts - 5 for RS1, 3 for RS2 and 2 for RS3. Does this handle that? On Thu, Jul 16, 2015 at 8:31 PM, Michael Segel michael_se...@hotmail.com wrote: You ask an interesting question… Let's set aside Spark and look at the overall ingestion pattern. It's really an ingestion pattern where your input into the system is from a queue. Are the events discrete or continuous? (This is kinda important.) If the events are continuous then more than likely you're going to be ingesting data where the key is somewhat sequential. If you use put(), you end up with hot spotting. And you'll end up with regions half full. So you would be better off batching up the data and doing bulk imports. If the events are discrete, then you'll want to use put() because the odds are you will not be using a sequential key. (You could, but I'd suggest that you rethink your primary key.) Depending on the rate of ingestion, you may want to do a manual flush. (It depends on the velocity of data to be ingested and your use case.) (Remember what caching occurs and where when dealing with HBase.) A third option… Depending on how you use the data, you may want to avoid storing the data in HBase, and only use HBase as an index to where you store the data files for quick access. Again it depends on your data ingestion flow and how you intend to use the data. So really this is less a Spark issue than an HBase issue when it comes to design. HTH -Mike On Jul 15, 2015, at 11:46 AM, Shushant Arora shushantaror...@gmail.com wrote: Hi, I have a requirement of writing to an HBase table from a Spark streaming app after some processing. Is the HBase put operation the only way of writing to HBase, or is there any specialised connector or RDD of Spark for HBase writes? Should bulk load to HBase from a streaming app be avoided if the output of each batch interval is just a few MBs? Thanks The opinions expressed here are mine, while they may reflect a cognitive thought, that is purely accidental. Use at your own risk. Michael Segel michael_segel (AT) hotmail.com
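A minimal sketch of the pattern discussed above, from a Spark streaming job, assuming an HBase 0.98/1.0-style client API; the table name "my_table", column family "cf", qualifier "col" and the shape of the DStream are placeholders, not taken from the thread. HTable.put(List<Put>) issues one batched call, which the client then groups per region server internally:

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{HTable, Put}
import org.apache.hadoop.hbase.util.Bytes
import scala.collection.JavaConverters._

// dstream: DStream[(String, String)] of (rowKey, value) pairs -- hypothetical shape
dstream.foreachRDD { rdd =>
  rdd.foreachPartition { records =>
    // create the table handle inside the partition so nothing is serialized from the driver
    val conf = HBaseConfiguration.create()
    val table = new HTable(conf, "my_table")          // hypothetical table name
    val puts = records.map { case (rowKey, value) =>
      val p = new Put(Bytes.toBytes(rowKey))
      p.add(Bytes.toBytes("cf"), Bytes.toBytes("col"), Bytes.toBytes(value)) // pre-1.0 API; addColumn in newer clients
      p
    }.toList
    if (puts.nonEmpty) table.put(puts.asJava)         // one batched call; grouped per server by AsyncProcess
    table.flushCommits()
    table.close()
  }
}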
Re: Spark APIs memory usage?
Can you paste the code? How much memory does your system have and how big is your dataset? Did you try df.persist(StorageLevel.MEMORY_AND_DISK)? Thanks Best Regards On Fri, Jul 17, 2015 at 5:14 PM, Harit Vishwakarma harit.vishwaka...@gmail.com wrote: Thanks, the code is running on a single machine. And it still doesn't answer my question. On Fri, Jul 17, 2015 at 4:52 PM, ayan guha guha.a...@gmail.com wrote: You can bump up the number of partitions while creating the RDD you are using for the df On 17 Jul 2015 21:03, Harit Vishwakarma harit.vishwaka...@gmail.com wrote: Hi, I used the createDataFrame API of SQLContext in Python and am getting an OutOfMemoryException. I am wondering if it is creating the whole DataFrame in memory? I did not find any documentation describing memory usage of Spark APIs. The documentation given is nice, but a little more detail (especially on memory usage / data distribution etc.) would really help. -- Regards Harit Vishwakarma -- Regards Harit Vishwakarma
Re: Spark APIs memory usage?
1. load 3 matrices of size ~ 1 X 1 using numpy. 2. rdd2 = rdd1.values().flatMap( fun ) # rdd1 has roughly 10^7 tuples 3. df = sqlCtx.createDataFrame(rdd2) 4. df.save() # in parquet format It throws the exception in the createDataFrame() call. I don't know what exactly it is creating. Everything in memory? Or can I make it persist simultaneously while it is getting created? Thanks On Fri, Jul 17, 2015 at 5:16 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Can you paste the code? How much memory does your system have and how big is your dataset? Did you try df.persist(StorageLevel.MEMORY_AND_DISK)? Thanks Best Regards On Fri, Jul 17, 2015 at 5:14 PM, Harit Vishwakarma harit.vishwaka...@gmail.com wrote: Thanks, the code is running on a single machine. And it still doesn't answer my question. On Fri, Jul 17, 2015 at 4:52 PM, ayan guha guha.a...@gmail.com wrote: You can bump up the number of partitions while creating the RDD you are using for the df On 17 Jul 2015 21:03, Harit Vishwakarma harit.vishwaka...@gmail.com wrote: Hi, I used the createDataFrame API of SQLContext in Python and am getting an OutOfMemoryException. I am wondering if it is creating the whole DataFrame in memory? I did not find any documentation describing memory usage of Spark APIs. The documentation given is nice, but a little more detail (especially on memory usage / data distribution etc.) would really help. -- Regards Harit Vishwakarma -- Regards Harit Vishwakarma -- Regards Harit Vishwakarma
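A rough Scala sketch of the two suggestions in this thread (the original code is PySpark, so this only illustrates the idea): repartition the input RDD into more, smaller partitions before createDataFrame, and persist with MEMORY_AND_DISK so cached partitions spill to disk instead of failing. rdd1, fun, the partition count and the output path are placeholders.

import org.apache.spark.sql.SQLContext
import org.apache.spark.storage.StorageLevel

val sqlCtx = new SQLContext(sc)

// rdd1: RDD[(Long, Array[Double])] -- hypothetical stand-in for the thread's rdd1
val rdd2 = rdd1.values
  .flatMap(fun)              // fun: Array[Double] => Seq[(Long, Double)], hypothetical
  .repartition(200)          // more, smaller partitions => smaller per-task memory footprint
val df = sqlCtx.createDataFrame(rdd2)            // schema inferred from the tuple type
df.persist(StorageLevel.MEMORY_AND_DISK)         // spill to disk rather than OOM
df.saveAsParquetFile("/path/to/output")          // hypothetical output path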
Re: Problem in Understanding concept of Physical Cores
Dear Community, request to help on the below queries; they are unanswered. Thanks and Regards Aniruddh On Wed, Jul 15, 2015 at 12:37 PM, Aniruddh Sharma asharma...@gmail.com wrote: Hi TD, Request your guidance on the below 5 queries. Following is the context of them that I would use to evaluate based on your response. a) I need to decide whether to deploy Spark in Standalone mode or in Yarn. But it seems to me that Spark on Yarn is more parallel than Standalone mode (given the same number of physical cores) as it is possible to increase execution threads in Yarn with the --executor-cores option. b) I also need to understand the following, which is not clearly understandable. Other persons on the mailing list are also raising this query in other words for different cases while tuning jobs. Theoretically a JVM can support thousands of threads, but in the context of Spark, what is the advisable ratio of physical cores to threads to be created to partitions to be created? If you find this relevant and important then maybe there is a better link to explain this, both in Yarn and Standalone mode. Thanks and Regards Aniruddh On Fri, Jul 10, 2015 at 11:45 AM, Aniruddh Sharma asharma...@gmail.com wrote: Hi TD, Thanks for the elaboration. I have further doubts based on further tests that I did after your guidance. Case 1: Standalone Spark -- In standalone mode, as you explained, the master in spark-submit is local[*] implicitly, so it creates as many threads as the number of cores the VM has, but the user can control the number of partitions to be created and, in accordance with the number of partitions, tasks will be created. Query 1: If I have 4 cores, then 4 threads will be created, but if I give 40 partitions to my data, then 40 tasks will be created which need to be executed on 4 threads. Does it work this way: 4 threads execute 4 tasks (out of 40) in parallel, and when the first set of tasks completes they pick the next 4 tasks, executing tasks in a sequential manner? That is, 4 tasks concurrent but the rest of the tasks in sequence once the first concurrent set completes. Query 2: When we pass total-num-cores to Spark in Standalone mode, it seems the number of threads does not increase. When I execute sc.defaultParallelism it does not seem to reflect the passed total-num-cores parameter. So when we use this parameter, what does it exactly mean? Does it control the number of threads, or does it tell the Spark Master to provide this many physical cores to this job? I mean, is this parameter relevant not for a single job, but when multiple jobs are running in the cluster, to tell the Spark scheduler not to overallocate resources to a single job? Also, does setting this parameter guarantee any behavior, or is it only an indicator for the Spark scheduler? Case 2: Spark on Yarn -- In Spark on Yarn, it seems that the number of threads which get created is not based on the number of underlying physical cores. Query 3: It seems to be (defaultMinPartition * executor-cores). Is this understanding correct? If yes, does it mean the developer has control over the number of threads to request from Spark by passing the executor-cores option (which was not there in Standalone mode, as the number of threads was based on the number of physical cores)? Is there a special reason for this kind of difference? Query 4: Also, it seems there is a restriction on the value I can pass in the executor-cores option which seems to be dependent on the underlying physical cores.
For example, if I have 4 cores and I pass this value as 20 then it works, but if I pass this value as 100 then it does not work. So it seems the actual number of threads which can be created inside the JVM is still limited by the number of physical cores, but it can be controlled by the executor-cores option. Kindly elaborate on the best practice for how many threads to request based on physical cores, and how physical cores limit this behavior. Query 5: Is there a reason for the difference in behavior between total-num-cores (does not create a thread) in Standalone mode and executor-cores (creates threads) in Yarn mode in how threads are created? It seems in Yarn mode we can create more threads in the same executor JVM compared to Standalone mode for the same number of physical cores. Thanks and Regards Aniruddh On Thu, Jul 9, 2015 at 4:30 PM, Tathagata Das t...@databricks.com wrote: Query 1) What Spark runs is tasks in task slots; whatever the mapping of tasks to physical cores is, it does not matter. If there are two task slots (2 threads in local mode, or an executor with 2 task slots in distributed mode), it can only run two tasks concurrently. That is true even if the task is really not doing much. There is no multiplexing going on between tasks and task slots. So to answer your query 1, there is 1 thread that is permanently allocated to the receiver task (a long running task) even if it does not do much. There is no thread left to
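A tiny sketch of the task-slot behaviour asked about in Query 1, assuming a 4-core machine; the master URL, app name and numbers are illustrative only:

import org.apache.spark.{SparkConf, SparkContext}

// local[4] => 4 task slots, i.e. at most 4 tasks run concurrently
val sc = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("slots-demo"))
println(sc.defaultParallelism)                       // 4 in local[4]

// 40 partitions => 40 tasks per stage, executed in waves of 4 concurrent tasks
val rdd = sc.parallelize(1 to 1000, numSlices = 40)
val count = rdd.map(_ * 2).count()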
Re: Spark APIs memory usage?
Thanks, the code is running on a single machine. And it still doesn't answer my question. On Fri, Jul 17, 2015 at 4:52 PM, ayan guha guha.a...@gmail.com wrote: You can bump up the number of partitions while creating the RDD you are using for the df On 17 Jul 2015 21:03, Harit Vishwakarma harit.vishwaka...@gmail.com wrote: Hi, I used the createDataFrame API of SQLContext in Python and am getting an OutOfMemoryException. I am wondering if it is creating the whole DataFrame in memory? I did not find any documentation describing memory usage of Spark APIs. The documentation given is nice, but a little more detail (especially on memory usage / data distribution etc.) would really help. -- Regards Harit Vishwakarma -- Regards Harit Vishwakarma
Is it possible to set the number of cores per executor on standalone cluster?
Is it possible to set the number of cores per executor on a standalone cluster? We find that core distribution may be very skewed across executors at times, so the workload is skewed, which makes our job slow. Thanks! -- 郑旭东 Zheng, Xudong
Re: spark streaming job to hbase write
Thanks! My key is random (hexadecimal), so a hot spot should not be created. Is there any concept of a bulk put? Say I want to issue one put request for a batch of 1000 which will hit a region server, instead of an individual put for each key: HTable.put(List<Put>). Does this handle batching of puts based on the region server they will finally land on? Say in my batch there are 10 puts - 5 for RS1, 3 for RS2 and 2 for RS3. Does this handle that? On Thu, Jul 16, 2015 at 8:31 PM, Michael Segel michael_se...@hotmail.com wrote: You ask an interesting question… Let's set aside Spark and look at the overall ingestion pattern. It's really an ingestion pattern where your input into the system is from a queue. Are the events discrete or continuous? (This is kinda important.) If the events are continuous then more than likely you're going to be ingesting data where the key is somewhat sequential. If you use put(), you end up with hot spotting. And you'll end up with regions half full. So you would be better off batching up the data and doing bulk imports. If the events are discrete, then you'll want to use put() because the odds are you will not be using a sequential key. (You could, but I'd suggest that you rethink your primary key.) Depending on the rate of ingestion, you may want to do a manual flush. (It depends on the velocity of data to be ingested and your use case.) (Remember what caching occurs and where when dealing with HBase.) A third option… Depending on how you use the data, you may want to avoid storing the data in HBase, and only use HBase as an index to where you store the data files for quick access. Again it depends on your data ingestion flow and how you intend to use the data. So really this is less a Spark issue than an HBase issue when it comes to design. HTH -Mike On Jul 15, 2015, at 11:46 AM, Shushant Arora shushantaror...@gmail.com wrote: Hi, I have a requirement of writing to an HBase table from a Spark streaming app after some processing. Is the HBase put operation the only way of writing to HBase, or is there any specialised connector or RDD of Spark for HBase writes? Should bulk load to HBase from a streaming app be avoided if the output of each batch interval is just a few MBs? Thanks The opinions expressed here are mine, while they may reflect a cognitive thought, that is purely accidental. Use at your own risk. Michael Segel michael_segel (AT) hotmail.com
Re: spark streaming job to hbase write
Internally AsyncProcess uses a Map which is keyed by server name: Map<ServerName, MultiAction<Row>> actionsByServer = new HashMap<ServerName, MultiAction<Row>>(); Here MultiAction would group the Puts in your example which are destined for the same server. Cheers On Fri, Jul 17, 2015 at 5:15 AM, Shushant Arora shushantaror...@gmail.com wrote: Thanks! My key is random (hexadecimal), so a hot spot should not be created. Is there any concept of a bulk put? Say I want to issue one put request for a batch of 1000 which will hit a region server, instead of an individual put for each key: HTable.put(List<Put>). Does this handle batching of puts based on the region server they will finally land on? Say in my batch there are 10 puts - 5 for RS1, 3 for RS2 and 2 for RS3. Does this handle that? On Thu, Jul 16, 2015 at 8:31 PM, Michael Segel michael_se...@hotmail.com wrote: You ask an interesting question… Let's set aside Spark and look at the overall ingestion pattern. It's really an ingestion pattern where your input into the system is from a queue. Are the events discrete or continuous? (This is kinda important.) If the events are continuous then more than likely you're going to be ingesting data where the key is somewhat sequential. If you use put(), you end up with hot spotting. And you'll end up with regions half full. So you would be better off batching up the data and doing bulk imports. If the events are discrete, then you'll want to use put() because the odds are you will not be using a sequential key. (You could, but I'd suggest that you rethink your primary key.) Depending on the rate of ingestion, you may want to do a manual flush. (It depends on the velocity of data to be ingested and your use case.) (Remember what caching occurs and where when dealing with HBase.) A third option… Depending on how you use the data, you may want to avoid storing the data in HBase, and only use HBase as an index to where you store the data files for quick access. Again it depends on your data ingestion flow and how you intend to use the data. So really this is less a Spark issue than an HBase issue when it comes to design. HTH -Mike On Jul 15, 2015, at 11:46 AM, Shushant Arora shushantaror...@gmail.com wrote: Hi, I have a requirement of writing to an HBase table from a Spark streaming app after some processing. Is the HBase put operation the only way of writing to HBase, or is there any specialised connector or RDD of Spark for HBase writes? Should bulk load to HBase from a streaming app be avoided if the output of each batch interval is just a few MBs? Thanks The opinions expressed here are mine, while they may reflect a cognitive thought, that is purely accidental. Use at your own risk. Michael Segel michael_segel (AT) hotmail.com
Re: Spark APIs memory usage?
I suspect it's numpy filling up the memory. Thanks Best Regards On Fri, Jul 17, 2015 at 5:46 PM, Harit Vishwakarma harit.vishwaka...@gmail.com wrote: 1. load 3 matrices of size ~ 1 X 1 using numpy. 2. rdd2 = rdd1.values().flatMap( fun ) # rdd1 has roughly 10^7 tuples 3. df = sqlCtx.createDataFrame(rdd2) 4. df.save() # in parquet format It throws the exception in the createDataFrame() call. I don't know what exactly it is creating. Everything in memory? Or can I make it persist simultaneously while it is getting created? Thanks On Fri, Jul 17, 2015 at 5:16 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Can you paste the code? How much memory does your system have and how big is your dataset? Did you try df.persist(StorageLevel.MEMORY_AND_DISK)? Thanks Best Regards On Fri, Jul 17, 2015 at 5:14 PM, Harit Vishwakarma harit.vishwaka...@gmail.com wrote: Thanks, the code is running on a single machine. And it still doesn't answer my question. On Fri, Jul 17, 2015 at 4:52 PM, ayan guha guha.a...@gmail.com wrote: You can bump up the number of partitions while creating the RDD you are using for the df On 17 Jul 2015 21:03, Harit Vishwakarma harit.vishwaka...@gmail.com wrote: Hi, I used the createDataFrame API of SQLContext in Python and am getting an OutOfMemoryException. I am wondering if it is creating the whole DataFrame in memory? I did not find any documentation describing memory usage of Spark APIs. The documentation given is nice, but a little more detail (especially on memory usage / data distribution etc.) would really help. -- Regards Harit Vishwakarma -- Regards Harit Vishwakarma -- Regards Harit Vishwakarma
RE: Select all columns except some
Hello, thank you for your time. Seq[String] works perfectly fine. I also tried running a for loop through all elements to see if any access to a value was broken, but no, they are alright. For now, I solved it properly by calling this. Sadly, it takes a lot of time, but it works: var data_sas = sqlContext.read.format("com.github.saurfang.sas.spark").load("/path/to/file.s") data_sas.cache for (col <- clean_cols) { data_sas = data_sas.drop(col) } data_sas.unpersist Saif From: Yana Kadiyska [mailto:yana.kadiy...@gmail.com] Sent: Thursday, July 16, 2015 12:58 PM To: Ellafi, Saif A. Cc: user@spark.apache.org Subject: Re: Select all columns except some Have you tried to examine what clean_cols contains -- I'm suspect of this part: mkString(", "). Try this: val clean_cols : Seq[String] = df.columns... if you get a type error you need to work on clean_cols (I suspect yours is of type String at the moment and presents itself to Spark as a single column name with commas embedded). Not sure why the .drop call hangs, but in either case drop returns a new dataframe -- it's not a setter call. On Thu, Jul 16, 2015 at 10:57 AM, saif.a.ell...@wellsfargo.com wrote: Hi, In a hundred-column dataframe, I wish to either select all of them except some, or drop the ones I don't want. I am failing at such a simple task; I tried two ways: val clean_cols = df.columns.filterNot(col_name => col_name.startsWith("STATE_")).mkString(", ") df.select(clean_cols) But this throws an exception: org.apache.spark.sql.AnalysisException: cannot resolve 'asd_dt, industry_area,...' at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42) at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:63) at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:52) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:286) at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:286) at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:51) at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:285) at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$transformExpressionUp$1(QueryPlan.scala:108) at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2$$anonfun$apply$2.apply(QueryPlan.scala:123) The other thing I tried is df.columns.filter(col_name => col_name.startsWith("STATE_")) for (col <- cols) df.drop(col) But this other thing doesn't do anything or hangs up. Saif
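A minimal sketch of the "keep everything except STATE_*" selection, assuming Spark 1.3+: select(col: String, cols: String*) takes the names as varargs, so the sequence has to be expanded with ": _*" rather than joined into one comma-separated String (which is what the mkString version ends up passing as a single column name):

val cleanCols: Seq[String] = df.columns.filterNot(_.startsWith("STATE_")).toSeq
val cleaned = df.select(cleanCols.head, cleanCols.tail: _*)
// equivalently, via Column objects:
// val cleaned = df.select(cleanCols.map(df.col): _*)
cleaned.show()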
Adding meetup groups to Community page - Moscow, Slovenia, Zagreb
Dear all, the page https://spark.apache.org/community.html says: If you'd like your meetup added, email user@spark.apache.org. So here I am emailing; could someone please add three new groups to the page: Moscow: http://www.meetup.com/Apache-Spark-in-Moscow/ Slovenija (Ljubljana): http://www.meetup.com/Apache-Spark-Ljubljana-Meetup/ Zagreb: http://www.meetup.com/Apache-Spark-Zagreb-Meetup/
Nullpointer when saving as table with a timestamp column type
So I have a very simple dataframe that looks like df: [name:String, Place:String, time: time:timestamp] I build this java.sql.Timestamp from a string and it works really well except when I call saveAsTable(tableName) on this df. Without the timestamp, it saves fine, but with the timestamp, it throws java.lang.NullPointerException Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1230) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1219) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1218) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1218) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:719) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:719) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:719) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1419) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1380) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) Any ideas how I can get around this?
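For reference, a small hedged sketch of one way to build such a frame with an explicit TimestampType column. The values, the table name and the use of a HiveContext for saveAsTable are assumptions, not taken from the original post; the point is that the schema declares a single time column and the string is parsed into java.sql.Timestamp before the Row is built, so a bad string fails fast instead of leaving a surprise null:

import java.sql.Timestamp
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, StringType, TimestampType}

val schema = StructType(Seq(
  StructField("name", StringType, nullable = true),
  StructField("place", StringType, nullable = true),
  StructField("time", TimestampType, nullable = true)))

// hypothetical data; Timestamp.valueOf expects "yyyy-MM-dd HH:mm:ss"
val rows = sc.parallelize(Seq(
  Row("alice", "SF", Timestamp.valueOf("2015-07-17 10:15:00")),
  Row("bob", "NY", Timestamp.valueOf("2015-07-17 11:30:00"))))

val df = hiveContext.createDataFrame(rows, schema)
df.saveAsTable("events")   // hypothetical table name; saveAsTable needs a HiveContext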
Re: Spark streaming Processing time keeps increasing
Responses inline. On Thu, Jul 16, 2015 at 9:27 PM, N B nb.nos...@gmail.com wrote: Hi TD, Yes, we do have the invertible function provided. However, I am not sure I understood how to use the filterFunction. Is there an example somewhere showing its usage? The header comment on the function says: * @param filterFunc function to filter expired key-value pairs; * only pairs that satisfy the function are retained * set this to null if you do not want to filter These are the questions I am confused about: 1. The code comment seems to imply that the filterFunc is only used to figure out which key-value pairs are used to form the window, but how does it actually help expire the old data? It applies the filter and retains only the keys that pass through it. Underneath, it's all RDDs, so only the filtered K, V pairs are retained (and cached) for future batches. 2. Shouldn't the values that are falling off of the window period automatically be removed without the need for an additional filter function? It cannot figure out the falling off in this incremental version. For example, if you are counting over the window by adding (reduceFunc) and subtracting (invReduceFunc), unless you provide the concept of a zero, it will not know when to throw away the keys that have become 0. Over a window, the count may increase from nothing to 10, and then reduce to 0 when the window moves forward, but it does not know that 0 means don't track it any more. The filter function introduces that concept of zero. 3. Which side of the key-value pairs are passed to this function? The ones that are coming in or the ones that are going out of the window, or both? All of the (k, v) pairs that are being tracked. 4. The key-value pairs in use in a particular reduceByKeyAndWindow operation may not have the requisite info (such as a timestamp or similar, e.g. if it's aggregated data) to help determine whether to return true or false. What is the semantic expected here? I am not sure I get your question. It is up to you to provide sufficient information as part of the value so that you can take that decision in the filter function. As always, thanks for your help Nikunj On Thu, Jul 16, 2015 at 1:16 AM, Tathagata Das t...@databricks.com wrote: Make sure you provide the filterFunction with the invertible reduceByKeyAndWindow. Otherwise none of the keys will get removed, and the key space will continue to increase. This is what is leading to the lag. So use the filtering function to filter out the keys that are not needed any more. On Thu, Jul 16, 2015 at 12:44 AM, Akhil Das ak...@sigmoidanalytics.com wrote: What is your data volume? Do you have checkpointing/WAL enabled? In that case make sure you have SSD disks, as this behavior is mainly due to IO wait. Thanks Best Regards On Thu, Jul 16, 2015 at 8:43 AM, N B nb.nos...@gmail.com wrote: Hello, We have a Spark streaming application and the problem that we are encountering is that the batch processing time keeps on increasing and eventually causes the application to start lagging. I am hoping that someone here can point me to any underlying cause of why this might happen. The batch interval is 1 minute as of now and the app does some maps, filters, joins and reduceByKeyAndWindow operations. All the reduces are invertible functions and so we do provide the inverse-reduce functions in all those. The largest window size we have is 1 hour right now. When the app is started, we see that the batch processing time is between 20 and 30 seconds.
It keeps creeping up slowly, and by the time it hits the 1 hour mark, it is somewhere around 35-40 seconds. Somewhat expected and still not bad! I would expect that since the largest window we have is 1 hour long, the application should stabilize around the 1 hour mark and start processing subsequent batches within that 35-40 second zone. However, that is not what is happening. The processing time still keeps increasing and eventually in a few hours it exceeds the 1 minute mark and then starts lagging. Eventually the lag builds up and becomes minutes, at which point we have to restart the system. Any pointers on why this could be happening and what we can do to troubleshoot further? Thanks Nikunj
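To make the filter-function point concrete, a minimal sketch of an invertible windowed count over a hypothetical DStream[(String, Long)] named events; the filterFunc is the "concept of zero" described above and drops keys whose count has returned to zero, so the tracked key space stops growing:

import org.apache.spark.streaming.Seconds

val windowedCounts = events.reduceByKeyAndWindow(
  (a: Long, b: Long) => a + b,      // reduce: add values entering the window
  (a: Long, b: Long) => a - b,      // inverse reduce: subtract values leaving the window
  Seconds(3600),                    // window duration (largest window in the thread is 1 hour)
  Seconds(60),                      // slide duration (1 minute batches)
  filterFunc = (kv: (String, Long)) => kv._2 != 0L)  // drop keys that are back to zero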
use S3-Compatible Storage with spark
Hi, I wonder how to use S3-compatible storage in Spark. If I'm using the s3n:// URL scheme, then it will point to Amazon; is there a way I can specify the host somewhere? - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Java 8 vs Scala
If you take the time to actually learn Scala starting from its fundamental concepts AND, quite importantly, get familiar with general functional programming concepts, you'd immediately realize the things that you'd really miss going back to Java (8). On Fri, Jul 17, 2015 at 8:14 AM Wojciech Pituła w.pit...@gmail.com wrote: IMHO only Scala is an option. Once you're familiar with it you just can't even look at Java code. On Thu, 16.07.2015 at 07:20, spark user spark_u...@yahoo.com.invalid wrote: I struggled a lot with Scala, almost 10 days and no improvement, but when I switched to Java 8, things were so smooth, and I used DataFrames with Redshift and Hive and all are looking good. If you are very good in Scala then go with Scala, otherwise Java is the best fit. This is just my opinion because I am a Java guy. On Wednesday, July 15, 2015 12:33 PM, vaquar khan vaquar.k...@gmail.com wrote: My choice is Java 8 On 15 Jul 2015 18:03, Alan Burlison alan.burli...@oracle.com wrote: On 15/07/2015 08:31, Ignacio Blasco wrote: The main advantage of using Scala vs Java 8 is being able to use a console https://bugs.openjdk.java.net/browse/JDK-8043364 -- Alan Burlison -- - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Spark Master HA on YARN
Hi, Is Spark master high availability supported on YARN (yarn-client mode) analogous to https://spark.apache.org/docs/1.4.0/spark-standalone.html#high-availability? Thanks Bhaskie
Unread block data error
Hi, I have been running a batch of data through my application for the last couple of days and this morning discovered it had fallen over with the following error. java.lang.IllegalStateException: unread block data at java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2376) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1360) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1946) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1870) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1752) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1328) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:350) at spark.JavaDeserializationStream.readObject(JavaSerializer.scala:23) at spark.JavaSerializerInstance.deserialize(JavaSerializer.scala:45) at spark.executor.Executor$TaskRunner.run(Executor.scala:73) at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908) at java.lang.Thread.run(Thread.java:662) I have seen this once before and it turned out to be version clashes, however this time no dependencies have changed since the last successful run? I am using Spark 1.2 on CDH 5.3.2 Any ideas would be greatly appreciated! Thanks, Jem
Re: it seem like the exactly once feature not work on spark1.4
I see now. There are three steps in Spark Streaming + Kafka data processing: 1. Receiving the data 2. Transforming the data 3. Pushing out the data. Spark Streaming + Kafka only provide an exactly-once guarantee on steps 1 and 2; we need to ensure exactly-once on step 3 ourselves. More details are in http://spark.apache.org/docs/latest/streaming-programming-guide.html -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/it-seem-like-the-exactly-once-feature-not-work-on-spark1-4-tp23871p23884.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
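A hedged sketch of one way to handle step 3 with the direct Kafka stream: write results idempotently, keyed by the batch's offset range, so a replay after a failure overwrites instead of duplicating. directKafkaStream is assumed to come from KafkaUtils.createDirectStream, and process / saveBatchIdempotently are hypothetical placeholders for your own transformation and sink:

import org.apache.spark.TaskContext
import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

directKafkaStream.foreachRDD { rdd =>
  // capture offsets before any shuffling transformation, so partition ids still line up
  val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd.map(process).foreachPartition { partition =>
    val range = offsetRanges(TaskContext.get.partitionId)
    // upsert keyed by (topic, partition, fromOffset): replays overwrite rather than duplicate
    saveBatchIdempotently(range.topic, range.partition, range.fromOffset, partition)
  }
}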
Re: Spark 1.3.1 + Hive: write output to CSV with header on S3
Hi Roberto, I have a question regarding HiveContext. When you create a HiveContext, where do you define the Hive connection properties? Suppose Hive is not on the local machine and I need to connect to it; how will HiveContext know the database info like the URL, username and password? String username = ""; String password = ""; String url = "jdbc:hive2://quickstart.cloudera:1/default"; On Friday, July 17, 2015 2:29 AM, Roberto Coluccio roberto.coluc...@gmail.com wrote: Hello community, I'm currently using Spark 1.3.1 with Hive support for outputting processed data on an external Hive table backed on S3. I'm using a manual specification of the delimiter, but I'd want to know if there is any clean way to write in CSV format: val sparkConf = new SparkConf(); val sc = new SparkContext(sparkConf); val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc); import hiveContext.implicits._; hiveContext.sql("CREATE EXTERNAL TABLE IF NOT EXISTS table_name(field1 STRING, field2 STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION '" + path_on_s3 + "'"); hiveContext.sql(an INSERT OVERWRITE query to write into the above table) I also need the header of the table to be printed on each written file. I tried with: hiveContext.sql("set hive.cli.print.header=true") But it didn't work. Any hint? Thank you. Best regards, Roberto
Re: ALS run method versus ALS train versus ALS fit and transform
the new ALS()...run() form is underneath both of the first two. I am not sure what you mean by underneath, so basically the mllib ALS()...run() does the same thing as the mllib ALS train() ? On Wed, Jul 15, 2015 at 2:02 PM, Sean Owen so...@cloudera.com wrote: The first two examples are from the .mllib API. Really, the new ALS()...run() form is underneath both of the first two. In the second case, you're calling a convenience method that calls something similar to the first example. The second example is from the new .ml pipelines API. Similar ideas, but a different API. On Wed, Jul 15, 2015 at 9:55 PM, Carol McDonald cmcdon...@maprtech.com wrote: In the Spark mllib examples MovieLensALS.scala ALS run is used, however in the movie recommendation with mllib tutorial ALS train is used , What is the difference, when should you use one versus the other val model = new ALS() .setRank(params.rank) .setIterations(params.numIterations) .setLambda(params.lambda) .setImplicitPrefs(params.implicitPrefs) .setUserBlocks(params.numUserBlocks) .setProductBlocks(params.numProductBlocks) .run(training) val model = ALS.train(training, rank, numIter, lambda) Also in org.apache.spark.examples.ml , fit and transform is used. Which one do you recommend using ? val als = new ALS() .setUserCol(userId) .setItemCol(movieId) .setRank(params.rank) .setMaxIter(params.maxIter) .setRegParam(params.regParam) .setNumBlocks(params.numBlocks) val model = als.fit(training.toDF()) val predictions = model.transform(test.toDF()).cache()
Re: ALS run method versus ALS train versus ALS fit and transform
Yes, just have a look at the method in the source code. It calls new ALS()...run(). It's a convenience wrapper only. On Fri, Jul 17, 2015 at 4:59 PM, Carol McDonald cmcdon...@maprtech.com wrote: the new ALS()...run() form is underneath both of the first two. I am not sure what you mean by underneath, so basically the mllib ALS()...run() does the same thing as the mllib ALS train() ? - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
RE: The auxService:spark_shuffle does not exist
I have encountered the same problem after following the document. Here's my spark-defaults.conf: spark.shuffle.service.enabled true spark.dynamicAllocation.enabled true spark.dynamicAllocation.executorIdleTimeout 60 spark.dynamicAllocation.cachedExecutorIdleTimeout 120 spark.dynamicAllocation.initialExecutors 2 spark.dynamicAllocation.maxExecutors 8 spark.dynamicAllocation.minExecutors 1 spark.dynamicAllocation.schedulerBacklogTimeout 10 and yarn-site.xml configured: <property> <name>yarn.nodemanager.aux-services</name> <value>spark_shuffle,mapreduce_shuffle</value> </property> ... <property> <name>yarn.nodemanager.aux-services.spark_shuffle.class</name> <value>org.apache.spark.network.yarn.YarnShuffleService</value> </property> and deployed the 2 JARs to the NodeManager's classpath /opt/hadoop/share/hadoop/mapreduce/. (I also checked the NodeManager log and the JARs appear in the classpath.) I notice that the JAR location is not the same as in the 1.4 document. I found them under network/yarn/target and network/shuffle/target/ after building it with -Phadoop-2.4 -Psparkr -Pyarn -Phive -Phive-thriftserver in maven: spark-network-yarn_2.10-1.4.1.jar spark-network-shuffle_2.10-1.4.1.jar and I am still getting the following exception. Exception in thread ContainerLauncher #0 java.lang.Error: org.apache.spark.SparkException: Exception while starting container container_1437141440985_0003_01_02 on host alee-ci-2058-slave-2.test.altiscale.com at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1151) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:744) Caused by: org.apache.spark.SparkException: Exception while starting container container_1437141440985_0003_01_02 on host alee-ci-2058-slave-2.test.altiscale.com at org.apache.spark.deploy.yarn.ExecutorRunnable.startContainer(ExecutorRunnable.scala:116) at org.apache.spark.deploy.yarn.ExecutorRunnable.run(ExecutorRunnable.scala:67) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) ... 2 more Caused by: org.apache.hadoop.yarn.exceptions.InvalidAuxServiceException: The auxService:spark_shuffle does not exist at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:526) at org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl.instantiateException(SerializedExceptionPBImpl.java:152) at org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl.deSerialize(SerializedExceptionPBImpl.java:106) Not sure what else I am missing here or doing wrong? Appreciate any insights or feedback, thanks. Date: Wed, 8 Jul 2015 09:25:39 +0800 Subject: Re: The auxService:spark_shuffle does not exist From: zjf...@gmail.com To: rp...@njit.edu CC: user@spark.apache.org Did you enable dynamic resource allocation? You can refer to this page for how to configure the Spark shuffle service for YARN: https://spark.apache.org/docs/1.4.0/job-scheduling.html On Tue, Jul 7, 2015 at 10:55 PM, roy rp...@njit.edu wrote: we tried --master yarn-client with no different result. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/The-auxService-spark-shuffle-does-not-exist-tp23662p23689.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
- To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org -- Best Regards Jeff Zhang
spark-shell with Yarn failed
Hello, first of all I'm a newbie in Spark. I'm trying to start the spark-shell with a YARN cluster by running: $ spark-shell --master yarn-client Sometimes it goes well, but most of the time I get an error: Container exited with a non-zero exit code 10 Failing this attempt. Failing the application. ApplicationMaster host: N/A ApplicationMaster RPC port: -1 queue: default start time: 1437145851944 final status: FAILED tracking URL: http://My-HadoopServer:50080/cluster/app/application_143708028_0030 user: hadoop org.apache.spark.SparkException: Yarn application has already ended! It might have been killed or unable to launch application master. at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.waitForApplication(YarnClientSchedulerBackend.scala:115) Searching in the YARN logs I got this: $ yarn logs -applicationId application_143708028_0030 2015-07-17 17:11:03,961 - INFO [sparkYarnAM-akka.actor.default-dispatcher-4:Slf4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$3@74] - Starting remoting 2015-07-17 17:11:04,200 - ERROR [sparkYarnAM-akka.actor.default-dispatcher-4:Slf4jLogger$$anonfun$receive$1$$anonfun$applyOrElse$1@65] - failed to bind to My-HadoopServer/10.98.105.11:0, shutting down Netty transport 2015-07-17 17:11:04,210 - WARN [main:Logging$class@71] - Service 'sparkYarnAM' could not bind on port 0. Attempting port 1. ... ... ... 2015-07-17 17:11:05,123 - ERROR [main:Logging$class@96] - Uncaught exception: java.net.BindException: Failed to bind to: My-HadoopServer/HadoopServerIP:0: Service 'sparkYarnAM' failed after 16 retries! at org.jboss.netty.bootstrap.ServerBootstrap.bind(ServerBootstrap.java:272) at akka.remote.transport.netty.NettyTransport$$anonfun$listen$1.apply(NettyTransport.scala:393) at akka.remote.transport.netty.NettyTransport$$anonfun$listen$1.apply(NettyTransport.scala:389) at scala.util.Success$$anonfun$map$1.apply(Try.scala:206) ... I'm using Spark 1.3 and Hadoop 2.6, and spark-env.sh points to my Hadoop configuration: export HADOOP_CONF_DIR=/usr/hdp/2.2.4.4-16/hadoop/conf Is this problem coming from the Spark configuration or the YARN configuration (or Spark-with-YARN configs)? Any ideas? Amjad
Re: use S3-Compatible Storage with spark
Hi Schmirr, The part after the s3n:// is your bucket name and folder name, ie s3n://${bucket_name}/${folder_name}[/${subfolder_name}]*. Bucket names are unique across S3, so the resulting path is also unique. There is no concept of hostname in s3 urls as far as I know. -sujit On Fri, Jul 17, 2015 at 1:36 AM, Schmirr Wurst schmirrwu...@gmail.com wrote: Hi, I wonder how to use S3 compatible Storage in Spark ? If I'm using s3n:// url schema, the it will point to amazon, is there a way I can specify the host somewhere ? - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
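A hedged sketch of pointing Spark at a non-Amazon, S3-compatible endpoint via the s3a connector instead of s3n; this assumes the hadoop-aws jar (and a Hadoop build new enough to support fs.s3a.endpoint) is on the classpath, and the endpoint, credentials and bucket below are placeholders:

val hadoopConf = sc.hadoopConfiguration
hadoopConf.set("fs.s3a.endpoint", "https://my-object-store.example.com")  // placeholder endpoint
hadoopConf.set("fs.s3a.access.key", "ACCESS_KEY")
hadoopConf.set("fs.s3a.secret.key", "SECRET_KEY")

val rdd = sc.textFile("s3a://my-bucket/path/to/data")   // placeholder bucket/path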
Re: Spark 1.3.1 + Hive: write output to CSV with header on S3
Using a hive-site.xml file on the classpath. On Fri, Jul 17, 2015 at 8:37 AM, spark user spark_u...@yahoo.com.invalid wrote: Hi Roberto, I have a question regarding HiveContext. When you create a HiveContext, where do you define the Hive connection properties? Suppose Hive is not on the local machine and I need to connect to it; how will HiveContext know the database info like the URL, username and password? String username = ""; String password = ""; String url = "jdbc:hive2://quickstart.cloudera:1/default"; On Friday, July 17, 2015 2:29 AM, Roberto Coluccio roberto.coluc...@gmail.com wrote: Hello community, I'm currently using Spark 1.3.1 with Hive support for outputting processed data on an external Hive table backed on S3. I'm using a manual specification of the delimiter, but I'd want to know if there is any clean way to write in CSV format: val sparkConf = new SparkConf(); val sc = new SparkContext(sparkConf); val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc); import hiveContext.implicits._; hiveContext.sql("CREATE EXTERNAL TABLE IF NOT EXISTS table_name(field1 STRING, field2 STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION '" + path_on_s3 + "'"); hiveContext.sql(an INSERT OVERWRITE query to write into the above table) I also need the header of the table to be printed on each written file. I tried with: hiveContext.sql("set hive.cli.print.header=true") But it didn't work. Any hint? Thank you. Best regards, Roberto
streaming and piping to R, sending all data in window to pipe()
Spark newbie here, using Spark 1.3.1. I'm consuming a stream and trying to pipe the data from the entire window to R for analysis. The R algorithm needs the entire dataset from the stream (everything in the window) in order to function properly; it can't be broken up. So I tried doing a coalesce(1) before calling pipe(), but it still seems to be breaking up the data and invoking R multiple times with small pieces of data. Is there some other approach I should try? Here's a small snippet: val inputs: DStream[String] = MQTTUtils.createStream(ssc, mqttBrokerUrl, inputsTopic, StorageLevel.MEMORY_AND_DISK_SER) .window(duration) inputs.foreachRDD { windowRdd => { if (windowRdd.count() > 0) processWindow(windowRdd) } } ... def processWindow(windowRdd: RDD[String]) = { // call R script to process data windowRdd.coalesce(1) val outputsRdd: RDD[String] = windowRdd.pipe(SparkFiles.get(Paths.get(rScript).getFileName.toString)) outputsRdd.cache() if (outputsRdd.count() > 0) processOutputs(outputsRdd) } ...
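One likely issue in the snippet above: coalesce(1) returns a new RDD and does not modify windowRdd in place, so the original multi-partition RDD is still the one being piped. A sketch of the same helper with the coalesced RDD actually used, so the R script sees the whole window in a single invocation:

def processWindow(windowRdd: RDD[String]) = {
  // coalesce returns a NEW single-partition RDD; pipe that one, not the original
  val singlePartition = windowRdd.coalesce(1)
  val outputsRdd: RDD[String] =
    singlePartition.pipe(SparkFiles.get(Paths.get(rScript).getFileName.toString))
  outputsRdd.cache()
  if (outputsRdd.count() > 0) processOutputs(outputsRdd)
}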
exception raised during large spark job against cassandra ring
Hello, I have been having trouble getting large Spark jobs to complete against my Cassandra ring. I'm finding that the CPU goes to 100% on one of the nodes, and then, after many hours, the job fails. Here are my Spark settings: .set("spark.cassandra.input.split.size_in_mb", "128") .set("spark.cassandra.output.batch.size.rows", "300") .set("spark.network.timeout", "21600") .set("spark.akka.frameSize", "150") .set("spark.executor.heartbeatInterval", "60") .set("spark.akka.timeout", "300") .set("spark.akka.threads", "24") This Jira posting seems to discuss a 2 GB limit, but I cannot tell from the post what the suggested solution would be for my setup: https://issues.apache.org/jira/browse/SPARK-6190 Here are the exception details: Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 465 in stage 2.0 failed 4 times, most recent failure: Lost task 465.3 in stage 2.0 (TID 2855, 172.31.44.9): java.lang.RuntimeException: java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:828) at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:125) at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:113) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1285) at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:127) at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:134) at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:511) at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:302) at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57) at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108) at org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57) at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:114) at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:87) at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:101) at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51) at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:254) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103) at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787) at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130) at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511) at
Re: Nullpointer when saving as table with a timestamp column type
df: [name:String, Place:String, time: time:timestamp] -- why not df: [name:String, Place:String, time:timestamp]? ------------------ Original message ------------------ From: Brandon White bwwintheho...@gmail.com Date: Friday, July 17, 2015 2:18 To: user user@spark.apache.org Subject: Nullpointer when saving as table with a timestamp column type So I have a very simple dataframe that looks like df: [name:String, Place:String, time: time:timestamp] I build this java.sql.Timestamp from a string and it works really well except when I call saveAsTable(tableName) on this df. Without the timestamp, it saves fine, but with the timestamp, it throws java.lang.NullPointerException Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1230) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1219) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1218) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1218) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:719) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:719) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:719) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1419) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1380) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) Any ideas how I can get around this?
Re: it seem like the exactly once feature not work on spark1.4
Yes. More information in my talk - https://www.youtube.com/watch?v=d5UJonrruHk On Fri, Jul 17, 2015 at 1:15 AM, JoneZhang joyoungzh...@gmail.com wrote: I see now. There are three steps in SparkStreaming + Kafka date processing 1.Receiving the data 2.Transforming the data 3.Pushing out the data SparkStreaming + Kafka only provide an exactly-once guarantee on step 1 2 We need to ensure exactly once on step 3 by myself. More details see base on http://spark.apache.org/docs/latest/streaming-programming-guide.html http://spark.apache.org/docs/latest/streaming-programming-guide.html -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/it-seem-like-the-exactly-once-feature-not-work-on-spark1-4-tp23871p23884.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Spark 1.3.1 + Hive: write output to CSV with header on S3
Hello community, I'm currently using Spark 1.3.1 with Hive support for outputting processed data to an external Hive table backed by S3. I'm using a manual specification of the delimiter, but I'd like to know if there is any clean way to write in CSV format: val sparkConf = new SparkConf() val sc = new SparkContext(sparkConf) val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc) import hiveContext.implicits._ hiveContext.sql("CREATE EXTERNAL TABLE IF NOT EXISTS table_name(field1 STRING, field2 STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION '" + path_on_s3 + "'") hiveContext.sql(an INSERT OVERWRITE query to write into the above table) I also need the header of the table to be printed on each written file. I tried with: hiveContext.sql("set hive.cli.print.header=true") But it didn't work. Any hint? Thank you. Best regards, Roberto
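One alternative worth trying (only a sketch, and it assumes the external spark-csv package can be added to the job, e.g. with --packages com.databricks:spark-csv_2.10:1.1.0, and a version of it that supports writing headers): skip the manual delimiter handling and write the DataFrame as CSV directly, which should also put a header line in each output part file.

import org.apache.spark.sql.SaveMode

// df is the DataFrame that would otherwise be INSERTed into the external table
df.save("com.databricks.spark.csv", SaveMode.Overwrite,
  Map("path" -> (path_on_s3 + "/csv_output"), "header" -> "true"))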
Re: Spark streaming Processing time keeps increasing
Hi TD, Thanks for the response. I do believe I understand the concept and the need for the filter function now. I made the requisite code changes and am keeping it running overnight to see the effect of it. Hopefully this should fix our issue. However, there was one place where I encountered a followup issue and had to disable that reduce operation for the moment in order to proceed with my testing for the rest of the changes. This particular reduceByKeyAndWindow operates on a key-value pair of String, HashSet<Long>. Once the size of a HashSet drops to 0, we remove the corresponding Key with the filter function specified as (p -> !p._2().isEmpty()) That looks about right to me. However, soon after the first slide occurs in this window, it's throwing the following exceptions and aborting that batch. The stack trace is below. I am not quite sure what to make of it (perhaps partly due to the late hour :-D ). Any idea what could be wrong here? As far as I know, String and HashSet<Long> should hash quite consistently. Also, if there is no way to avoid this issue, I am thinking of rewriting that part of the code to use a foldByKey or combineByKey operation instead of reduceByKey. Thanks Nikunj java.lang.Exception: Neither previous window has value for key, nor new values found. Are you sure your key class hashes consistently? at org.apache.spark.streaming.dstream.ReducedWindowedDStream$$anonfun$4.apply(ReducedWindowedDStream.scala:147) at org.apache.spark.streaming.dstream.ReducedWindowedDStream$$anonfun$4.apply(ReducedWindowedDStream.scala:134) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$mapValues$1$$anonfun$apply$41$$anonfun$apply$42.apply(PairRDDFunctions.scala:700) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$mapValues$1$$anonfun$apply$41$$anonfun$apply$42.apply(PairRDDFunctions.scala:700) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:389) at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:276) at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171) at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78) at org.apache.spark.rdd.RDD.iterator(RDD.scala:242) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:139) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:135) at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) at scala.collection.immutable.List.foreach(List.scala:318) at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:135) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at
org.apache.spark.rdd.RDD.iterator(RDD.scala:244) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) at org.apache.spark.scheduler.Task.run(Task.scala:70) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) On Fri, Jul 17, 2015 at 12:39 AM, Tathagata Das t...@databricks.com wrote: Responses inline. On Thu, Jul 16, 2015 at 9:27 PM, N B nb.nos...@gmail.com wrote: Hi TD, Yes, we do have the invertible function provided. However, I am not sure I understood how to use the filterFunction. Is there an example somewhere showing its usage? The header comment on the function says : * @param filterFunc function to filter expired key-value pairs; * only pairs that satisfy the function are retained * set this to null if you do not want to filter
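For reference, a Scala sketch of the call shape with the filter function in place (the pairs DStream, the types and the durations are made up to mirror the String / HashSet<Long> case above; the inverse function is a simplification, the point is the filterFunc parameter, which is what keeps the retained window state from growing forever):

import org.apache.spark.streaming.Minutes

// pairs: DStream[(String, Set[Long])]
val windowedCounts = pairs.reduceByKeyAndWindow(
  (a: Set[Long], b: Set[Long]) => a ++ b,   // reduce: merge values entering the window
  (a: Set[Long], b: Set[Long]) => a -- b,   // inverse reduce: drop values leaving the window
  Minutes(10),                              // window duration
  Minutes(1),                               // slide duration
  filterFunc = (kv: (String, Set[Long])) => kv._2.nonEmpty)  // forget keys whose set became empty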
Re: what is : ParquetFileReader: reading summary file ?
Yeah, Spark SQL Parquet support needs to do some metadata discovery when first importing a folder containing Parquet files, and the discovered metadata is cached. Cheng On 7/17/15 1:56 PM, shsh...@tsmc.com wrote: Hi all, our scenario is to generate lots of folders containing parquet files and then use ADD PARTITION to add these folder locations to a hive table; when trying to read the hive table using Spark, the following logs would show up and it took a lot of time reading them; but this won't happen after the second or third time of querying this table through sql in HiveContext; does that mean that the parquet file has done some caching by itself? Thanks! Jul 17, 2015 1:05:40 PM INFO: parquet.hadoop.ParquetFileReader: Initiating action with parallelism: 5 Jul 17, 2015 1:05:40 PM INFO: parquet.hadoop.ParquetFileReader: reading summary file: hdfs://f14ecat/HDFS/NEW_TCHART/20150701/LDSN/_common_metadata Jul 17, 2015 1:05:40 PM INFO: parquet.hadoop.ParquetFileReader: reading summary file: hdfs://f14ecat/HDFS/NEW_TCHART/20150702/MECC/_common_metadata Jul 17, 2015 1:05:40 PM INFO: parquet.hadoop.ParquetFileReader: reading summary file: hdfs://f14ecat/HDFS/NEW_TCHART/20150702/MCOX/_common_metadata Jul 17, 2015 1:05:40 PM INFO: parquet.hadoop.ParquetFileReader: reading summary file: hdfs://f14ecat/HDFS/NEW_TCHART/20150629/LCTE/_common_metadata Jul 17, 2015 1:05:40 PM INFO: parquet.hadoop.ParquetFileReader: reading summary file: hdfs://f14ecat/HDFS/NEW_TCHART/20150630/MDNS/_common_metadata Jul 17, 2015 1:05:41 PM INFO: parquet.hadoop.ParquetFileReader: reading summary file: hdfs://f14ecat/HDFS/NEW_TCHART/20150701/VSHM/_common_metadata Jul 17, 2015 1:05:41 PM INFO: parquet.hadoop.ParquetFileReader: reading summary file: hdfs://f14ecat/HDFS/NEW_TCHART/20150624/LSCB/_common_metadata Jul 17, 2015 1:05:41 PM INFO: parquet.hadoop.ParquetFileReader: reading summary file: hdfs://f14ecat/HDFS/NEW_TCHART/20150628/MPD8/_common_metadata Jul 17, 2015 1:05:41 PM INFO: parquet.hadoop.ParquetFileReader: reading summary file: hdfs://f14ecat/HDFS/NEW_TCHART/20150703/VSHM/_common_metadata Jul 17, 2015 1:05:41 PM INFO: parquet.hadoop.ParquetFileReader: reading summary file: hdfs://f14ecat/HDFS/NEW_TCHART/20150630/LIHI/_common_metadata Jul 17, 2015 1:05:41 PM INFO: parquet.hadoop.ParquetFileReader: reading summary file: hdfs://f14ecat/HDFS/NEW_TCHART/20150701/LESE/_common_metadata Jul 17, 2015 1:05:41 PM INFO: parquet.hadoop.ParquetFileReader: reading summary file: hdfs://f14ecat/HDFS/NEW_TCHART/20150626/MPD8/_common_metadata Jul 17, 2015 1:05:41 PM INFO: parquet.hadoop.ParquetFileReader: reading summary file: hdfs://f14ecat/HDFS/NEW_TCHART/20150624/MDHK/_common_metadata Jul 17, 2015 1:05:41 PM INFO: parquet.hadoop.ParquetFileReader: reading summary file: hdfs://f14ecat/HDFS/NEW_TCHART/20150628/VEMH/_common_metadata Jul 17, 2015 1:05:41 PM INFO: parquet.hadoop.ParquetFileReader: reading summary file: hdfs://f14ecat/HDFS/NEW_TCHART/20150626/MDHK/_common_metadata Jul 17, 2015 1:05:41 PM INFO: parquet.hadoop.ParquetFileReader: reading summary file: hdfs://f14ecat/HDFS/NEW_TCHART/20150628/LSCB/_common_metadata Jul 17, 2015 1:05:41 PM INFO: parquet.hadoop.ParquetFileReader: reading summary file: hdfs://f14ecat/HDFS/NEW_TCHART/20150627/LESR/_common_metadata Jul 17, 2015 1:05:41 PM INFO: parquet.hadoop.ParquetFileReader: reading summary file: hdfs://f14ecat/HDFS/NEW_TCHART/20150703/LESE/_common_metadata
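The caching Cheng describes is governed by a SQL option that is on by default, so there is normally nothing to do; it is only worth knowing about if the behaviour ever needs to be switched off for data that changes between queries (just a pointer, and exact behaviour can vary by version):

// on by default; set to false if the Parquet summary files change between queries
hiveContext.setConf("spark.sql.parquet.cacheMetadata", "true")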
Re: pyspark 1.4 udf change date values
Sure, I have created JIRA SPARK-9131 - UDF change data values https://issues.apache.org/jira/browse/SPARK-9131 On Thu, Jul 16, 2015 at 7:09 PM, Davies Liu dav...@databricks.com wrote: Thanks for reporting this, could you file a JIRA for it? On Thu, Jul 16, 2015 at 8:22 AM, Luis Guerra luispelay...@gmail.com wrote: Hi all, I am having some troubles when using a custom udf in dataframes with pyspark 1.4. I have rewritten the udf to simplify the problem and it gets even weirder. The udfs I am using do absolutely nothing, they just receive some value and output the same value with the same format. I show you my code below: c= a.join(b, a['ID'] == b['ID_new'], 'inner') c.filter(c['ID'] == 'XX').show() udf_A = UserDefinedFunction(lambda x: x, DateType()) udf_B = UserDefinedFunction(lambda x: x, DateType()) udf_C = UserDefinedFunction(lambda x: x, DateType()) d = c.select(c['ID'], c['t1'].alias('ta'), udf_A(vinc_muestra['t2']).alias('tb'), udf_B(vinc_muestra['t1']).alias('tc'), udf_C(vinc_muestra['t2']).alias('td')) d.filter(d['ID'] == 'XX').show() I am showing here the results from the outputs: +++--+--+ | ID | ID_new | t1 | t2 | +++--+--+ |62698917| 62698917| 2012-02-28| 2014-02-28| |62698917| 62698917| 2012-02-20| 2013-02-20| |62698917| 62698917| 2012-02-28| 2014-02-28| |62698917| 62698917| 2012-02-20| 2013-02-20| |62698917| 62698917| 2012-02-20| 2013-02-20| |62698917| 62698917| 2012-02-28| 2014-02-28| |62698917| 62698917| 2012-02-28| 2014-02-28| |62698917| 62698917| 2012-02-20| 2013-02-20| +++--+--+ ++---+---+++ | ID| ta |tb | tc| td | ++---+---+++ |62698917| 2012-02-28| 20070305|20030305| 20140228| |62698917| 2012-02-20| 20070215|20020215| 20130220| |62698917| 2012-02-28| 20070310|20050310| 20140228| |62698917| 2012-02-20| 20070305|20030305| 20130220| |62698917| 2012-02-20| 20130802|20130102| 20130220| |62698917| 2012-02-28| 20070215|20020215| 20140228| |62698917| 2012-02-28| 20070215|20020215| 20140228| |62698917| 2012-02-20| 20140102|20130102| 20130220| ++---+---+++ My problem here is that values at columns 'tb', 'tc' and 'td' in dataframe 'd' are completely different from values 't1' and 't2' in dataframe c even when my udfs are doing nothing. It seems like if values were somehow got from other registers (or just invented). Results are different between executions (apparently random). Any insight on this? Thanks in advance
Spark APIs memory usage?
Hi, I used the createDataFrame API of SQLContext in Python and am getting an OutOfMemoryException. I am wondering if it is creating the whole DataFrame in memory? I did not find any documentation describing the memory usage of Spark APIs. The documentation given is nice, but a little more detail (especially on memory usage, data distribution, etc.) would really help. -- Regards Harit Vishwakarma
Re: Spark APIs memory usage?
You can bump up number of partitions while creating the rdd you are using for df On 17 Jul 2015 21:03, Harit Vishwakarma harit.vishwaka...@gmail.com wrote: Hi, I used createDataFrame API of SqlContext in python. and getting OutOfMemoryException. I am wondering if it is creating whole dataFrame in memory? I did not find any documentation describing memory usage of Spark APIs. Documentation given is nice but little more details (specially on memory usage/ data distribution etc.) will really help. -- Regards Harit Vishwakarma
Re: What else is need to setup native support of BLAS/LAPACK with Spark?
Hi Sean, Thanks for the reply! I did double-check that the jar is one I think I am running: [image: Inline image 2] jar tf /hpc/users/ahujaa01/src/spark/assembly/target/scala-2.10/spark-assembly-1.5.0-SNAPSHOT-hadoop2.6.0.jar | grep netlib | grep Native com/github/fommil/netlib/NativeRefARPACK.class com/github/fommil/netlib/NativeRefBLAS.class com/github/fommil/netlib/NativeRefLAPACK.class com/github/fommil/netlib/NativeSystemARPACK.class com/github/fommil/netlib/NativeSystemBLAS.class com/github/fommil/netlib/NativeSystemLAPACK.class Also, I checked the gfortran version on the cluster nodes and it is available and is 5.1 $ gfortran --version GNU Fortran (GCC) 5.1.0 Copyright (C) 2015 Free Software Foundation, Inc. and still see: 15/07/17 13:20:53 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS 15/07/17 13:20:53 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS 15/07/17 13:20:53 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeSystemLAPACK 15/07/17 13:20:53 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeRefLAPACK Does anything need to be adjusted in my application POM? Thanks, Arun On Thu, Jul 16, 2015 at 5:26 PM, Sean Owen so...@cloudera.com wrote: Yes, that's most of the work, just getting the native libs into the assembly. netlib can find them from there even if you don't have BLAS libs on your OS, since it includes a reference implementation as a fallback. One common reason it won't load is not having libgfortran installed on your OSes though. It has to be 4.6+ too. That can't be shipped even in netlib and has to exist on your hosts. The other thing I'd double-check is whether you are really using this assembly you built for your job -- like, it's the actually the assembly the executors are using. On Tue, Jul 7, 2015 at 8:47 PM, Arun Ahuja aahuj...@gmail.com wrote: Is there more documentation on what is needed to setup BLAS/LAPACK native suport with Spark. I’ve built spark with the -Pnetlib-lgpl flag and see that the netlib classes are in the assembly jar. jar tvf spark-assembly-1.5.0-SNAPSHOT-hadoop2.6.0.jar | grep netlib | grep Native 6625 Tue Jul 07 15:22:08 EDT 2015 com/github/fommil/netlib/NativeRefARPACK.class 21123 Tue Jul 07 15:22:08 EDT 2015 com/github/fommil/netlib/NativeRefBLAS.class 178334 Tue Jul 07 15:22:08 EDT 2015 com/github/fommil/netlib/NativeRefLAPACK.class 6640 Tue Jul 07 15:22:10 EDT 2015 com/github/fommil/netlib/NativeSystemARPACK.class 21138 Tue Jul 07 15:22:10 EDT 2015 com/github/fommil/netlib/NativeSystemBLAS.class 178349 Tue Jul 07 15:22:10 EDT 2015 com/github/fommil/netlib/NativeSystemLAPACK.class Also I see the following in /usr/lib64 ls /usr/lib64/libblas. libblas.a libblas.solibblas.so.3 libblas.so.3.2 libblas.so.3.2.1 ls /usr/lib64/liblapack liblapack.a liblapack_pic.a liblapack.so liblapack.so.3 liblapack.so.3.2liblapack.so.3.2.1 But I stil see the following in the Spark logs: 15/07/07 15:36:25 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS 15/07/07 15:36:25 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS 15/07/07 15:36:26 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeSystemLAPACK 15/07/07 15:36:26 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeRefLAPACK Anything in this process I missed? Thanks, Arun
Re: use S3-Compatible Storage with spark
The endpoint is the property you want to set. I would look at the source for that. Sent from my iPhone On Jul 17, 2015, at 08:55, Sujit Pal sujitatgt...@gmail.com wrote: Hi Schmirr, The part after the s3n:// is your bucket name and folder name, i.e. s3n://${bucket_name}/${folder_name}[/${subfolder_name}]*. Bucket names are unique across S3, so the resulting path is also unique. There is no concept of hostname in s3 urls as far as I know. -sujit On Fri, Jul 17, 2015 at 1:36 AM, Schmirr Wurst schmirrwu...@gmail.com wrote: Hi, I wonder how to use S3-compatible storage in Spark? If I'm using the s3n:// url schema, then it will point to Amazon; is there a way I can specify the host somewhere?
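To make that concrete, a rough sketch (property names assume the Hadoop 2.6+ s3a connector is on the classpath; with s3n/jets3t the endpoint usually has to go into jets3t.properties instead, and the host below is just a placeholder):

val hadoopConf = sc.hadoopConfiguration
hadoopConf.set("fs.s3a.endpoint", "s3.my-provider.example.com")  // your S3-compatible endpoint
hadoopConf.set("fs.s3a.access.key", accessKey)
hadoopConf.set("fs.s3a.secret.key", secretKey)
val lines = sc.textFile("s3a://my-bucket/some/prefix/")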
Spark and SQL Server
Hello, I am testing Spark interoperation with SQL Server via JDBC with Microsoft’s 4.2 JDBC Driver. Reading from the database works ok, but I have encountered a couple of issues writing back. In Scala 2.10 I can write back to the database except for a couple of types. 1. When I read a DataFrame from a table that contains a datetime column it comes in as a java.sql.Timestamp object in the DataFrame. This is alright for Spark purposes, but when I go to write this back to the database with df.write.jdbc(…) it errors out because it is trying to write the TIMESTAMP type to SQL Server, which is not a date/time storing type in TSQL. I think it should be writing a datetime, but I’m not sure how to tell Spark this. 2. A related misunderstanding happens when I try to write a java.lang.Boolean to the database; it errors out because Spark is trying to specify the width of the bit type, which is illegal in SQL Server (error msg: Cannot specify a column width on data type bit). Do I need to edit Spark source to fix this behavior, or is there a configuration option somewhere that I am not aware of? When I attempt to write back to SQL Server in an IPython notebook, py4j seems unable to convert a Python dict into a Java hashmap, which is necessary for parameter passing. I’ve documented details of this problem with code examples here: http://stackoverflow.com/questions/31417653/java-util-hashmap-missing-in-pyspark-session. Any advice would be appreciated. Thank you for your time, -- Matthew Young
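If patching Spark is to be avoided, one possible route (a sketch only, assuming Spark 1.4+ where the JdbcDialect API is public, and that DATETIME/BIT are the T-SQL types you actually want) is to register a dialect that overrides just the problematic mappings:

import java.sql.Types
import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects, JdbcType}
import org.apache.spark.sql.types._

// write TimestampType as DATETIME and BooleanType as BIT (no width); defaults otherwise
object SQLServerDialect extends JdbcDialect {
  override def canHandle(url: String): Boolean = url.startsWith("jdbc:sqlserver")
  override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
    case TimestampType => Some(JdbcType("DATETIME", Types.TIMESTAMP))
    case BooleanType   => Some(JdbcType("BIT", Types.BIT))
    case _             => None
  }
}

JdbcDialects.registerDialect(SQLServerDialect)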
Re: What else is need to setup native support of BLAS/LAPACK with Spark?
Make sure /usr/lib64 contains libgfortran.so.3; that's really the issue. I'm pretty sure the answer is 'yes', but, make sure the assembly has jniloader too. I don't see why it wouldn't, but, that's needed. What is your env like -- local, standalone, YARN? how are you running? Just want to make sure you are using this assembly across your cluster. On Fri, Jul 17, 2015 at 6:26 PM, Arun Ahuja aahuj...@gmail.com wrote: Hi Sean, Thanks for the reply! I did double-check that the jar is one I think I am running: [image: Inline image 2] jar tf /hpc/users/ahujaa01/src/spark/assembly/target/scala-2.10/spark-assembly-1.5.0-SNAPSHOT-hadoop2.6.0.jar | grep netlib | grep Native com/github/fommil/netlib/NativeRefARPACK.class com/github/fommil/netlib/NativeRefBLAS.class com/github/fommil/netlib/NativeRefLAPACK.class com/github/fommil/netlib/NativeSystemARPACK.class com/github/fommil/netlib/NativeSystemBLAS.class com/github/fommil/netlib/NativeSystemLAPACK.class Also, I checked the gfortran version on the cluster nodes and it is available and is 5.1 $ gfortran --version GNU Fortran (GCC) 5.1.0 Copyright (C) 2015 Free Software Foundation, Inc. and still see: 15/07/17 13:20:53 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS 15/07/17 13:20:53 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS 15/07/17 13:20:53 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeSystemLAPACK 15/07/17 13:20:53 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeRefLAPACK Does anything need to be adjusted in my application POM? Thanks, Arun On Thu, Jul 16, 2015 at 5:26 PM, Sean Owen so...@cloudera.com wrote: Yes, that's most of the work, just getting the native libs into the assembly. netlib can find them from there even if you don't have BLAS libs on your OS, since it includes a reference implementation as a fallback. One common reason it won't load is not having libgfortran installed on your OSes though. It has to be 4.6+ too. That can't be shipped even in netlib and has to exist on your hosts. The other thing I'd double-check is whether you are really using this assembly you built for your job -- like, it's the actually the assembly the executors are using. On Tue, Jul 7, 2015 at 8:47 PM, Arun Ahuja aahuj...@gmail.com wrote: Is there more documentation on what is needed to setup BLAS/LAPACK native suport with Spark. I’ve built spark with the -Pnetlib-lgpl flag and see that the netlib classes are in the assembly jar. jar tvf spark-assembly-1.5.0-SNAPSHOT-hadoop2.6.0.jar | grep netlib | grep Native 6625 Tue Jul 07 15:22:08 EDT 2015 com/github/fommil/netlib/NativeRefARPACK.class 21123 Tue Jul 07 15:22:08 EDT 2015 com/github/fommil/netlib/NativeRefBLAS.class 178334 Tue Jul 07 15:22:08 EDT 2015 com/github/fommil/netlib/NativeRefLAPACK.class 6640 Tue Jul 07 15:22:10 EDT 2015 com/github/fommil/netlib/NativeSystemARPACK.class 21138 Tue Jul 07 15:22:10 EDT 2015 com/github/fommil/netlib/NativeSystemBLAS.class 178349 Tue Jul 07 15:22:10 EDT 2015 com/github/fommil/netlib/NativeSystemLAPACK.class Also I see the following in /usr/lib64 ls /usr/lib64/libblas. 
libblas.a libblas.solibblas.so.3 libblas.so.3.2 libblas.so.3.2.1 ls /usr/lib64/liblapack liblapack.a liblapack_pic.a liblapack.so liblapack.so.3 liblapack.so.3.2liblapack.so.3.2.1 But I stil see the following in the Spark logs: 15/07/07 15:36:25 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS 15/07/07 15:36:25 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS 15/07/07 15:36:26 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeSystemLAPACK 15/07/07 15:36:26 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeRefLAPACK Anything in this process I missed? Thanks, Arun
Data frames select and where clause dependency
I'd like to understand why the field used in the where clause must exist in the select clause. For example, the following select statement works fine - df.select("field1", "filter_field").filter(df("filter_field") === value).show() However, the next one fails with the error: in operator !Filter (filter_field#60 = value); - df.select("field1").filter(df("filter_field") === value).show() As a work-around, it seems that I can do the following - df.select("field1", "filter_field").filter(df("filter_field") === value).drop("filter_field").show() Thanks, Mike.
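Another way around it, sketched on the same query: apply the filter before the projection, so the column is still available when the predicate runs and never shows up in the output -

df.filter(df("filter_field") === value).select("field1").show()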
Re: What else is need to setup native support of BLAS/LAPACK with Spark?
Can you try setting the spark.yarn.jar property to make sure it points to the jar you're thinking of? -Sandy On Fri, Jul 17, 2015 at 11:32 AM, Arun Ahuja aahuj...@gmail.com wrote: Yes, it's a YARN cluster and using spark-submit to run. I have SPARK_HOME set to the directory above and using the spark-submit script from there. bin/spark-submit --master yarn-client --executor-memory 10g --driver-memory 8g --num-executors 400 --executor-cores 1 --class org.hammerlab.guacamole.Guacamole --conf spark.default.parallelism=4000 --conf spark.storage.memoryFraction=0.15 libgfortran.so.3 is also there ls /usr/lib64/libgfortran.so.3 /usr/lib64/libgfortran.so.3 These are jniloader files in the jar jar tf /hpc/users/ahujaa01/src/spark/assembly/target/scala-2.10/spark-assembly-1.5.0-SNAPSHOT-hadoop2.6.0.jar | grep jniloader META-INF/maven/com.github.fommil/jniloader/ META-INF/maven/com.github.fommil/jniloader/pom.xml META-INF/maven/com.github.fommil/jniloader/pom.properties Thanks, Arun On Fri, Jul 17, 2015 at 1:30 PM, Sean Owen so...@cloudera.com wrote: Make sure /usr/lib64 contains libgfortran.so.3; that's really the issue. I'm pretty sure the answer is 'yes', but, make sure the assembly has jniloader too. I don't see why it wouldn't, but, that's needed. What is your env like -- local, standalone, YARN? how are you running? Just want to make sure you are using this assembly across your cluster. On Fri, Jul 17, 2015 at 6:26 PM, Arun Ahuja aahuj...@gmail.com wrote: Hi Sean, Thanks for the reply! I did double-check that the jar is one I think I am running: [image: Inline image 2] jar tf /hpc/users/ahujaa01/src/spark/assembly/target/scala-2.10/spark-assembly-1.5.0-SNAPSHOT-hadoop2.6.0.jar | grep netlib | grep Native com/github/fommil/netlib/NativeRefARPACK.class com/github/fommil/netlib/NativeRefBLAS.class com/github/fommil/netlib/NativeRefLAPACK.class com/github/fommil/netlib/NativeSystemARPACK.class com/github/fommil/netlib/NativeSystemBLAS.class com/github/fommil/netlib/NativeSystemLAPACK.class Also, I checked the gfortran version on the cluster nodes and it is available and is 5.1 $ gfortran --version GNU Fortran (GCC) 5.1.0 Copyright (C) 2015 Free Software Foundation, Inc. and still see: 15/07/17 13:20:53 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS 15/07/17 13:20:53 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS 15/07/17 13:20:53 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeSystemLAPACK 15/07/17 13:20:53 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeRefLAPACK Does anything need to be adjusted in my application POM? Thanks, Arun On Thu, Jul 16, 2015 at 5:26 PM, Sean Owen so...@cloudera.com wrote: Yes, that's most of the work, just getting the native libs into the assembly. netlib can find them from there even if you don't have BLAS libs on your OS, since it includes a reference implementation as a fallback. One common reason it won't load is not having libgfortran installed on your OSes though. It has to be 4.6+ too. That can't be shipped even in netlib and has to exist on your hosts. The other thing I'd double-check is whether you are really using this assembly you built for your job -- like, it's the actually the assembly the executors are using. On Tue, Jul 7, 2015 at 8:47 PM, Arun Ahuja aahuj...@gmail.com wrote: Is there more documentation on what is needed to setup BLAS/LAPACK native suport with Spark. 
I’ve built spark with the -Pnetlib-lgpl flag and see that the netlib classes are in the assembly jar. jar tvf spark-assembly-1.5.0-SNAPSHOT-hadoop2.6.0.jar | grep netlib | grep Native 6625 Tue Jul 07 15:22:08 EDT 2015 com/github/fommil/netlib/NativeRefARPACK.class 21123 Tue Jul 07 15:22:08 EDT 2015 com/github/fommil/netlib/NativeRefBLAS.class 178334 Tue Jul 07 15:22:08 EDT 2015 com/github/fommil/netlib/NativeRefLAPACK.class 6640 Tue Jul 07 15:22:10 EDT 2015 com/github/fommil/netlib/NativeSystemARPACK.class 21138 Tue Jul 07 15:22:10 EDT 2015 com/github/fommil/netlib/NativeSystemBLAS.class 178349 Tue Jul 07 15:22:10 EDT 2015 com/github/fommil/netlib/NativeSystemLAPACK.class Also I see the following in /usr/lib64 ls /usr/lib64/libblas. libblas.a libblas.solibblas.so.3 libblas.so.3.2 libblas.so.3.2.1 ls /usr/lib64/liblapack liblapack.a liblapack_pic.a liblapack.so liblapack.so.3 liblapack.so.3.2liblapack.so.3.2.1 But I stil see the following in the Spark logs: 15/07/07 15:36:25 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS 15/07/07 15:36:25 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS 15/07/07 15:36:26 WARN LAPACK: Failed to load implementation
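As an illustration of Sandy's suggestion (reusing the assembly path already quoted in this thread; an hdfs:// location works as well and avoids re-uploading the jar on every submission), the property can simply be added to the existing spark-submit invocation:

bin/spark-submit --master yarn-client --conf spark.yarn.jar=/hpc/users/ahujaa01/src/spark/assembly/target/scala-2.10/spark-assembly-1.5.0-SNAPSHOT-hadoop2.6.0.jar ...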
MapType vs StructType
I notice JSON objects are all parsed as Map[String,Any] in Jackson but for some reason, the inferSchema tools in Spark SQL extracts the schema of nested JSON objects as StructTypes. This makes it really confusing when trying to rectify the object hierarchy when I have maps because the Catalyst conversion layer underneath is expecting a Row or Product and not a Map. Why wasn't MapType used here? Is there any significant difference between the two of these types that would cause me not to use a MapType when I'm constructing my own schema representing a set of nested Map[String,_]'s?
Re: Store DStreams into Hive using Hive Streaming
Hi, I have a similar use case. Did you find a solution to this problem of loading DStreams into Hive using Spark Streaming? Please guide. Thanks.
Re: What else is need to setup native support of BLAS/LAPACK with Spark?
Yes, it's a YARN cluster and using spark-submit to run. I have SPARK_HOME set to the directory above and using the spark-submit script from there. bin/spark-submit --master yarn-client --executor-memory 10g --driver-memory 8g --num-executors 400 --executor-cores 1 --class org.hammerlab.guacamole.Guacamole --conf spark.default.parallelism=4000 --conf spark.storage.memoryFraction=0.15 libgfortran.so.3 is also there ls /usr/lib64/libgfortran.so.3 /usr/lib64/libgfortran.so.3 These are jniloader files in the jar jar tf /hpc/users/ahujaa01/src/spark/assembly/target/scala-2.10/spark-assembly-1.5.0-SNAPSHOT-hadoop2.6.0.jar | grep jniloader META-INF/maven/com.github.fommil/jniloader/ META-INF/maven/com.github.fommil/jniloader/pom.xml META-INF/maven/com.github.fommil/jniloader/pom.properties Thanks, Arun On Fri, Jul 17, 2015 at 1:30 PM, Sean Owen so...@cloudera.com wrote: Make sure /usr/lib64 contains libgfortran.so.3; that's really the issue. I'm pretty sure the answer is 'yes', but, make sure the assembly has jniloader too. I don't see why it wouldn't, but, that's needed. What is your env like -- local, standalone, YARN? how are you running? Just want to make sure you are using this assembly across your cluster. On Fri, Jul 17, 2015 at 6:26 PM, Arun Ahuja aahuj...@gmail.com wrote: Hi Sean, Thanks for the reply! I did double-check that the jar is one I think I am running: [image: Inline image 2] jar tf /hpc/users/ahujaa01/src/spark/assembly/target/scala-2.10/spark-assembly-1.5.0-SNAPSHOT-hadoop2.6.0.jar | grep netlib | grep Native com/github/fommil/netlib/NativeRefARPACK.class com/github/fommil/netlib/NativeRefBLAS.class com/github/fommil/netlib/NativeRefLAPACK.class com/github/fommil/netlib/NativeSystemARPACK.class com/github/fommil/netlib/NativeSystemBLAS.class com/github/fommil/netlib/NativeSystemLAPACK.class Also, I checked the gfortran version on the cluster nodes and it is available and is 5.1 $ gfortran --version GNU Fortran (GCC) 5.1.0 Copyright (C) 2015 Free Software Foundation, Inc. and still see: 15/07/17 13:20:53 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS 15/07/17 13:20:53 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS 15/07/17 13:20:53 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeSystemLAPACK 15/07/17 13:20:53 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeRefLAPACK Does anything need to be adjusted in my application POM? Thanks, Arun On Thu, Jul 16, 2015 at 5:26 PM, Sean Owen so...@cloudera.com wrote: Yes, that's most of the work, just getting the native libs into the assembly. netlib can find them from there even if you don't have BLAS libs on your OS, since it includes a reference implementation as a fallback. One common reason it won't load is not having libgfortran installed on your OSes though. It has to be 4.6+ too. That can't be shipped even in netlib and has to exist on your hosts. The other thing I'd double-check is whether you are really using this assembly you built for your job -- like, it's the actually the assembly the executors are using. On Tue, Jul 7, 2015 at 8:47 PM, Arun Ahuja aahuj...@gmail.com wrote: Is there more documentation on what is needed to setup BLAS/LAPACK native suport with Spark. I’ve built spark with the -Pnetlib-lgpl flag and see that the netlib classes are in the assembly jar. 
jar tvf spark-assembly-1.5.0-SNAPSHOT-hadoop2.6.0.jar | grep netlib | grep Native 6625 Tue Jul 07 15:22:08 EDT 2015 com/github/fommil/netlib/NativeRefARPACK.class 21123 Tue Jul 07 15:22:08 EDT 2015 com/github/fommil/netlib/NativeRefBLAS.class 178334 Tue Jul 07 15:22:08 EDT 2015 com/github/fommil/netlib/NativeRefLAPACK.class 6640 Tue Jul 07 15:22:10 EDT 2015 com/github/fommil/netlib/NativeSystemARPACK.class 21138 Tue Jul 07 15:22:10 EDT 2015 com/github/fommil/netlib/NativeSystemBLAS.class 178349 Tue Jul 07 15:22:10 EDT 2015 com/github/fommil/netlib/NativeSystemLAPACK.class Also I see the following in /usr/lib64 ls /usr/lib64/libblas. libblas.a libblas.solibblas.so.3 libblas.so.3.2 libblas.so.3.2.1 ls /usr/lib64/liblapack liblapack.a liblapack_pic.a liblapack.so liblapack.so.3 liblapack.so.3.2liblapack.so.3.2.1 But I stil see the following in the Spark logs: 15/07/07 15:36:25 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS 15/07/07 15:36:25 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS 15/07/07 15:36:26 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeSystemLAPACK 15/07/07 15:36:26 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeRefLAPACK Anything in this process I missed?
Has anyone run Python Spark application on Yarn-cluster mode ? (which has 3rd party Python modules (i.e., numpy) to be shipped with)
Hi all, After SPARK-5479 https://issues.apache.org/jira/browse/SPARK-5479 issue fix (thanks to Marcelo Vanzin), now pyspark handles several python files (or in zip folder with __init__.py) addition to PYTHONPATH correctly in yarn-cluster mode. But adding python module as zip folder, still fails - if that zip folder have other file types (compiled byte code or c code) other than python files. Let's say you want to provide numpy package to --py-files flag, which is downloaded from as numpy-1.9.2.zip from this link https://pypi.python.org/pypi/numpy does not work - complaining import numpy line has failed. numpy module need to be *installed* before importing it in Spark Python script. So does that mean you need to install on all machines required python modules before using pyspark ? Or what is best pattern using any python 3rd party module in Spark Python job ? Thanks. On Thu, Jun 25, 2015 at 12:55 PM, Marcelo Vanzin van...@cloudera.com wrote: Please take a look at the pull request with the actual fix; that will explain why it's the same issue. On Thu, Jun 25, 2015 at 12:51 PM, Elkhan Dadashov elkhan8...@gmail.com wrote: Thanks Marcelo. But my case is different. My mypython/libs/numpy-1.9.2.zip is in *local directory* (can also put in HDFS), but still fails. But SPARK-5479 https://issues.apache.org/jira/browse/SPARK-5479 is : PySpark on yarn mode need to support *non-local* python files. The job fails only when i try to include 3rd party dependency from local computer with --py-files (in Spark 1.4) Both of these commands succeed: ./bin/spark-submit --master yarn-cluster --verbose hdfs:///pi.py ./bin/spark-submit --master yarn-cluster --deploy-mode cluster --verbose examples/src/main/python/pi.py But in this particular example with 3rd party numpy module: ./bin/spark-submit --verbose --master yarn-cluster --py-files mypython/libs/numpy-1.9.2.zip --deploy-mode cluster mypython/scripts/kmeans.py /kmeans_data.txt 5 1.0 All these files : mypython/libs/numpy-1.9.2.zip, mypython/scripts/kmeans.py are local files, kmeans_data.txt is in HDFS. Thanks. On Thu, Jun 25, 2015 at 12:22 PM, Marcelo Vanzin van...@cloudera.com wrote: That sounds like SPARK-5479 which is not in 1.4... On Thu, Jun 25, 2015 at 12:17 PM, Elkhan Dadashov elkhan8...@gmail.com wrote: In addition to previous emails, when i try to execute this command from command line: ./bin/spark-submit --verbose --master yarn-cluster --py-files mypython/libs/numpy-1.9.2.zip --deploy-mode cluster mypython/scripts/kmeans.py /kmeans_data.txt 5 1.0 - numpy-1.9.2.zip - is downloaded numpy package - kmeans.py is default example which comes with Spark 1.4 - kmeans_data.txt - is default data file which comes with Spark 1.4 It fails saying that it could not find numpy: File kmeans.py, line 31, in module import numpy ImportError: No module named numpy Has anyone run Python Spark application on Yarn-cluster mode ? (which has 3rd party Python modules to be shipped with) What are the configurations or installations to be done before running Python Spark job with 3rd party dependencies on Yarn-cluster ? Thanks in advance. -- Marcelo -- Best regards, Elkhan Dadashov -- Marcelo -- Best regards, Elkhan Dadashov
Model Save function (ML-Lib)
Hi: I'm using pyspark 1.3 and it seems that model.save is not implemented for every model. Here is what I have so far:

Model Name | Model Class | save available
Logistic Regression | LogisticRegressionModel | NO
Random Forest | TreeEnsembleModel | OK
GBM | GradientBoostedTreesModel | OK
SVM | SVMModel | NO

What is the recommended route to save a logistic regression or SVM? I tried to pickle the SVM but it failed at loading it back. Any advice appreciated. Thanks! Best, Guillaume Guy +1 919 - 972 - 8750
Re: Command builder problem when running worker in Windows
Run Spark with --verbose flag, to see what it read for that path. I guess in Windows if you are using backslash, you need 2 of them (\\), or just use forward slashes everywhere. On Fri, Jul 17, 2015 at 2:40 PM, Julien Beaudan jbeau...@stottlerhenke.com wrote: Hi, I running a stand-alone cluster in Windows 7, and when I try to run any worker on the machine, I get the following error: 15/07/17 14:14:43 ERROR ExecutorRunner: Error running executor java.io.IOException: Cannot run program C:\cygdrive\c\Users\jbeaudan\Spark\spark-1.3.1-bin-hadoop2.4/bin/compute-classpath.cmd (in directory .): CreateProcess error=2, The system cannot find the file specified at java.lang.ProcessBuilder.start(Unknown Source) at org.apache.spark.util.Utils$.executeCommand(Utils.scala:1067) at org.apache.spark.util.Utils$.executeAndGetOutput(Utils.scala:1084) at org.apache.spark.deploy.worker.CommandUtils$.buildJavaOpts(CommandUtils.scala:112) at org.apache.spark.deploy.worker.CommandUtils$.buildCommandSeq(CommandUtils.scala:61) at org.apache.spark.deploy.worker.CommandUtils$.buildProcessBuilder(CommandUtils.scala:47) at org.apache.spark.deploy.worker.ExecutorRunner.fetchAndRunExecutor(ExecutorRunner.scala:132) at org.apache.spark.deploy.worker.ExecutorRunner$$anon$1.run(ExecutorRunner.scala:68) Caused by: java.io.IOException: CreateProcess error=2, The system cannot find the file specified at java.lang.ProcessImpl.create(Native Method) at java.lang.ProcessImpl.init(Unknown Source) at java.lang.ProcessImpl.start(Unknown Source) ... 8 more I'm pretty sure the problem is that Spark is looking for the following path, which mixes forward and back slashes: C:\cygdrive\c\Users\jbeaudan\Spark\spark-1.3.1-bin-hadoop2.4/bin/compute-classpath.cmd Is there anyway to fix this? (Also, I have also tried running this from a normal terminal, instead of from cygwin, and I get the same issue, except this time the path is: C:\Users\jbeaudan\Spark\spark-1.3.1-bin-hadoop2.4\bin../bin/compute-classpath.cmd ) Thank you! Julien -- Best regards, Elkhan Dadashov
Re: Command builder problem when running worker in Windows
Are you running it from command line (CLI) or through SparkLauncher ? If you can share the command (./bin/spark-submit ...) or the code snippet you are running, then it can give some clue. On Fri, Jul 17, 2015 at 3:30 PM, Julien Beaudan jbeau...@stottlerhenke.com wrote: Hi Elkhan, I ran Spark with --verbose, but the output looked the same to me - what should I be looking for? At the beginning, the system properties which are set are: System properties: SPARK_SUBMIT - true spark.app.name - tests.testFileReader spark.jars - file:/C:/Users/jbeaudan/Spark/spark-1.3.1-bin-hadoop2.4/sparkTest1.jar spark.master - spark://192.168.194.128:7077 Classpath elements: file:/C:/Users/jbeaudan/Spark/spark-1.3.1-bin-hadoop2.4/sparkTest1.jar I'm not sure why, but the file paths here seem formatted correctly (it is same from the command terminal and Cygwin), so the path must get edited afterwards? Julien On 07/17/2015 03:00 PM, Elkhan Dadashov wrote: Run Spark with --verbose flag, to see what it read for that path. I guess in Windows if you are using backslash, you need 2 of them (\\), or just use forward slashes everywhere. On Fri, Jul 17, 2015 at 2:40 PM, Julien Beaudan jbeau...@stottlerhenke.com wrote: Hi, I running a stand-alone cluster in Windows 7, and when I try to run any worker on the machine, I get the following error: 15/07/17 14:14:43 ERROR ExecutorRunner: Error running executor java.io.IOException: Cannot run program C:\cygdrive\c\Users\jbeaudan\Spark\spark-1.3.1-bin-hadoop2.4/bin/compute-classpath.cmd (in directory .): CreateProcess error=2, The system cannot find the file specified at java.lang.ProcessBuilder.start(Unknown Source) at org.apache.spark.util.Utils$.executeCommand(Utils.scala:1067) at org.apache.spark.util.Utils$.executeAndGetOutput(Utils.scala:1084) at org.apache.spark.deploy.worker.CommandUtils$.buildJavaOpts(CommandUtils.scala:112) at org.apache.spark.deploy.worker.CommandUtils$.buildCommandSeq(CommandUtils.scala:61) at org.apache.spark.deploy.worker.CommandUtils$.buildProcessBuilder(CommandUtils.scala:47) at org.apache.spark.deploy.worker.ExecutorRunner.fetchAndRunExecutor(ExecutorRunner.scala:132) at org.apache.spark.deploy.worker.ExecutorRunner$$anon$1.run(ExecutorRunner.scala:68) Caused by: java.io.IOException: CreateProcess error=2, The system cannot find the file specified at java.lang.ProcessImpl.create(Native Method) at java.lang.ProcessImpl.init(Unknown Source) at java.lang.ProcessImpl.start(Unknown Source) ... 8 more I'm pretty sure the problem is that Spark is looking for the following path, which mixes forward and back slashes: C:\cygdrive\c\Users\jbeaudan\Spark\spark-1.3.1-bin-hadoop2.4/bin/compute-classpath.cmd Is there anyway to fix this? (Also, I have also tried running this from a normal terminal, instead of from cygwin, and I get the same issue, except this time the path is: C:\Users\jbeaudan\Spark\spark-1.3.1-bin-hadoop2.4\bin../bin/compute-classpath.cmd ) Thank you! Julien -- Best regards, Elkhan Dadashov -- Best regards, Elkhan Dadashov
Re: BroadCast on Interval ( eg every 10 min )
The simple answer is you should not update a broadcast variable. If you can post the problem you are handling, people here should be able to provide better suggestions. On 18 Jul 2015 13:53, Raghavendra Pandey raghavendra.pan...@gmail.com wrote: Broadcast variables are immutable. Anyway, how are you getting the data that you want to broadcast at regular intervals? On Jul 16, 2015 9:33 PM, Ashish Soni asoni.le...@gmail.com wrote: Hi All, How can I broadcast a data change to all the executors every 10 min or 1 min? Ashish
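A common workaround, sketched below for a streaming job (loadReferenceData, the Map type and the 10-minute interval are all placeholders): keep a driver-side reference to the current broadcast and periodically replace it from inside foreachRDD, which runs on the driver, instead of trying to mutate the broadcast itself.

import org.apache.spark.broadcast.Broadcast

var current: Broadcast[Map[String, String]] = ssc.sparkContext.broadcast(loadReferenceData())
var lastRefresh = System.currentTimeMillis()

dstream.foreachRDD { rdd =>
  if (System.currentTimeMillis() - lastRefresh > 10 * 60 * 1000) {
    current.unpersist()                                   // release the old copy on the executors
    current = rdd.sparkContext.broadcast(loadReferenceData())
    lastRefresh = System.currentTimeMillis()
  }
  val snapshot = current                                  // each batch uses one consistent snapshot
  rdd.foreach(record => process(record, snapshot.value))  // process is a placeholder
}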
Re: MapType vs StructType
I'll add there is a JIRA to override the default past some threshold of # of unique keys: https://issues.apache.org/jira/browse/SPARK-4476 https://issues.apache.org/jira/browse/SPARK-4476 On Fri, Jul 17, 2015 at 1:32 PM, Michael Armbrust mich...@databricks.com wrote: The difference between a map and a struct here is that in a struct all possible keys are defined as part of the schema and can each can have a different type (and we don't support union types). JSON doesn't have differentiated data structures so we go with the one that gives you more information when doing inference by default. If you pass in a schema to JSON however, you can override this and have a JSON object parsed as a map. On Fri, Jul 17, 2015 at 11:02 AM, Corey Nolet cjno...@gmail.com wrote: I notice JSON objects are all parsed as Map[String,Any] in Jackson but for some reason, the inferSchema tools in Spark SQL extracts the schema of nested JSON objects as StructTypes. This makes it really confusing when trying to rectify the object hierarchy when I have maps because the Catalyst conversion layer underneath is expecting a Row or Product and not a Map. Why wasn't MapType used here? Is there any significant difference between the two of these types that would cause me not to use a MapType when I'm constructing my own schema representing a set of nested Map[String,_]'s?
Re: MapType vs StructType
The difference between a map and a struct here is that in a struct all possible keys are defined as part of the schema and each can have a different type (and we don't support union types). JSON doesn't have differentiated data structures so we go with the one that gives you more information when doing inference by default. If you pass in a schema to JSON however, you can override this and have a JSON object parsed as a map. On Fri, Jul 17, 2015 at 11:02 AM, Corey Nolet cjno...@gmail.com wrote: I notice JSON objects are all parsed as Map[String,Any] in Jackson but for some reason, the inferSchema tools in Spark SQL extracts the schema of nested JSON objects as StructTypes. This makes it really confusing when trying to rectify the object hierarchy when I have maps because the Catalyst conversion layer underneath is expecting a Row or Product and not a Map. Why wasn't MapType used here? Is there any significant difference between the two of these types that would cause me not to use a MapType when I'm constructing my own schema representing a set of nested Map[String,_]'s?
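A small illustration of that last point (field names are invented; the reader API shown is the 1.4 one, in 1.3 the equivalent is sqlContext.jsonFile(path, schema)):

import org.apache.spark.sql.types._

val schema = StructType(Seq(
  StructField("id", StringType),
  StructField("attributes", MapType(StringType, StringType))))  // comes back as a MapType column

val events = sqlContext.read.schema(schema).json("events.json")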
Re: MapType vs StructType
This helps immensely. Thanks Michael! On Fri, Jul 17, 2015 at 4:33 PM, Michael Armbrust mich...@databricks.com wrote: I'll add there is a JIRA to override the default past some threshold of # of unique keys: https://issues.apache.org/jira/browse/SPARK-4476 https://issues.apache.org/jira/browse/SPARK-4476 On Fri, Jul 17, 2015 at 1:32 PM, Michael Armbrust mich...@databricks.com wrote: The difference between a map and a struct here is that in a struct all possible keys are defined as part of the schema and can each can have a different type (and we don't support union types). JSON doesn't have differentiated data structures so we go with the one that gives you more information when doing inference by default. If you pass in a schema to JSON however, you can override this and have a JSON object parsed as a map. On Fri, Jul 17, 2015 at 11:02 AM, Corey Nolet cjno...@gmail.com wrote: I notice JSON objects are all parsed as Map[String,Any] in Jackson but for some reason, the inferSchema tools in Spark SQL extracts the schema of nested JSON objects as StructTypes. This makes it really confusing when trying to rectify the object hierarchy when I have maps because the Catalyst conversion layer underneath is expecting a Row or Product and not a Map. Why wasn't MapType used here? Is there any significant difference between the two of these types that would cause me not to use a MapType when I'm constructing my own schema representing a set of nested Map[String,_]'s?
Cleanup when tasks generate errors
I've observed a number of cases where Spark does not clean HDFS side-effects on errors, especially out of memory conditions. Here is an example from the following code snippet executed in spark-shell:

import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.SaveMode
val ctx = sqlContext.asInstanceOf[HiveContext]
import ctx.implicits._
ctx.jsonFile("file:///test_data/*/*/*/*.gz").saveAsTable("test_data", SaveMode.Overwrite)

First run: saveAsTable terminates with an out of memory exception. Second run (with more RAM to the driver executor): fails with many variations of java.lang.RuntimeException: hdfs://localhost:54310/user/hive/warehouse/test_data/_temporary/0/_temporary/attempt_201507171538_0008_r_21_0/part-r-00022.parquet is not a Parquet file (too small) Third run (after hdfs dfs -rm -r hdfs:///user/hive/warehouse/test_data) succeeds. What are the best practices for dealing with these types of cleanup failures? Do they tend to come in known varieties? Thanks, Sim
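One pragmatic way to script the cleanup that the manual hdfs dfs -rm -r did (just a sketch; the warehouse location matches the example above and would need to fit your layout) is to delete the half-written table directory before retrying:

import org.apache.hadoop.fs.{FileSystem, Path}

val fs = FileSystem.get(sc.hadoopConfiguration)
val tableDir = new Path("hdfs:///user/hive/warehouse/test_data")
if (fs.exists(tableDir)) fs.delete(tableDir, true)  // recursive delete of the failed output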
Re: Command builder problem when running worker in Windows
Hi Elkhan, I ran Spark with --verbose, but the output looked the same to me - what should I be looking for? At the beginning, the system properties which are set are: System properties: SPARK_SUBMIT - true spark.app.name - tests.testFileReader spark.jars - file:/C:/Users/jbeaudan/Spark/spark-1.3.1-bin-hadoop2.4/sparkTest1.jar spark.master - spark://192.168.194.128:7077 Classpath elements: file:/C:/Users/jbeaudan/Spark/spark-1.3.1-bin-hadoop2.4/sparkTest1.jar I'm not sure why, but the file paths here seem formatted correctly (it is same from the command terminal and Cygwin), so the path must get edited afterwards? Julien On 07/17/2015 03:00 PM, Elkhan Dadashov wrote: Run Spark with --verbose flag, to see what it read for that path. I guess in Windows if you are using backslash, you need 2 of them (\\), or just use forward slashes everywhere. On Fri, Jul 17, 2015 at 2:40 PM, Julien Beaudan jbeau...@stottlerhenke.com mailto:jbeau...@stottlerhenke.com wrote: Hi, I running a stand-alone cluster in Windows 7, and when I try to run any worker on the machine, I get the following error: 15/07/17 14:14:43 ERROR ExecutorRunner: Error running executor java.io.IOException: Cannot run program C:\cygdrive\c\Users\jbeaudan\Spark\spark-1.3.1-bin-hadoop2.4/bin/compute-classpath.cmd (in directory .): CreateProcess error=2, The system cannot find the file specified at java.lang.ProcessBuilder.start(Unknown Source) at org.apache.spark.util.Utils$.executeCommand(Utils.scala:1067) at org.apache.spark.util.Utils$.executeAndGetOutput(Utils.scala:1084) at org.apache.spark.deploy.worker.CommandUtils$.buildJavaOpts(CommandUtils.scala:112) at org.apache.spark.deploy.worker.CommandUtils$.buildCommandSeq(CommandUtils.scala:61) at org.apache.spark.deploy.worker.CommandUtils$.buildProcessBuilder(CommandUtils.scala:47) at org.apache.spark.deploy.worker.ExecutorRunner.fetchAndRunExecutor(ExecutorRunner.scala:132) at org.apache.spark.deploy.worker.ExecutorRunner$$anon$1.run(ExecutorRunner.scala:68) Caused by: java.io.IOException: CreateProcess error=2, The system cannot find the file specified at java.lang.ProcessImpl.create(Native Method) at java.lang.ProcessImpl.init(Unknown Source) at java.lang.ProcessImpl.start(Unknown Source) ... 8 more I'm pretty sure the problem is that Spark is looking for the following path, which mixes forward and back slashes: C:\cygdrive\c\Users\jbeaudan\Spark\spark-1.3.1-bin-hadoop2.4/bin/compute-classpath.cmd Is there anyway to fix this? (Also, I have also tried running this from a normal terminal, instead of from cygwin, and I get the same issue, except this time the path is: C:\Users\jbeaudan\Spark\spark-1.3.1-bin-hadoop2.4\bin../bin/compute-classpath.cmd ) Thank you! Julien -- Best regards, Elkhan Dadashov smime.p7s Description: S/MIME Cryptographic Signature
Re: What is java.sql.SQLException: Unsupported type -101?
Looking at getCatalystType(): * Maps a JDBC type to a Catalyst type. This function is called only when * the JdbcDialect class corresponding to your database driver returns null. sqlType was carrying value of -101 However, I couldn't find -101 in http://grepcode.com/file/repository.grepcode.com/java/root/jdk/openjdk/6-b14/java/sql/Types.java FYI On Fri, Jul 17, 2015 at 2:01 PM, Sambit Tripathy (RBEI/EDS1) sambit.tripa...@in.bosch.com wrote: Hi, I was trying to get a oracle table using JDBC datasource val jdbcDF = sqlContext.load(jdbc, Map( url - jdbc:oracle:thin:USER/p...@host.com:1517:sid, dbtable - USER.TABLE,driver - oracle.jdbc.OracleDriver)) and got the error below java.sql.SQLException: Unsupported type -101 at org.apache.spark.sql.jdbc.JDBCRDD$.getCatalystType(JDBCRDD.scala:78) at org.apache.spark.sql.jdbc.JDBCRDD$.resolveTable(JDBCRDD.scala:112) at org.apache.spark.sql.jdbc.JDBCRelation.init(JDBCRelation.scala:133) at org.apache.spark.sql.jdbc.DefaultSource.createRelation(JDBCRelation.scala:121) at org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:219) at org.apache.spark.sql.SQLContext.load(SQLContext.scala:697) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:23) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:28) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.init(console:30) at $iwC$$iwC$$iwC$$iwC$$iwC.init(console:32) at $iwC$$iwC$$iwC$$iwC.init(console:34) at $iwC$$iwC$$iwC.init(console:36) at $iwC$$iwC.init(console:38) at $iwC.init(console:40) at init(console:42) at .init(console:46) at .clinit(console) at .init(console:7) at .clinit(console) at $print(console) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:601) at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065) at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1338) at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840) at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871) at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819) Any idea what it could be? Regards, Sambit.
Re: The auxService:spark_shuffle does not exist
Hi all, Did you forget to restart the node managers after editing yarn-site.xml by any chance? -Andrew 2015-07-17 8:32 GMT-07:00 Andrew Lee alee...@hotmail.com: I have encountered the same problem after following the document. Here's my spark-defaults.conf spark.shuffle.service.enabled true spark.dynamicAllocation.enabled true spark.dynamicAllocation.executorIdleTimeout 60 spark.dynamicAllocation.cachedExecutorIdleTimeout 120 spark.dynamicAllocation.initialExecutors 2 spark.dynamicAllocation.maxExecutors 8 spark.dynamicAllocation.minExecutors 1 spark.dynamicAllocation.schedulerBacklogTimeout 10 and yarn-site.xml configured. property nameyarn.nodemanager.aux-services/name valuespark_shuffle,mapreduce_shuffle/value /property ... property nameyarn.nodemanager.aux-services.spark_shuffle.class/name valueorg.apache.spark.network.yarn.YarnShuffleService/value /property and deployed the 2 JARs to NodeManager's classpath /opt/hadoop/share/hadoop/mapreduce/. (I also checked the NodeManager log and the JARs appear in the classpath). I notice that the JAR location is not the same as the document in 1.4. I found them under network/yarn/target and network/shuffle/target/ after building it with -Phadoop-2.4 -Psparkr -Pyarn -Phive -Phive-thriftserver in maven. spark-network-yarn_2.10-1.4.1.jar spark-network-shuffle_2.10-1.4.1.jar and still getting the following exception. Exception in thread ContainerLauncher #0 java.lang.Error: org.apache.spark.SparkException: Exception while starting container container_1437141440985_0003_01_02 on host alee-ci-2058-slave-2.test.altiscale.com at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1151) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:744) Caused by: org.apache.spark.SparkException: Exception while starting container container_1437141440985_0003_01_02 on host alee-ci-2058-slave-2.test.altiscale.com at org.apache.spark.deploy.yarn.ExecutorRunnable.startContainer(ExecutorRunnable.scala:116) at org.apache.spark.deploy.yarn.ExecutorRunnable.run(ExecutorRunnable.scala:67) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) ... 2 more Caused by: org.apache.hadoop.yarn.exceptions.InvalidAuxServiceException: The auxService:spark_shuffle does not exist at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:526) at org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl.instantiateException(SerializedExceptionPBImpl.java:152) at org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl.deSerialize(SerializedExceptionPBImpl.java:106) Not sure what else am I missing here or doing wrong? Appreciate any insights or feedback, thanks. -- Date: Wed, 8 Jul 2015 09:25:39 +0800 Subject: Re: The auxService:spark_shuffle does not exist From: zjf...@gmail.com To: rp...@njit.edu CC: user@spark.apache.org Did you enable the dynamic resource allocation ? You can refer to this page for how to configure spark shuffle service for yarn. https://spark.apache.org/docs/1.4.0/job-scheduling.html On Tue, Jul 7, 2015 at 10:55 PM, roy rp...@njit.edu wrote: we tried --master yarn-client with no different result. 
-- Best Regards Jeff Zhang
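For completeness, the application-side settings from the spark-defaults.conf quoted above can also be set programmatically. A minimal sketch is shown below (same property names as in the thread, illustrative values, hypothetical app name); the YARN side (the spark_shuffle aux-service in yarn-site.xml plus a NodeManager restart) still has to be in place for dynamic allocation to work.

    import org.apache.spark.{SparkConf, SparkContext}

    // Sketch only: mirrors the spark-defaults.conf entries quoted above.
    val conf = new SparkConf()
      .setAppName("dynamic-allocation-example")          // hypothetical app name
      .set("spark.shuffle.service.enabled", "true")
      .set("spark.dynamicAllocation.enabled", "true")
      .set("spark.dynamicAllocation.minExecutors", "1")
      .set("spark.dynamicAllocation.maxExecutors", "8")
    val sc = new SparkContext(conf)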
Re: Command builder problem when running worker in Windows
Oh, yeah of course. I'm running it from the command line (I haven't tried the SparkLauncher), using bin/spark-submit --class tests.testFileReader --master spark://192.168.194.128:7077 --verbose ./sparkTest1.jar All that the testFileReader class does is create an RDD from a few text files - just a sanity check to make sure that my set up works. Julien On 07/17/2015 03:35 PM, Elkhan Dadashov wrote: Are you running it from the command line (CLI) or through SparkLauncher? If you can share the command (./bin/spark-submit ...) or the code snippet you are running, then it can give some clue. On Fri, Jul 17, 2015 at 3:30 PM, Julien Beaudan jbeau...@stottlerhenke.com wrote: Hi Elkhan, I ran Spark with --verbose, but the output looked the same to me - what should I be looking for? At the beginning, the system properties which are set are: System properties: SPARK_SUBMIT -> true spark.app.name -> tests.testFileReader spark.jars -> file:/C:/Users/jbeaudan/Spark/spark-1.3.1-bin-hadoop2.4/sparkTest1.jar spark.master -> spark://192.168.194.128:7077 Classpath elements: file:/C:/Users/jbeaudan/Spark/spark-1.3.1-bin-hadoop2.4/sparkTest1.jar I'm not sure why, but the file paths here seem formatted correctly (it is the same from the command terminal and Cygwin), so the path must get edited afterwards? Julien On 07/17/2015 03:00 PM, Elkhan Dadashov wrote: Run Spark with the --verbose flag to see what it reads for that path. I guess in Windows if you are using backslashes, you need 2 of them (\\), or just use forward slashes everywhere. On Fri, Jul 17, 2015 at 2:40 PM, Julien Beaudan jbeau...@stottlerhenke.com wrote: Hi, I'm running a stand-alone cluster on Windows 7, and when I try to run any worker on the machine, I get the following error: 15/07/17 14:14:43 ERROR ExecutorRunner: Error running executor java.io.IOException: Cannot run program C:\cygdrive\c\Users\jbeaudan\Spark\spark-1.3.1-bin-hadoop2.4/bin/compute-classpath.cmd (in directory .): CreateProcess error=2, The system cannot find the file specified at java.lang.ProcessBuilder.start(Unknown Source) at org.apache.spark.util.Utils$.executeCommand(Utils.scala:1067) at org.apache.spark.util.Utils$.executeAndGetOutput(Utils.scala:1084) at org.apache.spark.deploy.worker.CommandUtils$.buildJavaOpts(CommandUtils.scala:112) at org.apache.spark.deploy.worker.CommandUtils$.buildCommandSeq(CommandUtils.scala:61) at org.apache.spark.deploy.worker.CommandUtils$.buildProcessBuilder(CommandUtils.scala:47) at org.apache.spark.deploy.worker.ExecutorRunner.fetchAndRunExecutor(ExecutorRunner.scala:132) at org.apache.spark.deploy.worker.ExecutorRunner$$anon$1.run(ExecutorRunner.scala:68) Caused by: java.io.IOException: CreateProcess error=2, The system cannot find the file specified at java.lang.ProcessImpl.create(Native Method) at java.lang.ProcessImpl.<init>(Unknown Source) at java.lang.ProcessImpl.start(Unknown Source) ... 8 more I'm pretty sure the problem is that Spark is looking for the following path, which mixes forward and back slashes: C:\cygdrive\c\Users\jbeaudan\Spark\spark-1.3.1-bin-hadoop2.4/bin/compute-classpath.cmd Is there any way to fix this? (Also, I have tried running this from a normal terminal instead of from Cygwin, and I get the same issue, except this time the path is: C:\Users\jbeaudan\Spark\spark-1.3.1-bin-hadoop2.4\bin../bin/compute-classpath.cmd ) Thank you!
Julien -- Best regards, Elkhan Dadashov
What is java.sql.SQLException: Unsupported type -101?
Hi, I was trying to read an Oracle table using the JDBC data source val jdbcDF = sqlContext.load("jdbc", Map("url" -> "jdbc:oracle:thin:USER/p...@host.com:1517:sid", "dbtable" -> "USER.TABLE", "driver" -> "oracle.jdbc.OracleDriver")) and got the error below java.sql.SQLException: Unsupported type -101 at org.apache.spark.sql.jdbc.JDBCRDD$.getCatalystType(JDBCRDD.scala:78) at org.apache.spark.sql.jdbc.JDBCRDD$.resolveTable(JDBCRDD.scala:112) at org.apache.spark.sql.jdbc.JDBCRelation.<init>(JDBCRelation.scala:133) at org.apache.spark.sql.jdbc.DefaultSource.createRelation(JDBCRelation.scala:121) at org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:219) at org.apache.spark.sql.SQLContext.load(SQLContext.scala:697) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:23) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:28) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:30) at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:32) at $iwC$$iwC$$iwC$$iwC.<init>(<console>:34) at $iwC$$iwC$$iwC.<init>(<console>:36) at $iwC$$iwC.<init>(<console>:38) at $iwC.<init>(<console>:40) at <init>(<console>:42) at .<init>(<console>:46) at .<clinit>(<console>) at .<init>(<console>:7) at .<clinit>(<console>) at $print(<console>) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:601) at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065) at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1338) at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840) at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871) at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819) Any idea what it could be? Regards, Sambit.
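If the offending column is indeed an Oracle-specific timestamp type (type code -101 is not one of the standard java.sql.Types constants), one possible work-around is to hand the data source a subquery as dbtable and CAST the column so the driver reports a type Spark understands. The sketch below is only an illustration: the column names tz_col and other_col are placeholders, not the real schema, and the URL is the one quoted in the thread.

    // Sketch: same load() call as above, but dbtable is a subquery that
    // casts the assumed TIMESTAMP WITH TIME ZONE column to plain TIMESTAMP.
    val jdbcDF = sqlContext.load("jdbc", Map(
      "url"     -> "jdbc:oracle:thin:USER/p...@host.com:1517:sid",
      "dbtable" -> "(SELECT CAST(tz_col AS TIMESTAMP) AS tz_col, other_col FROM USER.TABLE) t",
      "driver"  -> "oracle.jdbc.OracleDriver"))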
Command builder problem when running worker in Windows
Hi, I'm running a stand-alone cluster on Windows 7, and when I try to run any worker on the machine, I get the following error: 15/07/17 14:14:43 ERROR ExecutorRunner: Error running executor java.io.IOException: Cannot run program C:\cygdrive\c\Users\jbeaudan\Spark\spark-1.3.1-bin-hadoop2.4/bin/compute-classpath.cmd (in directory .): CreateProcess error=2, The system cannot find the file specified at java.lang.ProcessBuilder.start(Unknown Source) at org.apache.spark.util.Utils$.executeCommand(Utils.scala:1067) at org.apache.spark.util.Utils$.executeAndGetOutput(Utils.scala:1084) at org.apache.spark.deploy.worker.CommandUtils$.buildJavaOpts(CommandUtils.scala:112) at org.apache.spark.deploy.worker.CommandUtils$.buildCommandSeq(CommandUtils.scala:61) at org.apache.spark.deploy.worker.CommandUtils$.buildProcessBuilder(CommandUtils.scala:47) at org.apache.spark.deploy.worker.ExecutorRunner.fetchAndRunExecutor(ExecutorRunner.scala:132) at org.apache.spark.deploy.worker.ExecutorRunner$$anon$1.run(ExecutorRunner.scala:68) Caused by: java.io.IOException: CreateProcess error=2, The system cannot find the file specified at java.lang.ProcessImpl.create(Native Method) at java.lang.ProcessImpl.<init>(Unknown Source) at java.lang.ProcessImpl.start(Unknown Source) ... 8 more I'm pretty sure the problem is that Spark is looking for the following path, which mixes forward and back slashes: C:\cygdrive\c\Users\jbeaudan\Spark\spark-1.3.1-bin-hadoop2.4/bin/compute-classpath.cmd Is there any way to fix this? (Also, I have tried running this from a normal terminal instead of from Cygwin, and I get the same issue, except this time the path is: C:\Users\jbeaudan\Spark\spark-1.3.1-bin-hadoop2.4\bin../bin/compute-classpath.cmd ) Thank you! Julien
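A small diagnostic sketch related to the error above (an assumption, not a confirmed diagnosis): Windows APIs generally tolerate mixed forward and back slashes, so the more likely culprit is that the C:\cygdrive\... prefix itself does not exist, which may indicate that a Cygwin-style SPARK_HOME is being propagated to the worker. The snippet below, runnable in a spark-shell or scala REPL on the same machine, simply checks whether the file the worker is looking for is actually there; the path is the one from the error message.

    import java.io.File

    // Check whether the path from the error message resolves to a real file.
    val suspect = new File("""C:\cygdrive\c\Users\jbeaudan\Spark\spark-1.3.1-bin-hadoop2.4/bin/compute-classpath.cmd""")
    println(s"exists=${suspect.exists}  canonical=${suspect.getCanonicalFile}")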
Re: Data frames select and where clause dependency
Each operation on a DataFrame is completely independent and doesn't know what operations happened before it. When you do a selection, you are removing the other columns from the DataFrame, so the filter has nothing to operate on. On Fri, Jul 17, 2015 at 11:55 AM, Mike Trienis mike.trie...@orcsol.com wrote: I'd like to understand why the field used in the where clause must exist in the select clause. For example, the following select statement works fine - df.select("field1", "filter_field").filter(df("filter_field") === "value").show() However, the next one fails with the error "in operator !Filter (filter_field#60 = value);" - df.select("field1").filter(df("filter_field") === "value").show() As a work-around, it seems that I can do the following - df.select("field1", "filter_field").filter(df("filter_field") === "value").drop("filter_field").show() Thanks, Mike.
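A runnable sketch of the work-around described above, plus the simpler alternative of filtering before projecting (the column names field1, filter_field and the literal "value" are the placeholders used in the thread, not a real schema):

    // Work-around from the thread: keep the filter column in the projection,
    // apply the filter, then drop the helper column.
    val result = df.select("field1", "filter_field")
      .filter(df("filter_field") === "value")
      .drop("filter_field")
    result.show()

    // Alternative: filter first, then project - the filter still sees the column.
    df.filter(df("filter_field") === "value").select("field1").show()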
RE: What is java.sql.SQLException: Unsupported type -101?
Does this mean there is a possible mismatch between the JDBC driver and Oracle? From: Ted Yu [mailto:yuzhih...@gmail.com] Sent: Friday, July 17, 2015 2:09 PM To: Sambit Tripathy (RBEI/EDS1) Cc: user@spark.apache.org Subject: Re: What is java.sql.SQLException: Unsupported type -101? Looking at getCatalystType(): "Maps a JDBC type to a Catalyst type. This function is called only when the JdbcDialect class corresponding to your database driver returns null." sqlType was carrying a value of -101. However, I couldn't find -101 in http://grepcode.com/file/repository.grepcode.com/java/root/jdk/openjdk/6-b14/java/sql/Types.java FYI On Fri, Jul 17, 2015 at 2:01 PM, Sambit Tripathy (RBEI/EDS1) sambit.tripa...@in.bosch.com wrote: Hi, I was trying to read an Oracle table using the JDBC data source val jdbcDF = sqlContext.load("jdbc", Map("url" -> "jdbc:oracle:thin:USER/p...@host.com:1517:sid", "dbtable" -> "USER.TABLE", "driver" -> "oracle.jdbc.OracleDriver")) and got the error below java.sql.SQLException: Unsupported type -101 at org.apache.spark.sql.jdbc.JDBCRDD$.getCatalystType(JDBCRDD.scala:78) at org.apache.spark.sql.jdbc.JDBCRDD$.resolveTable(JDBCRDD.scala:112) at org.apache.spark.sql.jdbc.JDBCRelation.<init>(JDBCRelation.scala:133) at org.apache.spark.sql.jdbc.DefaultSource.createRelation(JDBCRelation.scala:121) at org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:219) at org.apache.spark.sql.SQLContext.load(SQLContext.scala:697) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:23) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:28) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:30) at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:32) at $iwC$$iwC$$iwC$$iwC.<init>(<console>:34) at $iwC$$iwC$$iwC.<init>(<console>:36) at $iwC$$iwC.<init>(<console>:38) at $iwC.<init>(<console>:40) at <init>(<console>:42) at .<init>(<console>:46) at .<clinit>(<console>) at .<init>(<console>:7) at .<clinit>(<console>) at $print(<console>) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:601) at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065) at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1338) at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840) at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871) at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819) Any idea what it could be? Regards, Sambit.
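One way to answer the driver question is to ask the driver directly which type code it reports for each column. The sketch below is an assumption-laden diagnostic, not a fix: the URL and the USER/TABLE names are the placeholders from the thread, and -101/-102 are assumed (not confirmed here) to be the Oracle driver's codes for TIMESTAMP WITH TIME ZONE and TIMESTAMP WITH LOCAL TIME ZONE. Any code that does not appear in java.sql.Types will trip Spark's default type mapping, regardless of the driver version.

    import java.sql.DriverManager

    // List the JDBC DATA_TYPE code the Oracle driver reports per column.
    Class.forName("oracle.jdbc.OracleDriver")
    val conn = DriverManager.getConnection("jdbc:oracle:thin:USER/p...@host.com:1517:sid")
    val cols = conn.getMetaData.getColumns(null, "USER", "TABLE", null)
    while (cols.next()) {
      println(cols.getString("COLUMN_NAME") + " -> " + cols.getInt("DATA_TYPE"))
    }
    conn.close()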