[jira] [Created] (SPARK-20698) =, ==, > is not working as expected when used in sql query

2017-05-10 Thread someshwar kale (JIRA)
someshwar kale created SPARK-20698:
--

 Summary: =, ==, > is not working as expected when used in sql query
 Key: SPARK-20698
 URL: https://issues.apache.org/jira/browse/SPARK-20698
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 1.6.2
 Environment: windows
Reporter: someshwar kale
Priority: Critical
 Fix For: 1.6.2


I have written the Spark program below; it is not working as expected.


{code:java}
package computedBatch;

import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.hive.HiveContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ArithmeticIssueTest {
    private transient JavaSparkContext javaSparkContext;
    private transient SQLContext sqlContext;

    public ArithmeticIssueTest() {
        Logger.getLogger("org").setLevel(Level.OFF);
        Logger.getLogger("akka").setLevel(Level.OFF);
        SparkConf conf = new SparkConf().setAppName("ArithmeticIssueTest").setMaster("local[4]");
        javaSparkContext = new JavaSparkContext(conf);
        sqlContext = new HiveContext(javaSparkContext);
    }

    public static void main(String[] args) {
        ArithmeticIssueTest arithmeticIssueTest = new ArithmeticIssueTest();
        arithmeticIssueTest.execute();
    }

    private void execute() {
        // Sample records: ASSET_ID, TIMESTAMP, fuel, temperature
        List<String> data = Arrays.asList(
                "a1,1494389759,99.8793003568,325.389705932",
                "a1,1494389759,99.9472573803,325.27559502",
                "a1,1494389759,99.7887233987,325.334374851",
                "a1,1494389759,99.9547800925,325.371537062",
                "a1,1494389759,99.8039111691,325.305285877",
                "a1,1494389759,99.8342317379,325.24881354",
                "a1,1494389759,99.9849449235,325.396678931",
                "a1,1494389759,99.9396731311,325.336115345",
                "a1,1494389759,99.9320915068,325.242622938",
                "a1,1494389759,99.894669,325.320965146",
                "a1,1494389759,99.7735359781,325.345168334",
                "a1,1494389759,99.9698837734,325.352291407",
                "a1,1494389759,99.8418330703,325.296539372",
                "a1,1494389759,99.796315751,325.347570632",
                "a1,1494389759,99.7811931613,325.351137315",
                "a1,1494389759,99.9773765104,325.218131741",
                "a1,1494389759,99.8189825201,325.288197381",
                "a1,1494389759,99.8115005369,325.282327633",
                "a1,1494389759,99.9924539722,325.24048614",
                "a1,1494389759,99.9170191204,325.299431664");
        JavaRDD<String> rawData = javaSparkContext.parallelize(data);

        List<StructField> fields = new ArrayList<>();
        fields.add(DataTypes.createStructField("ASSET_ID", DataTypes.StringType, true));
        fields.add(DataTypes.createStructField("TIMESTAMP", DataTypes.LongType, true));
        fields.add(DataTypes.createStructField("fuel", DataTypes.DoubleType, true));
        fields.add(DataTypes.createStructField("temperature", DataTypes.DoubleType, true));
        StructType schema = DataTypes.createStructType(fields);

        JavaRDD<Row> rowRDD = rawData.map((Function<String, Row>) record -> {
            String[] fields1 = record.split(",");
            return RowFactory.create(
                    fields1[0].trim(),
                    Long.parseLong(fields1[1].trim()),
                    Double.parseDouble(fields1[2].trim()),
                    Double.parseDouble(fields1[3].trim()));
        });
        DataFrame df = sqlContext.createDataFrame(rowRDD, schema);
        df.show(false);
        df.registerTempTable("x_linkx1087571272_filtered");

        // = comparison
        sqlContext.sql("SELECT x_linkx1087571272_filtered.ASSET_ID, "
                + "count(case when x_linkx1087571272_filtered.temperature=325.0 then 1 else 0 end) AS xsumptionx1582594572, "
                + "max(x_linkx1087571272_filtered.TIMESTAMP) AS eventTime "
                + "FROM x_linkx1087571272_filtered "
                + "GROUP BY x_linkx1087571272_filtered.ASSET_ID").show(false);

        // > comparison
        sqlContext.sql("SELECT x_linkx1087571272_filtered.ASSET_ID, "
                + "count(case when x_linkx1087571272_filtered.fuel>99.8 then 1 else 0 end) AS xnsumptionx352569416, "
                + "max(x_linkx1087571272_filtered.TIMESTAMP) AS eventTime "
                + "FROM x_linkx1087571272_filtered "
                + "GROUP BY x_linkx1087571272_filtered.ASSET_ID").show(false);

        // == comparison
        sqlContext.sql("SELECT x_linkx1087571272_filtered.ASSET_ID, "
                + "count(case when x_linkx1087571272_filtered.temperature==325.0 then 1 else 0 end) AS xsumptionx1582594572, "
                + "max(x_linkx1087571272_filtered.TIMESTAMP) AS eventTime "
                + "FROM x_linkx1087571272_filtered "
                + "GROUP BY x_linkx1087571272_filtered.ASSET_ID").show(false);
    }
}
{code}

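For context, a likely explanation rather than a confirmed diagnosis: the =, == and > comparisons above do evaluate correctly; the surprise comes from the aggregate wrapped around them. count() counts every non-null value, and the CASE expression returns 0 (which is non-null) for non-matching rows, so each query reports the full group size instead of the number of matches. Below is a minimal Scala (spark-shell) sketch of the usual alternatives, assuming a DataFrame df with the same four columns and a placeholder temp table name readings:

{code:java}
// Sketch only: `df` and the table name `readings` are assumptions for illustration.
df.registerTempTable("readings")

sqlContext.sql(
  """SELECT ASSET_ID,
    |       count(case when temperature = 325.0 then 1 else 0 end) AS counts_every_row,
    |       sum(case when temperature = 325.0 then 1 else 0 end)   AS matching_row_sum,
    |       count(case when temperature = 325.0 then 1 end)        AS matching_row_count
    |FROM readings
    |GROUP BY ASSET_ID""".stripMargin).show(false)
{code}

With the data above, none of the temperature values is exactly 325.0, so the first column would return 20 while the other two would return 0.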
[jira] [Commented] (SPARK-20698) =, ==, > is not working as expected when used in sql query

2017-05-10 Thread someshwar kale (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20698?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16005955#comment-16005955
 ] 

someshwar kale commented on SPARK-20698:


Sorry [~srowen], silly question... I will follow the guidelines next time. Thanks!

> =, ==, > is not working as expected when used in sql query
> --
>
> Key: SPARK-20698
> URL: https://issues.apache.org/jira/browse/SPARK-20698
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.6.2
> Environment: windows
>Reporter: someshwar kale
>Priority: Critical
>

[jira] [Commented] (SPARK-48463) MLLib function unable to handle nested data

2024-06-06 Thread someshwar kale (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-48463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17852751#comment-17852751
 ] 

someshwar kale commented on SPARK-48463:


As a temporary fix, you may consider renaming the columns by adding a transformer as below:
{code:java}
import org.apache.spark.ml.Transformer
import org.apache.spark.ml.param.shared.{HasInputCol, HasOutputCol}
import org.apache.spark.ml.param.{ParamMap, Params}
import org.apache.spark.ml.util.{DefaultParamsReadable, DefaultParamsWritable, Identifiable}
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.types.{StructField, StructType}

class RenameColumn(val uid: String) extends Transformer with Params
  with HasInputCol with HasOutputCol with DefaultParamsWritable {
  def this() = this(Identifiable.randomUID("RenameColumn"))

  /** @group setParam */
  def setInputCol(value: String): this.type = set(inputCol, value)

  /** @group setParam */
  def setOutputCol(value: String): this.type = set(outputCol, value)

  def validateAndTransformSchema(schema: StructType): StructType = {
    val col = schema(getInputCol)
    schema.add(StructField(getOutputCol, col.dataType, col.nullable, col.metadata))
  }

  def transformSchema(schema: StructType): StructType = validateAndTransformSchema(schema)

  def copy(extra: ParamMap): RenameColumn = defaultCopy(extra)

  override def transform(dataset: Dataset[_]): DataFrame = {
    transformSchema(dataset.schema, logging = true)
    dataset.toDF().withColumnRenamed(getInputCol, getOutputCol)
  }
}

object RenameColumn extends DefaultParamsReadable[RenameColumn] {
  override def load(path: String): RenameColumn = super.load(path)
} {code}
and use the above transformer in the pipeline as below:
{code:java}
// Assumes a spark-shell / SparkSession named `spark`.
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.StringIndexer
import org.apache.spark.sql.{Column, Row}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{IntegerType, StructType}

val structureData = Seq(
  Row(Row(10, 12), 1000),
  Row(Row(12, 14), 4300),
  Row(Row(37, 891), 1400),
  Row(Row(8902, 12), 4000),
  Row(Row(12, 89), 1000)
)

val structureSchema = new StructType()
  .add("location", new StructType()
    .add("longitude", IntegerType)
    .add("latitude", IntegerType))
  .add("salary", IntegerType)
val df = spark.createDataFrame(spark.sparkContext.parallelize(structureData), structureSchema)

def flattenSchema(schema: StructType, prefix: String = null, prefixSelect: String = null): Array[Column] = {
  schema.fields.flatMap(f => {
    val colName = if (prefix == null) f.name else (prefix + "." + f.name)
    val colnameSelect = if (prefix == null) f.name else (prefixSelect + "." + f.name)

    f.dataType match {
      case st: StructType => flattenSchema(st, colName, colnameSelect)
      case _ =>
        Array(col(colName).as(colnameSelect))
    }
  })
}

val flattenColumns = flattenSchema(df.schema)
val flattenedDf = df.select(flattenColumns: _*)

flattenedDf.printSchema
flattenedDf.show()

val renameColumn = new RenameColumn().setInputCol("location.longitude").setOutputCol("location_longitude")
val si = new StringIndexer().setInputCol("location_longitude").setOutputCol("longitutdee")
val pipeline = new Pipeline().setStages(Array(renameColumn, si))
pipeline.fit(flattenedDf).transform(flattenedDf).show()

/**
 * +------------------+-----------------+------+-----------+
 * |location_longitude|location.latitude|salary|longitutdee|
 * +------------------+-----------------+------+-----------+
 * |                10|               12|  1000|        1.0|
 * |                12|               14|  4300|        0.0|
 * |                37|              891|  1400|        2.0|
 * |              8902|               12|  4000|        3.0|
 * |                12|               89|  1000|        0.0|
 * +------------------+-----------------+------+-----------+
 */ {code}
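Since RenameColumn mixes in DefaultParamsWritable and has a DefaultParamsReadable companion object, the fitted pipeline can also be persisted and reloaded. A quick check of that, under the same assumptions as above (the save path is an arbitrary example):

{code:java}
import org.apache.spark.ml.PipelineModel

val model = pipeline.fit(flattenedDf)
model.write.overwrite().save("/tmp/rename-pipeline")   // hypothetical path
val reloaded = PipelineModel.load("/tmp/rename-pipeline")
reloaded.transform(flattenedDf).show()
{code}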

> MLLib function unable to handle nested data
> ---
>
> Key: SPARK-48463
> URL: https://issues.apache.org/jira/browse/SPARK-48463
> Project: Spark
>  Issue Type: Bug
>  Components: ML, MLlib
>Affects Versions: 3.5.1
>Reporter: Chhavi Bansal
>Priority: Major
>  Labels: ML, MLPipelines, mllib, nested

[jira] [Commented] (SPARK-48463) MLLib function unable to handle nested data

2024-06-08 Thread someshwar kale (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-48463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17853363#comment-17853363
 ] 

someshwar kale commented on SPARK-48463:


I don't think renaming 200+ columns (a simple metadata modification) adds 200+ extra stages; Spark is intelligent enough to handle that well. Please check the execution plan for more details.

Also, you can improve RenameColumn to rename multiple columns at once, as sketched below. That is, I think, the least you can do to unblock yourself until the bug is resolved.
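A minimal sketch of that suggestion (my own illustration, not part of the original workaround), assuming the flattenedDf produced by the flattening snippet above; the underscore separator is an arbitrary choice. It renames every dotted column in one pass and then uses explain to confirm the renames collapse into a single projection rather than extra stages:

{code:java}
// Sketch only: rename all "a.b"-style columns of `flattenedDf` in one pass.
val renamed = flattenedDf.columns
  .filter(_.contains("."))
  .foldLeft(flattenedDf) { (df, c) =>
    df.withColumnRenamed(c, c.replace(".", "_"))
  }

renamed.printSchema()
renamed.explain(true)  // the renames show up as aliases in a single Project; no extra shuffle or stage
{code}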

> MLLib function unable to handle nested data
> ---
>
> Key: SPARK-48463
> URL: https://issues.apache.org/jira/browse/SPARK-48463
> Project: Spark
>  Issue Type: Bug
>  Components: ML, MLlib
>Affects Versions: 3.5.1
>Reporter: Chhavi Bansal
>Priority: Major
>  Labels: ML, MLPipelines, mllib, nested
>
> I am trying to use feature transformer on nested data after flattening, but 
> it fails.
>  
> {code:java}
> val structureData = Seq(
>   Row(Row(10, 12), 1000),
>   Row(Row(12, 14), 4300),
>   Row( Row(37, 891), 1400),
>   Row(Row(8902, 12), 4000),
>   Row(Row(12, 89), 1000)
> )
> val structureSchema = new StructType()
>   .add("location", new StructType()
> .add("longitude", IntegerType)
> .add("latitude", IntegerType))
>   .add("salary", IntegerType) 
> val df = spark.createDataFrame(spark.sparkContext.parallelize(structureData), 
> structureSchema) 
> def flattenSchema(schema: StructType, prefix: String = null, prefixSelect: 
> String = null):
> Array[Column] = {
>   schema.fields.flatMap(f => {
> val colName = if (prefix == null) f.name else (prefix + "." + f.name)
> val colnameSelect = if (prefix == null) f.name else (prefixSelect + "." + 
> f.name)
> f.dataType match {
>   case st: StructType => flattenSchema(st, colName, colnameSelect)
>   case _ =>
> Array(col(colName).as(colnameSelect))
> }
>   })
> }
> val flattenColumns = flattenSchema(df.schema)
> val flattenedDf = df.select(flattenColumns: _*){code}
> Now using the string indexer on the DOT notation.
>  
> {code:java}
> val si = new 
> StringIndexer().setInputCol("location.longitude").setOutputCol("longitutdee")
> val pipeline = new Pipeline().setStages(Array(si))
> pipeline.fit(flattenedDf).transform(flattenedDf).show() {code}
> The above code fails 
> {code:java}
> Exception in thread "main" org.apache.spark.sql.AnalysisException: Cannot 
> resolve column name "location.longitude" among (location.longitude, 
> location.latitude, salary); did you mean to quote the `location.longitude` 
> column?
>     at 
> org.apache.spark.sql.errors.QueryCompilationErrors$.cannotResolveColumnNameAmongFieldsError(QueryCompilationErrors.scala:2261)
>     at 
> org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$resolveException(Dataset.scala:258)
>     at org.apache.spark.sql.Dataset.$anonfun$resolve$1(Dataset.scala:250)
> . {code}
> This points to the same failure as when we try to select dot notation columns 
> in a spark dataframe, which is solved using BACKTICKS *`column.name`.* 
> [https://stackoverflow.com/a/51430335/11688337]
>  
> *so next*
> I use the back ticks while defining stringIndexer
> {code:java}
> val si = new 
> StringIndexer().setInputCol("`location.longitude`").setOutputCol("longitutdee")
>  {code}
> In this case *it again fails* (with a diff reason) in the stringIndexer code 
> itself
> {code:java}
> Exception in thread "main" org.apache.spark.SparkException: Input column 
> `location.longitude` does not exist.
>     at 
> org.apache.spark.ml.feature.StringIndexerBase.$anonfun$validateAndTransformSchema$2(StringIndexer.scala:128)
>     at 
> scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:244)
>     at 
> scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
>     at 
> scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33) 
> {code}
>  
> This blocks me to use feature transformation functions on nested columns. 
> Any help in solving this problem will be highly appreciated.






[jira] [Commented] (SPARK-48463) MLLib function unable to handle nested data

2024-06-08 Thread someshwar kale (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-48463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17853459#comment-17853459
 ] 

someshwar kale commented on SPARK-48463:


Currently there is no way to handle backticks (`) inside a Spark StructType, so the field names a.b and `a.b` are completely different within a StructType.
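A small illustration of that distinction (my own sketch, not taken from the linked code), assuming the flattenedDf from the flattening snippet quoted earlier in this thread, which has a top-level column literally named location.longitude: inside the StructType the field name is a plain string with no backticks, while backticks only have meaning to the column resolver, which appears to be why the backticked spelling is reported as missing by validateAndTransformSchema.

{code:java}
import org.apache.spark.sql.functions.col

// Sketch only: `flattenedDf` has a top-level column whose name is literally "location.longitude".
println(flattenedDf.schema.fieldNames.contains("location.longitude"))    // true: plain string comparison
println(flattenedDf.schema.fieldNames.contains("`location.longitude`"))  // false: backticks are not part of the stored name

flattenedDf.select(col("`location.longitude`")).show()  // works: backticks tell the resolver not to split on the dot
// flattenedDf.select(col("location.longitude"))         // fails: parsed as nested access `location`.`longitude`
{code}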

To handle that, I have added a custom implementation fixing 
StringIndexer#validateAndTransformSchema. You can refer to the code on [my 
github|https://github.com/skale1990/LearnSpark/blob/main/src/main/java/com/som/learnspark/TestCustomStringIndexer.scala].

> MLLib function unable to handle nested data
> ---
>
> Key: SPARK-48463
> URL: https://issues.apache.org/jira/browse/SPARK-48463
> Project: Spark
>  Issue Type: Bug
>  Components: ML, MLlib
>Affects Versions: 3.5.1
>Reporter: Chhavi Bansal
>Priority: Major
>  Labels: ML, MLPipelines, mllib, nested



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org