Re: Null pointer exception while replaying WAL
Please find the code below:

def main(args: Array[String]): Unit = {
  val config: Config = ConfigFactory.load()
  val streamC = StreamingContext.getOrCreate(
    checkpointDirectory,
    () => functionToCreateContext(config, checkpointDirectory)
  )
  streamC.start()
  streamC.awaitTermination()
}

def functionToCreateContext(config: Config, checkpointDirectory: String): StreamingContext = {
  val brokerUrl = config.getString("streaming.solace.brokerURL")
  val username = config.getString("streaming.solace.userName")
  val passwordSol = config.getString("streaming.solace.password")
  val vpn = config.getString("streaming.solace.vpn")
  val queue = config.getString("streaming.solace.queueName")
  val connectionFactory = config.getString("streaming.solace.connectionFactory")

  val spark = SparkSession
    .builder()
    .appName("rem-Streaming-Consumer")
    .config("spark.streaming.receiver.writeAheadLog.enable", "true")
    .config("spark.streaming.blockInterval", blockInterval)
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .enableHiveSupport()
    .getOrCreate()

  val sc = spark.sparkContext
  val ssc = new StreamingContext(sc, Seconds(batchInterval))
  ssc.checkpoint(checkpointDirectory)

  val converter: Message => Option[String] = {
    case msg: TextMessage => Some(msg.getText)
    case _ => None
  }

  val props = new Properties()
  props.setProperty(
    Context.INITIAL_CONTEXT_FACTORY,
    "com.solacesystems.jndi.SolJNDIInitialContextFactory"
  )
  props.setProperty(Context.PROVIDER_URL, brokerUrl)
  props.setProperty(Context.SECURITY_PRINCIPAL, username)
  // the original set SECURITY_PRINCIPAL twice; the password belongs under SECURITY_CREDENTIALS
  props.setProperty(Context.SECURITY_CREDENTIALS, passwordSol)
  props.setProperty(SupportedProperty.SOLACE_JMS_VPN, vpn)

  val msgs = JmsStreamUtils.createSynchronousJmsQueueStream(
    ssc,
    JndiMessageConsumerFactory(
      props,
      QueueJmsDestinationInfo(queue),
      connectionFactoryName = connectionFactory,
      messageSelector = ""
    ),
    converter,
    1000,
    1.second,
    10.second,
    StorageLevel.MEMORY_AND_DISK_SER_2
  )

  msgs.foreachRDD(rdd =>
    if (rdd.take(1).length > 0) {
      val messages: RDD[String] = rdd.map { sr =>
        if (sr == null) {
          println("NO records found")
          "NO records found"
        } else {
          println("Input Records from Solace queue : " + sr.toString)
          sr.toString
        }
      }
      Thread.sleep(12)
      try {
        val messagesJson = spark.read.json(messages) // ===> getting NPE here after restarting using WAL
        messagesJson.write.mode("append").parquet(data)
      } catch {
        case ex: Exception => ex.printStackTrace()
      }
    })

  ssc
}

Thanks & Regards,
Nayan Sharma
*+91-8095382952*
<https://www.linkedin.com/in/nayan-sharma>
<http://stackoverflow.com/users/3687426/nayan-sharma?tab=profile>


On Mon, Feb 12, 2024 at 4:23 AM Mich Talebzadeh wrote:

> Hi,
>
> It is challenging to make a recommendation without further details. I am
> guessing you are trying to build a fault-tolerant Spark application (Spark
> Structured Streaming) that consumes messages from Solace?
> To address the *NullPointerException* in the context of the provided
> information, you need to review the part of the code where the exception
> is thrown; identifying which object or method call is resulting in *null*,
> together with checking the logs, can help the debugging process.
>
> HTH
>
> Mich Talebzadeh,
> Dad | Technologist | Solutions Architect | Engineer
> London
> United Kingdom
>
>
> view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
> https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk.
> Any and all responsibility for any loss, damage or destruction of data or
> any other property which may arise from relying on this email's technical
> content is explicitly disclaimed. The author will in no case be liable for
> any monetary damages arising from such loss, damage or destruction.
>
>
>
>
> On Sat, 10 Feb 2024 at 05:29, nayan sharma wrote:
>
>> Hi Users,
>>
>> I am trying to build a fault-tolerant Spark Solace consumer.
>>
>> Issue :- We have to restart the job due to multiple issues; load average
>> is one of them. At that time, whatever Spark is processing, or the batches
>> in the queue, are lost. We can't replay them because we had already sent
>> the ack while calling store().
>>
>> Solution :- I have tried implementing WAL and checkpointing in the
>> solution. The job is able to identify the lost batches, but the records
>> are not being written to the log file and it throws an NPE.
>>
>> We are creating the SparkContext using sc.getOrCreate().
>>
>>
>> Thanks,
>> Nayan
>>
>
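A likely suspect, offered only as a sketch and not a confirmed diagnosis: the `spark` val is captured inside the foreachRDD closure, and when the StreamingContext is recovered from the checkpoint/WAL the closure is deserialised without that SparkSession being re-initialised, so `spark.read` dereferences a stale reference. The Spark Streaming programming guide's pattern for using DataFrames inside foreachRDD is to obtain the session lazily from the RDD's SparkContext instead. A minimal sketch along those lines (`data` is assumed to be the output path from the code above):

msgs.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    // re-acquire the session after checkpoint recovery instead of capturing the outer val
    val session = SparkSession.builder
      .config(rdd.sparkContext.getConf)
      .enableHiveSupport()
      .getOrCreate()
    val messages = rdd.map(sr => Option(sr).map(_.toString).getOrElse("NO records found"))
    val messagesJson = session.read.json(messages)
    messagesJson.write.mode("append").parquet(data)
  }
}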
Null pointer exception while replaying WAL
Hi Users, I am trying to build a fault-tolerant Spark Solace consumer. Issue :- We have to restart the job due to multiple issues; load average is one of them. At that time, whatever Spark is processing, or the batches in the queue, are lost. We can't replay them because we had already sent the ack while calling store(). Solution :- I have tried implementing WAL and checkpointing in the solution. The job is able to identify the lost batches, but the records are not being written to the log file and it throws an NPE. We are creating the SparkContext using sc.getOrCreate(). Thanks, Nayan
NiFi flow file monitoring
Hi Users, Is there any way to monitor or raise an alert if a flow file gets stuck in a NiFi queue? For now the operations team needs to check for these manually. If you can suggest a way to achieve this, that would be great. Thanks, Nayan -- Thanks & Regards, Nayan Sharma *+91-8095382952* <https://www.linkedin.com/in/nayan-sharma> <http://stackoverflow.com/users/3687426/nayan-sharma?tab=profile>
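One way to automate this, sketched below under assumptions: NiFi's REST API exposes per-process-group status, including an aggregate queued flow file count, so an external job can poll it and alert when a queue stays non-empty for too long. The endpoint path and JSON field names should be checked against the REST API docs of your NiFi version; authentication/TLS is omitted, and NiFi's built-in MonitorActivity processor or a reporting task are in-flow alternatives.

import java.net.{HttpURLConnection, URL}
import scala.io.Source

// Returns the raw status JSON for a process group ("root" covers the whole flow).
// Parse processGroupStatus.aggregateSnapshot.queuedCount / queuedSize from it and
// alert when the value does not drain between polls.
def flowStatusJson(nifiBaseUrl: String, groupId: String = "root"): String = {
  val url = new URL(s"$nifiBaseUrl/nifi-api/flow/process-groups/$groupId/status")
  val conn = url.openConnection().asInstanceOf[HttpURLConnection]
  conn.setRequestMethod("GET")
  try Source.fromInputStream(conn.getInputStream).mkString
  finally conn.disconnect()
}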
Kafka Spark Structured Streaming Error
) at org.apache.spark.sql.execution.datasources.orc.OrcOutputWriter.close(OrcOutputWriter.scala:58)
  at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.releaseResources(FileFormatDataWriter.scala:57)
  at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.abort(FileFormatDataWriter.scala:83)
  at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$1.apply$mcV$sp(FileFormatWriter.scala:255)
  at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1377)
  at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:253)
  at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:174)
  at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:173)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
  at org.apache.spark.scheduler.Task.run(Task.scala:123)
  at org.apache.spark.executor.Executor$TaskRunner$$anonfun$11.apply(Executor.scala:413)
  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1334)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:419)
  at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
  at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
  at java.base/java.lang.Thread.run(Thread.java:834)

Thanks & Regards,
Nayan Sharma
*+91-8095382952*
<https://www.linkedin.com/in/nayan-sharma>
<http://stackoverflow.com/users/3687426/nayan-sharma?tab=profile>
Re: ConsumeJMS causing multiple BIND/UNBIND requests
NiFi 1.11.0 On Thu, 13 Jan 2022 at 18:27, Juan Pablo Gardella < gardellajuanpa...@gmail.com> wrote: > Which NiFi version are you using? > > On Thu, 13 Jan 2022 at 09:14, nayan sharma > wrote: > >> Hi All, >> >> >> >> We are using ConsumeJMS to consume messages from the Solace system from >> multiple queues, but we are seeing multiple bind/unbind requests. >> >> We are getting *SYSTEM_LOGGING_LOST_EVENT* alerts on the Solace >> production appliance, which indicate the buffer pool is exhausted. >> >> >> >> >> >> We have seen a high rate of *CLIENT_CLIENT_UNBIND* requests from the applications, >> and that leads to a very high volume of Solace logging events. >> >> >> Any suggestion would be appreciated. >> >> >> Thanks & Regards, >> Nayan Sharma >> *+91-8095382952* >> >> <https://www.linkedin.com/in/nayan-sharma> >> <http://stackoverflow.com/users/3687426/nayan-sharma?tab=profile> >> > -- Thanks & Regards, Nayan Sharma *+91-8095382952* <https://www.linkedin.com/in/nayan-sharma> <http://stackoverflow.com/users/3687426/nayan-sharma?tab=profile>
ConsumeJMS causing multiple BIND/UNBIND requests
Hi All, We are using ConsumeJMS to consume messages from the Solace system from multiple queues, but we are seeing multiple bind/unbind requests. We are getting *SYSTEM_LOGGING_LOST_EVENT* alerts on the Solace production appliance, which indicate the buffer pool is exhausted. We have seen a high rate of *CLIENT_CLIENT_UNBIND* requests from the applications, and that leads to a very high volume of Solace logging events. Any suggestion would be appreciated. Thanks & Regards, Nayan Sharma *+91-8095382952* <https://www.linkedin.com/in/nayan-sharma> <http://stackoverflow.com/users/3687426/nayan-sharma?tab=profile>
Re: mergeContent not working
https://imgur.com/aCzkWfu On 2020/07/30 17:32:47, nayan sharma wrote: > Hi Users, > I am using MergeContent to emit flow files when the size is greater > than 1 GB. It is scheduled to run/check for files every 30 sec. MergeContent > has the following configuration >https://imgur.com/undefined > but it doesn't wait for anything. It emits files as soon as it wakes up. It is > creating files of only a few MB. > Any ideas on what configuration I should use to make it work? > > Thanks, > Nayan > > >
mergeContent not working
Hi Users, I am using MergeContent to emit flow files when the size is greater than 1 GB. It is scheduled to run/check for files every 30 sec. MergeContent has the following configuration https://imgur.com/undefined but it doesn't wait for anything. It emits files as soon as it wakes up. It is creating files of only a few MB. Any ideas on what configuration I should use to make it work? Thanks, Nayan
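For reference, the behaviour described (bins flushing immediately, output files of only a few MB) matches what MergeContent's defaults allow: Minimum Number of Entries defaults to 1 and Minimum Group Size to 0 B, so every bin is immediately eligible, and Maximum Number of Entries (default 1000) can close a bin long before it reaches 1 GB. A property sketch along these lines may help; the values are illustrative, not a verified configuration:

  Merge Strategy              Bin-Packing Algorithm
  Minimum Group Size          1 GB      (hold bins until they reach the target size)
  Maximum Group Size          ~1.2 GB   (upper bound so bins eventually close)
  Minimum Number of Entries   1
  Maximum Number of Entries   100000    (large enough that the entry cap does not close bins early)
  Max Bin Age                 10 min    (safety valve so a partial bin is not held forever)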
Re: NIFI 1.9.2 stuck in cluster mode
Hi Mark, Thanks for clarifying things for me. Do we have any workaround ? Because load balancer between listSftp and fetchSftp is also not working seamlessly in 1.9.2. Out of 60 files it processes only 30 and remaining stuck in the queue. Is there any way where I can debug things. I have taken thread-dump also but it doesn't gave me much clarity. So even if we have cluster we are not taking full advantage of it. Please suggest me. Thanks, Nayan On Tue, 10 Dec 2019, 21:56 Mark Payne, wrote: > Nayan, > > Yes, version 1.10.0 does have a dependency on 3.5.5. > > Thanks > -Mark > > On Dec 10, 2019, at 11:13 AM, nayan sharma > wrote: > > Hi Mark, > > I was trying to update NIFI cluster to 1.10.0 but I am getting error > *Connection > State changed to SUSPENDED,RECONNECTED and > zookeeper.KeeperException$ConnectionLossException : > KeeperErrorCode=ConnectionLoss* > > I am not using embedded zookeeper. Zookeeper version 3.4.6. > Do NIFI 1.10.0 require zookeeper 3.5.5 ? > > Thanks & Regards, > Nayan Sharma > *+91-8095382952* > > <https://www.linkedin.com/in/nayan-sharma> > <http://stackoverflow.com/users/3687426/nayan-sharma?tab=profile> > > > On Wed, Dec 4, 2019 at 4:18 PM nayan sharma > wrote: > >> Hi Mark, >> One more thing I have noticed about the putHDFS that only two files has >> been written and rest of them are in complete. Please refer image. >> https://i.imgur.com/hu26hQG.png >> >> Thank you, >> Mark >> >> On 2019/12/04 07:30:03, nayan sharma wrote: >> > Hi Mark, >> > Thanks for your valuable suggestion. It worked a lot. Now I can >> understand, there is no point in load balancing between FetchSFTP and >> CompressContent. >> > >> > After making all the changes it worked but some of the flow files are >> stuck between CompressContent and putHDFS >> https://i.imgur.com/oSYkYuA.png >> > >> > And 2nd thing is that 10 FlowFiles between ListSFTP and FetchSFTP is >> there for long time >> > https://i.imgur.com/Q44VDW6.png >> > >> > Please suggested where I can start debugging these two issues. >> > >> > Meanwhile we are migrating to 1.10.0. This time we are doing through >> HDF and it has NIFI 1.9.0 as latest version. We are planing to replace the >> library and content of 1.9.0 with 1.10.0. Can we go ahead with this >> approach or is there are other way. >> > >> > Currently 1.9.2 is an independent cluster. >> > >> > >> > >> > On 2019/12/03 14:30:43, Mark Payne wrote: >> > > Nayan, >> > > >> > > Looking at the screenshot, I can see two different connections there >> that are load balanced. One of them holds the nearly 100 GB of data. >> > > >> > > There are a handful of bugs related to load-balanced connections in >> 1.9.2 that were addressed in 1.10.0. If you're relying on load-balanced >> connections to spread data across the cluster (and this particular flow >> clearly is), then I would strongly encourage you to upgrade to 1.10.0 >> because at least one of these bugs does cause the flow to appear to stop >> flowing. >> > > >> > > That being said, there are two other things that you may want to >> consider: >> > > >> > > 1. You're trying to load balance 100 GB of data spread across 6 >> files. So each file is nearly 20 GB of data. It may take a little while to >> push that from Node A to Node B. If the data is queued up, waiting to go to >> another node, or is on the way to another node, it will not be shown in the >> FlowFile listing. That will only show FlowFiles that are queued up to be >> processed on the node that it currently lives on. >> > > >> > > 2. 
You should not be using a load balanced connection between >> FetchSFTP and CompressContent. The way that these processors are designed, >> the listing should be performed, and then the connection between ListSFTP >> and FetchSFTP should be load balanced. Once that has happened, the listing >> has been federated across the cluster, so whichever node receives the >> listing for File A should be responsible for fetching and processing it. >> Since the listing has already been spread across the cluster, there is no >> benefit to fetching the data, and then re-spreading it across the cluster. >> This will be very expensive with little to no benefit. Similarly, you don't >> want to load balance between CompressContent and PutHDFS. Simply load >> balance the listing itself (which is very cheap because the FlowFiles have >
Re: NIFI 1.9.2 stuck in cluster mode
Hi Mark, I was trying to update NIFI cluster to 1.10.0 but I am getting error *Connection State changed to SUSPENDED,RECONNECTED and zookeeper.KeeperException$ConnectionLossException : KeeperErrorCode=ConnectionLoss* I am not using embedded zookeeper. Zookeeper version 3.4.6. Do NIFI 1.10.0 require zookeeper 3.5.5 ? Thanks & Regards, Nayan Sharma *+91-8095382952* <https://www.linkedin.com/in/nayan-sharma> <http://stackoverflow.com/users/3687426/nayan-sharma?tab=profile> On Wed, Dec 4, 2019 at 4:18 PM nayan sharma wrote: > Hi Mark, > One more thing I have noticed about the putHDFS that only two files has > been written and rest of them are in complete. Please refer image. > https://i.imgur.com/hu26hQG.png > > Thank you, > Mark > > On 2019/12/04 07:30:03, nayan sharma wrote: > > Hi Mark, > > Thanks for your valuable suggestion. It worked a lot. Now I can > understand, there is no point in load balancing between FetchSFTP and > CompressContent. > > > > After making all the changes it worked but some of the flow files are > stuck between CompressContent and putHDFS https://i.imgur.com/oSYkYuA.png > > > > And 2nd thing is that 10 FlowFiles between ListSFTP and FetchSFTP is > there for long time > > https://i.imgur.com/Q44VDW6.png > > > > Please suggested where I can start debugging these two issues. > > > > Meanwhile we are migrating to 1.10.0. This time we are doing through HDF > and it has NIFI 1.9.0 as latest version. We are planing to replace the > library and content of 1.9.0 with 1.10.0. Can we go ahead with this > approach or is there are other way. > > > > Currently 1.9.2 is an independent cluster. > > > > > > > > On 2019/12/03 14:30:43, Mark Payne wrote: > > > Nayan, > > > > > > Looking at the screenshot, I can see two different connections there > that are load balanced. One of them holds the nearly 100 GB of data. > > > > > > There are a handful of bugs related to load-balanced connections in > 1.9.2 that were addressed in 1.10.0. If you're relying on load-balanced > connections to spread data across the cluster (and this particular flow > clearly is), then I would strongly encourage you to upgrade to 1.10.0 > because at least one of these bugs does cause the flow to appear to stop > flowing. > > > > > > That being said, there are two other things that you may want to > consider: > > > > > > 1. You're trying to load balance 100 GB of data spread across 6 files. > So each file is nearly 20 GB of data. It may take a little while to push > that from Node A to Node B. If the data is queued up, waiting to go to > another node, or is on the way to another node, it will not be shown in the > FlowFile listing. That will only show FlowFiles that are queued up to be > processed on the node that it currently lives on. > > > > > > 2. You should not be using a load balanced connection between > FetchSFTP and CompressContent. The way that these processors are designed, > the listing should be performed, and then the connection between ListSFTP > and FetchSFTP should be load balanced. Once that has happened, the listing > has been federated across the cluster, so whichever node receives the > listing for File A should be responsible for fetching and processing it. > Since the listing has already been spread across the cluster, there is no > benefit to fetching the data, and then re-spreading it across the cluster. > This will be very expensive with little to no benefit. Similarly, you don't > want to load balance between CompressContent and PutHDFS. 
Simply load > balance the listing itself (which is very cheap because the FlowFiles have > no content) and the data will automatically be balanced across the cluster. > > > > > > Thanks > > > -Mark > > > > > > > > > > On Dec 3, 2019, at 9:18 AM, nayan sharma > wrote: > > > > > > > > Hi, > > > > Thanks for your reply. > > > > Please find the attachment. Flow files has been for last 7 days. And > while listing flow files it says The queue has no Flow Files. > > > > Let me know your thoughts. > > > > > > > > Thanks & Regards, > > > > Nayan Sharma > > > > +91-8095382952 > > > > > > > > <https://www.linkedin.com/in/nayan-sharma> < > http://stackoverflow.com/users/3687426/nayan-sharma?tab=profile> > > > > > > > > On Tue, Dec 3, 2019 at 7:34 PM Bryan Bende <mailto:bbe...@gmail.com>> wrote: > > > > Hello, > > > > > > > > It would be helpful if you coul
Re: NIFI 1.9.2 stuck in cluster mode
Hi Mark, One more thing I have noticed about the putHDFS that only two files has been written and rest of them are in complete. Please refer image. https://i.imgur.com/hu26hQG.png Thank you, Mark On 2019/12/04 07:30:03, nayan sharma wrote: > Hi Mark, > Thanks for your valuable suggestion. It worked a lot. Now I can understand, > there is no point in load balancing between FetchSFTP and CompressContent. > > After making all the changes it worked but some of the flow files are stuck > between CompressContent and putHDFS https://i.imgur.com/oSYkYuA.png > > And 2nd thing is that 10 FlowFiles between ListSFTP and FetchSFTP is there > for long time > https://i.imgur.com/Q44VDW6.png > > Please suggested where I can start debugging these two issues. > > Meanwhile we are migrating to 1.10.0. This time we are doing through HDF and > it has NIFI 1.9.0 as latest version. We are planing to replace the library > and content of 1.9.0 with 1.10.0. Can we go ahead with this approach or is > there are other way. > > Currently 1.9.2 is an independent cluster. > > > > On 2019/12/03 14:30:43, Mark Payne wrote: > > Nayan, > > > > Looking at the screenshot, I can see two different connections there that > > are load balanced. One of them holds the nearly 100 GB of data. > > > > There are a handful of bugs related to load-balanced connections in 1.9.2 > > that were addressed in 1.10.0. If you're relying on load-balanced > > connections to spread data across the cluster (and this particular flow > > clearly is), then I would strongly encourage you to upgrade to 1.10.0 > > because at least one of these bugs does cause the flow to appear to stop > > flowing. > > > > That being said, there are two other things that you may want to consider: > > > > 1. You're trying to load balance 100 GB of data spread across 6 files. So > > each file is nearly 20 GB of data. It may take a little while to push that > > from Node A to Node B. If the data is queued up, waiting to go to another > > node, or is on the way to another node, it will not be shown in the > > FlowFile listing. That will only show FlowFiles that are queued up to be > > processed on the node that it currently lives on. > > > > 2. You should not be using a load balanced connection between FetchSFTP and > > CompressContent. The way that these processors are designed, the listing > > should be performed, and then the connection between ListSFTP and FetchSFTP > > should be load balanced. Once that has happened, the listing has been > > federated across the cluster, so whichever node receives the listing for > > File A should be responsible for fetching and processing it. Since the > > listing has already been spread across the cluster, there is no benefit to > > fetching the data, and then re-spreading it across the cluster. This will > > be very expensive with little to no benefit. Similarly, you don't want to > > load balance between CompressContent and PutHDFS. Simply load balance the > > listing itself (which is very cheap because the FlowFiles have no content) > > and the data will automatically be balanced across the cluster. > > > > Thanks > > -Mark > > > > > > > On Dec 3, 2019, at 9:18 AM, nayan sharma wrote: > > > > > > Hi, > > > Thanks for your reply. > > > Please find the attachment. Flow files has been for last 7 days. And > > > while listing flow files it says The queue has no Flow Files. > > > Let me know your thoughts. 
> > > > > > Thanks & Regards, > > > Nayan Sharma > > > +91-8095382952 > > > > > > <https://www.linkedin.com/in/nayan-sharma> > > > <http://stackoverflow.com/users/3687426/nayan-sharma?tab=profile> > > > > > > On Tue, Dec 3, 2019 at 7:34 PM Bryan Bende > > <mailto:bbe...@gmail.com>> wrote: > > > Hello, > > > > > > It would be helpful if you could upload a screenshot of your flow > > > somewhere and send a link. > > > > > > Thanks, > > > > > > Bryan > > > > > > On Tue, Dec 3, 2019 at 6:06 AM nayan sharma > > <mailto:nayansharm...@gmail.com>> wrote: > > > > > > > > Hi, > > > > I am using 2 nodes cluster. > > > > nodes config Heap(max) 48gb & 64 core machine > > > > Processor flow > > > > ListSFTP--->FetchSFTP(all nodes with 10 threads)--->CompressContent(all > > > > nodes,10 threads)-->PutHDFS > > > > > > > > Queues shows it has 96gb in queue but when I do listing it shows no > > > > flow files. > > > > > > > > Everything seems stuck, nothing is moving. > > > > > > > > I was wondering and curious also even if with such heavy machines, > > > > What I am doing wrong or with which config parameter. > > > > > > > > I couldn't find out solution for by myself so I reached here. Any help > > > > or suggestion will be much highly appreciated. > > > > > > > > Thanks, > > > > Nayan > > > > > > > >
Re: NIFI 1.9.2 stuck in cluster mode
Hi Mark, Thanks for your valuable suggestion. It worked a lot. Now I can understand, there is no point in load balancing between FetchSFTP and CompressContent. After making all the changes it worked but some of the flow files are stuck between CompressContent and putHDFS https://i.imgur.com/oSYkYuA.png And 2nd thing is that 10 FlowFiles between ListSFTP and FetchSFTP is there for long time https://i.imgur.com/Q44VDW6.png Please suggested where I can start debugging these two issues. Meanwhile we are migrating to 1.10.0. This time we are doing through HDF and it has NIFI 1.9.0 as latest version. We are planing to replace the library and content of 1.9.0 with 1.10.0. Can we go ahead with this approach or is there are other way. Currently 1.9.2 is an independent cluster. On 2019/12/03 14:30:43, Mark Payne wrote: > Nayan, > > Looking at the screenshot, I can see two different connections there that are > load balanced. One of them holds the nearly 100 GB of data. > > There are a handful of bugs related to load-balanced connections in 1.9.2 > that were addressed in 1.10.0. If you're relying on load-balanced connections > to spread data across the cluster (and this particular flow clearly is), then > I would strongly encourage you to upgrade to 1.10.0 because at least one of > these bugs does cause the flow to appear to stop flowing. > > That being said, there are two other things that you may want to consider: > > 1. You're trying to load balance 100 GB of data spread across 6 files. So > each file is nearly 20 GB of data. It may take a little while to push that > from Node A to Node B. If the data is queued up, waiting to go to another > node, or is on the way to another node, it will not be shown in the FlowFile > listing. That will only show FlowFiles that are queued up to be processed on > the node that it currently lives on. > > 2. You should not be using a load balanced connection between FetchSFTP and > CompressContent. The way that these processors are designed, the listing > should be performed, and then the connection between ListSFTP and FetchSFTP > should be load balanced. Once that has happened, the listing has been > federated across the cluster, so whichever node receives the listing for File > A should be responsible for fetching and processing it. Since the listing has > already been spread across the cluster, there is no benefit to fetching the > data, and then re-spreading it across the cluster. This will be very > expensive with little to no benefit. Similarly, you don't want to load > balance between CompressContent and PutHDFS. Simply load balance the listing > itself (which is very cheap because the FlowFiles have no content) and the > data will automatically be balanced across the cluster. > > Thanks > -Mark > > > > On Dec 3, 2019, at 9:18 AM, nayan sharma wrote: > > > > Hi, > > Thanks for your reply. > > Please find the attachment. Flow files has been for last 7 days. And while > > listing flow files it says The queue has no Flow Files. > > Let me know your thoughts. > > > > Thanks & Regards, > > Nayan Sharma > > +91-8095382952 > > > > <https://www.linkedin.com/in/nayan-sharma> > > <http://stackoverflow.com/users/3687426/nayan-sharma?tab=profile> > > > > On Tue, Dec 3, 2019 at 7:34 PM Bryan Bende > <mailto:bbe...@gmail.com>> wrote: > > Hello, > > > > It would be helpful if you could upload a screenshot of your flow > > somewhere and send a link. 
> > > > Thanks, > > > > Bryan > > > > On Tue, Dec 3, 2019 at 6:06 AM nayan sharma > <mailto:nayansharm...@gmail.com>> wrote: > > > > > > Hi, > > > I am using 2 nodes cluster. > > > nodes config Heap(max) 48gb & 64 core machine > > > Processor flow > > > ListSFTP--->FetchSFTP(all nodes with 10 threads)--->CompressContent(all > > > nodes,10 threads)-->PutHDFS > > > > > > Queues shows it has 96gb in queue but when I do listing it shows no flow > > > files. > > > > > > Everything seems stuck, nothing is moving. > > > > > > I was wondering and curious also even if with such heavy machines, What > > > I am doing wrong or with which config parameter. > > > > > > I couldn't find out solution for by myself so I reached here. Any help or > > > suggestion will be much highly appreciated. > > > > > > Thanks, > > > Nayan > > > >
NIFI 1.9.2 stuck in cluster mode
Hi, I am using a 2-node cluster. Node config: heap (max) 48 GB & 64-core machines. Processor flow: ListSFTP--->FetchSFTP(all nodes with 10 threads)--->CompressContent(all nodes, 10 threads)-->PutHDFS The queue shows it has 96 GB queued, but when I do a listing it shows no flow files. Everything seems stuck; nothing is moving. I was wondering what I am doing wrong, or which config parameter is at fault, even with such heavy machines. I couldn't find a solution by myself, so I am reaching out here. Any help or suggestion will be highly appreciated. Thanks, Nayan
Re: Spark Druid Ingestion
Hey Jorge, Thanks for responding. Can you elaborate on the user permission part ? HDFS or local ? As of now, hdfs path -> hdfs://n2pl-pa-hdn220.xxx.xxx:8020/user/yarn/.sparkStaging/application_1521457397747_0013/__spark_libs__8247917347016008883.zip already has complete access for yarn user and my job is also running from the same user. Thanks, Nayan > On Mar 22, 2018, at 12:54 PM, Jorge Machado <jom...@me.com> wrote: > > Seems to me permissions problems ! Can you check your user / folder > permissions ? > > Jorge Machado > > > > > >> On 22 Mar 2018, at 08:21, nayan sharma <nayansharm...@gmail.com >> <mailto:nayansharm...@gmail.com>> wrote: >> >> Hi All, >> As druid uses Hadoop MapReduce to ingest batch data but I am trying spark >> for ingesting data into druid taking reference from >> https://github.com/metamx/druid-spark-batch >> <https://github.com/metamx/druid-spark-batch> >> But we are stuck at the following error. >> Application Log:—> >> 2018-03-20T07:54:28,782 INFO [task-runner-0-priority-0] >> org.apache.spark.deploy.yarn.Client - Will allocate AM container, with 896 >> MB memory including 384 MB overhead >> 2018-03-20T07:54:28,782 INFO [task-runner-0-priority-0] >> org.apache.spark.deploy.yarn.Client - Setting up container launch context >> for our AM >> 2018-03-20T07:54:28,785 INFO [task-runner-0-priority-0] >> org.apache.spark.deploy.yarn.Client - Setting up the launch environment for >> our AM container >> 2018-03-20T07:54:28,793 INFO [task-runner-0-priority-0] >> org.apache.spark.deploy.yarn.Client - Preparing resources for our AM >> container >> 2018-03-20T07:54:29,364 WARN [task-runner-0-priority-0] >> org.apache.spark.deploy.yarn.Client - Neither spark.yarn.jars nor >> spark.yarn.archive is set, falling back to uploading libraries under >> SPARK_HOME. 
>> 2018-03-20T07:54:29,371 INFO [task-runner-0-priority-0] >> org.apache.spark.deploy.yarn.Client - Uploading resource >> file:/hdfs1/druid-0.11.0/var/tmp/spark-49af67df-1a21-4790-a02b-c737c7a44946/__spark_libs__8247917347016008883.zip >> -> >> hdfs://n2pl-pa-hdn220.xxx.xxx:8020/user/yarn/.sparkStaging/application_1521457397747_0013/__spark_libs__8247917347016008883.zip >> >> >> 2018-03-20T07:54:29,607 INFO [task-runner-0-priority-0] >> org.apache.spark.deploy.yarn.Client - Uploading resource >> file:/hdfs1/druid-0.11.0/var/tmp/spark-49af67df-1a21-4790-a02b-c737c7a44946/__spark_conf__2240950972346324291.zip >> -> >> hdfs://n2pl-pa-hdn220.xxx.xxx:8020/user/yarn/.sparkStaging/application_1521457397747_0013/__spark_conf__.zip >> >> >> 2018-03-20T07:54:29,673 INFO [task-runner-0-priority-0] >> org.apache.spark.SecurityManager - Changing view acls to: yarn >> 2018-03-20T07:54:29,673 INFO [task-runner-0-priority-0] >> org.apache.spark.SecurityManager - Changing modify acls to: yarn >> 2018-03-20T07:54:29,673 INFO [task-runner-0-priority-0] >> org.apache.spark.SecurityManager - Changing view acls groups to: >> 2018-03-20T07:54:29,673 INFO [task-runner-0-priority-0] >> org.apache.spark.SecurityManager - Changing modify acls groups to: >> 2018-03-20T07:54:29,673 INFO [task-runner-0-priority-0] >> org.apache.spark.SecurityManager - SecurityManager: authentication disabled; >> ui acls disabled; users with view permissions: Set(yarn); groups with view >> permissions: Set(); users with modify permissions: Set(yarn); groups with >> modify permissions: Set() >> 2018-03-20T07:54:29,679 INFO [task-runner-0-priority-0] >> org.apache.spark.deploy.yarn.Client - Submitting application >> application_1521457397747_0013 to ResourceManager >> 2018-03-20T07:54:29,709 INFO [task-runner-0-priority-0] >> org.apache.hadoop.yarn.client.api.impl.YarnClientImpl - Submitted >> application application_1521457397747_0013 >> 2018-03-20T07:54:29,713 INFO [task-runner-0-priority-0] >> org.apache.spark.scheduler.cluster.SchedulerExtensionServices - Starting >> Yarn extension services with app application_1521457397747_0013 and >> attemptId None >> 2018-03-20T07:54:30,722 INFO [task-runner-0-priority-0] >> org.apache.spark.deploy.yarn.Client - Application report for >> application_1521457397747_0013 (state: FAILED) >> 2018-03-20T07:54:30,729 INFO [task-runner-0-priority-0] >> org.apache.spark.deploy.yarn.Client - >> client token: N/A >> diagnostics: Application application_1521457397747_0013 failed 2 times >> due to AM Container for appattempt_1521457397747_0013_02 exited with >> exitCode: -10
Spark Druid Ingestion
Hi All,As druid uses Hadoop MapReduce to ingest batch data but I am trying spark for ingesting data into druid taking reference from https://github.com/metamx/druid-spark-batchBut we are stuck at the following error.Application Log:—>2018-03-20T07:54:28,782 INFO [task-runner-0-priority-0] org.apache.spark.deploy.yarn.Client - Will allocate AM container, with 896 MB memory including 384 MB overhead2018-03-20T07:54:28,782 INFO [task-runner-0-priority-0] org.apache.spark.deploy.yarn.Client - Setting up container launch context for our AM 2018-03-20T07:54:28,785 INFO [task-runner-0-priority-0] org.apache.spark.deploy.yarn.Client - Setting up the launch environment for our AM container 2018-03-20T07:54:28,793 INFO [task-runner-0-priority-0] org.apache.spark.deploy.yarn.Client - Preparing resources for our AM container 2018-03-20T07:54:29,364 WARN [task-runner-0-priority-0] org.apache.spark.deploy.yarn.Client - Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME. 2018-03-20T07:54:29,371 INFO [task-runner-0-priority-0] org.apache.spark.deploy.yarn.Client - Uploading resource file:/hdfs1/druid-0.11.0/var/tmp/spark-49af67df-1a21-4790-a02b-c737c7a44946/__spark_libs__8247917347016008883.zip -> hdfs://n2pl-pa-hdn220.xxx.xxx:8020/user/yarn/.sparkStaging/application_1521457397747_0013/__spark_libs__8247917347016008883.zip 2018-03-20T07:54:29,607 INFO [task-runner-0-priority-0] org.apache.spark.deploy.yarn.Client - Uploading resource file:/hdfs1/druid-0.11.0/var/tmp/spark-49af67df-1a21-4790-a02b-c737c7a44946/__spark_conf__2240950972346324291.zip -> hdfs://n2pl-pa-hdn220.xxx.xxx:8020/user/yarn/.sparkStaging/application_1521457397747_0013/__spark_conf__.zip 2018-03-20T07:54:29,673 INFO [task-runner-0-priority-0] org.apache.spark.SecurityManager - Changing view acls to: yarn 2018-03-20T07:54:29,673 INFO [task-runner-0-priority-0] org.apache.spark.SecurityManager - Changing modify acls to: yarn 2018-03-20T07:54:29,673 INFO [task-runner-0-priority-0] org.apache.spark.SecurityManager - Changing view acls groups to: 2018-03-20T07:54:29,673 INFO [task-runner-0-priority-0] org.apache.spark.SecurityManager - Changing modify acls groups to: 2018-03-20T07:54:29,673 INFO [task-runner-0-priority-0] org.apache.spark.SecurityManager - SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(yarn); groups with view permissions: Set(); users with modify permissions: Set(yarn); groups with modify permissions: Set() 2018-03-20T07:54:29,679 INFO [task-runner-0-priority-0] org.apache.spark.deploy.yarn.Client - Submitting application application_1521457397747_0013 to ResourceManager 2018-03-20T07:54:29,709 INFO [task-runner-0-priority-0] org.apache.hadoop.yarn.client.api.impl.YarnClientImpl - Submitted application application_1521457397747_0013 2018-03-20T07:54:29,713 INFO [task-runner-0-priority-0] org.apache.spark.scheduler.cluster.SchedulerExtensionServices - Starting Yarn extension services with app application_1521457397747_0013 and attemptId None 2018-03-20T07:54:30,722 INFO [task-runner-0-priority-0] org.apache.spark.deploy.yarn.Client - Application report for application_1521457397747_0013 (state: FAILED) 2018-03-20T07:54:30,729 INFO [task-runner-0-priority-0] org.apache.spark.deploy.yarn.Client - client token: N/A diagnostics: Application application_1521457397747_0013 failed 2 times due to AM Container for appattempt_1521457397747_0013_02 exited with exitCode: -1000 For more detailed output, check the application tracking 
page: http://n-pa-hdn220.xxx.:8088/cluster/app/application_1521457397747_0013 Then click on links to logs of each attempt. Diagnostics: No such file or directory ENOENT: No such file or directory at org.apache.hadoop.io.nativeio.NativeIO$POSIX.chmodImpl(Native Method) at org.apache.hadoop.io.nativeio.NativeIO$POSIX.chmod(NativeIO.java:230) at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:756) at org.apache.hadoop.fs.DelegateToFileSystem.setPermission(DelegateToFileSystem.java:211) at org.apache.hadoop.fs.FilterFs.setPermission(FilterFs.java:252) at org.apache.hadoop.fs.FileContext$11.next(FileContext.java:1003) at org.apache.hadoop.fs.FileContext$11.next(FileContext.java:999) at org.apache.hadoop.fs.FSLinkResolver.resolve(FSLinkResolver.java:90) at org.apache.hadoop.fs.FileContext.setPermission(FileContext.java:1006) at org.apache.hadoop.yarn.util.FSDownload$3.run(FSDownload.java:421) at org.apache.hadoop.yarn.util.FSDownload$3.run(FSDownload.java:419) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1866) at org.apache.hadoop.yarn.util.FSDownload.changePermissions(FSDownload.java:419) at org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:365) at
Re: splitting columns into new columns
Hi Pralabh,
Thanks for your help.

val xx = columnList.map(x => x -> 0).toMap
val opMap = dataFrame.rdd.flatMap { row =>
  columnList.foldLeft(xx) { case (y, col) =>
    val s = row.getAs[String](col).split("\\^").length
    if (y(col) < s) y.updated(col, s) else y
  }.toList
}
val colMaxSizeMap = opMap.groupBy(x => x._1).map(x => x._2.toList.maxBy(x => x._2)).collect().toMap
val x = dataFrame.rdd.map { x =>
  val op = columnList.flatMap { y =>
    val op = x.getAs[String](y).split("\\^")
    op ++ List.fill(colMaxSizeMap(y) - op.size)("")
  }
  Row.fromSeq(op)
}
val structFieldList = columnList.flatMap { colName =>
  List.range(0, colMaxSizeMap(colName), 1).map { i =>
    StructField(s"$colName" + s"$i", StringType)
  }
}
val schema = StructType(structFieldList)
val data1 = spark.createDataFrame(x, schema)

opMap
res13: org.apache.spark.rdd.RDD[(String, Int)]

But it is failing when opMap has a null value. It is throwing java.lang.NullPointerException; I am trying to figure it out.

val opMap1 = opMap.filter(_._2 != "")

I tried doing this, but it is also failing with the same exception.

Thanks,
Nayan

> On 17-Jul-2017, at 4:54 PM, Pralabh Kumar <pralabhku...@gmail.com> wrote:
>
> Hi Nayan
>
> Please find the solution to your problem, which works on Spark 2.
>
> val spark =
>   SparkSession.builder().appName("practice").enableHiveSupport().getOrCreate()
> val sc = spark.sparkContext
> val sqlContext = spark.sqlContext
> import spark.implicits._
> val dataFrame =
>   sc.parallelize(List("ERN~58XX7~^EPN~5X551~|C~MXXX~MSO~^CAxxE~~3XXX5"))
>     .map(s => s.split("\\|")).map(s => (s(0), s(1)))
>     .toDF("phone", "contact")
> dataFrame.show()
> val newDataSet = dataFrame.rdd.map(data => {
>   val t1 = ArrayBuffer[String]()
>   for (i <- 0.to(1)) {
>     val col = data.get(i).asInstanceOf[String]
>     val dd = col.split("\\^").toSeq
>     for (col <- dd) {
>       t1 += (col)
>     }
>   }
>   Row.fromSeq(t1.seq)
> })
>
> val firstRow = dataFrame.select("*").take(1)(0)
> dataFrame.schema.fieldNames
> var schema = ""
>
> for ((colNames, idx) <- dataFrame.schema.fieldNames.zipWithIndex.view) {
>   val data = firstRow(idx).asInstanceOf[String].split("\\^")
>   var j = 0
>   for (d <- data) {
>     schema = schema + colNames + j + ","
>     j = j + 1
>   }
> }
> schema = schema.substring(0, schema.length - 1)
> val sqlSchema =
>   StructType(schema.split(",").map(s => StructField(s, StringType, false)))
> sqlContext.createDataFrame(newDataSet, sqlSchema).show()
>
> Regards
> Pralabh Kumar
>
>
> On Mon, Jul 17, 2017 at 1:55 PM, nayan sharma <nayansharm...@gmail.com
> <mailto:nayansharm...@gmail.com>> wrote:
> If I have 2-3 values in a column then I can easily separate them and create new
> columns with the withColumn option,
> but I am trying to achieve it in a loop and dynamically generate the new
> columns as many times as the ^ has occurred in the column values.
>
> Can it be achieved in this way?
>
>> On 17-Jul-2017, at 3:29 AM, ayan guha <guha.a...@gmail.com
>> <mailto:guha.a...@gmail.com>> wrote:
>>
>> You are looking for the explode function.
>>
>> On Mon, 17 Jul 2017 at 4:25 am, nayan sharma <nayansharm...@gmail.com
>> <mailto:nayansharm...@gmail.com>> wrote:
>> I've a Dataframe where in some columns there are multiple values, always
>> separated by ^
>>
>> phone|contact|
>> ERN~58XX7~^EPN~5X551~|C~MXXX~MSO~^CAxxE~~3XXX5|
>>
>> phone1|phone2|contact1|contact2|
>> ERN~5XXX7|EPN~5891551~|C~MXXXH~MSO~|CAxxE~~3XXX5|
>> How can this be achieved using a loop, as the number of separators between
>> column values is not constant?
>>
>> data.withColumn("phone",split($"phone","\\^")).select($"phone".getItem(0).as("phone1"),$"phone".getItem(1).as("phone2"))
>> I thought of doing it this way, but the problem is the columns have 100+
>> separators between the column values.
>>
>>
>>
>> Thank you,
>> Nayan
>> --
>> Best Regards,
>> Ayan Guha
>
>
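On the NullPointerException above: `row.getAs[String](col)` returns null for null cells, so the `.split` in both the max-length pass and the row-rebuild pass will throw; the later `filter(_._2 != "")` cannot help because the NPE happens before the pair is ever produced. A minimal null-safe variant of the two lambdas, sketched against the same `columnList`, `xx` and `colMaxSizeMap` names used in the code above:

def tokens(row: org.apache.spark.sql.Row, col: String): Array[String] =
  Option(row.getAs[String](col)).map(_.split("\\^")).getOrElse(Array(""))

val opMapSafe = dataFrame.rdd.flatMap { row =>
  columnList.foldLeft(xx) { case (y, col) =>
    val s = tokens(row, col).length        // a null cell counts as a single empty token
    if (y(col) < s) y.updated(col, s) else y
  }.toList
}

val rowsSafe = dataFrame.rdd.map { row =>
  val op = columnList.flatMap { col =>
    val t = tokens(row, col)
    t ++ List.fill(colMaxSizeMap(col) - t.size)("")   // pad to the column's max width
  }
  org.apache.spark.sql.Row.fromSeq(op)
}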
Re: splitting columns into new columns
If I have 2-3 values in a column then I can easily separate it and create new columns with withColumn option. but I am trying to achieve it in loop and dynamically generate the new columns as many times the ^ has occurred in column values Can it be achieve in this way. > On 17-Jul-2017, at 3:29 AM, ayan guha <guha.a...@gmail.com> wrote: > > You are looking for explode function. > > On Mon, 17 Jul 2017 at 4:25 am, nayan sharma <nayansharm...@gmail.com > <mailto:nayansharm...@gmail.com>> wrote: > I’ve a Dataframe where in some columns there are multiple values, always > separated by ^ > > phone|contact| > ERN~58XX7~^EPN~5X551~|C~MXXX~MSO~^CAxxE~~3XXX5| > > phone1|phone2|contact1|contact2| > ERN~5XXX7|EPN~5891551~|C~MXXXH~MSO~|CAxxE~~3XXX5| > How can this be achieved using loop as the separator between column values > are not constant. > > data.withColumn("phone",split($"phone","\\^")).select($"phone".getItem(0).as("phone1"),$"phone".getItem(1).as("phone2”)) > I though of doing this way but the problem is column are having 100+ > separator between the column values > > > > Thank you, > Nayan > -- > Best Regards, > Ayan Guha
splitting columns into new columns
I've a Dataframe where in some columns there are multiple values, always separated by ^ phone|contact| ERN~58XX7~^EPN~5X551~|C~MXXX~MSO~^CAxxE~~3XXX5| phone1|phone2|contact1|contact2| ERN~5XXX7|EPN~5891551~|C~MXXXH~MSO~|CAxxE~~3XXX5| How can this be achieved using a loop, as the number of separators between column values is not constant? data.withColumn("phone",split($"phone","\\^")).select($"phone".getItem(0).as("phone1"),$"phone".getItem(1).as("phone2")) I thought of doing it this way, but the problem is the columns have 100+ separators between the column values. Thank you, Nayan
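One way to do the splitting dynamically, sketched under the assumption that the frame is called `df` and the columns to expand are known by name: first measure the maximum number of '^'-separated tokens per column, then generate one getItem column per position. This is only an illustration of the loop the question asks about, not a tested solution.

import org.apache.spark.sql.functions._

val colsToExpand = Seq("phone", "contact")

val expanded = colsToExpand.foldLeft(df) { (acc, c) =>
  // the widest row decides how many new columns this source column produces
  val maxTokens = acc.select(max(size(split(col(c), "\\^"))).as("n")).first().getInt(0)
  (0 until maxTokens)
    .foldLeft(acc)((d, i) => d.withColumn(s"$c${i + 1}", split(col(c), "\\^").getItem(i)))
    .drop(c)
}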
ElasticSearch Spark error
Hi All, ERROR:- Caused by: org.apache.spark.util.TaskCompletionListenerException: Connection error (check network and/or proxy settings)- all nodes failed; tried [[10.0.1.8*:9200, 10.0.1.**:9200, 10.0.1.***:9200]] I am getting this error while trying to show the dataframe. df.count = 5190767 and df.printSchema are both working fine. It has 329 columns. Does anyone have any idea regarding this? Please help me fix it. Thanks, Nayan
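Not a diagnosis, but a common cause of "Connection error ... all nodes failed" from elasticsearch-hadoop is node discovery handing the executors internal data-node addresses they cannot reach, which only bites once actual documents are fetched. A sketch of the connector options involved; the index name and addresses are placeholders, and the WAN-only assumption should be checked against your network layout:

val esDf = sqlContext.read
  .format("org.elasticsearch.spark.sql")
  .option("es.nodes", "10.0.1.8")          // reachable client/coordinating node(s)
  .option("es.port", "9200")
  .option("es.nodes.wan.only", "true")     // talk only to the declared nodes, skip discovery
  .load("my_index/my_type")

esDf.show(5)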
Test
Test
Fwd: isin query
Thanks for responding. df.filter($"msrid"==="m_123" || $"msrid"==="m_111") There are lots of workarounds for my question, but can you let me know what's wrong with the "isin" query? Regards, Nayan > Begin forwarded message: > > From: ayan guha <guha.a...@gmail.com> > Subject: Re: isin query > Date: 17 April 2017 at 8:13:24 PM IST > To: nayan sharma <nayansharm...@gmail.com>, user@spark.apache.org > > How about using the OR operator in filter? > > On Tue, 18 Apr 2017 at 12:35 am, nayan sharma <nayansharm...@gmail.com > <mailto:nayansharm...@gmail.com>> wrote: > Dataframe (df) has a column msrid (String) with the values > m_123,m_111,m_145,m_098,m_666 > > I wanted to filter rows having the values m_123, m_111, m_145 > > df.filter($"msrid".isin("m_123","m_111","m_145")).count > count = 0 > while > df.filter($"msrid".isin("m_123")).count > count = 121212 > I have tried queries like > df.filter($"msrid" isin (List("m_123","m_111","m_145"):_*)) > count = 0 > but > > df.filter($"msrid" isin (List("m_123"):_*)) > count = 121212 > > Any suggestion would be a great help to me. > > Thanks, > Nayan > -- > Best Regards, > Ayan Guha
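Not an answer to the root cause, but a couple of checks that usually narrow this down, sketched with the column and values from the thread: run the varargs form and an equivalent OR chain built from the same list, and inspect the raw values for padding or stray characters. The whitespace hypothesis is only a guess.

import org.apache.spark.sql.functions._

val wanted = Seq("m_123", "m_111", "m_145")

// canonical varargs form
val viaIsin = df.filter($"msrid".isin(wanted: _*)).count()

// equivalent OR chain, built from the same list
val viaOr = df.filter(wanted.map($"msrid" === _).reduce(_ || _)).count()

// if the two disagree, look at the stored values themselves
df.filter(trim($"msrid").isin(wanted: _*))
  .select($"msrid", length($"msrid").as("len"))
  .show(20, false)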
filter operation using isin
Dataframe (df) has a column msrid (String) with the values m_123,m_111,m_145,m_098,m_666 I wanted to filter rows having the values m_123, m_111, m_145 df.filter($"msrid".isin("m_123","m_111","m_145")).count count = 0 while df.filter($"msrid".isin("m_123")).count count = 121212 I have tried queries like df.filter($"msrid" isin (List("m_123","m_111","m_145"):_*)) count = 0 but df.filter($"msrid" isin (List("m_123"):_*)) count = 121212 Any suggestion would be a great help to me. Best Regards, Nayan
isin query
Dataframe (df) has a column msrid (String) with the values m_123,m_111,m_145,m_098,m_666 I wanted to filter rows having the values m_123, m_111, m_145 df.filter($"msrid".isin("m_123","m_111","m_145")).count count = 0 while df.filter($"msrid".isin("m_123")).count count = 121212 I have tried queries like df.filter($"msrid" isin (List("m_123","m_111","m_145"):_*)) count = 0 but df.filter($"msrid" isin (List("m_123"):_*)) count = 121212 Any suggestion would be a great help to me. Thanks, Nayan
Re: Error while reading the CSV
Hi Yash, I know this will work perfect but here I wanted to read the csv using the assembly jar file. Thanks, Nayan > On 07-Apr-2017, at 10:02 AM, Yash Sharma <yash...@gmail.com> wrote: > > Hi Nayan, > I use the --packages with the spark shell and the spark submit. Could you > please try that and let us know: > Command: > spark-submit --packages com.databricks:spark-csv_2.11:1.4.0 > > On Fri, 7 Apr 2017 at 00:39 nayan sharma <nayansharm...@gmail.com > <mailto:nayansharm...@gmail.com>> wrote: > spark version 1.6.2 > scala version 2.10.5 > >> On 06-Apr-2017, at 8:05 PM, Jörn Franke <jornfra...@gmail.com >> <mailto:jornfra...@gmail.com>> wrote: >> >> And which version does your Spark cluster use? >> >> On 6. Apr 2017, at 16:11, nayan sharma <nayansharm...@gmail.com >> <mailto:nayansharm...@gmail.com>> wrote: >> >>> scalaVersion := “2.10.5" >>> >>> >>> >>> >>>> On 06-Apr-2017, at 7:35 PM, Jörn Franke <jornfra...@gmail.com >>>> <mailto:jornfra...@gmail.com>> wrote: >>>> >>>> Maybe your Spark is based on scala 2.11, but you compile it for 2.10 or >>>> the other way around? >>>> >>>> On 6. Apr 2017, at 15:54, nayan sharma <nayansharm...@gmail.com >>>> <mailto:nayansharm...@gmail.com>> wrote: >>>> >>>>> In addition I am using spark version 1.6.2 >>>>> Is there any chance of error coming because of Scala version or >>>>> dependencies are not matching.?I just guessed. >>>>> >>>>> Thanks, >>>>> Nayan >>>>> >>>>> >>>>>> On 06-Apr-2017, at 7:16 PM, nayan sharma <nayansharm...@gmail.com >>>>>> <mailto:nayansharm...@gmail.com>> wrote: >>>>>> >>>>>> Hi Jorn, >>>>>> Thanks for replying. >>>>>> >>>>>> jar -tf catalyst-data-prepration-assembly-1.0.jar | grep csv >>>>>> >>>>>> after doing this I have found a lot of classes under >>>>>> com/databricks/spark/csv/ >>>>>> >>>>>> do I need to check for any specific class ?? >>>>>> >>>>>> Regards, >>>>>> Nayan >>>>>>> On 06-Apr-2017, at 6:42 PM, Jörn Franke <jornfra...@gmail.com >>>>>>> <mailto:jornfra...@gmail.com>> wrote: >>>>>>> >>>>>>> Is the library in your assembly jar? >>>>>>> >>>>>>> On 6. Apr 2017, at 15:06, nayan sharma <nayansharm...@gmail.com >>>>>>> <mailto:nayansharm...@gmail.com>> wrote: >>>>>>> >>>>>>>> Hi All, >>>>>>>> I am getting error while loading CSV file. >>>>>>>> >>>>>>>> val >>>>>>>> datacsv=sqlContext.read.format("com.databricks.spark.csv").option("header", >>>>>>>> "true").load("timeline.csv") >>>>>>>> java.lang.NoSuchMethodError: >>>>>>>> org.apache.commons.csv.CSVFormat.withQuote(Ljava/lang/Character;)Lorg/apache/commons/csv/CSVFormat; >>>>>>>> >>>>>>>> >>>>>>>> I have added the dependencies in sbt file >>>>>>>> // Spark Additional Library - CSV Read as DF >>>>>>>> libraryDependencies += "com.databricks" %% "spark-csv" % “1.5.0" >>>>>>>> and starting the spark-shell with command >>>>>>>> >>>>>>>> spark-shell --master yarn-client --jars >>>>>>>> /opt/packages/-data-prepration/target/scala-2.10/-data-prepration-assembly-1.0.jar >>>>>>>> --name nayan >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> Thanks for any help!! >>>>>>>> >>>>>>>> >>>>>>>> Thanks, >>>>>>>> Nayan >>>>>> >>>>> >>> >
Re: Error while reading the CSV
spark version 1.6.2 scala version 2.10.5 > On 06-Apr-2017, at 8:05 PM, Jörn Franke <jornfra...@gmail.com> wrote: > > And which version does your Spark cluster use? > > On 6. Apr 2017, at 16:11, nayan sharma <nayansharm...@gmail.com > <mailto:nayansharm...@gmail.com>> wrote: > >> scalaVersion := “2.10.5" >> >> >> >> >>> On 06-Apr-2017, at 7:35 PM, Jörn Franke <jornfra...@gmail.com >>> <mailto:jornfra...@gmail.com>> wrote: >>> >>> Maybe your Spark is based on scala 2.11, but you compile it for 2.10 or the >>> other way around? >>> >>> On 6. Apr 2017, at 15:54, nayan sharma <nayansharm...@gmail.com >>> <mailto:nayansharm...@gmail.com>> wrote: >>> >>>> In addition I am using spark version 1.6.2 >>>> Is there any chance of error coming because of Scala version or >>>> dependencies are not matching.?I just guessed. >>>> >>>> Thanks, >>>> Nayan >>>> >>>> >>>>> On 06-Apr-2017, at 7:16 PM, nayan sharma <nayansharm...@gmail.com >>>>> <mailto:nayansharm...@gmail.com>> wrote: >>>>> >>>>> Hi Jorn, >>>>> Thanks for replying. >>>>> >>>>> jar -tf catalyst-data-prepration-assembly-1.0.jar | grep csv >>>>> >>>>> after doing this I have found a lot of classes under >>>>> com/databricks/spark/csv/ >>>>> >>>>> do I need to check for any specific class ?? >>>>> >>>>> Regards, >>>>> Nayan >>>>>> On 06-Apr-2017, at 6:42 PM, Jörn Franke <jornfra...@gmail.com >>>>>> <mailto:jornfra...@gmail.com>> wrote: >>>>>> >>>>>> Is the library in your assembly jar? >>>>>> >>>>>> On 6. Apr 2017, at 15:06, nayan sharma <nayansharm...@gmail.com >>>>>> <mailto:nayansharm...@gmail.com>> wrote: >>>>>> >>>>>>> Hi All, >>>>>>> I am getting error while loading CSV file. >>>>>>> >>>>>>> val >>>>>>> datacsv=sqlContext.read.format("com.databricks.spark.csv").option("header", >>>>>>> "true").load("timeline.csv") >>>>>>> java.lang.NoSuchMethodError: >>>>>>> org.apache.commons.csv.CSVFormat.withQuote(Ljava/lang/Character;)Lorg/apache/commons/csv/CSVFormat; >>>>>>> >>>>>>> >>>>>>> I have added the dependencies in sbt file >>>>>>> // Spark Additional Library - CSV Read as DF >>>>>>> libraryDependencies += "com.databricks" %% "spark-csv" % “1.5.0" >>>>>>> and starting the spark-shell with command >>>>>>> >>>>>>> spark-shell --master yarn-client --jars >>>>>>> /opt/packages/-data-prepration/target/scala-2.10/-data-prepration-assembly-1.0.jar >>>>>>> --name nayan >>>>>>> >>>>>>> >>>>>>> >>>>>>> Thanks for any help!! >>>>>>> >>>>>>> >>>>>>> Thanks, >>>>>>> Nayan >>>>> >>>> >>
Re: Error while reading the CSV
scalaVersion := “2.10.5" > On 06-Apr-2017, at 7:35 PM, Jörn Franke <jornfra...@gmail.com> wrote: > > Maybe your Spark is based on scala 2.11, but you compile it for 2.10 or the > other way around? > > On 6. Apr 2017, at 15:54, nayan sharma <nayansharm...@gmail.com > <mailto:nayansharm...@gmail.com>> wrote: > >> In addition I am using spark version 1.6.2 >> Is there any chance of error coming because of Scala version or dependencies >> are not matching.?I just guessed. >> >> Thanks, >> Nayan >> >> >>> On 06-Apr-2017, at 7:16 PM, nayan sharma <nayansharm...@gmail.com >>> <mailto:nayansharm...@gmail.com>> wrote: >>> >>> Hi Jorn, >>> Thanks for replying. >>> >>> jar -tf catalyst-data-prepration-assembly-1.0.jar | grep csv >>> >>> after doing this I have found a lot of classes under >>> com/databricks/spark/csv/ >>> >>> do I need to check for any specific class ?? >>> >>> Regards, >>> Nayan >>>> On 06-Apr-2017, at 6:42 PM, Jörn Franke <jornfra...@gmail.com >>>> <mailto:jornfra...@gmail.com>> wrote: >>>> >>>> Is the library in your assembly jar? >>>> >>>> On 6. Apr 2017, at 15:06, nayan sharma <nayansharm...@gmail.com >>>> <mailto:nayansharm...@gmail.com>> wrote: >>>> >>>>> Hi All, >>>>> I am getting error while loading CSV file. >>>>> >>>>> val >>>>> datacsv=sqlContext.read.format("com.databricks.spark.csv").option("header", >>>>> "true").load("timeline.csv") >>>>> java.lang.NoSuchMethodError: >>>>> org.apache.commons.csv.CSVFormat.withQuote(Ljava/lang/Character;)Lorg/apache/commons/csv/CSVFormat; >>>>> >>>>> >>>>> I have added the dependencies in sbt file >>>>> // Spark Additional Library - CSV Read as DF >>>>> libraryDependencies += "com.databricks" %% "spark-csv" % “1.5.0" >>>>> and starting the spark-shell with command >>>>> >>>>> spark-shell --master yarn-client --jars >>>>> /opt/packages/-data-prepration/target/scala-2.10/-data-prepration-assembly-1.0.jar >>>>> --name nayan >>>>> >>>>> >>>>> >>>>> Thanks for any help!! >>>>> >>>>> >>>>> Thanks, >>>>> Nayan >>> >>
Re: Error while reading the CSV
In addition I am using spark version 1.6.2 Is there any chance of error coming because of Scala version or dependencies are not matching.?I just guessed. Thanks, Nayan > On 06-Apr-2017, at 7:16 PM, nayan sharma <nayansharm...@gmail.com> wrote: > > Hi Jorn, > Thanks for replying. > > jar -tf catalyst-data-prepration-assembly-1.0.jar | grep csv > > after doing this I have found a lot of classes under > com/databricks/spark/csv/ > > do I need to check for any specific class ?? > > Regards, > Nayan >> On 06-Apr-2017, at 6:42 PM, Jörn Franke <jornfra...@gmail.com >> <mailto:jornfra...@gmail.com>> wrote: >> >> Is the library in your assembly jar? >> >> On 6. Apr 2017, at 15:06, nayan sharma <nayansharm...@gmail.com >> <mailto:nayansharm...@gmail.com>> wrote: >> >>> Hi All, >>> I am getting error while loading CSV file. >>> >>> val >>> datacsv=sqlContext.read.format("com.databricks.spark.csv").option("header", >>> "true").load("timeline.csv") >>> java.lang.NoSuchMethodError: >>> org.apache.commons.csv.CSVFormat.withQuote(Ljava/lang/Character;)Lorg/apache/commons/csv/CSVFormat; >>> >>> >>> I have added the dependencies in sbt file >>> // Spark Additional Library - CSV Read as DF >>> libraryDependencies += "com.databricks" %% "spark-csv" % “1.5.0" >>> and starting the spark-shell with command >>> >>> spark-shell --master yarn-client --jars >>> /opt/packages/-data-prepration/target/scala-2.10/-data-prepration-assembly-1.0.jar >>> --name nayan >>> >>> >>> >>> Thanks for any help!! >>> >>> >>> Thanks, >>> Nayan >
Re: Error while reading the CSV
Hi Jorn, Thanks for replying. jar -tf catalyst-data-prepration-assembly-1.0.jar | grep csv after doing this I have found a lot of classes under com/databricks/spark/csv/ do I need to check for any specific class ?? Regards, Nayan > On 06-Apr-2017, at 6:42 PM, Jörn Franke <jornfra...@gmail.com> wrote: > > Is the library in your assembly jar? > > On 6. Apr 2017, at 15:06, nayan sharma <nayansharm...@gmail.com > <mailto:nayansharm...@gmail.com>> wrote: > >> Hi All, >> I am getting error while loading CSV file. >> >> val >> datacsv=sqlContext.read.format("com.databricks.spark.csv").option("header", >> "true").load("timeline.csv") >> java.lang.NoSuchMethodError: >> org.apache.commons.csv.CSVFormat.withQuote(Ljava/lang/Character;)Lorg/apache/commons/csv/CSVFormat; >> >> >> I have added the dependencies in sbt file >> // Spark Additional Library - CSV Read as DF >> libraryDependencies += "com.databricks" %% "spark-csv" % “1.5.0" >> and starting the spark-shell with command >> >> spark-shell --master yarn-client --jars >> /opt/packages/-data-prepration/target/scala-2.10/-data-prepration-assembly-1.0.jar >> --name nayan >> >> >> >> Thanks for any help!! >> >> >> Thanks, >> Nayan
Error while reading the CSV
Hi All, I am getting an error while loading a CSV file. val datacsv=sqlContext.read.format("com.databricks.spark.csv").option("header", "true").load("timeline.csv") java.lang.NoSuchMethodError: org.apache.commons.csv.CSVFormat.withQuote(Ljava/lang/Character;)Lorg/apache/commons/csv/CSVFormat; I have added the dependencies in the sbt file // Spark Additional Library - CSV Read as DF libraryDependencies += "com.databricks" %% "spark-csv" % "1.5.0" and I am starting the spark-shell with the command spark-shell --master yarn-client --jars /opt/packages/-data-prepration/target/scala-2.10/-data-prepration-assembly-1.0.jar --name nayan Thanks for any help!! Thanks, Nayan
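That particular NoSuchMethodError (CSVFormat.withQuote) is the classic signature of an older commons-csv, often pulled in via the Hadoop/cluster classpath, winning over the one spark-csv needs. Two ways this is commonly handled in an sbt assembly, shown only as a sketch; the version number and shading rule are assumptions to adapt to your build:

// build.sbt fragment
libraryDependencies ++= Seq(
  "com.databricks" %% "spark-csv" % "1.5.0",
  "org.apache.commons" % "commons-csv" % "1.1" // pin the version spark-csv expects
)

// with sbt-assembly, shading keeps the cluster's older commons-csv from being picked up
assemblyShadeRules in assembly := Seq(
  ShadeRule.rename("org.apache.commons.csv.**" -> "shaded.commons.csv.@1").inAll
)

Alternatively, launching with --packages com.databricks:spark-csv_2.10:1.5.0 (matching the Scala 2.10 build in use) lets Spark resolve the matching commons-csv transitively instead of relying on the assembly.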
skipping header in multiple files
Hi, I wanted to skip the headers of all the CSVs present in a directory. After searching on Google I got to know that it can be done using sc.wholeTextFiles. Can anyone suggest how to do that in Scala? Thanks & Regards, Nayan Sharma
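A minimal sketch of the wholeTextFiles approach (the path is a placeholder): each element is a (path, fileContents) pair, so the first line of every file can be dropped before flattening back to an RDD of data lines. Note that wholeTextFiles pulls each file entirely into memory, so this suits many small or medium CSVs rather than a few huge ones.

val lines = sc.wholeTextFiles("hdfs:///data/csv/*.csv")
  .flatMap { case (_, content) =>
    content.split("\n").drop(1)   // drop the header line of each file
  }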
Re: Persist RDD doubt
In case of task failures, does Spark clear the persisted RDD (StorageLevel.MEMORY_ONLY_SER) and recompute it again when the task attempt starts from the beginning? Or will the cached RDD be appended to? How does Spark check whether the RDD has been cached, so that it can skip the caching step for a particular task? > On 23-Mar-2017, at 3:36 PM, Artur R <ar...@gpnxgroup.com> wrote: > > I am not quite sure, but: > - if the RDD is persisted in memory, then on task failure the executor JVM process fails > too, so the memory is released > - if the RDD is persisted on disk, then on task failure the Spark shutdown hook just wipes > the temp files > > On Thu, Mar 23, 2017 at 10:55 AM, Jörn Franke <jornfra...@gmail.com > <mailto:jornfra...@gmail.com>> wrote: > What do you mean by clear? What is the use case? > > On 23 Mar 2017, at 10:16, nayan sharma <nayansharm...@gmail.com > <mailto:nayansharm...@gmail.com>> wrote: > >> Does Spark clear the persisted RDD if the task fails? >> >> Regards, >> >> Nayan >
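For context, a small sketch of the pattern under discussion (rawRdd and expensiveParse are placeholders). Cached partitions are tracked by the block manager under (RDD id, partition index), so a re-attempted task first looks up its own partition's block and only recomputes and re-stores it if the block is missing; the block is replaced, not appended to.

import org.apache.spark.storage.StorageLevel

val parsed = rawRdd.map(expensiveParse).persist(StorageLevel.MEMORY_ONLY_SER)

parsed.count()                      // first action materialises and caches the partitions
println(parsed.getStorageLevel)     // the level requested for this RDD

// downstream jobs reuse the cached blocks; failed tasks recompute only the missing ones

parsed.unpersist()                  // explicitly drop the cached blocks when done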
Persist RDD doubt
Does Spark clear the persisted RDD if the task fails? Regards, Nayan