GitHub user HyukjinKwon opened a pull request:

    https://github.com/apache/spark/pull/15072

    [SPARK-17123][SQL] Use type-widen encoder for DataFrame rather than existing encoder to allow type-widening from set operations

    ## What changes were proposed in this pull request?
    
    This PR fixes set operations on `DataFrame` so that they run without exceptions when the column types are not Scala-native types (e.g. `TimestampType`, `DateType`, and `DecimalType`).
    
    The problem is that set operations such as `union`, `intersect`, and `except` reuse the encoder belonging to the `Dataset` of the caller.
    
    So the caller's `Dataset` still holds its original `ExpressionEncoder[Row]` when the set operation is performed, even though the result types can actually be widened. We should therefore use an `ExpressionEncoder[Row]` constructed from the executed plan rather than the existing one; otherwise, incorrect code is generated via `StaticInvoke`.
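    
    In essence, the fix makes `Row`-typed set-operation results go through an encoder built from the new plan's schema instead of reusing the caller's stale one. A minimal sketch of the idea (illustrative rather than the verbatim diff; assume it sits inside `Dataset`, where `classTag` and `sparkSession` are available):
    
    ```scala
    /** Wrap a set-based logical plan and produce a Dataset (sketch). */
    @inline private def withSetOperator[U : Encoder](logicalPlan: LogicalPlan): Dataset[U] = {
      if (classTag.runtimeClass == classOf[Row]) {
        // Set operations can widen types (change the schema), so the existing
        // row encoder cannot be reused; build a DataFrame from the new plan so
        // its encoder is derived from the widened schema.
        Dataset.ofRows(sparkSession, logicalPlan).asInstanceOf[Dataset[U]]
      } else {
        Dataset(sparkSession, logicalPlan)
      }
    }
    ```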
    
    Running the code below:
    
    ```scala
    import java.sql.{Date, Timestamp}
    // Assumes a SparkSession's implicits are in scope (e.g. spark.implicits._ in spark-shell).
    
    val dates = Seq(
      (new Date(0), BigDecimal.valueOf(1), new Timestamp(2)),
      (new Date(3), BigDecimal.valueOf(4), new Timestamp(5))
    ).toDF("date", "decimal", "timestamp")
    
    val widenTypedRows = Seq(
      (new Timestamp(2), 10.5D, "string")
    ).toDF("date", "decimal", "timestamp")
    
    val results = dates.union(widenTypedRows).collect()
    results.foreach(println)
    ```
    
    produces the following:
    
    **Before** (code generation emits invalid Java; note `value4.toJavaBigDecimal()` at generated line 044, a method call on the primitive `double` declared at line 037, which cannot compile)
    
    ```java
    /* 001 */ public java.lang.Object generate(Object[] references) {
    /* 002 */   return new SpecificSafeProjection(references);
    /* 003 */ }
    /* 004 */
    /* 005 */ class SpecificSafeProjection extends org.apache.spark.sql.catalyst.expressions.codegen.BaseProjection {
    /* 006 */
    /* 007 */   private Object[] references;
    /* 008 */   private MutableRow mutableRow;
    /* 009 */   private Object[] values;
    /* 010 */   private org.apache.spark.sql.types.StructType schema;
    /* 011 */
    /* 012 */
    /* 013 */   public SpecificSafeProjection(Object[] references) {
    /* 014 */     this.references = references;
    /* 015 */     mutableRow = (MutableRow) references[references.length - 1];
    /* 016 */
    /* 017 */     this.schema = (org.apache.spark.sql.types.StructType) references[0];
    /* 018 */   }
    /* 019 */
    /* 020 */   public java.lang.Object apply(java.lang.Object _i) {
    /* 021 */     InternalRow i = (InternalRow) _i;
    /* 022 */
    /* 023 */     values = new Object[3];
    /* 024 */
    /* 025 */     boolean isNull2 = i.isNullAt(0);
    /* 026 */     long value2 = isNull2 ? -1L : (i.getLong(0));
    /* 027 */     boolean isNull1 = isNull2;
    /* 028 */     final java.sql.Date value1 = isNull1 ? null : org.apache.spark.sql.catalyst.util.DateTimeUtils.toJavaDate(value2);
    /* 029 */     isNull1 = value1 == null;
    /* 030 */     if (isNull1) {
    /* 031 */       values[0] = null;
    /* 032 */     } else {
    /* 033 */       values[0] = value1;
    /* 034 */     }
    /* 035 */
    /* 036 */     boolean isNull4 = i.isNullAt(1);
    /* 037 */     double value4 = isNull4 ? -1.0 : (i.getDouble(1));
    /* 038 */
    /* 039 */     boolean isNull3 = isNull4;
    /* 040 */     java.math.BigDecimal value3 = null;
    /* 041 */     if (!isNull3) {
    /* 042 */
    /* 043 */       Object funcResult = null;
    /* 044 */       funcResult = value4.toJavaBigDecimal();
    /* 045 */       if (funcResult == null) {
    /* 046 */         isNull3 = true;
    /* 047 */       } else {
    /* 048 */         value3 = (java.math.BigDecimal) funcResult;
    /* 049 */       }
    /* 050 */
    /* 051 */     }
    /* 052 */     isNull3 = value3 == null;
    /* 053 */     if (isNull3) {
    /* 054 */       values[1] = null;
    /* 055 */     } else {
    /* 056 */       values[1] = value3;
    /* 057 */     }
    /* 058 */
    /* 059 */     boolean isNull6 = i.isNullAt(2);
    /* 060 */     UTF8String value6 = isNull6 ? null : (i.getUTF8String(2));
    /* 061 */     boolean isNull5 = isNull6;
    /* 062 */     final java.sql.Timestamp value5 = isNull5 ? null : org.apache.spark.sql.catalyst.util.DateTimeUtils.toJavaTimestamp(value6);
    /* 063 */     isNull5 = value5 == null;
    /* 064 */     if (isNull5) {
    /* 065 */       values[2] = null;
    /* 066 */     } else {
    /* 067 */       values[2] = value5;
    /* 068 */     }
    /* 069 */
    /* 070 */     final org.apache.spark.sql.Row value = new org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema(values, schema);
    /* 071 */     if (false) {
    /* 072 */       mutableRow.setNullAt(0);
    /* 073 */     } else {
    /* 074 */
    /* 075 */       mutableRow.update(0, value);
    /* 076 */     }
    /* 077 */
    /* 078 */     return mutableRow;
    /* 079 */   }
    /* 080 */ }
    ```
    
    
    **After** (the union succeeds and each column is widened positionally)
    
    ```bash
    [1969-12-31 00:00:00.0,1.0,1969-12-31 16:00:00.002]
    [1969-12-31 00:00:00.0,4.0,1969-12-31 16:00:00.005]
    [1969-12-31 16:00:00.002,10.5,string]
    ```
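    
    For reference, the widened schema can be inspected directly (a quick `spark-shell` follow-up to the example above):
    
    ```scala
    // Print the positionally widened schema of the union result.
    dates.union(widenTypedRows).printSchema()
    // Per the output above: the date/timestamp column widens to timestamp,
    // decimal/double widens to double, and timestamp/string widens to string.
    ```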
    
    
    
    
    ## How was this patch tested?
    
    Unit tests in `DataFrameSuite`
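    
    A sketch of the kind of check such a test can make (hypothetical test body in the style of `DataFrameSuite`; not the verbatim test from this patch):
    
    ```scala
    // Assumes the suite's testImplicits._ and java.sql.{Date, Timestamp} are imported.
    test("SPARK-17123: union widens non-native types instead of failing codegen") {
      val dates = Seq(
        (new Date(0), BigDecimal.valueOf(1), new Timestamp(2)),
        (new Date(3), BigDecimal.valueOf(4), new Timestamp(5))
      ).toDF("date", "decimal", "timestamp")
      val widenTypedRows = Seq((new Timestamp(2), 10.5D, "string"))
        .toDF("date", "decimal", "timestamp")
    
      // Before this patch, collect() threw during code generation; after it,
      // all three rows come back with widened column types.
      assert(dates.union(widenTypedRows).collect().length === 3)
    }
    ```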


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/HyukjinKwon/spark SPARK-17123

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/15072.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #15072
    
----
commit 613181081e8edac494205c9f40d2bce5e99b86fc
Author: hyukjinkwon <gurwls...@gmail.com>
Date:   2016-09-13T05:36:00Z

    Use type-widen encoders for DataFrame

----


