[GitHub] spark pull request #16517: [SPARK-18243][SQL] Port Hive writing to use FileF...

2017-01-17 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/16517





[GitHub] spark pull request #16517: [SPARK-18243][SQL] Port Hive writing to use FileF...

2017-01-17 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/16517#discussion_r96566857
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
 ---
@@ -276,40 +276,31 @@ case class InsertIntoHiveTable(
   }
 }
 
-val jobConf = new JobConf(hadoopConf)
-val jobConfSer = new SerializableJobConf(jobConf)
-
-// When speculation is on and output committer class name contains "Direct", we should warn
-// users that they may loss data if they are using a direct output committer.
-val speculationEnabled = sqlContext.sparkContext.conf.getBoolean("spark.speculation", false)
-val outputCommitterClass = jobConf.get("mapred.output.committer.class", "")
-if (speculationEnabled && outputCommitterClass.contains("Direct")) {
--- End diff --

This change seems unnecessary, and users may still use a direct output committer (they can still find the code on the Internet). Let's keep the warning.
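
For reference, a minimal sketch of what keeping the warning could look like, mirroring the removed block quoted above. The exact log message is illustrative, and `sqlContext`, `jobConf` and `logWarning` are assumed to be the surrounding `InsertIntoHiveTable` members:

    // Hedged sketch: re-instate the warning dropped in the diff above.
    val speculationEnabled = sqlContext.sparkContext.conf.getBoolean("spark.speculation", false)
    val outputCommitterClass = jobConf.get("mapred.output.committer.class", "")
    if (speculationEnabled && outputCommitterClass.contains("Direct")) {
      logWarning(s"$outputCommitterClass may write data directly to the final location. " +
        "With speculation enabled, a direct output committer can cause data loss or " +
        "duplicate results, so consider a committer that writes to a staging location first.")
    }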





[GitHub] spark pull request #16517: [SPARK-18243][SQL] Port Hive writing to use FileF...

2017-01-17 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16517#discussion_r9652
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
 ---
@@ -276,40 +276,31 @@ case class InsertIntoHiveTable(
   }
 }
 
-val jobConf = new JobConf(hadoopConf)
-val jobConfSer = new SerializableJobConf(jobConf)
-
-// When speculation is on and output committer class name contains "Direct", we should warn
-// users that they may loss data if they are using a direct output committer.
-val speculationEnabled = sqlContext.sparkContext.conf.getBoolean("spark.speculation", false)
-val outputCommitterClass = jobConf.get("mapred.output.committer.class", "")
-if (speculationEnabled && outputCommitterClass.contains("Direct")) {
--- End diff --

The direct output committer has already been removed.





[GitHub] spark pull request #16517: [SPARK-18243][SQL] Port Hive writing to use FileF...

2017-01-17 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16517#discussion_r96566616
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
 ---
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.hive.ql.exec.Utilities
+import org.apache.hadoop.hive.ql.io.{HiveFileFormatUtils, HiveOutputFormat}
+import org.apache.hadoop.hive.serde2.Serializer
+import 
org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorUtils, 
StructObjectInspector}
+import 
org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
+import org.apache.hadoop.io.Writable
+import org.apache.hadoop.mapred.{JobConf, Reporter}
+import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.{FileFormat, 
OutputWriter, OutputWriterFactory}
+import org.apache.spark.sql.hive.{HiveInspectors, HiveTableUtil}
+import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => 
FileSinkDesc}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.SerializableJobConf
+
+/**
+ * `FileFormat` for writing Hive tables.
+ *
+ * TODO: implement the read logic.
+ */
+class HiveFileFormat(fileSinkConf: FileSinkDesc) extends FileFormat {
+  override def inferSchema(
+  sparkSession: SparkSession,
+  options: Map[String, String],
+  files: Seq[FileStatus]): Option[StructType] = {
+throw new UnsupportedOperationException(s"inferSchema is not supported for hive data source.")
+  }
+
+  override def prepareWrite(
+  sparkSession: SparkSession,
+  job: Job,
+  options: Map[String, String],
+  dataSchema: StructType): OutputWriterFactory = {
+val conf = job.getConfiguration
+val tableDesc = fileSinkConf.getTableInfo
+conf.set("mapred.output.format.class", 
tableDesc.getOutputFileFormatClassName)
+
+// Add table properties from storage handler to hadoopConf, so any 
custom storage
+// handler settings can be set to hadoopConf
+HiveTableUtil.configureJobPropertiesForStorageHandler(tableDesc, conf, 
false)
+Utilities.copyTableJobPropertiesToConf(tableDesc, conf)
--- End diff --

The `tableDesc` is created at
https://github.com/apache/spark/pull/16517/files#diff-d579db9a8f27e0bbef37720ab14ec3f6R223
so it will never be null, and the previous null check is unnecessary.
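
If a defensive check were still wanted, an assertion documenting the invariant would be enough. A hedged sketch (not part of the PR), where `fileSinkConf` is the constructor parameter of `HiveFileFormat` shown above:

    // Hypothetical assertion instead of the removed null-handling branch.
    val tableDesc = fileSinkConf.getTableInfo
    assert(tableDesc != null,
      "FileSinkDesc.getTableInfo should never be null here: InsertIntoHiveTable always " +
        "sets the table descriptor when it builds the FileSinkDesc.")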





[GitHub] spark pull request #16517: [SPARK-18243][SQL] Port Hive writing to use FileF...

2017-01-17 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/16517#discussion_r96566523
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
 ---
@@ -276,40 +276,31 @@ case class InsertIntoHiveTable(
   }
 }
 
-val jobConf = new JobConf(hadoopConf)
-val jobConfSer = new SerializableJobConf(jobConf)
-
-// When speculation is on and output committer class name contains "Direct", we should warn
-// users that they may loss data if they are using a direct output committer.
-val speculationEnabled = sqlContext.sparkContext.conf.getBoolean("spark.speculation", false)
-val outputCommitterClass = jobConf.get("mapred.output.committer.class", "")
-if (speculationEnabled && outputCommitterClass.contains("Direct")) {
--- End diff --

Do we still need this?





[GitHub] spark pull request #16517: [SPARK-18243][SQL] Port Hive writing to use FileF...

2017-01-17 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/16517#discussion_r96566290
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
 ---
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.hive.ql.exec.Utilities
+import org.apache.hadoop.hive.ql.io.{HiveFileFormatUtils, HiveOutputFormat}
+import org.apache.hadoop.hive.serde2.Serializer
+import 
org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorUtils, 
StructObjectInspector}
+import 
org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
+import org.apache.hadoop.io.Writable
+import org.apache.hadoop.mapred.{JobConf, Reporter}
+import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.{FileFormat, 
OutputWriter, OutputWriterFactory}
+import org.apache.spark.sql.hive.{HiveInspectors, HiveTableUtil}
+import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => 
FileSinkDesc}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.SerializableJobConf
+
+/**
+ * `FileFormat` for writing Hive tables.
+ *
+ * TODO: implement the read logic.
+ */
+class HiveFileFormat(fileSinkConf: FileSinkDesc) extends FileFormat {
+  override def inferSchema(
+  sparkSession: SparkSession,
+  options: Map[String, String],
+  files: Seq[FileStatus]): Option[StructType] = {
+throw new UnsupportedOperationException(s"inferSchema is not supported for hive data source.")
+  }
+
+  override def prepareWrite(
+  sparkSession: SparkSession,
+  job: Job,
+  options: Map[String, String],
+  dataSchema: StructType): OutputWriterFactory = {
+val conf = job.getConfiguration
+val tableDesc = fileSinkConf.getTableInfo
+conf.set("mapred.output.format.class", 
tableDesc.getOutputFileFormatClassName)
+
+// Add table properties from storage handler to hadoopConf, so any 
custom storage
+// handler settings can be set to hadoopConf
+HiveTableUtil.configureJobPropertiesForStorageHandler(tableDesc, conf, 
false)
+Utilities.copyTableJobPropertiesToConf(tableDesc, conf)
--- End diff --

Will tableDesc be null?





[GitHub] spark pull request #16517: [SPARK-18243][SQL] Port Hive writing to use FileF...

2017-01-17 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/16517#discussion_r96566171
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala ---
@@ -86,6 +86,47 @@ class DetermineHiveSerde(conf: SQLConf) extends 
Rule[LogicalPlan] {
   }
 }
 
+class HiveAnalysis(session: SparkSession) extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan = plan 
resolveOperators {
+case InsertIntoTable(table: MetastoreRelation, partSpec, query, 
overwrite, ifNotExists)
+if hasBeenPreprocessed(table.output, 
table.partitionKeys.toStructType, partSpec, query) =>
+  InsertIntoHiveTable(table, partSpec, query, overwrite, ifNotExists)
+
+case CreateTable(tableDesc, mode, Some(query)) if 
DDLUtils.isHiveTable(tableDesc) =>
+  // Currently `DataFrameWriter.saveAsTable` doesn't support the 
Append mode of hive serde
+  // tables yet.
+  if (mode == SaveMode.Append) {
+throw new AnalysisException(
+  "CTAS for hive serde tables does not support append semantics.")
+  }
+
+  val dbName = 
tableDesc.identifier.database.getOrElse(session.catalog.currentDatabase)
+  CreateHiveTableAsSelectCommand(
+tableDesc.copy(identifier = tableDesc.identifier.copy(database = 
Some(dbName))),
+query,
+mode == SaveMode.Ignore)
+  }
+
+  /**
+   * Returns true if the [[InsertIntoTable]] plan has already been 
preprocessed by analyzer rule
+   * [[PreprocessTableInsertion]]. It is important that this 
rule([[HiveAnalysis]]) has to
+   * be run after [[PreprocessTableInsertion]], to normalize the column 
names in partition spec and
--- End diff --

I think this version is good for now.





[GitHub] spark pull request #16517: [SPARK-18243][SQL] Port Hive writing to use FileF...

2017-01-17 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/16517#discussion_r96549456
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.hive.ql.exec.Utilities
+import org.apache.hadoop.hive.ql.io.{HiveFileFormatUtils, HiveOutputFormat}
+import org.apache.hadoop.hive.serde2.Serializer
+import 
org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorUtils, 
StructObjectInspector}
+import 
org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
+import org.apache.hadoop.io.Writable
+import org.apache.hadoop.mapred.{JobConf, Reporter}
+import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.{FileFormat, 
OutputWriter, OutputWriterFactory}
+import org.apache.spark.sql.hive.{HiveInspectors, HiveTableUtil}
+import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => 
FileSinkDesc}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.SerializableJobConf
+
+/**
+ * `FileFormat` for writing Hive tables.
+ *
+ * TODO: implement the read logic.
+ */
+class HiveFileFormat(fileSinkConf: FileSinkDesc) extends FileFormat {
+  override def inferSchema(
+  sparkSession: SparkSession,
+  options: Map[String, String],
+  files: Seq[FileStatus]): Option[StructType] = None
--- End diff --

OK, let's throw an exception here.
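
The resulting change (visible in the updated diff quoted earlier in this thread) makes `inferSchema` fail loudly instead of silently returning `None`:

    override def inferSchema(
        sparkSession: SparkSession,
        options: Map[String, String],
        files: Seq[FileStatus]): Option[StructType] = {
      throw new UnsupportedOperationException(s"inferSchema is not supported for hive data source.")
    }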





[GitHub] spark pull request #16517: [SPARK-18243][SQL] Port Hive writing to use FileF...

2017-01-16 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16517#discussion_r96331554
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.hive.ql.exec.Utilities
+import org.apache.hadoop.hive.ql.io.{HiveFileFormatUtils, HiveOutputFormat}
+import org.apache.hadoop.hive.serde2.Serializer
+import 
org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorUtils, 
StructObjectInspector}
+import 
org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
+import org.apache.hadoop.io.Writable
+import org.apache.hadoop.mapred.{JobConf, Reporter}
+import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.{FileFormat, 
OutputWriter, OutputWriterFactory}
+import org.apache.spark.sql.hive.{HiveInspectors, HiveTableUtil}
+import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => 
FileSinkDesc}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.SerializableJobConf
+
+/**
+ * `FileFormat` for writing Hive tables.
+ *
+ * TODO: implement the read logic.
+ */
+class HiveFileFormat(fileSinkConf: FileSinkDesc) extends FileFormat {
+  override def inferSchema(
+  sparkSession: SparkSession,
+  options: Map[String, String],
+  files: Seq[FileStatus]): Option[StructType] = None
+
+  override def prepareWrite(
+  sparkSession: SparkSession,
+  job: Job,
+  options: Map[String, String],
+  dataSchema: StructType): OutputWriterFactory = {
+val conf = job.getConfiguration
+val tableDesc = fileSinkConf.getTableInfo
+conf.set("mapred.output.format.class", 
tableDesc.getOutputFileFormatClassName)
+
+// Add table properties from storage handler to hadoopConf, so any 
custom storage
+// handler settings can be set to hadoopConf
+HiveTableUtil.configureJobPropertiesForStorageHandler(tableDesc, conf, 
false)
+Utilities.copyTableJobPropertiesToConf(tableDesc, conf)
+
+// Avoid referencing the outer object.
+val fileSinkConfSer = fileSinkConf
+new OutputWriterFactory {
+  private val jobConf = new SerializableJobConf(new JobConf(conf))
+  @transient private lazy val outputFormat =
+
jobConf.value.getOutputFormat.asInstanceOf[HiveOutputFormat[AnyRef, Writable]]
+
+  override def getFileExtension(context: TaskAttemptContext): String = 
{
+Utilities.getFileExtension(jobConf.value, 
fileSinkConfSer.getCompressed, outputFormat)
+  }
+
+  override def newInstance(
+  path: String,
+  dataSchema: StructType,
+  context: TaskAttemptContext): OutputWriter = {
+new HiveOutputWriter(path, fileSinkConfSer, jobConf.value, 
dataSchema)
+  }
+}
+  }
+}
+
+class HiveOutputWriter(
+path: String,
+fileSinkConf: FileSinkDesc,
+jobConf: JobConf,
+dataSchema: StructType) extends OutputWriter with HiveInspectors {
+
+  private def tableDesc = fileSinkConf.getTableInfo
+
+  private val serializer = {
+val serializer = 
tableDesc.getDeserializerClass.newInstance().asInstanceOf[Serializer]
+serializer.initialize(null, tableDesc.getProperties)
+serializer
+  }
+
+  private val hiveWriter = HiveFileFormatUtils.getHiveRecordWriter(
+jobConf,
+tableDesc,
+serializer.getSerializedClass,
+fileSinkConf,
+new Path(path),
+Reporter.NULL)
+
+  private val 

[GitHub] spark pull request #16517: [SPARK-18243][SQL] Port Hive writing to use FileF...

2017-01-16 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16517#discussion_r96331505
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.hive.ql.exec.Utilities
+import org.apache.hadoop.hive.ql.io.{HiveFileFormatUtils, HiveOutputFormat}
+import org.apache.hadoop.hive.serde2.Serializer
+import 
org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorUtils, 
StructObjectInspector}
+import 
org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
+import org.apache.hadoop.io.Writable
+import org.apache.hadoop.mapred.{JobConf, Reporter}
+import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.{FileFormat, 
OutputWriter, OutputWriterFactory}
+import org.apache.spark.sql.hive.{HiveInspectors, HiveTableUtil}
+import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => 
FileSinkDesc}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.SerializableJobConf
+
+/**
+ * `FileFormat` for writing Hive tables.
+ *
+ * TODO: implement the read logic.
+ */
+class HiveFileFormat(fileSinkConf: FileSinkDesc) extends FileFormat {
+  override def inferSchema(
+  sparkSession: SparkSession,
+  options: Map[String, String],
+  files: Seq[FileStatus]): Option[StructType] = None
+
+  override def prepareWrite(
+  sparkSession: SparkSession,
+  job: Job,
+  options: Map[String, String],
+  dataSchema: StructType): OutputWriterFactory = {
+val conf = job.getConfiguration
+val tableDesc = fileSinkConf.getTableInfo
+conf.set("mapred.output.format.class", 
tableDesc.getOutputFileFormatClassName)
+
+// Add table properties from storage handler to hadoopConf, so any 
custom storage
+// handler settings can be set to hadoopConf
+HiveTableUtil.configureJobPropertiesForStorageHandler(tableDesc, conf, 
false)
+Utilities.copyTableJobPropertiesToConf(tableDesc, conf)
+
+// Avoid referencing the outer object.
+val fileSinkConfSer = fileSinkConf
+new OutputWriterFactory {
+  private val jobConf = new SerializableJobConf(new JobConf(conf))
+  @transient private lazy val outputFormat =
+
jobConf.value.getOutputFormat.asInstanceOf[HiveOutputFormat[AnyRef, Writable]]
+
+  override def getFileExtension(context: TaskAttemptContext): String = 
{
+Utilities.getFileExtension(jobConf.value, 
fileSinkConfSer.getCompressed, outputFormat)
+  }
+
+  override def newInstance(
+  path: String,
+  dataSchema: StructType,
+  context: TaskAttemptContext): OutputWriter = {
+new HiveOutputWriter(path, fileSinkConfSer, jobConf.value, 
dataSchema)
+  }
+}
+  }
+}
+
+class HiveOutputWriter(
+path: String,
+fileSinkConf: FileSinkDesc,
+jobConf: JobConf,
+dataSchema: StructType) extends OutputWriter with HiveInspectors {
+
+  private def tableDesc = fileSinkConf.getTableInfo
+
+  private val serializer = {
+val serializer = 
tableDesc.getDeserializerClass.newInstance().asInstanceOf[Serializer]
+serializer.initialize(null, tableDesc.getProperties)
+serializer
+  }
+
+  private val hiveWriter = HiveFileFormatUtils.getHiveRecordWriter(
--- End diff --

this is moved from 
https://github.com/apache/spark/pull/16517/files#diff-92b05808926122b334c2fdd2fd1e4221L121



[GitHub] spark pull request #16517: [SPARK-18243][SQL] Port Hive writing to use FileF...

2017-01-16 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16517#discussion_r96331458
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.hive.ql.exec.Utilities
+import org.apache.hadoop.hive.ql.io.{HiveFileFormatUtils, HiveOutputFormat}
+import org.apache.hadoop.hive.serde2.Serializer
+import 
org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorUtils, 
StructObjectInspector}
+import 
org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
+import org.apache.hadoop.io.Writable
+import org.apache.hadoop.mapred.{JobConf, Reporter}
+import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.{FileFormat, 
OutputWriter, OutputWriterFactory}
+import org.apache.spark.sql.hive.{HiveInspectors, HiveTableUtil}
+import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => 
FileSinkDesc}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.SerializableJobConf
+
+/**
+ * `FileFormat` for writing Hive tables.
+ *
+ * TODO: implement the read logic.
+ */
+class HiveFileFormat(fileSinkConf: FileSinkDesc) extends FileFormat {
+  override def inferSchema(
+  sparkSession: SparkSession,
+  options: Map[String, String],
+  files: Seq[FileStatus]): Option[StructType] = None
+
+  override def prepareWrite(
+  sparkSession: SparkSession,
+  job: Job,
+  options: Map[String, String],
+  dataSchema: StructType): OutputWriterFactory = {
+val conf = job.getConfiguration
+val tableDesc = fileSinkConf.getTableInfo
+conf.set("mapred.output.format.class", 
tableDesc.getOutputFileFormatClassName)
+
+// Add table properties from storage handler to hadoopConf, so any 
custom storage
+// handler settings can be set to hadoopConf
+HiveTableUtil.configureJobPropertiesForStorageHandler(tableDesc, conf, 
false)
+Utilities.copyTableJobPropertiesToConf(tableDesc, conf)
+
+// Avoid referencing the outer object.
+val fileSinkConfSer = fileSinkConf
+new OutputWriterFactory {
+  private val jobConf = new SerializableJobConf(new JobConf(conf))
+  @transient private lazy val outputFormat =
+
jobConf.value.getOutputFormat.asInstanceOf[HiveOutputFormat[AnyRef, Writable]]
+
+  override def getFileExtension(context: TaskAttemptContext): String = 
{
+Utilities.getFileExtension(jobConf.value, 
fileSinkConfSer.getCompressed, outputFormat)
+  }
+
+  override def newInstance(
+  path: String,
+  dataSchema: StructType,
+  context: TaskAttemptContext): OutputWriter = {
+new HiveOutputWriter(path, fileSinkConfSer, jobConf.value, 
dataSchema)
+  }
+}
+  }
+}
+
+class HiveOutputWriter(
+path: String,
+fileSinkConf: FileSinkDesc,
+jobConf: JobConf,
+dataSchema: StructType) extends OutputWriter with HiveInspectors {
+
+  private def tableDesc = fileSinkConf.getTableInfo
+
+  private val serializer = {
--- End diff --

this is moved from 
https://github.com/apache/spark/pull/16517/files#diff-92b05808926122b334c2fdd2fd1e4221L160
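
For context, a hedged sketch (reconstructed, not quoted from the PR) of how such a SerDe `Serializer` is typically driven when writing a row. `tableDesc`, `serializer` and `hiveWriter` are the members shown in the quoted code, the Hive object-inspector classes are already imported above, and `writeHiveValues` is a hypothetical helper name:

    // A standard ObjectInspector describes the row layout Hive expects.
    private val standardOI = ObjectInspectorUtils
      .getStandardObjectInspector(
        tableDesc.getDeserializer.getObjectInspector,
        ObjectInspectorCopyOption.JAVA)
      .asInstanceOf[StructObjectInspector]

    // Each row, already wrapped into Hive values, is serialized to a Writable and
    // appended via the Hive record writer.
    private def writeHiveValues(hiveValues: Array[AnyRef]): Unit = {
      val writable: Writable = serializer.serialize(hiveValues, standardOI)
      hiveWriter.write(writable)
    }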



[GitHub] spark pull request #16517: [SPARK-18243][SQL] Port Hive writing to use FileF...

2017-01-16 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16517#discussion_r96331392
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.hive.ql.exec.Utilities
+import org.apache.hadoop.hive.ql.io.{HiveFileFormatUtils, HiveOutputFormat}
+import org.apache.hadoop.hive.serde2.Serializer
+import 
org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorUtils, 
StructObjectInspector}
+import 
org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
+import org.apache.hadoop.io.Writable
+import org.apache.hadoop.mapred.{JobConf, Reporter}
+import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.{FileFormat, 
OutputWriter, OutputWriterFactory}
+import org.apache.spark.sql.hive.{HiveInspectors, HiveTableUtil}
+import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => 
FileSinkDesc}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.SerializableJobConf
+
+/**
+ * `FileFormat` for writing Hive tables.
+ *
+ * TODO: implement the read logic.
+ */
+class HiveFileFormat(fileSinkConf: FileSinkDesc) extends FileFormat {
+  override def inferSchema(
+  sparkSession: SparkSession,
+  options: Map[String, String],
+  files: Seq[FileStatus]): Option[StructType] = None
+
+  override def prepareWrite(
+  sparkSession: SparkSession,
+  job: Job,
+  options: Map[String, String],
+  dataSchema: StructType): OutputWriterFactory = {
+val conf = job.getConfiguration
+val tableDesc = fileSinkConf.getTableInfo
+conf.set("mapred.output.format.class", 
tableDesc.getOutputFileFormatClassName)
+
+// Add table properties from storage handler to hadoopConf, so any 
custom storage
+// handler settings can be set to hadoopConf
+HiveTableUtil.configureJobPropertiesForStorageHandler(tableDesc, conf, 
false)
+Utilities.copyTableJobPropertiesToConf(tableDesc, conf)
+
+// Avoid referencing the outer object.
+val fileSinkConfSer = fileSinkConf
+new OutputWriterFactory {
+  private val jobConf = new SerializableJobConf(new JobConf(conf))
+  @transient private lazy val outputFormat =
+
jobConf.value.getOutputFormat.asInstanceOf[HiveOutputFormat[AnyRef, Writable]]
+
+  override def getFileExtension(context: TaskAttemptContext): String = {
+Utilities.getFileExtension(jobConf.value, fileSinkConfSer.getCompressed, outputFormat)
--- End diff --

this is moved from 
https://github.com/apache/spark/pull/16517/files#diff-92b05808926122b334c2fdd2fd1e4221L102





[GitHub] spark pull request #16517: [SPARK-18243][SQL] Port Hive writing to use FileF...

2017-01-16 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16517#discussion_r96331272
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.hive.ql.exec.Utilities
+import org.apache.hadoop.hive.ql.io.{HiveFileFormatUtils, HiveOutputFormat}
+import org.apache.hadoop.hive.serde2.Serializer
+import 
org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorUtils, 
StructObjectInspector}
+import 
org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
+import org.apache.hadoop.io.Writable
+import org.apache.hadoop.mapred.{JobConf, Reporter}
+import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.{FileFormat, 
OutputWriter, OutputWriterFactory}
+import org.apache.spark.sql.hive.{HiveInspectors, HiveTableUtil}
+import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => 
FileSinkDesc}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.SerializableJobConf
+
+/**
+ * `FileFormat` for writing Hive tables.
+ *
+ * TODO: implement the read logic.
+ */
+class HiveFileFormat(fileSinkConf: FileSinkDesc) extends FileFormat {
+  override def inferSchema(
+  sparkSession: SparkSession,
+  options: Map[String, String],
+  files: Seq[FileStatus]): Option[StructType] = None
+
+  override def prepareWrite(
+  sparkSession: SparkSession,
+  job: Job,
+  options: Map[String, String],
+  dataSchema: StructType): OutputWriterFactory = {
+val conf = job.getConfiguration
+val tableDesc = fileSinkConf.getTableInfo
+conf.set("mapred.output.format.class", 
tableDesc.getOutputFileFormatClassName)
+
+// Add table properties from storage handler to hadoopConf, so any 
custom storage
+// handler settings can be set to hadoopConf
+HiveTableUtil.configureJobPropertiesForStorageHandler(tableDesc, conf, 
false)
+Utilities.copyTableJobPropertiesToConf(tableDesc, conf)
+
+// Avoid referencing the outer object.
+val fileSinkConfSer = fileSinkConf
+new OutputWriterFactory {
+  private val jobConf = new SerializableJobConf(new JobConf(conf))
+  @transient private lazy val outputFormat =
+
jobConf.value.getOutputFormat.asInstanceOf[HiveOutputFormat[AnyRef, Writable]]
+
+  override def getFileExtension(context: TaskAttemptContext): String = 
{
+Utilities.getFileExtension(jobConf.value, 
fileSinkConfSer.getCompressed, outputFormat)
+  }
+
+  override def newInstance(
+  path: String,
+  dataSchema: StructType,
+  context: TaskAttemptContext): OutputWriter = {
+new HiveOutputWriter(path, fileSinkConfSer, jobConf.value, 
dataSchema)
+  }
+}
--- End diff --

I followed other `FileFormat` implementations here.





[GitHub] spark pull request #16517: [SPARK-18243][SQL] Port Hive writing to use FileF...

2017-01-16 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16517#discussion_r96331228
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.hive.ql.exec.Utilities
+import org.apache.hadoop.hive.ql.io.{HiveFileFormatUtils, HiveOutputFormat}
+import org.apache.hadoop.hive.serde2.Serializer
+import 
org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorUtils, 
StructObjectInspector}
+import 
org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
+import org.apache.hadoop.io.Writable
+import org.apache.hadoop.mapred.{JobConf, Reporter}
+import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.{FileFormat, 
OutputWriter, OutputWriterFactory}
+import org.apache.spark.sql.hive.{HiveInspectors, HiveTableUtil}
+import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => 
FileSinkDesc}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.SerializableJobConf
+
+/**
+ * `FileFormat` for writing Hive tables.
+ *
+ * TODO: implement the read logic.
+ */
+class HiveFileFormat(fileSinkConf: FileSinkDesc) extends FileFormat {
+  override def inferSchema(
+  sparkSession: SparkSession,
+  options: Map[String, String],
+  files: Seq[FileStatus]): Option[StructType] = None
+
+  override def prepareWrite(
+  sparkSession: SparkSession,
+  job: Job,
+  options: Map[String, String],
+  dataSchema: StructType): OutputWriterFactory = {
+val conf = job.getConfiguration
+val tableDesc = fileSinkConf.getTableInfo
+conf.set("mapred.output.format.class", 
tableDesc.getOutputFileFormatClassName)
+
+// Add table properties from storage handler to hadoopConf, so any 
custom storage
--- End diff --

this is moved from 
https://github.com/apache/spark/pull/16517/files#diff-92b05808926122b334c2fdd2fd1e4221L64





[GitHub] spark pull request #16517: [SPARK-18243][SQL] Port Hive writing to use FileF...

2017-01-16 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16517#discussion_r96331180
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.hive.ql.exec.Utilities
+import org.apache.hadoop.hive.ql.io.{HiveFileFormatUtils, HiveOutputFormat}
+import org.apache.hadoop.hive.serde2.Serializer
+import 
org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorUtils, 
StructObjectInspector}
+import 
org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
+import org.apache.hadoop.io.Writable
+import org.apache.hadoop.mapred.{JobConf, Reporter}
+import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.{FileFormat, 
OutputWriter, OutputWriterFactory}
+import org.apache.spark.sql.hive.{HiveInspectors, HiveTableUtil}
+import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => 
FileSinkDesc}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.SerializableJobConf
+
+/**
+ * `FileFormat` for writing Hive tables.
+ *
+ * TODO: implement the read logic.
+ */
+class HiveFileFormat(fileSinkConf: FileSinkDesc) extends FileFormat {
+  override def inferSchema(
+  sparkSession: SparkSession,
+  options: Map[String, String],
+  files: Seq[FileStatus]): Option[StructType] = None
+
+  override def prepareWrite(
+  sparkSession: SparkSession,
+  job: Job,
+  options: Map[String, String],
+  dataSchema: StructType): OutputWriterFactory = {
--- End diff --

The preparation logic was scattered across several places; I collected it all and put it here.
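
A hedged usage sketch of the collected flow (simplified, with hypothetical names such as `outputDir`, `taskId` and `taskContext`; only `prepareWrite`, `getFileExtension` and `newInstance` come from the quoted code): the driver configures the Hadoop `Job` once and obtains a serializable factory, and each write task then builds its own `OutputWriter` from it.

    // Driver side: configure the job and get the factory.
    val format = new HiveFileFormat(fileSinkConf)
    val factory = format.prepareWrite(sparkSession, job, options = Map.empty, dataSchema)

    // Executor side (per task): derive the file name and create the writer.
    val extension = factory.getFileExtension(taskContext)    // e.g. ".deflate" when compressed
    val writer = factory.newInstance(
      path = s"$outputDir/part-00000-$taskId$extension",     // hypothetical file name
      dataSchema = dataSchema,
      context = taskContext)
    // Rows are then pushed through the writer, which is closed when the task finishes.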





[GitHub] spark pull request #16517: [SPARK-18243][SQL] Port Hive writing to use FileF...

2017-01-16 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16517#discussion_r96331108
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.hive.ql.exec.Utilities
+import org.apache.hadoop.hive.ql.io.{HiveFileFormatUtils, HiveOutputFormat}
+import org.apache.hadoop.hive.serde2.Serializer
+import 
org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorUtils, 
StructObjectInspector}
+import 
org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
+import org.apache.hadoop.io.Writable
+import org.apache.hadoop.mapred.{JobConf, Reporter}
+import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.{FileFormat, 
OutputWriter, OutputWriterFactory}
+import org.apache.spark.sql.hive.{HiveInspectors, HiveTableUtil}
+import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => 
FileSinkDesc}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.SerializableJobConf
+
+/**
+ * `FileFormat` for writing Hive tables.
+ *
+ * TODO: implement the read logic.
+ */
+class HiveFileFormat(fileSinkConf: FileSinkDesc) extends FileFormat {
+  override def inferSchema(
+  sparkSession: SparkSession,
+  options: Map[String, String],
+  files: Seq[FileStatus]): Option[StructType] = None
+
+  override def prepareWrite(
+  sparkSession: SparkSession,
+  job: Job,
+  options: Map[String, String],
+  dataSchema: StructType): OutputWriterFactory = {
+val conf = job.getConfiguration
+val tableDesc = fileSinkConf.getTableInfo
+conf.set("mapred.output.format.class", 
tableDesc.getOutputFileFormatClassName)
--- End diff --

this is moved from 
https://github.com/apache/spark/pull/16517/files#diff-d579db9a8f27e0bbef37720ab14ec3f6L203





[GitHub] spark pull request #16517: [SPARK-18243][SQL] Port Hive writing to use FileF...

2017-01-16 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16517#discussion_r96331015
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.hive.ql.exec.Utilities
+import org.apache.hadoop.hive.ql.io.{HiveFileFormatUtils, HiveOutputFormat}
+import org.apache.hadoop.hive.serde2.Serializer
+import 
org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorUtils, 
StructObjectInspector}
+import 
org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
+import org.apache.hadoop.io.Writable
+import org.apache.hadoop.mapred.{JobConf, Reporter}
+import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.{FileFormat, 
OutputWriter, OutputWriterFactory}
+import org.apache.spark.sql.hive.{HiveInspectors, HiveTableUtil}
+import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => 
FileSinkDesc}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.SerializableJobConf
+
+/**
+ * `FileFormat` for writing Hive tables.
+ *
+ * TODO: implement the read logic.
+ */
+class HiveFileFormat(fileSinkConf: FileSinkDesc) extends FileFormat {
+  override def inferSchema(
+  sparkSession: SparkSession,
+  options: Map[String, String],
+  files: Seq[FileStatus]): Option[StructType] = None
--- End diff --

Yeah, because we are not going to use it in the read path.





[GitHub] spark pull request #16517: [SPARK-18243][SQL] Port Hive writing to use FileF...

2017-01-16 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16517#discussion_r96330923
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala ---
@@ -86,6 +86,47 @@ class DetermineHiveSerde(conf: SQLConf) extends 
Rule[LogicalPlan] {
   }
 }
 
+class HiveAnalysis(session: SparkSession) extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan = plan 
resolveOperators {
+case InsertIntoTable(table: MetastoreRelation, partSpec, query, 
overwrite, ifNotExists)
+if hasBeenPreprocessed(table.output, 
table.partitionKeys.toStructType, partSpec, query) =>
+  InsertIntoHiveTable(table, partSpec, query, overwrite, ifNotExists)
+
+case CreateTable(tableDesc, mode, Some(query)) if 
DDLUtils.isHiveTable(tableDesc) =>
+  // Currently `DataFrameWriter.saveAsTable` doesn't support the 
Append mode of hive serde
--- End diff --

the code block below is mostly moved from 
https://github.com/apache/spark/pull/16517/files#diff-c4ed9859978dd6ac271b6a40ee945e4bL112





[GitHub] spark pull request #16517: [SPARK-18243][SQL] Port Hive writing to use FileF...

2017-01-16 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/16517#discussion_r96316695
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala ---
@@ -86,6 +86,47 @@ class DetermineHiveSerde(conf: SQLConf) extends 
Rule[LogicalPlan] {
   }
 }
 
+class HiveAnalysis(session: SparkSession) extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan = plan 
resolveOperators {
+case InsertIntoTable(table: MetastoreRelation, partSpec, query, 
overwrite, ifNotExists)
+if hasBeenPreprocessed(table.output, 
table.partitionKeys.toStructType, partSpec, query) =>
+  InsertIntoHiveTable(table, partSpec, query, overwrite, ifNotExists)
+
+case CreateTable(tableDesc, mode, Some(query)) if 
DDLUtils.isHiveTable(tableDesc) =>
+  // Currently `DataFrameWriter.saveAsTable` doesn't support the 
Append mode of hive serde
+  // tables yet.
+  if (mode == SaveMode.Append) {
+throw new AnalysisException(
+  "CTAS for hive serde tables does not support append semantics.")
+  }
+
+  val dbName = 
tableDesc.identifier.database.getOrElse(session.catalog.currentDatabase)
+  CreateHiveTableAsSelectCommand(
+tableDesc.copy(identifier = tableDesc.identifier.copy(database = 
Some(dbName))),
+query,
+mode == SaveMode.Ignore)
+  }
+
+  /**
+   * Returns true if the [[InsertIntoTable]] plan has already been 
preprocessed by analyzer rule
+   * [[PreprocessTableInsertion]]. It is important that this 
rule([[HiveAnalysis]]) has to
+   * be run after [[PreprocessTableInsertion]], to normalize the column 
names in partition spec and
--- End diff --

This rule is in the same batch as PreprocessTableInsertion, right? If so, we cannot
guarantee that PreprocessTableInsertion always fires on an InsertIntoTable command
before this rule does.
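
If the rule does sit in a fixed-point batch (as the question assumes), the `hasBeenPreprocessed` guard is what keeps it safe: on an iteration where PreprocessTableInsertion has not fired yet, the conversion is simply a no-op and gets another chance later. A minimal sketch of that execution model, simplified and not Spark's actual `RuleExecutor`:

```scala
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// Simplified fixed-point loop: a guarded rule that skips not-yet-preprocessed inserts
// is retried on the next iteration; the loop only stops once no rule changes the plan.
def runToFixedPoint(
    plan: LogicalPlan,
    rules: Seq[Rule[LogicalPlan]],
    maxIterations: Int): LogicalPlan = {
  var current = plan
  var converged = false
  var iteration = 0
  while (!converged && iteration < maxIterations) {
    val next = rules.foldLeft(current)((p, rule) => rule(p))
    converged = next.fastEquals(current) // stop once no rule changed the plan
    current = next
    iteration += 1
  }
  current
}
```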





[GitHub] spark pull request #16517: [SPARK-18243][SQL] Port Hive writing to use FileF...

2017-01-16 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/16517#discussion_r96317272
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.hive.ql.exec.Utilities
+import org.apache.hadoop.hive.ql.io.{HiveFileFormatUtils, HiveOutputFormat}
+import org.apache.hadoop.hive.serde2.Serializer
+import 
org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorUtils, 
StructObjectInspector}
+import 
org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
+import org.apache.hadoop.io.Writable
+import org.apache.hadoop.mapred.{JobConf, Reporter}
+import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.{FileFormat, 
OutputWriter, OutputWriterFactory}
+import org.apache.spark.sql.hive.{HiveInspectors, HiveTableUtil}
+import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => 
FileSinkDesc}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.SerializableJobConf
+
+/**
+ * `FileFormat` for writing Hive tables.
+ *
+ * TODO: implement the read logic.
+ */
+class HiveFileFormat(fileSinkConf: FileSinkDesc) extends FileFormat {
+  override def inferSchema(
+  sparkSession: SparkSession,
+  options: Map[String, String],
+  files: Seq[FileStatus]): Option[StructType] = None
+
+  override def prepareWrite(
+  sparkSession: SparkSession,
+  job: Job,
+  options: Map[String, String],
+  dataSchema: StructType): OutputWriterFactory = {
+val conf = job.getConfiguration
+val tableDesc = fileSinkConf.getTableInfo
+conf.set("mapred.output.format.class", 
tableDesc.getOutputFileFormatClassName)
+
+// Add table properties from storage handler to hadoopConf, so any 
custom storage
+// handler settings can be set to hadoopConf
+HiveTableUtil.configureJobPropertiesForStorageHandler(tableDesc, conf, 
false)
+Utilities.copyTableJobPropertiesToConf(tableDesc, conf)
+
+// Avoid referencing the outer object.
+val fileSinkConfSer = fileSinkConf
+new OutputWriterFactory {
+  private val jobConf = new SerializableJobConf(new JobConf(conf))
+  @transient private lazy val outputFormat =
+
jobConf.value.getOutputFormat.asInstanceOf[HiveOutputFormat[AnyRef, Writable]]
+
+  override def getFileExtension(context: TaskAttemptContext): String = 
{
+Utilities.getFileExtension(jobConf.value, 
fileSinkConfSer.getCompressed, outputFormat)
+  }
+
+  override def newInstance(
+  path: String,
+  dataSchema: StructType,
+  context: TaskAttemptContext): OutputWriter = {
+new HiveOutputWriter(path, fileSinkConfSer, jobConf.value, 
dataSchema)
+  }
+}
--- End diff --

Should we just create a class instead of using an anonymous class?
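
If it helps the discussion, here is roughly what a named factory could look like. This is only a sketch: the class name is made up, the fields mirror what the anonymous version closes over, and the imports are the ones already shown in the `HiveFileFormat.scala` diff above.

```scala
// Sketch: hoist the anonymous OutputWriterFactory into a named class.
// `HiveOutputWriterFactory` is a hypothetical name, not from the PR.
class HiveOutputWriterFactory(
    jobConf: SerializableJobConf,
    fileSinkConf: FileSinkDesc) extends OutputWriterFactory {

  // recreated lazily on the executor side, since the raw OutputFormat is not serializable
  @transient private lazy val outputFormat =
    jobConf.value.getOutputFormat.asInstanceOf[HiveOutputFormat[AnyRef, Writable]]

  override def getFileExtension(context: TaskAttemptContext): String =
    Utilities.getFileExtension(jobConf.value, fileSinkConf.getCompressed, outputFormat)

  override def newInstance(
      path: String,
      dataSchema: StructType,
      context: TaskAttemptContext): OutputWriter =
    new HiveOutputWriter(path, fileSinkConf, jobConf.value, dataSchema)
}
```

`prepareWrite` would then just return `new HiveOutputWriterFactory(new SerializableJobConf(new JobConf(conf)), fileSinkConf)`.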





[GitHub] spark pull request #16517: [SPARK-18243][SQL] Port Hive writing to use FileF...

2017-01-16 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/16517#discussion_r96317211
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.hive.ql.exec.Utilities
+import org.apache.hadoop.hive.ql.io.{HiveFileFormatUtils, HiveOutputFormat}
+import org.apache.hadoop.hive.serde2.Serializer
+import 
org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorUtils, 
StructObjectInspector}
+import 
org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
+import org.apache.hadoop.io.Writable
+import org.apache.hadoop.mapred.{JobConf, Reporter}
+import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.{FileFormat, 
OutputWriter, OutputWriterFactory}
+import org.apache.spark.sql.hive.{HiveInspectors, HiveTableUtil}
+import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => 
FileSinkDesc}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.SerializableJobConf
+
+/**
+ * `FileFormat` for writing Hive tables.
+ *
+ * TODO: implement the read logic.
+ */
+class HiveFileFormat(fileSinkConf: FileSinkDesc) extends FileFormat {
+  override def inferSchema(
+  sparkSession: SparkSession,
+  options: Map[String, String],
+  files: Seq[FileStatus]): Option[StructType] = None
+
+  override def prepareWrite(
+  sparkSession: SparkSession,
+  job: Job,
+  options: Map[String, String],
+  dataSchema: StructType): OutputWriterFactory = {
--- End diff --

Do you want to add a comment noting the original source of the code in this function?





[GitHub] spark pull request #16517: [SPARK-18243][SQL] Port Hive writing to use FileF...

2017-01-16 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/16517#discussion_r96316863
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala ---
@@ -86,6 +86,47 @@ class DetermineHiveSerde(conf: SQLConf) extends 
Rule[LogicalPlan] {
   }
 }
 
+class HiveAnalysis(session: SparkSession) extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan = plan 
resolveOperators {
+case InsertIntoTable(table: MetastoreRelation, partSpec, query, 
overwrite, ifNotExists)
+if hasBeenPreprocessed(table.output, 
table.partitionKeys.toStructType, partSpec, query) =>
+  InsertIntoHiveTable(table, partSpec, query, overwrite, ifNotExists)
+
+case CreateTable(tableDesc, mode, Some(query)) if 
DDLUtils.isHiveTable(tableDesc) =>
+  // Currently `DataFrameWriter.saveAsTable` doesn't support the 
Append mode of hive serde
+  // tables yet.
+  if (mode == SaveMode.Append) {
+throw new AnalysisException(
+  "CTAS for hive serde tables does not support append semantics.")
+  }
+
+  val dbName = 
tableDesc.identifier.database.getOrElse(session.catalog.currentDatabase)
+  CreateHiveTableAsSelectCommand(
+tableDesc.copy(identifier = tableDesc.identifier.copy(database = 
Some(dbName))),
+query,
+mode == SaveMode.Ignore)
+  }
+
+  /**
+   * Returns true if the [[InsertIntoTable]] plan has already been 
preprocessed by analyzer rule
+   * [[PreprocessTableInsertion]]. It is important that this 
rule([[HiveAnalysis]]) has to
+   * be run after [[PreprocessTableInsertion]], to normalize the column 
names in partition spec and
--- End diff --

Or do you mean that we use this function to determine whether PreprocessTableInsertion
has already fired?





[GitHub] spark pull request #16517: [SPARK-18243][SQL] Port Hive writing to use FileF...

2017-01-16 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/16517#discussion_r96316302
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala ---
@@ -108,35 +108,30 @@ class QueryExecution(val sparkSession: SparkSession, 
val logical: LogicalPlan) {
 
 
   /**
-   * Returns the result as a hive compatible sequence of strings.  For 
native commands, the
-   * execution is simply passed back to Hive.
+   * Returns the result as a hive compatible sequence of strings. This is 
for testing only.
*/
   def hiveResultString(): Seq[String] = executedPlan match {
 case ExecutedCommandExec(desc: DescribeTableCommand) =>
-  SQLExecution.withNewExecutionId(sparkSession, this) {
--- End diff --

Could you explain why `SQLExecution.withNewExecutionId(sparkSession, this)` is no longer
needed here?
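
For context, the two-argument form visible in the diff is normally used like the sketch below: it scopes the body under a fresh execution id so the jobs it launches are grouped as one query in the SQL UI. Whether this call site still needs its own wrapper is exactly the question. A sketch only, not this PR's code:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.{QueryExecution, SQLExecution}

// Usual usage pattern (assumed context): run the collect under a new execution id
// so the resulting jobs show up as a single entry in the SQL UI.
def collectWithTracking(sparkSession: SparkSession, qe: QueryExecution): Array[InternalRow] = {
  SQLExecution.withNewExecutionId(sparkSession, qe) {
    qe.executedPlan.executeCollect()
  }
}
```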





[GitHub] spark pull request #16517: [SPARK-18243][SQL] Port Hive writing to use FileF...

2017-01-16 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/16517#discussion_r96316982
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala ---
@@ -86,6 +86,47 @@ class DetermineHiveSerde(conf: SQLConf) extends 
Rule[LogicalPlan] {
   }
 }
 
+class HiveAnalysis(session: SparkSession) extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan = plan 
resolveOperators {
+case InsertIntoTable(table: MetastoreRelation, partSpec, query, 
overwrite, ifNotExists)
+if hasBeenPreprocessed(table.output, 
table.partitionKeys.toStructType, partSpec, query) =>
+  InsertIntoHiveTable(table, partSpec, query, overwrite, ifNotExists)
+
+case CreateTable(tableDesc, mode, Some(query)) if 
DDLUtils.isHiveTable(tableDesc) =>
+  // Currently `DataFrameWriter.saveAsTable` doesn't support the 
Append mode of hive serde
+  // tables yet.
+  if (mode == SaveMode.Append) {
+throw new AnalysisException(
+  "CTAS for hive serde tables does not support append semantics.")
+  }
+
+  val dbName = 
tableDesc.identifier.database.getOrElse(session.catalog.currentDatabase)
+  CreateHiveTableAsSelectCommand(
+tableDesc.copy(identifier = tableDesc.identifier.copy(database = 
Some(dbName))),
+query,
+mode == SaveMode.Ignore)
+  }
+
+  /**
+   * Returns true if the [[InsertIntoTable]] plan has already been 
preprocessed by analyzer rule
+   * [[PreprocessTableInsertion]]. It is important that this 
rule([[HiveAnalysis]]) has to
+   * be run after [[PreprocessTableInsertion]], to normalize the column 
names in partition spec and
--- End diff --

Should this function actually be part of the `resolved` method of `InsertIntoTable`?





[GitHub] spark pull request #16517: [SPARK-18243][SQL] Port Hive writing to use FileF...

2017-01-16 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/16517#discussion_r96317150
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.hive.ql.exec.Utilities
+import org.apache.hadoop.hive.ql.io.{HiveFileFormatUtils, HiveOutputFormat}
+import org.apache.hadoop.hive.serde2.Serializer
+import 
org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorUtils, 
StructObjectInspector}
+import 
org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
+import org.apache.hadoop.io.Writable
+import org.apache.hadoop.mapred.{JobConf, Reporter}
+import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.{FileFormat, 
OutputWriter, OutputWriterFactory}
+import org.apache.spark.sql.hive.{HiveInspectors, HiveTableUtil}
+import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => 
FileSinkDesc}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.SerializableJobConf
+
+/**
+ * `FileFormat` for writing Hive tables.
+ *
+ * TODO: implement the read logic.
+ */
+class HiveFileFormat(fileSinkConf: FileSinkDesc) extends FileFormat {
+  override def inferSchema(
+  sparkSession: SparkSession,
+  options: Map[String, String],
+  files: Seq[FileStatus]): Option[StructType] = None
--- End diff --

Is it safe to return `None`?





[GitHub] spark pull request #16517: [SPARK-18243][SQL] Port Hive writing to use FileF...

2017-01-15 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16517#discussion_r96158823
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
 ---
@@ -99,7 +99,7 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: 
String)
   }
 
   private def getFilename(taskContext: TaskAttemptContext, ext: String): 
String = {
-// The file name looks like 
part-r-0-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb_3.gz.parquet
+// The file name looks like 
part-0-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb_3.gz.parquet
--- End diff --

OK, I should update this string; `c000` is the file count, which was added recently.
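
To make the naming concrete, a rough sketch of how the pieces combine. The helper below is hypothetical and simplified, not the exact code in `HadoopMapReduceCommitProtocol`: the commit protocol contributes the `part-<split>-<job uuid>` prefix, and the writer-side extension already carries the per-task file counter (the `-c000` part) plus the codec/format suffix.

```scala
// Hypothetical helper, for illustration only.
def exampleFilename(split: Int, jobUUID: String, fileCounter: Int, formatExt: String): String = {
  // the extension already includes the file counter, e.g. "-c000" + ".gz.parquet"
  val ext = f"-c$fileCounter%03d" + formatExt
  f"part-$split%05d-$jobUUID$ext"
}

// exampleFilename(0, "2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb", 0, ".gz.parquet")
// => "part-00000-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb-c000.gz.parquet"
```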





[GitHub] spark pull request #16517: [SPARK-18243][SQL] Port Hive writing to use FileF...

2017-01-15 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16517#discussion_r96158740
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
 ---
@@ -69,34 +69,31 @@ import org.apache.spark.util.SerializableJobConf
  *  {{{
  *  Map('a' -> Some('1'), 'b' -> None)
  *  }}}.
- * @param child the logical plan representing data to write to.
+ * @param query the logical plan representing data to write to.
  * @param overwrite overwrite existing table or partitions.
  * @param ifNotExists If true, only write if the table or partition does 
not exist.
  */
 case class InsertIntoHiveTable(
 table: MetastoreRelation,
 partition: Map[String, Option[String]],
-child: SparkPlan,
+query: LogicalPlan,
 overwrite: Boolean,
-ifNotExists: Boolean) extends UnaryExecNode {
+ifNotExists: Boolean) extends RunnableCommand {
 
-  @transient private val sessionState = 
sqlContext.sessionState.asInstanceOf[HiveSessionState]
-  @transient private val externalCatalog = 
sqlContext.sharedState.externalCatalog
+  override protected def innerChildren: Seq[LogicalPlan] = query :: Nil
--- End diff --

We can't. We only replace `InsertIntoTable` with `InsertIntoHiveTable` in the planner.





[GitHub] spark pull request #16517: [SPARK-18243][SQL] Port Hive writing to use FileF...

2017-01-14 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/16517#discussion_r96131580
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
 ---
@@ -99,7 +99,7 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: 
String)
   }
 
   private def getFilename(taskContext: TaskAttemptContext, ext: String): 
String = {
-// The file name looks like 
part-r-0-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb_3.gz.parquet
+// The file name looks like 
part-0-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb_3.gz.parquet
--- End diff --

After more reading, the existing hive table writers do not have such an issue. They are based
on a unique
[`TaskAttemptID`](https://hadoop.apache.org/docs/r2.6.3/api/org/apache/hadoop/mapreduce/TaskAttemptID.html),
and the output path is generated by the call to `FileOutputFormat.getTaskOutputPath`.
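
For anyone tracing that path, a hedged sketch of the old mapred-API call involved; the helper and the config juggling here are illustrative assumptions, not a quote of the removed writer container:

```scala
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapred.{FileOutputFormat, JobConf, TaskAttemptID}

// Illustrative assumption of the old write path: the attempt id goes into the JobConf and
// the old mapred API derives a per-attempt output location from it.
def taskOutputPathFor(conf: JobConf, attemptId: TaskAttemptID, fileName: String): Path = {
  conf.set("mapred.task.id", attemptId.toString) // uniqueness comes from the attempt id
  FileOutputFormat.getTaskOutputPath(conf, fileName)
}
```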





[GitHub] spark pull request #16517: [SPARK-18243][SQL] Port Hive writing to use FileF...

2017-01-14 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/16517#discussion_r96130318
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala ---
@@ -86,6 +86,42 @@ class DetermineHiveSerde(conf: SQLConf) extends 
Rule[LogicalPlan] {
   }
 }
 
+class HiveAnalysis(session: SparkSession) extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan = plan 
resolveOperators {
+case InsertIntoTable(table: MetastoreRelation, partSpec, query, 
overwrite, ifNotExists)
+if hasBeenPreprocessed(table.output, 
table.partitionKeys.toStructType, partSpec, query) =>
+  InsertIntoHiveTable(table, partSpec, query, overwrite, ifNotExists)
+
+case CreateTable(tableDesc, mode, Some(query)) if 
DDLUtils.isHiveTable(tableDesc) =>
+  // Currently we will never hit this branch, as SQL string API can 
only use `Ignore` or
+  // `ErrorIfExists` mode, and `DataFrameWriter.saveAsTable` doesn't 
support hive serde
+  // tables yet.
+  if (mode == SaveMode.Append || mode == SaveMode.Overwrite) {
+throw new AnalysisException(
+  "CTAS for hive serde tables does not support append or overwrite 
semantics.")
+  }
--- End diff --

The code above needs to be brought up to date with the latest master.





[GitHub] spark pull request #16517: [SPARK-18243][SQL] Port Hive writing to use FileF...

2017-01-14 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/16517#discussion_r96130310
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala ---
@@ -86,6 +86,42 @@ class DetermineHiveSerde(conf: SQLConf) extends 
Rule[LogicalPlan] {
   }
 }
 
+class HiveAnalysis(session: SparkSession) extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan = plan 
resolveOperators {
+case InsertIntoTable(table: MetastoreRelation, partSpec, query, 
overwrite, ifNotExists)
+if hasBeenPreprocessed(table.output, 
table.partitionKeys.toStructType, partSpec, query) =>
+  InsertIntoHiveTable(table, partSpec, query, overwrite, ifNotExists)
+
+case CreateTable(tableDesc, mode, Some(query)) if 
DDLUtils.isHiveTable(tableDesc) =>
+  // Currently we will never hit this branch, as SQL string API can 
only use `Ignore` or
+  // `ErrorIfExists` mode, and `DataFrameWriter.saveAsTable` doesn't 
support hive serde
+  // tables yet.
+  if (mode == SaveMode.Append || mode == SaveMode.Overwrite) {
+throw new AnalysisException(
+  "CTAS for hive serde tables does not support append or overwrite 
semantics.")
+  }
+
+  val dbName = 
tableDesc.identifier.database.getOrElse(session.catalog.currentDatabase)
+  CreateHiveTableAsSelectCommand(
+tableDesc.copy(identifier = tableDesc.identifier.copy(database = 
Some(dbName))),
+query,
+mode == SaveMode.Ignore)
+  }
+
+  private def hasBeenPreprocessed(
--- End diff --

Also add a code comment for this function?
```
  /**
   * Returns true if the [[InsertIntoTable]] plan has already been preprocessed by the
   * analyzer rule [[PreprocessTableInsertion]]. It is important that this rule
   * ([[HiveAnalysis]]) runs after [[PreprocessTableInsertion]], which normalizes the
   * column names in the partition spec and fixes the schema mismatch by adding Casts.
   */
```





[GitHub] spark pull request #16517: [SPARK-18243][SQL] Port Hive writing to use FileF...

2017-01-14 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/16517#discussion_r96130080
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
 ---
@@ -99,7 +99,7 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: 
String)
   }
 
   private def getFilename(taskContext: TaskAttemptContext, ext: String): 
String = {
-// The file name looks like 
part-r-0-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb_3.gz.parquet
+// The file name looks like 
part-0-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb_3.gz.parquet
--- End diff --

The ext string always starts with `c`. Below is an example I got from a test case.

`part-0-fd8f3fdd-653a-4ea0-ab6d-5c8ad610b184-c000.snappy.parquet`





[GitHub] spark pull request #16517: [SPARK-18243][SQL] Port Hive writing to use FileF...

2017-01-14 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/16517#discussion_r96128666
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
 ---
@@ -69,34 +69,31 @@ import org.apache.spark.util.SerializableJobConf
  *  {{{
  *  Map('a' -> Some('1'), 'b' -> None)
  *  }}}.
- * @param child the logical plan representing data to write to.
+ * @param query the logical plan representing data to write to.
  * @param overwrite overwrite existing table or partitions.
  * @param ifNotExists If true, only write if the table or partition does 
not exist.
  */
 case class InsertIntoHiveTable(
 table: MetastoreRelation,
 partition: Map[String, Option[String]],
-child: SparkPlan,
+query: LogicalPlan,
 overwrite: Boolean,
-ifNotExists: Boolean) extends UnaryExecNode {
+ifNotExists: Boolean) extends RunnableCommand {
 
-  @transient private val sessionState = 
sqlContext.sessionState.asInstanceOf[HiveSessionState]
-  @transient private val externalCatalog = 
sqlContext.sharedState.externalCatalog
+  override protected def innerChildren: Seq[LogicalPlan] = query :: Nil
--- End diff --

+1

Let me see whether we can add such a test case to hit the bug without this override.
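
The exact failure mode isn't quoted in this thread, so purely as a sketch of the kind of regression test being discussed: the suite name, helpers, and the assertion below are all assumptions, not code from the PR. A real test would additionally assert that the plan of the inserted query is rendered beneath the `InsertIntoHiveTable` node in the EXPLAIN output.

```scala
import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.test.SQLTestUtils

// Hypothetical regression test sketch for the innerChildren behavior.
class InsertIntoHiveTableExplainSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
  test("EXPLAIN INSERT shows the query being written") {
    withTable("t") {
      spark.sql("CREATE TABLE t (i INT) STORED AS TEXTFILE")
      val explain = spark.sql("EXPLAIN EXTENDED INSERT INTO TABLE t SELECT 1")
        .collect().map(_.getString(0)).mkString("\n")
      // minimal stand-in check; a stronger check would look for the inner query's plan
      assert(explain.contains("InsertIntoHiveTable"))
    }
  }
}
```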





[GitHub] spark pull request #16517: [SPARK-18243][SQL] Port Hive writing to use FileF...

2017-01-09 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16517#discussion_r95298791
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala ---
@@ -1,356 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive
-
-import java.text.NumberFormat
-import java.util.{Date, Locale}
-
-import scala.collection.JavaConverters._
-
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.hive.common.FileUtils
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars
-import org.apache.hadoop.hive.ql.exec.{FileSinkOperator, Utilities}
-import org.apache.hadoop.hive.ql.io.{HiveFileFormatUtils, HiveOutputFormat}
-import org.apache.hadoop.hive.ql.plan.TableDesc
-import org.apache.hadoop.hive.serde2.Serializer
-import 
org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorUtils, 
StructObjectInspector}
-import 
org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
-import org.apache.hadoop.io.Writable
-import org.apache.hadoop.mapred._
-import org.apache.hadoop.mapreduce.TaskType
-
-import org.apache.spark._
-import org.apache.spark.internal.Logging
-import org.apache.spark.internal.io.SparkHadoopWriterUtils
-import org.apache.spark.mapred.SparkHadoopMapRedUtil
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.execution.UnsafeKVExternalSorter
-import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => 
FileSinkDesc}
-import org.apache.spark.sql.types._
-import org.apache.spark.util.SerializableJobConf
-import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter
-
-/**
- * Internal helper class that saves an RDD using a Hive OutputFormat.
- * It is based on `SparkHadoopWriter`.
- */
-private[hive] class SparkHiveWriterContainer(
-@transient private val jobConf: JobConf,
-fileSinkConf: FileSinkDesc,
-inputSchema: Seq[Attribute])
-  extends Logging
-  with HiveInspectors
-  with Serializable {
-
-  private val now = new Date()
-  private val tableDesc: TableDesc = fileSinkConf.getTableInfo
-  // Add table properties from storage handler to jobConf, so any custom 
storage
-  // handler settings can be set to jobConf
-  if (tableDesc != null) {
-HiveTableUtil.configureJobPropertiesForStorageHandler(tableDesc, 
jobConf, false)
-Utilities.copyTableJobPropertiesToConf(tableDesc, jobConf)
-  }
-  protected val conf = new SerializableJobConf(jobConf)
-
-  private var jobID = 0
-  private var splitID = 0
-  private var attemptID = 0
-  private var jID: SerializableWritable[JobID] = null
-  private var taID: SerializableWritable[TaskAttemptID] = null
-
-  @transient private var writer: FileSinkOperator.RecordWriter = null
-  @transient protected lazy val committer = conf.value.getOutputCommitter
-  @transient protected lazy val jobContext = new 
JobContextImpl(conf.value, jID.value)
-  @transient private lazy val taskContext = new 
TaskAttemptContextImpl(conf.value, taID.value)
-  @transient private lazy val outputFormat =
-conf.value.getOutputFormat.asInstanceOf[HiveOutputFormat[AnyRef, 
Writable]]
-
-  def driverSideSetup() {
-setIDs(0, 0, 0)
-setConfParams()
-committer.setupJob(jobContext)
-  }
-
-  def executorSideSetup(jobId: Int, splitId: Int, attemptId: Int) {
-setIDs(jobId, splitId, attemptId)
-setConfParams()
-committer.setupTask(taskContext)
-initWriters()
-  }
-
-  protected def getOutputName: String = {
-val numberFormat = NumberFormat.getInstance(Locale.US)
-numberFormat.setMinimumIntegerDigits(5)
-numberFormat.setGroupingUsed(false)
-val extension = Utilities.getFileExtension(conf.value, 
fileSinkConf.getCompressed, outputFormat)