Re: Null pointer exception while replaying WAL

2024-02-12 Thread nayan sharma
Please find below code

  def main(args: Array[String]): Unit = {
    val config: Config = ConfigFactory.load()
    val streamC = StreamingContext.getOrCreate(
      checkpointDirectory,
      () => functionToCreateContext(config, checkpointDirectory)
    )

    streamC.start()
    streamC.awaitTermination()
  }

  def functionToCreateContext(config: Config, checkpointDirectory: String): StreamingContext = {

    val brokerUrl = config.getString("streaming.solace.brokerURL")
    val username = config.getString("streaming.solace.userName")
    val passwordSol = config.getString("streaming.solace.password")
    val vpn = config.getString("streaming.solace.vpn")
    val queue = config.getString("streaming.solace.queueName")
    val connectionFactory = config.getString("streaming.solace.connectionFactory")

    val spark = SparkSession
      .builder()
      .appName("rem-Streaming-Consumer")
      .config("spark.streaming.receiver.writeAheadLog.enable", "true")
      .config("spark.streaming.blockInterval", blockInterval)
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()
    val sc = spark.sparkContext
    val ssc = new StreamingContext(sc, Seconds(batchInterval))
    ssc.checkpoint(checkpointDirectory)

    val converter: Message => Option[String] = {
      case msg: TextMessage => Some(msg.getText)
      case _                => None
    }

    val props = new Properties()
    props.setProperty(
      Context.INITIAL_CONTEXT_FACTORY,
      "com.solacesystems.jndi.SolJNDIInitialContextFactory"
    )
    props.setProperty(Context.PROVIDER_URL, brokerUrl)
    props.setProperty(Context.SECURITY_PRINCIPAL, username)
    // the password belongs in SECURITY_CREDENTIALS; the original set SECURITY_PRINCIPAL twice
    props.setProperty(Context.SECURITY_CREDENTIALS, passwordSol)
    props.setProperty(SupportedProperty.SOLACE_JMS_VPN, vpn)

    val msgs = JmsStreamUtils.createSynchronousJmsQueueStream(
      ssc,
      JndiMessageConsumerFactory(
        props,
        QueueJmsDestinationInfo(queue),
        connectionFactoryName = connectionFactory,
        messageSelector = ""
      ),
      converter,
      1000,
      1.second,
      10.second,
      StorageLevel.MEMORY_AND_DISK_SER_2
    )

    msgs.foreachRDD { rdd =>
      if (rdd.take(1).length > 0) {
        val messages: RDD[String] = rdd.map { sr =>
          if (sr == null) {
            println("NO records found")
            "NO records found"
          } else {
            println("Input Records from Solace queue : " + sr.toString)
            sr.toString
          }
        }
        Thread.sleep(12)
        try {
          val messagesJson = spark.read.json(messages) // ===> getting NPE here after restarting using WAL
          messagesJson.write.mode("append").parquet(data)
        } catch {
          case ex: Exception => ex.printStackTrace()
        }
      }
    }
    ssc
  }
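A possible direction, offered as an assumption rather than a confirmed diagnosis: after a restart, StreamingContext.getOrCreate rebuilds the job from the checkpoint, so objects captured in the foreachRDD closure, such as the outer `spark` val, may not be re-initialised, which can surface as an NPE on the first recovered batch. A minimal sketch of the lazily instantiated session pattern from the Spark Streaming programming guide, adapted to the names above (`outputPath` is an illustrative stand-in for the parquet destination):

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// Lazily instantiated singleton SparkSession; safe to call from inside
// foreachRDD even after the StreamingContext is recovered from a checkpoint.
object SparkSessionSingleton {
  @transient private var instance: SparkSession = _

  def getInstance(sparkConf: SparkConf): SparkSession = {
    if (instance == null) {
      instance = SparkSession.builder.config(sparkConf).getOrCreate()
    }
    instance
  }
}

// Inside the output operation, resolve the session from the RDD's own
// SparkContext instead of capturing the outer `spark` val in the closure.
msgs.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    val session = SparkSessionSingleton.getInstance(rdd.sparkContext.getConf)
    val messagesJson = session.read.json(rdd)
    messagesJson.write.mode("append").parquet(outputPath)
  }
}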
Thanks & Regards,
Nayan Sharma
 +91-8095382952

<https://www.linkedin.com/in/nayan-sharma>
<http://stackoverflow.com/users/3687426/nayan-sharma?tab=profile>


On Mon, Feb 12, 2024 at 4:23 AM Mich Talebzadeh 
wrote:

> Hi,
>
> It is challenging to make a recommendation without further details. I am
> guessing you are trying to build a fault-tolerant Spark application (Spark
> Structured Streaming) that consumes messages from Solace?
> To address the NullPointerException given the provided information, review
> the part of the code where the exception is thrown; identifying which object
> or method call is resulting in null, together with checking the logs, will
> help the debugging process.
>
> HTH
>
> Mich Talebzadeh,
> Dad | Technologist | Solutions Architect | Engineer
> London
> United Kingdom
>
>
>view my Linkedin profile
> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Sat, 10 Feb 2024 at 05:29, nayan sharma 
> wrote:
>
>> Hi Users,
>>
>> I am trying to build a fault-tolerant Spark Solace consumer.
>>
>> Issue: we have to restart the job due to multiple issues; load average is
>> one of them. At that time, whatever Spark is processing or whatever batches
>> are in the queue is lost. We can't replay them because we have already sent
>> the ack while calling store().
>>
>> Solution: I have tried implementing WAL and checkpointing. The job is able
>> to identify the lost batches, but the records are not being written to the
>> log file and it throws an NPE.
>>
>> We are creating the SparkContext using sc.getOrCreate().
>>
>>
>> Thanks,
>> Nayan
>>
>


Null pointer exception while replaying WAL

2024-02-09 Thread nayan sharma
Hi Users,

I am trying to build a fault-tolerant Spark Solace consumer.

Issue: we have to restart the job due to multiple issues; load average is one
of them. At that time, whatever Spark is processing or whatever batches are in
the queue is lost. We can't replay them because we have already sent the ack
while calling store().

Solution: I have tried implementing WAL and checkpointing. The job is able to
identify the lost batches, but the records are not being written to the log
file and it throws an NPE.

We are creating the SparkContext using sc.getOrCreate().
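For reference, a minimal sketch of the driver-side recovery pattern this relies on (the checkpoint path and batch interval below are illustrative, not taken from the job):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Invoked only when no checkpoint exists; on restart the DStream graph and
// any unprocessed WAL-backed blocks are recovered from the checkpoint instead.
def createContext(checkpointDir: String): StreamingContext = {
  val conf = new SparkConf()
    .setAppName("solace-consumer")
    .set("spark.streaming.receiver.writeAheadLog.enable", "true")
  val ssc = new StreamingContext(conf, Seconds(30)) // illustrative batch interval
  ssc.checkpoint(checkpointDir)
  // define the receiver input stream and output operations here
  ssc
}

val checkpointDir = "hdfs:///tmp/solace-checkpoint" // illustrative path
val ssc = StreamingContext.getOrCreate(checkpointDir, () => createContext(checkpointDir))
ssc.start()
ssc.awaitTermination()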


Thanks,
Nayan


Kafka Spark Structured Streaming Error

2022-05-05 Thread nayan sharma
)

at
org.apache.spark.sql.execution.datasources.orc.OrcOutputWriter.close(OrcOutputWriter.scala:58)

at
org.apache.spark.sql.execution.datasources.FileFormatDataWriter.releaseResources(FileFormatDataWriter.scala:57)

at
org.apache.spark.sql.execution.datasources.FileFormatDataWriter.abort(FileFormatDataWriter.scala:83)

at
org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$1.apply$mcV$sp(FileFormatWriter.scala:255)

at
org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1377)

at
org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:253)

at
org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:174)

at
org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:173)

at
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)

at org.apache.spark.scheduler.Task.run(Task.scala:123)

at
org.apache.spark.executor.Executor$TaskRunner$$anonfun$11.apply(Executor.scala:413)

at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1334)

at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:419)

at
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)

at
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)

at java.base/java.lang.Thread.run(Thread.java:834)


Thanks & Regards,
Nayan Sharma
 +91-8095382952

<https://www.linkedin.com/in/nayan-sharma>
<http://stackoverflow.com/users/3687426/nayan-sharma?tab=profile>


Re: Spark Druid Ingestion

2018-03-22 Thread nayan sharma
Hey Jorge,

Thanks for responding.

Can you elaborate on the user permission part? HDFS or local?

As of now, the HDFS path
hdfs://n2pl-pa-hdn220.xxx.xxx:8020/user/yarn/.sparkStaging/application_1521457397747_0013/__spark_libs__8247917347016008883.zip
already has full access for the yarn user, and my job is also running as the
same user.


Thanks,
Nayan


> On Mar 22, 2018, at 12:54 PM, Jorge Machado <jom...@me.com> wrote:
> 
> Seems to me like a permissions problem! Can you check your user / folder
> permissions?
> 
> Jorge Machado
> 
> 
> 
> 
> 
>> On 22 Mar 2018, at 08:21, nayan sharma <nayansharm...@gmail.com 
>> <mailto:nayansharm...@gmail.com>> wrote:
>> 
>> Hi All,
>> Druid normally uses Hadoop MapReduce to ingest batch data, but I am trying
>> Spark for ingesting data into Druid, taking
>> https://github.com/metamx/druid-spark-batch as a reference.
>> But we are stuck at the following error.
>> Application log:
>> 2018-03-20T07:54:28,782 INFO [task-runner-0-priority-0] 
>> org.apache.spark.deploy.yarn.Client - Will allocate AM container, with 896 
>> MB memory including 384 MB overhead
>> 2018-03-20T07:54:28,782 INFO [task-runner-0-priority-0] 
>> org.apache.spark.deploy.yarn.Client - Setting up container launch context 
>> for our AM
>> 2018-03-20T07:54:28,785 INFO [task-runner-0-priority-0] 
>> org.apache.spark.deploy.yarn.Client - Setting up the launch environment for 
>> our AM container
>> 2018-03-20T07:54:28,793 INFO [task-runner-0-priority-0] 
>> org.apache.spark.deploy.yarn.Client - Preparing resources for our AM 
>> container
>> 2018-03-20T07:54:29,364 WARN [task-runner-0-priority-0] 
>> org.apache.spark.deploy.yarn.Client - Neither spark.yarn.jars nor 
>> spark.yarn.archive is set, falling back to uploading libraries under 
>> SPARK_HOME.
>> 2018-03-20T07:54:29,371 INFO [task-runner-0-priority-0] 
>> org.apache.spark.deploy.yarn.Client - Uploading resource 
>> file:/hdfs1/druid-0.11.0/var/tmp/spark-49af67df-1a21-4790-a02b-c737c7a44946/__spark_libs__8247917347016008883.zip
>>  -> 
>> hdfs://n2pl-pa-hdn220.xxx.xxx:8020/user/yarn/.sparkStaging/application_1521457397747_0013/__spark_libs__8247917347016008883.zip
>>  
>> 
>> 2018-03-20T07:54:29,607 INFO [task-runner-0-priority-0] 
>> org.apache.spark.deploy.yarn.Client - Uploading resource 
>> file:/hdfs1/druid-0.11.0/var/tmp/spark-49af67df-1a21-4790-a02b-c737c7a44946/__spark_conf__2240950972346324291.zip
>>  -> 
>> hdfs://n2pl-pa-hdn220.xxx.xxx:8020/user/yarn/.sparkStaging/application_1521457397747_0013/__spark_conf__.zip
>>  
>> 
>> 2018-03-20T07:54:29,673 INFO [task-runner-0-priority-0] 
>> org.apache.spark.SecurityManager - Changing view acls to: yarn
>> 2018-03-20T07:54:29,673 INFO [task-runner-0-priority-0] 
>> org.apache.spark.SecurityManager - Changing modify acls to: yarn
>> 2018-03-20T07:54:29,673 INFO [task-runner-0-priority-0] 
>> org.apache.spark.SecurityManager - Changing view acls groups to: 
>> 2018-03-20T07:54:29,673 INFO [task-runner-0-priority-0] 
>> org.apache.spark.SecurityManager - Changing modify acls groups to: 
>> 2018-03-20T07:54:29,673 INFO [task-runner-0-priority-0] 
>> org.apache.spark.SecurityManager - SecurityManager: authentication disabled; 
>> ui acls disabled; users  with view permissions: Set(yarn); groups with view 
>> permissions: Set(); users  with modify permissions: Set(yarn); groups with 
>> modify permissions: Set()
>> 2018-03-20T07:54:29,679 INFO [task-runner-0-priority-0] 
>> org.apache.spark.deploy.yarn.Client - Submitting application 
>> application_1521457397747_0013 to ResourceManager
>> 2018-03-20T07:54:29,709 INFO [task-runner-0-priority-0] 
>> org.apache.hadoop.yarn.client.api.impl.YarnClientImpl - Submitted 
>> application application_1521457397747_0013
>> 2018-03-20T07:54:29,713 INFO [task-runner-0-priority-0] 
>> org.apache.spark.scheduler.cluster.SchedulerExtensionServices - Starting 
>> Yarn extension services with app application_1521457397747_0013 and 
>> attemptId None
>> 2018-03-20T07:54:30,722 INFO [task-runner-0-priority-0] 
>> org.apache.spark.deploy.yarn.Client - Application report for 
>> application_1521457397747_0013 (state: FAILED)
>> 2018-03-20T07:54:30,729 INFO [task-runner-0-priority-0] 
>> org.apache.spark.deploy.yarn.Client - 
>>   client token: N/A
>>   diagnostics: Application application_1521457397747_0013 failed 2 times 
>> due to AM Container for appattempt_1521457397747_0013_02 exited with  
>> exitCode: -10

Spark Druid Ingestion

2018-03-22 Thread nayan sharma
Hi All,

Druid normally uses Hadoop MapReduce to ingest batch data, but I am trying Spark for ingesting data into Druid, taking https://github.com/metamx/druid-spark-batch as a reference. But we are stuck at the following error.

Application log:

2018-03-20T07:54:28,782 INFO [task-runner-0-priority-0] org.apache.spark.deploy.yarn.Client - Will allocate AM container, with 896 MB memory including 384 MB overhead
2018-03-20T07:54:28,782 INFO [task-runner-0-priority-0] org.apache.spark.deploy.yarn.Client - Setting up container launch context for our AM
2018-03-20T07:54:28,785 INFO [task-runner-0-priority-0] org.apache.spark.deploy.yarn.Client - Setting up the launch environment for our AM container
2018-03-20T07:54:28,793 INFO [task-runner-0-priority-0] org.apache.spark.deploy.yarn.Client - Preparing resources for our AM container
2018-03-20T07:54:29,364 WARN [task-runner-0-priority-0] org.apache.spark.deploy.yarn.Client - Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
2018-03-20T07:54:29,371 INFO [task-runner-0-priority-0] org.apache.spark.deploy.yarn.Client - Uploading resource file:/hdfs1/druid-0.11.0/var/tmp/spark-49af67df-1a21-4790-a02b-c737c7a44946/__spark_libs__8247917347016008883.zip -> hdfs://n2pl-pa-hdn220.xxx.xxx:8020/user/yarn/.sparkStaging/application_1521457397747_0013/__spark_libs__8247917347016008883.zip
2018-03-20T07:54:29,607 INFO [task-runner-0-priority-0] org.apache.spark.deploy.yarn.Client - Uploading resource file:/hdfs1/druid-0.11.0/var/tmp/spark-49af67df-1a21-4790-a02b-c737c7a44946/__spark_conf__2240950972346324291.zip -> hdfs://n2pl-pa-hdn220.xxx.xxx:8020/user/yarn/.sparkStaging/application_1521457397747_0013/__spark_conf__.zip
2018-03-20T07:54:29,673 INFO [task-runner-0-priority-0] org.apache.spark.SecurityManager - Changing view acls to: yarn
2018-03-20T07:54:29,673 INFO [task-runner-0-priority-0] org.apache.spark.SecurityManager - Changing modify acls to: yarn
2018-03-20T07:54:29,673 INFO [task-runner-0-priority-0] org.apache.spark.SecurityManager - Changing view acls groups to: 
2018-03-20T07:54:29,673 INFO [task-runner-0-priority-0] org.apache.spark.SecurityManager - Changing modify acls groups to: 
2018-03-20T07:54:29,673 INFO [task-runner-0-priority-0] org.apache.spark.SecurityManager - SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(yarn); groups with view permissions: Set(); users  with modify permissions: Set(yarn); groups with modify permissions: Set()
2018-03-20T07:54:29,679 INFO [task-runner-0-priority-0] org.apache.spark.deploy.yarn.Client - Submitting application application_1521457397747_0013 to ResourceManager
2018-03-20T07:54:29,709 INFO [task-runner-0-priority-0] org.apache.hadoop.yarn.client.api.impl.YarnClientImpl - Submitted application application_1521457397747_0013
2018-03-20T07:54:29,713 INFO [task-runner-0-priority-0] org.apache.spark.scheduler.cluster.SchedulerExtensionServices - Starting Yarn extension services with app application_1521457397747_0013 and attemptId None
2018-03-20T07:54:30,722 INFO [task-runner-0-priority-0] org.apache.spark.deploy.yarn.Client - Application report for application_1521457397747_0013 (state: FAILED)
2018-03-20T07:54:30,729 INFO [task-runner-0-priority-0] org.apache.spark.deploy.yarn.Client - 
	 client token: N/A
	 diagnostics: Application application_1521457397747_0013 failed 2 times due to AM Container for appattempt_1521457397747_0013_02 exited with  exitCode: -1000
For more detailed output, check the application tracking page: http://n-pa-hdn220.xxx.:8088/cluster/app/application_1521457397747_0013 Then click on links to logs of each attempt.
Diagnostics: No such file or directory
ENOENT: No such file or directory
	at org.apache.hadoop.io.nativeio.NativeIO$POSIX.chmodImpl(Native Method)
	at org.apache.hadoop.io.nativeio.NativeIO$POSIX.chmod(NativeIO.java:230)
	at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:756)
	at org.apache.hadoop.fs.DelegateToFileSystem.setPermission(DelegateToFileSystem.java:211)
	at org.apache.hadoop.fs.FilterFs.setPermission(FilterFs.java:252)
	at org.apache.hadoop.fs.FileContext$11.next(FileContext.java:1003)
	at org.apache.hadoop.fs.FileContext$11.next(FileContext.java:999)
	at org.apache.hadoop.fs.FSLinkResolver.resolve(FSLinkResolver.java:90)
	at org.apache.hadoop.fs.FileContext.setPermission(FileContext.java:1006)
	at org.apache.hadoop.yarn.util.FSDownload$3.run(FSDownload.java:421)
	at org.apache.hadoop.yarn.util.FSDownload$3.run(FSDownload.java:419)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1866)
	at org.apache.hadoop.yarn.util.FSDownload.changePermissions(FSDownload.java:419)
	at org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:365)
	at 

Re: splitting columns into new columns

2017-07-17 Thread nayan sharma
Hi Pralabh,

Thanks for your help.

val xx = columnList.map(x => x -> 0).toMap
val opMap = dataFrame.rdd.flatMap { row =>
  columnList.foldLeft(xx) { case (y, col) =>
    val s = row.getAs[String](col).split("\\^").length
    if (y(col) < s) y.updated(col, s) else y
  }.toList
}

val colMaxSizeMap = opMap
  .groupBy(x => x._1)
  .map(x => x._2.toList.maxBy(x => x._2))
  .collect()
  .toMap

val x = dataFrame.rdd.map { x =>
  val op = columnList.flatMap { y =>
    val op = x.getAs[String](y).split("\\^")
    op ++ List.fill(colMaxSizeMap(y) - op.size)("")
  }
  Row.fromSeq(op)
}

val structFieldList = columnList.flatMap { colName =>
  List.range(0, colMaxSizeMap(colName), 1).map { i =>
    StructField(s"$colName$i", StringType)
  }
}
val schema = StructType(structFieldList)
val data1 = spark.createDataFrame(x, schema)

opMap
res13: org.apache.spark.rdd.RDD[(String, Int)]

But it is failing when opMap has a null value; it throws
java.lang.NullPointerException and I am trying to figure it out.

val opMap1 = opMap.filter(_._2 != "")

I tried doing this as well, but it fails with the same exception.
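A minimal null-safe variant of the length computation, assuming the same dataFrame, columnList and xx as above: wrapping the column value in Option makes a null cell contribute a length of 0 instead of calling split on null (which is what throws the NPE); the same guard applies wherever getAs[String] is split again later.

val opMap = dataFrame.rdd.flatMap { row =>
  columnList.foldLeft(xx) { case (y, col) =>
    // a null cell contributes zero fields instead of an NPE
    val s = Option(row.getAs[String](col)).map(_.split("\\^").length).getOrElse(0)
    if (y(col) < s) y.updated(col, s) else y
  }.toList
}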

Thanks,
Nayan




> On 17-Jul-2017, at 4:54 PM, Pralabh Kumar <pralabhku...@gmail.com> wrote:
> 
> Hi Nayan
> 
> Please find the solution of your problem which work on spark 2.
> 
> val spark = 
> SparkSession.builder().appName("practice").enableHiveSupport().getOrCreate()
>   val sc = spark.sparkContext
>   val sqlContext = spark.sqlContext
>   import spark.implicits._
>   val dataFrame = 
> sc.parallelize(List("ERN~58XX7~^EPN~5X551~|C~MXXX~MSO~^CAxxE~~3XXX5"))
>   .map(s=>s.split("\\|")).map(s=>(s(0),s(1)))
> .toDF("phone","contact")
>   dataFrame.show()
>   val newDataSet= dataFrame.rdd.map(data=>{
> val  t1 =  ArrayBuffer[String] ()
> for (i <- 0.to(1)) {
>   val col = data.get(i).asInstanceOf[String]
>   val dd= col.split("\\^").toSeq
>   for(col<-dd){
> t1 +=(col)
>   }
> }
> Row.fromSeq(t1.seq)
>   })
> 
>   val firtRow = dataFrame.select("*").take(1)(0)
>   dataFrame.schema.fieldNames
>   var schema =""
> 
>   for ((colNames,idx) <- dataFrame.schema.fieldNames.zipWithIndex.view) {
> val data = firtRow(idx).asInstanceOf[String].split("\\^")
> var j = 0
> for(d<-data){
>   schema = schema + colNames + j + ","
>   j = j+1
> }
>   }
>   schema=schema.substring(0,schema.length-1)
>   val sqlSchema = 
> StructType(schema.split(",").map(s=>StructField(s,StringType,false)))
>   sqlContext.createDataFrame(newDataSet,sqlSchema).show()
> 
> Regards
> Pralabh Kumar
> 
> 
> On Mon, Jul 17, 2017 at 1:55 PM, nayan sharma <nayansharm...@gmail.com 
> <mailto:nayansharm...@gmail.com>> wrote:
> If I have 2-3 values in a column then I can easily separate them and create
> new columns with the withColumn option, but I am trying to achieve it in a
> loop and dynamically generate as many new columns as there are ^-separated
> values in the column.
> 
> Can it be achieved in this way?
> 
>> On 17-Jul-2017, at 3:29 AM, ayan guha <guha.a...@gmail.com 
>> <mailto:guha.a...@gmail.com>> wrote:
>> 
>> You are looking for explode function.
>> 
>> On Mon, 17 Jul 2017 at 4:25 am, nayan sharma <nayansharm...@gmail.com 
>> <mailto:nayansharm...@gmail.com>> wrote:
>> I’ve a Dataframe where in some columns there are multiple values, always 
>> separated by ^
>> 
>> phone|contact|
>> ERN~58XX7~^EPN~5X551~|C~MXXX~MSO~^CAxxE~~3XXX5|
>> 
>> phone1|phone2|contact1|contact2| 
>> ERN~5XXX7|EPN~5891551~|C~MXXXH~MSO~|CAxxE~~3XXX5|
>> How can this be achieved using loop as the separator between column values
>> are not constant.
>> 
>> data.withColumn("phone",split($"phone","\\^")).select($"phone".getItem(0).as("phone1"),$"phone".getItem(1).as("phone2"))
>> I thought of doing it this way, but the problem is that columns can have
>> 100+ separators between the column values.
>> 
>> 
>> 
>> Thank you,
>> Nayan
>> -- 
>> Best Regards,
>> Ayan Guha
> 
> 



Re: splitting columns into new columns

2017-07-17 Thread nayan sharma
If I have 2-3 values in a column then I can easily separate them and create new
columns with the withColumn option, but I am trying to achieve it in a loop and
dynamically generate as many new columns as there are ^-separated values in the
column.

Can it be achieved in this way?

> On 17-Jul-2017, at 3:29 AM, ayan guha <guha.a...@gmail.com> wrote:
> 
> You are looking for explode function.
> 
> On Mon, 17 Jul 2017 at 4:25 am, nayan sharma <nayansharm...@gmail.com 
> <mailto:nayansharm...@gmail.com>> wrote:
> I’ve a Dataframe where in some columns there are multiple values, always 
> separated by ^
> 
> phone|contact|
> ERN~58XX7~^EPN~5X551~|C~MXXX~MSO~^CAxxE~~3XXX5|
> 
> phone1|phone2|contact1|contact2| 
> ERN~5XXX7|EPN~5891551~|C~MXXXH~MSO~|CAxxE~~3XXX5|
> How can this be achieved using loop as the separator between column values
> are not constant.
> 
> data.withColumn("phone",split($"phone","\\^")).select($"phone".getItem(0).as("phone1"),$"phone".getItem(1).as("phone2"))
> I thought of doing it this way, but the problem is that columns can have
> 100+ separators between the column values.
> 
> 
> 
> Thank you,
> Nayan
> -- 
> Best Regards,
> Ayan Guha



splitting columns into new columns

2017-07-16 Thread nayan sharma
I’ve a Dataframe where in some columns there are multiple values, always 
separated by ^

Input:
phone|contact|
ERN~58XX7~^EPN~5X551~|C~MXXX~MSO~^CAxxE~~3XXX5|

Desired output:
phone1|phone2|contact1|contact2|
ERN~5XXX7|EPN~5891551~|C~MXXXH~MSO~|CAxxE~~3XXX5|

How can this be achieved in a loop, as the number of separators between column
values is not constant?

data.withColumn("phone",split($"phone","\\^")).select($"phone".getItem(0).as("phone1"),$"phone".getItem(1).as("phone2"))

I thought of doing it this way, but the problem is that columns can have 100+
separators between the column values.
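A sketch of one way to generate the columns dynamically, assuming a Spark 2.x DataFrame named data with the two example columns; the per-column maximum number of parts is computed rather than hard-coded:

import org.apache.spark.sql.functions.{col, max, size, split}

// 1. split each source column on "^" into an array column,
// 2. find the maximum number of parts for that column,
// 3. emit one output column per part index.
val sourceCols = Seq("phone", "contact")

val result = sourceCols.foldLeft(data) { (df, c) =>
  val withArr = df.withColumn(s"${c}_arr", split(col(c), "\\^"))
  val maxParts = withArr.agg(max(size(col(s"${c}_arr")))).head.getInt(0)
  (0 until maxParts)
    // indices past a given row's array length come back as null
    .foldLeft(withArr) { (d, i) => d.withColumn(s"$c${i + 1}", col(s"${c}_arr").getItem(i)) }
    .drop(s"${c}_arr")
}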



Thank you,
Nayan

ElasticSearch Spark error

2017-05-15 Thread nayan sharma
Hi All,

ERROR:-

Caused by: org.apache.spark.util.TaskCompletionListenerException: Connection 
error (check network and/or proxy settings)- all nodes failed; tried 
[[10.0.1.8*:9200, 10.0.1.**:9200, 10.0.1.***:9200]]

I am getting this error while trying to show the DataFrame.

df.count = 5190767 and df.printSchema both work fine.
It has 329 columns.

Does anyone have any idea regarding this? Please help me fix it.


Thanks,
Nayan





Test

2017-05-15 Thread nayan sharma
Test




Fwd: isin query

2017-04-17 Thread nayan sharma
Thanks for responding.

df.filter($"msrid" === "m_123" || $"msrid" === "m_111")

There are lots of workarounds for my question, but can you let me know what is
wrong with the isin query?
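One thing worth ruling out (an assumption, not a confirmed cause): stray whitespace or invisible characters in either the column values or the literals, since that would let a single exact match succeed while the multi-value list misses. A small sketch comparing the plain and trimmed forms:

import org.apache.spark.sql.functions.{col, trim}

val wanted = Seq("m_123", "m_111", "m_145")

// same semantics as the OR chain above
val byIsin = df.filter(col("msrid").isin(wanted: _*))

// trims the column before comparing, in case of stray whitespace
val byTrimmedIsin = df.filter(trim(col("msrid")).isin(wanted.map(_.trim): _*))

byIsin.count()
byTrimmedIsin.count()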

Regards,
Nayan

> Begin forwarded message:
> 
> From: ayan guha <guha.a...@gmail.com>
> Subject: Re: isin query
> Date: 17 April 2017 at 8:13:24 PM IST
> To: nayan sharma <nayansharm...@gmail.com>, user@spark.apache.org
> 
> How about using OR operator in filter? 
> 
> On Tue, 18 Apr 2017 at 12:35 am, nayan sharma <nayansharm...@gmail.com 
> <mailto:nayansharm...@gmail.com>> wrote:
> Dataframe (df) having column msrid(String) having values 
> m_123,m_111,m_145,m_098,m_666
> 
> I wanted to filter out rows which are having values m_123,m_111,m_145
> 
> df.filter($"msrid".isin("m_123","m_111","m_145")).count 
> count =0
> while 
> df.filter($"msrid".isin("m_123")).count 
> count=121212
> I have tried using queries like 
> df.filter($"msrid" isin (List("m_123","m_111","m_145"):_*))
> count =0
> but 
> 
> df.filter($"msrid" isin (List("m_123"):_*))
> count=121212
> 
> Any suggestion will do a great help to me.
> 
> Thanks,
> Nayan
> -- 
> Best Regards,
> Ayan Guha



filter operation using isin

2017-04-17 Thread nayan sharma
I have a DataFrame (df) with a column msrid (String) containing the values
m_123, m_111, m_145, m_098, m_666.

I want to filter the rows whose values are m_123, m_111 or m_145.

df.filter($"msrid".isin("m_123","m_111","m_145")).count
count = 0
while
df.filter($"msrid".isin("m_123")).count
count = 121212
I have also tried queries like
df.filter($"msrid" isin (List("m_123","m_111","m_145"): _*))
count = 0
but

df.filter($"msrid" isin (List("m_123"): _*))
count = 121212

Any suggestion would be a great help.

Best Regards,
Nayan

isin query

2017-04-17 Thread nayan sharma
I have a DataFrame (df) with a column msrid (String) containing the values
m_123, m_111, m_145, m_098, m_666.

I want to filter the rows whose values are m_123, m_111 or m_145.

df.filter($"msrid".isin("m_123","m_111","m_145")).count
count = 0
while
df.filter($"msrid".isin("m_123")).count
count = 121212
I have also tried queries like
df.filter($"msrid" isin (List("m_123","m_111","m_145"): _*))
count = 0
but

df.filter($"msrid" isin (List("m_123"): _*))
count = 121212

Any suggestion would be a great help.

Thanks,
Nayan

Re: Error while reading the CSV

2017-04-07 Thread nayan sharma
Hi Yash,
I know this will work perfectly, but here I wanted to read the CSV using the
assembly jar file.

Thanks,
Nayan

> On 07-Apr-2017, at 10:02 AM, Yash Sharma <yash...@gmail.com> wrote:
> 
> Hi Nayan,
> I use the --packages with the spark shell and the spark submit. Could you 
> please try that and let us know:
> Command:
> spark-submit --packages com.databricks:spark-csv_2.11:1.4.0
> 
> On Fri, 7 Apr 2017 at 00:39 nayan sharma <nayansharm...@gmail.com 
> <mailto:nayansharm...@gmail.com>> wrote:
> spark version 1.6.2
> scala version 2.10.5
> 
>> On 06-Apr-2017, at 8:05 PM, Jörn Franke <jornfra...@gmail.com 
>> <mailto:jornfra...@gmail.com>> wrote:
>> 
>> And which version does your Spark cluster use?
>> 
>> On 6. Apr 2017, at 16:11, nayan sharma <nayansharm...@gmail.com 
>> <mailto:nayansharm...@gmail.com>> wrote:
>> 
>>> scalaVersion := “2.10.5"
>>> 
>>> 
>>> 
>>> 
>>>> On 06-Apr-2017, at 7:35 PM, Jörn Franke <jornfra...@gmail.com 
>>>> <mailto:jornfra...@gmail.com>> wrote:
>>>> 
>>>> Maybe your Spark is based on scala 2.11, but you compile it for 2.10 or 
>>>> the other way around?
>>>> 
>>>> On 6. Apr 2017, at 15:54, nayan sharma <nayansharm...@gmail.com 
>>>> <mailto:nayansharm...@gmail.com>> wrote:
>>>> 
>>>>> In addition I am using spark version 1.6.2
>>>>> Is there any chance of error coming because of Scala version or 
>>>>> dependencies are not matching.?I just guessed.
>>>>> 
>>>>> Thanks,
>>>>> Nayan
>>>>> 
>>>>>  
>>>>>> On 06-Apr-2017, at 7:16 PM, nayan sharma <nayansharm...@gmail.com 
>>>>>> <mailto:nayansharm...@gmail.com>> wrote:
>>>>>> 
>>>>>> Hi Jorn,
>>>>>> Thanks for replying.
>>>>>> 
>>>>>> jar -tf catalyst-data-prepration-assembly-1.0.jar | grep csv
>>>>>> 
>>>>>> after doing this I have found a lot of classes under 
>>>>>> com/databricks/spark/csv/
>>>>>> 
>>>>>> do I need to check for any specific class ??
>>>>>> 
>>>>>> Regards,
>>>>>> Nayan
>>>>>>> On 06-Apr-2017, at 6:42 PM, Jörn Franke <jornfra...@gmail.com 
>>>>>>> <mailto:jornfra...@gmail.com>> wrote:
>>>>>>> 
>>>>>>> Is the library in your assembly jar?
>>>>>>> 
>>>>>>> On 6. Apr 2017, at 15:06, nayan sharma <nayansharm...@gmail.com 
>>>>>>> <mailto:nayansharm...@gmail.com>> wrote:
>>>>>>> 
>>>>>>>> Hi All,
>>>>>>>> I am getting error while loading CSV file.
>>>>>>>> 
>>>>>>>> val 
>>>>>>>> datacsv=sqlContext.read.format("com.databricks.spark.csv").option("header",
>>>>>>>>  "true").load("timeline.csv")
>>>>>>>> java.lang.NoSuchMethodError: 
>>>>>>>> org.apache.commons.csv.CSVFormat.withQuote(Ljava/lang/Character;)Lorg/apache/commons/csv/CSVFormat;
>>>>>>>> 
>>>>>>>> 
>>>>>>>> I have added the dependencies in sbt file 
>>>>>>>> // Spark Additional Library - CSV Read as DF
>>>>>>>> libraryDependencies += "com.databricks" %% "spark-csv" % “1.5.0"
>>>>>>>> and starting the spark-shell with command
>>>>>>>> 
>>>>>>>> spark-shell --master yarn-client  --jars 
>>>>>>>> /opt/packages/-data-prepration/target/scala-2.10/-data-prepration-assembly-1.0.jar
>>>>>>>>  --name nayan 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> Thanks for any help!!
>>>>>>>> 
>>>>>>>> 
>>>>>>>> Thanks,
>>>>>>>> Nayan
>>>>>> 
>>>>> 
>>> 
> 



Re: Error while reading the CSV

2017-04-06 Thread nayan sharma
spark version 1.6.2
scala version 2.10.5

> On 06-Apr-2017, at 8:05 PM, Jörn Franke <jornfra...@gmail.com> wrote:
> 
> And which version does your Spark cluster use?
> 
> On 6. Apr 2017, at 16:11, nayan sharma <nayansharm...@gmail.com 
> <mailto:nayansharm...@gmail.com>> wrote:
> 
>> scalaVersion := “2.10.5"
>> 
>> 
>> 
>> 
>>> On 06-Apr-2017, at 7:35 PM, Jörn Franke <jornfra...@gmail.com 
>>> <mailto:jornfra...@gmail.com>> wrote:
>>> 
>>> Maybe your Spark is based on scala 2.11, but you compile it for 2.10 or the 
>>> other way around?
>>> 
>>> On 6. Apr 2017, at 15:54, nayan sharma <nayansharm...@gmail.com 
>>> <mailto:nayansharm...@gmail.com>> wrote:
>>> 
>>>> In addition I am using spark version 1.6.2
>>>> Is there any chance of error coming because of Scala version or 
>>>> dependencies are not matching.?I just guessed.
>>>> 
>>>> Thanks,
>>>> Nayan
>>>> 
>>>>  
>>>>> On 06-Apr-2017, at 7:16 PM, nayan sharma <nayansharm...@gmail.com 
>>>>> <mailto:nayansharm...@gmail.com>> wrote:
>>>>> 
>>>>> Hi Jorn,
>>>>> Thanks for replying.
>>>>> 
>>>>> jar -tf catalyst-data-prepration-assembly-1.0.jar | grep csv
>>>>> 
>>>>> after doing this I have found a lot of classes under 
>>>>> com/databricks/spark/csv/
>>>>> 
>>>>> do I need to check for any specific class ??
>>>>> 
>>>>> Regards,
>>>>> Nayan
>>>>>> On 06-Apr-2017, at 6:42 PM, Jörn Franke <jornfra...@gmail.com 
>>>>>> <mailto:jornfra...@gmail.com>> wrote:
>>>>>> 
>>>>>> Is the library in your assembly jar?
>>>>>> 
>>>>>> On 6. Apr 2017, at 15:06, nayan sharma <nayansharm...@gmail.com 
>>>>>> <mailto:nayansharm...@gmail.com>> wrote:
>>>>>> 
>>>>>>> Hi All,
>>>>>>> I am getting error while loading CSV file.
>>>>>>> 
>>>>>>> val 
>>>>>>> datacsv=sqlContext.read.format("com.databricks.spark.csv").option("header",
>>>>>>>  "true").load("timeline.csv")
>>>>>>> java.lang.NoSuchMethodError: 
>>>>>>> org.apache.commons.csv.CSVFormat.withQuote(Ljava/lang/Character;)Lorg/apache/commons/csv/CSVFormat;
>>>>>>> 
>>>>>>> 
>>>>>>> I have added the dependencies in sbt file 
>>>>>>> // Spark Additional Library - CSV Read as DF
>>>>>>> libraryDependencies += "com.databricks" %% "spark-csv" % “1.5.0"
>>>>>>> and starting the spark-shell with command
>>>>>>> 
>>>>>>> spark-shell --master yarn-client  --jars 
>>>>>>> /opt/packages/-data-prepration/target/scala-2.10/-data-prepration-assembly-1.0.jar
>>>>>>>  --name nayan 
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> Thanks for any help!!
>>>>>>> 
>>>>>>> 
>>>>>>> Thanks,
>>>>>>> Nayan
>>>>> 
>>>> 
>> 



Re: Error while reading the CSV

2017-04-06 Thread nayan sharma
scalaVersion := "2.10.5"




> On 06-Apr-2017, at 7:35 PM, Jörn Franke <jornfra...@gmail.com> wrote:
> 
> Maybe your Spark is based on scala 2.11, but you compile it for 2.10 or the 
> other way around?
> 
> On 6. Apr 2017, at 15:54, nayan sharma <nayansharm...@gmail.com 
> <mailto:nayansharm...@gmail.com>> wrote:
> 
>> In addition I am using spark version 1.6.2
>> Is there any chance of error coming because of Scala version or dependencies 
>> are not matching.?I just guessed.
>> 
>> Thanks,
>> Nayan
>> 
>>  
>>> On 06-Apr-2017, at 7:16 PM, nayan sharma <nayansharm...@gmail.com 
>>> <mailto:nayansharm...@gmail.com>> wrote:
>>> 
>>> Hi Jorn,
>>> Thanks for replying.
>>> 
>>> jar -tf catalyst-data-prepration-assembly-1.0.jar | grep csv
>>> 
>>> after doing this I have found a lot of classes under 
>>> com/databricks/spark/csv/
>>> 
>>> do I need to check for any specific class ??
>>> 
>>> Regards,
>>> Nayan
>>>> On 06-Apr-2017, at 6:42 PM, Jörn Franke <jornfra...@gmail.com 
>>>> <mailto:jornfra...@gmail.com>> wrote:
>>>> 
>>>> Is the library in your assembly jar?
>>>> 
>>>> On 6. Apr 2017, at 15:06, nayan sharma <nayansharm...@gmail.com 
>>>> <mailto:nayansharm...@gmail.com>> wrote:
>>>> 
>>>>> Hi All,
>>>>> I am getting error while loading CSV file.
>>>>> 
>>>>> val 
>>>>> datacsv=sqlContext.read.format("com.databricks.spark.csv").option("header",
>>>>>  "true").load("timeline.csv")
>>>>> java.lang.NoSuchMethodError: 
>>>>> org.apache.commons.csv.CSVFormat.withQuote(Ljava/lang/Character;)Lorg/apache/commons/csv/CSVFormat;
>>>>> 
>>>>> 
>>>>> I have added the dependencies in sbt file 
>>>>> // Spark Additional Library - CSV Read as DF
>>>>> libraryDependencies += "com.databricks" %% "spark-csv" % “1.5.0"
>>>>> and starting the spark-shell with command
>>>>> 
>>>>> spark-shell --master yarn-client  --jars 
>>>>> /opt/packages/-data-prepration/target/scala-2.10/-data-prepration-assembly-1.0.jar
>>>>>  --name nayan 
>>>>> 
>>>>> 
>>>>> 
>>>>> Thanks for any help!!
>>>>> 
>>>>> 
>>>>> Thanks,
>>>>> Nayan
>>> 
>> 



Re: Error while reading the CSV

2017-04-06 Thread nayan sharma
In addition, I am using Spark version 1.6.2.
Is there any chance the error is coming from a Scala version or dependency
mismatch? I am just guessing.

Thanks,
Nayan

 
> On 06-Apr-2017, at 7:16 PM, nayan sharma <nayansharm...@gmail.com> wrote:
> 
> Hi Jorn,
> Thanks for replying.
> 
> jar -tf catalyst-data-prepration-assembly-1.0.jar | grep csv
> 
> after doing this I have found a lot of classes under 
> com/databricks/spark/csv/
> 
> do I need to check for any specific class ??
> 
> Regards,
> Nayan
>> On 06-Apr-2017, at 6:42 PM, Jörn Franke <jornfra...@gmail.com 
>> <mailto:jornfra...@gmail.com>> wrote:
>> 
>> Is the library in your assembly jar?
>> 
>> On 6. Apr 2017, at 15:06, nayan sharma <nayansharm...@gmail.com 
>> <mailto:nayansharm...@gmail.com>> wrote:
>> 
>>> Hi All,
>>> I am getting error while loading CSV file.
>>> 
>>> val 
>>> datacsv=sqlContext.read.format("com.databricks.spark.csv").option("header", 
>>> "true").load("timeline.csv")
>>> java.lang.NoSuchMethodError: 
>>> org.apache.commons.csv.CSVFormat.withQuote(Ljava/lang/Character;)Lorg/apache/commons/csv/CSVFormat;
>>> 
>>> 
>>> I have added the dependencies in sbt file 
>>> // Spark Additional Library - CSV Read as DF
>>> libraryDependencies += "com.databricks" %% "spark-csv" % “1.5.0"
>>> and starting the spark-shell with command
>>> 
>>> spark-shell --master yarn-client  --jars 
>>> /opt/packages/-data-prepration/target/scala-2.10/-data-prepration-assembly-1.0.jar
>>>  --name nayan 
>>> 
>>> 
>>> 
>>> Thanks for any help!!
>>> 
>>> 
>>> Thanks,
>>> Nayan
> 



Re: Error while reading the CSV

2017-04-06 Thread nayan sharma
Hi Jorn,
Thanks for replying.

jar -tf catalyst-data-prepration-assembly-1.0.jar | grep csv

After doing this I found a lot of classes under
com/databricks/spark/csv/.

Do I need to check for any specific class?

Regards,
Nayan
> On 06-Apr-2017, at 6:42 PM, Jörn Franke <jornfra...@gmail.com> wrote:
> 
> Is the library in your assembly jar?
> 
> On 6. Apr 2017, at 15:06, nayan sharma <nayansharm...@gmail.com 
> <mailto:nayansharm...@gmail.com>> wrote:
> 
>> Hi All,
>> I am getting error while loading CSV file.
>> 
>> val 
>> datacsv=sqlContext.read.format("com.databricks.spark.csv").option("header", 
>> "true").load("timeline.csv")
>> java.lang.NoSuchMethodError: 
>> org.apache.commons.csv.CSVFormat.withQuote(Ljava/lang/Character;)Lorg/apache/commons/csv/CSVFormat;
>> 
>> 
>> I have added the dependencies in sbt file 
>> // Spark Additional Library - CSV Read as DF
>> libraryDependencies += "com.databricks" %% "spark-csv" % “1.5.0"
>> and starting the spark-shell with command
>> 
>> spark-shell --master yarn-client  --jars 
>> /opt/packages/-data-prepration/target/scala-2.10/-data-prepration-assembly-1.0.jar
>>  --name nayan 
>> 
>> 
>> 
>> Thanks for any help!!
>> 
>> 
>> Thanks,
>> Nayan



Error while reading the CSV

2017-04-06 Thread nayan sharma
Hi All,
I am getting an error while loading a CSV file.

val datacsv = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").load("timeline.csv")
java.lang.NoSuchMethodError: 
org.apache.commons.csv.CSVFormat.withQuote(Ljava/lang/Character;)Lorg/apache/commons/csv/CSVFormat;

I have added the dependency in the sbt file:
// Spark Additional Library - CSV Read as DF
libraryDependencies += "com.databricks" %% "spark-csv" % "1.5.0"
and I start the spark-shell with the command

spark-shell --master yarn-client  --jars 
/opt/packages/-data-prepration/target/scala-2.10/-data-prepration-assembly-1.0.jar
 --name nayan 
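One possible cause, offered as an assumption: CSVFormat.withQuote(Character) is missing from very old commons-csv releases, so an older commons-csv somewhere on the cluster classpath could shadow the one spark-csv expects. A hedged sbt sketch that pins commons-csv explicitly alongside spark-csv (versions shown are assumptions, not verified against this cluster):

// build.sbt
libraryDependencies ++= Seq(
  "com.databricks" %% "spark-csv" % "1.5.0",
  "org.apache.commons" % "commons-csv" % "1.2" // pin a release that has CSVFormat.withQuote
)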



Thanks for any help!!


Thanks,
Nayan

skipping header in multiple files

2017-03-24 Thread nayan sharma
Hi,
I want to skip the headers of all the CSVs present in a directory.
After searching on Google, I learned that it can be done using
sc.wholeTextFiles.

Can anyone suggest how to do that in Scala?
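A minimal sketch along those lines (the input path is illustrative; it assumes single-line headers and files small enough to be read whole, since wholeTextFiles loads each file into memory as one string):

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("skip-csv-headers"))

// wholeTextFiles yields (path, fileContent) pairs, so the first line of each
// file can be dropped independently of partitioning.
val withoutHeaders = sc
  .wholeTextFiles("hdfs:///data/csv-dir/*.csv")
  .flatMap { case (_, content) => content.split("\n").drop(1) }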

Thanks & Regards,
Nayan Sharma



Re: Persist RDD doubt

2017-03-23 Thread nayan sharma
In case of task failures, does Spark clear the persisted RDD
(StorageLevel.MEMORY_ONLY_SER) and recompute it when the task is restarted from
the beginning? Or will the cached RDD be appended to?

How does Spark check whether an RDD has been cached, and skip the caching step
for a particular task?
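For what it's worth, the cached state can at least be inspected from the driver; a small sketch (the input path is illustrative):

import org.apache.spark.storage.StorageLevel

val rdd = sc.textFile("hdfs:///data/input")
  .map(_.toUpperCase)
  .persist(StorageLevel.MEMORY_ONLY_SER)

rdd.count() // first action materialises and caches the partitions

// Driver-side views of what is persisted; they do not by themselves answer
// the failure-semantics question above.
println(rdd.getStorageLevel)         // the requested storage level
println(sc.getPersistentRDDs.keySet) // ids of RDDs currently marked persistent
println(rdd.toDebugString)           // lineage, annotated with cached levels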

> On 23-Mar-2017, at 3:36 PM, Artur R <ar...@gpnxgroup.com> wrote:
> 
> I am not pretty sure, but:
>  - if RDD persisted in memory then on task fail executor JVM process fails 
> too, so the memory is released
>  - if RDD persisted on disk then on task fail Spark shutdown hook just wipes 
> temp files
> 
> On Thu, Mar 23, 2017 at 10:55 AM, Jörn Franke <jornfra...@gmail.com 
> <mailto:jornfra...@gmail.com>> wrote:
> What do you mean by clear ? What is the use case?
> 
> On 23 Mar 2017, at 10:16, nayan sharma <nayansharm...@gmail.com 
> <mailto:nayansharm...@gmail.com>> wrote:
> 
>> Does Spark clear the persisted RDD if the task fails?
>> 
>> Regards,
>> 
>> Nayan
> 



Persist RDD doubt

2017-03-23 Thread nayan sharma
Does Spark clear the persisted RDD if the task fails?

Regards,

Nayan