Dear Michael, dear all, a minimal example is listed below.
After some further analysis I figured out that the problem is related to the *leftOuterJoinWithRemovalOfEqualColumn* method, because I use columns of the left and right DataFrames when doing the select on the joined table.

  /**
   * Customized left outer join on common column.
   */
  def leftOuterJoinWithRemovalOfEqualColumn(leftDF: DataFrame, rightDF: DataFrame, commonColumnName: String): DataFrame = {

    val leftColumns = leftDF.columns.map((cn: String) => leftDF(cn))
    val rightColumns = rightDF.columns.filterNot(cn => cn.equals(commonColumnName)).map(cn => rightDF(cn))

    leftDF.join(rightDF, leftDF(commonColumnName) === rightDF(commonColumnName), "leftouter")
      .select(leftColumns ++ rightColumns: _*)
  }

As the column "y" of the right table has nullable=false, this is also carried over to the y column of the joined table, because I use rightDF("y").

Thus, I need to use columns of the joined table for the select.

*Question now: The joined table has column names "x", "a", "x", "y". How do I discard the second x column?*

All my approaches failed (assuming here that joinedDF is the joined DataFrame):

- Using joinedDF.drop("x") discards both "x" columns.
- Using joinedDF("x") does not work, as it is ambiguous.
- Also, using rightDF.as("aliasname") in order to differentiate the column "x" (from the left DataFrame) from "x" (from the right DataFrame) did not work out, as I found no way to use select($"aliasname.x") really programmatically.

Could someone sketch the code?

Any help welcome, thanks

Martin

========================================

import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.sql.{DataFrame, SQLContext}

object OtherEntities {

  case class Record( x:Int, a: String)
  case class Mapping( x: Int, y: Int )

  val records = Seq( Record(1, "hello"), Record(2, "bob"))
  val mappings = Seq( Mapping(2, 5) )
}

object MinimalShowcase {

  /**
   * Customized left outer join on common column.
   */
  def leftOuterJoinWithRemovalOfEqualColumn(leftDF: DataFrame, rightDF: DataFrame, commonColumnName: String): DataFrame = {

    val leftColumns = leftDF.columns.map((cn: String) => leftDF(cn))
    val rightColumns = rightDF.columns.filterNot(cn => cn.equals(commonColumnName)).map(cn => rightDF(cn))

    leftDF.join(rightDF, leftDF(commonColumnName) === rightDF(commonColumnName), "leftouter")
      .select(leftColumns ++ rightColumns: _*)
  }

  /**
   * Set, if a column is nullable.
   * @param df source DataFrame
   * @param cn is the column name to change
   * @param nullable is the flag to set, such that the column is either nullable or not
   */
  def setNullableStateOfColumn( df: DataFrame, cn: String, nullable: Boolean) : DataFrame = {

    val schema = df.schema
    val newSchema = StructType(schema.map {
      case StructField( c, t, _, m) if c.equals(cn) => StructField( c, t, nullable = nullable, m)
      case y: StructField => y
    })
    df.sqlContext.createDataFrame( df.rdd, newSchema)
  }

  def main (args: Array[String]) {
    val conf = new SparkConf()
      .setAppName("Minimal")
      .setMaster("local[*]")

    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    // used to implicitly convert an RDD to a DataFrame.
    import sqlContext.implicits._

    val recordDF = sc.parallelize(OtherEntities.records, 4).toDF()
    val mappingDF = sc.parallelize(OtherEntities.mappings, 4).toDF()
    val mappingWithNullDF = setNullableStateOfColumn(mappingDF, "y", true)

    val joinedDF = recordDF.join(mappingDF, recordDF("x") === mappingDF("x"), "leftouter")
    println("joinedDF:")
    joinedDF.show
    joinedDF.printSchema
    joinedDF.filter(joinedDF("y").isNotNull).show

    // joinedDF:
    // +-+-----+----+----+
    // |x|    a|   x|   y|
    // +-+-----+----+----+
    // |1|hello|null|null|
    // |2|  bob|   2|   5|
    // +-+-----+----+----+
    //
    // root
    //  |-- x: integer (nullable = false)
    //  |-- a: string (nullable = true)
    //  |-- x: integer (nullable = true)
    //  |-- y: integer (nullable = true)
    //
    // +-+---+-+-+
    // |x|  a|x|y|
    // +-+---+-+-+
    // |2|bob|2|5|
    // +-+---+-+-+

    val extrajoinedDF = leftOuterJoinWithRemovalOfEqualColumn(recordDF, mappingDF, "x")
    println("extrajoinedDF:")
    extrajoinedDF.show
    extrajoinedDF.printSchema
    extrajoinedDF.filter(extrajoinedDF("y").isNotNull).show

    // extrajoinedDF:
    // +-+-----+----+
    // |x|    a|   y|
    // +-+-----+----+
    // |1|hello|null|
    // |2|  bob|   5|
    // +-+-----+----+
    //
    // root
    //  |-- x: integer (nullable = false)
    //  |-- a: string (nullable = true)
    //  |-- y: integer (nullable = false)
    //
    // +-+-----+----+
    // |x|    a|   y|
    // +-+-----+----+
    // |1|hello|null|
    // |2|  bob|   5|
    // +-+-----+----+

    val joined2DF = recordDF.join(mappingWithNullDF, recordDF("x") === mappingWithNullDF("x"), "leftouter")
    println("joined2DF:")
    joined2DF.show
    joined2DF.printSchema
    joined2DF.filter(joined2DF("y").isNotNull).show

    // joined2DF:
    // +-+-----+----+----+
    // |x|    a|   x|   y|
    // +-+-----+----+----+
    // |1|hello|null|null|
    // |2|  bob|   2|   5|
    // +-+-----+----+----+
    //
    // root
    //  |-- x: integer (nullable = false)
    //  |-- a: string (nullable = true)
    //  |-- x: integer (nullable = true)
    //  |-- y: integer (nullable = true)
    //
    // +-+---+-+-+
    // |x|  a|x|y|
    // +-+---+-+-+
    // |2|bob|2|5|
    // +-+---+-+-+
  }
}

2015-07-31 1:56 GMT+02:00 Martin Senne <martin.se...@googlemail.com>:
> Dear Michael, dear all,
>
> distinguishing those records that have a match in mapping from those that
> don't is the crucial point.
>
> Record(x: Int, a: String)
> Mapping(x: Int, y: Int)
>
> Thus
>
> Record(1, "hello")
> Record(2, "bob")
> Mapping(2, 5)
>
> yield (2, "bob", 5) on an inner join.
> BUT I'm also interested in (1, "hello", null), as there is no counterpart
> in mapping (this is the left outer join part).
>
> I need to distinguish 1 and 2 because of later inserts (case 1, hello) or
> updates (case 2, bob).
>
> Cheers and thanks,
>
> Martin
>
> On 30.07.2015 22:58, "Michael Armbrust" <mich...@databricks.com> wrote:
> >
> > Perhaps I'm missing what you are trying to accomplish, but if you'd like
> > to avoid the null values do an inner join instead of an outer join.
> >
> > Additionally, I'm confused about how the result of
> > joinedDF.filter(joinedDF("y").isNotNull).show still contains null values
> > in the column y. This doesn't really have anything to do with nullable,
> > which is only a hint to the system so that we can avoid null checking when
> > we know that there are no null values. If you provide the full code I can
> > try and see if this is a bug.
> >
> > On Thu, Jul 30, 2015 at 11:53 AM, Martin Senne <martin.se...@googlemail.com> wrote:
> >>
> >> Dear Michael, dear all,
> >>
> >> motivation:
> >>
> >> object OtherEntities {
> >>
> >>   case class Record( x:Int, a: String)
> >>   case class Mapping( x: Int, y: Int )
> >>
> >>   val records = Seq( Record(1, "hello"), Record(2, "bob"))
> >>   val mappings = Seq( Mapping(2, 5) )
> >> }
> >>
> >> Now I want to perform a left outer join on records and mappings (with the
> >> ON JOIN criterion on columns recordDF("x") === mappingDF("x")) ... the
> >> shorthand is in leftOuterJoinWithRemovalOfEqualColumn
> >>
> >> val sqlContext = new SQLContext(sc)
> >> // used to implicitly convert an RDD to a DataFrame.
> >> import sqlContext.implicits._
> >>
> >> val recordDF = sc.parallelize(OtherEntities.records, 4).toDF()
> >> val mappingDF = sc.parallelize(OtherEntities.mappings, 4).toDF()
> >>
> >> val joinedDF = recordDF.leftOuterJoinWithRemovalOfEqualColumn(mappingDF, "x")
> >>
> >> joinedDF.filter(joinedDF("y").isNotNull).show
> >>
> >> Currently, the output is
> >>
> >> +-+-----+----+
> >> |x|    a|   y|
> >> +-+-----+----+
> >> |1|hello|null|
> >> |2|  bob|   5|
> >> +-+-----+----+
> >>
> >> instead of
> >>
> >> +-+---+-+
> >> |x|  a|y|
> >> +-+---+-+
> >> |2|bob|5|
> >> +-+---+-+
> >>
> >> The last output can be achieved by the method of changing
> >> nullable=false to nullable=true described in my first post.
> >>
> >> Thus, I need this schema modification in order to make outer joins work.
> >>
> >> Cheers and thanks,
> >>
> >> Martin
> >>
> >> 2015-07-30 20:23 GMT+02:00 Michael Armbrust <mich...@databricks.com>:
> >>>
> >>> We don't yet update nullability information based on predicates, as we
> >>> don't actually leverage this information in many places yet. Why do you
> >>> want to update the schema?
> >>>
> >>> On Thu, Jul 30, 2015 at 11:19 AM, martinibus77 <martin.se...@googlemail.com> wrote:
> >>>>
> >>>> Hi all,
> >>>>
> >>>> 1. *Columns in dataframes can be nullable and not nullable. Having a
> >>>> nullable column of Doubles, I can use the following Scala code to filter all
> >>>> "non-null" rows:*
> >>>>
> >>>> val df = ..... // some code that creates a DataFrame
> >>>> df.filter( df("columnname").isNotNull )
> >>>>
> >>>> +-+-----+----+
> >>>> |x|    a|   y|
> >>>> +-+-----+----+
> >>>> |1|hello|null|
> >>>> |2|  bob|   5|
> >>>> +-+-----+----+
> >>>>
> >>>> root
> >>>>  |-- x: integer (nullable = false)
> >>>>  |-- a: string (nullable = true)
> >>>>  |-- y: integer (nullable = true)
> >>>>
> >>>> And with the filter expression
> >>>>
> >>>> +-+---+-+
> >>>> |x|  a|y|
> >>>> +-+---+-+
> >>>> |2|bob|5|
> >>>> +-+---+-+
> >>>>
> >>>> Unfortunately, while this is true for a nullable column (according to
> >>>> df.printSchema), it is not true for a column that is not nullable:
> >>>>
> >>>> +-+-----+----+
> >>>> |x|    a|   y|
> >>>> +-+-----+----+
> >>>> |1|hello|null|
> >>>> |2|  bob|   5|
> >>>> +-+-----+----+
> >>>>
> >>>> root
> >>>>  |-- x: integer (nullable = false)
> >>>>  |-- a: string (nullable = true)
> >>>>  |-- y: integer (nullable = false)
> >>>>
> >>>> +-+-----+----+
> >>>> |x|    a|   y|
> >>>> +-+-----+----+
> >>>> |1|hello|null|
> >>>> |2|  bob|   5|
> >>>> +-+-----+----+
> >>>>
> >>>> such that the output is not affected by the filter. Is this intended?
> >>>>
> >>>> 2. *What is the cheapest way (in the sense of performance) to turn a
> >>>> non-nullable column into a nullable column?
> >>>> I came up with this:*
> >>>>
> >>>> /**
> >>>>  * Set, if a column is nullable.
> >>>>  * @param df source DataFrame
> >>>>  * @param cn is the column name to change
> >>>>  * @param nullable is the flag to set, such that the column is either
> >>>>  *                 nullable or not
> >>>>  */
> >>>> def setNullableStateOfColumn( df: DataFrame, cn: String, nullable: Boolean) : DataFrame = {
> >>>>
> >>>>   val schema = df.schema
> >>>>   val newSchema = StructType(schema.map {
> >>>>     case StructField( c, t, _, m) if c.equals(cn) => StructField( c, t, nullable = nullable, m)
> >>>>     case y: StructField => y
> >>>>   })
> >>>>   df.sqlContext.createDataFrame( df.rdd, newSchema)
> >>>> }
> >>>>
> >>>> Is there a cheaper solution?
> >>>>
> >>>> 3. *Any comments?*
> >>>>
> >>>> Cheers and thx in advance,
> >>>>
> >>>> Martin
> >>>>
> >>>> --
> >>>> View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-DataFrame-Nullable-column-and-filtering-tp24087.html
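
On the question of how to discard the second "x" column programmatically: one possible approach, sketched here under assumptions rather than taken from the thread, is to alias both DataFrames and build the select list from qualified names with functions.col instead of from leftDF(cn) / rightDF(cn). The object name JoinHelpers, the method name leftOuterJoinDroppingRightKey and the alias names "l" and "r" are hypothetical. The sketch also assumes that no column name contains a dot. Because the qualified names are resolved against the joined table, the nullable flag of "y" would be expected to follow the join output rather than rightDF, but that should be checked with printSchema on the Spark version in use.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

object JoinHelpers {

  /**
   * Left outer join on one common column that keeps the key column only once.
   * Hypothetical helper; "l" and "r" are arbitrary alias names.
   */
  def leftOuterJoinDroppingRightKey(leftDF: DataFrame,
                                    rightDF: DataFrame,
                                    commonColumnName: String): DataFrame = {
    val l = leftDF.as("l")
    val r = rightDF.as("r")

    // Qualified, unresolved column references: "l.x" and "r.x" are
    // distinguishable even though both sides contain a column named "x".
    val leftColumns = leftDF.columns.map(cn => col(s"l.$cn"))
    val rightColumns = rightDF.columns
      .filterNot(_ == commonColumnName)   // drop the right-hand key column
      .map(cn => col(s"r.$cn"))

    l.join(r, col(s"l.$commonColumnName") === col(s"r.$commonColumnName"), "leftouter")
      .select(leftColumns ++ rightColumns: _*)
  }
}
```

With the DataFrames from the minimal example this would be called as JoinHelpers.leftOuterJoinDroppingRightKey(recordDF, mappingDF, "x"), followed by printSchema and the isNotNull filter to see whether the behaviour matches joinedDF rather than extrajoinedDF.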