[SPARK-9763][SQL] Minimize exposure of internal SQL classes.

There are a few changes in this pull request:

1. Moved all data sources to execution.datasources, except the public JDBC APIs.
2. To maintain backward compatibility after 1, added a backward-compatibility translation map in data source resolution.
3. Moved the ui and metric packages into execution.
4. Added more documentation on some internal classes.
5. Renamed DataSourceRegister.format -> shortName (see the sketch after this list).
6. Added "override" modifier on shortName.
7. Removed IntSQLMetric.
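
For illustration, a minimal sketch of what 2, 5, and 6 look like from a data source
author's point of view. The MyDefaultSource class and the "myformat" alias below are
hypothetical; the DataSourceRegister and RelationProvider signatures are the ones shown
in the diff further down.

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, RelationProvider}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Hypothetical third-party provider registering the short alias "myformat".
class MyDefaultSource extends RelationProvider with DataSourceRegister {

  // Formerly DataSourceRegister.format; renamed to shortName and overridden here.
  override def shortName(): String = "myformat"

  override def createRelation(
      sqlContext: SQLContext,
      parameters: Map[String, String]): BaseRelation = {
    val ctx = sqlContext
    // A trivial relation with a single string column, just to keep the sketch self-contained.
    new BaseRelation {
      override val sqlContext: SQLContext = ctx
      override val schema: StructType = StructType(StructField("value", StringType) :: Nil)
    }
  }
}

The alias becomes usable as sqlContext.read.format("myformat") once the class is listed in
META-INF/services/org.apache.spark.sql.sources.DataSourceRegister, and the old fully
qualified names of the moved built-in sources (e.g. "org.apache.spark.sql.json") keep
working via the backwardCompatibilityMap in ResolvedDataSource.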

Author: Reynold Xin <r...@databricks.com>

Closes #8056 from rxin/SPARK-9763 and squashes the following commits:

9df4801 [Reynold Xin] Removed hardcoded name in test cases.
d9babc6 [Reynold Xin] Shorten.
e484419 [Reynold Xin] Removed VisibleForTesting.
171b812 [Reynold Xin] MimaExcludes.
2041389 [Reynold Xin] Compile ...
79dda42 [Reynold Xin] Compile.
0818ba3 [Reynold Xin] Removed IntSQLMetric.
c46884f [Reynold Xin] Two more fixes.
f9aa88d [Reynold Xin] [SPARK-9763][SQL] Minimize exposure of internal SQL classes.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/40ed2af5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/40ed2af5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/40ed2af5

Branch: refs/heads/master
Commit: 40ed2af587cedadc6e5249031857a922b3b234ca
Parents: 0fe6674
Author: Reynold Xin <r...@databricks.com>
Authored: Mon Aug 10 13:49:23 2015 -0700
Committer: Reynold Xin <r...@databricks.com>
Committed: Mon Aug 10 13:49:23 2015 -0700

----------------------------------------------------------------------
 project/MimaExcludes.scala                      |   24 +-
 ....apache.spark.sql.sources.DataSourceRegister |    6 +-
 .../sql/execution/ui/static/spark-sql-viz.css   |   37 +
 .../sql/execution/ui/static/spark-sql-viz.js    |  160 +++
 .../spark/sql/ui/static/spark-sql-viz.css       |   37 -
 .../apache/spark/sql/ui/static/spark-sql-viz.js |  160 ---
 .../scala/org/apache/spark/sql/DataFrame.scala  |    2 +-
 .../org/apache/spark/sql/DataFrameReader.scala  |    6 +-
 .../org/apache/spark/sql/DataFrameWriter.scala  |    6 +-
 .../scala/org/apache/spark/sql/SQLContext.scala |    2 +-
 .../spark/sql/execution/SQLExecution.scala      |    2 +-
 .../apache/spark/sql/execution/SparkPlan.scala  |    8 +-
 .../spark/sql/execution/basicOperators.scala    |    2 +-
 .../sql/execution/datasources/DDLParser.scala   |  185 +++
 .../execution/datasources/DefaultSource.scala   |   64 +
 .../datasources/InsertIntoDataSource.scala      |   23 +-
 .../datasources/ResolvedDataSource.scala        |  204 +++
 .../spark/sql/execution/datasources/ddl.scala   |  352 +-----
 .../datasources/jdbc/DefaultSource.scala        |   62 +
 .../datasources/jdbc/DriverRegistry.scala       |   60 +
 .../datasources/jdbc/DriverWrapper.scala        |   48 +
 .../execution/datasources/jdbc/JDBCRDD.scala    |  489 ++++++++
 .../datasources/jdbc/JDBCRelation.scala         |  113 ++
 .../execution/datasources/jdbc/JdbcUtils.scala  |  219 ++++
 .../datasources/json/InferSchema.scala          |  207 ++++
 .../datasources/json/JSONRelation.scala         |  204 +++
 .../datasources/json/JacksonGenerator.scala     |  135 ++
 .../datasources/json/JacksonParser.scala        |  228 ++++
 .../datasources/json/JacksonUtils.scala         |   32 +
 .../parquet/CatalystReadSupport.scala           |  153 +++
 .../parquet/CatalystRecordMaterializer.scala    |   41 +
 .../parquet/CatalystRowConverter.scala          |  449 +++++++
 .../parquet/CatalystSchemaConverter.scala       |  592 +++++++++
 .../parquet/DirectParquetOutputCommitter.scala  |   87 ++
 .../datasources/parquet/ParquetConverter.scala  |   39 +
 .../datasources/parquet/ParquetFilters.scala    |  360 ++++++
 .../datasources/parquet/ParquetRelation.scala   |  796 ++++++++++++
 .../parquet/ParquetTableSupport.scala           |  322 +++++
 .../parquet/ParquetTypesConverter.scala         |  159 +++
 .../spark/sql/execution/metric/SQLMetrics.scala |  115 ++
 .../apache/spark/sql/execution/package.scala    |    8 +-
 .../sql/execution/ui/AllExecutionsPage.scala    |  238 ++++
 .../spark/sql/execution/ui/ExecutionPage.scala  |  127 ++
 .../spark/sql/execution/ui/SQLListener.scala    |  352 ++++++
 .../apache/spark/sql/execution/ui/SQLTab.scala  |   49 +
 .../spark/sql/execution/ui/SparkPlanGraph.scala |  118 ++
 .../org/apache/spark/sql/jdbc/JDBCRDD.scala     |  490 --------
 .../apache/spark/sql/jdbc/JDBCRelation.scala    |  152 ---
 .../org/apache/spark/sql/jdbc/JdbcUtils.scala   |   52 -
 .../scala/org/apache/spark/sql/jdbc/jdbc.scala  |  250 ----
 .../org/apache/spark/sql/json/InferSchema.scala |  207 ----
 .../apache/spark/sql/json/JSONRelation.scala    |  203 ---
 .../spark/sql/json/JacksonGenerator.scala       |  135 --
 .../apache/spark/sql/json/JacksonParser.scala   |  228 ----
 .../apache/spark/sql/json/JacksonUtils.scala    |   32 -
 .../apache/spark/sql/metric/SQLMetrics.scala    |  149 ---
 .../spark/sql/parquet/CatalystReadSupport.scala |  153 ---
 .../parquet/CatalystRecordMaterializer.scala    |   41 -
 .../sql/parquet/CatalystRowConverter.scala      |  449 -------
 .../sql/parquet/CatalystSchemaConverter.scala   |  592 ---------
 .../parquet/DirectParquetOutputCommitter.scala  |   87 --
 .../spark/sql/parquet/ParquetConverter.scala    |   39 -
 .../spark/sql/parquet/ParquetFilters.scala      |  360 ------
 .../spark/sql/parquet/ParquetRelation.scala     |  796 ------------
 .../spark/sql/parquet/ParquetTableSupport.scala |  322 -----
 .../sql/parquet/ParquetTypesConverter.scala     |  159 ---
 .../apache/spark/sql/sources/interfaces.scala   |   15 +-
 .../apache/spark/sql/ui/AllExecutionsPage.scala |  238 ----
 .../org/apache/spark/sql/ui/ExecutionPage.scala |  127 --
 .../org/apache/spark/sql/ui/SQLListener.scala   |  355 ------
 .../scala/org/apache/spark/sql/ui/SQLTab.scala  |   49 -
 .../apache/spark/sql/ui/SparkPlanGraph.scala    |  118 --
 .../parquet/test/avro/CompatibilityTest.java    |   17 +
 .../datasources/parquet/test/avro/Nested.java   |  196 +++
 .../parquet/test/avro/ParquetAvroCompat.java    | 1001 +++++++++++++++
 .../parquet/test/avro/CompatibilityTest.java    |   17 -
 .../spark/sql/parquet/test/avro/Nested.java     |  196 ---
 .../parquet/test/avro/ParquetAvroCompat.java    | 1001 ---------------
 .../org/apache/spark/sql/DataFrameSuite.scala   |    4 +-
 .../execution/datasources/json/JsonSuite.scala  | 1172 ++++++++++++++++++
 .../datasources/json/TestJsonData.scala         |  206 +++
 .../parquet/ParquetAvroCompatibilitySuite.scala |  125 ++
 .../parquet/ParquetCompatibilityTest.scala      |   67 +
 .../parquet/ParquetFilterSuite.scala            |  319 +++++
 .../datasources/parquet/ParquetIOSuite.scala    |  436 +++++++
 .../ParquetPartitionDiscoverySuite.scala        |  602 +++++++++
 .../datasources/parquet/ParquetQuerySuite.scala |  205 +++
 .../parquet/ParquetSchemaSuite.scala            |  916 ++++++++++++++
 .../datasources/parquet/ParquetTest.scala       |  100 ++
 .../ParquetThriftCompatibilitySuite.scala       |   78 ++
 .../sql/execution/metric/SQLMetricsSuite.scala  |  135 ++
 .../sql/execution/ui/SQLListenerSuite.scala     |  348 ++++++
 .../org/apache/spark/sql/json/JsonSuite.scala   | 1172 ------------------
 .../apache/spark/sql/json/TestJsonData.scala    |  206 ---
 .../spark/sql/metric/SQLMetricsSuite.scala      |  145 ---
 .../parquet/ParquetAvroCompatibilitySuite.scala |  125 --
 .../sql/parquet/ParquetCompatibilityTest.scala  |   67 -
 .../spark/sql/parquet/ParquetFilterSuite.scala  |  319 -----
 .../spark/sql/parquet/ParquetIOSuite.scala      |  436 -------
 .../ParquetPartitionDiscoverySuite.scala        |  602 ---------
 .../spark/sql/parquet/ParquetQuerySuite.scala   |  205 ---
 .../spark/sql/parquet/ParquetSchemaSuite.scala  |  916 --------------
 .../apache/spark/sql/parquet/ParquetTest.scala  |  100 --
 .../ParquetThriftCompatibilitySuite.scala       |   78 --
 .../sql/sources/CreateTableAsSelectSuite.scala  |   20 +-
 .../spark/sql/sources/DDLSourceLoadSuite.scala  |   59 +-
 .../sql/sources/ResolvedDataSourceSuite.scala   |   39 +-
 .../apache/spark/sql/ui/SQLListenerSuite.scala  |  348 ------
 .../spark/sql/hive/HiveMetastoreCatalog.scala   |    2 +-
 .../apache/spark/sql/hive/orc/OrcRelation.scala |    6 +-
 .../spark/sql/hive/HiveParquetSuite.scala       |    2 +-
 .../sql/hive/MetastoreDataSourcesSuite.scala    |    2 +-
 .../hive/ParquetHiveCompatibilitySuite.scala    |    2 +-
 .../sql/hive/execution/SQLQuerySuite.scala      |    2 +-
 .../apache/spark/sql/hive/parquetSuites.scala   |    2 +-
 .../sources/ParquetHadoopFsRelationSuite.scala  |    4 +-
 .../SimpleTextHadoopFsRelationSuite.scala       |    2 +-
 117 files changed, 12515 insertions(+), 12367 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/40ed2af5/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index b60ae78..90261ca 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -62,8 +62,6 @@ object MimaExcludes {
               "org.apache.spark.ml.classification.LogisticCostFun.this"),
             // SQL execution is considered private.
             excludePackage("org.apache.spark.sql.execution"),
-            // Parquet support is considered private.
-            excludePackage("org.apache.spark.sql.parquet"),
             // The old JSON RDD is removed in favor of streaming Jackson
             ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.json.JsonRDD$"),
             ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.json.JsonRDD"),
@@ -155,7 +153,27 @@ object MimaExcludes {
             ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.SqlNewHadoopRDD$NewHadoopMapPartitionsWithSplitRDD$"),
             ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.PartitionSpec$"),
             ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.DescribeCommand"),
-            ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.DDLException")
+            ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.DDLException"),
+            // SPARK-9763 Minimize exposure of internal SQL classes
+            excludePackage("org.apache.spark.sql.parquet"),
+            excludePackage("org.apache.spark.sql.json"),
+            ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.jdbc.JDBCRDD$DecimalConversion$"),
+            ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.jdbc.JDBCPartition"),
+            ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.jdbc.JdbcUtils$"),
+            ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.jdbc.JDBCRDD$DecimalConversion"),
+            ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.jdbc.JDBCPartitioningInfo$"),
+            ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.jdbc.JDBCPartition$"),
+            ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.jdbc.package"),
+            ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.jdbc.JDBCRDD$JDBCConversion"),
+            ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.jdbc.JDBCRDD$"),
+            ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.jdbc.package$DriverWrapper"),
+            ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.jdbc.JDBCRDD"),
+            ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.jdbc.JDBCPartitioningInfo"),
+            ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.jdbc.JdbcUtils"),
+            ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.jdbc.DefaultSource"),
+            ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.jdbc.JDBCRelation$"),
+            ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.jdbc.package$"),
+            ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.jdbc.JDBCRelation")
           ) ++ Seq(
             // SPARK-4751 Dynamic allocation for standalone mode
             ProblemFilters.exclude[MissingMethodProblem](

http://git-wip-us.apache.org/repos/asf/spark/blob/40ed2af5/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
----------------------------------------------------------------------
diff --git a/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
index cc32d4b..ca50000 100644
--- a/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
+++ b/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
@@ -1,3 +1,3 @@
-org.apache.spark.sql.jdbc.DefaultSource
-org.apache.spark.sql.json.DefaultSource
-org.apache.spark.sql.parquet.DefaultSource
+org.apache.spark.sql.execution.datasources.jdbc.DefaultSource
+org.apache.spark.sql.execution.datasources.json.DefaultSource
+org.apache.spark.sql.execution.datasources.parquet.DefaultSource

http://git-wip-us.apache.org/repos/asf/spark/blob/40ed2af5/sql/core/src/main/resources/org/apache/spark/sql/execution/ui/static/spark-sql-viz.css
----------------------------------------------------------------------
diff --git a/sql/core/src/main/resources/org/apache/spark/sql/execution/ui/static/spark-sql-viz.css b/sql/core/src/main/resources/org/apache/spark/sql/execution/ui/static/spark-sql-viz.css
new file mode 100644
index 0000000..ddd3a91
--- /dev/null
+++ b/sql/core/src/main/resources/org/apache/spark/sql/execution/ui/static/spark-sql-viz.css
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#plan-viz-graph .label {
+  font-weight: normal;
+  text-shadow: none;
+}
+
+#plan-viz-graph svg g.node rect {
+  fill: #C3EBFF;
+  stroke: #3EC0FF;
+  stroke-width: 1px;
+}
+
+/* Hightlight the SparkPlan node name */
+#plan-viz-graph svg text :first-child {
+  font-weight: bold;
+}
+
+#plan-viz-graph svg path {
+  stroke: #444;
+  stroke-width: 1.5px;
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/40ed2af5/sql/core/src/main/resources/org/apache/spark/sql/execution/ui/static/spark-sql-viz.js
----------------------------------------------------------------------
diff --git a/sql/core/src/main/resources/org/apache/spark/sql/execution/ui/static/spark-sql-viz.js b/sql/core/src/main/resources/org/apache/spark/sql/execution/ui/static/spark-sql-viz.js
new file mode 100644
index 0000000..5161fcd
--- /dev/null
+++ b/sql/core/src/main/resources/org/apache/spark/sql/execution/ui/static/spark-sql-viz.js
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+var PlanVizConstants = {
+  svgMarginX: 16,
+  svgMarginY: 16
+};
+
+function renderPlanViz() {
+  var svg = planVizContainer().append("svg");
+  var metadata = d3.select("#plan-viz-metadata");
+  var dot = metadata.select(".dot-file").text().trim();
+  var graph = svg.append("g");
+
+  var g = graphlibDot.read(dot);
+  preprocessGraphLayout(g);
+  var renderer = new dagreD3.render();
+  renderer(graph, g);
+
+  // Round corners on rectangles
+  svg
+    .selectAll("rect")
+    .attr("rx", "5")
+    .attr("ry", "5");
+
+  var nodeSize = parseInt($("#plan-viz-metadata-size").text());
+  for (var i = 0; i < nodeSize; i++) {
+    setupTooltipForSparkPlanNode(i);
+  }
+
+  resizeSvg(svg)
+}
+
+/* -------------------- *
+ * | Helper functions | *
+ * -------------------- */
+
+function planVizContainer() { return d3.select("#plan-viz-graph"); }
+
+/*
+ * Set up the tooltip for a SparkPlan node using metadata. When the user moves the mouse on the
+ * node, it will display the details of this SparkPlan node in the right.
+ */
+function setupTooltipForSparkPlanNode(nodeId) {
+  var nodeTooltip = d3.select("#plan-meta-data-" + nodeId).text()
+  d3.select("svg g .node_" + nodeId)
+    .on('mouseover', function(d) {
+      var domNode = d3.select(this).node();
+      $(domNode).tooltip({
+        title: nodeTooltip, trigger: "manual", container: "body", placement: "right"
+      });
+      $(domNode).tooltip("show");
+    })
+    .on('mouseout', function(d) {
+      var domNode = d3.select(this).node();
+      $(domNode).tooltip("destroy");
+    })
+}
+
+/*
+ * Helper function to pre-process the graph layout.
+ * This step is necessary for certain styles that affect the positioning
+ * and sizes of graph elements, e.g. padding, font style, shape.
+ */
+function preprocessGraphLayout(g) {
+  var nodes = g.nodes();
+  for (var i = 0; i < nodes.length; i++) {
+      var node = g.node(nodes[i]);
+      node.padding = "5";
+  }
+  // Curve the edges
+  var edges = g.edges();
+  for (var j = 0; j < edges.length; j++) {
+    var edge = g.edge(edges[j]);
+    edge.lineInterpolate = "basis";
+  }
+}
+
+/*
+ * Helper function to size the SVG appropriately such that all elements are displayed.
+ * This assumes that all outermost elements are clusters (rectangles).
+ */
+function resizeSvg(svg) {
+  var allClusters = svg.selectAll("g rect")[0];
+  console.log(allClusters);
+  var startX = -PlanVizConstants.svgMarginX +
+    toFloat(d3.min(allClusters, function(e) {
+      console.log(e);
+      return getAbsolutePosition(d3.select(e)).x;
+    }));
+  var startY = -PlanVizConstants.svgMarginY +
+    toFloat(d3.min(allClusters, function(e) {
+      return getAbsolutePosition(d3.select(e)).y;
+    }));
+  var endX = PlanVizConstants.svgMarginX +
+    toFloat(d3.max(allClusters, function(e) {
+      var t = d3.select(e);
+      return getAbsolutePosition(t).x + toFloat(t.attr("width"));
+    }));
+  var endY = PlanVizConstants.svgMarginY +
+    toFloat(d3.max(allClusters, function(e) {
+      var t = d3.select(e);
+      return getAbsolutePosition(t).y + toFloat(t.attr("height"));
+    }));
+  var width = endX - startX;
+  var height = endY - startY;
+  svg.attr("viewBox", startX + " " + startY + " " + width + " " + height)
+     .attr("width", width)
+     .attr("height", height);
+}
+
+/* Helper function to convert attributes to numeric values. */
+function toFloat(f) {
+  if (f) {
+    return parseFloat(f.toString().replace(/px$/, ""));
+  } else {
+    return f;
+  }
+}
+
+/*
+ * Helper function to compute the absolute position of the specified element in our graph.
+ */
+function getAbsolutePosition(d3selection) {
+  if (d3selection.empty()) {
+    throw "Attempted to get absolute position of an empty selection.";
+  }
+  var obj = d3selection;
+  var _x = toFloat(obj.attr("x")) || 0;
+  var _y = toFloat(obj.attr("y")) || 0;
+  while (!obj.empty()) {
+    var transformText = obj.attr("transform");
+    if (transformText) {
+      var translate = d3.transform(transformText).translate;
+      _x += toFloat(translate[0]);
+      _y += toFloat(translate[1]);
+    }
+    // Climb upwards to find how our parents are translated
+    obj = d3.select(obj.node().parentNode);
+    // Stop when we've reached the graph container itself
+    if (obj.node() == planVizContainer().node()) {
+      break;
+    }
+  }
+  return { x: _x, y: _y };
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/40ed2af5/sql/core/src/main/resources/org/apache/spark/sql/ui/static/spark-sql-viz.css
----------------------------------------------------------------------
diff --git a/sql/core/src/main/resources/org/apache/spark/sql/ui/static/spark-sql-viz.css b/sql/core/src/main/resources/org/apache/spark/sql/ui/static/spark-sql-viz.css
deleted file mode 100644
index ddd3a91..0000000
--- a/sql/core/src/main/resources/org/apache/spark/sql/ui/static/spark-sql-viz.css
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#plan-viz-graph .label {
-  font-weight: normal;
-  text-shadow: none;
-}
-
-#plan-viz-graph svg g.node rect {
-  fill: #C3EBFF;
-  stroke: #3EC0FF;
-  stroke-width: 1px;
-}
-
-/* Hightlight the SparkPlan node name */
-#plan-viz-graph svg text :first-child {
-  font-weight: bold;
-}
-
-#plan-viz-graph svg path {
-  stroke: #444;
-  stroke-width: 1.5px;
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/40ed2af5/sql/core/src/main/resources/org/apache/spark/sql/ui/static/spark-sql-viz.js
----------------------------------------------------------------------
diff --git a/sql/core/src/main/resources/org/apache/spark/sql/ui/static/spark-sql-viz.js b/sql/core/src/main/resources/org/apache/spark/sql/ui/static/spark-sql-viz.js
deleted file mode 100644
index 5161fcd..0000000
--- a/sql/core/src/main/resources/org/apache/spark/sql/ui/static/spark-sql-viz.js
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-var PlanVizConstants = {
-  svgMarginX: 16,
-  svgMarginY: 16
-};
-
-function renderPlanViz() {
-  var svg = planVizContainer().append("svg");
-  var metadata = d3.select("#plan-viz-metadata");
-  var dot = metadata.select(".dot-file").text().trim();
-  var graph = svg.append("g");
-
-  var g = graphlibDot.read(dot);
-  preprocessGraphLayout(g);
-  var renderer = new dagreD3.render();
-  renderer(graph, g);
-
-  // Round corners on rectangles
-  svg
-    .selectAll("rect")
-    .attr("rx", "5")
-    .attr("ry", "5");
-
-  var nodeSize = parseInt($("#plan-viz-metadata-size").text());
-  for (var i = 0; i < nodeSize; i++) {
-    setupTooltipForSparkPlanNode(i);
-  }
-
-  resizeSvg(svg)
-}
-
-/* -------------------- *
- * | Helper functions | *
- * -------------------- */
-
-function planVizContainer() { return d3.select("#plan-viz-graph"); }
-
-/*
- * Set up the tooltip for a SparkPlan node using metadata. When the user moves the mouse on the
- * node, it will display the details of this SparkPlan node in the right.
- */
-function setupTooltipForSparkPlanNode(nodeId) {
-  var nodeTooltip = d3.select("#plan-meta-data-" + nodeId).text()
-  d3.select("svg g .node_" + nodeId)
-    .on('mouseover', function(d) {
-      var domNode = d3.select(this).node();
-      $(domNode).tooltip({
-        title: nodeTooltip, trigger: "manual", container: "body", placement: "right"
-      });
-      $(domNode).tooltip("show");
-    })
-    .on('mouseout', function(d) {
-      var domNode = d3.select(this).node();
-      $(domNode).tooltip("destroy");
-    })
-}
-
-/*
- * Helper function to pre-process the graph layout.
- * This step is necessary for certain styles that affect the positioning
- * and sizes of graph elements, e.g. padding, font style, shape.
- */
-function preprocessGraphLayout(g) {
-  var nodes = g.nodes();
-  for (var i = 0; i < nodes.length; i++) {
-      var node = g.node(nodes[i]);
-      node.padding = "5";
-  }
-  // Curve the edges
-  var edges = g.edges();
-  for (var j = 0; j < edges.length; j++) {
-    var edge = g.edge(edges[j]);
-    edge.lineInterpolate = "basis";
-  }
-}
-
-/*
- * Helper function to size the SVG appropriately such that all elements are displayed.
- * This assumes that all outermost elements are clusters (rectangles).
- */
-function resizeSvg(svg) {
-  var allClusters = svg.selectAll("g rect")[0];
-  console.log(allClusters);
-  var startX = -PlanVizConstants.svgMarginX +
-    toFloat(d3.min(allClusters, function(e) {
-      console.log(e);
-      return getAbsolutePosition(d3.select(e)).x;
-    }));
-  var startY = -PlanVizConstants.svgMarginY +
-    toFloat(d3.min(allClusters, function(e) {
-      return getAbsolutePosition(d3.select(e)).y;
-    }));
-  var endX = PlanVizConstants.svgMarginX +
-    toFloat(d3.max(allClusters, function(e) {
-      var t = d3.select(e);
-      return getAbsolutePosition(t).x + toFloat(t.attr("width"));
-    }));
-  var endY = PlanVizConstants.svgMarginY +
-    toFloat(d3.max(allClusters, function(e) {
-      var t = d3.select(e);
-      return getAbsolutePosition(t).y + toFloat(t.attr("height"));
-    }));
-  var width = endX - startX;
-  var height = endY - startY;
-  svg.attr("viewBox", startX + " " + startY + " " + width + " " + height)
-     .attr("width", width)
-     .attr("height", height);
-}
-
-/* Helper function to convert attributes to numeric values. */
-function toFloat(f) {
-  if (f) {
-    return parseFloat(f.toString().replace(/px$/, ""));
-  } else {
-    return f;
-  }
-}
-
-/*
- * Helper function to compute the absolute position of the specified element in our graph.
- */
-function getAbsolutePosition(d3selection) {
-  if (d3selection.empty()) {
-    throw "Attempted to get absolute position of an empty selection.";
-  }
-  var obj = d3selection;
-  var _x = toFloat(obj.attr("x")) || 0;
-  var _y = toFloat(obj.attr("y")) || 0;
-  while (!obj.empty()) {
-    var transformText = obj.attr("transform");
-    if (transformText) {
-      var translate = d3.transform(transformText).translate;
-      _x += toFloat(translate[0]);
-      _y += toFloat(translate[1]);
-    }
-    // Climb upwards to find how our parents are translated
-    obj = d3.select(obj.node().parentNode);
-    // Stop when we've reached the graph container itself
-    if (obj.node() == planVizContainer().node()) {
-      break;
-    }
-  }
-  return { x: _x, y: _y };
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/40ed2af5/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 570b8b2..27b994f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -39,7 +39,7 @@ import org.apache.spark.sql.catalyst.plans.{Inner, JoinType}
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, ScalaReflection, SqlParser}
 import org.apache.spark.sql.execution.{EvaluatePython, ExplainCommand, LogicalRDD, SQLExecution}
 import org.apache.spark.sql.execution.datasources.{CreateTableUsingAsSelect, LogicalRelation}
-import org.apache.spark.sql.json.JacksonGenerator
+import org.apache.spark.sql.execution.datasources.json.JacksonGenerator
 import org.apache.spark.sql.sources.HadoopFsRelation
 import org.apache.spark.sql.types._
 import org.apache.spark.storage.StorageLevel

http://git-wip-us.apache.org/repos/asf/spark/blob/40ed2af5/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 85f33c5..9ea955b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -25,10 +25,10 @@ import org.apache.spark.annotation.Experimental
 import org.apache.spark.api.java.JavaRDD
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.execution.datasources.jdbc.{JDBCPartition, JDBCPartitioningInfo, JDBCRelation}
+import org.apache.spark.sql.execution.datasources.json.JSONRelation
+import org.apache.spark.sql.execution.datasources.parquet.ParquetRelation
 import org.apache.spark.sql.execution.datasources.{LogicalRelation, ResolvedDataSource}
-import org.apache.spark.sql.jdbc.{JDBCPartition, JDBCPartitioningInfo, JDBCRelation}
-import org.apache.spark.sql.json.JSONRelation
-import org.apache.spark.sql.parquet.ParquetRelation
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.{Logging, Partition}
 

http://git-wip-us.apache.org/repos/asf/spark/blob/40ed2af5/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 2a4992d..5fa11da 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -23,8 +23,8 @@ import org.apache.spark.annotation.Experimental
 import org.apache.spark.sql.catalyst.{SqlParser, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.plans.logical.InsertIntoTable
+import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
 import org.apache.spark.sql.execution.datasources.{CreateTableUsingAsSelect, ResolvedDataSource}
-import org.apache.spark.sql.jdbc.{JDBCWriteDetails, JdbcUtils}
 import org.apache.spark.sql.sources.HadoopFsRelation
 
 
@@ -264,7 +264,7 @@ final class DataFrameWriter private[sql](df: DataFrame) {
 
       // Create the table if the table didn't exist.
       if (!tableExists) {
-        val schema = JDBCWriteDetails.schemaString(df, url)
+        val schema = JdbcUtils.schemaString(df, url)
         val sql = s"CREATE TABLE $table ($schema)"
         conn.prepareStatement(sql).executeUpdate()
       }
@@ -272,7 +272,7 @@ final class DataFrameWriter private[sql](df: DataFrame) {
       conn.close()
     }
 
-    JDBCWriteDetails.saveTable(df, url, table, connectionProperties)
+    JdbcUtils.saveTable(df, url, table, connectionProperties)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/40ed2af5/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 8325725..f73bb04 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -43,7 +43,7 @@ import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.sources.BaseRelation
 import org.apache.spark.sql.types._
-import org.apache.spark.sql.ui.{SQLListener, SQLTab}
+import org.apache.spark.sql.execution.ui.{SQLListener, SQLTab}
 import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.Utils
 

http://git-wip-us.apache.org/repos/asf/spark/blob/40ed2af5/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
index 97f1323..cee5821 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
@@ -21,7 +21,7 @@ import java.util.concurrent.atomic.AtomicLong
 
 import org.apache.spark.SparkContext
 import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.ui.SparkPlanGraph
+import org.apache.spark.sql.execution.ui.SparkPlanGraph
 import org.apache.spark.util.Utils
 
 private[sql] object SQLExecution {

http://git-wip-us.apache.org/repos/asf/spark/blob/40ed2af5/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index 1915496..9ba5cf2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.expressions.codegen._
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.physical._
-import org.apache.spark.sql.metric.{IntSQLMetric, LongSQLMetric, SQLMetric, SQLMetrics}
+import org.apache.spark.sql.execution.metric.{LongSQLMetric, SQLMetric, SQLMetrics}
 import org.apache.spark.sql.types.DataType
 
 object SparkPlan {
@@ -99,12 +99,6 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
   private[sql] def metrics: Map[String, SQLMetric[_, _]] = defaultMetrics
 
   /**
-   * Return a IntSQLMetric according to the name.
-   */
-  private[sql] def intMetric(name: String): IntSQLMetric =
-    metrics(name).asInstanceOf[IntSQLMetric]
-
-  /**
    * Return a LongSQLMetric according to the name.
    */
   private[sql] def longMetric(name: String): LongSQLMetric =

http://git-wip-us.apache.org/repos/asf/spark/blob/40ed2af5/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
index 24950f2..bf2de24 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.CatalystTypeConverters
 import org.apache.spark.sql.catalyst.errors._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.physical._
-import org.apache.spark.sql.metric.SQLMetrics
+import org.apache.spark.sql.execution.metric.SQLMetrics
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.collection.ExternalSorter
 import org.apache.spark.util.collection.unsafe.sort.PrefixComparator

http://git-wip-us.apache.org/repos/asf/spark/blob/40ed2af5/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DDLParser.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DDLParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DDLParser.scala
new file mode 100644
index 0000000..6c462fa
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DDLParser.scala
@@ -0,0 +1,185 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.spark.sql.execution.datasources
+
+import scala.language.implicitConversions
+import scala.util.matching.Regex
+
+import org.apache.spark.Logging
+import org.apache.spark.sql.SaveMode
+import org.apache.spark.sql.catalyst.{TableIdentifier, AbstractSparkSQLParser}
+import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.types._
+
+
+/**
+ * A parser for foreign DDL commands.
+ */
+class DDLParser(parseQuery: String => LogicalPlan)
+  extends AbstractSparkSQLParser with DataTypeParser with Logging {
+
+  def parse(input: String, exceptionOnError: Boolean): LogicalPlan = {
+    try {
+      parse(input)
+    } catch {
+      case ddlException: DDLException => throw ddlException
+      case _ if !exceptionOnError => parseQuery(input)
+      case x: Throwable => throw x
+    }
+  }
+
+  // Keyword is a convention with AbstractSparkSQLParser, which will scan all of the `Keyword`
+  // properties via reflection the class in runtime for constructing the SqlLexical object
+  protected val CREATE = Keyword("CREATE")
+  protected val TEMPORARY = Keyword("TEMPORARY")
+  protected val TABLE = Keyword("TABLE")
+  protected val IF = Keyword("IF")
+  protected val NOT = Keyword("NOT")
+  protected val EXISTS = Keyword("EXISTS")
+  protected val USING = Keyword("USING")
+  protected val OPTIONS = Keyword("OPTIONS")
+  protected val DESCRIBE = Keyword("DESCRIBE")
+  protected val EXTENDED = Keyword("EXTENDED")
+  protected val AS = Keyword("AS")
+  protected val COMMENT = Keyword("COMMENT")
+  protected val REFRESH = Keyword("REFRESH")
+
+  protected lazy val ddl: Parser[LogicalPlan] = createTable | describeTable | refreshTable
+
+  protected def start: Parser[LogicalPlan] = ddl
+
+  /**
+   * `CREATE [TEMPORARY] TABLE avroTable [IF NOT EXISTS]
+   * USING org.apache.spark.sql.avro
+   * OPTIONS (path "../hive/src/test/resources/data/files/episodes.avro")`
+   * or
+   * `CREATE [TEMPORARY] TABLE avroTable(intField int, stringField string...) [IF NOT EXISTS]
+   * USING org.apache.spark.sql.avro
+   * OPTIONS (path "../hive/src/test/resources/data/files/episodes.avro")`
+   * or
+   * `CREATE [TEMPORARY] TABLE avroTable [IF NOT EXISTS]
+   * USING org.apache.spark.sql.avro
+   * OPTIONS (path "../hive/src/test/resources/data/files/episodes.avro")`
+   * AS SELECT ...
+   */
+  protected lazy val createTable: Parser[LogicalPlan] = {
+    // TODO: Support database.table.
+    (CREATE ~> TEMPORARY.? <~ TABLE) ~ (IF ~> NOT <~ EXISTS).? ~ ident ~
+      tableCols.? ~ (USING ~> className) ~ (OPTIONS ~> options).? ~ (AS ~> restInput).? ^^ {
+      case temp ~ allowExisting ~ tableName ~ columns ~ provider ~ opts ~ query =>
+        if (temp.isDefined && allowExisting.isDefined) {
+          throw new DDLException(
+            "a CREATE TEMPORARY TABLE statement does not allow IF NOT EXISTS clause.")
+        }
+
+        val options = opts.getOrElse(Map.empty[String, String])
+        if (query.isDefined) {
+          if (columns.isDefined) {
+            throw new DDLException(
+              "a CREATE TABLE AS SELECT statement does not allow column definitions.")
+          }
+          // When IF NOT EXISTS clause appears in the query, the save mode will be ignore.
+          val mode = if (allowExisting.isDefined) {
+            SaveMode.Ignore
+          } else if (temp.isDefined) {
+            SaveMode.Overwrite
+          } else {
+            SaveMode.ErrorIfExists
+          }
+
+          val queryPlan = parseQuery(query.get)
+          CreateTableUsingAsSelect(tableName,
+            provider,
+            temp.isDefined,
+            Array.empty[String],
+            mode,
+            options,
+            queryPlan)
+        } else {
+          val userSpecifiedSchema = columns.flatMap(fields => Some(StructType(fields)))
+          CreateTableUsing(
+            tableName,
+            userSpecifiedSchema,
+            provider,
+            temp.isDefined,
+            options,
+            allowExisting.isDefined,
+            managedIfNoPath = false)
+        }
+    }
+  }
+
+  protected lazy val tableCols: Parser[Seq[StructField]] = "(" ~> repsep(column, ",") <~ ")"
+
+  /*
+   * describe [extended] table avroTable
+   * This will display all columns of table `avroTable` includes column_name,column_type,comment
+   */
+  protected lazy val describeTable: Parser[LogicalPlan] =
+    (DESCRIBE ~> opt(EXTENDED)) ~ (ident <~ ".").? ~ ident  ^^ {
+      case e ~ db ~ tbl =>
+        val tblIdentifier = db match {
+          case Some(dbName) =>
+            Seq(dbName, tbl)
+          case None =>
+            Seq(tbl)
+        }
+        DescribeCommand(UnresolvedRelation(tblIdentifier, None), e.isDefined)
+    }
+
+  protected lazy val refreshTable: Parser[LogicalPlan] =
+    REFRESH ~> TABLE ~> (ident <~ ".").? ~ ident ^^ {
+      case maybeDatabaseName ~ tableName =>
+        RefreshTable(TableIdentifier(tableName, maybeDatabaseName))
+    }
+
+  protected lazy val options: Parser[Map[String, String]] =
+    "(" ~> repsep(pair, ",") <~ ")" ^^ { case s: Seq[(String, String)] => s.toMap }
+
+  protected lazy val className: Parser[String] = repsep(ident, ".") ^^ { case s => s.mkString(".")}
+
+  override implicit def regexToParser(regex: Regex): Parser[String] = acceptMatch(
+    s"identifier matching regex $regex", {
+      case lexical.Identifier(str) if regex.unapplySeq(str).isDefined => str
+      case lexical.Keyword(str) if regex.unapplySeq(str).isDefined => str
+    }
+  )
+
+  protected lazy val optionPart: Parser[String] = "[_a-zA-Z][_a-zA-Z0-9]*".r ^^ {
+    case name => name
+  }
+
+  protected lazy val optionName: Parser[String] = repsep(optionPart, ".") ^^ {
+    case parts => parts.mkString(".")
+  }
+
+  protected lazy val pair: Parser[(String, String)] =
+    optionName ~ stringLit ^^ { case k ~ v => (k, v) }
+
+  protected lazy val column: Parser[StructField] =
+    ident ~ dataType ~ (COMMENT ~> stringLit).?  ^^ { case columnName ~ typ ~ cm =>
+      val meta = cm match {
+        case Some(comment) =>
+          new MetadataBuilder().putString(COMMENT.str.toLowerCase, comment).build()
+        case None => Metadata.empty
+      }
+
+      StructField(columnName, typ, nullable = true, meta)
+    }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/40ed2af5/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DefaultSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DefaultSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DefaultSource.scala
new file mode 100644
index 0000000..6e4cc4d
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DefaultSource.scala
@@ -0,0 +1,64 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.spark.sql.execution.datasources
+
+import java.util.Properties
+
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.execution.datasources.jdbc.{JDBCRelation, JDBCPartitioningInfo, DriverRegistry}
+import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, RelationProvider}
+
+
+class DefaultSource extends RelationProvider with DataSourceRegister {
+
+  override def shortName(): String = "jdbc"
+
+  /** Returns a new base relation with the given parameters. */
+  override def createRelation(
+      sqlContext: SQLContext,
+      parameters: Map[String, String]): BaseRelation = {
+    val url = parameters.getOrElse("url", sys.error("Option 'url' not specified"))
+    val driver = parameters.getOrElse("driver", null)
+    val table = parameters.getOrElse("dbtable", sys.error("Option 'dbtable' not specified"))
+    val partitionColumn = parameters.getOrElse("partitionColumn", null)
+    val lowerBound = parameters.getOrElse("lowerBound", null)
+    val upperBound = parameters.getOrElse("upperBound", null)
+    val numPartitions = parameters.getOrElse("numPartitions", null)
+
+    if (driver != null) DriverRegistry.register(driver)
+
+    if (partitionColumn != null
+      && (lowerBound == null || upperBound == null || numPartitions == null)) {
+      sys.error("Partitioning incompletely specified")
+    }
+
+    val partitionInfo = if (partitionColumn == null) {
+      null
+    } else {
+      JDBCPartitioningInfo(
+        partitionColumn,
+        lowerBound.toLong,
+        upperBound.toLong,
+        numPartitions.toInt)
+    }
+    val parts = JDBCRelation.columnPartition(partitionInfo)
+    val properties = new Properties() // Additional properties that we will pass to getConnection
+    parameters.foreach(kv => properties.setProperty(kv._1, kv._2))
+    JDBCRelation(url, table, parts, properties)(sqlContext)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/40ed2af5/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSource.scala
index 6ccde76..3b7dc2e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSource.scala
@@ -17,27 +17,10 @@
 
 package org.apache.spark.sql.execution.datasources
 
-import java.io.IOException
-import java.util.{Date, UUID}
-
-import scala.collection.JavaConversions.asScalaIterator
-
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.mapreduce._
-import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter => MapReduceFileOutputCommitter, FileOutputFormat}
-import org.apache.spark._
-import org.apache.spark.mapred.SparkHadoopMapRedUtil
-import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.codegen.GenerateProjection
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
-import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
-import org.apache.spark.sql.execution.{RunnableCommand, SQLExecution}
-import org.apache.spark.sql.sources._
-import org.apache.spark.sql.types.StringType
-import org.apache.spark.util.{Utils, SerializableConfiguration}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.RunnableCommand
+import org.apache.spark.sql.sources.InsertableRelation
 
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/40ed2af5/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala
new file mode 100644
index 0000000..7770bbd
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala
@@ -0,0 +1,204 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.spark.sql.execution.datasources
+
+import java.util.ServiceLoader
+
+import scala.collection.JavaConversions._
+import scala.language.{existentials, implicitConversions}
+import scala.util.{Success, Failure, Try}
+
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.Logging
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.sql.{DataFrame, SaveMode, AnalysisException, SQLContext}
+import org.apache.spark.sql.sources._
+import org.apache.spark.sql.types.{CalendarIntervalType, StructType}
+import org.apache.spark.util.Utils
+
+
+case class ResolvedDataSource(provider: Class[_], relation: BaseRelation)
+
+
+object ResolvedDataSource extends Logging {
+
+  /** A map to maintain backward compatibility in case we move data sources around. */
+  private val backwardCompatibilityMap = Map(
+    "org.apache.spark.sql.jdbc" -> classOf[jdbc.DefaultSource].getCanonicalName,
+    "org.apache.spark.sql.jdbc.DefaultSource" -> classOf[jdbc.DefaultSource].getCanonicalName,
+    "org.apache.spark.sql.json" -> classOf[json.DefaultSource].getCanonicalName,
+    "org.apache.spark.sql.json.DefaultSource" -> classOf[json.DefaultSource].getCanonicalName,
+    "org.apache.spark.sql.parquet" -> classOf[parquet.DefaultSource].getCanonicalName,
+    "org.apache.spark.sql.parquet.DefaultSource" -> classOf[parquet.DefaultSource].getCanonicalName
+  )
+
+  /** Given a provider name, look up the data source class definition. */
+  def lookupDataSource(provider0: String): Class[_] = {
+    val provider = backwardCompatibilityMap.getOrElse(provider0, provider0)
+    val provider2 = s"$provider.DefaultSource"
+    val loader = Utils.getContextOrSparkClassLoader
+    val serviceLoader = ServiceLoader.load(classOf[DataSourceRegister], loader)
+
+    serviceLoader.iterator().filter(_.shortName().equalsIgnoreCase(provider)).toList match {
+      /** the provider format did not match any given registered aliases */
+      case Nil => Try(loader.loadClass(provider)).orElse(Try(loader.loadClass(provider2))) match {
+        case Success(dataSource) => dataSource
+        case Failure(error) =>
+          if (provider.startsWith("org.apache.spark.sql.hive.orc")) {
+            throw new ClassNotFoundException(
+              "The ORC data source must be used with Hive support enabled.", error)
+          } else {
+            throw new ClassNotFoundException(
+              s"Failed to load class for data source: $provider.", error)
+          }
+      }
+      /** there is exactly one registered alias */
+      case head :: Nil => head.getClass
+      /** There are multiple registered aliases for the input */
+      case sources => sys.error(s"Multiple sources found for $provider, " +
+        s"(${sources.map(_.getClass.getName).mkString(", ")}), " +
+        "please specify the fully qualified class name.")
+    }
+  }
+
+  /** Create a [[ResolvedDataSource]] for reading data in. */
+  def apply(
+      sqlContext: SQLContext,
+      userSpecifiedSchema: Option[StructType],
+      partitionColumns: Array[String],
+      provider: String,
+      options: Map[String, String]): ResolvedDataSource = {
+    val clazz: Class[_] = lookupDataSource(provider)
+    def className: String = clazz.getCanonicalName
+    val relation = userSpecifiedSchema match {
+      case Some(schema: StructType) => clazz.newInstance() match {
+        case dataSource: SchemaRelationProvider =>
+          dataSource.createRelation(sqlContext, new CaseInsensitiveMap(options), schema)
+        case dataSource: HadoopFsRelationProvider =>
+          val maybePartitionsSchema = if (partitionColumns.isEmpty) {
+            None
+          } else {
+            Some(partitionColumnsSchema(schema, partitionColumns))
+          }
+
+          val caseInsensitiveOptions = new CaseInsensitiveMap(options)
+          val paths = {
+            val patternPath = new Path(caseInsensitiveOptions("path"))
+            val fs = patternPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
+            val qualifiedPattern = patternPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
+            SparkHadoopUtil.get.globPathIfNecessary(qualifiedPattern).map(_.toString).toArray
+          }
+
+          val dataSchema =
+            StructType(schema.filterNot(f => partitionColumns.contains(f.name))).asNullable
+
+          dataSource.createRelation(
+            sqlContext,
+            paths,
+            Some(dataSchema),
+            maybePartitionsSchema,
+            caseInsensitiveOptions)
+        case dataSource: org.apache.spark.sql.sources.RelationProvider =>
+          throw new AnalysisException(s"$className does not allow user-specified schemas.")
+        case _ =>
+          throw new AnalysisException(s"$className is not a RelationProvider.")
+      }
+
+      case None => clazz.newInstance() match {
+        case dataSource: RelationProvider =>
+          dataSource.createRelation(sqlContext, new 
CaseInsensitiveMap(options))
+        case dataSource: HadoopFsRelationProvider =>
+          val caseInsensitiveOptions = new CaseInsensitiveMap(options)
+          val paths = {
+            val patternPath = new Path(caseInsensitiveOptions("path"))
+            val fs = patternPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
+            val qualifiedPattern = patternPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
+            SparkHadoopUtil.get.globPathIfNecessary(qualifiedPattern).map(_.toString).toArray
+          }
+          dataSource.createRelation(sqlContext, paths, None, None, caseInsensitiveOptions)
+        case dataSource: org.apache.spark.sql.sources.SchemaRelationProvider =>
+          throw new AnalysisException(
+            s"A schema needs to be specified when using $className.")
+        case _ =>
+          throw new AnalysisException(
+            s"$className is neither a RelationProvider nor a FSBasedRelationProvider.")
+      }
+    }
+    new ResolvedDataSource(clazz, relation)
+  }
+
+  private def partitionColumnsSchema(
+      schema: StructType,
+      partitionColumns: Array[String]): StructType = {
+    StructType(partitionColumns.map { col =>
+      schema.find(_.name == col).getOrElse {
+        throw new RuntimeException(s"Partition column $col not found in schema $schema")
+      }
+    }).asNullable
+  }
+
+  /** Create a [[ResolvedDataSource]] for saving the content of the given DataFrame. */
+  def apply(
+      sqlContext: SQLContext,
+      provider: String,
+      partitionColumns: Array[String],
+      mode: SaveMode,
+      options: Map[String, String],
+      data: DataFrame): ResolvedDataSource = {
+    if (data.schema.map(_.dataType).exists(_.isInstanceOf[CalendarIntervalType])) {
+      throw new AnalysisException("Cannot save interval data type into external storage.")
+    }
+    val clazz: Class[_] = lookupDataSource(provider)
+    val relation = clazz.newInstance() match {
+      case dataSource: CreatableRelationProvider =>
+        dataSource.createRelation(sqlContext, mode, options, data)
+      case dataSource: HadoopFsRelationProvider =>
+        // Don't glob path for the write path.  The contracts here are:
+        //  1. Only one output path can be specified on the write path;
+        //  2. Output path must be a legal HDFS style file system path;
+        //  3. It's OK that the output path doesn't exist yet;
+        val caseInsensitiveOptions = new CaseInsensitiveMap(options)
+        val outputPath = {
+          val path = new Path(caseInsensitiveOptions("path"))
+          val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
+          path.makeQualified(fs.getUri, fs.getWorkingDirectory)
+        }
+        val dataSchema = StructType(data.schema.filterNot(f => partitionColumns.contains(f.name)))
+        val r = dataSource.createRelation(
+          sqlContext,
+          Array(outputPath.toString),
+          Some(dataSchema.asNullable),
+          Some(partitionColumnsSchema(data.schema, partitionColumns)),
+          caseInsensitiveOptions)
+
+        // For partitioned relation r, r.schema's column ordering can be different from the column
+        // ordering of data.logicalPlan (partition columns are all moved after data column).  This
+        // will be adjusted within InsertIntoHadoopFsRelation.
+        sqlContext.executePlan(
+          InsertIntoHadoopFsRelation(
+            r,
+            data.logicalPlan,
+            mode)).toRdd
+        r
+      case _ =>
+        sys.error(s"${clazz.getCanonicalName} does not allow create table as select.")
+    }
+    ResolvedDataSource(clazz, relation)
+  }
+}
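
For context, a minimal sketch (not part of this diff) of what a third-party provider looks like under the DataSourceRegister.shortName mechanism used by lookupDataSource above. The package, class, and alias names are illustrative only, and a real source would also mix in a scan trait such as TableScan; the alias is discovered through ServiceLoader, so the class must be listed in META-INF/services/org.apache.spark.sql.sources.DataSourceRegister.

    package com.example.spark.dummy

    import org.apache.spark.sql.SQLContext
    import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, RelationProvider}
    import org.apache.spark.sql.types.{StringType, StructField, StructType}

    class DefaultSource extends RelationProvider with DataSourceRegister {

      // Alias that lookupDataSource matches before falling back to loading
      // the fully qualified class name (or "<name>.DefaultSource").
      override def shortName(): String = "dummy"

      override def createRelation(ctx: SQLContext, parameters: Map[String, String]): BaseRelation =
        new BaseRelation {
          override val sqlContext: SQLContext = ctx
          override val schema: StructType = StructType(StructField("value", StringType) :: Nil)
        }
    }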

http://git-wip-us.apache.org/repos/asf/spark/blob/40ed2af5/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
index 8c2f297..ecd304c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
@@ -17,340 +17,12 @@
 
 package org.apache.spark.sql.execution.datasources
 
-import java.util.ServiceLoader
-
-import scala.collection.Iterator
-import scala.collection.JavaConversions._
-import scala.language.{existentials, implicitConversions}
-import scala.util.{Failure, Success, Try}
-import scala.util.matching.Regex
-
-import org.apache.hadoop.fs.Path
-
-import org.apache.spark.Logging
-import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.{AbstractSparkSQLParser, TableIdentifier}
 import org.apache.spark.sql.execution.RunnableCommand
-import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
-import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SQLContext, SaveMode}
-import org.apache.spark.util.Utils
-
-/**
- * A parser for foreign DDL commands.
- */
-private[sql] class DDLParser(
-    parseQuery: String => LogicalPlan)
-  extends AbstractSparkSQLParser with DataTypeParser with Logging {
-
-  def parse(input: String, exceptionOnError: Boolean): LogicalPlan = {
-    try {
-      parse(input)
-    } catch {
-      case ddlException: DDLException => throw ddlException
-      case _ if !exceptionOnError => parseQuery(input)
-      case x: Throwable => throw x
-    }
-  }
-
-  // Keyword is a convention with AbstractSparkSQLParser, which will scan all of the `Keyword`
-  // properties via reflection the class in runtime for constructing the SqlLexical object
-  protected val CREATE = Keyword("CREATE")
-  protected val TEMPORARY = Keyword("TEMPORARY")
-  protected val TABLE = Keyword("TABLE")
-  protected val IF = Keyword("IF")
-  protected val NOT = Keyword("NOT")
-  protected val EXISTS = Keyword("EXISTS")
-  protected val USING = Keyword("USING")
-  protected val OPTIONS = Keyword("OPTIONS")
-  protected val DESCRIBE = Keyword("DESCRIBE")
-  protected val EXTENDED = Keyword("EXTENDED")
-  protected val AS = Keyword("AS")
-  protected val COMMENT = Keyword("COMMENT")
-  protected val REFRESH = Keyword("REFRESH")
-
-  protected lazy val ddl: Parser[LogicalPlan] = createTable | describeTable | refreshTable
-
-  protected def start: Parser[LogicalPlan] = ddl
-
-  /**
-   * `CREATE [TEMPORARY] TABLE avroTable [IF NOT EXISTS]
-   * USING org.apache.spark.sql.avro
-   * OPTIONS (path "../hive/src/test/resources/data/files/episodes.avro")`
-   * or
-   * `CREATE [TEMPORARY] TABLE avroTable(intField int, stringField string...) [IF NOT EXISTS]
-   * USING org.apache.spark.sql.avro
-   * OPTIONS (path "../hive/src/test/resources/data/files/episodes.avro")`
-   * or
-   * `CREATE [TEMPORARY] TABLE avroTable [IF NOT EXISTS]
-   * USING org.apache.spark.sql.avro
-   * OPTIONS (path "../hive/src/test/resources/data/files/episodes.avro")`
-   * AS SELECT ...
-   */
-  protected lazy val createTable: Parser[LogicalPlan] =
-    // TODO: Support database.table.
-    (CREATE ~> TEMPORARY.? <~ TABLE) ~ (IF ~> NOT <~ EXISTS).? ~ ident ~
-      tableCols.? ~ (USING ~> className) ~ (OPTIONS ~> options).? ~ (AS ~> restInput).? ^^ {
-      case temp ~ allowExisting ~ tableName ~ columns ~ provider ~ opts ~ query =>
-        if (temp.isDefined && allowExisting.isDefined) {
-          throw new DDLException(
-            "a CREATE TEMPORARY TABLE statement does not allow IF NOT EXISTS clause.")
-        }
-
-        val options = opts.getOrElse(Map.empty[String, String])
-        if (query.isDefined) {
-          if (columns.isDefined) {
-            throw new DDLException(
-              "a CREATE TABLE AS SELECT statement does not allow column definitions.")
-          }
-          // When IF NOT EXISTS clause appears in the query, the save mode will be ignore.
-          val mode = if (allowExisting.isDefined) {
-            SaveMode.Ignore
-          } else if (temp.isDefined) {
-            SaveMode.Overwrite
-          } else {
-            SaveMode.ErrorIfExists
-          }
-
-          val queryPlan = parseQuery(query.get)
-          CreateTableUsingAsSelect(tableName,
-            provider,
-            temp.isDefined,
-            Array.empty[String],
-            mode,
-            options,
-            queryPlan)
-        } else {
-          val userSpecifiedSchema = columns.flatMap(fields => Some(StructType(fields)))
-          CreateTableUsing(
-            tableName,
-            userSpecifiedSchema,
-            provider,
-            temp.isDefined,
-            options,
-            allowExisting.isDefined,
-            managedIfNoPath = false)
-        }
-    }
-
-  protected lazy val tableCols: Parser[Seq[StructField]] = "(" ~> repsep(column, ",") <~ ")"
-
-  /*
-   * describe [extended] table avroTable
-   * This will display all columns of table `avroTable` includes column_name,column_type,comment
-   */
-  protected lazy val describeTable: Parser[LogicalPlan] =
-    (DESCRIBE ~> opt(EXTENDED)) ~ (ident <~ ".").? ~ ident  ^^ {
-      case e ~ db ~ tbl =>
-        val tblIdentifier = db match {
-          case Some(dbName) =>
-            Seq(dbName, tbl)
-          case None =>
-            Seq(tbl)
-        }
-        DescribeCommand(UnresolvedRelation(tblIdentifier, None), e.isDefined)
-   }
-
-  protected lazy val refreshTable: Parser[LogicalPlan] =
-    REFRESH ~> TABLE ~> (ident <~ ".").? ~ ident ^^ {
-      case maybeDatabaseName ~ tableName =>
-        RefreshTable(TableIdentifier(tableName, maybeDatabaseName))
-    }
-
-  protected lazy val options: Parser[Map[String, String]] =
-    "(" ~> repsep(pair, ",") <~ ")" ^^ { case s: Seq[(String, String)] => s.toMap }
-
-  protected lazy val className: Parser[String] = repsep(ident, ".") ^^ { case s => s.mkString(".")}
-
-  override implicit def regexToParser(regex: Regex): Parser[String] = acceptMatch(
-    s"identifier matching regex $regex", {
-      case lexical.Identifier(str) if regex.unapplySeq(str).isDefined => str
-      case lexical.Keyword(str) if regex.unapplySeq(str).isDefined => str
-    }
-  )
-
-  protected lazy val optionPart: Parser[String] = "[_a-zA-Z][_a-zA-Z0-9]*".r ^^ {
-    case name => name
-  }
-
-  protected lazy val optionName: Parser[String] = repsep(optionPart, ".") ^^ {
-    case parts => parts.mkString(".")
-  }
-
-  protected lazy val pair: Parser[(String, String)] =
-    optionName ~ stringLit ^^ { case k ~ v => (k, v) }
-
-  protected lazy val column: Parser[StructField] =
-    ident ~ dataType ~ (COMMENT ~> stringLit).?  ^^ { case columnName ~ typ ~ cm =>
-      val meta = cm match {
-        case Some(comment) =>
-          new MetadataBuilder().putString(COMMENT.str.toLowerCase, comment).build()
-        case None => Metadata.empty
-      }
-
-      StructField(columnName, typ, nullable = true, meta)
-    }
-}
-
-private[sql] object ResolvedDataSource extends Logging {
-
-  /** Given a provider name, look up the data source class definition. */
-  def lookupDataSource(provider: String): Class[_] = {
-    val provider2 = s"$provider.DefaultSource"
-    val loader = Utils.getContextOrSparkClassLoader
-    val serviceLoader = ServiceLoader.load(classOf[DataSourceRegister], loader)
-
-    serviceLoader.iterator().filter(_.format().equalsIgnoreCase(provider)).toList match {
-      /** the provider format did not match any given registered aliases */
-      case Nil => Try(loader.loadClass(provider)).orElse(Try(loader.loadClass(provider2))) match {
-        case Success(dataSource) => dataSource
-        case Failure(error) => if (provider.startsWith("org.apache.spark.sql.hive.orc")) {
-          throw new ClassNotFoundException(
-            "The ORC data source must be used with Hive support enabled.", error)
-        } else {
-          throw new ClassNotFoundException(
-            s"Failed to load class for data source: $provider", error)
-        }
-      }
-      /** there is exactly one registered alias */
-      case head :: Nil => head.getClass
-      /** There are multiple registered aliases for the input */
-      case sources => sys.error(s"Multiple sources found for $provider, " +
-        s"(${sources.map(_.getClass.getName).mkString(", ")}), " +
-        "please specify the fully qualified class name")
-    }
-  }
-
-  /** Create a [[ResolvedDataSource]] for reading data in. */
-  def apply(
-      sqlContext: SQLContext,
-      userSpecifiedSchema: Option[StructType],
-      partitionColumns: Array[String],
-      provider: String,
-      options: Map[String, String]): ResolvedDataSource = {
-    val clazz: Class[_] = lookupDataSource(provider)
-    def className: String = clazz.getCanonicalName
-    val relation = userSpecifiedSchema match {
-      case Some(schema: StructType) => clazz.newInstance() match {
-        case dataSource: SchemaRelationProvider =>
-          dataSource.createRelation(sqlContext, new CaseInsensitiveMap(options), schema)
-        case dataSource: HadoopFsRelationProvider =>
-          val maybePartitionsSchema = if (partitionColumns.isEmpty) {
-            None
-          } else {
-            Some(partitionColumnsSchema(schema, partitionColumns))
-          }
-
-          val caseInsensitiveOptions = new CaseInsensitiveMap(options)
-          val paths = {
-            val patternPath = new Path(caseInsensitiveOptions("path"))
-            val fs = patternPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
-            val qualifiedPattern = patternPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
-            SparkHadoopUtil.get.globPathIfNecessary(qualifiedPattern).map(_.toString).toArray
-          }
-
-          val dataSchema =
-            StructType(schema.filterNot(f => partitionColumns.contains(f.name))).asNullable
-
-          dataSource.createRelation(
-            sqlContext,
-            paths,
-            Some(dataSchema),
-            maybePartitionsSchema,
-            caseInsensitiveOptions)
-        case dataSource: org.apache.spark.sql.sources.RelationProvider =>
-          throw new AnalysisException(s"$className does not allow user-specified schemas.")
-        case _ =>
-          throw new AnalysisException(s"$className is not a RelationProvider.")
-      }
-
-      case None => clazz.newInstance() match {
-        case dataSource: RelationProvider =>
-          dataSource.createRelation(sqlContext, new CaseInsensitiveMap(options))
-        case dataSource: HadoopFsRelationProvider =>
-          val caseInsensitiveOptions = new CaseInsensitiveMap(options)
-          val paths = {
-            val patternPath = new Path(caseInsensitiveOptions("path"))
-            val fs = patternPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
-            val qualifiedPattern = patternPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
-            SparkHadoopUtil.get.globPathIfNecessary(qualifiedPattern).map(_.toString).toArray
-          }
-          dataSource.createRelation(sqlContext, paths, None, None, caseInsensitiveOptions)
-        case dataSource: org.apache.spark.sql.sources.SchemaRelationProvider =>
-          throw new AnalysisException(
-            s"A schema needs to be specified when using $className.")
-        case _ =>
-          throw new AnalysisException(
-            s"$className is neither a RelationProvider nor a FSBasedRelationProvider.")
-      }
-    }
-    new ResolvedDataSource(clazz, relation)
-  }
-
-  private def partitionColumnsSchema(
-      schema: StructType,
-      partitionColumns: Array[String]): StructType = {
-    StructType(partitionColumns.map { col =>
-      schema.find(_.name == col).getOrElse {
-        throw new RuntimeException(s"Partition column $col not found in schema $schema")
-      }
-    }).asNullable
-  }
-
-  /** Create a [[ResolvedDataSource]] for saving the content of the given [[DataFrame]]. */
-  def apply(
-      sqlContext: SQLContext,
-      provider: String,
-      partitionColumns: Array[String],
-      mode: SaveMode,
-      options: Map[String, String],
-      data: DataFrame): ResolvedDataSource = {
-    if (data.schema.map(_.dataType).exists(_.isInstanceOf[CalendarIntervalType])) {
-      throw new AnalysisException("Cannot save interval data type into external storage.")
-    }
-    val clazz: Class[_] = lookupDataSource(provider)
-    val relation = clazz.newInstance() match {
-      case dataSource: CreatableRelationProvider =>
-        dataSource.createRelation(sqlContext, mode, options, data)
-      case dataSource: HadoopFsRelationProvider =>
-        // Don't glob path for the write path.  The contracts here are:
-        //  1. Only one output path can be specified on the write path;
-        //  2. Output path must be a legal HDFS style file system path;
-        //  3. It's OK that the output path doesn't exist yet;
-        val caseInsensitiveOptions = new CaseInsensitiveMap(options)
-        val outputPath = {
-          val path = new Path(caseInsensitiveOptions("path"))
-          val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
-          path.makeQualified(fs.getUri, fs.getWorkingDirectory)
-        }
-        val dataSchema = StructType(data.schema.filterNot(f => partitionColumns.contains(f.name)))
-        val r = dataSource.createRelation(
-          sqlContext,
-          Array(outputPath.toString),
-          Some(dataSchema.asNullable),
-          Some(partitionColumnsSchema(data.schema, partitionColumns)),
-          caseInsensitiveOptions)
-
-        // For partitioned relation r, r.schema's column ordering can be different from the column
-        // ordering of data.logicalPlan (partition columns are all moved after data column).  This
-        // will be adjusted within InsertIntoHadoopFsRelation.
-        sqlContext.executePlan(
-          InsertIntoHadoopFsRelation(
-            r,
-            data.logicalPlan,
-            mode)).toRdd
-        r
-      case _ =>
-        sys.error(s"${clazz.getCanonicalName} does not allow create table as select.")
-    }
-    new ResolvedDataSource(clazz, relation)
-  }
-}
-
-private[sql] case class ResolvedDataSource(provider: Class[_], relation: BaseRelation)
+import org.apache.spark.sql.{DataFrame, Row, SQLContext, SaveMode}
 
 /**
  * Returned for the "DESCRIBE [EXTENDED] [dbName.]tableName" command.
@@ -358,11 +30,12 @@ private[sql] case class ResolvedDataSource(provider: Class[_], relation: BaseRel
  * @param isExtended True if "DESCRIBE EXTENDED" is used. Otherwise, false.
  *                   It is effective only when the table is a Hive table.
  */
-private[sql] case class DescribeCommand(
+case class DescribeCommand(
     table: LogicalPlan,
     isExtended: Boolean) extends LogicalPlan with Command {
 
   override def children: Seq[LogicalPlan] = Seq.empty
+
   override val output: Seq[Attribute] = Seq(
     // Column names are based on Hive.
     AttributeReference("col_name", StringType, nullable = false,
@@ -370,7 +43,8 @@ private[sql] case class DescribeCommand(
     AttributeReference("data_type", StringType, nullable = false,
       new MetadataBuilder().putString("comment", "data type of the column").build())(),
     AttributeReference("comment", StringType, nullable = false,
-      new MetadataBuilder().putString("comment", "comment of the column").build())())
+      new MetadataBuilder().putString("comment", "comment of the column").build())()
+  )
 }
 
 /**
@@ -378,7 +52,7 @@ private[sql] case class DescribeCommand(
  * @param allowExisting If it is true, we will do nothing when the table already exists.
   *                      If it is false, an exception will be thrown
   */
-private[sql] case class CreateTableUsing(
+case class CreateTableUsing(
     tableName: String,
     userSpecifiedSchema: Option[StructType],
     provider: String,
@@ -397,7 +71,7 @@ private[sql] case class CreateTableUsing(
  * can analyze the logical plan that will be used to populate the table.
  * So, [[PreWriteCheck]] can detect cases that are not allowed.
  */
-private[sql] case class CreateTableUsingAsSelect(
+case class CreateTableUsingAsSelect(
     tableName: String,
     provider: String,
     temporary: Boolean,
@@ -410,7 +84,7 @@ private[sql] case class CreateTableUsingAsSelect(
   // override lazy val resolved = databaseName != None && childrenResolved
 }
 
-private[sql] case class CreateTempTableUsing(
+case class CreateTempTableUsing(
     tableName: String,
     userSpecifiedSchema: Option[StructType],
     provider: String,
@@ -425,7 +99,7 @@ private[sql] case class CreateTempTableUsing(
   }
 }
 
-private[sql] case class CreateTempTableUsingAsSelect(
+case class CreateTempTableUsingAsSelect(
     tableName: String,
     provider: String,
     partitionColumns: Array[String],
@@ -443,7 +117,7 @@ private[sql] case class CreateTempTableUsingAsSelect(
   }
 }
 
-private[sql] case class RefreshTable(tableIdent: TableIdentifier)
+case class RefreshTable(tableIdent: TableIdentifier)
   extends RunnableCommand {
 
   override def run(sqlContext: SQLContext): Seq[Row] = {
@@ -472,7 +146,7 @@ private[sql] case class RefreshTable(tableIdent: TableIdentifier)
 /**
  * Builds a map in which keys are case insensitive
  */
-protected[sql] class CaseInsensitiveMap(map: Map[String, String]) extends Map[String, String]
+class CaseInsensitiveMap(map: Map[String, String]) extends Map[String, String]
   with Serializable {
 
   val baseMap = map.map(kv => kv.copy(_1 = kv._1.toLowerCase))
@@ -490,4 +164,4 @@ protected[sql] class CaseInsensitiveMap(map: Map[String, String]) extends Map[St
 /**
  * The exception thrown from the DDL parser.
  */
-protected[sql] class DDLException(message: String) extends Exception(message)
+class DDLException(message: String) extends RuntimeException(message)
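
With the protected[sql] modifier dropped above, CaseInsensitiveMap is reachable from the rest of execution; a small illustrative sketch of its contract, assuming the usual lowercased-key lookup implied by baseMap (values are returned unchanged):

    import org.apache.spark.sql.execution.datasources.CaseInsensitiveMap

    val opts = new CaseInsensitiveMap(Map("Path" -> "/tmp/data", "Header" -> "true"))
    opts("path")        // "/tmp/data": keys are lowercased internally
    opts.get("HEADER")  // Some("true"): lookups ignore the caller's casing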

http://git-wip-us.apache.org/repos/asf/spark/blob/40ed2af5/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/DefaultSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/DefaultSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/DefaultSource.scala
new file mode 100644
index 0000000..6773afc
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/DefaultSource.scala
@@ -0,0 +1,62 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.spark.sql.execution.datasources.jdbc
+
+import java.util.Properties
+
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.sources.{BaseRelation, RelationProvider, DataSourceRegister}
+
+class DefaultSource extends RelationProvider with DataSourceRegister {
+
+  override def shortName(): String = "jdbc"
+
+  /** Returns a new base relation with the given parameters. */
+  override def createRelation(
+      sqlContext: SQLContext,
+      parameters: Map[String, String]): BaseRelation = {
+    val url = parameters.getOrElse("url", sys.error("Option 'url' not specified"))
+    val driver = parameters.getOrElse("driver", null)
+    val table = parameters.getOrElse("dbtable", sys.error("Option 'dbtable' not specified"))
+    val partitionColumn = parameters.getOrElse("partitionColumn", null)
+    val lowerBound = parameters.getOrElse("lowerBound", null)
+    val upperBound = parameters.getOrElse("upperBound", null)
+    val numPartitions = parameters.getOrElse("numPartitions", null)
+
+    if (driver != null) DriverRegistry.register(driver)
+
+    if (partitionColumn != null
+      && (lowerBound == null || upperBound == null || numPartitions == null)) {
+      sys.error("Partitioning incompletely specified")
+    }
+
+    val partitionInfo = if (partitionColumn == null) {
+      null
+    } else {
+      JDBCPartitioningInfo(
+        partitionColumn,
+        lowerBound.toLong,
+        upperBound.toLong,
+        numPartitions.toInt)
+    }
+    val parts = JDBCRelation.columnPartition(partitionInfo)
+    val properties = new Properties() // Additional properties that we will pass to getConnection
+    parameters.foreach(kv => properties.setProperty(kv._1, kv._2))
+    JDBCRelation(url, table, parts, properties)(sqlContext)
+  }
+}
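
Because the "jdbc" alias is registered via shortName above, the public entry point stays the same even though the class now lives under execution.datasources; a minimal read sketch (the URL, table, and driver values are placeholders, not part of this commit):

    val df = sqlContext.read
      .format("jdbc")                                        // resolved through DataSourceRegister
      .option("url", "jdbc:postgresql://localhost/testdb")   // placeholder connection URL
      .option("dbtable", "people")                           // placeholder table name
      .option("driver", "org.postgresql.Driver")             // optional; registered via DriverRegistry
      .load()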

http://git-wip-us.apache.org/repos/asf/spark/blob/40ed2af5/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/DriverRegistry.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/DriverRegistry.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/DriverRegistry.scala
new file mode 100644
index 0000000..7ccd61e
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/DriverRegistry.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.jdbc
+
+import java.sql.{Driver, DriverManager}
+
+import scala.collection.mutable
+
+import org.apache.spark.Logging
+import org.apache.spark.util.Utils
+
+/**
+ * java.sql.DriverManager is always loaded by bootstrap classloader,
+ * so it can't load JDBC drivers accessible by Spark ClassLoader.
+ *
+ * To solve the problem, drivers from user-supplied jars are wrapped into thin wrapper.
+ */
+object DriverRegistry extends Logging {
+
+  private val wrapperMap: mutable.Map[String, DriverWrapper] = mutable.Map.empty
+
+  def register(className: String): Unit = {
+    val cls = Utils.getContextOrSparkClassLoader.loadClass(className)
+    if (cls.getClassLoader == null) {
+      logTrace(s"$className has been loaded with bootstrap ClassLoader, wrapper is not required")
+    } else if (wrapperMap.get(className).isDefined) {
+      logTrace(s"Wrapper for $className already exists")
+    } else {
+      synchronized {
+        if (wrapperMap.get(className).isEmpty) {
+          val wrapper = new DriverWrapper(cls.newInstance().asInstanceOf[Driver])
+          DriverManager.registerDriver(wrapper)
+          wrapperMap(className) = wrapper
+          logTrace(s"Wrapper for $className registered")
+        }
+      }
+    }
+  }
+
+  def getDriverClassName(url: String): String = DriverManager.getDriver(url) match {
+    case wrapper: DriverWrapper => wrapper.wrapped.getClass.getCanonicalName
+    case driver => driver.getClass.getCanonicalName
+  }
+}
+
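
A short usage sketch, assuming a JDBC driver shipped in a user-added jar (the class name and URL below are placeholders):

    // Wrap and register the driver so java.sql.DriverManager can see it (SPARK-6913).
    DriverRegistry.register("com.example.jdbc.FancyDriver")

    // Recover the real driver class name hiding behind the wrapper for a given URL.
    val driverClass = DriverRegistry.getDriverClassName("jdbc:fancy://host:5432/db")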

http://git-wip-us.apache.org/repos/asf/spark/blob/40ed2af5/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/DriverWrapper.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/DriverWrapper.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/DriverWrapper.scala
new file mode 100644
index 0000000..18263fe
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/DriverWrapper.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.jdbc
+
+import java.sql.{Connection, Driver, DriverPropertyInfo, SQLFeatureNotSupportedException}
+import java.util.Properties
+
+/**
+ * A wrapper for a JDBC Driver to work around SPARK-6913.
+ *
+ * The problem is in `java.sql.DriverManager` class that can't access drivers loaded by
+ * Spark ClassLoader.
+ */
+class DriverWrapper(val wrapped: Driver) extends Driver {
+  override def acceptsURL(url: String): Boolean = wrapped.acceptsURL(url)
+
+  override def jdbcCompliant(): Boolean = wrapped.jdbcCompliant()
+
+  override def getPropertyInfo(url: String, info: Properties): Array[DriverPropertyInfo] = {
+    wrapped.getPropertyInfo(url, info)
+  }
+
+  override def getMinorVersion: Int = wrapped.getMinorVersion
+
+  def getParentLogger: java.util.logging.Logger = {
+    throw new SQLFeatureNotSupportedException(
+      s"${this.getClass.getName}.getParentLogger is not yet implemented.")
+  }
+
+  override def connect(url: String, info: Properties): Connection = wrapped.connect(url, info)
+
+  override def getMajorVersion: Int = wrapped.getMajorVersion
+}
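
For illustration only (the driver instance is hypothetical), the wrapper is registered with DriverManager and simply forwards every call to the wrapped driver loaded from the user's classloader:

    import java.sql.DriverManager

    val wrapper = new DriverWrapper(new com.example.jdbc.FancyDriver())  // placeholder driver
    DriverManager.registerDriver(wrapper)
    // DriverManager now matches URLs via wrapper.acceptsURL and opens connections
    // through wrapper.connect, both of which delegate to the wrapped driver.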

