Re: Null pointer exception while replaying WAL
Please find the code below.

    def main(args: Array[String]): Unit = {
      val config: Config = ConfigFactory.load()
      val streamC = StreamingContext.getOrCreate(
        checkpointDirectory,
        () => functionToCreateContext(config, checkpointDirectory)
      )
      streamC.start()
      streamC.awaitTermination()
    }

    def functionToCreateContext(config: Config, checkpointDirectory: String): StreamingContext = {
      val brokerUrl = config.getString("streaming.solace.brokerURL")
      val username = config.getString("streaming.solace.userName")
      val passwordSol = config.getString("streaming.solace.password")
      val vpn = config.getString("streaming.solace.vpn")
      val queue = config.getString("streaming.solace.queueName")
      val connectionFactory = config.getString("streaming.solace.connectionFactory")

      val spark = SparkSession
        .builder()
        .appName("rem-Streaming-Consumer")
        .config("spark.streaming.receiver.writeAheadLog.enable", "true")
        .config("spark.streaming.blockInterval", blockInterval)
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .enableHiveSupport
        .getOrCreate()
      val sc = spark.sparkContext
      val ssc = new StreamingContext(sc, Seconds(batchInterval))
      ssc.checkpoint(checkpointDirectory)

      // Only text messages are converted; everything else is dropped
      val converter: Message => Option[String] = {
        case msg: TextMessage => Some(msg.getText)
        case _ => None
      }

      val props = new Properties()
      props.setProperty(
        Context.INITIAL_CONTEXT_FACTORY,
        "com.solacesystems.jndi.SolJNDIInitialContextFactory"
      )
      props.setProperty(Context.PROVIDER_URL, brokerUrl)
      props.setProperty(Context.SECURITY_PRINCIPAL, username)
      // The password goes under SECURITY_CREDENTIALS, not SECURITY_PRINCIPAL
      props.setProperty(Context.SECURITY_CREDENTIALS, passwordSol)
      props.setProperty(SupportedProperty.SOLACE_JMS_VPN, vpn)

      val msgs = JmsStreamUtils.createSynchronousJmsQueueStream(
        ssc,
        JndiMessageConsumerFactory(
          props,
          QueueJmsDestinationInfo(queue),
          connectionFactoryName = connectionFactory,
          messageSelector = ""
        ),
        converter,
        1000,
        1.second,
        10.second,
        StorageLevel.MEMORY_AND_DISK_SER_2
      )

      msgs.foreachRDD(rdd =>
        if (rdd.take(1).length > 0) {
          val messages: RDD[String] = rdd.map { sr =>
            if (sr == null) {
              println("NO records found")
              "NO records found"
            } else {
              println("Input Records from Solace queue : " + sr.toString)
              sr.toString
            }
          }
          Thread.sleep(12)
          try {
            // ===> getting NPE on the next line after restarting using WAL
            val messagesJson = spark.read.json(messages)
            messagesJson.write.mode("append").parquet(data)
          } catch {
            case ex: Throwable => ex.printStackTrace()
          }
        })
      ssc
    }

Thanks & Regards,
Nayan Sharma
*+91-8095382952*
<https://www.linkedin.com/in/nayan-sharma>
<http://stackoverflow.com/users/3687426/nayan-sharma?tab=profile>

On Mon, Feb 12, 2024 at 4:23 AM Mich Talebzadeh wrote:
> Hi,
>
> It is challenging to make a recommendation without further details. I am
> guessing you are trying to build a fault-tolerant Spark application (Spark
> Structured Streaming) that consumes messages from Solace?
> To address the *NullPointerException* in the context of the provided
> information, you need to review the part of the code where the exception is
> thrown. Identifying which object or method call is resulting in *null* can
> help the debugging process, as can checking the logs.
>
> HTH
>
> Mich Talebzadeh,
> Dad | Technologist | Solutions Architect | Engineer
> London
> United Kingdom
>
> view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
> https://en.everybodywiki.com/Mich_Talebzadeh
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
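One plausible cause of the NPE at `spark.read.json(...)`, offered as a guess rather than a confirmed diagnosis: the `foreachRDD` closure in the code above captures the `spark` value created inside `functionToCreateContext`. After a restart, `StreamingContext.getOrCreate` rebuilds the DStream graph from the checkpoint instead of calling that function, so the captured session may come back without a live SparkContext. The Spark Streaming guide suggests obtaining the session lazily inside `foreachRDD` instead. The sketch below assumes that pattern; `WalSafeSink`, `writeBatch` and `outputPath` are hypothetical names, not part of the original job.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

object WalSafeSink {
  // Hypothetical helper: writes one micro-batch of JSON strings as Parquet.
  def writeBatch(rdd: RDD[String], outputPath: String): Unit = {
    if (!rdd.isEmpty()) {
      // Look the session up from the RDD's own context on every batch,
      // so nothing created before the checkpoint is captured by the closure.
      val spark = SparkSession.builder
        .config(rdd.sparkContext.getConf)
        .getOrCreate()
      val json = spark.read.json(rdd)
      json.write.mode("append").parquet(outputPath)
    }
  }
}
```

Inside the job this would be called as `msgs.foreachRDD(rdd => WalSafeSink.writeBatch(rdd.map(_.toString), data))`, where `data` is the output path already used in the posted code.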
Null pointer exception while replaying WAL
Hi Users,

I am trying to build a fault-tolerant Spark consumer for Solace.

Issue: We have to restart the job from time to time for several reasons, high load average being one of them. When that happens, whatever Spark is processing, along with any batches waiting in the queue, is lost. We can't replay those messages because we have already sent the ack when calling store().

Solution: I have tried implementing WAL and checkpointing. After a restart the job is able to identify the lost batches, but the records are not written to the log file; an NPE is thrown instead.

We are creating the SparkContext using sc.getorcreate()

Thanks,
Nayan
Kafka Spark Structured Streaming Error
)
    at org.apache.spark.sql.execution.datasources.orc.OrcOutputWriter.close(OrcOutputWriter.scala:58)
    at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.releaseResources(FileFormatDataWriter.scala:57)
    at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.abort(FileFormatDataWriter.scala:83)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$1.apply$mcV$sp(FileFormatWriter.scala:255)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1377)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:253)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:174)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:173)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$11.apply(Executor.scala:413)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1334)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:419)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)

Thanks & Regards,
Nayan Sharma
*+91-8095382952*
<https://www.linkedin.com/in/nayan-sharma>
<http://stackoverflow.com/users/3687426/nayan-sharma?tab=profile>
Re: Spark Druid Ingestion
Hey Jorge,

Thanks for responding. Can you elaborate on the user permission part? HDFS or local?

As of now, the HDFS path
hdfs://n2pl-pa-hdn220.xxx.xxx:8020/user/yarn/.sparkStaging/application_1521457397747_0013/__spark_libs__8247917347016008883.zip
already has complete access for the yarn user, and my job is also running as the same user.

Thanks,
Nayan

> On Mar 22, 2018, at 12:54 PM, Jorge Machado <jom...@me.com> wrote:
>
> Seems to me permissions problems! Can you check your user / folder
> permissions?
>
> Jorge Machado
Spark Druid Ingestion
Hi All,

Druid uses Hadoop MapReduce to ingest batch data, but I am trying to use Spark to ingest data into Druid, taking reference from https://github.com/metamx/druid-spark-batch

But we are stuck at the following error.

Application Log:

2018-03-20T07:54:28,782 INFO [task-runner-0-priority-0] org.apache.spark.deploy.yarn.Client - Will allocate AM container, with 896 MB memory including 384 MB overhead
2018-03-20T07:54:28,782 INFO [task-runner-0-priority-0] org.apache.spark.deploy.yarn.Client - Setting up container launch context for our AM
2018-03-20T07:54:28,785 INFO [task-runner-0-priority-0] org.apache.spark.deploy.yarn.Client - Setting up the launch environment for our AM container
2018-03-20T07:54:28,793 INFO [task-runner-0-priority-0] org.apache.spark.deploy.yarn.Client - Preparing resources for our AM container
2018-03-20T07:54:29,364 WARN [task-runner-0-priority-0] org.apache.spark.deploy.yarn.Client - Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
2018-03-20T07:54:29,371 INFO [task-runner-0-priority-0] org.apache.spark.deploy.yarn.Client - Uploading resource file:/hdfs1/druid-0.11.0/var/tmp/spark-49af67df-1a21-4790-a02b-c737c7a44946/__spark_libs__8247917347016008883.zip -> hdfs://n2pl-pa-hdn220.xxx.xxx:8020/user/yarn/.sparkStaging/application_1521457397747_0013/__spark_libs__8247917347016008883.zip
2018-03-20T07:54:29,607 INFO [task-runner-0-priority-0] org.apache.spark.deploy.yarn.Client - Uploading resource file:/hdfs1/druid-0.11.0/var/tmp/spark-49af67df-1a21-4790-a02b-c737c7a44946/__spark_conf__2240950972346324291.zip -> hdfs://n2pl-pa-hdn220.xxx.xxx:8020/user/yarn/.sparkStaging/application_1521457397747_0013/__spark_conf__.zip
2018-03-20T07:54:29,673 INFO [task-runner-0-priority-0] org.apache.spark.SecurityManager - Changing view acls to: yarn
2018-03-20T07:54:29,673 INFO [task-runner-0-priority-0] org.apache.spark.SecurityManager - Changing modify acls to: yarn
2018-03-20T07:54:29,673 INFO [task-runner-0-priority-0] org.apache.spark.SecurityManager - Changing view acls groups to:
2018-03-20T07:54:29,673 INFO [task-runner-0-priority-0] org.apache.spark.SecurityManager - Changing modify acls groups to:
2018-03-20T07:54:29,673 INFO [task-runner-0-priority-0] org.apache.spark.SecurityManager - SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(yarn); groups with view permissions: Set(); users with modify permissions: Set(yarn); groups with modify permissions: Set()
2018-03-20T07:54:29,679 INFO [task-runner-0-priority-0] org.apache.spark.deploy.yarn.Client - Submitting application application_1521457397747_0013 to ResourceManager
2018-03-20T07:54:29,709 INFO [task-runner-0-priority-0] org.apache.hadoop.yarn.client.api.impl.YarnClientImpl - Submitted application application_1521457397747_0013
2018-03-20T07:54:29,713 INFO [task-runner-0-priority-0] org.apache.spark.scheduler.cluster.SchedulerExtensionServices - Starting Yarn extension services with app application_1521457397747_0013 and attemptId None
2018-03-20T07:54:30,722 INFO [task-runner-0-priority-0] org.apache.spark.deploy.yarn.Client - Application report for application_1521457397747_0013 (state: FAILED)
2018-03-20T07:54:30,729 INFO [task-runner-0-priority-0] org.apache.spark.deploy.yarn.Client -
    client token: N/A
    diagnostics: Application application_1521457397747_0013 failed 2 times due to AM Container for appattempt_1521457397747_0013_02 exited with exitCode: -1000
For more detailed output, check the application tracking page: http://n-pa-hdn220.xxx.:8088/cluster/app/application_1521457397747_0013 Then click on links to logs of each attempt.
Diagnostics: No such file or directory
ENOENT: No such file or directory
    at org.apache.hadoop.io.nativeio.NativeIO$POSIX.chmodImpl(Native Method)
    at org.apache.hadoop.io.nativeio.NativeIO$POSIX.chmod(NativeIO.java:230)
    at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:756)
    at org.apache.hadoop.fs.DelegateToFileSystem.setPermission(DelegateToFileSystem.java:211)
    at org.apache.hadoop.fs.FilterFs.setPermission(FilterFs.java:252)
    at org.apache.hadoop.fs.FileContext$11.next(FileContext.java:1003)
    at org.apache.hadoop.fs.FileContext$11.next(FileContext.java:999)
    at org.apache.hadoop.fs.FSLinkResolver.resolve(FSLinkResolver.java:90)
    at org.apache.hadoop.fs.FileContext.setPermission(FileContext.java:1006)
    at org.apache.hadoop.yarn.util.FSDownload$3.run(FSDownload.java:421)
    at org.apache.hadoop.yarn.util.FSDownload$3.run(FSDownload.java:419)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1866)
    at org.apache.hadoop.yarn.util.FSDownload.changePermissions(FSDownload.java:419)
    at org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:365)
    at
Re: splitting columns into new columns
Hi Pralabh,

Thanks for your help.

    val xx = columnList.map(x => x -> 0).toMap

    val opMap = dataFrame.rdd.flatMap { row =>
      columnList.foldLeft(xx) { case (y, col) =>
        val s = row.getAs[String](col).split("\\^").length
        if (y(col) < s) y.updated(col, s) else y
      }.toList
    }

    val colMaxSizeMap = opMap.groupBy(x => x._1).map(x => x._2.toList.maxBy(x => x._2)).collect().toMap

    val x = dataFrame.rdd.map { x =>
      val op = columnList.flatMap { y =>
        val op = x.getAs[String](y).split("\\^")
        op ++ List.fill(colMaxSizeMap(y) - op.size)("")
      }
      Row.fromSeq(op)
    }

    val structFieldList = columnList.flatMap { colName =>
      List.range(0, colMaxSizeMap(colName), 1).map { i =>
        StructField(s"$colName" + s"$i", StringType)
      }
    }

    val schema = StructType(structFieldList)
    val data1 = spark.createDataFrame(x, schema)

    opMap
    res13: org.apache.spark.rdd.RDD[(String, Int)]

But it fails when opMap has a null value: it throws java.lang.NullPointerException. I am trying to figure it out.

    val opMap1 = opMap.filter(_._2 != "")

I tried this, but it also fails with the same exception.

Thanks,
Nayan

> On 17-Jul-2017, at 4:54 PM, Pralabh Kumar <pralabhku...@gmail.com> wrote:
>
> Hi Nayan
>
> Please find the solution to your problem, which works on Spark 2.
>
>     val spark = SparkSession.builder().appName("practice").enableHiveSupport().getOrCreate()
>     val sc = spark.sparkContext
>     val sqlContext = spark.sqlContext
>     import spark.implicits._
>     val dataFrame = sc.parallelize(List("ERN~58XX7~^EPN~5X551~|C~MXXX~MSO~^CAxxE~~3XXX5"))
>       .map(s=>s.split("\\|")).map(s=>(s(0),s(1)))
>       .toDF("phone","contact")
>     dataFrame.show()
>     val newDataSet= dataFrame.rdd.map(data=>{
>       val t1 = ArrayBuffer[String] ()
>       for (i <- 0.to(1)) {
>         val col = data.get(i).asInstanceOf[String]
>         val dd= col.split("\\^").toSeq
>         for(col<-dd){
>           t1 +=(col)
>         }
>       }
>       Row.fromSeq(t1.seq)
>     })
>
>     val firtRow = dataFrame.select("*").take(1)(0)
>     dataFrame.schema.fieldNames
>     var schema =""
>
>     for ((colNames,idx) <- dataFrame.schema.fieldNames.zipWithIndex.view) {
>       val data = firtRow(idx).asInstanceOf[String].split("\\^")
>       var j = 0
>       for(d<-data){
>         schema = schema + colNames + j + ","
>         j = j+1
>       }
>     }
>     schema=schema.substring(0,schema.length-1)
>     val sqlSchema = StructType(schema.split(",").map(s=>StructField(s,StringType,false)))
>     sqlContext.createDataFrame(newDataSet,sqlSchema).show()
>
> Regards
> Pralabh Kumar
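A note on the NullPointerException reported in this reply: `row.getAs[String](col)` returns null for a null cell, so the `.split("\\^")` call inside the `flatMap` is a likely place for the NPE. Filtering `opMap` afterwards would not help, because RDDs are evaluated lazily and the filter only runs on the output of that `flatMap`. Below is a minimal sketch of null-safe helpers in plain Scala. `CaretSplit` and its method names are hypothetical, and treating a null cell as a single empty value is an assumption about the desired behaviour. It also uses `split` with a limit of -1, which keeps trailing empty values, unlike the posted code.

```scala
object CaretSplit {
  // Null-safe split on '^'; a null cell becomes one empty value (assumption).
  // The -1 limit keeps trailing empty strings, e.g. "a^" gives Seq("a", "").
  def parts(cell: String): Seq[String] =
    Option(cell).map(_.split("\\^", -1).toSeq).getOrElse(Seq(""))

  // Largest number of values found in any cell of one column.
  def maxWidth(cells: Seq[String]): Int =
    cells.map(c => parts(c).length).foldLeft(1)((a, b) => math.max(a, b))

  // Values of one cell, right-padded with "" so every row has the same width.
  def padded(cell: String, width: Int): Seq[String] =
    parts(cell).padTo(width, "")

  // Generated column names: phone1, phone2, ...
  def names(column: String, width: Int): Seq[String] =
    (1 to width).map(i => s"$column$i")
}
```

In the posted code, `row.getAs[String](col).split("\\^")` could then be replaced by `CaretSplit.parts(row.getAs[String](col))` in both places where a cell is split.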
Re: splitting columns into new columns
If I have 2-3 values in a column, I can easily separate them and create new columns with the withColumn option. But I am trying to do it in a loop and dynamically generate as many new columns as there are ^ separators in the column values.

Can it be achieved this way?

> On 17-Jul-2017, at 3:29 AM, ayan guha <guha.a...@gmail.com> wrote:
>
> You are looking for explode function.
>
> --
> Best Regards,
> Ayan Guha
splitting columns into new columns
I've a DataFrame where some columns contain multiple values, always separated by ^

phone|contact|
ERN~58XX7~^EPN~5X551~|C~MXXX~MSO~^CAxxE~~3XXX5|

phone1|phone2|contact1|contact2|
ERN~5XXX7|EPN~5891551~|C~MXXXH~MSO~|CAxxE~~3XXX5|

How can this be achieved in a loop, given that the number of separators in the column values is not constant?

    data.withColumn("phone",split($"phone","\\^")).select($"phone".getItem(0).as("phone1"),$"phone".getItem(1).as("phone2"))

I thought of doing it this way, but the problem is that the columns have 100+ separators between the column values.

Thank you,
Nayan
ElasticSearch Spark error
Hi All,

ERROR:
Caused by: org.apache.spark.util.TaskCompletionListenerException: Connection error (check network and/or proxy settings)- all nodes failed; tried [[10.0.1.8*:9200, 10.0.1.**:9200, 10.0.1.***:9200]]

I am getting this error while trying to show the DataFrame. df.count = 5190767 and df.printSchema both work fine. It has 329 columns.

Does anyone have any idea about this? Please help me fix it.

Thanks,
Nayan
Test
Test
Fwd: isin query
Thanks for responding.

    df.filter($"msrid" === "m_123" || $"msrid" === "m_111")

There are lots of workarounds for my question, but can you let me know what's wrong with the "isin" query?

Regards,
Nayan

> Begin forwarded message:
>
> From: ayan guha <guha.a...@gmail.com>
> Subject: Re: isin query
> Date: 17 April 2017 at 8:13:24 PM IST
> To: nayan sharma <nayansharm...@gmail.com>, user@spark.apache.org
>
> How about using OR operator in filter?
>
> --
> Best Regards,
> Ayan Guha
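The OR workaround discussed here can be generalised to any list of values by folding equality tests together. The sketch below only illustrates that workaround and does not explain why `isin` returned 0 for several values. `MsridFilter` and `anyOf` are hypothetical names.

```scala
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.col

object MsridFilter {
  // Builds (column === v1) || (column === v2) || ... for a non-empty list.
  def anyOf(column: String, values: Seq[String]): Column = {
    require(values.nonEmpty, "at least one value is needed")
    values.map(v => col(column) === v).reduce(_ || _)
  }
}
```

It would be used as `df.filter(MsridFilter.anyOf("msrid", Seq("m_123", "m_111", "m_145"))).count`. If this count also comes back as 0, the values stored in `msrid` are worth inspecting directly.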
filter operation using isin
The DataFrame (df) has a column msrid (String) with values m_123, m_111, m_145, m_098, m_666.

I want to filter for the rows that have the values m_123, m_111, m_145.

    df.filter($"msrid".isin("m_123","m_111","m_145")).count
    count = 0

while

    df.filter($"msrid".isin("m_123")).count
    count = 121212

I have also tried queries like

    df.filter($"msrid" isin (List("m_123","m_111","m_145"):_*))
    count = 0

but

    df.filter($"msrid" isin (List("m_123"):_*))
    count = 121212

Any suggestion would be a great help to me.

Best Regards,
Nayan
isin query
The DataFrame (df) has a column msrid (String) with values m_123, m_111, m_145, m_098, m_666.

I want to filter for the rows that have the values m_123, m_111, m_145.

    df.filter($"msrid".isin("m_123","m_111","m_145")).count
    count = 0

while

    df.filter($"msrid".isin("m_123")).count
    count = 121212

I have also tried queries like

    df.filter($"msrid" isin (List("m_123","m_111","m_145"):_*))
    count = 0

but

    df.filter($"msrid" isin (List("m_123"):_*))
    count = 121212

Any suggestion would be a great help to me.

Thanks,
Nayan
Re: Error while reading the CSV
Hi Yash,

I know this will work perfectly, but here I want to read the CSV using the assembly jar file.

Thanks,
Nayan

> On 07-Apr-2017, at 10:02 AM, Yash Sharma <yash...@gmail.com> wrote:
>
> Hi Nayan,
> I use the --packages with the spark shell and the spark submit. Could you
> please try that and let us know:
> Command:
>     spark-submit --packages com.databricks:spark-csv_2.11:1.4.0
Re: Error while reading the CSV
spark version 1.6.2
scala version 2.10.5

> On 06-Apr-2017, at 8:05 PM, Jörn Franke <jornfra...@gmail.com> wrote:
>
> And which version does your Spark cluster use?
Re: Error while reading the CSV
scalaVersion := "2.10.5"

> On 06-Apr-2017, at 7:35 PM, Jörn Franke <jornfra...@gmail.com> wrote:
>
> Maybe your Spark is based on scala 2.11, but you compile it for 2.10 or the
> other way around?
Re: Error while reading the CSV
In addition, I am using Spark version 1.6.2.
Could the error be caused by a Scala version mismatch or by dependencies that do not match? This is only a guess.

Thanks,
Nayan
Re: Error while reading the CSV
Hi Jorn,
Thanks for replying.

    jar -tf catalyst-data-prepration-assembly-1.0.jar | grep csv

After running this, I found a lot of classes under com/databricks/spark/csv/.

Do I need to check for any specific class?

Regards,
Nayan

> On 06-Apr-2017, at 6:42 PM, Jörn Franke <jornfra...@gmail.com> wrote:
>
> Is the library in your assembly jar?
Error while reading the CSV
Hi All,
I am getting an error while loading a CSV file.

    val datacsv = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").load("timeline.csv")

    java.lang.NoSuchMethodError: org.apache.commons.csv.CSVFormat.withQuote(Ljava/lang/Character;)Lorg/apache/commons/csv/CSVFormat;

I have added the dependency in the sbt file:

    // Spark Additional Library - CSV Read as DF
    libraryDependencies += "com.databricks" %% "spark-csv" % "1.5.0"

and I start the spark-shell with the command:

    spark-shell --master yarn-client --jars /opt/packages/-data-prepration/target/scala-2.10/-data-prepration-assembly-1.0.jar --name nayan

Thanks for any help!

Thanks,
Nayan
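A NoSuchMethodError like this one usually means the class was found but in a different version than the caller was compiled against. One way to investigate is to ask the JVM which jar the class was actually loaded from and whether the method is present. The helper below is a minimal sketch using only standard Java reflection; the object name `ClassOrigin` and its two methods are hypothetical names made up for this example.

```scala
object ClassOrigin {
  // Returns the location (usually a jar URL) a class was loaded from.
  // Classes from the JDK bootstrap class path have no code source, so a marker is returned.
  def locate(className: String): String = {
    val cls = Class.forName(className)
    val location = for {
      domain <- Option(cls.getProtectionDomain)
      source <- Option(domain.getCodeSource)
      url    <- Option(source.getLocation)
    } yield url.toString
    location.getOrElse("<bootstrap class path>")
  }

  // True if the loaded class exposes at least one public method with this name.
  def hasMethod(className: String, methodName: String): Boolean =
    Class.forName(className).getMethods.exists(_.getName == methodName)
}
```

Pasted into the same spark-shell session, `ClassOrigin.locate("org.apache.commons.csv.CSVFormat")` would show which jar supplies the class, and `ClassOrigin.hasMethod("org.apache.commons.csv.CSVFormat", "withQuote")` would show whether that copy has the method at all. If the location is not the assembly jar, an older commons-csv earlier on the classpath is a likely suspect, though that is an assumption to verify rather than a diagnosis.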
skipping header in multiple files
Hi,
I want to skip the header line of every CSV file in a directory. After searching on Google, I learned that this can be done using sc.wholeTextFiles. Can anyone suggest how to do that in Scala?

Thanks & Regards,
Nayan Sharma
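A minimal sketch of the wholeTextFiles approach: since `sc.wholeTextFiles` yields one (path, content) pair per file, dropping the first line of each content string removes every header. The helper name `dropHeader` and the directory path in the comment are hypothetical, and the sketch assumes every file starts with exactly one header line and that no quoted field contains a line break.

```scala
object CsvHeaders {
  // Splits one file's content into lines and drops the first (header) line.
  // Handles both \n and \r\n line endings; an empty file yields no lines.
  def dropHeader(content: String): Iterator[String] =
    content.split("\\r?\\n").iterator.drop(1)

  // Wiring into Spark (not executed here; `sc` is the SparkContext from spark-shell
  // and the path is a placeholder):
  //
  //   val rows = sc.wholeTextFiles("/path/to/csv/dir")
  //     .flatMap { case (_, content) => CsvHeaders.dropHeader(content) }
}
```

Note that `wholeTextFiles` reads each file into memory as a single string, so this suits many small files better than a few very large ones.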
Re: Persist RDD doubt
In case of task failures, does Spark clear the persisted RDD (StorageLevel.MEMORY_ONLY_SER) and recompute it when the task is retried from the beginning? Or will the cached RDD be appended to? How does Spark check whether the RDD has already been cached, so that it can skip the caching step for a particular task?

> On 23-Mar-2017, at 3:36 PM, Artur R <ar...@gpnxgroup.com> wrote:
>
> I am not quite sure, but:
> - if the RDD is persisted in memory, then on task failure the executor JVM process
>   fails too, so the memory is released
> - if the RDD is persisted on disk, then on task failure the Spark shutdown hook just
>   wipes the temp files
>
> On Thu, Mar 23, 2017 at 10:55 AM, Jörn Franke <jornfra...@gmail.com> wrote:
> What do you mean by clear? What is the use case?
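The question of how a cached partition is found again can be illustrated with a toy get-or-compute cache. This is a simplified model and not Spark's implementation: it assumes, as far as I understand Spark's behaviour, that each cached partition is stored under a key derived from the RDD id and the partition index, and that a partition is stored only after it has been computed completely. All names here (`PartitionCacheSketch`, `BlockKey`, `computeCalls`) are hypothetical.

```scala
import scala.collection.mutable

object PartitionCacheSketch {
  // Hypothetical key: one entry per (RDD, partition) pair.
  final case class BlockKey(rddId: Int, partition: Int)

  private val store = mutable.Map.empty[BlockKey, Vector[Int]]

  // Counts how many times a partition actually had to be computed.
  var computeCalls = 0

  // Returns the cached partition if present; otherwise computes and stores it.
  // If `compute` throws (a failed task attempt), nothing is stored, so the
  // next attempt recomputes the partition from scratch instead of appending.
  def getOrCompute(key: BlockKey)(compute: => Vector[Int]): Vector[Int] =
    store.get(key) match {
      case Some(cached) => cached
      case None =>
        computeCalls += 1
        val values = compute
        store.put(key, values)
        values
    }
}
```

In this model a failed attempt leaves no partial data behind, a retry recomputes only the partition that is missing, and partitions that were already stored by other tasks are served from the cache without recomputation.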
Persist RDD doubt
Does Spark clear the persisted RDD if a task fails?

Regards,
Nayan