[jira] [Created] (SPARK-20698) =, ==, > is not working as expected when used in sql query
someshwar kale created SPARK-20698:
--------------------------------------

             Summary: =, ==, > is not working as expected when used in sql query
                 Key: SPARK-20698
                 URL: https://issues.apache.org/jira/browse/SPARK-20698
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 1.6.2
         Environment: windows
            Reporter: someshwar kale
            Priority: Critical
             Fix For: 1.6.2

I have written the Spark program below; it is not working as expected:

{code:java}
package computedBatch;

import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.hive.HiveContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ArithmeticIssueTest {
    private transient JavaSparkContext javaSparkContext;
    private transient SQLContext sqlContext;

    public ArithmeticIssueTest() {
        Logger.getLogger("org").setLevel(Level.OFF);
        Logger.getLogger("akka").setLevel(Level.OFF);
        SparkConf conf = new SparkConf().setAppName("ArithmeticIssueTest").setMaster("local[4]");
        javaSparkContext = new JavaSparkContext(conf);
        sqlContext = new HiveContext(javaSparkContext);
    }

    public static void main(String[] args) {
        ArithmeticIssueTest arithmeticIssueTest = new ArithmeticIssueTest();
        arithmeticIssueTest.execute();
    }

    private void execute() {
        List<String> data = Arrays.asList(
                "a1,1494389759,99.8793003568,325.389705932",
                "a1,1494389759,99.9472573803,325.27559502",
                "a1,1494389759,99.7887233987,325.334374851",
                "a1,1494389759,99.9547800925,325.371537062",
                "a1,1494389759,99.8039111691,325.305285877",
                "a1,1494389759,99.8342317379,325.24881354",
                "a1,1494389759,99.9849449235,325.396678931",
                "a1,1494389759,99.9396731311,325.336115345",
                "a1,1494389759,99.9320915068,325.242622938",
                "a1,1494389759,99.894669,325.320965146",
                "a1,1494389759,99.7735359781,325.345168334",
                "a1,1494389759,99.9698837734,325.352291407",
                "a1,1494389759,99.8418330703,325.296539372",
                "a1,1494389759,99.796315751,325.347570632",
                "a1,1494389759,99.7811931613,325.351137315",
                "a1,1494389759,99.9773765104,325.218131741",
                "a1,1494389759,99.8189825201,325.288197381",
                "a1,1494389759,99.8115005369,325.282327633",
                "a1,1494389759,99.9924539722,325.24048614",
                "a1,1494389759,99.9170191204,325.299431664");
        JavaRDD<String> rawData = javaSparkContext.parallelize(data);
        List<StructField> fields = new ArrayList<>();
        fields.add(DataTypes.createStructField("ASSET_ID", DataTypes.StringType, true));
        fields.add(DataTypes.createStructField("TIMESTAMP", DataTypes.LongType, true));
        fields.add(DataTypes.createStructField("fuel", DataTypes.DoubleType, true));
        fields.add(DataTypes.createStructField("temperature", DataTypes.DoubleType, true));
        StructType schema = DataTypes.createStructType(fields);
        JavaRDD<Row> rowRDD = rawData.map(
                (Function<String, Row>) record -> {
                    String[] fields1 = record.split(",");
                    return RowFactory.create(
                            fields1[0].trim(),
                            Long.parseLong(fields1[1].trim()),
                            Double.parseDouble(fields1[2].trim()),
                            Double.parseDouble(fields1[3].trim()));
                });
        DataFrame df = sqlContext.createDataFrame(rowRDD, schema);
        df.show(false);
        df.registerTempTable("x_linkx1087571272_filtered");
        sqlContext.sql("SELECT x_linkx1087571272_filtered.ASSET_ID, count(case when x_linkx1087571272_filtered"
                + ".temperature=325.0 then 1 else 0 end) AS xsumptionx1582594572, max(x_linkx1087571272_filtered"
                + ".TIMESTAMP) AS eventTime FROM x_linkx1087571272_filtered GROUP BY x_linkx1087571272_filtered"
                + ".ASSET_ID").show(false);
        sqlContext.sql("SELECT x_linkx1087571272_filtered.ASSET_ID, count(case when x_linkx1087571272_filtered"
                + ".fuel>99.8 then 1 else 0 end) AS xnsumptionx352569416, max(x_linkx1087571272_filtered.TIMESTAMP) AS "
                + "eventTime FROM x_linkx1087571272_filtered GROUP BY x_linkx1087571272_filtered.ASSET_ID").show(false);
        sqlContext.sql("SELECT x_linkx1087571272_filtered.ASSET_ID, count(case when x_linkx1087571272_filtered"
                + ".temperature==325.0 then 1 else 0 end) AS xsumptionx1582594572, max(x_linkx1087571272_filtered"
                + ".TIMESTAMP) AS eventTime FROM x_linkx1087571272_filtered GROUP BY x_linkx1087571272_filtered"
                + ".ASSET_ID").show(false);
    }
}
{code}
[jira] [Commented] (SPARK-20698) =, ==, > is not working as expected when used in sql query
[ https://issues.apache.org/jira/browse/SPARK-20698?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16005955#comment-16005955 ]

someshwar kale commented on SPARK-20698:
-----------------------------------------

Sorry [~srowen]... silly question... will follow the ethics next time. Thanks!!

> =, ==, > is not working as expected when used in sql query
> -----------------------------------------------------------
>
>                 Key: SPARK-20698
>                 URL: https://issues.apache.org/jira/browse/SPARK-20698
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.6.2
>         Environment: windows
>            Reporter: someshwar kale
>            Priority: Critical
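A note for readers of this archived thread: the comparison operators themselves work; the surprise comes from COUNT semantics. COUNT(expr) counts rows where expr is non-NULL, and CASE WHEN ... THEN 1 ELSE 0 END never yields NULL, so each query above counts all 20 rows regardless of the predicate. Below is a minimal sketch of the usual rewrite (in Scala rather than the report's Java, against the same temp table; this snippet is not part of the original thread):

{code:scala}
// Sketch only: conditional counting over the temp table registered above.
// COUNT(CASE WHEN p THEN 1 ELSE 0 END) counts every row, because 1 and 0 are
// both non-NULL. Either SUM the 0/1 flag, or drop the ELSE so misses are NULL.
val fixed = sqlContext.sql(
  """SELECT ASSET_ID,
    |       SUM(CASE WHEN fuel > 99.8 THEN 1 ELSE 0 END) AS hitsBySum,
    |       COUNT(CASE WHEN fuel > 99.8 THEN 1 END)      AS hitsByCount,
    |       MAX(TIMESTAMP)                               AS eventTime
    |FROM x_linkx1087571272_filtered
    |GROUP BY ASSET_ID""".stripMargin)
fixed.show(false)  // hitsBySum and hitsByCount agree, and both respect the predicate
{code}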
[jira] [Commented] (SPARK-48463) MLLib function unable to handle nested data
[ https://issues.apache.org/jira/browse/SPARK-48463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17852751#comment-17852751 ]

someshwar kale commented on SPARK-48463:
-----------------------------------------

As a temporary fix, you may consider renaming the columns by adding a transformer as below:

{code:java}
import org.apache.spark.ml.Transformer
import org.apache.spark.ml.param.shared.{HasInputCol, HasOutputCol}
import org.apache.spark.ml.param.{ParamMap, Params}
import org.apache.spark.ml.util.{DefaultParamsReadable, DefaultParamsWritable, Identifiable}
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.types.{StructField, StructType}

class RenameColumn(val uid: String) extends Transformer with Params
    with HasInputCol with HasOutputCol with DefaultParamsWritable {

  def this() = this(Identifiable.randomUID("RenameColumn"))

  /** @group setParam */
  def setInputCol(value: String): this.type = set(inputCol, value)

  /** @group setParam */
  def setOutputCol(value: String): this.type = set(outputCol, value)

  def validateAndTransformSchema(schema: StructType): StructType = {
    val col = schema(getInputCol)
    schema.add(StructField(getOutputCol, col.dataType, col.nullable, col.metadata))
  }

  def transformSchema(schema: StructType): StructType = validateAndTransformSchema(schema)

  def copy(extra: ParamMap): RenameColumn = defaultCopy(extra)

  override def transform(dataset: Dataset[_]): DataFrame = {
    transformSchema(dataset.schema, logging = true)
    dataset.toDF().withColumnRenamed(getInputCol, getOutputCol)
  }
}

object RenameColumn extends DefaultParamsReadable[RenameColumn] {
  override def load(path: String): RenameColumn = super.load(path)
}
{code}

and use the above transformer in the pipeline as below:

{code:java}
// Assumes an active SparkSession available as `spark`.
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.StringIndexer
import org.apache.spark.sql.{Column, Row}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{IntegerType, StructType}

val structureData = Seq(
  Row(Row(10, 12), 1000),
  Row(Row(12, 14), 4300),
  Row(Row(37, 891), 1400),
  Row(Row(8902, 12), 4000),
  Row(Row(12, 89), 1000)
)

val structureSchema = new StructType()
  .add("location", new StructType()
    .add("longitude", IntegerType)
    .add("latitude", IntegerType))
  .add("salary", IntegerType)

val df = spark.createDataFrame(spark.sparkContext.parallelize(structureData), structureSchema)

def flattenSchema(schema: StructType, prefix: String = null, prefixSelect: String = null): Array[Column] = {
  schema.fields.flatMap(f => {
    val colName = if (prefix == null) f.name else (prefix + "." + f.name)
    val colnameSelect = if (prefix == null) f.name else (prefixSelect + "." + f.name)
    f.dataType match {
      case st: StructType => flattenSchema(st, colName, colnameSelect)
      case _ => Array(col(colName).as(colnameSelect))
    }
  })
}

val flattenColumns = flattenSchema(df.schema)
val flattenedDf = df.select(flattenColumns: _*)
flattenedDf.printSchema
flattenedDf.show()

val renameColumn = new RenameColumn().setInputCol("location.longitude").setOutputCol("location_longitude")
val si = new StringIndexer().setInputCol("location_longitude").setOutputCol("longitutdee")
val pipeline = new Pipeline().setStages(Array(renameColumn, si))
pipeline.fit(flattenedDf).transform(flattenedDf).show()

/**
 * +------------------+-----------------+------+-----------+
 * |location_longitude|location.latitude|salary|longitutdee|
 * +------------------+-----------------+------+-----------+
 * |                10|               12|  1000|        1.0|
 * |                12|               14|  4300|        0.0|
 * |                37|              891|  1400|        2.0|
 * |              8902|               12|  4000|        3.0|
 * |                12|               89|  1000|        0.0|
 * +------------------+-----------------+------+-----------+
 */
{code}

> MLLib function unable to handle nested data
> -------------------------------------------
>
>                 Key: SPARK-48463
>                 URL: https://issues.apache.org/jira/browse/SPARK-48463
>             Project: Spark
>          Issue Type: Bug
>          Components: ML, MLlib
>    Affects Versions: 3.5.1
>            Reporter: Chhavi Bansal
>            Priority: Major
>              Labels: ML, MLPipelines, mllib, nested
>
> I am trying to use a feature transformer on nested data after flattening, but it fails.
>
> {code:java}
> val structureData = Seq(
>   Row(Row(10, 12), 1000),
>   Row(Row(12, 14), 4300),
>   Row(Row(37, 891), 1400),
>   Row(Row(8902, 12), 4000),
>   Row(Row(12, 89), 1000)
> )
> val structureSchema = new StructType()
>   .add("location", new StructType()
>     .add("longitude", IntegerType)
>     .add("latitude", IntegerType))
>   .add("salary", IntegerType)
> val df = spark.createDataFrame(spark.sparkContext.parallelize(structureData), structureSchema)
> def flattenSchema(schema: StructType, prefix: String = null, prefixSelect: String = null): Array[Column] = {
>   schema.fields.flatMap(f => {
>     val colName = if (prefix == null) f.name else (prefix + "." + f.name)
>     val colnameSelect = if (prefix == null) f.name else (prefixSelect + "." + f.name)
>     f.dataType match {
>       case st: StructType => flattenSchema(st, colName, colnameSelect)
>       case _ => Array(col(colName).as(colnameSelect))
>     }
>   })
> }
> val flattenColumns = flattenSchema(df.schema)
> val flattenedDf = df.select(flattenColumns: _*){code}
> Now using the string indexer on the DOT notation.
> {code:java}
> val si = new StringIndexer().setInputCol("location.longitude").setOutputCol("longitutdee")
> val pipeline = new Pipeline().setStages(Array(si))
> pipeline.fit(flattenedDf).transform(flattenedDf).show() {code}
> The above code fails:
> {code:java}
> Exception in thread "main" org.apache.spark.sql.AnalysisException: Cannot resolve column name "location.longitude" among (location.longitude, location.latitude, salary); did you mean to quote the `location.longitude` column?
>   at org.apache.spark.sql.errors.QueryCompilationErrors$.cannotResolveColumnNameAmongFieldsError(QueryCompilationErrors.scala:2261)
>   at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$resolveException(Dataset.scala:258)
>   at org.apache.spark.sql.Dataset.$anonfun$resolve$1(Dataset.scala:250)
>   ... {code}
> This points to the same failure as when we try to select dot-notation columns in a Spark dataframe, which is solved using BACKTICKS *`column.name`*:
> [https://stackoverflow.com/a/51430335/11688337]
>
> *So next* I use the backticks while defining the StringIndexer:
> {code:java}
> val si = new StringIndexer().setInputCol("`location.longitude`").setOutputCol("longitutdee")
> {code}
> In this case *it again fails* (with a different reason) in the StringIndexer code itself:
> {code:java}
> Exception in thread "main" org.apache.spark.SparkException: Input column `location.longitude` does not exist.
>   at org.apache.spark.ml.feature.StringIndexerBase.$anonfun$validateAndTransformSchema$2(StringIndexer.scala:128)
>   at scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:244)
>   at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
>   at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
> {code}
>
> This blocks me from using feature transformation functions on nested columns. Any help in solving this problem will be highly appreciated.
[jira] [Commented] (SPARK-48463) MLLib function unable to handle nested data
[ https://issues.apache.org/jira/browse/SPARK-48463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17853363#comment-17853363 ]

someshwar kale commented on SPARK-48463:
-----------------------------------------

I don't think renaming 200+ columns (a simple metadata modification) causes an additional 200+ stages; Spark is intelligent enough to handle that well. Please check the execution plan for details. Also, you can improve RenameColumn to rename multiple columns at once (see the sketch after this message). This is, I think, the least you can do to unblock yourself until the bug is resolved.

> MLLib function unable to handle nested data
> -------------------------------------------
>
>                 Key: SPARK-48463
>                 URL: https://issues.apache.org/jira/browse/SPARK-48463
>             Project: Spark
>          Issue Type: Bug
>          Components: ML, MLlib
>    Affects Versions: 3.5.1
>            Reporter: Chhavi Bansal
>            Priority: Major
>              Labels: ML, MLPipelines, mllib, nested
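To make the "rename multiple columns at once" suggestion concrete, here is a minimal, hypothetical sketch (not from the thread): a single Transformer carrying an old-name-to-new-name map, so the pipeline needs one rename stage regardless of column count. The class name, the `renames` parameter, and the omission of MLWritable persistence are all illustrative assumptions.

{code:scala}
import org.apache.spark.ml.Transformer
import org.apache.spark.ml.param.{Param, ParamMap}
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.types.StructType

// Hypothetical multi-column variant of the RenameColumn transformer above.
// Renaming is a metadata-only projection, so this adds one narrow step to the
// query plan, not one shuffle stage per column. (Persistence omitted for brevity.)
class RenameColumns(val uid: String) extends Transformer {
  def this() = this(Identifiable.randomUID("RenameColumns"))

  // Map of existing column name -> new column name.
  final val renames: Param[Map[String, String]] =
    new Param(this, "renames", "old column name to new column name")

  def setRenames(value: Map[String, String]): this.type = set(renames, value)

  override def transformSchema(schema: StructType): StructType =
    StructType(schema.map(f => $(renames).get(f.name).map(n => f.copy(name = n)).getOrElse(f)))

  override def copy(extra: ParamMap): RenameColumns = defaultCopy(extra)

  override def transform(dataset: Dataset[_]): DataFrame =
    $(renames).foldLeft(dataset.toDF()) { case (df, (from, to)) =>
      df.withColumnRenamed(from, to)
    }
}

// Usage sketch: one stage renames every dotted column before indexing.
val renameAll = new RenameColumns().setRenames(Map(
  "location.longitude" -> "location_longitude",
  "location.latitude"  -> "location_latitude"))
{code}

Calling df.explain() before and after such a stage should show only an added projection, which is the point of the comment above about checking the execution plan.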
[jira] [Commented] (SPARK-48463) MLLib function unable to handle nested data
[ https://issues.apache.org/jira/browse/SPARK-48463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17853459#comment-17853459 ]

someshwar kale commented on SPARK-48463:
-----------------------------------------

Currently there is no way to handle backticks (`) in a Spark StructType, so the field names a.b and `a.b` are completely different within a StructType. To handle that, I have added a custom implementation fixing StringIndexer#validateAndTransformSchema. You can refer to the code on [my github|https://github.com/skale1990/LearnSpark/blob/main/src/main/java/com/som/learnspark/TestCustomStringIndexer.scala].

> MLLib function unable to handle nested data
> -------------------------------------------
>
>                 Key: SPARK-48463
>                 URL: https://issues.apache.org/jira/browse/SPARK-48463
>             Project: Spark
>          Issue Type: Bug
>          Components: ML, MLlib
>    Affects Versions: 3.5.1
>            Reporter: Chhavi Bansal
>            Priority: Major
>              Labels: ML, MLPipelines, mllib, nested
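To illustrate the quoting mismatch described in the comment above, a small sketch (assuming the flattenedDf built in the quoted issue; not from the thread): DataFrame name resolution goes through the SQL parser and strips backticks, while StructType lookups match the literal field name, which is plausibly why the backticked input column passes SQL resolution yet fails the ML schema check.

{code:scala}
// Sketch: after flattening, the dot is part of the field name, not a struct path.
flattenedDf.schema.fieldNames
// Array(location.longitude, location.latitude, salary) -- no backticks stored

// DataFrame resolution parses the name, so the dotted column must be quoted:
flattenedDf.select("`location.longitude`").show()   // resolves the flattened column
// flattenedDf.select("location.longitude")         // AnalysisException: parsed as struct access

// StructType lookup matches the literal name, with no quoting layer on top:
flattenedDf.schema("location.longitude")      // returns the StructField
// flattenedDf.schema("`location.longitude`") // IllegalArgumentException: no such field
{code}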