Repository: spark
Updated Branches:
  refs/heads/master 74335b310 -> 0ba98c04c


[SPARK-8753][SQL] Create an IntervalType data type

We need a new data type to represent time intervals. Because we can't determine 
how many days are in a month, an interval needs two values: an int `months` and 
a long `microseconds`.

The interval literal syntax looks like:
`interval 3 years -4 month 4 weeks 3 second`
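
As a minimal illustrative sketch (using the `Interval` class added by this
patch; the constant folding below is done here by hand, purely to show the
representation), the literal above decomposes into the two fields like this:

    import org.apache.spark.unsafe.types.Interval

    // Calendar units (years, months) fold into `months`; fixed-length units
    // (weeks, days, hours, minutes, seconds, ...) fold into `microseconds`.
    val months = 3 * 12 - 4                           // 3 years -4 month => 32
    val micros = 4L * 7 * 24 * 3600 * 1000 * 1000 +   // 4 weeks
                 3L * 1000 * 1000                     // 3 second
    val interval = new Interval(months, micros)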

Because `TimestampType` values are stored as a number of 100ns units, it may 
not make sense to support a nanosecond unit.
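
For example (mirroring the test added below), `nanosecond` is not a recognized 
unit, so the parser rejects the literal:

    sql("select interval 23 nanosecond")  // throws AnalysisException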

Author: Wenchen Fan <cloud0...@outlook.com>

Closes #7226 from cloud-fan/interval and squashes the following commits:

632062d [Wenchen Fan] address comments
ac348c3 [Wenchen Fan] use case class
0342d2e [Wenchen Fan] use array byte
df9256c [Wenchen Fan] fix style
fd6f18a [Wenchen Fan] address comments
1856af3 [Wenchen Fan] support interval type


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0ba98c04
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0ba98c04
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0ba98c04

Branch: refs/heads/master
Commit: 0ba98c04c726a827df8cb19b0db17c352a647960
Parents: 74335b3
Author: Wenchen Fan <cloud0...@outlook.com>
Authored: Wed Jul 8 10:51:32 2015 -0700
Committer: Reynold Xin <r...@databricks.com>
Committed: Wed Jul 8 10:51:32 2015 -0700

----------------------------------------------------------------------
 .../org/apache/spark/sql/types/DataTypes.java   |  5 ++
 .../apache/spark/sql/catalyst/SqlParser.scala   | 86 +++++++++++++++-----
 .../apache/spark/sql/types/IntervalType.scala   | 37 +++++++++
 .../apache/spark/sql/types/TimestampType.scala  |  2 +-
 .../org/apache/spark/sql/sources/ddl.scala      |  3 +
 .../org/apache/spark/sql/SQLQuerySuite.scala    | 25 ++++++
 .../org/apache/spark/unsafe/types/Interval.java | 47 +++++++++++
 7 files changed, 185 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0ba98c04/sql/catalyst/src/main/java/org/apache/spark/sql/types/DataTypes.java
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/types/DataTypes.java b/sql/catalyst/src/main/java/org/apache/spark/sql/types/DataTypes.java
index e457542..d22ad67 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/types/DataTypes.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/types/DataTypes.java
@@ -50,6 +50,11 @@ public class DataTypes {
   public static final DataType TimestampType = TimestampType$.MODULE$;
 
   /**
+   * Gets the IntervalType object.
+   */
+  public static final DataType IntervalType = IntervalType$.MODULE$;
+
+  /**
    * Gets the DoubleType object.
    */
   public static final DataType DoubleType = DoubleType$.MODULE$;

http://git-wip-us.apache.org/repos/asf/spark/blob/0ba98c04/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
index e8e9b98..dedd8c8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.Interval
 
 /**
  * A very simple SQL parser.  Based loosely on:
@@ -72,6 +73,7 @@ class SqlParser extends AbstractSparkSQLParser with DataTypeParser {
   protected val INNER = Keyword("INNER")
   protected val INSERT = Keyword("INSERT")
   protected val INTERSECT = Keyword("INTERSECT")
+  protected val INTERVAL = Keyword("INTERVAL")
   protected val INTO = Keyword("INTO")
   protected val IS = Keyword("IS")
   protected val JOIN = Keyword("JOIN")
@@ -279,12 +281,12 @@ class SqlParser extends AbstractSparkSQLParser with DataTypeParser {
         throw new AnalysisException(s"invalid function approximate $udfName")
       }
     }
-    | APPROXIMATE ~> "(" ~> floatLit ~ ")" ~ ident ~ "(" ~ DISTINCT ~ expression <~ ")" ^^
+    | APPROXIMATE ~> "(" ~> unsignedFloat ~ ")" ~ ident ~ "(" ~ DISTINCT ~ expression <~ ")" ^^
       { case s ~ _ ~ udfName ~ _ ~ _ ~ exp =>
         if (lexical.normalizeKeyword(udfName) == "count") {
           ApproxCountDistinct(exp, s.toDouble)
         } else {
-          throw new AnalysisException(s"invalid function approximate($floatLit) $udfName")
+          throw new AnalysisException(s"invalid function approximate($s) $udfName")
         }
       }
     | CASE ~> whenThenElse ^^ CaseWhen
@@ -309,6 +311,7 @@ class SqlParser extends AbstractSparkSQLParser with DataTypeParser {
     ( numericLiteral
     | booleanLiteral
     | stringLit ^^ {case s => Literal.create(s, StringType) }
+    | intervalLiteral
     | NULL ^^^ Literal.create(null, NullType)
     )
 
@@ -318,21 +321,71 @@ class SqlParser extends AbstractSparkSQLParser with DataTypeParser {
     )
 
   protected lazy val numericLiteral: Parser[Literal] =
-    signedNumericLiteral | unsignedNumericLiteral
-
-  protected lazy val sign: Parser[String] =
-    "+" | "-"
-
-  protected lazy val signedNumericLiteral: Parser[Literal] =
-    ( sign ~ numericLit  ^^ { case s ~ l => Literal(toNarrowestIntegerType(s + l)) }
-    | sign ~ floatLit ^^ { case s ~ f => Literal((s + f).toDouble) }
+    ( integral  ^^ { case i => Literal(toNarrowestIntegerType(i)) }
+    | sign.? ~ unsignedFloat ^^ { case s ~ f => Literal((s.getOrElse("") + f).toDouble) }
     )
 
-  protected lazy val unsignedNumericLiteral: Parser[Literal] =
-    ( numericLit ^^ { n => Literal(toNarrowestIntegerType(n)) }
-    | floatLit ^^ { f => Literal(f.toDouble) }
+  protected lazy val unsignedFloat: Parser[String] =
+    ( "." ~> numericLit ^^ { u => "0." + u }
+    | elem("decimal", _.isInstanceOf[lexical.FloatLit]) ^^ (_.chars)
     )
 
+  protected lazy val sign: Parser[String] = ("+" | "-")
+
+  protected lazy val integral: Parser[String] =
+    sign.? ~ numericLit ^^ { case s ~ n => s.getOrElse("") + n }
+
+  private def intervalUnit(unitName: String) =
+    acceptIf {
+      case lexical.Identifier(str) =>
+        val normalized = lexical.normalizeKeyword(str)
+        normalized == unitName || normalized == unitName + "s"
+      case _ => false
+    } {_ => "wrong interval unit"}
+
+  protected lazy val month: Parser[Int] =
+    integral <~ intervalUnit("month") ^^ { case num => num.toInt }
+
+  protected lazy val year: Parser[Int] =
+    integral <~ intervalUnit("year") ^^ { case num => num.toInt * 12 }
+
+  protected lazy val microsecond: Parser[Long] =
+    integral <~ intervalUnit("microsecond") ^^ { case num => num.toLong }
+
+  protected lazy val millisecond: Parser[Long] =
+    integral <~ intervalUnit("millisecond") ^^ { case num => num.toLong * 1000 
}
+
+  protected lazy val second: Parser[Long] =
+    integral <~ intervalUnit("second") ^^ { case num => num.toLong * 1000 * 
1000 }
+
+  protected lazy val minute: Parser[Long] =
+    integral <~ intervalUnit("minute") ^^ { case num => num.toLong * 1000 * 
1000 * 60 }
+
+  protected lazy val hour: Parser[Long] =
+    integral <~ intervalUnit("hour") ^^ { case num => num.toLong * 1000 * 1000 
* 3600 }
+
+  protected lazy val day: Parser[Long] =
+    integral <~ intervalUnit("day") ^^ { case num => num.toLong * 1000 * 1000 
* 3600 * 24 }
+
+  protected lazy val week: Parser[Long] =
+    integral <~ intervalUnit("week") ^^ { case num => num.toLong * 1000 * 1000 
* 3600 * 24 * 7 }
+
+  protected lazy val intervalLiteral: Parser[Literal] =
+    INTERVAL ~> year.? ~ month.? ~ week.? ~ day.? ~ hour.? ~ minute.? ~ second.? ~
+      millisecond.? ~ microsecond.? ^^ {
+        case year ~ month ~ week ~ day ~ hour ~ minute ~ second ~
+          millisecond ~ microsecond =>
+          if (!Seq(year, month, week, day, hour, minute, second,
+            millisecond, microsecond).exists(_.isDefined)) {
+            throw new AnalysisException(
+              "at least one time unit should be given for interval literal")
+          }
+          val months = Seq(year, month).map(_.getOrElse(0)).sum
+          val microseconds = Seq(week, day, hour, minute, second, millisecond, microsecond)
+            .map(_.getOrElse(0L)).sum
+          Literal.create(new Interval(months, microseconds), IntervalType)
+      }
+
   private def toNarrowestIntegerType(value: String): Any = {
     val bigIntValue = BigDecimal(value)
 
@@ -343,11 +396,6 @@ class SqlParser extends AbstractSparkSQLParser with DataTypeParser {
     }
   }
 
-  protected lazy val floatLit: Parser[String] =
-    ( "." ~> unsignedNumericLiteral ^^ { u => "0." + u }
-    | elem("decimal", _.isInstanceOf[lexical.FloatLit]) ^^ (_.chars)
-    )
-
   protected lazy val baseExpression: Parser[Expression] =
     ( "*" ^^^ UnresolvedStar(None)
    | ident <~ "." ~ "*" ^^ { case tableName => UnresolvedStar(Option(tableName)) }
@@ -355,7 +403,7 @@ class SqlParser extends AbstractSparkSQLParser with DataTypeParser {
     )
 
   protected lazy val signedPrimary: Parser[Expression] =
-    sign ~ primary ^^ { case s ~ e => if (s == "-") UnaryMinus(e) else e}
+    sign ~ primary ^^ { case s ~ e => if (s == "-") UnaryMinus(e) else e }
 
  protected lazy val attributeName: Parser[String] = acceptMatch("attribute name", {
     case lexical.Identifier(str) => str

http://git-wip-us.apache.org/repos/asf/spark/blob/0ba98c04/sql/catalyst/src/main/scala/org/apache/spark/sql/types/IntervalType.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/IntervalType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/IntervalType.scala
new file mode 100644
index 0000000..87c6e9e
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/IntervalType.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.types
+
+import org.apache.spark.annotation.DeveloperApi
+
+
+/**
+ * :: DeveloperApi ::
+ * The data type representing time intervals.
+ *
+ * Please use the singleton [[DataTypes.IntervalType]].
+ */
+@DeveloperApi
+class IntervalType private() extends DataType {
+
+  override def defaultSize: Int = 4096
+
+  private[spark] override def asNullable: IntervalType = this
+}
+
+case object IntervalType extends IntervalType

http://git-wip-us.apache.org/repos/asf/spark/blob/0ba98c04/sql/catalyst/src/main/scala/org/apache/spark/sql/types/TimestampType.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/TimestampType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/TimestampType.scala
index de4b511..2be9b2d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/TimestampType.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/TimestampType.scala
@@ -41,7 +41,7 @@ class TimestampType private() extends AtomicType {
   private[sql] val ordering = implicitly[Ordering[InternalType]]
 
   /**
-   * The default size of a value of the TimestampType is 12 bytes.
+   * The default size of a value of the TimestampType is 8 bytes.
    */
   override def defaultSize: Int = 8
 

http://git-wip-us.apache.org/repos/asf/spark/blob/0ba98c04/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
index 1f0b93e..d7440c5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
@@ -304,6 +304,9 @@ private[sql] object ResolvedDataSource {
       mode: SaveMode,
       options: Map[String, String],
       data: DataFrame): ResolvedDataSource = {
+    if (data.schema.map(_.dataType).exists(_.isInstanceOf[IntervalType])) {
+      throw new AnalysisException("Cannot save interval data type into external storage.")
+    }
     val clazz: Class[_] = lookupDataSource(provider)
     val relation = clazz.newInstance() match {
       case dataSource: CreatableRelationProvider =>

http://git-wip-us.apache.org/repos/asf/spark/blob/0ba98c04/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 12ad019..2314408 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -1467,4 +1467,29 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll with SQLTestUtils {
       checkAnswer(sql("select count, sort from t"), Row(1, "a"))
     }
   }
+
+  test("SPARK-8753: add interval type") {
+    import org.apache.spark.unsafe.types.Interval
+
+    val df = sql("select interval 3 years -3 month 7 week 123 microseconds")
+    checkAnswer(df, Row(new Interval(12 * 3 - 3, 7L * 1000 * 1000 * 3600 * 24 * 7 + 123 )))
+    withTempPath(f => {
+      // Currently we don't yet support saving out values of interval data type.
+      val e = intercept[AnalysisException] {
+        df.write.json(f.getCanonicalPath)
+      }
+      e.message.contains("Cannot save interval data type into external 
storage")
+    })
+
+    def checkIntervalParseError(s: String): Unit = {
+      val e = intercept[AnalysisException] {
+        sql(s)
+      }
+      e.message.contains("at least one time unit should be given for interval 
literal")
+    }
+
+    checkIntervalParseError("select interval")
+    // Currently we don't yet support nanosecond
+    checkIntervalParseError("select interval 23 nanosecond")
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/0ba98c04/unsafe/src/main/java/org/apache/spark/unsafe/types/Interval.java
----------------------------------------------------------------------
diff --git a/unsafe/src/main/java/org/apache/spark/unsafe/types/Interval.java b/unsafe/src/main/java/org/apache/spark/unsafe/types/Interval.java
new file mode 100644
index 0000000..3eb67ed
--- /dev/null
+++ b/unsafe/src/main/java/org/apache/spark/unsafe/types/Interval.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.unsafe.types;
+
+import java.io.Serializable;
+
+/**
+ * The internal representation of interval type.
+ */
+public final class Interval implements Serializable {
+  public final int months;
+  public final long microseconds;
+
+  public Interval(int months, long microseconds) {
+    this.months = months;
+    this.microseconds = microseconds;
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (this == other) return true;
+    if (other == null || !(other instanceof Interval)) return false;
+
+    Interval o = (Interval) other;
+    return this.months == o.months && this.microseconds == o.microseconds;
+  }
+
+  @Override
+  public int hashCode() {
+    return 31 * months + (int) microseconds;
+  }
+}

