Hi Pralabh, Thanks for your help.
val xx = columnList.map(x => x -> 0).toMap

val opMap = dataFrame.rdd.flatMap { row =>
  columnList.foldLeft(xx) { case (y, col) =>
    val s = row.getAs[String](col).split("\\^").length
    if (y(col) < s) y.updated(col, s) else y
  }.toList
}

val colMaxSizeMap = opMap.groupBy(x => x._1)
  .map(x => x._2.toList.maxBy(x => x._2))
  .collect()
  .toMap

val x = dataFrame.rdd.map { x =>
  val op = columnList.flatMap { y =>
    val op = x.getAs[String](y).split("\\^")
    op ++ List.fill(colMaxSizeMap(y) - op.size)("")
  }
  Row.fromSeq(op)
}

val structFieldList = columnList.flatMap { colName =>
  List.range(0, colMaxSizeMap(colName), 1).map { i =>
    StructField(s"$colName" + s"$i", StringType)
  }
}
val schema = StructType(structFieldList)
val data1 = spark.createDataFrame(x, schema)

opMap
res13: org.apache.spark.rdd.RDD[(String, Int)]

But it is failing when opMap has a null value. It throws java.lang.NullPointerException; I'm trying to figure it out.

val opMap1 = opMap.filter(_._2 != "")

I tried doing this, but it fails with the same exception.

Thanks,
Nayan

> On 17-Jul-2017, at 4:54 PM, Pralabh Kumar <pralabhku...@gmail.com> wrote:
>
> Hi Nayan
>
> Please find the solution to your problem, which works on Spark 2.
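[A note on the NullPointerException reported above, as a guess rather than a confirmed diagnosis: if any cell is null, `row.getAs[String](col).split("\\^")` dereferences null; also, since `opMap` is an `RDD[(String, Int)]`, the filter `_._2 != ""` compares an Int to a String and so removes nothing. A minimal sketch of guarding the null at the source, reusing the `xx`, `columnList`, and `dataFrame` names from the code above:]

    // Sketch only: wrap the possibly-null cell in Option so split() never sees null.
    val opMapSafe = dataFrame.rdd.flatMap { row =>
      columnList.foldLeft(xx) { case (y, col) =>
        // Treat a null cell as the empty string before splitting.
        val cell = Option(row.getAs[String](col)).getOrElse("")
        val s = cell.split("\\^").length
        if (y(col) < s) y.updated(col, s) else y
      }.toList
    }

[The same `Option(...).getOrElse("")` guard would be needed in the later `dataFrame.rdd.map` that builds the Rows, since it calls `getAs[String](y).split` on the same data.]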
> val spark = SparkSession.builder()
>   .appName("practice")
>   .enableHiveSupport()
>   .getOrCreate()
> val sc = spark.sparkContext
> val sqlContext = spark.sqlContext
> import spark.implicits._
>
> val dataFrame = sc.parallelize(List("ERN~58XXXXXX7~^EPN~5XXXXX551~|C~MXXX~MSO~^CAxxE~~~~~~3XXX5"))
>   .map(s => s.split("\\|")).map(s => (s(0), s(1)))
>   .toDF("phone", "contact")
> dataFrame.show()
>
> val newDataSet = dataFrame.rdd.map(data => {
>   val t1 = ArrayBuffer[String]()
>   for (i <- 0.to(1)) {
>     val col = data.get(i).asInstanceOf[String]
>     val dd = col.split("\\^").toSeq
>     for (col <- dd) {
>       t1 += col
>     }
>   }
>   Row.fromSeq(t1.seq)
> })
>
> val firstRow = dataFrame.select("*").take(1)(0)
> dataFrame.schema.fieldNames
> var schema = ""
>
> for ((colNames, idx) <- dataFrame.schema.fieldNames.zipWithIndex.view) {
>   val data = firstRow(idx).asInstanceOf[String].split("\\^")
>   var j = 0
>   for (d <- data) {
>     schema = schema + colNames + j + ","
>     j = j + 1
>   }
> }
> schema = schema.substring(0, schema.length - 1)
> val sqlSchema = StructType(schema.split(",").map(s => StructField(s, StringType, false)))
> sqlContext.createDataFrame(newDataSet, sqlSchema).show()
>
> Regards
> Pralabh Kumar
>
>
> On Mon, Jul 17, 2017 at 1:55 PM, nayan sharma <nayansharm...@gmail.com> wrote:
> If I have 2-3 values in a column then I can easily separate them and create new columns with the withColumn option,
> but I am trying to achieve it in a loop and dynamically generate the new columns as many times as the ^ has occurred in the column values.
>
> Can it be achieved in this way?
>
>> On 17-Jul-2017, at 3:29 AM, ayan guha <guha.a...@gmail.com> wrote:
>>
>> You are looking for the explode function.
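[An editorial aside, not part of the thread: the solution above builds the schema by concatenating a comma-separated string and re-splitting it. A sketch of the same schema built directly from StructFields, under the same assumption that the first row is representative of every column's split count:]

    // Sketch: same schema as above, without the intermediate comma-joined string.
    val firstRow = dataFrame.take(1)(0)
    val sqlSchema = StructType(
      dataFrame.schema.fieldNames.zipWithIndex.flatMap { case (colName, idx) =>
        // One numbered field per piece of the first row's value, e.g. phone0, phone1, ...
        val parts = firstRow(idx).asInstanceOf[String].split("\\^")
        parts.indices.map(j => StructField(colName + j, StringType, false))
      }
    )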
>>
>> On Mon, 17 Jul 2017 at 4:25 am, nayan sharma <nayansharm...@gmail.com> wrote:
>> I've a Dataframe where some columns contain multiple values, always separated by ^
>>
>> phone|contact|
>> ERN~58XXXXXX7~^EPN~5XXXXX551~|C~MXXX~MSO~^CAxxE~~~~~~3XXX5|
>>
>> Expected output:
>>
>> phone1|phone2|contact1|contact2|
>> ERN~5XXXXXXX7|EPN~58XXXX91551~|C~MXXXH~MSO~|CAxxE~~~~~~3XXX5|
>>
>> How can this be achieved in a loop, given that the number of separators between column values is not constant?
>>
>> data.withColumn("phone", split($"phone", "\\^"))
>>   .select($"phone".getItem(0).as("phone1"), $"phone".getItem(1).as("phone2"))
>>
>> I thought of doing it this way, but the problem is that columns can have 100+ separators between the values.
>>
>> Thank you,
>> Nayan
>> --
>> Best Regards,
>> Ayan Guha
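[An editorial sketch, not from the thread: since the goal is new columns rather than new rows, the split-plus-dynamic-select idea in the original question can be generalized with the DataFrame API. The `cols` list and the per-column max-size computation below are my assumptions; this is untested on a cluster.]

    import org.apache.spark.sql.functions.{col, split, size, max}

    // Sketch: split each column on ^, then fan the pieces out into
    // numbered columns, sized by the widest split seen in the data.
    val cols = Seq("phone", "contact")
    val withArrays = cols.foldLeft(dataFrame) { (df, c) =>
      df.withColumn(c, split(col(c), "\\^"))
    }
    val selects = cols.flatMap { c =>
      // Largest array length for this column decides how many output columns it gets.
      val n = withArrays.agg(max(size(col(c)))).head.getInt(0)
      (0 until n).map(i => col(c).getItem(i).as(s"$c${i + 1}"))
    }
    withArrays.select(selects: _*).show()

[Missing pieces beyond the array length are returned as null by `getItem`, so no manual padding with empty strings is needed.]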