Hi Pralabh,

Thanks for your help.

val xx = columnList.map(x => x->0).toMap
val opMap = dataFrame.rdd.flatMap { row =>
  columnList.foldLeft(xx) { case (y, col) =>
    val s = row.getAs[String](col).split("\\^").length
    if (y(col) < s)
      y.updated(col, s)
    else
      y
  }.toList
}


val colMaxSizeMap = opMap.groupBy(x => x._1).map(x => x._2.toList.maxBy(x => x._2)).collect().toMap
val x = dataFrame.rdd.map { x =>
  val op = columnList.flatMap { y =>
    val op = x.getAs[String](y).split("\\^")
    op ++ List.fill(colMaxSizeMap(y) - op.size)("")
  }
  Row.fromSeq(op)
}

val structFieldList = columnList.flatMap { colName =>
  List.range(0, colMaxSizeMap(colName), 1).map { i =>
    StructField(s"$colName" + s"$i", StringType)
  }
}
val schema = StructType(structFieldList)
val data1 = spark.createDataFrame(x, schema)

opMap
res13: org.apache.spark.rdd.RDD[(String, Int)]

But it fails when opMap has a null value: it throws
java.lang.NullPointerException.
I am still trying to figure out why.

val opMap1=opMap.filter(_._2 !="")

I tried this, but it also fails with the same exception.
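
A possible explanation, offered as a sketch rather than a confirmed diagnosis: if a cell in the DataFrame is null, row.getAs[String](col) returns null, and calling .split on it throws inside the flatMap itself. The filter runs after that step, and it compares an Int count with "", which is always true, so it cannot remove anything. The plain-Scala example below needs no Spark and uses made-up rows and a hypothetical helper, splitCell, to show null-safe splitting and padding:

```scala
// Hypothetical helper (not from the thread): split a possibly-null cell on '^'.
// Option(null) is None, so a null cell yields an empty array instead of an NPE.
def splitCell(cell: String): Array[String] =
  Option(cell).map(_.split("\\^")).getOrElse(Array.empty[String])

// Plain-Scala stand-in for DataFrame rows: column name -> possibly-null value.
// The data is invented for illustration only.
val columnList = List("phone", "contact")
val rows: List[Map[String, String]] = List(
  Map("phone" -> "a^b^c", "contact" -> null),
  Map("phone" -> null, "contact" -> "x^y")
)

// Per-column maximum number of '^'-separated parts (the role of colMaxSizeMap).
val colMaxSizeMap: Map[String, Int] =
  columnList.map(c => c -> rows.map(r => splitCell(r(c)).length).max).toMap

// Pad every row to the per-column maximum width with empty strings.
val padded: List[List[String]] = rows.map { r =>
  columnList.flatMap { c =>
    val parts = splitCell(r(c))
    parts.toList ++ List.fill(colMaxSizeMap(c) - parts.length)("")
  }
}
```

In the Spark code above, the same idea would mean wrapping both row.getAs[String](col) calls in Option(...) before splitting, assuming the nulls are in the column values.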

Thanks,
Nayan




> On 17-Jul-2017, at 4:54 PM, Pralabh Kumar <pralabhku...@gmail.com> wrote:
> 
> Hi Nayan
> 
> Please find below a solution to your problem that works on Spark 2.
> 
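> import scala.collection.mutable.ArrayBuffer
> import org.apache.spark.sql.{Row, SparkSession}
> import org.apache.spark.sql.types.{StringType, StructField, StructType}
> 
> val spark = SparkSession.builder().appName("practice").enableHiveSupport().getOrCreate()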
>   val sc = spark.sparkContext
>   val sqlContext = spark.sqlContext
>   import spark.implicits._
>   val dataFrame = sc.parallelize(List("ERN~58XXXXXX7~^EPN~5XXXXX551~|C~MXXX~MSO~^CAxxE~~~~~~3XXX5"))
>       .map(s=>s.split("\\|")).map(s=>(s(0),s(1)))
>     .toDF("phone","contact")
>   dataFrame.show()
>   val newDataSet= dataFrame.rdd.map(data=>{
>     val  t1 =  ArrayBuffer[String] ()
>     for (i <- 0.to(1)) {
>       val col = data.get(i).asInstanceOf[String]
>       val dd= col.split("\\^").toSeq
>       for(col<-dd){
>         t1 +=(col)
>       }
>     }
>     Row.fromSeq(t1.seq)
>   })
> 
>   val firtRow = dataFrame.select("*").take(1)(0)
>   dataFrame.schema.fieldNames
>   var schema =""
> 
>   for ((colNames,idx) <- dataFrame.schema.fieldNames.zipWithIndex.view) {
>     val data = firtRow(idx).asInstanceOf[String].split("\\^")
>     var j = 0
>     for(d<-data){
>       schema = schema + colNames + j + ","
>       j = j+1
>     }
>   }
>   schema=schema.substring(0,schema.length-1)
>   val sqlSchema = StructType(schema.split(",").map(s=>StructField(s,StringType,false)))
>   sqlContext.createDataFrame(newDataSet,sqlSchema).show()
> 
> Regards
> Pralabh Kumar
> 
> 
> On Mon, Jul 17, 2017 at 1:55 PM, nayan sharma <nayansharm...@gmail.com> wrote:
> If I have 2-3 values in a column, I can easily separate them and create new 
> columns with the withColumn option.
> But I am trying to do it in a loop, dynamically generating as many new 
> columns as there are ^-separated values in the column.
> 
> Can it be achieved this way?
> 
>> On 17-Jul-2017, at 3:29 AM, ayan guha <guha.a...@gmail.com> wrote:
>> 
>> You are looking for the explode function.
>> 
>> On Mon, 17 Jul 2017 at 4:25 am, nayan sharma <nayansharm...@gmail.com> wrote:
>> I have a DataFrame in which some columns hold multiple values, always 
>> separated by ^
>> 
>> phone|contact|
>> ERN~58XXXXXX7~^EPN~5XXXXX551~|C~MXXX~MSO~^CAxxE~~~~~~3XXX5|
>> 
>> phone1|phone2|contact1|contact2| 
>> ERN~5XXXXXXX7|EPN~58XXXX91551~|C~MXXXH~MSO~|CAxxE~~~~~~3XXX5|
>> How can this be achieved in a loop, given that the number of separators 
>> in a column value is not constant?
>> 
>> data.withColumn("phone",split($"phone","\\^")).select($"phone".getItem(0).as("phone1"),$"phone".getItem(1).as("phone2"))
>> I thought of doing it this way, but the problem is that the columns have 
>> 100+ separators between the column values.
>> 
>> 
>> 
>> Thank you,
>> Nayan
>> -- 
>> Best Regards,
>> Ayan Guha
> 
> 
