Repository: spark Updated Branches: refs/heads/branch-1.4 d5c52d9ac -> acd872bbd
[SQL] Move some classes into packages that are more appropriate: JavaTypeInference into catalyst; types.DateUtils into catalyst; CacheManager into execution; DefaultParserDialect into catalyst. Author: Reynold Xin <r...@databricks.com> Closes #6108 from rxin/sql-rename and squashes the following commits: 3fc9613 [Reynold Xin] Fixed import ordering. 83d9ff4 [Reynold Xin] Fixed codegen tests. e271e86 [Reynold Xin] mima f4e24a6 [Reynold Xin] [SQL] Move some classes into packages that are more appropriate. (cherry picked from commit e683182c3e6347afdac0e5658487f80e5e054ef4) Signed-off-by: Michael Armbrust <mich...@databricks.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/acd872bb Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/acd872bb Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/acd872bb Branch: refs/heads/branch-1.4 Commit: acd872bbdbbd2b998d3fd0b79863fd9cdae62e78 Parents: d5c52d9 Author: Reynold Xin <r...@databricks.com> Authored: Wed May 13 16:15:31 2015 -0700 Committer: Michael Armbrust <mich...@databricks.com> Committed: Wed May 13 16:15:43 2015 -0700 ---------------------------------------------------------------------- project/MimaExcludes.scala | 5 +- .../sql/catalyst/CatalystTypeConverters.scala | 1 + .../spark/sql/catalyst/JavaTypeInference.scala | 109 ++++++++++++ .../spark/sql/catalyst/ParserDialect.scala | 36 ++++ .../spark/sql/catalyst/expressions/Cast.scala | 1 + .../expressions/codegen/CodeGenerator.scala | 2 +- .../sql/catalyst/expressions/literals.scala | 1 + .../spark/sql/catalyst/util/DateUtils.scala | 90 ++++++++++ .../org/apache/spark/sql/types/DateUtils.scala | 90 ---------- .../org/apache/spark/sql/types/UTF8String.scala | 17 +- .../expressions/ExpressionEvaluationSuite.scala | 1 + .../org/apache/spark/sql/CacheManager.scala | 164 ------------------ .../scala/org/apache/spark/sql/Column.scala | 2 + .../apache/spark/sql/JavaTypeInference.scala | 111
------------- .../scala/org/apache/spark/sql/SQLContext.scala | 40 +---- .../spark/sql/execution/CacheManager.scala | 165 +++++++++++++++++++ .../apache/spark/sql/execution/pythonUdfs.scala | 5 +- .../scala/org/apache/spark/sql/functions.scala | 1 + .../org/apache/spark/sql/jdbc/JDBCRDD.scala | 2 +- .../apache/spark/sql/json/JacksonParser.scala | 1 + .../org/apache/spark/sql/json/JsonRDD.scala | 1 + .../org/apache/spark/sql/SQLQuerySuite.scala | 2 +- .../org/apache/spark/sql/json/JsonSuite.scala | 1 + .../spark/sql/parquet/ParquetIOSuite.scala | 1 + .../apache/spark/sql/hive/HiveInspectors.scala | 1 + .../org/apache/spark/sql/hive/TableReader.scala | 2 +- .../sql/hive/execution/SQLQuerySuite.scala | 3 +- 27 files changed, 439 insertions(+), 416 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/acd872bb/project/MimaExcludes.scala ---------------------------------------------------------------------- diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index f31f0e5..fba7290 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -123,7 +123,10 @@ object MimaExcludes { ProblemFilters.exclude[MissingClassProblem]( "org.apache.spark.sql.parquet.ParquetTestData$"), ProblemFilters.exclude[MissingClassProblem]( - "org.apache.spark.sql.parquet.TestGroupWriteSupport") + "org.apache.spark.sql.parquet.TestGroupWriteSupport"), + ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.CachedData"), + ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.CachedData$"), + ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.CacheManager") ) ++ Seq( // SPARK-7530 Added StreamingContext.getState() ProblemFilters.exclude[MissingMethodProblem]( http://git-wip-us.apache.org/repos/asf/spark/blob/acd872bb/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala 
---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala index a13e2f3..75a493b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala @@ -23,6 +23,7 @@ import java.util.{Map => JavaMap} import scala.collection.mutable.HashMap import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.util.DateUtils import org.apache.spark.sql.types._ /** http://git-wip-us.apache.org/repos/asf/spark/blob/acd872bb/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala new file mode 100644 index 0000000..625c8d3 --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst + +import java.beans.Introspector +import java.lang.{Iterable => JIterable} +import java.util.{Iterator => JIterator, Map => JMap} + +import scala.language.existentials + +import com.google.common.reflect.TypeToken +import org.apache.spark.sql.types._ + +/** + * Type-inference utilities for POJOs and Java collections. + */ +private [sql] object JavaTypeInference { + + private val iterableType = TypeToken.of(classOf[JIterable[_]]) + private val mapType = TypeToken.of(classOf[JMap[_, _]]) + private val iteratorReturnType = classOf[JIterable[_]].getMethod("iterator").getGenericReturnType + private val nextReturnType = classOf[JIterator[_]].getMethod("next").getGenericReturnType + private val keySetReturnType = classOf[JMap[_, _]].getMethod("keySet").getGenericReturnType + private val valuesReturnType = classOf[JMap[_, _]].getMethod("values").getGenericReturnType + + /** + * Infers the corresponding SQL data type of a Java type. + * @param typeToken Java type + * @return (SQL data type, nullable) + */ + private [sql] def inferDataType(typeToken: TypeToken[_]): (DataType, Boolean) = { + // TODO: All of this could probably be moved to Catalyst as it is mostly not Spark specific. 
+ typeToken.getRawType match { + case c: Class[_] if c.isAnnotationPresent(classOf[SQLUserDefinedType]) => + (c.getAnnotation(classOf[SQLUserDefinedType]).udt().newInstance(), true) + + case c: Class[_] if c == classOf[java.lang.String] => (StringType, true) + case c: Class[_] if c == java.lang.Short.TYPE => (ShortType, false) + case c: Class[_] if c == java.lang.Integer.TYPE => (IntegerType, false) + case c: Class[_] if c == java.lang.Long.TYPE => (LongType, false) + case c: Class[_] if c == java.lang.Double.TYPE => (DoubleType, false) + case c: Class[_] if c == java.lang.Byte.TYPE => (ByteType, false) + case c: Class[_] if c == java.lang.Float.TYPE => (FloatType, false) + case c: Class[_] if c == java.lang.Boolean.TYPE => (BooleanType, false) + + case c: Class[_] if c == classOf[java.lang.Short] => (ShortType, true) + case c: Class[_] if c == classOf[java.lang.Integer] => (IntegerType, true) + case c: Class[_] if c == classOf[java.lang.Long] => (LongType, true) + case c: Class[_] if c == classOf[java.lang.Double] => (DoubleType, true) + case c: Class[_] if c == classOf[java.lang.Byte] => (ByteType, true) + case c: Class[_] if c == classOf[java.lang.Float] => (FloatType, true) + case c: Class[_] if c == classOf[java.lang.Boolean] => (BooleanType, true) + + case c: Class[_] if c == classOf[java.math.BigDecimal] => (DecimalType(), true) + case c: Class[_] if c == classOf[java.sql.Date] => (DateType, true) + case c: Class[_] if c == classOf[java.sql.Timestamp] => (TimestampType, true) + + case _ if typeToken.isArray => + val (dataType, nullable) = inferDataType(typeToken.getComponentType) + (ArrayType(dataType, nullable), true) + + case _ if iterableType.isAssignableFrom(typeToken) => + val (dataType, nullable) = inferDataType(elementType(typeToken)) + (ArrayType(dataType, nullable), true) + + case _ if mapType.isAssignableFrom(typeToken) => + val typeToken2 = typeToken.asInstanceOf[TypeToken[_ <: JMap[_, _]]] + val mapSupertype = 
typeToken2.getSupertype(classOf[JMap[_, _]]) + val keyType = elementType(mapSupertype.resolveType(keySetReturnType)) + val valueType = elementType(mapSupertype.resolveType(valuesReturnType)) + val (keyDataType, _) = inferDataType(keyType) + val (valueDataType, nullable) = inferDataType(valueType) + (MapType(keyDataType, valueDataType, nullable), true) + + case _ => + val beanInfo = Introspector.getBeanInfo(typeToken.getRawType) + val properties = beanInfo.getPropertyDescriptors.filterNot(_.getName == "class") + val fields = properties.map { property => + val returnType = typeToken.method(property.getReadMethod).getReturnType + val (dataType, nullable) = inferDataType(returnType) + new StructField(property.getName, dataType, nullable) + } + (new StructType(fields), true) + } + } + + private def elementType(typeToken: TypeToken[_]): TypeToken[_] = { + val typeToken2 = typeToken.asInstanceOf[TypeToken[_ <: JIterable[_]]] + val iterableSupertype = typeToken2.getSupertype(classOf[JIterable[_]]) + val iteratorType = iterableSupertype.resolveType(iteratorReturnType) + val itemType = iteratorType.resolveType(nextReturnType) + itemType + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/acd872bb/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ParserDialect.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ParserDialect.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ParserDialect.scala index 05a92b0..554fb4e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ParserDialect.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ParserDialect.scala @@ -31,3 +31,39 @@ abstract class ParserDialect { // this is the main function that will be implemented by sql parser. 
def parse(sqlText: String): LogicalPlan } + +/** + * Currently we support the default dialect named "sql", associated with the class + * [[DefaultParserDialect]] + * + * And we can also provide custom SQL Dialect, for example in Spark SQL CLI: + * {{{ + *-- switch to "hiveql" dialect + * spark-sql>SET spark.sql.dialect=hiveql; + * spark-sql>SELECT * FROM src LIMIT 1; + * + *-- switch to "sql" dialect + * spark-sql>SET spark.sql.dialect=sql; + * spark-sql>SELECT * FROM src LIMIT 1; + * + *-- register the new SQL dialect + * spark-sql> SET spark.sql.dialect=com.xxx.xxx.SQL99Dialect; + * spark-sql> SELECT * FROM src LIMIT 1; + * + *-- register the non-exist SQL dialect + * spark-sql> SET spark.sql.dialect=NotExistedClass; + * spark-sql> SELECT * FROM src LIMIT 1; + * + *-- Exception will be thrown and switch to dialect + *-- "sql" (for SQLContext) or + *-- "hiveql" (for HiveContext) + * }}} + */ +private[spark] class DefaultParserDialect extends ParserDialect { + @transient + protected val sqlParser = new SqlParser + + override def parse(sqlText: String): LogicalPlan = { + sqlParser.parse(sqlText) + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/acd872bb/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala index adf941a..d8cf2b2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala @@ -21,6 +21,7 @@ import java.sql.{Date, Timestamp} import java.text.{DateFormat, SimpleDateFormat} import org.apache.spark.Logging +import org.apache.spark.sql.catalyst.util.DateUtils import org.apache.spark.sql.types._ /** Cast the child expression to the target data type. 
*/ http://git-wip-us.apache.org/repos/asf/spark/blob/acd872bb/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala index d17af0e..ecb4c4b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala @@ -250,7 +250,7 @@ abstract class CodeGenerator[InType <: AnyRef, OutType <: AnyRef] extends Loggin case Cast(child @ DateType(), StringType) => child.castOrNull(c => q"""org.apache.spark.sql.types.UTF8String( - org.apache.spark.sql.types.DateUtils.toString($c))""", + org.apache.spark.sql.catalyst.util.DateUtils.toString($c))""", StringType) case Cast(child @ NumericType(), IntegerType) => http://git-wip-us.apache.org/repos/asf/spark/blob/acd872bb/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala index 18cba4c..5f8c735 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.catalyst.expressions import java.sql.{Date, Timestamp} import org.apache.spark.sql.catalyst.CatalystTypeConverters +import org.apache.spark.sql.catalyst.util.DateUtils import org.apache.spark.sql.types._ object Literal { 
http://git-wip-us.apache.org/repos/asf/spark/blob/acd872bb/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateUtils.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateUtils.scala new file mode 100644 index 0000000..3f92be4 --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateUtils.scala @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.util + +import java.sql.Date +import java.text.SimpleDateFormat +import java.util.{Calendar, TimeZone} + +import org.apache.spark.sql.catalyst.expressions.Cast + +/** + * helper function to convert between Int value of days since 1970-01-01 and java.sql.Date + */ +object DateUtils { + private val MILLIS_PER_DAY = 86400000 + + // Java TimeZone has no mention of thread safety. Use thread local instance to be safe. 
+ private val LOCAL_TIMEZONE = new ThreadLocal[TimeZone] { + override protected def initialValue: TimeZone = { + Calendar.getInstance.getTimeZone + } + } + + private def javaDateToDays(d: Date): Int = { + millisToDays(d.getTime) + } + + // we should use the exact day as Int, for example, (year, month, day) -> day + def millisToDays(millisLocal: Long): Int = { + ((millisLocal + LOCAL_TIMEZONE.get().getOffset(millisLocal)) / MILLIS_PER_DAY).toInt + } + + private def toMillisSinceEpoch(days: Int): Long = { + val millisUtc = days.toLong * MILLIS_PER_DAY + millisUtc - LOCAL_TIMEZONE.get().getOffset(millisUtc) + } + + def fromJavaDate(date: java.sql.Date): Int = { + javaDateToDays(date) + } + + def toJavaDate(daysSinceEpoch: Int): java.sql.Date = { + new java.sql.Date(toMillisSinceEpoch(daysSinceEpoch)) + } + + def toString(days: Int): String = Cast.threadLocalDateFormat.get.format(toJavaDate(days)) + + def stringToTime(s: String): java.util.Date = { + if (!s.contains('T')) { + // JDBC escape string + if (s.contains(' ')) { + java.sql.Timestamp.valueOf(s) + } else { + java.sql.Date.valueOf(s) + } + } else if (s.endsWith("Z")) { + // this is zero timezone of ISO8601 + stringToTime(s.substring(0, s.length - 1) + "GMT-00:00") + } else if (s.indexOf("GMT") == -1) { + // timezone with ISO8601 + val inset = "+00.00".length + val s0 = s.substring(0, s.length - inset) + val s1 = s.substring(s.length - inset, s.length) + if (s0.substring(s0.lastIndexOf(':')).contains('.')) { + stringToTime(s0 + "GMT" + s1) + } else { + stringToTime(s0 + ".0GMT" + s1) + } + } else { + // ISO8601 with GMT insert + val ISO8601GMT: SimpleDateFormat = new SimpleDateFormat( "yyyy-MM-dd'T'HH:mm:ss.SSSz" ) + ISO8601GMT.parse(s) + } + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/acd872bb/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DateUtils.scala ---------------------------------------------------------------------- diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DateUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DateUtils.scala deleted file mode 100644 index d36a491..0000000 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DateUtils.scala +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.types - -import java.sql.Date -import java.text.SimpleDateFormat -import java.util.{Calendar, TimeZone} - -import org.apache.spark.sql.catalyst.expressions.Cast - -/** - * helper function to convert between Int value of days since 1970-01-01 and java.sql.Date - */ -object DateUtils { - private val MILLIS_PER_DAY = 86400000 - - // Java TimeZone has no mention of thread safety. Use thread local instance to be safe. 
- private val LOCAL_TIMEZONE = new ThreadLocal[TimeZone] { - override protected def initialValue: TimeZone = { - Calendar.getInstance.getTimeZone - } - } - - private def javaDateToDays(d: Date): Int = { - millisToDays(d.getTime) - } - - // we should use the exact day as Int, for example, (year, month, day) -> day - def millisToDays(millisLocal: Long): Int = { - ((millisLocal + LOCAL_TIMEZONE.get().getOffset(millisLocal)) / MILLIS_PER_DAY).toInt - } - - private def toMillisSinceEpoch(days: Int): Long = { - val millisUtc = days.toLong * MILLIS_PER_DAY - millisUtc - LOCAL_TIMEZONE.get().getOffset(millisUtc) - } - - def fromJavaDate(date: java.sql.Date): Int = { - javaDateToDays(date) - } - - def toJavaDate(daysSinceEpoch: Int): java.sql.Date = { - new java.sql.Date(toMillisSinceEpoch(daysSinceEpoch)) - } - - def toString(days: Int): String = Cast.threadLocalDateFormat.get.format(toJavaDate(days)) - - def stringToTime(s: String): java.util.Date = { - if (!s.contains('T')) { - // JDBC escape string - if (s.contains(' ')) { - java.sql.Timestamp.valueOf(s) - } else { - java.sql.Date.valueOf(s) - } - } else if (s.endsWith("Z")) { - // this is zero timezone of ISO8601 - stringToTime(s.substring(0, s.length - 1) + "GMT-00:00") - } else if (s.indexOf("GMT") == -1) { - // timezone with ISO8601 - val inset = "+00.00".length - val s0 = s.substring(0, s.length - inset) - val s1 = s.substring(s.length - inset, s.length) - if (s0.substring(s0.lastIndexOf(':')).contains('.')) { - stringToTime(s0 + "GMT" + s1) - } else { - stringToTime(s0 + ".0GMT" + s1) - } - } else { - // ISO8601 with GMT insert - val ISO8601GMT: SimpleDateFormat = new SimpleDateFormat( "yyyy-MM-dd'T'HH:mm:ss.SSSz" ) - ISO8601GMT.parse(s) - } - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/acd872bb/sql/catalyst/src/main/scala/org/apache/spark/sql/types/UTF8String.scala ---------------------------------------------------------------------- diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/UTF8String.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/UTF8String.scala index fc02ba6..bc9c37b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/UTF8String.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/UTF8String.scala @@ -19,15 +19,18 @@ package org.apache.spark.sql.types import java.util.Arrays +import org.apache.spark.annotation.DeveloperApi + /** - * A UTF-8 String, as internal representation of StringType in SparkSQL + * :: DeveloperApi :: + * A UTF-8 String, as internal representation of StringType in SparkSQL * - * A String encoded in UTF-8 as an Array[Byte], which can be used for comparison, - * search, see http://en.wikipedia.org/wiki/UTF-8 for details. + * A String encoded in UTF-8 as an Array[Byte], which can be used for comparison, + * search, see http://en.wikipedia.org/wiki/UTF-8 for details. * - * Note: This is not designed for general use cases, should not be used outside SQL. + * Note: This is not designed for general use cases, should not be used outside SQL. 
*/ - +@DeveloperApi final class UTF8String extends Ordered[UTF8String] with Serializable { private[this] var bytes: Array[Byte] = _ @@ -180,6 +183,10 @@ final class UTF8String extends Ordered[UTF8String] with Serializable { } } +/** + * :: DeveloperApi :: + */ +@DeveloperApi object UTF8String { // number of tailing bytes in a UTF8 sequence for a code point // see http://en.wikipedia.org/wiki/UTF-8, 192-256 of Byte 1 http://git-wip-us.apache.org/repos/asf/spark/blob/acd872bb/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvaluationSuite.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvaluationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvaluationSuite.scala index 04fd261..5c4a152 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvaluationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvaluationSuite.scala @@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.CatalystTypeConverters import org.apache.spark.sql.catalyst.analysis.UnresolvedExtractValue import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.expressions.mathfuncs._ +import org.apache.spark.sql.catalyst.util.DateUtils import org.apache.spark.sql.types._ http://git-wip-us.apache.org/repos/asf/spark/blob/acd872bb/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala deleted file mode 100644 index 18584c2..0000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala +++ /dev/null @@ -1,164 +0,0 @@ -/* - * Licensed to the Apache Software 
Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql - -import java.util.concurrent.locks.ReentrantReadWriteLock - -import org.apache.spark.Logging -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan -import org.apache.spark.sql.columnar.InMemoryRelation -import org.apache.spark.storage.StorageLevel -import org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK - -/** Holds a cached logical plan and its data */ -private case class CachedData(plan: LogicalPlan, cachedRepresentation: InMemoryRelation) - -/** - * Provides support in a SQLContext for caching query results and automatically using these cached - * results when subsequent queries are executed. Data is cached using byte buffers stored in an - * InMemoryRelation. This relation is automatically substituted query plans that return the - * `sameResult` as the originally cached query. - * - * Internal to Spark SQL. - */ -private[sql] class CacheManager(sqlContext: SQLContext) extends Logging { - - @transient - private val cachedData = new scala.collection.mutable.ArrayBuffer[CachedData] - - @transient - private val cacheLock = new ReentrantReadWriteLock - - /** Returns true if the table is currently cached in-memory. 
*/ - def isCached(tableName: String): Boolean = lookupCachedData(sqlContext.table(tableName)).nonEmpty - - /** Caches the specified table in-memory. */ - def cacheTable(tableName: String): Unit = cacheQuery(sqlContext.table(tableName), Some(tableName)) - - /** Removes the specified table from the in-memory cache. */ - def uncacheTable(tableName: String): Unit = uncacheQuery(sqlContext.table(tableName)) - - /** Acquires a read lock on the cache for the duration of `f`. */ - private def readLock[A](f: => A): A = { - val lock = cacheLock.readLock() - lock.lock() - try f finally { - lock.unlock() - } - } - - /** Acquires a write lock on the cache for the duration of `f`. */ - private def writeLock[A](f: => A): A = { - val lock = cacheLock.writeLock() - lock.lock() - try f finally { - lock.unlock() - } - } - - /** Clears all cached tables. */ - private[sql] def clearCache(): Unit = writeLock { - cachedData.foreach(_.cachedRepresentation.cachedColumnBuffers.unpersist()) - cachedData.clear() - } - - /** Checks if the cache is empty. */ - private[sql] def isEmpty: Boolean = readLock { - cachedData.isEmpty - } - - /** - * Caches the data produced by the logical representation of the given schema rdd. Unlike - * `RDD.cache()`, the default storage level is set to be `MEMORY_AND_DISK` because recomputing - * the in-memory columnar representation of the underlying table is expensive. 
- */ - private[sql] def cacheQuery( - query: DataFrame, - tableName: Option[String] = None, - storageLevel: StorageLevel = MEMORY_AND_DISK): Unit = writeLock { - val planToCache = query.queryExecution.analyzed - if (lookupCachedData(planToCache).nonEmpty) { - logWarning("Asked to cache already cached data.") - } else { - cachedData += - CachedData( - planToCache, - InMemoryRelation( - sqlContext.conf.useCompression, - sqlContext.conf.columnBatchSize, - storageLevel, - query.queryExecution.executedPlan, - tableName)) - } - } - - /** Removes the data for the given [[DataFrame]] from the cache */ - private[sql] def uncacheQuery(query: DataFrame, blocking: Boolean = true): Unit = writeLock { - val planToCache = query.queryExecution.analyzed - val dataIndex = cachedData.indexWhere(cd => planToCache.sameResult(cd.plan)) - require(dataIndex >= 0, s"Table $query is not cached.") - cachedData(dataIndex).cachedRepresentation.uncache(blocking) - cachedData.remove(dataIndex) - } - - /** Tries to remove the data for the given [[DataFrame]] from the cache if it's cached */ - private[sql] def tryUncacheQuery( - query: DataFrame, - blocking: Boolean = true): Boolean = writeLock { - val planToCache = query.queryExecution.analyzed - val dataIndex = cachedData.indexWhere(cd => planToCache.sameResult(cd.plan)) - val found = dataIndex >= 0 - if (found) { - cachedData(dataIndex).cachedRepresentation.cachedColumnBuffers.unpersist(blocking) - cachedData.remove(dataIndex) - } - found - } - - /** Optionally returns cached data for the given [[DataFrame]] */ - private[sql] def lookupCachedData(query: DataFrame): Option[CachedData] = readLock { - lookupCachedData(query.queryExecution.analyzed) - } - - /** Optionally returns cached data for the given LogicalPlan. */ - private[sql] def lookupCachedData(plan: LogicalPlan): Option[CachedData] = readLock { - cachedData.find(cd => plan.sameResult(cd.plan)) - } - - /** Replaces segments of the given logical plan with cached versions where possible. 
*/ - private[sql] def useCachedData(plan: LogicalPlan): LogicalPlan = { - plan transformDown { - case currentFragment => - lookupCachedData(currentFragment) - .map(_.cachedRepresentation.withOutput(currentFragment.output)) - .getOrElse(currentFragment) - } - } - - /** - * Invalidates the cache of any data that contains `plan`. Note that it is possible that this - * function will over invalidate. - */ - private[sql] def invalidateCache(plan: LogicalPlan): Unit = writeLock { - cachedData.foreach { - case data if data.plan.collect { case p if p.sameResult(plan) => p }.nonEmpty => - data.cachedRepresentation.recache() - case _ => - } - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/acd872bb/sql/core/src/main/scala/org/apache/spark/sql/Column.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala index 42f5bcd..8bf1320 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala @@ -346,6 +346,7 @@ class Column(protected[sql] val expr: Expression) extends Logging { * }}} * * @group expr_ops + * @since 1.4.0 */ def when(condition: Column, value: Any):Column = this.expr match { case CaseWhen(branches: Seq[Expression]) => @@ -374,6 +375,7 @@ class Column(protected[sql] val expr: Expression) extends Logging { * }}} * * @group expr_ops + * @since 1.4.0 */ def otherwise(value: Any):Column = this.expr match { case CaseWhen(branches: Seq[Expression]) => http://git-wip-us.apache.org/repos/asf/spark/blob/acd872bb/sql/core/src/main/scala/org/apache/spark/sql/JavaTypeInference.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/JavaTypeInference.scala b/sql/core/src/main/scala/org/apache/spark/sql/JavaTypeInference.scala deleted file mode 100644 index 1ec874f..0000000 
--- a/sql/core/src/main/scala/org/apache/spark/sql/JavaTypeInference.scala +++ /dev/null @@ -1,111 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql - -import java.beans.Introspector -import java.lang.{Iterable => JIterable} -import java.util.{Iterator => JIterator, Map => JMap} - -import scala.language.existentials - -import com.google.common.reflect.TypeToken - -import org.apache.spark.sql.types._ - - -/** - * Type-inference utilities for POJOs and Java collections. - */ -private [sql] object JavaTypeInference { - - private val iterableType = TypeToken.of(classOf[JIterable[_]]) - private val mapType = TypeToken.of(classOf[JMap[_, _]]) - private val iteratorReturnType = classOf[JIterable[_]].getMethod("iterator").getGenericReturnType - private val nextReturnType = classOf[JIterator[_]].getMethod("next").getGenericReturnType - private val keySetReturnType = classOf[JMap[_, _]].getMethod("keySet").getGenericReturnType - private val valuesReturnType = classOf[JMap[_, _]].getMethod("values").getGenericReturnType - - /** - * Infers the corresponding SQL data type of a Java type. 
- * @param typeToken Java type - * @return (SQL data type, nullable) - */ - private [sql] def inferDataType(typeToken: TypeToken[_]): (DataType, Boolean) = { - // TODO: All of this could probably be moved to Catalyst as it is mostly not Spark specific. - typeToken.getRawType match { - case c: Class[_] if c.isAnnotationPresent(classOf[SQLUserDefinedType]) => - (c.getAnnotation(classOf[SQLUserDefinedType]).udt().newInstance(), true) - - case c: Class[_] if c == classOf[java.lang.String] => (StringType, true) - case c: Class[_] if c == java.lang.Short.TYPE => (ShortType, false) - case c: Class[_] if c == java.lang.Integer.TYPE => (IntegerType, false) - case c: Class[_] if c == java.lang.Long.TYPE => (LongType, false) - case c: Class[_] if c == java.lang.Double.TYPE => (DoubleType, false) - case c: Class[_] if c == java.lang.Byte.TYPE => (ByteType, false) - case c: Class[_] if c == java.lang.Float.TYPE => (FloatType, false) - case c: Class[_] if c == java.lang.Boolean.TYPE => (BooleanType, false) - - case c: Class[_] if c == classOf[java.lang.Short] => (ShortType, true) - case c: Class[_] if c == classOf[java.lang.Integer] => (IntegerType, true) - case c: Class[_] if c == classOf[java.lang.Long] => (LongType, true) - case c: Class[_] if c == classOf[java.lang.Double] => (DoubleType, true) - case c: Class[_] if c == classOf[java.lang.Byte] => (ByteType, true) - case c: Class[_] if c == classOf[java.lang.Float] => (FloatType, true) - case c: Class[_] if c == classOf[java.lang.Boolean] => (BooleanType, true) - - case c: Class[_] if c == classOf[java.math.BigDecimal] => (DecimalType(), true) - case c: Class[_] if c == classOf[java.sql.Date] => (DateType, true) - case c: Class[_] if c == classOf[java.sql.Timestamp] => (TimestampType, true) - - case _ if typeToken.isArray => - val (dataType, nullable) = inferDataType(typeToken.getComponentType) - (ArrayType(dataType, nullable), true) - - case _ if iterableType.isAssignableFrom(typeToken) => - val (dataType, nullable) = 
inferDataType(elementType(typeToken)) - (ArrayType(dataType, nullable), true) - - case _ if mapType.isAssignableFrom(typeToken) => - val typeToken2 = typeToken.asInstanceOf[TypeToken[_ <: JMap[_, _]]] - val mapSupertype = typeToken2.getSupertype(classOf[JMap[_, _]]) - val keyType = elementType(mapSupertype.resolveType(keySetReturnType)) - val valueType = elementType(mapSupertype.resolveType(valuesReturnType)) - val (keyDataType, _) = inferDataType(keyType) - val (valueDataType, nullable) = inferDataType(valueType) - (MapType(keyDataType, valueDataType, nullable), true) - - case _ => - val beanInfo = Introspector.getBeanInfo(typeToken.getRawType) - val properties = beanInfo.getPropertyDescriptors.filterNot(_.getName == "class") - val fields = properties.map { property => - val returnType = typeToken.method(property.getReadMethod).getReturnType - val (dataType, nullable) = inferDataType(returnType) - new StructField(property.getName, dataType, nullable) - } - (new StructType(fields), true) - } - } - - private def elementType(typeToken: TypeToken[_]): TypeToken[_] = { - val typeToken2 = typeToken.asInstanceOf[TypeToken[_ <: JIterable[_]]] - val iterableSupertype = typeToken2.getSupertype(classOf[JIterable[_]]) - val iteratorType = iterableSupertype.resolveType(iteratorReturnType) - val itemType = iteratorType.resolveType(nextReturnType) - itemType - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/acd872bb/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index 0a148c7..521f3dc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -33,6 +33,7 @@ import org.apache.spark.annotation.{DeveloperApi, Experimental} import org.apache.spark.api.java.{JavaRDD, 
JavaSparkContext} import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst._ import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.errors.DialectException @@ -40,7 +41,6 @@ import org.apache.spark.sql.catalyst.optimizer.{DefaultOptimizer, Optimizer} import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan} import org.apache.spark.sql.catalyst.rules.RuleExecutor import org.apache.spark.sql.catalyst.ParserDialect -import org.apache.spark.sql.catalyst.{CatalystTypeConverters, ScalaReflection, expressions} import org.apache.spark.sql.execution.{Filter, _} import org.apache.spark.sql.jdbc.{JDBCPartition, JDBCPartitioningInfo, JDBCRelation} import org.apache.spark.sql.json._ @@ -51,42 +51,6 @@ import org.apache.spark.util.Utils import org.apache.spark.{Partition, SparkContext} /** - * Currently we support the default dialect named "sql", associated with the class - * [[DefaultParserDialect]] - * - * And we can also provide custom SQL Dialect, for example in Spark SQL CLI: - * {{{ - *-- switch to "hiveql" dialect - * spark-sql>SET spark.sql.dialect=hiveql; - * spark-sql>SELECT * FROM src LIMIT 1; - * - *-- switch to "sql" dialect - * spark-sql>SET spark.sql.dialect=sql; - * spark-sql>SELECT * FROM src LIMIT 1; - * - *-- register the new SQL dialect - * spark-sql> SET spark.sql.dialect=com.xxx.xxx.SQL99Dialect; - * spark-sql> SELECT * FROM src LIMIT 1; - * - *-- register the non-exist SQL dialect - * spark-sql> SET spark.sql.dialect=NotExistedClass; - * spark-sql> SELECT * FROM src LIMIT 1; - * - *-- Exception will be thrown and switch to dialect - *-- "sql" (for SQLContext) or - *-- "hiveql" (for HiveContext) - * }}} - */ -private[spark] class DefaultParserDialect extends ParserDialect { - @transient - protected val sqlParser = new catalyst.SqlParser - - override def parse(sqlText: String): LogicalPlan = { - 
sqlParser.parse(sqlText) - } -} - -/** * The entry point for working with structured data (rows and columns) in Spark. Allows the * creation of [[DataFrame]] objects as well as the execution of SQL queries. * @@ -1276,7 +1240,7 @@ class SQLContext(@transient val sparkContext: SparkContext) val projectSet = AttributeSet(projectList.flatMap(_.references)) val filterSet = AttributeSet(filterPredicates.flatMap(_.references)) val filterCondition = - prunePushedDownFilters(filterPredicates).reduceLeftOption(expressions.And) + prunePushedDownFilters(filterPredicates).reduceLeftOption(catalyst.expressions.And) // Right now we still use a projection even if the only evaluation is applying an alias // to a column. Since this is a no-op, it could be avoided. However, using this http://git-wip-us.apache.org/repos/asf/spark/blob/acd872bb/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala new file mode 100644 index 0000000..5fcc48a --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution + +import java.util.concurrent.locks.ReentrantReadWriteLock + +import org.apache.spark.Logging +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.columnar.InMemoryRelation +import org.apache.spark.sql.{DataFrame, SQLContext} +import org.apache.spark.storage.StorageLevel +import org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK + +/** Holds a cached logical plan and its data */ +private[sql] case class CachedData(plan: LogicalPlan, cachedRepresentation: InMemoryRelation) + +/** + * Provides support in a SQLContext for caching query results and automatically using these cached + * results when subsequent queries are executed. Data is cached using byte buffers stored in an + * InMemoryRelation. This relation is automatically substituted query plans that return the + * `sameResult` as the originally cached query. + * + * Internal to Spark SQL. + */ +private[sql] class CacheManager(sqlContext: SQLContext) extends Logging { + + @transient + private val cachedData = new scala.collection.mutable.ArrayBuffer[CachedData] + + @transient + private val cacheLock = new ReentrantReadWriteLock + + /** Returns true if the table is currently cached in-memory. */ + def isCached(tableName: String): Boolean = lookupCachedData(sqlContext.table(tableName)).nonEmpty + + /** Caches the specified table in-memory. */ + def cacheTable(tableName: String): Unit = cacheQuery(sqlContext.table(tableName), Some(tableName)) + + /** Removes the specified table from the in-memory cache. 
*/ + def uncacheTable(tableName: String): Unit = uncacheQuery(sqlContext.table(tableName)) + + /** Acquires a read lock on the cache for the duration of `f`. */ + private def readLock[A](f: => A): A = { + val lock = cacheLock.readLock() + lock.lock() + try f finally { + lock.unlock() + } + } + + /** Acquires a write lock on the cache for the duration of `f`. */ + private def writeLock[A](f: => A): A = { + val lock = cacheLock.writeLock() + lock.lock() + try f finally { + lock.unlock() + } + } + + /** Clears all cached tables. */ + private[sql] def clearCache(): Unit = writeLock { + cachedData.foreach(_.cachedRepresentation.cachedColumnBuffers.unpersist()) + cachedData.clear() + } + + /** Checks if the cache is empty. */ + private[sql] def isEmpty: Boolean = readLock { + cachedData.isEmpty + } + + /** + * Caches the data produced by the logical representation of the given schema rdd. Unlike + * `RDD.cache()`, the default storage level is set to be `MEMORY_AND_DISK` because recomputing + * the in-memory columnar representation of the underlying table is expensive. 
+ */ + private[sql] def cacheQuery( + query: DataFrame, + tableName: Option[String] = None, + storageLevel: StorageLevel = MEMORY_AND_DISK): Unit = writeLock { + val planToCache = query.queryExecution.analyzed + if (lookupCachedData(planToCache).nonEmpty) { + logWarning("Asked to cache already cached data.") + } else { + cachedData += + CachedData( + planToCache, + InMemoryRelation( + sqlContext.conf.useCompression, + sqlContext.conf.columnBatchSize, + storageLevel, + query.queryExecution.executedPlan, + tableName)) + } + } + + /** Removes the data for the given [[DataFrame]] from the cache */ + private[sql] def uncacheQuery(query: DataFrame, blocking: Boolean = true): Unit = writeLock { + val planToCache = query.queryExecution.analyzed + val dataIndex = cachedData.indexWhere(cd => planToCache.sameResult(cd.plan)) + require(dataIndex >= 0, s"Table $query is not cached.") + cachedData(dataIndex).cachedRepresentation.uncache(blocking) + cachedData.remove(dataIndex) + } + + /** Tries to remove the data for the given [[DataFrame]] from the cache if it's cached */ + private[sql] def tryUncacheQuery( + query: DataFrame, + blocking: Boolean = true): Boolean = writeLock { + val planToCache = query.queryExecution.analyzed + val dataIndex = cachedData.indexWhere(cd => planToCache.sameResult(cd.plan)) + val found = dataIndex >= 0 + if (found) { + cachedData(dataIndex).cachedRepresentation.cachedColumnBuffers.unpersist(blocking) + cachedData.remove(dataIndex) + } + found + } + + /** Optionally returns cached data for the given [[DataFrame]] */ + private[sql] def lookupCachedData(query: DataFrame): Option[CachedData] = readLock { + lookupCachedData(query.queryExecution.analyzed) + } + + /** Optionally returns cached data for the given LogicalPlan. */ + private[sql] def lookupCachedData(plan: LogicalPlan): Option[CachedData] = readLock { + cachedData.find(cd => plan.sameResult(cd.plan)) + } + + /** Replaces segments of the given logical plan with cached versions where possible. 
*/ + private[sql] def useCachedData(plan: LogicalPlan): LogicalPlan = { + plan transformDown { + case currentFragment => + lookupCachedData(currentFragment) + .map(_.cachedRepresentation.withOutput(currentFragment.output)) + .getOrElse(currentFragment) + } + } + + /** + * Invalidates the cache of any data that contains `plan`. Note that it is possible that this + * function will over invalidate. + */ + private[sql] def invalidateCache(plan: LogicalPlan): Unit = writeLock { + cachedData.foreach { + case data if data.plan.collect { case p if p.sameResult(plan) => p }.nonEmpty => + data.cachedRepresentation.recache() + case _ => + } + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/acd872bb/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala index 3dbc383..65dd7ba 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala @@ -19,20 +19,21 @@ package org.apache.spark.sql.execution import java.util.{List => JList, Map => JMap} -import org.apache.spark.rdd.RDD - import scala.collection.JavaConversions._ import scala.collection.JavaConverters._ import net.razorvine.pickle.{Pickler, Unpickler} + import org.apache.spark.annotation.DeveloperApi import org.apache.spark.api.python.{PythonBroadcast, PythonRDD} import org.apache.spark.broadcast.Broadcast +import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.expressions.Row import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.catalyst.util.DateUtils import 
org.apache.spark.sql.types._ import org.apache.spark.{Accumulator, Logging => SparkLogging} http://git-wip-us.apache.org/repos/asf/spark/blob/acd872bb/sql/core/src/main/scala/org/apache/spark/sql/functions.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala index 099e1d8..4404ad8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala @@ -438,6 +438,7 @@ object functions { * }}} * * @group normal_funcs + * @since 1.4.0 */ def when(condition: Column, value: Any): Column = { CaseWhen(Seq(condition.expr, lit(value).expr)) http://git-wip-us.apache.org/repos/asf/spark/blob/acd872bb/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala index a03ade3..40483d3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala @@ -25,9 +25,9 @@ import org.apache.commons.lang3.StringUtils import org.apache.spark.{Logging, Partition, SparkContext, TaskContext} import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.expressions.{Row, SpecificMutableRow} +import org.apache.spark.sql.catalyst.util.DateUtils import org.apache.spark.sql.types._ import org.apache.spark.sql.sources._ -import org.apache.spark.util.Utils private[sql] object JDBCRDD extends Logging { /** http://git-wip-us.apache.org/repos/asf/spark/blob/acd872bb/sql/core/src/main/scala/org/apache/spark/sql/json/JacksonParser.scala ---------------------------------------------------------------------- diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/json/JacksonParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JacksonParser.scala index a8e69ae..8161151 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/json/JacksonParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JacksonParser.scala @@ -26,6 +26,7 @@ import com.fasterxml.jackson.core._ import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.util.DateUtils import org.apache.spark.sql.json.JacksonUtils.nextUntil import org.apache.spark.sql.types._ http://git-wip-us.apache.org/repos/asf/spark/blob/acd872bb/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala index f62973d..4c32710 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala @@ -29,6 +29,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.analysis.HiveTypeCoercion import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.ScalaReflection +import org.apache.spark.sql.catalyst.util.DateUtils import org.apache.spark.sql.types._ import org.apache.spark.Logging http://git-wip-us.apache.org/repos/asf/spark/blob/acd872bb/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index ec0e76c..8cdbe07 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -19,10 +19,10 @@ package 
org.apache.spark.sql import org.scalatest.BeforeAndAfterAll +import org.apache.spark.sql.catalyst.DefaultParserDialect import org.apache.spark.sql.catalyst.errors.DialectException import org.apache.spark.sql.execution.GeneratedAggregate import org.apache.spark.sql.functions._ -import org.apache.spark.sql.catalyst.CatalystConf import org.apache.spark.sql.TestData._ import org.apache.spark.sql.test.TestSQLContext import org.apache.spark.sql.test.TestSQLContext.{udf => _, _} http://git-wip-us.apache.org/repos/asf/spark/blob/acd872bb/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala index 263fafb..b06e338 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala @@ -24,6 +24,7 @@ import com.fasterxml.jackson.core.JsonFactory import org.scalactic.Tolerance._ import org.apache.spark.sql.TestData._ +import org.apache.spark.sql.catalyst.util.DateUtils import org.apache.spark.sql.functions._ import org.apache.spark.sql.json.InferSchema.compatibleType import org.apache.spark.sql.sources.LogicalRelation http://git-wip-us.apache.org/repos/asf/spark/blob/acd872bb/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala index 7c371db..008443d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala @@ -35,6 +35,7 @@ import parquet.schema.{MessageType, MessageTypeParser} import 
org.apache.spark.sql.catalyst.ScalaReflection import org.apache.spark.sql.catalyst.expressions.Row +import org.apache.spark.sql.catalyst.util.DateUtils import org.apache.spark.sql.test.TestSQLContext import org.apache.spark.sql.test.TestSQLContext._ import org.apache.spark.sql.test.TestSQLContext.implicits._ http://git-wip-us.apache.org/repos/asf/spark/blob/acd872bb/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala index 74ae984..7c7666f 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala @@ -24,6 +24,7 @@ import org.apache.hadoop.hive.serde2.{io => hiveIo} import org.apache.hadoop.{io => hadoopIo} import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.util.DateUtils import org.apache.spark.sql.types import org.apache.spark.sql.types._ http://git-wip-us.apache.org/repos/asf/spark/blob/acd872bb/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala index b69312f..0b6f7a3 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala @@ -35,7 +35,7 @@ import org.apache.spark.broadcast.Broadcast import org.apache.spark.Logging import org.apache.spark.rdd.{EmptyRDD, HadoopRDD, RDD, UnionRDD} import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.types.DateUtils +import org.apache.spark.sql.catalyst.util.DateUtils import 
org.apache.spark.util.Utils /** http://git-wip-us.apache.org/repos/asf/spark/blob/acd872bb/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index 1d6393a..eaa9d6a 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -17,8 +17,10 @@ package org.apache.spark.sql.hive.execution +import org.apache.spark.sql.catalyst.DefaultParserDialect import org.apache.spark.sql.catalyst.analysis.EliminateSubQueries import org.apache.spark.sql.catalyst.errors.DialectException +import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SQLConf} import org.apache.spark.sql.hive.test.TestHive import org.apache.spark.sql.hive.test.TestHive._ import org.apache.spark.sql.hive.test.TestHive.implicits._ @@ -26,7 +28,6 @@ import org.apache.spark.sql.hive.{HiveQLDialect, HiveShim, MetastoreRelation} import org.apache.spark.sql.parquet.FSBasedParquetRelation import org.apache.spark.sql.sources.LogicalRelation import org.apache.spark.sql.types._ -import org.apache.spark.sql.{AnalysisException, DefaultParserDialect, QueryTest, Row, SQLConf} case class Nested1(f1: Nested2) case class Nested2(f2: Nested3) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org