Dear Michael, dear all,

a minimal example is listed below.

After some further analysis, I figured out that the problem is related
to the *leftOuterJoinWithRemovalOfEqualColumn* method, because I use columns of
the left and right DataFrames when doing the select on the joined table.

  /**
   * Customized left outer join on common column.
   */
  def leftOuterJoinWithRemovalOfEqualColumn(
      leftDF: DataFrame,
      rightDF: DataFrame,
      commonColumnName: String): DataFrame = {

    val leftColumns = leftDF.columns.map((cn: String) => leftDF(cn))
    val rightColumns = rightDF.columns
      .filterNot(cn => cn.equals(commonColumnName))
      .map(cn => rightDF(cn))

    leftDF
      .join(rightDF, leftDF(commonColumnName) === rightDF(commonColumnName), "leftouter")
      .select(leftColumns ++ rightColumns: _*)
  }

As the column "y" of the right table has nullable=false, this is also
carried over to the "y" column of the joined table, because I select it
via rightDF("y").

Thus, I need to use columns of the joined table for the select.

*Question now: The joined table has column names "x", "a", "x", "y".
How do I discard the second "x" column?*

All my approaches failed (assuming here that joinedDF is the joined DataFrame):


   - Using joinedDF.drop("x") discards both "x" columns.
   - Using joinedDF("x") does not work, as it is ambiguous.
   - Using rightDF.as("aliasname") to distinguish the column "x" of the
   left DataFrame from the column "x" of the right DataFrame did not
   work out either, as I found no way to use select($"aliasname.x")
   programmatically. Could someone sketch the code?
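
One possible sketch of the alias approach follows. It is untested against
a specific Spark version and rests on some assumptions: the alias names "l"
and "r" are arbitrary, the column names contain no dots, and the names
AliasJoinSketch and leftOuterJoinDroppingRightKey are purely illustrative
(they are not part of Spark). The idea is to build the qualified names as
plain strings and pass them to functions.col, so that the columns are
resolved against the joined table instead of being taken from rightDF:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

object AliasJoinSketch {

  /**
   * Left outer join on a common column that keeps only the left copy
   * of that column. The aliases "l" and "r" are assumptions made for
   * this sketch; any two distinct names should do.
   */
  def leftOuterJoinDroppingRightKey(
      leftDF: DataFrame,
      rightDF: DataFrame,
      commonColumnName: String): DataFrame = {

    val l = leftDF.as("l")
    val r = rightDF.as("r")

    // Qualified references such as col("l.x") are only names at this point;
    // they are resolved later, against the joined table.
    val leftColumns = leftDF.columns.map(cn => col(s"l.$cn"))
    val rightColumns = rightDF.columns
      .filterNot(_ == commonColumnName)
      .map(cn => col(s"r.$cn"))

    l.join(r, col(s"l.$commonColumnName") === col(s"r.$commonColumnName"), "leftouter")
      .select(leftColumns ++ rightColumns: _*)
  }
}
```

With the DataFrames from the example below, this would be called as
AliasJoinSketch.leftOuterJoinDroppingRightKey(recordDF, mappingDF, "x").
If the analysis above is right, "y" should then take its nullability from
the joined table rather than from mappingDF.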

Any help welcome, thanks


Martin



========================================
import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.sql.{DataFrame, SQLContext}

object OtherEntities {

  case class Record( x:Int, a: String)
  case class Mapping( x: Int, y: Int )

  val records = Seq( Record(1, "hello"), Record(2, "bob"))
  val mappings = Seq( Mapping(2, 5) )
}

object MinimalShowcase {

  /**
   * Customized left outer join on common column.
   */
  def leftOuterJoinWithRemovalOfEqualColumn(
      leftDF: DataFrame,
      rightDF: DataFrame,
      commonColumnName: String): DataFrame = {

    val leftColumns = leftDF.columns.map((cn: String) => leftDF(cn))
    val rightColumns = rightDF.columns
      .filterNot(cn => cn.equals(commonColumnName))
      .map(cn => rightDF(cn))

    leftDF
      .join(rightDF, leftDF(commonColumnName) === rightDF(commonColumnName), "leftouter")
      .select(leftColumns ++ rightColumns: _*)
  }


  /**
   * Set, if a column is nullable.
   * @param df source DataFrame
   * @param cn is the column name to change
   * @param nullable is the flag to set, such that the column is
   *                 either nullable or not
   */
  def setNullableStateOfColumn(df: DataFrame, cn: String, nullable: Boolean): DataFrame = {

    val schema = df.schema
    val newSchema = StructType(schema.map {
      case StructField(c, t, _, m) if c.equals(cn) =>
        StructField(c, t, nullable = nullable, m)
      case y: StructField => y
    })
    df.sqlContext.createDataFrame( df.rdd, newSchema)
  }


  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("Minimal")
      .setMaster("local[*]")

    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    // used to implicitly convert an RDD to a DataFrame.
    import sqlContext.implicits._

    val recordDF = sc.parallelize(OtherEntities.records, 4).toDF()
    val mappingDF = sc.parallelize(OtherEntities.mappings, 4).toDF()
    val mappingWithNullDF = setNullableStateOfColumn(mappingDF, "y", true)

    val joinedDF = recordDF.join(mappingDF,
      recordDF("x") === mappingDF("x"), "leftouter")
    println("joinedDF:")
    joinedDF.show
    joinedDF.printSchema
    joinedDF.filter(joinedDF("y").isNotNull).show

//    joinedDF:
//    +-+-----+----+----+
//    |x|    a|   x|   y|
//    +-+-----+----+----+
//    |1|hello|null|null|
//    |2|  bob|   2|   5|
//    +-+-----+----+----+
//
//    root
//    |-- x: integer (nullable = false)
//    |-- a: string (nullable = true)
//    |-- x: integer (nullable = true)
//    |-- y: integer (nullable = true)
//
//    +-+---+-+-+
//    |x|  a|x|y|
//    +-+---+-+-+
//    |2|bob|2|5|
//    +-+---+-+-+


    val extrajoinedDF =
      leftOuterJoinWithRemovalOfEqualColumn(recordDF, mappingDF, "x")
    println("extrajoinedDF:")
    extrajoinedDF.show
    extrajoinedDF.printSchema
    extrajoinedDF.filter(extrajoinedDF("y").isNotNull).show

//    extrajoinedDF:
//    +-+-----+----+
//    |x|    a|   y|
//    +-+-----+----+
//    |1|hello|null|
//    |2|  bob|   5|
//    +-+-----+----+
//
//    root
//    |-- x: integer (nullable = false)
//    |-- a: string (nullable = true)
//    |-- y: integer (nullable = false)
//
//    +-+-----+----+
//    |x|    a|   y|
//    +-+-----+----+
//    |1|hello|null|
//    |2|  bob|   5|
//    +-+-----+----+



    val joined2DF = recordDF.join(mappingWithNullDF,
      recordDF("x") === mappingWithNullDF("x"), "leftouter")
    println("joined2DF:")
    joined2DF.show
    joined2DF.printSchema
    joined2DF.filter(joined2DF("y").isNotNull).show

//    joined2DF:
//    +-+-----+----+----+
//    |x|    a|   x|   y|
//    +-+-----+----+----+
//    |1|hello|null|null|
//    |2|  bob|   2|   5|
//    +-+-----+----+----+
//
//    root
//    |-- x: integer (nullable = false)
//    |-- a: string (nullable = true)
//    |-- x: integer (nullable = true)
//    |-- y: integer (nullable = true)
//
//    +-+---+-+-+
//    |x|  a|x|y|
//    +-+---+-+-+
//    |2|bob|2|5|
//    +-+---+-+-+

  }
}



2015-07-31 1:56 GMT+02:00 Martin Senne <martin.se...@googlemail.com>:

> Dear Michael, dear all,
>
> distinguishing those records that have a match in mapping from those that
> don't is the crucial point.
>
> Record(x : Int,  a: String)
> Mapping(x: Int, y: Int)
>
> Thus
>
> Record(1, "hello")
> Record(2, "bob")
> Mapping(2, 5)
>
> yield (2, "bob", 5) on an inner join.
> BUT I'm also interested in (1, "hello", null) as there is no counterpart
> in mapping (this is the left outer join part)
>
> I need to distinguish 1 and 2 because of later inserts (case 1, hello) or
> updates (case 2, bob).
>
> Cheers and thanks,
>
> Martin
>
> On 30.07.2015 22:58, "Michael Armbrust" <mich...@databricks.com> wrote:
> >
> > Perhaps I'm missing what you are trying to accomplish, but if you'd like
> > to avoid the null values do an inner join instead of an outer join.
> >
> > Additionally, I'm confused about how the result of
> > joinedDF.filter(joinedDF("y").isNotNull).show still contains null values
> > in the column y. This doesn't really have anything to do with nullable,
> > which is only a hint to the system so that we can avoid null checking when
> > we know that there are no null values. If you provide the full code I can
> > try and see if this is a bug.
> >
> > On Thu, Jul 30, 2015 at 11:53 AM, Martin Senne <
> martin.se...@googlemail.com> wrote:
> >>
> >> Dear Michael, dear all,
> >>
> >> motivation:
> >>
> >> object OtherEntities {
> >>
> >>   case class Record( x:Int, a: String)
> >>   case class Mapping( x: Int, y: Int )
> >>
> >>   val records = Seq( Record(1, "hello"), Record(2, "bob"))
> >>   val mappings = Seq( Mapping(2, 5) )
> >> }
> >>
> >> Now I want to perform a left outer join on records and mappings, with
> >> the join criterion recordDF("x") === mappingDF("x"). The shorthand for
> >> this is leftOuterJoinWithRemovalOfEqualColumn.
> >>
> >> val sqlContext = new SQLContext(sc)
> >> // used to implicitly convert an RDD to a DataFrame.
> >> import sqlContext.implicits._
> >>
> >> val recordDF= sc.parallelize(OtherEntities.records, 4).toDF()
> >> val mappingDF = sc.parallelize(OtherEntities.mappings, 4).toDF()
> >>
> >> val joinedDF = leftOuterJoinWithRemovalOfEqualColumn(recordDF, mappingDF, "x")
> >>
> >> joinedDF.filter(joinedDF("y").isNotNull).show
> >>
> >>
> >> Currently, the output is
> >>
> >>
> >> +-+-----+----+
> >> |x|    a|   y|
> >> +-+-----+----+
> >> |1|hello|null|
> >> |2|  bob|   5|
> >> +-+-----+----+
> >>
> >> instead of
> >>
> >>
> >> +-+---+-+
> >> |x|  a|y|
> >> +-+---+-+
> >> |2|bob|5|
> >> +-+---+-+
> >>
> >> The last output can be achieved by changing nullable=false to
> >> nullable=true, as described in my first post.
> >>
> >> Thus, I need this schema modification to make outer joins work.
> >>
> >> Cheers and thanks,
> >>
> >> Martin
> >>
> >>
> >>
> >> 2015-07-30 20:23 GMT+02:00 Michael Armbrust <mich...@databricks.com>:
> >>>
> >>> We don't yet update nullability information based on predicates, as we
> >>> don't actually leverage this information in many places yet.  Why do you
> >>> want to update the schema?
> >>>
> >>> On Thu, Jul 30, 2015 at 11:19 AM, martinibus77 <
> martin.se...@googlemail.com> wrote:
> >>>>
> >>>> Hi all,
> >>>>
> >>>> 1. *Columns in DataFrames can be nullable or non-nullable. Given a
> >>>> nullable column of Doubles, I can use the following Scala code to
> >>>> filter all "non-null" rows:*
> >>>>
> >>>>   val df = ..... // some code that creates a DataFrame
> >>>>   df.filter( df("columnname").isNotNull )
> >>>>
> >>>> +-+-----+----+
> >>>> |x|    a|   y|
> >>>> +-+-----+----+
> >>>> |1|hello|null|
> >>>> |2|  bob|   5|
> >>>> +-+-----+----+
> >>>>
> >>>> root
> >>>>  |-- x: integer (nullable = false)
> >>>>  |-- a: string (nullable = true)
> >>>>  |-- y: integer (nullable = true)
> >>>>
> >>>> And with the filter expression
> >>>> +-+---+-+
> >>>> |x|  a|y|
> >>>> +-+---+-+
> >>>> |2|bob|5|
> >>>> +-+---+-+
> >>>>
> >>>>
> >>>> Unfortunately, while this is true for a nullable column (according to
> >>>> df.printSchema), it is not true for a column that is not nullable:
> >>>>
> >>>>
> >>>> +-+-----+----+
> >>>> |x|    a|   y|
> >>>> +-+-----+----+
> >>>> |1|hello|null|
> >>>> |2|  bob|   5|
> >>>> +-+-----+----+
> >>>>
> >>>> root
> >>>>  |-- x: integer (nullable = false)
> >>>>  |-- a: string (nullable = true)
> >>>>  |-- y: integer (nullable = false)
> >>>>
> >>>> +-+-----+----+
> >>>> |x|    a|   y|
> >>>> +-+-----+----+
> >>>> |1|hello|null|
> >>>> |2|  bob|   5|
> >>>> +-+-----+----+
> >>>>
> >>>> such that the output is not affected by the filter. Is this intended?
> >>>>
> >>>>
> >>>> 2. *What is the cheapest way (in terms of performance) to turn a
> >>>> non-nullable column into a nullable column?
> >>>> I came up with this:*
> >>>>
> >>>>   /**
> >>>>    * Set, if a column is nullable.
> >>>>    * @param df source DataFrame
> >>>>    * @param cn is the column name to change
> >>>>    * @param nullable is the flag to set, such that the column is
> >>>>    *                 either nullable or not
> >>>>    */
> >>>>   def setNullableStateOfColumn(df: DataFrame, cn: String, nullable: Boolean): DataFrame = {
> >>>>
> >>>>     val schema = df.schema
> >>>>     val newSchema = StructType(schema.map {
> >>>>       case StructField(c, t, _, m) if c.equals(cn) =>
> >>>>         StructField(c, t, nullable = nullable, m)
> >>>>       case y: StructField => y
> >>>>     })
> >>>>     df.sqlContext.createDataFrame( df.rdd, newSchema)
> >>>>   }
> >>>>
> >>>> Is there a cheaper solution?
> >>>>
> >>>> 3. *Any comments?*
> >>>>
> >>>> Cheers and thx in advance,
> >>>>
> >>>> Martin
> >>>>
> >>>>
> >>>>
> >>>>
> >>>>
> >>>>
> >>>> --
> >>>> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-DataFrame-Nullable-column-and-filtering-tp24087.html
> >>>> Sent from the Apache Spark User List mailing list archive at
> Nabble.com.
> >>>>
> >>>>
> >>>
> >>
> >
>
>
