http://git-wip-us.apache.org/repos/asf/spark/blob/40ed2af5/sql/core/src/test/gen-java/org/apache/spark/sql/parquet/test/avro/ParquetAvroCompat.java
----------------------------------------------------------------------
diff --git a/sql/core/src/test/gen-java/org/apache/spark/sql/parquet/test/avro/ParquetAvroCompat.java b/sql/core/src/test/gen-java/org/apache/spark/sql/parquet/test/avro/ParquetAvroCompat.java
deleted file mode 100644
index 354c9d7..0000000
--- a/sql/core/src/test/gen-java/org/apache/spark/sql/parquet/test/avro/ParquetAvroCompat.java
+++ /dev/null
@@ -1,1001 +0,0 @@
-/**
- * Autogenerated by Avro
- * 
- * DO NOT EDIT DIRECTLY
- */
-package org.apache.spark.sql.parquet.test.avro;  
-@SuppressWarnings("all")
-@org.apache.avro.specific.AvroGenerated
-public class ParquetAvroCompat extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
-  public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"ParquetAvroCompat\",\"namespace\":\"org.apache.spark.sql.parquet.test.avro\",\"fields\":[{\"name\":\"bool_column\",\"type\":\"boolean\"},{\"name\":\"int_column\",\"type\":\"int\"},{\"name\":\"long_column\",\"type\":\"long\"},{\"name\":\"float_column\",\"type\":\"float\"},{\"name\":\"double_column\",\"type\":\"double\"},{\"name\":\"binary_column\",\"type\":\"bytes\"},{\"name\":\"string_column\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}},{\"name\":\"maybe_bool_column\",\"type\":[\"null\",\"boolean\"]},{\"name\":\"maybe_int_column\",\"type\":[\"null\",\"int\"]},{\"name\":\"maybe_long_column\",\"type\":[\"null\",\"long\"]},{\"name\":\"maybe_float_column\",\"type\":[\"null\",\"float\"]},{\"name\":\"maybe_double_column\",\"type\":[\"null\",\"double\"]},{\"name\":\"maybe_binary_column\",\"type\":[\"null\",\"bytes\"]},{\"name\":\"maybe_string_column\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}]},{\"name\":\"strings_column\",\"type\":{\"type\":\"array\",\"items\":{\"type\":\"string\",\"avro.java.string\":\"String\"}}},{\"name\":\"string_to_int_column\",\"type\":{\"type\":\"map\",\"values\":\"int\",\"avro.java.string\":\"String\"}},{\"name\":\"complex_column\",\"type\":{\"type\":\"map\",\"values\":{\"type\":\"array\",\"items\":{\"type\":\"record\",\"name\":\"Nested\",\"fields\":[{\"name\":\"nested_ints_column\",\"type\":{\"type\":\"array\",\"items\":\"int\"}},{\"name\":\"nested_string_column\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}}]}},\"avro.java.string\":\"String\"}}]}");
-  public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }
-  @Deprecated public boolean bool_column;
-  @Deprecated public int int_column;
-  @Deprecated public long long_column;
-  @Deprecated public float float_column;
-  @Deprecated public double double_column;
-  @Deprecated public java.nio.ByteBuffer binary_column;
-  @Deprecated public java.lang.String string_column;
-  @Deprecated public java.lang.Boolean maybe_bool_column;
-  @Deprecated public java.lang.Integer maybe_int_column;
-  @Deprecated public java.lang.Long maybe_long_column;
-  @Deprecated public java.lang.Float maybe_float_column;
-  @Deprecated public java.lang.Double maybe_double_column;
-  @Deprecated public java.nio.ByteBuffer maybe_binary_column;
-  @Deprecated public java.lang.String maybe_string_column;
-  @Deprecated public java.util.List<java.lang.String> strings_column;
-  @Deprecated public java.util.Map<java.lang.String,java.lang.Integer> string_to_int_column;
-  @Deprecated public java.util.Map<java.lang.String,java.util.List<org.apache.spark.sql.parquet.test.avro.Nested>> complex_column;
-
-  /**
-   * Default constructor.  Note that this does not initialize fields
-   * to their default values from the schema.  If that is desired then
-   * one should use <code>newBuilder()</code>. 
-   */
-  public ParquetAvroCompat() {}
-
-  /**
-   * All-args constructor.
-   */
-  public ParquetAvroCompat(java.lang.Boolean bool_column, java.lang.Integer int_column, java.lang.Long long_column, java.lang.Float float_column, java.lang.Double double_column, java.nio.ByteBuffer binary_column, java.lang.String string_column, java.lang.Boolean maybe_bool_column, java.lang.Integer maybe_int_column, java.lang.Long maybe_long_column, java.lang.Float maybe_float_column, java.lang.Double maybe_double_column, java.nio.ByteBuffer maybe_binary_column, java.lang.String maybe_string_column, java.util.List<java.lang.String> strings_column, java.util.Map<java.lang.String,java.lang.Integer> string_to_int_column, java.util.Map<java.lang.String,java.util.List<org.apache.spark.sql.parquet.test.avro.Nested>> complex_column) {
-    this.bool_column = bool_column;
-    this.int_column = int_column;
-    this.long_column = long_column;
-    this.float_column = float_column;
-    this.double_column = double_column;
-    this.binary_column = binary_column;
-    this.string_column = string_column;
-    this.maybe_bool_column = maybe_bool_column;
-    this.maybe_int_column = maybe_int_column;
-    this.maybe_long_column = maybe_long_column;
-    this.maybe_float_column = maybe_float_column;
-    this.maybe_double_column = maybe_double_column;
-    this.maybe_binary_column = maybe_binary_column;
-    this.maybe_string_column = maybe_string_column;
-    this.strings_column = strings_column;
-    this.string_to_int_column = string_to_int_column;
-    this.complex_column = complex_column;
-  }
-
-  public org.apache.avro.Schema getSchema() { return SCHEMA$; }
-  // Used by DatumWriter.  Applications should not call. 
-  public java.lang.Object get(int field$) {
-    switch (field$) {
-    case 0: return bool_column;
-    case 1: return int_column;
-    case 2: return long_column;
-    case 3: return float_column;
-    case 4: return double_column;
-    case 5: return binary_column;
-    case 6: return string_column;
-    case 7: return maybe_bool_column;
-    case 8: return maybe_int_column;
-    case 9: return maybe_long_column;
-    case 10: return maybe_float_column;
-    case 11: return maybe_double_column;
-    case 12: return maybe_binary_column;
-    case 13: return maybe_string_column;
-    case 14: return strings_column;
-    case 15: return string_to_int_column;
-    case 16: return complex_column;
-    default: throw new org.apache.avro.AvroRuntimeException("Bad index");
-    }
-  }
-  // Used by DatumReader.  Applications should not call. 
-  @SuppressWarnings(value="unchecked")
-  public void put(int field$, java.lang.Object value$) {
-    switch (field$) {
-    case 0: bool_column = (java.lang.Boolean)value$; break;
-    case 1: int_column = (java.lang.Integer)value$; break;
-    case 2: long_column = (java.lang.Long)value$; break;
-    case 3: float_column = (java.lang.Float)value$; break;
-    case 4: double_column = (java.lang.Double)value$; break;
-    case 5: binary_column = (java.nio.ByteBuffer)value$; break;
-    case 6: string_column = (java.lang.String)value$; break;
-    case 7: maybe_bool_column = (java.lang.Boolean)value$; break;
-    case 8: maybe_int_column = (java.lang.Integer)value$; break;
-    case 9: maybe_long_column = (java.lang.Long)value$; break;
-    case 10: maybe_float_column = (java.lang.Float)value$; break;
-    case 11: maybe_double_column = (java.lang.Double)value$; break;
-    case 12: maybe_binary_column = (java.nio.ByteBuffer)value$; break;
-    case 13: maybe_string_column = (java.lang.String)value$; break;
-    case 14: strings_column = (java.util.List<java.lang.String>)value$; break;
-    case 15: string_to_int_column = (java.util.Map<java.lang.String,java.lang.Integer>)value$; break;
-    case 16: complex_column = (java.util.Map<java.lang.String,java.util.List<org.apache.spark.sql.parquet.test.avro.Nested>>)value$; break;
-    default: throw new org.apache.avro.AvroRuntimeException("Bad index");
-    }
-  }
-
-  /**
-   * Gets the value of the 'bool_column' field.
-   */
-  public java.lang.Boolean getBoolColumn() {
-    return bool_column;
-  }
-
-  /**
-   * Sets the value of the 'bool_column' field.
-   * @param value the value to set.
-   */
-  public void setBoolColumn(java.lang.Boolean value) {
-    this.bool_column = value;
-  }
-
-  /**
-   * Gets the value of the 'int_column' field.
-   */
-  public java.lang.Integer getIntColumn() {
-    return int_column;
-  }
-
-  /**
-   * Sets the value of the 'int_column' field.
-   * @param value the value to set.
-   */
-  public void setIntColumn(java.lang.Integer value) {
-    this.int_column = value;
-  }
-
-  /**
-   * Gets the value of the 'long_column' field.
-   */
-  public java.lang.Long getLongColumn() {
-    return long_column;
-  }
-
-  /**
-   * Sets the value of the 'long_column' field.
-   * @param value the value to set.
-   */
-  public void setLongColumn(java.lang.Long value) {
-    this.long_column = value;
-  }
-
-  /**
-   * Gets the value of the 'float_column' field.
-   */
-  public java.lang.Float getFloatColumn() {
-    return float_column;
-  }
-
-  /**
-   * Sets the value of the 'float_column' field.
-   * @param value the value to set.
-   */
-  public void setFloatColumn(java.lang.Float value) {
-    this.float_column = value;
-  }
-
-  /**
-   * Gets the value of the 'double_column' field.
-   */
-  public java.lang.Double getDoubleColumn() {
-    return double_column;
-  }
-
-  /**
-   * Sets the value of the 'double_column' field.
-   * @param value the value to set.
-   */
-  public void setDoubleColumn(java.lang.Double value) {
-    this.double_column = value;
-  }
-
-  /**
-   * Gets the value of the 'binary_column' field.
-   */
-  public java.nio.ByteBuffer getBinaryColumn() {
-    return binary_column;
-  }
-
-  /**
-   * Sets the value of the 'binary_column' field.
-   * @param value the value to set.
-   */
-  public void setBinaryColumn(java.nio.ByteBuffer value) {
-    this.binary_column = value;
-  }
-
-  /**
-   * Gets the value of the 'string_column' field.
-   */
-  public java.lang.String getStringColumn() {
-    return string_column;
-  }
-
-  /**
-   * Sets the value of the 'string_column' field.
-   * @param value the value to set.
-   */
-  public void setStringColumn(java.lang.String value) {
-    this.string_column = value;
-  }
-
-  /**
-   * Gets the value of the 'maybe_bool_column' field.
-   */
-  public java.lang.Boolean getMaybeBoolColumn() {
-    return maybe_bool_column;
-  }
-
-  /**
-   * Sets the value of the 'maybe_bool_column' field.
-   * @param value the value to set.
-   */
-  public void setMaybeBoolColumn(java.lang.Boolean value) {
-    this.maybe_bool_column = value;
-  }
-
-  /**
-   * Gets the value of the 'maybe_int_column' field.
-   */
-  public java.lang.Integer getMaybeIntColumn() {
-    return maybe_int_column;
-  }
-
-  /**
-   * Sets the value of the 'maybe_int_column' field.
-   * @param value the value to set.
-   */
-  public void setMaybeIntColumn(java.lang.Integer value) {
-    this.maybe_int_column = value;
-  }
-
-  /**
-   * Gets the value of the 'maybe_long_column' field.
-   */
-  public java.lang.Long getMaybeLongColumn() {
-    return maybe_long_column;
-  }
-
-  /**
-   * Sets the value of the 'maybe_long_column' field.
-   * @param value the value to set.
-   */
-  public void setMaybeLongColumn(java.lang.Long value) {
-    this.maybe_long_column = value;
-  }
-
-  /**
-   * Gets the value of the 'maybe_float_column' field.
-   */
-  public java.lang.Float getMaybeFloatColumn() {
-    return maybe_float_column;
-  }
-
-  /**
-   * Sets the value of the 'maybe_float_column' field.
-   * @param value the value to set.
-   */
-  public void setMaybeFloatColumn(java.lang.Float value) {
-    this.maybe_float_column = value;
-  }
-
-  /**
-   * Gets the value of the 'maybe_double_column' field.
-   */
-  public java.lang.Double getMaybeDoubleColumn() {
-    return maybe_double_column;
-  }
-
-  /**
-   * Sets the value of the 'maybe_double_column' field.
-   * @param value the value to set.
-   */
-  public void setMaybeDoubleColumn(java.lang.Double value) {
-    this.maybe_double_column = value;
-  }
-
-  /**
-   * Gets the value of the 'maybe_binary_column' field.
-   */
-  public java.nio.ByteBuffer getMaybeBinaryColumn() {
-    return maybe_binary_column;
-  }
-
-  /**
-   * Sets the value of the 'maybe_binary_column' field.
-   * @param value the value to set.
-   */
-  public void setMaybeBinaryColumn(java.nio.ByteBuffer value) {
-    this.maybe_binary_column = value;
-  }
-
-  /**
-   * Gets the value of the 'maybe_string_column' field.
-   */
-  public java.lang.String getMaybeStringColumn() {
-    return maybe_string_column;
-  }
-
-  /**
-   * Sets the value of the 'maybe_string_column' field.
-   * @param value the value to set.
-   */
-  public void setMaybeStringColumn(java.lang.String value) {
-    this.maybe_string_column = value;
-  }
-
-  /**
-   * Gets the value of the 'strings_column' field.
-   */
-  public java.util.List<java.lang.String> getStringsColumn() {
-    return strings_column;
-  }
-
-  /**
-   * Sets the value of the 'strings_column' field.
-   * @param value the value to set.
-   */
-  public void setStringsColumn(java.util.List<java.lang.String> value) {
-    this.strings_column = value;
-  }
-
-  /**
-   * Gets the value of the 'string_to_int_column' field.
-   */
-  public java.util.Map<java.lang.String,java.lang.Integer> getStringToIntColumn() {
-    return string_to_int_column;
-  }
-
-  /**
-   * Sets the value of the 'string_to_int_column' field.
-   * @param value the value to set.
-   */
-  public void setStringToIntColumn(java.util.Map<java.lang.String,java.lang.Integer> value) {
-    this.string_to_int_column = value;
-  }
-
-  /**
-   * Gets the value of the 'complex_column' field.
-   */
-  public java.util.Map<java.lang.String,java.util.List<org.apache.spark.sql.parquet.test.avro.Nested>> getComplexColumn() {
-    return complex_column;
-  }
-
-  /**
-   * Sets the value of the 'complex_column' field.
-   * @param value the value to set.
-   */
-  public void setComplexColumn(java.util.Map<java.lang.String,java.util.List<org.apache.spark.sql.parquet.test.avro.Nested>> value) {
-    this.complex_column = value;
-  }
-
-  /** Creates a new ParquetAvroCompat RecordBuilder */
-  public static org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder newBuilder() {
-    return new org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder();
-  }
-  
-  /** Creates a new ParquetAvroCompat RecordBuilder by copying an existing Builder */
-  public static org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder newBuilder(org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder other) {
-    return new org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder(other);
-  }
-  
-  /** Creates a new ParquetAvroCompat RecordBuilder by copying an existing ParquetAvroCompat instance */
-  public static org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder newBuilder(org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat other) {
-    return new org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder(other);
-  }
-  
-  /**
-   * RecordBuilder for ParquetAvroCompat instances.
-   */
-  public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase<ParquetAvroCompat>
-    implements org.apache.avro.data.RecordBuilder<ParquetAvroCompat> {
-
-    private boolean bool_column;
-    private int int_column;
-    private long long_column;
-    private float float_column;
-    private double double_column;
-    private java.nio.ByteBuffer binary_column;
-    private java.lang.String string_column;
-    private java.lang.Boolean maybe_bool_column;
-    private java.lang.Integer maybe_int_column;
-    private java.lang.Long maybe_long_column;
-    private java.lang.Float maybe_float_column;
-    private java.lang.Double maybe_double_column;
-    private java.nio.ByteBuffer maybe_binary_column;
-    private java.lang.String maybe_string_column;
-    private java.util.List<java.lang.String> strings_column;
-    private java.util.Map<java.lang.String,java.lang.Integer> string_to_int_column;
-    private java.util.Map<java.lang.String,java.util.List<org.apache.spark.sql.parquet.test.avro.Nested>> complex_column;
-
-    /** Creates a new Builder */
-    private Builder() {
-      super(org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.SCHEMA$);
-    }
-    
-    /** Creates a Builder by copying an existing Builder */
-    private Builder(org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder other) {
-      super(other);
-      if (isValidValue(fields()[0], other.bool_column)) {
-        this.bool_column = data().deepCopy(fields()[0].schema(), 
other.bool_column);
-        fieldSetFlags()[0] = true;
-      }
-      if (isValidValue(fields()[1], other.int_column)) {
-        this.int_column = data().deepCopy(fields()[1].schema(), 
other.int_column);
-        fieldSetFlags()[1] = true;
-      }
-      if (isValidValue(fields()[2], other.long_column)) {
-        this.long_column = data().deepCopy(fields()[2].schema(), 
other.long_column);
-        fieldSetFlags()[2] = true;
-      }
-      if (isValidValue(fields()[3], other.float_column)) {
-        this.float_column = data().deepCopy(fields()[3].schema(), 
other.float_column);
-        fieldSetFlags()[3] = true;
-      }
-      if (isValidValue(fields()[4], other.double_column)) {
-        this.double_column = data().deepCopy(fields()[4].schema(), 
other.double_column);
-        fieldSetFlags()[4] = true;
-      }
-      if (isValidValue(fields()[5], other.binary_column)) {
-        this.binary_column = data().deepCopy(fields()[5].schema(), 
other.binary_column);
-        fieldSetFlags()[5] = true;
-      }
-      if (isValidValue(fields()[6], other.string_column)) {
-        this.string_column = data().deepCopy(fields()[6].schema(), 
other.string_column);
-        fieldSetFlags()[6] = true;
-      }
-      if (isValidValue(fields()[7], other.maybe_bool_column)) {
-        this.maybe_bool_column = data().deepCopy(fields()[7].schema(), 
other.maybe_bool_column);
-        fieldSetFlags()[7] = true;
-      }
-      if (isValidValue(fields()[8], other.maybe_int_column)) {
-        this.maybe_int_column = data().deepCopy(fields()[8].schema(), 
other.maybe_int_column);
-        fieldSetFlags()[8] = true;
-      }
-      if (isValidValue(fields()[9], other.maybe_long_column)) {
-        this.maybe_long_column = data().deepCopy(fields()[9].schema(), 
other.maybe_long_column);
-        fieldSetFlags()[9] = true;
-      }
-      if (isValidValue(fields()[10], other.maybe_float_column)) {
-        this.maybe_float_column = data().deepCopy(fields()[10].schema(), 
other.maybe_float_column);
-        fieldSetFlags()[10] = true;
-      }
-      if (isValidValue(fields()[11], other.maybe_double_column)) {
-        this.maybe_double_column = data().deepCopy(fields()[11].schema(), 
other.maybe_double_column);
-        fieldSetFlags()[11] = true;
-      }
-      if (isValidValue(fields()[12], other.maybe_binary_column)) {
-        this.maybe_binary_column = data().deepCopy(fields()[12].schema(), 
other.maybe_binary_column);
-        fieldSetFlags()[12] = true;
-      }
-      if (isValidValue(fields()[13], other.maybe_string_column)) {
-        this.maybe_string_column = data().deepCopy(fields()[13].schema(), 
other.maybe_string_column);
-        fieldSetFlags()[13] = true;
-      }
-      if (isValidValue(fields()[14], other.strings_column)) {
-        this.strings_column = data().deepCopy(fields()[14].schema(), 
other.strings_column);
-        fieldSetFlags()[14] = true;
-      }
-      if (isValidValue(fields()[15], other.string_to_int_column)) {
-        this.string_to_int_column = data().deepCopy(fields()[15].schema(), 
other.string_to_int_column);
-        fieldSetFlags()[15] = true;
-      }
-      if (isValidValue(fields()[16], other.complex_column)) {
-        this.complex_column = data().deepCopy(fields()[16].schema(), 
other.complex_column);
-        fieldSetFlags()[16] = true;
-      }
-    }
-    
-    /** Creates a Builder by copying an existing ParquetAvroCompat instance */
-    private Builder(org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat other) {
-            super(org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.SCHEMA$);
-      if (isValidValue(fields()[0], other.bool_column)) {
-        this.bool_column = data().deepCopy(fields()[0].schema(), 
other.bool_column);
-        fieldSetFlags()[0] = true;
-      }
-      if (isValidValue(fields()[1], other.int_column)) {
-        this.int_column = data().deepCopy(fields()[1].schema(), 
other.int_column);
-        fieldSetFlags()[1] = true;
-      }
-      if (isValidValue(fields()[2], other.long_column)) {
-        this.long_column = data().deepCopy(fields()[2].schema(), 
other.long_column);
-        fieldSetFlags()[2] = true;
-      }
-      if (isValidValue(fields()[3], other.float_column)) {
-        this.float_column = data().deepCopy(fields()[3].schema(), 
other.float_column);
-        fieldSetFlags()[3] = true;
-      }
-      if (isValidValue(fields()[4], other.double_column)) {
-        this.double_column = data().deepCopy(fields()[4].schema(), 
other.double_column);
-        fieldSetFlags()[4] = true;
-      }
-      if (isValidValue(fields()[5], other.binary_column)) {
-        this.binary_column = data().deepCopy(fields()[5].schema(), 
other.binary_column);
-        fieldSetFlags()[5] = true;
-      }
-      if (isValidValue(fields()[6], other.string_column)) {
-        this.string_column = data().deepCopy(fields()[6].schema(), 
other.string_column);
-        fieldSetFlags()[6] = true;
-      }
-      if (isValidValue(fields()[7], other.maybe_bool_column)) {
-        this.maybe_bool_column = data().deepCopy(fields()[7].schema(), 
other.maybe_bool_column);
-        fieldSetFlags()[7] = true;
-      }
-      if (isValidValue(fields()[8], other.maybe_int_column)) {
-        this.maybe_int_column = data().deepCopy(fields()[8].schema(), 
other.maybe_int_column);
-        fieldSetFlags()[8] = true;
-      }
-      if (isValidValue(fields()[9], other.maybe_long_column)) {
-        this.maybe_long_column = data().deepCopy(fields()[9].schema(), 
other.maybe_long_column);
-        fieldSetFlags()[9] = true;
-      }
-      if (isValidValue(fields()[10], other.maybe_float_column)) {
-        this.maybe_float_column = data().deepCopy(fields()[10].schema(), 
other.maybe_float_column);
-        fieldSetFlags()[10] = true;
-      }
-      if (isValidValue(fields()[11], other.maybe_double_column)) {
-        this.maybe_double_column = data().deepCopy(fields()[11].schema(), 
other.maybe_double_column);
-        fieldSetFlags()[11] = true;
-      }
-      if (isValidValue(fields()[12], other.maybe_binary_column)) {
-        this.maybe_binary_column = data().deepCopy(fields()[12].schema(), 
other.maybe_binary_column);
-        fieldSetFlags()[12] = true;
-      }
-      if (isValidValue(fields()[13], other.maybe_string_column)) {
-        this.maybe_string_column = data().deepCopy(fields()[13].schema(), 
other.maybe_string_column);
-        fieldSetFlags()[13] = true;
-      }
-      if (isValidValue(fields()[14], other.strings_column)) {
-        this.strings_column = data().deepCopy(fields()[14].schema(), 
other.strings_column);
-        fieldSetFlags()[14] = true;
-      }
-      if (isValidValue(fields()[15], other.string_to_int_column)) {
-        this.string_to_int_column = data().deepCopy(fields()[15].schema(), 
other.string_to_int_column);
-        fieldSetFlags()[15] = true;
-      }
-      if (isValidValue(fields()[16], other.complex_column)) {
-        this.complex_column = data().deepCopy(fields()[16].schema(), 
other.complex_column);
-        fieldSetFlags()[16] = true;
-      }
-    }
-
-    /** Gets the value of the 'bool_column' field */
-    public java.lang.Boolean getBoolColumn() {
-      return bool_column;
-    }
-    
-    /** Sets the value of the 'bool_column' field */
-    public org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder 
setBoolColumn(boolean value) {
-      validate(fields()[0], value);
-      this.bool_column = value;
-      fieldSetFlags()[0] = true;
-      return this; 
-    }
-    
-    /** Checks whether the 'bool_column' field has been set */
-    public boolean hasBoolColumn() {
-      return fieldSetFlags()[0];
-    }
-    
-    /** Clears the value of the 'bool_column' field */
-    public org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder 
clearBoolColumn() {
-      fieldSetFlags()[0] = false;
-      return this;
-    }
-
-    /** Gets the value of the 'int_column' field */
-    public java.lang.Integer getIntColumn() {
-      return int_column;
-    }
-    
-    /** Sets the value of the 'int_column' field */
-    public org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder 
setIntColumn(int value) {
-      validate(fields()[1], value);
-      this.int_column = value;
-      fieldSetFlags()[1] = true;
-      return this; 
-    }
-    
-    /** Checks whether the 'int_column' field has been set */
-    public boolean hasIntColumn() {
-      return fieldSetFlags()[1];
-    }
-    
-    /** Clears the value of the 'int_column' field */
-    public org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder 
clearIntColumn() {
-      fieldSetFlags()[1] = false;
-      return this;
-    }
-
-    /** Gets the value of the 'long_column' field */
-    public java.lang.Long getLongColumn() {
-      return long_column;
-    }
-    
-    /** Sets the value of the 'long_column' field */
-    public org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder 
setLongColumn(long value) {
-      validate(fields()[2], value);
-      this.long_column = value;
-      fieldSetFlags()[2] = true;
-      return this; 
-    }
-    
-    /** Checks whether the 'long_column' field has been set */
-    public boolean hasLongColumn() {
-      return fieldSetFlags()[2];
-    }
-    
-    /** Clears the value of the 'long_column' field */
-    public org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder 
clearLongColumn() {
-      fieldSetFlags()[2] = false;
-      return this;
-    }
-
-    /** Gets the value of the 'float_column' field */
-    public java.lang.Float getFloatColumn() {
-      return float_column;
-    }
-    
-    /** Sets the value of the 'float_column' field */
-    public org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder 
setFloatColumn(float value) {
-      validate(fields()[3], value);
-      this.float_column = value;
-      fieldSetFlags()[3] = true;
-      return this; 
-    }
-    
-    /** Checks whether the 'float_column' field has been set */
-    public boolean hasFloatColumn() {
-      return fieldSetFlags()[3];
-    }
-    
-    /** Clears the value of the 'float_column' field */
-    public org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder 
clearFloatColumn() {
-      fieldSetFlags()[3] = false;
-      return this;
-    }
-
-    /** Gets the value of the 'double_column' field */
-    public java.lang.Double getDoubleColumn() {
-      return double_column;
-    }
-    
-    /** Sets the value of the 'double_column' field */
-    public org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder 
setDoubleColumn(double value) {
-      validate(fields()[4], value);
-      this.double_column = value;
-      fieldSetFlags()[4] = true;
-      return this; 
-    }
-    
-    /** Checks whether the 'double_column' field has been set */
-    public boolean hasDoubleColumn() {
-      return fieldSetFlags()[4];
-    }
-    
-    /** Clears the value of the 'double_column' field */
-    public org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder 
clearDoubleColumn() {
-      fieldSetFlags()[4] = false;
-      return this;
-    }
-
-    /** Gets the value of the 'binary_column' field */
-    public java.nio.ByteBuffer getBinaryColumn() {
-      return binary_column;
-    }
-    
-    /** Sets the value of the 'binary_column' field */
-    public org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder 
setBinaryColumn(java.nio.ByteBuffer value) {
-      validate(fields()[5], value);
-      this.binary_column = value;
-      fieldSetFlags()[5] = true;
-      return this; 
-    }
-    
-    /** Checks whether the 'binary_column' field has been set */
-    public boolean hasBinaryColumn() {
-      return fieldSetFlags()[5];
-    }
-    
-    /** Clears the value of the 'binary_column' field */
-    public org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder 
clearBinaryColumn() {
-      binary_column = null;
-      fieldSetFlags()[5] = false;
-      return this;
-    }
-
-    /** Gets the value of the 'string_column' field */
-    public java.lang.String getStringColumn() {
-      return string_column;
-    }
-    
-    /** Sets the value of the 'string_column' field */
-    public org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder 
setStringColumn(java.lang.String value) {
-      validate(fields()[6], value);
-      this.string_column = value;
-      fieldSetFlags()[6] = true;
-      return this; 
-    }
-    
-    /** Checks whether the 'string_column' field has been set */
-    public boolean hasStringColumn() {
-      return fieldSetFlags()[6];
-    }
-    
-    /** Clears the value of the 'string_column' field */
-    public org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder 
clearStringColumn() {
-      string_column = null;
-      fieldSetFlags()[6] = false;
-      return this;
-    }
-
-    /** Gets the value of the 'maybe_bool_column' field */
-    public java.lang.Boolean getMaybeBoolColumn() {
-      return maybe_bool_column;
-    }
-    
-    /** Sets the value of the 'maybe_bool_column' field */
-    public org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder 
setMaybeBoolColumn(java.lang.Boolean value) {
-      validate(fields()[7], value);
-      this.maybe_bool_column = value;
-      fieldSetFlags()[7] = true;
-      return this; 
-    }
-    
-    /** Checks whether the 'maybe_bool_column' field has been set */
-    public boolean hasMaybeBoolColumn() {
-      return fieldSetFlags()[7];
-    }
-    
-    /** Clears the value of the 'maybe_bool_column' field */
-    public org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder 
clearMaybeBoolColumn() {
-      maybe_bool_column = null;
-      fieldSetFlags()[7] = false;
-      return this;
-    }
-
-    /** Gets the value of the 'maybe_int_column' field */
-    public java.lang.Integer getMaybeIntColumn() {
-      return maybe_int_column;
-    }
-    
-    /** Sets the value of the 'maybe_int_column' field */
-    public org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder 
setMaybeIntColumn(java.lang.Integer value) {
-      validate(fields()[8], value);
-      this.maybe_int_column = value;
-      fieldSetFlags()[8] = true;
-      return this; 
-    }
-    
-    /** Checks whether the 'maybe_int_column' field has been set */
-    public boolean hasMaybeIntColumn() {
-      return fieldSetFlags()[8];
-    }
-    
-    /** Clears the value of the 'maybe_int_column' field */
-    public org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder 
clearMaybeIntColumn() {
-      maybe_int_column = null;
-      fieldSetFlags()[8] = false;
-      return this;
-    }
-
-    /** Gets the value of the 'maybe_long_column' field */
-    public java.lang.Long getMaybeLongColumn() {
-      return maybe_long_column;
-    }
-    
-    /** Sets the value of the 'maybe_long_column' field */
-    public org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder 
setMaybeLongColumn(java.lang.Long value) {
-      validate(fields()[9], value);
-      this.maybe_long_column = value;
-      fieldSetFlags()[9] = true;
-      return this; 
-    }
-    
-    /** Checks whether the 'maybe_long_column' field has been set */
-    public boolean hasMaybeLongColumn() {
-      return fieldSetFlags()[9];
-    }
-    
-    /** Clears the value of the 'maybe_long_column' field */
-    public org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder 
clearMaybeLongColumn() {
-      maybe_long_column = null;
-      fieldSetFlags()[9] = false;
-      return this;
-    }
-
-    /** Gets the value of the 'maybe_float_column' field */
-    public java.lang.Float getMaybeFloatColumn() {
-      return maybe_float_column;
-    }
-    
-    /** Sets the value of the 'maybe_float_column' field */
-    public org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder 
setMaybeFloatColumn(java.lang.Float value) {
-      validate(fields()[10], value);
-      this.maybe_float_column = value;
-      fieldSetFlags()[10] = true;
-      return this; 
-    }
-    
-    /** Checks whether the 'maybe_float_column' field has been set */
-    public boolean hasMaybeFloatColumn() {
-      return fieldSetFlags()[10];
-    }
-    
-    /** Clears the value of the 'maybe_float_column' field */
-    public org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder 
clearMaybeFloatColumn() {
-      maybe_float_column = null;
-      fieldSetFlags()[10] = false;
-      return this;
-    }
-
-    /** Gets the value of the 'maybe_double_column' field */
-    public java.lang.Double getMaybeDoubleColumn() {
-      return maybe_double_column;
-    }
-    
-    /** Sets the value of the 'maybe_double_column' field */
-    public org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder 
setMaybeDoubleColumn(java.lang.Double value) {
-      validate(fields()[11], value);
-      this.maybe_double_column = value;
-      fieldSetFlags()[11] = true;
-      return this; 
-    }
-    
-    /** Checks whether the 'maybe_double_column' field has been set */
-    public boolean hasMaybeDoubleColumn() {
-      return fieldSetFlags()[11];
-    }
-    
-    /** Clears the value of the 'maybe_double_column' field */
-    public org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder 
clearMaybeDoubleColumn() {
-      maybe_double_column = null;
-      fieldSetFlags()[11] = false;
-      return this;
-    }
-
-    /** Gets the value of the 'maybe_binary_column' field */
-    public java.nio.ByteBuffer getMaybeBinaryColumn() {
-      return maybe_binary_column;
-    }
-    
-    /** Sets the value of the 'maybe_binary_column' field */
-    public org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder 
setMaybeBinaryColumn(java.nio.ByteBuffer value) {
-      validate(fields()[12], value);
-      this.maybe_binary_column = value;
-      fieldSetFlags()[12] = true;
-      return this; 
-    }
-    
-    /** Checks whether the 'maybe_binary_column' field has been set */
-    public boolean hasMaybeBinaryColumn() {
-      return fieldSetFlags()[12];
-    }
-    
-    /** Clears the value of the 'maybe_binary_column' field */
-    public org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder 
clearMaybeBinaryColumn() {
-      maybe_binary_column = null;
-      fieldSetFlags()[12] = false;
-      return this;
-    }
-
-    /** Gets the value of the 'maybe_string_column' field */
-    public java.lang.String getMaybeStringColumn() {
-      return maybe_string_column;
-    }
-    
-    /** Sets the value of the 'maybe_string_column' field */
-    public org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder 
setMaybeStringColumn(java.lang.String value) {
-      validate(fields()[13], value);
-      this.maybe_string_column = value;
-      fieldSetFlags()[13] = true;
-      return this; 
-    }
-    
-    /** Checks whether the 'maybe_string_column' field has been set */
-    public boolean hasMaybeStringColumn() {
-      return fieldSetFlags()[13];
-    }
-    
-    /** Clears the value of the 'maybe_string_column' field */
-    public org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder 
clearMaybeStringColumn() {
-      maybe_string_column = null;
-      fieldSetFlags()[13] = false;
-      return this;
-    }
-
-    /** Gets the value of the 'strings_column' field */
-    public java.util.List<java.lang.String> getStringsColumn() {
-      return strings_column;
-    }
-    
-    /** Sets the value of the 'strings_column' field */
-    public org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder 
setStringsColumn(java.util.List<java.lang.String> value) {
-      validate(fields()[14], value);
-      this.strings_column = value;
-      fieldSetFlags()[14] = true;
-      return this; 
-    }
-    
-    /** Checks whether the 'strings_column' field has been set */
-    public boolean hasStringsColumn() {
-      return fieldSetFlags()[14];
-    }
-    
-    /** Clears the value of the 'strings_column' field */
-    public org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder 
clearStringsColumn() {
-      strings_column = null;
-      fieldSetFlags()[14] = false;
-      return this;
-    }
-
-    /** Gets the value of the 'string_to_int_column' field */
-    public java.util.Map<java.lang.String,java.lang.Integer> 
getStringToIntColumn() {
-      return string_to_int_column;
-    }
-    
-    /** Sets the value of the 'string_to_int_column' field */
-    public org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder 
setStringToIntColumn(java.util.Map<java.lang.String,java.lang.Integer> value) {
-      validate(fields()[15], value);
-      this.string_to_int_column = value;
-      fieldSetFlags()[15] = true;
-      return this; 
-    }
-    
-    /** Checks whether the 'string_to_int_column' field has been set */
-    public boolean hasStringToIntColumn() {
-      return fieldSetFlags()[15];
-    }
-    
-    /** Clears the value of the 'string_to_int_column' field */
-    public org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder 
clearStringToIntColumn() {
-      string_to_int_column = null;
-      fieldSetFlags()[15] = false;
-      return this;
-    }
-
-    /** Gets the value of the 'complex_column' field */
-    public 
java.util.Map<java.lang.String,java.util.List<org.apache.spark.sql.parquet.test.avro.Nested>>
 getComplexColumn() {
-      return complex_column;
-    }
-    
-    /** Sets the value of the 'complex_column' field */
-    public org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder 
setComplexColumn(java.util.Map<java.lang.String,java.util.List<org.apache.spark.sql.parquet.test.avro.Nested>>
 value) {
-      validate(fields()[16], value);
-      this.complex_column = value;
-      fieldSetFlags()[16] = true;
-      return this; 
-    }
-    
-    /** Checks whether the 'complex_column' field has been set */
-    public boolean hasComplexColumn() {
-      return fieldSetFlags()[16];
-    }
-    
-    /** Clears the value of the 'complex_column' field */
-    public org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder 
clearComplexColumn() {
-      complex_column = null;
-      fieldSetFlags()[16] = false;
-      return this;
-    }
-
-    @Override
-    public ParquetAvroCompat build() {
-      try {
-        ParquetAvroCompat record = new ParquetAvroCompat();
-        record.bool_column = fieldSetFlags()[0] ? this.bool_column : 
(java.lang.Boolean) defaultValue(fields()[0]);
-        record.int_column = fieldSetFlags()[1] ? this.int_column : 
(java.lang.Integer) defaultValue(fields()[1]);
-        record.long_column = fieldSetFlags()[2] ? this.long_column : 
(java.lang.Long) defaultValue(fields()[2]);
-        record.float_column = fieldSetFlags()[3] ? this.float_column : 
(java.lang.Float) defaultValue(fields()[3]);
-        record.double_column = fieldSetFlags()[4] ? this.double_column : 
(java.lang.Double) defaultValue(fields()[4]);
-        record.binary_column = fieldSetFlags()[5] ? this.binary_column : 
(java.nio.ByteBuffer) defaultValue(fields()[5]);
-        record.string_column = fieldSetFlags()[6] ? this.string_column : 
(java.lang.String) defaultValue(fields()[6]);
-        record.maybe_bool_column = fieldSetFlags()[7] ? this.maybe_bool_column 
: (java.lang.Boolean) defaultValue(fields()[7]);
-        record.maybe_int_column = fieldSetFlags()[8] ? this.maybe_int_column : 
(java.lang.Integer) defaultValue(fields()[8]);
-        record.maybe_long_column = fieldSetFlags()[9] ? this.maybe_long_column 
: (java.lang.Long) defaultValue(fields()[9]);
-        record.maybe_float_column = fieldSetFlags()[10] ? 
this.maybe_float_column : (java.lang.Float) defaultValue(fields()[10]);
-        record.maybe_double_column = fieldSetFlags()[11] ? 
this.maybe_double_column : (java.lang.Double) defaultValue(fields()[11]);
-        record.maybe_binary_column = fieldSetFlags()[12] ? 
this.maybe_binary_column : (java.nio.ByteBuffer) defaultValue(fields()[12]);
-        record.maybe_string_column = fieldSetFlags()[13] ? 
this.maybe_string_column : (java.lang.String) defaultValue(fields()[13]);
-        record.strings_column = fieldSetFlags()[14] ? this.strings_column : 
(java.util.List<java.lang.String>) defaultValue(fields()[14]);
-        record.string_to_int_column = fieldSetFlags()[15] ? 
this.string_to_int_column : (java.util.Map<java.lang.String,java.lang.Integer>) 
defaultValue(fields()[15]);
-        record.complex_column = fieldSetFlags()[16] ? this.complex_column : 
(java.util.Map<java.lang.String,java.util.List<org.apache.spark.sql.parquet.test.avro.Nested>>)
 defaultValue(fields()[16]);
-        return record;
-      } catch (Exception e) {
-        throw new org.apache.avro.AvroRuntimeException(e);
-      }
-    }
-  }
-}
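
Note: the generated class removed above exposes the usual Avro SpecificRecord builder API (newBuilder(), one setter per field, build()). As a rough illustration only, not part of this commit, a record of this shape is typically constructed from Scala test code along the following lines; the Nested builder method names are assumed from Avro's generated-naming convention (its source is not shown in this diff), and all field values are made up:

    import java.nio.ByteBuffer
    import scala.collection.JavaConverters._

    import org.apache.spark.sql.parquet.test.avro.{Nested, ParquetAvroCompat}

    // Assumed Avro-generated builder for the nested record.
    val nested = Nested.newBuilder()
      .setNestedIntsColumn(Seq(Int.box(1), Int.box(2)).asJava)
      .setNestedStringColumn("nested")
      .build()

    // Every field has to be set before build(), since the schema declares no defaults;
    // the nullable "maybe_*" fields may simply be set to null.
    val record = ParquetAvroCompat.newBuilder()
      .setBoolColumn(true)
      .setIntColumn(1)
      .setLongColumn(1L)
      .setFloatColumn(1.0f)
      .setDoubleColumn(1.0)
      .setBinaryColumn(ByteBuffer.wrap(Array[Byte](1, 2, 3)))
      .setStringColumn("s")
      .setMaybeBoolColumn(null)
      .setMaybeIntColumn(null)
      .setMaybeLongColumn(null)
      .setMaybeFloatColumn(null)
      .setMaybeDoubleColumn(null)
      .setMaybeBinaryColumn(null)
      .setMaybeStringColumn(null)
      .setStringsColumn(Seq("a", "b").asJava)
      .setStringToIntColumn(Map("a" -> Int.box(1)).asJava)
      .setComplexColumn(Map("k" -> Seq(nested).asJava).asJava)
      .build()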

http://git-wip-us.apache.org/repos/asf/spark/blob/40ed2af5/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index c49f256..adbd951 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -25,8 +25,8 @@ import scala.util.Random
 import org.apache.spark.sql.catalyst.plans.logical.OneRowRelation
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.functions._
-import org.apache.spark.sql.json.JSONRelation
-import org.apache.spark.sql.parquet.ParquetRelation
+import org.apache.spark.sql.execution.datasources.json.JSONRelation
+import org.apache.spark.sql.execution.datasources.parquet.ParquetRelation
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.test.{ExamplePointUDT, ExamplePoint, SQLTestUtils}
 

http://git-wip-us.apache.org/repos/asf/spark/blob/40ed2af5/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
new file mode 100644
index 0000000..73d5621
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -0,0 +1,1172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.json
+
+import java.io.{File, StringWriter}
+import java.sql.{Date, Timestamp}
+
+import com.fasterxml.jackson.core.JsonFactory
+import org.apache.spark.rdd.RDD
+import org.scalactic.Tolerance._
+
+import org.apache.spark.sql.{SQLContext, QueryTest, Row, SQLConf}
+import org.apache.spark.sql.TestData._
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.datasources.{ResolvedDataSource, LogicalRelation}
+import org.apache.spark.sql.execution.datasources.json.InferSchema.compatibleType
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.test.SQLTestUtils
+import org.apache.spark.util.Utils
+
+class JsonSuite extends QueryTest with SQLTestUtils with TestJsonData {
+
+  protected lazy val ctx = org.apache.spark.sql.test.TestSQLContext
+  override def sqlContext: SQLContext = ctx // used by SQLTestUtils
+
+  import ctx.sql
+  import ctx.implicits._
+
+  test("Type promotion") {
+    def checkTypePromotion(expected: Any, actual: Any) {
+      assert(expected.getClass == actual.getClass,
+        s"Failed to promote ${actual.getClass} to ${expected.getClass}.")
+      assert(expected == actual,
+        s"Promoted value ${actual}(${actual.getClass}) does not equal the 
expected value " +
+          s"${expected}(${expected.getClass}).")
+    }
+
+    val factory = new JsonFactory()
+    def enforceCorrectType(value: Any, dataType: DataType): Any = {
+      val writer = new StringWriter()
+      val generator = factory.createGenerator(writer)
+      generator.writeObject(value)
+      generator.flush()
+
+      val parser = factory.createParser(writer.toString)
+      parser.nextToken()
+      JacksonParser.convertField(factory, parser, dataType)
+    }
+
+    val intNumber: Int = 2147483647
+    checkTypePromotion(intNumber, enforceCorrectType(intNumber, IntegerType))
+    checkTypePromotion(intNumber.toLong, enforceCorrectType(intNumber, LongType))
+    checkTypePromotion(intNumber.toDouble, enforceCorrectType(intNumber, DoubleType))
+    checkTypePromotion(
+      Decimal(intNumber), enforceCorrectType(intNumber, DecimalType.SYSTEM_DEFAULT))
+
+    val longNumber: Long = 9223372036854775807L
+    checkTypePromotion(longNumber, enforceCorrectType(longNumber, LongType))
+    checkTypePromotion(longNumber.toDouble, enforceCorrectType(longNumber, DoubleType))
+    checkTypePromotion(
+      Decimal(longNumber), enforceCorrectType(longNumber, DecimalType.SYSTEM_DEFAULT))
+
+    val doubleNumber: Double = 1.7976931348623157E308d
+    checkTypePromotion(doubleNumber.toDouble, enforceCorrectType(doubleNumber, DoubleType))
+
+    checkTypePromotion(DateTimeUtils.fromJavaTimestamp(new Timestamp(intNumber)),
+        enforceCorrectType(intNumber, TimestampType))
+    checkTypePromotion(DateTimeUtils.fromJavaTimestamp(new Timestamp(intNumber.toLong)),
+        enforceCorrectType(intNumber.toLong, TimestampType))
+    val strTime = "2014-09-30 12:34:56"
+    checkTypePromotion(DateTimeUtils.fromJavaTimestamp(Timestamp.valueOf(strTime)),
+        enforceCorrectType(strTime, TimestampType))
+
+    val strDate = "2014-10-15"
+    checkTypePromotion(
+      DateTimeUtils.fromJavaDate(Date.valueOf(strDate)), enforceCorrectType(strDate, DateType))
+
+    val ISO8601Time1 = "1970-01-01T01:00:01.0Z"
+    checkTypePromotion(DateTimeUtils.fromJavaTimestamp(new Timestamp(3601000)),
+        enforceCorrectType(ISO8601Time1, TimestampType))
+    checkTypePromotion(DateTimeUtils.millisToDays(3601000),
+      enforceCorrectType(ISO8601Time1, DateType))
+    val ISO8601Time2 = "1970-01-01T02:00:01-01:00"
+    checkTypePromotion(DateTimeUtils.fromJavaTimestamp(new Timestamp(10801000)),
+        enforceCorrectType(ISO8601Time2, TimestampType))
+    checkTypePromotion(DateTimeUtils.millisToDays(10801000),
+      enforceCorrectType(ISO8601Time2, DateType))
+  }
+
+  test("Get compatible type") {
+    def checkDataType(t1: DataType, t2: DataType, expected: DataType) {
+      var actual = compatibleType(t1, t2)
+      assert(actual == expected,
+        s"Expected $expected as the most general data type for $t1 and $t2, 
found $actual")
+      actual = compatibleType(t2, t1)
+      assert(actual == expected,
+        s"Expected $expected as the most general data type for $t1 and $t2, 
found $actual")
+    }
+
+    // NullType
+    checkDataType(NullType, BooleanType, BooleanType)
+    checkDataType(NullType, IntegerType, IntegerType)
+    checkDataType(NullType, LongType, LongType)
+    checkDataType(NullType, DoubleType, DoubleType)
+    checkDataType(NullType, DecimalType.SYSTEM_DEFAULT, DecimalType.SYSTEM_DEFAULT)
+    checkDataType(NullType, StringType, StringType)
+    checkDataType(NullType, ArrayType(IntegerType), ArrayType(IntegerType))
+    checkDataType(NullType, StructType(Nil), StructType(Nil))
+    checkDataType(NullType, NullType, NullType)
+
+    // BooleanType
+    checkDataType(BooleanType, BooleanType, BooleanType)
+    checkDataType(BooleanType, IntegerType, StringType)
+    checkDataType(BooleanType, LongType, StringType)
+    checkDataType(BooleanType, DoubleType, StringType)
+    checkDataType(BooleanType, DecimalType.SYSTEM_DEFAULT, StringType)
+    checkDataType(BooleanType, StringType, StringType)
+    checkDataType(BooleanType, ArrayType(IntegerType), StringType)
+    checkDataType(BooleanType, StructType(Nil), StringType)
+
+    // IntegerType
+    checkDataType(IntegerType, IntegerType, IntegerType)
+    checkDataType(IntegerType, LongType, LongType)
+    checkDataType(IntegerType, DoubleType, DoubleType)
+    checkDataType(IntegerType, DecimalType.SYSTEM_DEFAULT, DecimalType.SYSTEM_DEFAULT)
+    checkDataType(IntegerType, StringType, StringType)
+    checkDataType(IntegerType, ArrayType(IntegerType), StringType)
+    checkDataType(IntegerType, StructType(Nil), StringType)
+
+    // LongType
+    checkDataType(LongType, LongType, LongType)
+    checkDataType(LongType, DoubleType, DoubleType)
+    checkDataType(LongType, DecimalType.SYSTEM_DEFAULT, DecimalType.SYSTEM_DEFAULT)
+    checkDataType(LongType, StringType, StringType)
+    checkDataType(LongType, ArrayType(IntegerType), StringType)
+    checkDataType(LongType, StructType(Nil), StringType)
+
+    // DoubleType
+    checkDataType(DoubleType, DoubleType, DoubleType)
+    checkDataType(DoubleType, DecimalType.SYSTEM_DEFAULT, DoubleType)
+    checkDataType(DoubleType, StringType, StringType)
+    checkDataType(DoubleType, ArrayType(IntegerType), StringType)
+    checkDataType(DoubleType, StructType(Nil), StringType)
+
+    // DecimalType
+    checkDataType(DecimalType.SYSTEM_DEFAULT, DecimalType.SYSTEM_DEFAULT,
+      DecimalType.SYSTEM_DEFAULT)
+    checkDataType(DecimalType.SYSTEM_DEFAULT, StringType, StringType)
+    checkDataType(DecimalType.SYSTEM_DEFAULT, ArrayType(IntegerType), StringType)
+    checkDataType(DecimalType.SYSTEM_DEFAULT, StructType(Nil), StringType)
+
+    // StringType
+    checkDataType(StringType, StringType, StringType)
+    checkDataType(StringType, ArrayType(IntegerType), StringType)
+    checkDataType(StringType, StructType(Nil), StringType)
+
+    // ArrayType
+    checkDataType(ArrayType(IntegerType), ArrayType(IntegerType), ArrayType(IntegerType))
+    checkDataType(ArrayType(IntegerType), ArrayType(LongType), ArrayType(LongType))
+    checkDataType(ArrayType(IntegerType), ArrayType(StringType), ArrayType(StringType))
+    checkDataType(ArrayType(IntegerType), StructType(Nil), StringType)
+    checkDataType(
+      ArrayType(IntegerType, true), ArrayType(IntegerType), ArrayType(IntegerType, true))
+    checkDataType(
+      ArrayType(IntegerType, true), ArrayType(IntegerType, false), ArrayType(IntegerType, true))
+    checkDataType(
+      ArrayType(IntegerType, true), ArrayType(IntegerType, true), ArrayType(IntegerType, true))
+    checkDataType(
+      ArrayType(IntegerType, false), ArrayType(IntegerType), ArrayType(IntegerType, true))
+    checkDataType(
+      ArrayType(IntegerType, false), ArrayType(IntegerType, false), ArrayType(IntegerType, false))
+    checkDataType(
+      ArrayType(IntegerType, false), ArrayType(IntegerType, true), ArrayType(IntegerType, true))
+
+    // StructType
+    checkDataType(StructType(Nil), StructType(Nil), StructType(Nil))
+    checkDataType(
+      StructType(StructField("f1", IntegerType, true) :: Nil),
+      StructType(StructField("f1", IntegerType, true) :: Nil),
+      StructType(StructField("f1", IntegerType, true) :: Nil))
+    checkDataType(
+      StructType(StructField("f1", IntegerType, true) :: Nil),
+      StructType(Nil),
+      StructType(StructField("f1", IntegerType, true) :: Nil))
+    checkDataType(
+      StructType(
+        StructField("f1", IntegerType, true) ::
+        StructField("f2", IntegerType, true) :: Nil),
+      StructType(StructField("f1", LongType, true) :: Nil) ,
+      StructType(
+        StructField("f1", LongType, true) ::
+        StructField("f2", IntegerType, true) :: Nil))
+    checkDataType(
+      StructType(
+        StructField("f1", IntegerType, true) :: Nil),
+      StructType(
+        StructField("f2", IntegerType, true) :: Nil),
+      StructType(
+        StructField("f1", IntegerType, true) ::
+        StructField("f2", IntegerType, true) :: Nil))
+    checkDataType(
+      StructType(
+        StructField("f1", IntegerType, true) :: Nil),
+      DecimalType.SYSTEM_DEFAULT,
+      StringType)
+  }
+
+  test("Complex field and type inferring with null in sampling") {
+    val jsonDF = ctx.read.json(jsonNullStruct)
+    val expectedSchema = StructType(
+      StructField("headers", StructType(
+        StructField("Charset", StringType, true) ::
+          StructField("Host", StringType, true) :: Nil)
+        , true) ::
+        StructField("ip", StringType, true) ::
+        StructField("nullstr", StringType, true):: Nil)
+
+    assert(expectedSchema === jsonDF.schema)
+    jsonDF.registerTempTable("jsonTable")
+
+    checkAnswer(
+      sql("select nullstr, headers.Host from jsonTable"),
+      Seq(Row("", "1.abc.com"), Row("", null), Row("", null), Row(null, null))
+    )
+  }
+
+  test("Primitive field and type inferring") {
+    val jsonDF = ctx.read.json(primitiveFieldAndType)
+
+    val expectedSchema = StructType(
+      StructField("bigInteger", DecimalType(20, 0), true) ::
+      StructField("boolean", BooleanType, true) ::
+      StructField("double", DoubleType, true) ::
+      StructField("integer", LongType, true) ::
+      StructField("long", LongType, true) ::
+      StructField("null", StringType, true) ::
+      StructField("string", StringType, true) :: Nil)
+
+    assert(expectedSchema === jsonDF.schema)
+
+    jsonDF.registerTempTable("jsonTable")
+
+    checkAnswer(
+      sql("select * from jsonTable"),
+      Row(new java.math.BigDecimal("92233720368547758070"),
+        true,
+        1.7976931348623157E308,
+        10,
+        21474836470L,
+        null,
+        "this is a simple string.")
+    )
+  }
+
+  test("Complex field and type inferring") {
+    val jsonDF = ctx.read.json(complexFieldAndType1)
+
+    val expectedSchema = StructType(
+      StructField("arrayOfArray1", ArrayType(ArrayType(StringType, true), 
true), true) ::
+      StructField("arrayOfArray2", ArrayType(ArrayType(DoubleType, true), 
true), true) ::
+      StructField("arrayOfBigInteger", ArrayType(DecimalType(21, 0), true), 
true) ::
+      StructField("arrayOfBoolean", ArrayType(BooleanType, true), true) ::
+      StructField("arrayOfDouble", ArrayType(DoubleType, true), true) ::
+      StructField("arrayOfInteger", ArrayType(LongType, true), true) ::
+      StructField("arrayOfLong", ArrayType(LongType, true), true) ::
+      StructField("arrayOfNull", ArrayType(StringType, true), true) ::
+      StructField("arrayOfString", ArrayType(StringType, true), true) ::
+      StructField("arrayOfStruct", ArrayType(
+        StructType(
+          StructField("field1", BooleanType, true) ::
+          StructField("field2", StringType, true) ::
+          StructField("field3", StringType, true) :: Nil), true), true) ::
+      StructField("struct", StructType(
+        StructField("field1", BooleanType, true) ::
+        StructField("field2", DecimalType(20, 0), true) :: Nil), true) ::
+      StructField("structWithArrayFields", StructType(
+        StructField("field1", ArrayType(LongType, true), true) ::
+        StructField("field2", ArrayType(StringType, true), true) :: Nil), 
true) :: Nil)
+
+    assert(expectedSchema === jsonDF.schema)
+
+    jsonDF.registerTempTable("jsonTable")
+
+    // Access elements of a primitive array.
+    checkAnswer(
+      sql("select arrayOfString[0], arrayOfString[1], arrayOfString[2] from 
jsonTable"),
+      Row("str1", "str2", null)
+    )
+
+    // Access an array of null values.
+    checkAnswer(
+      sql("select arrayOfNull from jsonTable"),
+      Row(Seq(null, null, null, null))
+    )
+
+    // Access elements of a BigInteger array (we use DecimalType internally).
+    checkAnswer(
+      sql("select arrayOfBigInteger[0], arrayOfBigInteger[1], 
arrayOfBigInteger[2] from jsonTable"),
+      Row(new java.math.BigDecimal("922337203685477580700"),
+        new java.math.BigDecimal("-922337203685477580800"), null)
+    )
+
+    // Access elements of an array of arrays.
+    checkAnswer(
+      sql("select arrayOfArray1[0], arrayOfArray1[1] from jsonTable"),
+      Row(Seq("1", "2", "3"), Seq("str1", "str2"))
+    )
+
+    // Access elements of an array of arrays.
+    checkAnswer(
+      sql("select arrayOfArray2[0], arrayOfArray2[1] from jsonTable"),
+      Row(Seq(1.0, 2.0, 3.0), Seq(1.1, 2.1, 3.1))
+    )
+
+    // Access elements of an array inside a field with the type of ArrayType(ArrayType).
+    checkAnswer(
+      sql("select arrayOfArray1[1][1], arrayOfArray2[1][1] from jsonTable"),
+      Row("str2", 2.1)
+    )
+
+    // Access elements of an array of structs.
+    checkAnswer(
+      sql("select arrayOfStruct[0], arrayOfStruct[1], arrayOfStruct[2], 
arrayOfStruct[3] " +
+        "from jsonTable"),
+      Row(
+        Row(true, "str1", null),
+        Row(false, null, null),
+        Row(null, null, null),
+        null)
+    )
+
+    // Access a struct and fields inside of it.
+    checkAnswer(
+      sql("select struct, struct.field1, struct.field2 from jsonTable"),
+      Row(
+        Row(true, new java.math.BigDecimal("92233720368547758070")),
+        true,
+        new java.math.BigDecimal("92233720368547758070")) :: Nil
+    )
+
+    // Access an array field of a struct.
+    checkAnswer(
+      sql("select structWithArrayFields.field1, structWithArrayFields.field2 
from jsonTable"),
+      Row(Seq(4, 5, 6), Seq("str1", "str2"))
+    )
+
+    // Access elements of an array field of a struct.
+    checkAnswer(
+      sql("select structWithArrayFields.field1[1], 
structWithArrayFields.field2[3] from jsonTable"),
+      Row(5, null)
+    )
+  }
+
+  test("GetField operation on complex data type") {
+    val jsonDF = ctx.read.json(complexFieldAndType1)
+    jsonDF.registerTempTable("jsonTable")
+
+    checkAnswer(
+      sql("select arrayOfStruct[0].field1, arrayOfStruct[0].field2 from 
jsonTable"),
+      Row(true, "str1")
+    )
+
+    // Getting all values of a specific field from an array of structs.
+    checkAnswer(
+      sql("select arrayOfStruct.field1, arrayOfStruct.field2 from jsonTable"),
+      Row(Seq(true, false, null), Seq("str1", null, null))
+    )
+  }
+
+  test("Type conflict in primitive field values") {
+    val jsonDF = ctx.read.json(primitiveFieldValueTypeConflict)
+
+    val expectedSchema = StructType(
+      StructField("num_bool", StringType, true) ::
+      StructField("num_num_1", LongType, true) ::
+      StructField("num_num_2", DoubleType, true) ::
+      StructField("num_num_3", DoubleType, true) ::
+      StructField("num_str", StringType, true) ::
+      StructField("str_bool", StringType, true) :: Nil)
+
+    assert(expectedSchema === jsonDF.schema)
+
+    jsonDF.registerTempTable("jsonTable")
+
+    checkAnswer(
+      sql("select * from jsonTable"),
+      Row("true", 11L, null, 1.1, "13.1", "str1") ::
+        Row("12", null, 21474836470.9, null, null, "true") ::
+        Row("false", 21474836470L, 92233720368547758070d, 100, "str1", 
"false") ::
+        Row(null, 21474836570L, 1.1, 21474836470L, "92233720368547758070", 
null) :: Nil
+    )
+
+    // Number and Boolean conflict: resolve the type as number in this query.
+    checkAnswer(
+      sql("select num_bool - 10 from jsonTable where num_bool > 11"),
+      Row(2)
+    )
+
+    // Widening to LongType
+    checkAnswer(
+      sql("select num_num_1 - 100 from jsonTable where num_num_1 > 11"),
+      Row(21474836370L) :: Row(21474836470L) :: Nil
+    )
+
+    checkAnswer(
+      sql("select num_num_1 - 100 from jsonTable where num_num_1 > 10"),
+      Row(-89) :: Row(21474836370L) :: Row(21474836470L) :: Nil
+    )
+
+    // Widening to DecimalType
+    checkAnswer(
+      sql("select num_num_2 + 1.3 from jsonTable where num_num_2 > 1.1"),
+      Row(21474836472.2) ::
+        Row(92233720368547758071.3) :: Nil
+    )
+
+    // Widening to Double
+    checkAnswer(
+      sql("select num_num_3 + 1.2 from jsonTable where num_num_3 > 1.1"),
+      Row(101.2) :: Row(21474836471.2) :: Nil
+    )
+
+    // Number and String conflict: resolve the type as number in this query.
+    checkAnswer(
+      sql("select num_str + 1.2 from jsonTable where num_str > 14"),
+      Row(BigDecimal("92233720368547758071.2"))
+    )
+
+    // Number and String conflict: resolve the type as number in this query.
+    checkAnswer(
+      sql("select num_str + 1.2 from jsonTable where num_str >= 
92233720368547758060"),
+      Row(new java.math.BigDecimal("92233720368547758071.2"))
+    )
+
+    // String and Boolean conflict: resolve the type as string.
+    checkAnswer(
+      sql("select * from jsonTable where str_bool = 'str1'"),
+      Row("true", 11L, null, 1.1, "13.1", "str1")
+    )
+  }
+
+  ignore("Type conflict in primitive field values (Ignored)") {
+    val jsonDF = ctx.read.json(primitiveFieldValueTypeConflict)
+    jsonDF.registerTempTable("jsonTable")
+
+    // Right now, the analyzer does not promote strings in a boolean expression.
+    // Number and Boolean conflict: resolve the type as boolean in this query.
+    checkAnswer(
+      sql("select num_bool from jsonTable where NOT num_bool"),
+      Row(false)
+    )
+
+    checkAnswer(
+      sql("select str_bool from jsonTable where NOT str_bool"),
+      Row(false)
+    )
+
+    // Right now, the analyzer does not know that num_bool should be treated as a boolean.
+    // Number and Boolean conflict: resolve the type as boolean in this query.
+    checkAnswer(
+      sql("select num_bool from jsonTable where num_bool"),
+      Row(true)
+    )
+
+    checkAnswer(
+      sql("select str_bool from jsonTable where str_bool"),
+      Row(false)
+    )
+
+    // The plan of the following DSL is
+    // Project [(CAST(num_str#65:4, DoubleType) + 1.2) AS num#78]
+    //  Filter (CAST(CAST(num_str#65:4, DoubleType), DecimalType) > 92233720368547758060)
+    //    ExistingRdd [num_bool#61,num_num_1#62L,num_num_2#63,num_num_3#64,num_str#65,str_bool#66]
+    // We should directly cast num_str to DecimalType and also need to do the right type promotion
+    // in the Project.
+    checkAnswer(
+      jsonDF.
+        where('num_str >= BigDecimal("92233720368547758060")).
+        select(('num_str + 1.2).as("num")),
+      Row(new java.math.BigDecimal("92233720368547758071.2").doubleValue())
+    )
+
+    // The following test will fail. The type of num_str is StringType.
+    // So, to evaluate num_str + 1.2, we first need to use Cast to convert the type.
+    // In our test data, one value of num_str is 13.1.
+    // The result of (CAST(num_str#65:4, DoubleType) + 1.2) for this value is 14.299999999999999,
+    // which is not 14.3.
+    // Number and String conflict: resolve the type as number in this query.
+    checkAnswer(
+      sql("select num_str + 1.2 from jsonTable where num_str > 13"),
+      Row(BigDecimal("14.3")) :: Row(BigDecimal("92233720368547758071.2")) :: 
Nil
+    )
+  }
+
+  test("Type conflict in complex field values") {
+    val jsonDF = ctx.read.json(complexFieldValueTypeConflict)
+
+    val expectedSchema = StructType(
+      StructField("array", ArrayType(LongType, true), true) ::
+      StructField("num_struct", StringType, true) ::
+      StructField("str_array", StringType, true) ::
+      StructField("struct", StructType(
+        StructField("field", StringType, true) :: Nil), true) ::
+      StructField("struct_array", StringType, true) :: Nil)
+
+    assert(expectedSchema === jsonDF.schema)
+
+    jsonDF.registerTempTable("jsonTable")
+
+    checkAnswer(
+      sql("select * from jsonTable"),
+      Row(Seq(), "11", "[1,2,3]", Row(null), "[]") ::
+        Row(null, """{"field":false}""", null, null, "{}") ::
+        Row(Seq(4, 5, 6), null, "str", Row(null), "[7,8,9]") ::
+        Row(Seq(7), "{}", """["str1","str2",33]""", Row("str"), 
"""{"field":true}""") :: Nil
+    )
+  }
+
+  test("Type conflict in array elements") {
+    val jsonDF = ctx.read.json(arrayElementTypeConflict)
+
+    val expectedSchema = StructType(
+      StructField("array1", ArrayType(StringType, true), true) ::
+      StructField("array2", ArrayType(StructType(
+        StructField("field", LongType, true) :: Nil), true), true) ::
+      StructField("array3", ArrayType(StringType, true), true) :: Nil)
+
+    assert(expectedSchema === jsonDF.schema)
+
+    jsonDF.registerTempTable("jsonTable")
+
+    checkAnswer(
+      sql("select * from jsonTable"),
+      Row(Seq("1", "1.1", "true", null, "[]", "{}", "[2,3,4]",
+        """{"field":"str"}"""), Seq(Row(214748364700L), Row(1)), null) ::
+      Row(null, null, Seq("""{"field":"str"}""", """{"field":1}""")) ::
+      Row(null, null, Seq("1", "2", "3")) :: Nil
+    )
+
+    // Treat an element as a number.
+    checkAnswer(
+      sql("select array1[0] + 1 from jsonTable where array1 is not null"),
+      Row(2)
+    )
+  }
+
+  test("Handling missing fields") {
+    val jsonDF = ctx.read.json(missingFields)
+
+    val expectedSchema = StructType(
+      StructField("a", BooleanType, true) ::
+      StructField("b", LongType, true) ::
+      StructField("c", ArrayType(LongType, true), true) ::
+      StructField("d", StructType(
+        StructField("field", BooleanType, true) :: Nil), true) ::
+      StructField("e", StringType, true) :: Nil)
+
+    assert(expectedSchema === jsonDF.schema)
+
+    jsonDF.registerTempTable("jsonTable")
+  }
+
+  test("jsonFile should be based on JSONRelation") {
+    val dir = Utils.createTempDir()
+    dir.delete()
+    val path = dir.getCanonicalFile.toURI.toString
+    ctx.sparkContext.parallelize(1 to 100)
+      .map(i => s"""{"a": 1, "b": "str$i"}""").saveAsTextFile(path)
+    val jsonDF = ctx.read.option("samplingRatio", "0.49").json(path)
+
+    val analyzed = jsonDF.queryExecution.analyzed
+    assert(
+      analyzed.isInstanceOf[LogicalRelation],
+      "The DataFrame returned by jsonFile should be based on LogicalRelation.")
+    val relation = analyzed.asInstanceOf[LogicalRelation].relation
+    assert(
+      relation.isInstanceOf[JSONRelation],
+      "The DataFrame returned by jsonFile should be based on JSONRelation.")
+    assert(relation.asInstanceOf[JSONRelation].paths === Array(path))
+    assert(relation.asInstanceOf[JSONRelation].samplingRatio === (0.49 +- 0.001))
+
+    val schema = StructType(StructField("a", LongType, true) :: Nil)
+    val logicalRelation =
+      ctx.read.schema(schema).json(path).queryExecution.analyzed.asInstanceOf[LogicalRelation]
+    val relationWithSchema = logicalRelation.relation.asInstanceOf[JSONRelation]
+    assert(relationWithSchema.paths === Array(path))
+    assert(relationWithSchema.schema === schema)
+    assert(relationWithSchema.samplingRatio > 0.99)
+  }
+
+  test("Loading a JSON dataset from a text file") {
+    val dir = Utils.createTempDir()
+    dir.delete()
+    val path = dir.getCanonicalPath
+    primitiveFieldAndType.map(record => record.replaceAll("\n", " ")).saveAsTextFile(path)
+    val jsonDF = ctx.read.json(path)
+
+    val expectedSchema = StructType(
+      StructField("bigInteger", DecimalType(20, 0), true) ::
+      StructField("boolean", BooleanType, true) ::
+      StructField("double", DoubleType, true) ::
+      StructField("integer", LongType, true) ::
+      StructField("long", LongType, true) ::
+      StructField("null", StringType, true) ::
+      StructField("string", StringType, true) :: Nil)
+
+    assert(expectedSchema === jsonDF.schema)
+
+    jsonDF.registerTempTable("jsonTable")
+
+    checkAnswer(
+      sql("select * from jsonTable"),
+      Row(new java.math.BigDecimal("92233720368547758070"),
+      true,
+      1.7976931348623157E308,
+      10,
+      21474836470L,
+      null,
+      "this is a simple string.")
+    )
+  }
+
+  test("Loading a JSON dataset from a text file with SQL") {
+    val dir = Utils.createTempDir()
+    dir.delete()
+    val path = dir.getCanonicalPath
+    primitiveFieldAndType.map(record => record.replaceAll("\n", " ")).saveAsTextFile(path)
+
+    sql(
+      s"""
+        |CREATE TEMPORARY TABLE jsonTableSQL
+        |USING org.apache.spark.sql.json
+        |OPTIONS (
+        |  path '$path'
+        |)
+      """.stripMargin)
+
+    checkAnswer(
+      sql("select * from jsonTableSQL"),
+      Row(new java.math.BigDecimal("92233720368547758070"),
+        true,
+        1.7976931348623157E308,
+        10,
+        21474836470L,
+        null,
+        "this is a simple string.")
+    )
+  }
+
+  test("Applying schemas") {
+    val dir = Utils.createTempDir()
+    dir.delete()
+    val path = dir.getCanonicalPath
+    primitiveFieldAndType.map(record => record.replaceAll("\n", " ")).saveAsTextFile(path)
+
+    val schema = StructType(
+      StructField("bigInteger", DecimalType.SYSTEM_DEFAULT, true) ::
+      StructField("boolean", BooleanType, true) ::
+      StructField("double", DoubleType, true) ::
+      StructField("integer", IntegerType, true) ::
+      StructField("long", LongType, true) ::
+      StructField("null", StringType, true) ::
+      StructField("string", StringType, true) :: Nil)
+
+    val jsonDF1 = ctx.read.schema(schema).json(path)
+
+    assert(schema === jsonDF1.schema)
+
+    jsonDF1.registerTempTable("jsonTable1")
+
+    checkAnswer(
+      sql("select * from jsonTable1"),
+      Row(new java.math.BigDecimal("92233720368547758070"),
+      true,
+      1.7976931348623157E308,
+      10,
+      21474836470L,
+      null,
+      "this is a simple string.")
+    )
+
+    val jsonDF2 = ctx.read.schema(schema).json(primitiveFieldAndType)
+
+    assert(schema === jsonDF2.schema)
+
+    jsonDF2.registerTempTable("jsonTable2")
+
+    checkAnswer(
+      sql("select * from jsonTable2"),
+      Row(new java.math.BigDecimal("92233720368547758070"),
+      true,
+      1.7976931348623157E308,
+      10,
+      21474836470L,
+      null,
+      "this is a simple string.")
+    )
+  }
+
+  test("Applying schemas with MapType") {
+    val schemaWithSimpleMap = StructType(
+      StructField("map", MapType(StringType, IntegerType, true), false) :: Nil)
+    val jsonWithSimpleMap = ctx.read.schema(schemaWithSimpleMap).json(mapType1)
+
+    jsonWithSimpleMap.registerTempTable("jsonWithSimpleMap")
+
+    checkAnswer(
+      sql("select map from jsonWithSimpleMap"),
+      Row(Map("a" -> 1)) ::
+      Row(Map("b" -> 2)) ::
+      Row(Map("c" -> 3)) ::
+      Row(Map("c" -> 1, "d" -> 4)) ::
+      Row(Map("e" -> null)) :: Nil
+    )
+
+    checkAnswer(
+      sql("select map['c'] from jsonWithSimpleMap"),
+      Row(null) ::
+      Row(null) ::
+      Row(3) ::
+      Row(1) ::
+      Row(null) :: Nil
+    )
+
+    val innerStruct = StructType(
+      StructField("field1", ArrayType(IntegerType, true), true) ::
+      StructField("field2", IntegerType, true) :: Nil)
+    val schemaWithComplexMap = StructType(
+      StructField("map", MapType(StringType, innerStruct, true), false) :: Nil)
+
+    val jsonWithComplexMap = ctx.read.schema(schemaWithComplexMap).json(mapType2)
+
+    jsonWithComplexMap.registerTempTable("jsonWithComplexMap")
+
+    checkAnswer(
+      sql("select map from jsonWithComplexMap"),
+      Row(Map("a" -> Row(Seq(1, 2, 3, null), null))) ::
+      Row(Map("b" -> Row(null, 2))) ::
+      Row(Map("c" -> Row(Seq(), 4))) ::
+      Row(Map("c" -> Row(null, 3), "d" -> Row(Seq(null), null))) ::
+      Row(Map("e" -> null)) ::
+      Row(Map("f" -> Row(null, null))) :: Nil
+    )
+
+    checkAnswer(
+      sql("select map['a'].field1, map['c'].field2 from jsonWithComplexMap"),
+      Row(Seq(1, 2, 3, null), null) ::
+      Row(null, null) ::
+      Row(null, 4) ::
+      Row(null, 3) ::
+      Row(null, null) ::
+      Row(null, null) :: Nil
+    )
+  }
+
+  test("SPARK-2096 Correctly parse dot notations") {
+    val jsonDF = ctx.read.json(complexFieldAndType2)
+    jsonDF.registerTempTable("jsonTable")
+
+    checkAnswer(
+      sql("select arrayOfStruct[0].field1, arrayOfStruct[0].field2 from 
jsonTable"),
+      Row(true, "str1")
+    )
+    checkAnswer(
+      sql(
+        """
+          |select complexArrayOfStruct[0].field1[1].inner2[0], complexArrayOfStruct[1].field2[0][1]
+          |from jsonTable
+        """.stripMargin),
+      Row("str2", 6)
+    )
+  }
+
+  test("SPARK-3390 Complex arrays") {
+    val jsonDF = ctx.read.json(complexFieldAndType2)
+    jsonDF.registerTempTable("jsonTable")
+
+    checkAnswer(
+      sql(
+        """
+          |select arrayOfArray1[0][0][0], arrayOfArray1[1][0][1], arrayOfArray1[1][1][0]
+          |from jsonTable
+        """.stripMargin),
+      Row(5, 7, 8)
+    )
+    checkAnswer(
+      sql(
+        """
+          |select arrayOfArray2[0][0][0].inner1, arrayOfArray2[1][0],
+          |arrayOfArray2[1][1][1].inner2[0], arrayOfArray2[2][0][0].inner3[0][0].inner4
+          |from jsonTable
+        """.stripMargin),
+      Row("str1", Nil, "str4", 2)
+    )
+  }
+
+  test("SPARK-3308 Read top level JSON arrays") {
+    val jsonDF = ctx.read.json(jsonArray)
+    jsonDF.registerTempTable("jsonTable")
+
+    checkAnswer(
+      sql(
+        """
+          |select a, b, c
+          |from jsonTable
+        """.stripMargin),
+      Row("str_a_1", null, null) ::
+        Row("str_a_2", null, null) ::
+        Row(null, "str_b_3", null) ::
+        Row("str_a_4", "str_b_4", "str_c_4") :: Nil
+    )
+  }
+
+  test("Corrupt records") {
+    // Test if we can query corrupt records.
+    val oldColumnNameOfCorruptRecord = ctx.conf.columnNameOfCorruptRecord
+    ctx.setConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD, "_unparsed")
+
+    val jsonDF = ctx.read.json(corruptRecords)
+    jsonDF.registerTempTable("jsonTable")
+
+    val schema = StructType(
+      StructField("_unparsed", StringType, true) ::
+      StructField("a", StringType, true) ::
+      StructField("b", StringType, true) ::
+      StructField("c", StringType, true) :: Nil)
+
+    assert(schema === jsonDF.schema)
+
+    // In HiveContext, backticks should be used to access columns starting with an underscore.
+    checkAnswer(
+      sql(
+        """
+          |SELECT a, b, c, _unparsed
+          |FROM jsonTable
+        """.stripMargin),
+      Row(null, null, null, "{") ::
+        Row(null, null, null, "") ::
+        Row(null, null, null, """{"a":1, b:2}""") ::
+        Row(null, null, null, """{"a":{, b:3}""") ::
+        Row("str_a_4", "str_b_4", "str_c_4", null) ::
+        Row(null, null, null, "]") :: Nil
+    )
+
+    checkAnswer(
+      sql(
+        """
+          |SELECT a, b, c
+          |FROM jsonTable
+          |WHERE _unparsed IS NULL
+        """.stripMargin),
+      Row("str_a_4", "str_b_4", "str_c_4")
+    )
+
+    checkAnswer(
+      sql(
+        """
+          |SELECT _unparsed
+          |FROM jsonTable
+          |WHERE _unparsed IS NOT NULL
+        """.stripMargin),
+      Row("{") ::
+        Row("") ::
+        Row("""{"a":1, b:2}""") ::
+        Row("""{"a":{, b:3}""") ::
+        Row("]") :: Nil
+    )
+
+    ctx.setConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD, oldColumnNameOfCorruptRecord)
+  }
+
+  test("SPARK-4068: nulls in arrays") {
+    val jsonDF = ctx.read.json(nullsInArrays)
+    jsonDF.registerTempTable("jsonTable")
+
+    val schema = StructType(
+      StructField("field1",
+        ArrayType(ArrayType(ArrayType(ArrayType(StringType, true), true), true), true), true) ::
+      StructField("field2",
+        ArrayType(ArrayType(
+          StructType(StructField("Test", LongType, true) :: Nil), true), true), true) ::
+      StructField("field3",
+        ArrayType(ArrayType(
+          StructType(StructField("Test", StringType, true) :: Nil), true), true), true) ::
+      StructField("field4",
+        ArrayType(ArrayType(ArrayType(LongType, true), true), true), true) :: Nil)
+
+    assert(schema === jsonDF.schema)
+
+    checkAnswer(
+      sql(
+        """
+          |SELECT field1, field2, field3, field4
+          |FROM jsonTable
+        """.stripMargin),
+      Row(Seq(Seq(null), Seq(Seq(Seq("Test")))), null, null, null) ::
+        Row(null, Seq(null, Seq(Row(1))), null, null) ::
+        Row(null, null, Seq(Seq(null), Seq(Row("2"))), null) ::
+        Row(null, null, null, Seq(Seq(null, Seq(1, 2, 3)))) :: Nil
+    )
+  }
+
+  test("SPARK-4228 DataFrame to JSON") {
+    val schema1 = StructType(
+      StructField("f1", IntegerType, false) ::
+      StructField("f2", StringType, false) ::
+      StructField("f3", BooleanType, false) ::
+      StructField("f4", ArrayType(StringType), nullable = true) ::
+      StructField("f5", IntegerType, true) :: Nil)
+
+    val rowRDD1 = unparsedStrings.map { r =>
+      val values = r.split(",").map(_.trim)
+      val v5 = try values(3).toInt catch {
+        case _: NumberFormatException => null
+      }
+      Row(values(0).toInt, values(1), values(2).toBoolean, r.split(",").toList, v5)
+    }
+
+    val df1 = ctx.createDataFrame(rowRDD1, schema1)
+    df1.registerTempTable("applySchema1")
+    val df2 = df1.toDF
+    val result = df2.toJSON.collect()
+    // scalastyle:off
+    assert(result(0) === "{\"f1\":1,\"f2\":\"A1\",\"f3\":true,\"f4\":[\"1\",\" A1\",\" true\",\" null\"]}")
+    assert(result(3) === "{\"f1\":4,\"f2\":\"D4\",\"f3\":true,\"f4\":[\"4\",\" D4\",\" true\",\" 2147483644\"],\"f5\":2147483644}")
+    // scalastyle:on
+
+    val schema2 = StructType(
+      StructField("f1", StructType(
+        StructField("f11", IntegerType, false) ::
+        StructField("f12", BooleanType, false) :: Nil), false) ::
+      StructField("f2", MapType(StringType, IntegerType, true), false) :: Nil)
+
+    val rowRDD2 = unparsedStrings.map { r =>
+      val values = r.split(",").map(_.trim)
+      val v4 = try values(3).toInt catch {
+        case _: NumberFormatException => null
+      }
+      Row(Row(values(0).toInt, values(2).toBoolean), Map(values(1) -> v4))
+    }
+
+    val df3 = ctx.createDataFrame(rowRDD2, schema2)
+    df3.registerTempTable("applySchema2")
+    val df4 = df3.toDF
+    val result2 = df4.toJSON.collect()
+
+    assert(result2(1) === "{\"f1\":{\"f11\":2,\"f12\":false},\"f2\":{\"B2\":null}}")
+    assert(result2(3) === "{\"f1\":{\"f11\":4,\"f12\":true},\"f2\":{\"D4\":2147483644}}")
+
+    val jsonDF = ctx.read.json(primitiveFieldAndType)
+    val primTable = ctx.read.json(jsonDF.toJSON)
+    primTable.registerTempTable("primitiveTable")
+    checkAnswer(
+      sql("select * from primitiveTable"),
+      Row(new java.math.BigDecimal("92233720368547758070"),
+        true,
+        1.7976931348623157E308,
+        10,
+        21474836470L,
+        "this is a simple string.")
+      )
+
+    val complexJsonDF = ctx.read.json(complexFieldAndType1)
+    val compTable = ctx.read.json(complexJsonDF.toJSON)
+    compTable.registerTempTable("complexTable")
+    // Access elements of a primitive array.
+    checkAnswer(
+      sql("select arrayOfString[0], arrayOfString[1], arrayOfString[2] from 
complexTable"),
+      Row("str1", "str2", null)
+    )
+
+    // Access an array of null values.
+    checkAnswer(
+      sql("select arrayOfNull from complexTable"),
+      Row(Seq(null, null, null, null))
+    )
+
+    // Access elements of a BigInteger array (we use DecimalType internally).
+    checkAnswer(
+      sql("select arrayOfBigInteger[0], arrayOfBigInteger[1], 
arrayOfBigInteger[2] " +
+        " from complexTable"),
+      Row(new java.math.BigDecimal("922337203685477580700"),
+        new java.math.BigDecimal("-922337203685477580800"), null)
+    )
+
+    // Access elements of an array of arrays.
+    checkAnswer(
+      sql("select arrayOfArray1[0], arrayOfArray1[1] from complexTable"),
+      Row(Seq("1", "2", "3"), Seq("str1", "str2"))
+    )
+
+    // Access elements of an array of arrays.
+    checkAnswer(
+      sql("select arrayOfArray2[0], arrayOfArray2[1] from complexTable"),
+      Row(Seq(1.0, 2.0, 3.0), Seq(1.1, 2.1, 3.1))
+    )
+
+    // Access elements of an array inside a field with the type of ArrayType(ArrayType).
+    checkAnswer(
+      sql("select arrayOfArray1[1][1], arrayOfArray2[1][1] from complexTable"),
+      Row("str2", 2.1)
+    )
+
+    // Access a struct and fields inside of it.
+    checkAnswer(
+      sql("select struct, struct.field1, struct.field2 from complexTable"),
+      Row(
+        Row(true, new java.math.BigDecimal("92233720368547758070")),
+        true,
+        new java.math.BigDecimal("92233720368547758070")) :: Nil
+    )
+
+    // Access an array field of a struct.
+    checkAnswer(
+      sql("select structWithArrayFields.field1, structWithArrayFields.field2 
from complexTable"),
+      Row(Seq(4, 5, 6), Seq("str1", "str2"))
+    )
+
+    // Access elements of an array field of a struct.
+    checkAnswer(
+      sql("select structWithArrayFields.field1[1], 
structWithArrayFields.field2[3] " +
+        "from complexTable"),
+      Row(5, null)
+    )
+  }
+
+  test("JSONRelation equality test") {
+    val context = org.apache.spark.sql.test.TestSQLContext
+
+    val relation0 = new JSONRelation(
+      Some(empty),
+      1.0,
+      Some(StructType(StructField("a", IntegerType, true) :: Nil)),
+      None, None)(context)
+    val logicalRelation0 = LogicalRelation(relation0)
+    val relation1 = new JSONRelation(
+      Some(singleRow),
+      1.0,
+      Some(StructType(StructField("a", IntegerType, true) :: Nil)),
+      None, None)(context)
+    val logicalRelation1 = LogicalRelation(relation1)
+    val relation2 = new JSONRelation(
+      Some(singleRow),
+      0.5,
+      Some(StructType(StructField("a", IntegerType, true) :: Nil)),
+      None, None)(context)
+    val logicalRelation2 = LogicalRelation(relation2)
+    val relation3 = new JSONRelation(
+      Some(singleRow),
+      1.0,
+      Some(StructType(StructField("b", IntegerType, true) :: Nil)),
+      None, None)(context)
+    val logicalRelation3 = LogicalRelation(relation3)
+
+    assert(relation0 !== relation1)
+    assert(!logicalRelation0.sameResult(logicalRelation1),
+      s"$logicalRelation0 and $logicalRelation1 should be considered not 
having the same result.")
+
+    assert(relation1 === relation2)
+    assert(logicalRelation1.sameResult(logicalRelation2),
+      s"$logicalRelation1 and $logicalRelation2 should be considered having 
the same result.")
+
+    assert(relation1 !== relation3)
+    assert(!logicalRelation1.sameResult(logicalRelation3),
+      s"$logicalRelation1 and $logicalRelation3 should be considered not 
having the same result.")
+
+    assert(relation2 !== relation3)
+    assert(!logicalRelation2.sameResult(logicalRelation3),
+      s"$logicalRelation2 and $logicalRelation3 should be considered not 
having the same result.")
+
+    withTempPath(dir => {
+      val path = dir.getCanonicalFile.toURI.toString
+      ctx.sparkContext.parallelize(1 to 100)
+        .map(i => s"""{"a": 1, "b": "str$i"}""").saveAsTextFile(path)
+
+      val d1 = ResolvedDataSource(
+        context,
+        userSpecifiedSchema = None,
+        partitionColumns = Array.empty[String],
+        provider = classOf[DefaultSource].getCanonicalName,
+        options = Map("path" -> path))
+
+      val d2 = ResolvedDataSource(
+        context,
+        userSpecifiedSchema = None,
+        partitionColumns = Array.empty[String],
+        provider = classOf[DefaultSource].getCanonicalName,
+        options = Map("path" -> path))
+      assert(d1 === d2)
+    })
+  }
+
+  test("SPARK-6245 JsonRDD.inferSchema on empty RDD") {
+    // This is really a test that it doesn't throw an exception
+    val emptySchema = InferSchema(empty, 1.0, "")
+    assert(StructType(Seq()) === emptySchema)
+  }
+
+  test("SPARK-7565 MapType in JsonRDD") {
+    val oldColumnNameOfCorruptRecord = ctx.conf.columnNameOfCorruptRecord
+    ctx.setConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD, "_unparsed")
+
+    val schemaWithSimpleMap = StructType(
+      StructField("map", MapType(StringType, IntegerType, true), false) :: Nil)
+    try {
+      val temp = Utils.createTempDir().getPath
+
+      val df = ctx.read.schema(schemaWithSimpleMap).json(mapType1)
+      df.write.mode("overwrite").parquet(temp)
+      // order of MapType is not defined
+      assert(ctx.read.parquet(temp).count() == 5)
+
+      val df2 = ctx.read.json(corruptRecords)
+      df2.write.mode("overwrite").parquet(temp)
+      checkAnswer(ctx.read.parquet(temp), df2.collect())
+    } finally {
+      ctx.setConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD, oldColumnNameOfCorruptRecord)
+    }
+  }
+
+  test("SPARK-8093 Erase empty structs") {
+    val emptySchema = InferSchema(emptyRecords, 1.0, "")
+    assert(StructType(Seq()) === emptySchema)
+  }
+
+  test("JSON with Partition") {
+    def makePartition(rdd: RDD[String], parent: File, partName: String, partValue: Any): File = {
+      val p = new File(parent, s"$partName=${partValue.toString}")
+      rdd.saveAsTextFile(p.getCanonicalPath)
+      p
+    }
+
+    withTempPath(root => {
+      val d1 = new File(root, "d1=1")
+      // root/d1=1/col1=abc
+      val p1_col1 = makePartition(
+        ctx.sparkContext.parallelize(2 to 5).map(i => s"""{"a": 1, "b": "str$i"}"""),
+        d1,
+        "col1",
+        "abc")
+
+      // root/d1=1/col1=abd
+      val p2 = makePartition(
+        ctx.sparkContext.parallelize(6 to 10).map(i => s"""{"a": 1, "b": "str$i"}"""),
+        d1,
+        "col1",
+        "abd")
+
+        ctx.read.json(root.getAbsolutePath).registerTempTable("test_myjson_with_part")
+        checkAnswer(
+          sql("SELECT count(a) FROM test_myjson_with_part where d1 = 1 and col1='abc'"), Row(4))
+        checkAnswer(
+          sql("SELECT count(a) FROM test_myjson_with_part where d1 = 1 and col1='abd'"), Row(5))
+        checkAnswer(sql("SELECT count(a) FROM test_myjson_with_part where d1 = 1"), Row(9))
+    })
+  }
+}
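The schema assertions in the type-conflict tests above all follow from how JSON schema inference widens conflicting value types. Below is a minimal, self-contained sketch of that behaviour; it is illustrative only and not part of the patch, and the local SparkContext/SQLContext setup and inline records are assumptions.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object JsonWideningSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("json-widening-sketch"))
    val sqlContext = new SQLContext(sc)

    // The same field holds an integer, a double, and a quoted string across records.
    val records = sc.parallelize(Seq(
      """{"v": 1}""",
      """{"v": 1.5}""",
      """{"v": "2"}"""))

    // Integers are inferred as LongType, mixed ints/doubles widen to DoubleType, and a
    // number/string conflict falls back to StringType, so v is inferred as StringType here.
    sqlContext.read.json(records).printSchema()

    sc.stop()
  }
}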

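The suite also exercises the two ways the JSON reader is configured: sampling only a fraction of the input for inference ("jsonFile should be based on JSONRelation") and supplying a schema up front ("Applying schemas"). A minimal sketch of those reader options follows; the sqlContext, path, and field names are assumptions for illustration.

import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

object JsonReaderConfigSketch {
  // `sqlContext` and `path` are assumed to exist; this only shows the reader options.
  def read(sqlContext: SQLContext, path: String): (DataFrame, DataFrame) = {
    // Infer the schema from roughly half of the input records, as the
    // "jsonFile should be based on JSONRelation" test does with samplingRatio = 0.49.
    val sampled = sqlContext.read.option("samplingRatio", "0.5").json(path)

    // Skip inference entirely by declaring the schema, as "Applying schemas" does.
    val schema = StructType(
      StructField("a", LongType, true) ::
      StructField("b", StringType, true) :: Nil)
    val declared = sqlContext.read.schema(schema).json(path)

    (sampled, declared)
  }
}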

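Finally, the "Corrupt records" test relies on the spark.sql.columnNameOfCorruptRecord setting to keep unparseable lines queryable rather than dropping them. A sketch of that usage, again with an assumed sqlContext and illustrative records:

import org.apache.spark.sql.SQLContext

object CorruptRecordSketch {
  // `sqlContext` is assumed; the inline records are illustrative only.
  def run(sqlContext: SQLContext): Unit = {
    // Route unparseable lines into a "_unparsed" column, mirroring what the
    // "Corrupt records" test sets through SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD.
    sqlContext.setConf("spark.sql.columnNameOfCorruptRecord", "_unparsed")

    val records = sqlContext.sparkContext.parallelize(Seq(
      """{"a": "str_a_4", "b": "str_b_4"}""",
      """{"a":1, b:2}"""))  // second record is malformed (unquoted key)

    val df = sqlContext.read.json(records)
    df.registerTempTable("corrupt")

    // Well-formed rows keep _unparsed null; malformed rows carry their raw text there.
    sqlContext.sql("SELECT a, b, _unparsed FROM corrupt WHERE _unparsed IS NOT NULL").show()
  }
}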