[GitHub] spark issue #20167: Allow providing Mesos principal & secret via files (SPAR...

2018-01-05 Thread felixcheung
Github user felixcheung commented on the issue:

https://github.com/apache/spark/pull/20167
  
Is putting secrets in plain-text files a good practice?
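
A minimal sketch of what file-based credentials might look like on the submission
side, assuming hypothetical `spark.mesos.principal.file` / `spark.mesos.secret.file`
keys in the spirit of this PR (the existing `spark.mesos.principal` and
`spark.mesos.secret` take the values inline):

```scala
import org.apache.spark.SparkConf

object MesosFileSecretSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("mesos://zk://master:2181/mesos")
      .setAppName("secret-from-file-demo")
      // Inline credentials (existing behaviour): the secret value itself sits in the config.
      // .set("spark.mesos.secret", "my-secret")
      // File-based credentials (what the PR proposes; key names are assumptions):
      // only a path appears in the config, so the file can be locked down (e.g. chmod 600).
      .set("spark.mesos.principal.file", "/etc/spark/mesos-principal")
      .set("spark.mesos.secret.file", "/etc/spark/mesos-secret")
    println(conf.toDebugString)
  }
}
```

Whether that addresses the concern depends on the file permissions and on keeping the
files off shared storage; the point is only that the secret value no longer appears in
the Spark configuration or the UI.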


---




[GitHub] spark issue #20154: [SPARK-22960][k8s] Make build-push-docker-images.sh more...

2018-01-05 Thread felixcheung
Github user felixcheung commented on the issue:

https://github.com/apache/spark/pull/20154
  
That's good. I think we should still address the finer point of
https://github.com/apache/spark/pull/20154#pullrequestreview-86833216:
if Docker Hub can't build spark-base, then we are pretty much ruling out
the possibility of releasing the Docker images as ASF with the 2.3.0 release.



---




[GitHub] spark issue #19690: [SPARK-22467]Added a switch to support whether `stdout_s...

2018-01-05 Thread jiangxb1987
Github user jiangxb1987 commented on the issue:

https://github.com/apache/spark/pull/19690
  
In most cases users shouldn't ignore the printed error/warning messages;
have you observed a lot of redundant, noisy output?


---




[GitHub] spark issue #20076: [SPARK-21786][SQL] When acquiring 'compressionCodecClass...

2018-01-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20076
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/85740/
Test PASSed.


---




[GitHub] spark issue #20076: [SPARK-21786][SQL] When acquiring 'compressionCodecClass...

2018-01-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20076
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #20076: [SPARK-21786][SQL] When acquiring 'compressionCodecClass...

2018-01-05 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20076
  
**[Test build #85740 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85740/testReport)**
 for PR 20076 at commit 
[`9466797`](https://github.com/apache/spark/commit/946679745f16838932e74fabb70f2ad702fa4640).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #20076: [SPARK-21786][SQL] When acquiring 'compressionCodecClass...

2018-01-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20076
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/85739/
Test PASSed.


---




[GitHub] spark issue #20076: [SPARK-21786][SQL] When acquiring 'compressionCodecClass...

2018-01-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20076
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #20076: [SPARK-21786][SQL] When acquiring 'compressionCodecClass...

2018-01-05 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20076
  
**[Test build #85739 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85739/testReport)**
 for PR 20076 at commit 
[`26c1c61`](https://github.com/apache/spark/commit/26c1c61ffd5742b71aefdec33ddcb69ba944).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #19943: [SPARK-16060][SQL] Support Vectorized ORC Reader

2018-01-05 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19943
  
**[Test build #85744 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85744/testReport)**
 for PR 19943 at commit 
[`aeb6abd`](https://github.com/apache/spark/commit/aeb6abd66ee3338635edf9dca85894c14a05fb72).


---




[GitHub] spark issue #19943: [SPARK-16060][SQL] Support Vectorized ORC Reader

2018-01-05 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19943
  
**[Test build #85743 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85743/testReport)**
 for PR 19943 at commit 
[`fca6a5f`](https://github.com/apache/spark/commit/fca6a5fe83c46d9cb1c2bdf163920a82fcd0b7a2).


---




[GitHub] spark pull request #19080: [SPARK-21865][SQL] simplify the distribution sema...

2018-01-05 Thread sameeragarwal
Github user sameeragarwal commented on a diff in the pull request:

https://github.com/apache/spark/pull/19080#discussion_r160017940
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
 ---
@@ -73,46 +127,31 @@ case class OrderedDistribution(ordering: 
Seq[SortOrder]) extends Distribution {
   "An AllTuples should be used to represent a distribution that only 
has " +
   "a single partition.")
 
-  // TODO: This is not really valid...
-  def clustering: Set[Expression] = ordering.map(_.child).toSet
+  override def requiredNumPartitions: Option[Int] = None
+
+  override def createPartitioning(numPartitions: Int): Partitioning = {
+RangePartitioning(ordering, numPartitions)
+  }
 }
 
 /**
  * Represents data where tuples are broadcasted to every node. It is quite 
common that the
  * entire set of tuples is transformed into different data structure.
  */
-case class BroadcastDistribution(mode: BroadcastMode) extends Distribution
+case class BroadcastDistribution(mode: BroadcastMode) extends Distribution 
{
--- End diff --

Similarly, how about `BroadcastPartitioning` just satisfying the 
`AllTuples` distribution?
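
A rough, self-contained sketch of what that suggestion could look like; the
`Distribution`/`Partitioning` types below are simplified stand-ins for the Catalyst
traits, not the real classes:

```scala
// Toy stand-ins, only to illustrate BroadcastPartitioning also satisfying AllTuples.
sealed trait Distribution
case object AllTuples extends Distribution
case class BroadcastDistribution(mode: String) extends Distribution

sealed trait Partitioning {
  def satisfies(required: Distribution): Boolean
}

case class BroadcastPartitioning(mode: String) extends Partitioning {
  override def satisfies(required: Distribution): Boolean = required match {
    // Broadcast output is a full copy of all tuples on every node, so it can be
    // viewed as a single logical partition that meets the AllTuples requirement.
    case AllTuples                => true
    case BroadcastDistribution(m) => m == mode
    case _                        => false
  }
}
```

In the real code `mode` is a `BroadcastMode`; whether treating broadcast output as
satisfying `AllTuples` is semantically sound is exactly the question being raised.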


---




[GitHub] spark pull request #19080: [SPARK-21865][SQL] simplify the distribution sema...

2018-01-05 Thread sameeragarwal
Github user sameeragarwal commented on a diff in the pull request:

https://github.com/apache/spark/pull/19080#discussion_r160017879
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
 ---
@@ -51,12 +76,41 @@ case object AllTuples extends Distribution
  */
 case class ClusteredDistribution(
 clustering: Seq[Expression],
-numPartitions: Option[Int] = None) extends Distribution {
+requiredNumPartitions: Option[Int] = None) extends Distribution {
   require(
 clustering != Nil,
 "The clustering expressions of a ClusteredDistribution should not be 
Nil. " +
   "An AllTuples should be used to represent a distribution that only 
has " +
   "a single partition.")
+
+  override def createPartitioning(numPartitions: Int): Partitioning = {
+assert(requiredNumPartitions.isEmpty || requiredNumPartitions.get == 
numPartitions,
+  s"This ClusteredDistribution requires ${requiredNumPartitions.get} 
partitions, but " +
+s"the actual number of partitions is $numPartitions.")
+HashPartitioning(clustering, numPartitions)
+  }
+}
+
+/**
+ * Represents data where tuples have been partitioned according to the 
hash of the given
+ * `expressions`. The hash function is defined as 
`HashPartitioning.partitionIdExpression`, so only
+ * [[HashPartitioning]] can satisfy this distribution.
+ *
+ * This is a strictly stronger guarantee than [[ClusteredDistribution]]. 
Given a tuple and the
+ * number of partitions, this distribution strictly requires which 
partition the tuple should be in.
+ */
+case class HashPartitionedDistribution(expressions: Seq[Expression]) 
extends Distribution {
--- End diff --

Semantically, a `Partitioning` satisfies a `Distribution`, so it'd be better 
not to call this `HashPartitioned`. How about we call this 
`DeterministicClusteredDistribution` or `HashClusteredDistribution`? Also, 
perhaps this can just extend `ClusteredDistribution`?
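
A small sketch of the proposed shape, using simplified stand-ins (`Expression` and a
string-valued partitioning result) rather than the actual Catalyst classes:

```scala
// Illustration only: what `HashClusteredDistribution extends ClusteredDistribution`
// might look like after the rename.
case class Expression(name: String)

trait Distribution {
  def requiredNumPartitions: Option[Int]
  def createPartitioning(numPartitions: Int): String
}

case class ClusteredDistribution(
    clustering: Seq[Expression],
    requiredNumPartitions: Option[Int] = None) extends Distribution {
  override def createPartitioning(numPartitions: Int): String =
    s"HashPartitioning($clustering, $numPartitions)"
}

// The stricter variant: same clustering expressions, but it additionally pins the
// exact partition-id function, so only a HashPartitioning using that function can
// satisfy it. Extending ClusteredDistribution keeps the weaker co-clustering
// guarantee available to operators that don't care about the exact hash.
class HashClusteredDistribution(expressions: Seq[Expression])
  extends ClusteredDistribution(expressions)
```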


---




[GitHub] spark pull request #19080: [SPARK-21865][SQL] simplify the distribution sema...

2018-01-05 Thread sameeragarwal
Github user sameeragarwal commented on a diff in the pull request:

https://github.com/apache/spark/pull/19080#discussion_r160018028
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
 ---
@@ -73,46 +127,31 @@ case class OrderedDistribution(ordering: 
Seq[SortOrder]) extends Distribution {
   "An AllTuples should be used to represent a distribution that only 
has " +
   "a single partition.")
 
-  // TODO: This is not really valid...
-  def clustering: Set[Expression] = ordering.map(_.child).toSet
+  override def requiredNumPartitions: Option[Int] = None
--- End diff --

Out of curiosity, should an `OrderedDistribution` make any guarantees 
around clustering? Do we care if "tuples that share the same value for the 
ordering expressions will never be split across partitions"?
  


---




[GitHub] spark pull request #20013: [SPARK-20657][core] Speed up rendering of the sta...

2018-01-05 Thread gengliangwang
Github user gengliangwang commented on a diff in the pull request:

https://github.com/apache/spark/pull/20013#discussion_r160017934
  
--- Diff: core/src/main/scala/org/apache/spark/status/LiveEntity.scala ---
@@ -119,118 +121,115 @@ private class LiveTask(
 
   import LiveEntityHelpers._
 
-  private var recordedMetrics: v1.TaskMetrics = null
+  private var metrics: MetricsTracker = new MetricsTracker()
 
   var errorMessage: Option[String] = None
 
   /**
* Update the metrics for the task and return the difference between the 
previous and new
* values.
*/
-  def updateMetrics(metrics: TaskMetrics): v1.TaskMetrics = {
+  def updateMetrics(metrics: TaskMetrics): MetricsTracker = {
 if (metrics != null) {
-  val old = recordedMetrics
-  recordedMetrics = new v1.TaskMetrics(
-metrics.executorDeserializeTime,
-metrics.executorDeserializeCpuTime,
-metrics.executorRunTime,
-metrics.executorCpuTime,
-metrics.resultSize,
-metrics.jvmGCTime,
-metrics.resultSerializationTime,
-metrics.memoryBytesSpilled,
-metrics.diskBytesSpilled,
-metrics.peakExecutionMemory,
-new v1.InputMetrics(
-  metrics.inputMetrics.bytesRead,
-  metrics.inputMetrics.recordsRead),
-new v1.OutputMetrics(
-  metrics.outputMetrics.bytesWritten,
-  metrics.outputMetrics.recordsWritten),
-new v1.ShuffleReadMetrics(
-  metrics.shuffleReadMetrics.remoteBlocksFetched,
-  metrics.shuffleReadMetrics.localBlocksFetched,
-  metrics.shuffleReadMetrics.fetchWaitTime,
-  metrics.shuffleReadMetrics.remoteBytesRead,
-  metrics.shuffleReadMetrics.remoteBytesReadToDisk,
-  metrics.shuffleReadMetrics.localBytesRead,
-  metrics.shuffleReadMetrics.recordsRead),
-new v1.ShuffleWriteMetrics(
-  metrics.shuffleWriteMetrics.bytesWritten,
-  metrics.shuffleWriteMetrics.writeTime,
-  metrics.shuffleWriteMetrics.recordsWritten))
-  if (old != null) calculateMetricsDelta(recordedMetrics, old) else 
recordedMetrics
+  val old = this.metrics
+  val newMetrics = new MetricsTracker()
--- End diff --

Changing so many fields here seems ugly... but I respect your preference.
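
For illustration, one way the call site could be compacted is a factory on the
tracker itself; this is a heavily trimmed sketch with made-up stub types, not the
actual LiveEntity code, and it only relocates the assignments rather than removing
them:

```scala
// Toy stand-ins for the classes under discussion.
class TaskMetricsStub(
    val executorRunTime: Long,
    val executorCpuTime: Long,
    val resultSize: Long)

class MetricsTracker {
  var executorRunTime: Long = -1L
  var executorCpuTime: Long = -1L
  var resultSize: Long = -1L

  def subtract(other: MetricsTracker): Unit = {
    executorRunTime -= other.executorRunTime
    executorCpuTime -= other.executorCpuTime
    resultSize -= other.resultSize
  }
}

object MetricsTracker {
  // Hypothetical helper: build a tracker from task metrics in one place so that
  // updateMetrics() only needs a single call instead of ~25 inline assignments.
  def from(m: TaskMetricsStub): MetricsTracker = {
    val t = new MetricsTracker
    t.executorRunTime = m.executorRunTime
    t.executorCpuTime = m.executorCpuTime
    t.resultSize = m.resultSize
    t
  }
}
```

The flat mutable fields are presumably a deliberate trade for cheap delta computation
on the status path, so the verbosity has to live somewhere.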


---




[GitHub] spark issue #20013: [SPARK-20657][core] Speed up rendering of the stages pag...

2018-01-05 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/20013
  
lgtm


---




[GitHub] spark pull request #20013: [SPARK-20657][core] Speed up rendering of the sta...

2018-01-05 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20013#discussion_r160017859
  
--- Diff: core/src/main/scala/org/apache/spark/status/LiveEntity.scala ---
@@ -119,118 +121,115 @@ private class LiveTask(
 
   import LiveEntityHelpers._
 
-  private var recordedMetrics: v1.TaskMetrics = null
+  private var metrics: MetricsTracker = new MetricsTracker()
 
   var errorMessage: Option[String] = None
 
   /**
* Update the metrics for the task and return the difference between the 
previous and new
* values.
*/
-  def updateMetrics(metrics: TaskMetrics): v1.TaskMetrics = {
+  def updateMetrics(metrics: TaskMetrics): MetricsTracker = {
 if (metrics != null) {
-  val old = recordedMetrics
-  recordedMetrics = new v1.TaskMetrics(
-metrics.executorDeserializeTime,
-metrics.executorDeserializeCpuTime,
-metrics.executorRunTime,
-metrics.executorCpuTime,
-metrics.resultSize,
-metrics.jvmGCTime,
-metrics.resultSerializationTime,
-metrics.memoryBytesSpilled,
-metrics.diskBytesSpilled,
-metrics.peakExecutionMemory,
-new v1.InputMetrics(
-  metrics.inputMetrics.bytesRead,
-  metrics.inputMetrics.recordsRead),
-new v1.OutputMetrics(
-  metrics.outputMetrics.bytesWritten,
-  metrics.outputMetrics.recordsWritten),
-new v1.ShuffleReadMetrics(
-  metrics.shuffleReadMetrics.remoteBlocksFetched,
-  metrics.shuffleReadMetrics.localBlocksFetched,
-  metrics.shuffleReadMetrics.fetchWaitTime,
-  metrics.shuffleReadMetrics.remoteBytesRead,
-  metrics.shuffleReadMetrics.remoteBytesReadToDisk,
-  metrics.shuffleReadMetrics.localBytesRead,
-  metrics.shuffleReadMetrics.recordsRead),
-new v1.ShuffleWriteMetrics(
-  metrics.shuffleWriteMetrics.bytesWritten,
-  metrics.shuffleWriteMetrics.writeTime,
-  metrics.shuffleWriteMetrics.recordsWritten))
-  if (old != null) calculateMetricsDelta(recordedMetrics, old) else 
recordedMetrics
+  val old = this.metrics
+  val newMetrics = new MetricsTracker()
+  newMetrics.executorDeserializeTime = metrics.executorDeserializeTime
+  newMetrics.executorDeserializeCpuTime = 
metrics.executorDeserializeCpuTime
+  newMetrics.executorRunTime = metrics.executorRunTime
+  newMetrics.executorCpuTime = metrics.executorCpuTime
+  newMetrics.resultSize = metrics.resultSize
+  newMetrics.jvmGcTime = metrics.jvmGCTime
+  newMetrics.resultSerializationTime = metrics.resultSerializationTime
+  newMetrics.memoryBytesSpilled = metrics.memoryBytesSpilled
+  newMetrics.diskBytesSpilled = metrics.diskBytesSpilled
+  newMetrics.peakExecutionMemory = metrics.peakExecutionMemory
+  newMetrics.inputBytesRead = metrics.inputMetrics.bytesRead
+  newMetrics.inputRecordsRead = metrics.inputMetrics.recordsRead
+  newMetrics.outputBytesWritten = metrics.outputMetrics.bytesWritten
+  newMetrics.outputRecordsWritten = 
metrics.outputMetrics.recordsWritten
+  newMetrics.shuffleRemoteBlocksFetched = 
metrics.shuffleReadMetrics.remoteBlocksFetched
+  newMetrics.shuffleLocalBlocksFetched = 
metrics.shuffleReadMetrics.localBlocksFetched
+  newMetrics.shuffleFetchWaitTime = 
metrics.shuffleReadMetrics.fetchWaitTime
+  newMetrics.shuffleRemoteBytesRead = 
metrics.shuffleReadMetrics.remoteBytesRead
+  newMetrics.shuffleRemoteBytesReadToDisk = 
metrics.shuffleReadMetrics.remoteBytesReadToDisk
+  newMetrics.shuffleLocalBytesRead = 
metrics.shuffleReadMetrics.localBytesRead
+  newMetrics.shuffleRecordsRead = 
metrics.shuffleReadMetrics.recordsRead
+  newMetrics.shuffleBytesWritten = 
metrics.shuffleWriteMetrics.bytesWritten
+  newMetrics.shuffleWriteTime = metrics.shuffleWriteMetrics.writeTime
+  newMetrics.shuffleRecordsWritten = 
metrics.shuffleWriteMetrics.recordsWritten
+
+  this.metrics = newMetrics
+  if (old.executorDeserializeTime >= 0L) {
+old.subtract(newMetrics)
+old
+  } else {
+newMetrics
+  }
 } else {
   null
 }
   }
 
-  /**
-   * Return a new TaskMetrics object containing the delta of the various 
fields of the given
-   * metrics objects. This is currently targeted at updating stage data, 
so it does not
-   * necessarily calculate deltas for all the fields.
-   */
-  private def calculateMetricsDelta(
-  metrics: v1.TaskMetrics,
-  old: v1.TaskMetrics): v1.TaskMetrics = {
-val 

[GitHub] spark issue #20013: [SPARK-20657][core] Speed up rendering of the stages pag...

2018-01-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20013
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #20013: [SPARK-20657][core] Speed up rendering of the stages pag...

2018-01-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20013
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/85737/
Test PASSed.


---




[GitHub] spark issue #20013: [SPARK-20657][core] Speed up rendering of the stages pag...

2018-01-05 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20013
  
**[Test build #85737 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85737/testReport)**
 for PR 20013 at commit 
[`c4e7f61`](https://github.com/apache/spark/commit/c4e7f6149fbff39c9f3955f536095ed4fd5df2ff).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark pull request #20163: [SPARK-22966][PySpark] Spark SQL should handle Py...

2018-01-05 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/20163#discussion_r160017637
  
--- Diff: python/pyspark/sql/udf.py ---
@@ -26,6 +26,28 @@
 
 
 def _wrap_function(sc, func, returnType):
+def coerce_to_str(v):
+import datetime
+if type(v) == datetime.date or type(v) == datetime.datetime:
+return str(v)
+else:
+return v
+
+# Pyrolite will unpickle both Python datetime.date and 
datetime.datetime objects
+# into java.util.Calendar objects, so the type information on the 
Python side is lost.
+# This is problematic when Spark SQL needs to cast such objects into 
Spark SQL string type,
+# because the format of the string should be different, depending on 
the type of the input
+# object. So for those two specific types we eagerly convert them to 
string here, where the
+# Python type information is still intact.
+if returnType == StringType():
--- End diff --

I have a question: why do we need to handle this type conversion? If we expect a 
correct string format, isn't it more reasonable to convert the date/datetime to 
strings in the UDF, instead of adding this conversion implicitly?


---




[GitHub] spark pull request #20163: [SPARK-22966][PySpark] Spark SQL should handle Py...

2018-01-05 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/20163#discussion_r160017370
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
 ---
@@ -193,6 +193,24 @@ object DateTimeUtils {
 millisToDays(date.getTime)
   }
 
+  /**
+   * Returns the number of days since epoch from java.util.Calendar
+   */
+  def fromJavaCalendarForDate(cal: Calendar): SQLDate = {
+val ms = cal.getTimeInMillis
+cal.getTimeZone match {
+  case null => millisToDays(ms)
+  case tz => millisToDays(ms, tz)
+}
+  }
+
+  /**
+   * Returns SQLTimestamp from java.util.Calendar (microseconds since 
epoch)
--- End diff --

(Matching the comment of fromJavaCalendarForDate.)
nit: Returns the number of microseconds since epoch from java.util.Calendar.
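
For readers following along, a self-contained sketch of what these helpers compute;
`millisToDays` here is a simplified re-implementation (the real `DateTimeUtils`
handles time zones more carefully), so treat it as an illustration, not the actual
code:

```scala
import java.util.{Calendar, TimeZone}

object CalendarConversionSketch {
  type SQLDate = Int        // days since epoch
  type SQLTimestamp = Long  // microseconds since epoch
  private val MillisPerDay = 24L * 60 * 60 * 1000

  // Simplified: shift by the calendar's zone offset, then truncate to whole days.
  private def millisToDays(ms: Long, tz: TimeZone): SQLDate =
    Math.floorDiv(ms + tz.getOffset(ms), MillisPerDay).toInt

  def fromJavaCalendarForDate(cal: Calendar): SQLDate =
    millisToDays(cal.getTimeInMillis, cal.getTimeZone)

  def fromJavaCalendarForTimestamp(cal: Calendar): SQLTimestamp =
    cal.getTimeInMillis * 1000L

  def main(args: Array[String]): Unit = {
    val cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"))
    cal.clear()
    cal.set(2018, Calendar.JANUARY, 5)
    println(fromJavaCalendarForDate(cal))       // 17536 days since 1970-01-01
    println(fromJavaCalendarForTimestamp(cal))  // 1515110400000000 microseconds
  }
}
```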




---




[GitHub] spark pull request #19943: [SPARK-16060][SQL] Support Vectorized ORC Reader

2018-01-05 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/19943#discussion_r160017579
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.scala
 ---
@@ -0,0 +1,493 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.orc
+
+import org.apache.hadoop.mapreduce.{InputSplit, RecordReader, 
TaskAttemptContext}
+import org.apache.hadoop.mapreduce.lib.input.FileSplit
+import org.apache.orc._
+import org.apache.orc.mapred.OrcInputFormat
+import org.apache.orc.storage.ql.exec.vector._
+import org.apache.orc.storage.serde2.io.HiveDecimalWritable
+
+import org.apache.spark.memory.MemoryMode
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.vectorized._
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.vectorized._
+
+
+/**
+ * To support vectorization in WholeStageCodeGen, this reader returns 
ColumnarBatch.
+ * After creating, `initialize` and `setRequiredSchema` should be called 
sequentially.
+ */
+private[orc] class OrcColumnarBatchReader extends RecordReader[Void, 
ColumnarBatch] {
+  import OrcColumnarBatchReader._
+
+  /**
+   * ORC File Reader.
+   */
+  private var reader: Reader = _
+
+  /**
+   * Vectorized Row Batch.
+   */
+  private var batch: VectorizedRowBatch = _
+
+  /**
+   * Requested Column IDs.
+   */
+  private var requestedColIds: Array[Int] = _
+
+  /**
+   * Record reader from row batch.
+   */
+  private var recordReader: org.apache.orc.RecordReader = _
+
+  /**
+   * Required Schema.
+   */
+  private var requiredSchema: StructType = _
+
+  /**
+   * ColumnarBatch for vectorized execution by whole-stage codegen.
+   */
+  private var columnarBatch: ColumnarBatch = _
+
+  /**
+   * Writable column vectors of ColumnarBatch.
+   */
+  private var columnVectors: Seq[WritableColumnVector] = _
+
+  /**
+   * The number of rows read and considered to be returned.
+   */
+  private var rowsReturned: Long = 0L
+
+  /**
+   * Total number of rows.
+   */
+  private var totalRowCount: Long = 0L
+
+  override def getCurrentKey: Void = null
+
+  override def getCurrentValue: ColumnarBatch = columnarBatch
+
+  override def getProgress: Float = rowsReturned.toFloat / totalRowCount
+
+  override def nextKeyValue(): Boolean = nextBatch()
+
+  override def close(): Unit = {
+if (columnarBatch != null) {
+  columnarBatch.close()
+  columnarBatch = null
+}
+if (recordReader != null) {
+  recordReader.close()
+  recordReader = null
+}
+  }
+
+  /**
+   * Initialize ORC file reader and batch record reader.
+   * Please note that `setRequiredSchema` is needed to be called after 
this.
+   */
+  override def initialize(inputSplit: InputSplit, taskAttemptContext: 
TaskAttemptContext): Unit = {
+val fileSplit = inputSplit.asInstanceOf[FileSplit]
+val conf = taskAttemptContext.getConfiguration
+reader = OrcFile.createReader(
+  fileSplit.getPath,
+  OrcFile.readerOptions(conf)
+.maxLength(OrcConf.MAX_FILE_LENGTH.getLong(conf))
+.filesystem(fileSplit.getPath.getFileSystem(conf)))
+
+val options = OrcInputFormat.buildOptions(conf, reader, 
fileSplit.getStart, fileSplit.getLength)
+recordReader = reader.rows(options)
+totalRowCount = reader.getNumberOfRows
+  }
+
+  /**
+   * Set required schema and partition information.
+   * With this information, this creates ColumnarBatch with the full 
schema.
+   */
+  def setRequiredSchema(
+  orcSchema: TypeDescription,
+  requestedColIds: Array[Int],
+  resultSchema: StructType,
+  

[GitHub] spark issue #19943: [SPARK-16060][SQL] Support Vectorized ORC Reader

2018-01-05 Thread dongjoon-hyun
Github user dongjoon-hyun commented on the issue:

https://github.com/apache/spark/pull/19943
  
I answered in the comment.


---




[GitHub] spark pull request #19943: [SPARK-16060][SQL] Support Vectorized ORC Reader

2018-01-05 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/19943#discussion_r160017549
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.scala
 ---
@@ -0,0 +1,432 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.orc
+
+import org.apache.hadoop.mapreduce.{InputSplit, RecordReader, 
TaskAttemptContext}
+import org.apache.hadoop.mapreduce.lib.input.FileSplit
+import org.apache.orc._
+import org.apache.orc.mapred.OrcInputFormat
+import org.apache.orc.storage.ql.exec.vector._
+import org.apache.orc.storage.serde2.io.HiveDecimalWritable
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.memory.MemoryMode
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.vectorized._
+import org.apache.spark.sql.types._
+
+
+/**
+ * To support vectorization in WholeStageCodeGen, this reader returns 
ColumnarBatch.
+ */
+private[orc] class OrcColumnarBatchReader extends RecordReader[Void, 
ColumnarBatch] with Logging {
+  import OrcColumnarBatchReader._
+
+  /**
+   * ORC File Reader.
+   */
+  private var reader: Reader = _
+
+  /**
+   * Vectorized Row Batch.
+   */
+  private var batch: VectorizedRowBatch = _
+
+  /**
+   * Requested Column IDs.
+   */
+  private var requestedColIds: Array[Int] = _
+
+  /**
+   * Record reader from row batch.
+   */
+  private var rows: org.apache.orc.RecordReader = _
+
+  /**
+   * Required Schema.
+   */
+  private var requiredSchema: StructType = _
+
+  /**
+   * ColumnarBatch for vectorized execution by whole-stage codegen.
+   */
+  private var columnarBatch: ColumnarBatch = _
+
+  /**
+   * Writable columnVectors of ColumnarBatch.
+   */
+  private var columnVectors: Seq[WritableColumnVector] = _
+
+  /**
+   * The number of rows read and considered to be returned.
+   */
+  private var rowsReturned: Long = 0L
+
+  /**
+   * Total number of rows.
+   */
+  private var totalRowCount: Long = 0L
+
+  override def getCurrentKey: Void = null
+
+  override def getCurrentValue: ColumnarBatch = columnarBatch
+
+  override def getProgress: Float = rowsReturned.toFloat / totalRowCount
+
+  override def nextKeyValue(): Boolean = nextBatch()
+
+  override def close(): Unit = {
+if (columnarBatch != null) {
+  columnarBatch.close()
+  columnarBatch = null
+}
+if (rows != null) {
+  rows.close()
+  rows = null
+}
+  }
+
+  /**
+   * Initialize ORC file reader and batch record reader.
+   * Please note that `setRequiredSchema` is needed to be called after 
this.
+   */
+  override def initialize(inputSplit: InputSplit, taskAttemptContext: 
TaskAttemptContext): Unit = {
+val fileSplit = inputSplit.asInstanceOf[FileSplit]
+val conf = taskAttemptContext.getConfiguration
+reader = OrcFile.createReader(
+  fileSplit.getPath,
+  OrcFile.readerOptions(conf)
+.maxLength(OrcConf.MAX_FILE_LENGTH.getLong(conf))
+.filesystem(fileSplit.getPath.getFileSystem(conf)))
+
+val options = OrcInputFormat.buildOptions(conf, reader, 
fileSplit.getStart, fileSplit.getLength)
+rows = reader.rows(options)
+  }
+
+  /**
+   * Set required schema and partition information.
+   * With this information, this creates ColumnarBatch with the full 
schema.
+   */
+  def setRequiredSchema(
+  orcSchema: TypeDescription,
+  requestedColIds: Array[Int],
+  resultSchema: StructType,
+  requiredSchema: StructType,
+  partitionValues: InternalRow): Unit = {
+batch = orcSchema.createRowBatch(DEFAULT_SIZE)
+assert(!batch.selectedInUse)
+

[GitHub] spark pull request #19943: [SPARK-16060][SQL] Support Vectorized ORC Reader

2018-01-05 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/19943#discussion_r160017493
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcReadBenchmark.scala ---
@@ -0,0 +1,435 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.orc
+
+import java.io.File
+
+import scala.util.{Random, Try}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
+import org.apache.spark.util.{Benchmark, Utils}
+
+
+/**
+ * Benchmark to measure ORC read performance.
+ *
+ * This is in `sql/hive` module in order to compare `sql/core` and 
`sql/hive` ORC data sources.
+ */
+// scalastyle:off line.size.limit
+object OrcReadBenchmark {
+  val conf = new SparkConf()
+  conf.set("orc.compression", "snappy")
+
+  private val spark = SparkSession.builder()
+.master("local[1]")
+.appName("OrcReadBenchmark")
+.config(conf)
+.getOrCreate()
+
+  // Set default configs. Individual cases will change them if necessary.
+  spark.conf.set(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key, "true")
+
+  def withTempPath(f: File => Unit): Unit = {
+val path = Utils.createTempDir()
+path.delete()
+try f(path) finally Utils.deleteRecursively(path)
+  }
+
+  def withTempTable(tableNames: String*)(f: => Unit): Unit = {
+try f finally tableNames.foreach(spark.catalog.dropTempView)
+  }
+
+  def withSQLConf(pairs: (String, String)*)(f: => Unit): Unit = {
+val (keys, values) = pairs.unzip
+val currentValues = keys.map(key => Try(spark.conf.get(key)).toOption)
+(keys, values).zipped.foreach(spark.conf.set)
+try f finally {
+  keys.zip(currentValues).foreach {
+case (key, Some(value)) => spark.conf.set(key, value)
+case (key, None) => spark.conf.unset(key)
+  }
+}
+  }
+
+  private val NATIVE_ORC_FORMAT = 
"org.apache.spark.sql.execution.datasources.orc.OrcFileFormat"
+  private val HIVE_ORC_FORMAT = 
"org.apache.spark.sql.hive.orc.OrcFileFormat"
+
+  private def prepareTable(dir: File, df: DataFrame, partition: 
Option[String] = None): Unit = {
+val dirORC = dir.getCanonicalPath
+
+if (partition.isDefined) {
+  df.write.partitionBy(partition.get).orc(dirORC)
+} else {
+  df.write.orc(dirORC)
+}
+
+
spark.read.format(NATIVE_ORC_FORMAT).load(dirORC).createOrReplaceTempView("nativeOrcTable")
+
spark.read.format(HIVE_ORC_FORMAT).load(dirORC).createOrReplaceTempView("hiveOrcTable")
+  }
+
+  def numericScanBenchmark(values: Int, dataType: DataType): Unit = {
+val sqlBenchmark = new Benchmark(s"SQL Single ${dataType.sql} Column 
Scan", values)
+
+withTempPath { dir =>
+  withTempTable("t1", "nativeOrcTable", "hiveOrcTable") {
+import spark.implicits._
+spark.range(values).map(_ => 
Random.nextLong).createOrReplaceTempView("t1")
+
+prepareTable(dir, spark.sql(s"SELECT CAST(value as 
${dataType.sql}) id FROM t1"))
+
+sqlBenchmark.addCase("Native ORC Vectorized") { _ =>
+  spark.sql("SELECT sum(id) FROM nativeOrcTable").collect()
+}
+
+sqlBenchmark.addCase("Native ORC MR") { _ =>
+  withSQLConf(SQLConf.ORC_VECTORIZED_READER_ENABLED.key -> 
"false") {
+spark.sql("SELECT sum(id) FROM nativeOrcTable").collect()
+  }
+}
+
+sqlBenchmark.addCase("Hive built-in ORC") { _ =>
+  spark.sql("SELECT sum(id) FROM hiveOrcTable").collect()
+}
+
+/*
+Java HotSpot(TM) 64-Bit Server VM 1.8.0_152-b16 on Mac OS X 10.13.1
+Intel(R) Core(TM) i7-4770HQ CPU @ 2.20GHz
+
+SQL Single TINYINT 

[GitHub] spark pull request #19943: [SPARK-16060][SQL] Support Vectorized ORC Reader

2018-01-05 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/19943#discussion_r160017477
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
 ---
@@ -110,4 +107,23 @@ object OrcUtils extends Logging {
   }
 }
   }
+
+  /**
+   * Return a fixed ORC schema with data schema information, if needed.
+   * The schema inside old ORC files might consist of invalid column names 
like '_col0'.
+   */
+  def getFixedTypeDescription(
--- End diff --

Okay. I'll remove this from this PR.


---




[GitHub] spark issue #20168: SPARK-22730 Add ImageSchema support for non-integer imag...

2018-01-05 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/20168
  
Let's fix the PR title to `[SPARK-22730][ML] ...` BTW.


---




[GitHub] spark pull request #19943: [SPARK-16060][SQL] Support Vectorized ORC Reader

2018-01-05 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/19943#discussion_r160017338
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
 ---
@@ -139,15 +146,25 @@ class OrcFileFormat
   }
 }
 
+val resultSchema = StructType(requiredSchema.fields ++ 
partitionSchema.fields)
+val enableVectorizedReader = 
sparkSession.sessionState.conf.orcVectorizedReaderEnabled &&
+  supportBatch(sparkSession, resultSchema)
--- End diff --

Right. It's fixed.


---




[GitHub] spark pull request #19943: [SPARK-16060][SQL] Support Vectorized ORC Reader

2018-01-05 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/19943#discussion_r160017293
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.scala
 ---
@@ -0,0 +1,493 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.orc
+
+import org.apache.hadoop.mapreduce.{InputSplit, RecordReader, 
TaskAttemptContext}
+import org.apache.hadoop.mapreduce.lib.input.FileSplit
+import org.apache.orc._
+import org.apache.orc.mapred.OrcInputFormat
+import org.apache.orc.storage.ql.exec.vector._
+import org.apache.orc.storage.serde2.io.HiveDecimalWritable
+
+import org.apache.spark.memory.MemoryMode
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.vectorized._
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.vectorized._
+
+
+/**
+ * To support vectorization in WholeStageCodeGen, this reader returns 
ColumnarBatch.
+ * After creating, `initialize` and `setRequiredSchema` should be called 
sequentially.
+ */
+private[orc] class OrcColumnarBatchReader extends RecordReader[Void, 
ColumnarBatch] {
+  import OrcColumnarBatchReader._
+
+  /**
+   * ORC File Reader.
+   */
+  private var reader: Reader = _
+
+  /**
+   * Vectorized Row Batch.
+   */
+  private var batch: VectorizedRowBatch = _
+
+  /**
+   * Requested Column IDs.
+   */
+  private var requestedColIds: Array[Int] = _
+
+  /**
+   * Record reader from row batch.
+   */
+  private var recordReader: org.apache.orc.RecordReader = _
+
+  /**
+   * Required Schema.
+   */
+  private var requiredSchema: StructType = _
+
+  /**
+   * ColumnarBatch for vectorized execution by whole-stage codegen.
+   */
+  private var columnarBatch: ColumnarBatch = _
+
+  /**
+   * Writable column vectors of ColumnarBatch.
+   */
+  private var columnVectors: Seq[WritableColumnVector] = _
+
+  /**
+   * The number of rows read and considered to be returned.
+   */
+  private var rowsReturned: Long = 0L
+
+  /**
+   * Total number of rows.
+   */
+  private var totalRowCount: Long = 0L
+
+  override def getCurrentKey: Void = null
+
+  override def getCurrentValue: ColumnarBatch = columnarBatch
+
+  override def getProgress: Float = rowsReturned.toFloat / totalRowCount
+
+  override def nextKeyValue(): Boolean = nextBatch()
+
+  override def close(): Unit = {
+if (columnarBatch != null) {
+  columnarBatch.close()
+  columnarBatch = null
+}
+if (recordReader != null) {
+  recordReader.close()
+  recordReader = null
+}
+  }
+
+  /**
+   * Initialize ORC file reader and batch record reader.
+   * Please note that `setRequiredSchema` is needed to be called after 
this.
+   */
+  override def initialize(inputSplit: InputSplit, taskAttemptContext: 
TaskAttemptContext): Unit = {
+val fileSplit = inputSplit.asInstanceOf[FileSplit]
+val conf = taskAttemptContext.getConfiguration
+reader = OrcFile.createReader(
+  fileSplit.getPath,
+  OrcFile.readerOptions(conf)
+.maxLength(OrcConf.MAX_FILE_LENGTH.getLong(conf))
+.filesystem(fileSplit.getPath.getFileSystem(conf)))
+
+val options = OrcInputFormat.buildOptions(conf, reader, 
fileSplit.getStart, fileSplit.getLength)
+recordReader = reader.rows(options)
+totalRowCount = reader.getNumberOfRows
+  }
+
+  /**
+   * Set required schema and partition information.
+   * With this information, this creates ColumnarBatch with the full 
schema.
+   */
+  def setRequiredSchema(
+  orcSchema: TypeDescription,
+  requestedColIds: Array[Int],
+  resultSchema: StructType,
+  requiredSchema: 

[GitHub] spark issue #20168: SPARK-22730 Add ImageSchema support for non-integer imag...

2018-01-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20168
  
Merged build finished. Test FAILed.


---




[GitHub] spark issue #20142: [SPARK-22930][PYTHON][SQL] Improve the description of Ve...

2018-01-05 Thread viirya
Github user viirya commented on the issue:

https://github.com/apache/spark/pull/20142
  
LGTM with minor comments regarding naming.


---




[GitHub] spark issue #20168: SPARK-22730 Add ImageSchema support for non-integer imag...

2018-01-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20168
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/85742/
Test FAILed.


---




[GitHub] spark issue #20168: SPARK-22730 Add ImageSchema support for non-integer imag...

2018-01-05 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20168
  
**[Test build #85742 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85742/testReport)**
 for PR 20168 at commit 
[`70bae2f`](https://github.com/apache/spark/commit/70bae2f7e9d85a5f464f1bfc3a9426136259d5d1).
 * This patch **fails PySpark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark pull request #20142: [SPARK-22930][PYTHON][SQL] Improve the descriptio...

2018-01-05 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/20142#discussion_r160017182
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -3950,6 +3975,33 @@ def 
test_vectorized_udf_timestamps_respect_session_timezone(self):
 finally:
 self.spark.conf.set("spark.sql.session.timeZone", orig_tz)
 
+def test_nondeterministic_udf(self):
+# Test that nondeterministic UDFs are evaluated only once in 
chained UDF evaluations
+from pyspark.sql.functions import udf, pandas_udf, col
+
+@pandas_udf('double')
+def plus_ten(v):
+return v + 10
+random_udf = self.random_udf
+
+df = self.spark.range(10).withColumn('rand', random_udf(col('id')))
+result1 = df.withColumn('plus_ten(rand)', 
plus_ten(df['rand'])).toPandas()
+
+self.assertEqual(random_udf.deterministic, False)
+self.assertTrue(result1['plus_ten(rand)'].equals(result1['rand'] + 
10))
+
+def test_nondeterministic_udf_in_aggregate(self):
--- End diff --

test_vectorized_nondeterministic_udf_in_aggregate


---




[GitHub] spark pull request #20142: [SPARK-22930][PYTHON][SQL] Improve the descriptio...

2018-01-05 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/20142#discussion_r160017215
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -3567,6 +3580,18 @@ def tearDownClass(cls):
 time.tzset()
 ReusedSQLTestCase.tearDownClass()
 
+@property
+def random_udf(self):
--- End diff --

Maybe `nondeterministic_udf`, so we don't have a duplicate of the name 
`random_udf` either.


---




[GitHub] spark pull request #20142: [SPARK-22930][PYTHON][SQL] Improve the descriptio...

2018-01-05 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/20142#discussion_r160017176
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -3950,6 +3975,33 @@ def 
test_vectorized_udf_timestamps_respect_session_timezone(self):
 finally:
 self.spark.conf.set("spark.sql.session.timeZone", orig_tz)
 
+def test_nondeterministic_udf(self):
--- End diff --

test_vectorized_nondeterministic_udf


---




[GitHub] spark issue #20142: [SPARK-22930][PYTHON][SQL] Improve the description of Ve...

2018-01-05 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/20142
  
LGTM except for the one minor comment


---




[GitHub] spark pull request #19943: [SPARK-16060][SQL] Support Vectorized ORC Reader

2018-01-05 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/19943#discussion_r160017124
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.scala
 ---
@@ -0,0 +1,493 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.orc
+
+import org.apache.hadoop.mapreduce.{InputSplit, RecordReader, 
TaskAttemptContext}
+import org.apache.hadoop.mapreduce.lib.input.FileSplit
+import org.apache.orc._
+import org.apache.orc.mapred.OrcInputFormat
+import org.apache.orc.storage.ql.exec.vector._
+import org.apache.orc.storage.serde2.io.HiveDecimalWritable
+
+import org.apache.spark.memory.MemoryMode
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.vectorized._
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.vectorized._
+
+
+/**
+ * To support vectorization in WholeStageCodeGen, this reader returns 
ColumnarBatch.
+ * After creating, `initialize` and `setRequiredSchema` should be called 
sequentially.
+ */
+private[orc] class OrcColumnarBatchReader extends RecordReader[Void, 
ColumnarBatch] {
+  import OrcColumnarBatchReader._
+
+  /**
+   * ORC File Reader.
+   */
+  private var reader: Reader = _
+
+  /**
+   * Vectorized Row Batch.
+   */
+  private var batch: VectorizedRowBatch = _
+
+  /**
+   * Requested Column IDs.
+   */
+  private var requestedColIds: Array[Int] = _
+
+  /**
+   * Record reader from row batch.
+   */
+  private var recordReader: org.apache.orc.RecordReader = _
+
+  /**
+   * Required Schema.
+   */
+  private var requiredSchema: StructType = _
+
+  /**
+   * ColumnarBatch for vectorized execution by whole-stage codegen.
+   */
+  private var columnarBatch: ColumnarBatch = _
+
+  /**
+   * Writable column vectors of ColumnarBatch.
+   */
+  private var columnVectors: Seq[WritableColumnVector] = _
+
+  /**
+   * The number of rows read and considered to be returned.
+   */
+  private var rowsReturned: Long = 0L
+
+  /**
+   * Total number of rows.
+   */
+  private var totalRowCount: Long = 0L
+
+  override def getCurrentKey: Void = null
+
+  override def getCurrentValue: ColumnarBatch = columnarBatch
+
+  override def getProgress: Float = rowsReturned.toFloat / totalRowCount
+
+  override def nextKeyValue(): Boolean = nextBatch()
+
+  override def close(): Unit = {
+if (columnarBatch != null) {
+  columnarBatch.close()
+  columnarBatch = null
+}
+if (recordReader != null) {
+  recordReader.close()
+  recordReader = null
+}
+  }
+
+  /**
+   * Initialize ORC file reader and batch record reader.
+   * Please note that `setRequiredSchema` is needed to be called after 
this.
+   */
+  override def initialize(inputSplit: InputSplit, taskAttemptContext: 
TaskAttemptContext): Unit = {
+val fileSplit = inputSplit.asInstanceOf[FileSplit]
+val conf = taskAttemptContext.getConfiguration
+reader = OrcFile.createReader(
+  fileSplit.getPath,
+  OrcFile.readerOptions(conf)
+.maxLength(OrcConf.MAX_FILE_LENGTH.getLong(conf))
+.filesystem(fileSplit.getPath.getFileSystem(conf)))
+
+val options = OrcInputFormat.buildOptions(conf, reader, 
fileSplit.getStart, fileSplit.getLength)
+recordReader = reader.rows(options)
+totalRowCount = reader.getNumberOfRows
+  }
+
+  /**
+   * Set required schema and partition information.
+   * With this information, this creates ColumnarBatch with the full 
schema.
+   */
+  def setRequiredSchema(
+  orcSchema: TypeDescription,
+  requestedColIds: Array[Int],
+  resultSchema: StructType,
+  

[GitHub] spark pull request #19943: [SPARK-16060][SQL] Support Vectorized ORC Reader

2018-01-05 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/19943#discussion_r160017101
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.scala
 ---
@@ -0,0 +1,493 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.orc
+
+import org.apache.hadoop.mapreduce.{InputSplit, RecordReader, 
TaskAttemptContext}
+import org.apache.hadoop.mapreduce.lib.input.FileSplit
+import org.apache.orc._
+import org.apache.orc.mapred.OrcInputFormat
+import org.apache.orc.storage.ql.exec.vector._
+import org.apache.orc.storage.serde2.io.HiveDecimalWritable
+
+import org.apache.spark.memory.MemoryMode
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.vectorized._
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.vectorized._
+
+
+/**
+ * To support vectorization in WholeStageCodeGen, this reader returns 
ColumnarBatch.
+ * After creating, `initialize` and `setRequiredSchema` should be called 
sequentially.
+ */
+private[orc] class OrcColumnarBatchReader extends RecordReader[Void, 
ColumnarBatch] {
+  import OrcColumnarBatchReader._
+
+  /**
+   * ORC File Reader.
+   */
+  private var reader: Reader = _
+
+  /**
+   * Vectorized Row Batch.
+   */
+  private var batch: VectorizedRowBatch = _
+
+  /**
+   * Requested Column IDs.
+   */
+  private var requestedColIds: Array[Int] = _
+
+  /**
+   * Record reader from row batch.
+   */
+  private var recordReader: org.apache.orc.RecordReader = _
+
+  /**
+   * Required Schema.
+   */
+  private var requiredSchema: StructType = _
+
+  /**
+   * ColumnarBatch for vectorized execution by whole-stage codegen.
+   */
+  private var columnarBatch: ColumnarBatch = _
+
+  /**
+   * Writable column vectors of ColumnarBatch.
+   */
+  private var columnVectors: Seq[WritableColumnVector] = _
+
+  /**
+   * The number of rows read and considered to be returned.
+   */
+  private var rowsReturned: Long = 0L
+
+  /**
+   * Total number of rows.
+   */
+  private var totalRowCount: Long = 0L
+
+  override def getCurrentKey: Void = null
+
+  override def getCurrentValue: ColumnarBatch = columnarBatch
+
+  override def getProgress: Float = rowsReturned.toFloat / totalRowCount
+
+  override def nextKeyValue(): Boolean = nextBatch()
+
+  override def close(): Unit = {
+if (columnarBatch != null) {
+  columnarBatch.close()
+  columnarBatch = null
+}
+if (recordReader != null) {
+  recordReader.close()
+  recordReader = null
+}
+  }
+
+  /**
+   * Initialize ORC file reader and batch record reader.
+   * Please note that `setRequiredSchema` is needed to be called after 
this.
+   */
+  override def initialize(inputSplit: InputSplit, taskAttemptContext: 
TaskAttemptContext): Unit = {
+val fileSplit = inputSplit.asInstanceOf[FileSplit]
+val conf = taskAttemptContext.getConfiguration
+reader = OrcFile.createReader(
+  fileSplit.getPath,
+  OrcFile.readerOptions(conf)
+.maxLength(OrcConf.MAX_FILE_LENGTH.getLong(conf))
+.filesystem(fileSplit.getPath.getFileSystem(conf)))
+
+val options = OrcInputFormat.buildOptions(conf, reader, 
fileSplit.getStart, fileSplit.getLength)
+recordReader = reader.rows(options)
+totalRowCount = reader.getNumberOfRows
+  }
+
+  /**
+   * Set required schema and partition information.
+   * With this information, this creates ColumnarBatch with the full 
schema.
+   */
+  def setRequiredSchema(
+  orcSchema: TypeDescription,
+  requestedColIds: Array[Int],
+  resultSchema: StructType,
+  

[GitHub] spark pull request #20142: [SPARK-22930][PYTHON][SQL] Improve the descriptio...

2018-01-05 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/20142#discussion_r160017026
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -3567,6 +3580,18 @@ def tearDownClass(cls):
 time.tzset()
 ReusedSQLTestCase.tearDownClass()
 
+@property
+def random_udf(self):
--- End diff --

Could we add "nondeterministic" in its name somehow?


---




[GitHub] spark pull request #20168: SPARK-22730 Add ImageSchema support for non-integ...

2018-01-05 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/20168#discussion_r160016767
  
--- Diff: python/pyspark/ml/image.py ---
@@ -71,9 +88,30 @@ def ocvTypes(self):
 """
 
 if self._ocvTypes is None:
-ctx = SparkContext._active_spark_context
-self._ocvTypes = 
dict(ctx._jvm.org.apache.spark.ml.image.ImageSchema.javaOcvTypes())
-return self._ocvTypes
+ctx = SparkContext.getOrCreate()
+ocvTypeList = 
ctx._jvm.org.apache.spark.ml.image.ImageSchema.javaOcvTypes()
+self._ocvTypes = [self._OcvType(name=x.name(),
+mode=x.mode(),
+nChannels=x.nChannels(),
+dataType=x.dataType(),
+
nptype=self._ocvToNumpyMap[x.dataType()])
+  for x in ocvTypeList]
+return self._ocvTypes[:]
+
+def ocvTypeByName(self, name):
+if self._ocvTypesByName is None:
+self._ocvTypesByName = {x.name: x for x in self.ocvTypes}
+if name not in self._ocvTypesByName:
+raise ValueError(
+"Can not find matching OpenCvFormat for type = '%s'; 
supported formats are = %s" %
+(name, str(
+self._ocvTypesByName.keys(
+return self._ocvTypesByName[name]
+
+def ocvTypeByMode(self, mode):
--- End diff --

Is it meant to be public? The doc seems to be missing, and this one doesn't look 
consistent with the Scala side?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20168: SPARK-22730 Add ImageSchema support for non-integ...

2018-01-05 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/20168#discussion_r160016719
  
--- Diff: python/pyspark/ml/image.py ---
@@ -71,9 +88,30 @@ def ocvTypes(self):
 """
 
 if self._ocvTypes is None:
-ctx = SparkContext._active_spark_context
-self._ocvTypes = 
dict(ctx._jvm.org.apache.spark.ml.image.ImageSchema.javaOcvTypes())
-return self._ocvTypes
+ctx = SparkContext.getOrCreate()
+ocvTypeList = 
ctx._jvm.org.apache.spark.ml.image.ImageSchema.javaOcvTypes()
+self._ocvTypes = [self._OcvType(name=x.name(),
+mode=x.mode(),
+nChannels=x.nChannels(),
+dataType=x.dataType(),
+
nptype=self._ocvToNumpyMap[x.dataType()])
+  for x in ocvTypeList]
+return self._ocvTypes[:]
--- End diff --

Is this for making a copy? I usually do `list(self._ocvTypes)`, though.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20168: SPARK-22730 Add ImageSchema support for non-integ...

2018-01-05 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/20168#discussion_r160016683
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/image/ImageSchema.scala 
---
@@ -37,20 +37,51 @@ import org.apache.spark.sql.types._
 @Since("2.3.0")
 object ImageSchema {
 
-  val undefinedImageType = "Undefined"
-
   /**
-   * (Scala-specific) OpenCV type mapping supported
+   * OpenCv type representation
+   * @param mode ordinal for the type
+   * @param dataType open cv data type
+   * @param nChannels number of color channels
*/
-  val ocvTypes: Map[String, Int] = Map(
-undefinedImageType -> -1,
-"CV_8U" -> 0, "CV_8UC1" -> 0, "CV_8UC3" -> 16, "CV_8UC4" -> 24
-  )
+  case class OpenCvType(mode: Int, dataType: String, nChannels: Int) {
+def name: String = "CV_" + dataType + "C" + nChannels
+override def toString: String = "OpenCvType(mode = " + mode + ", name 
= " + name + ")"
+  }
+
+  object OpenCvType {
+def get(name: String): OpenCvType = {
+  ocvTypes.find(x => x.name == name).getOrElse(
+throw new IllegalArgumentException("Unknown open cv type " + name))
+}
+def get(mode: Int): OpenCvType = {
+  ocvTypes.find(x => x.mode == mode).getOrElse(
+throw new IllegalArgumentException("Unknown open cv mode " + mode))
+}
+val undefinedType = OpenCvType(-1, "N/A", -1)
+  }
 
   /**
-   * (Java-specific) OpenCV type mapping supported
+   * A Mapping of Type to Numbers in OpenCV
+   *
+   *C1 C2  C3  C4
+   * CV_8U   0  8  16  24
+   * CV_8S   1  9  17  25
+   * CV_16U  2 10  18  26
+   * CV_16S  3 11  19  27
+   * CV_32S  4 12  20  28
+   * CV_32F  5 13  21  29
+   * CV_64F  6 14  22  30
*/
-  val javaOcvTypes: java.util.Map[String, Int] = ocvTypes.asJava
+  val ocvTypes = {
+val types =
+  for (nc <- Array(1, 2, 3, 4);
+   dt <- Array("8U", "8S", "16U", "16S", "32S", "32F", "64F"))
+yield (dt, nc)
+val ordinals = for (i <- 0 to 3; j <- 0 to 6) yield ( i * 8 + j)
+OpenCvType.undefinedType +: (ordinals zip types).map(x => 
OpenCvType(x._1, x._2._1, x._2._2))
+  }
+
+  val javaOcvTypes = ocvTypes.asJava
--- End diff --

Hm .. why did we remove the doc here?
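    
    For reference, restoring the doc would be a one-line change on top of the new
    `Seq`-based definition. A minimal sketch (the doc text is taken from the removed
    line; the explicit `java.util.List` annotation is an assumption, not the PR's code):
    
        import scala.collection.JavaConverters._
    
        /**
         * (Java-specific) OpenCV type mapping supported.
         */
        val javaOcvTypes: java.util.List[OpenCvType] = ocvTypes.asJava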


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20168: SPARK-22730 Add ImageSchema support for non-integer imag...

2018-01-05 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20168
  
**[Test build #85742 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85742/testReport)**
 for PR 20168 at commit 
[`70bae2f`](https://github.com/apache/spark/commit/70bae2f7e9d85a5f464f1bfc3a9426136259d5d1).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20168: SPARK-22730 Add ImageSchema support for non-integer imag...

2018-01-05 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/20168
  
cc @jkbradley, @imatiach-msft, @MrBago and @thunterdb.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20168: SPARK-22730 Add ImageSchema support for non-integer imag...

2018-01-05 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/20168
  
ok to test


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20163: [SPARK-22966][PySpark] Spark SQL should handle Python UD...

2018-01-05 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/20163
  
@ueshin @icexelloss @cloud-fan @rednaxelafx, which one would you prefer?

To me, I like 1 the most. If the perf diff is trivial, 2 is also fine. If 
3 works fine, I think I am also fine with it.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19943: [SPARK-16060][SQL] Support Vectorized ORC Reader

2018-01-05 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/19943
  
 Overall looks good. My major concern is 
https://github.com/apache/spark/pull/19943/files#r159221758, do you have an 
answer? This may be a big drawback compared to the wrapper solution.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19943: [SPARK-16060][SQL] Support Vectorized ORC Reader

2018-01-05 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19943#discussion_r160016468
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcReadBenchmark.scala ---
@@ -0,0 +1,435 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.orc
+
+import java.io.File
+
+import scala.util.{Random, Try}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
+import org.apache.spark.util.{Benchmark, Utils}
+
+
+/**
+ * Benchmark to measure ORC read performance.
+ *
+ * This is in `sql/hive` module in order to compare `sql/core` and 
`sql/hive` ORC data sources.
+ */
+// scalastyle:off line.size.limit
+object OrcReadBenchmark {
+  val conf = new SparkConf()
+  conf.set("orc.compression", "snappy")
+
+  private val spark = SparkSession.builder()
+.master("local[1]")
+.appName("OrcReadBenchmark")
+.config(conf)
+.getOrCreate()
+
+  // Set default configs. Individual cases will change them if necessary.
+  spark.conf.set(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key, "true")
+
+  def withTempPath(f: File => Unit): Unit = {
+val path = Utils.createTempDir()
+path.delete()
+try f(path) finally Utils.deleteRecursively(path)
+  }
+
+  def withTempTable(tableNames: String*)(f: => Unit): Unit = {
+try f finally tableNames.foreach(spark.catalog.dropTempView)
+  }
+
+  def withSQLConf(pairs: (String, String)*)(f: => Unit): Unit = {
+val (keys, values) = pairs.unzip
+val currentValues = keys.map(key => Try(spark.conf.get(key)).toOption)
+(keys, values).zipped.foreach(spark.conf.set)
+try f finally {
+  keys.zip(currentValues).foreach {
+case (key, Some(value)) => spark.conf.set(key, value)
+case (key, None) => spark.conf.unset(key)
+  }
+}
+  }
+
+  private val NATIVE_ORC_FORMAT = 
"org.apache.spark.sql.execution.datasources.orc.OrcFileFormat"
+  private val HIVE_ORC_FORMAT = 
"org.apache.spark.sql.hive.orc.OrcFileFormat"
+
+  private def prepareTable(dir: File, df: DataFrame, partition: 
Option[String] = None): Unit = {
+val dirORC = dir.getCanonicalPath
+
+if (partition.isDefined) {
+  df.write.partitionBy(partition.get).orc(dirORC)
+} else {
+  df.write.orc(dirORC)
+}
+
+
spark.read.format(NATIVE_ORC_FORMAT).load(dirORC).createOrReplaceTempView("nativeOrcTable")
+
spark.read.format(HIVE_ORC_FORMAT).load(dirORC).createOrReplaceTempView("hiveOrcTable")
+  }
+
+  def numericScanBenchmark(values: Int, dataType: DataType): Unit = {
+val sqlBenchmark = new Benchmark(s"SQL Single ${dataType.sql} Column 
Scan", values)
+
+withTempPath { dir =>
+  withTempTable("t1", "nativeOrcTable", "hiveOrcTable") {
+import spark.implicits._
+spark.range(values).map(_ => 
Random.nextLong).createOrReplaceTempView("t1")
+
+prepareTable(dir, spark.sql(s"SELECT CAST(value as 
${dataType.sql}) id FROM t1"))
+
+sqlBenchmark.addCase("Native ORC Vectorized") { _ =>
+  spark.sql("SELECT sum(id) FROM nativeOrcTable").collect()
+}
+
+sqlBenchmark.addCase("Native ORC MR") { _ =>
+  withSQLConf(SQLConf.ORC_VECTORIZED_READER_ENABLED.key -> 
"false") {
+spark.sql("SELECT sum(id) FROM nativeOrcTable").collect()
+  }
+}
+
+sqlBenchmark.addCase("Hive built-in ORC") { _ =>
+  spark.sql("SELECT sum(id) FROM hiveOrcTable").collect()
+}
+
+/*
+Java HotSpot(TM) 64-Bit Server VM 1.8.0_152-b16 on Mac OS X 10.13.1
+Intel(R) Core(TM) i7-4770HQ CPU @ 2.20GHz
+
+SQL Single TINYINT Column 

[GitHub] spark pull request #19943: [SPARK-16060][SQL] Support Vectorized ORC Reader

2018-01-05 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19943#discussion_r160016446
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala
 ---
@@ -110,4 +107,23 @@ object OrcUtils extends Logging {
   }
 }
   }
+
+  /**
+   * Return a fixed ORC schema with data schema information, if needed.
+   * The schema inside old ORC files might consist of invalid column names 
like '_col0'.
+   */
+  def getFixedTypeDescription(
--- End diff --

Do we really need this? The ORC schema is used to create the ORC batch, and for 
the batch data I think we only care about the data types, not the field names.
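    
    For readers following along, a hedged sketch of what such a "fix" presumably does
    (the helper name and the use of `clone()` are assumptions; per the question above,
    the merged code may instead ignore field names entirely): keep the ORC column
    types but substitute the field names from the data schema.
    
        import scala.collection.JavaConverters._
        import org.apache.orc.TypeDescription
        import org.apache.spark.sql.types.StructType
    
        // Rebuild the struct with the original ORC child types but the Catalyst field
        // names, so old files whose columns are named _col0, _col1, ... line up with
        // the data schema.
        def withCatalystFieldNames(orcSchema: TypeDescription, dataSchema: StructType): TypeDescription = {
          val fixed = TypeDescription.createStruct()
          orcSchema.getChildren.asScala.zip(dataSchema.fieldNames).foreach { case (childType, name) =>
            fixed.addField(name, childType.clone())
          }
          fixed
        }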


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19943: [SPARK-16060][SQL] Support Vectorized ORC Reader

2018-01-05 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19943#discussion_r160016431
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.scala
 ---
@@ -0,0 +1,493 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.orc
+
+import org.apache.hadoop.mapreduce.{InputSplit, RecordReader, 
TaskAttemptContext}
+import org.apache.hadoop.mapreduce.lib.input.FileSplit
+import org.apache.orc._
+import org.apache.orc.mapred.OrcInputFormat
+import org.apache.orc.storage.ql.exec.vector._
+import org.apache.orc.storage.serde2.io.HiveDecimalWritable
+
+import org.apache.spark.memory.MemoryMode
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.vectorized._
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.vectorized._
+
+
+/**
+ * To support vectorization in WholeStageCodeGen, this reader returns 
ColumnarBatch.
+ * After creating, `initialize` and `setRequiredSchema` should be called 
sequentially.
+ */
+private[orc] class OrcColumnarBatchReader extends RecordReader[Void, 
ColumnarBatch] {
+  import OrcColumnarBatchReader._
+
+  /**
+   * ORC File Reader.
+   */
+  private var reader: Reader = _
+
+  /**
+   * Vectorized Row Batch.
+   */
+  private var batch: VectorizedRowBatch = _
+
+  /**
+   * Requested Column IDs.
+   */
+  private var requestedColIds: Array[Int] = _
+
+  /**
+   * Record reader from row batch.
+   */
+  private var recordReader: org.apache.orc.RecordReader = _
+
+  /**
+   * Required Schema.
+   */
+  private var requiredSchema: StructType = _
+
+  /**
+   * ColumnarBatch for vectorized execution by whole-stage codegen.
+   */
+  private var columnarBatch: ColumnarBatch = _
+
+  /**
+   * Writable column vectors of ColumnarBatch.
+   */
+  private var columnVectors: Seq[WritableColumnVector] = _
+
+  /**
+   * The number of rows read and considered to be returned.
+   */
+  private var rowsReturned: Long = 0L
+
+  /**
+   * Total number of rows.
+   */
+  private var totalRowCount: Long = 0L
+
+  override def getCurrentKey: Void = null
+
+  override def getCurrentValue: ColumnarBatch = columnarBatch
+
+  override def getProgress: Float = rowsReturned.toFloat / totalRowCount
+
+  override def nextKeyValue(): Boolean = nextBatch()
+
+  override def close(): Unit = {
+if (columnarBatch != null) {
+  columnarBatch.close()
+  columnarBatch = null
+}
+if (recordReader != null) {
+  recordReader.close()
+  recordReader = null
+}
+  }
+
+  /**
+   * Initialize ORC file reader and batch record reader.
+   * Please note that `setRequiredSchema` is needed to be called after 
this.
+   */
+  override def initialize(inputSplit: InputSplit, taskAttemptContext: 
TaskAttemptContext): Unit = {
+val fileSplit = inputSplit.asInstanceOf[FileSplit]
+val conf = taskAttemptContext.getConfiguration
+reader = OrcFile.createReader(
+  fileSplit.getPath,
+  OrcFile.readerOptions(conf)
+.maxLength(OrcConf.MAX_FILE_LENGTH.getLong(conf))
+.filesystem(fileSplit.getPath.getFileSystem(conf)))
+
+val options = OrcInputFormat.buildOptions(conf, reader, 
fileSplit.getStart, fileSplit.getLength)
+recordReader = reader.rows(options)
+totalRowCount = reader.getNumberOfRows
+  }
+
+  /**
+   * Set required schema and partition information.
+   * With this information, this creates ColumnarBatch with the full 
schema.
+   */
+  def setRequiredSchema(
+  orcSchema: TypeDescription,
+  requestedColIds: Array[Int],
+  resultSchema: StructType,
+  requiredSchema: 

[GitHub] spark pull request #19943: [SPARK-16060][SQL] Support Vectorized ORC Reader

2018-01-05 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19943#discussion_r160016423
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.scala
 ---
@@ -0,0 +1,493 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.orc
+
+import org.apache.hadoop.mapreduce.{InputSplit, RecordReader, 
TaskAttemptContext}
+import org.apache.hadoop.mapreduce.lib.input.FileSplit
+import org.apache.orc._
+import org.apache.orc.mapred.OrcInputFormat
+import org.apache.orc.storage.ql.exec.vector._
+import org.apache.orc.storage.serde2.io.HiveDecimalWritable
+
+import org.apache.spark.memory.MemoryMode
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.vectorized._
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.vectorized._
+
+
+/**
+ * To support vectorization in WholeStageCodeGen, this reader returns 
ColumnarBatch.
+ * After creating, `initialize` and `setRequiredSchema` should be called 
sequentially.
+ */
+private[orc] class OrcColumnarBatchReader extends RecordReader[Void, 
ColumnarBatch] {
+  import OrcColumnarBatchReader._
+
+  /**
+   * ORC File Reader.
+   */
+  private var reader: Reader = _
+
+  /**
+   * Vectorized Row Batch.
+   */
+  private var batch: VectorizedRowBatch = _
+
+  /**
+   * Requested Column IDs.
+   */
+  private var requestedColIds: Array[Int] = _
+
+  /**
+   * Record reader from row batch.
+   */
+  private var recordReader: org.apache.orc.RecordReader = _
+
+  /**
+   * Required Schema.
+   */
+  private var requiredSchema: StructType = _
+
+  /**
+   * ColumnarBatch for vectorized execution by whole-stage codegen.
+   */
+  private var columnarBatch: ColumnarBatch = _
+
+  /**
+   * Writable column vectors of ColumnarBatch.
+   */
+  private var columnVectors: Seq[WritableColumnVector] = _
+
+  /**
+   * The number of rows read and considered to be returned.
+   */
+  private var rowsReturned: Long = 0L
+
+  /**
+   * Total number of rows.
+   */
+  private var totalRowCount: Long = 0L
+
+  override def getCurrentKey: Void = null
+
+  override def getCurrentValue: ColumnarBatch = columnarBatch
+
+  override def getProgress: Float = rowsReturned.toFloat / totalRowCount
+
+  override def nextKeyValue(): Boolean = nextBatch()
+
+  override def close(): Unit = {
+if (columnarBatch != null) {
+  columnarBatch.close()
+  columnarBatch = null
+}
+if (recordReader != null) {
+  recordReader.close()
+  recordReader = null
+}
+  }
+
+  /**
+   * Initialize ORC file reader and batch record reader.
+   * Please note that `setRequiredSchema` is needed to be called after 
this.
+   */
+  override def initialize(inputSplit: InputSplit, taskAttemptContext: 
TaskAttemptContext): Unit = {
+val fileSplit = inputSplit.asInstanceOf[FileSplit]
+val conf = taskAttemptContext.getConfiguration
+reader = OrcFile.createReader(
+  fileSplit.getPath,
+  OrcFile.readerOptions(conf)
+.maxLength(OrcConf.MAX_FILE_LENGTH.getLong(conf))
+.filesystem(fileSplit.getPath.getFileSystem(conf)))
+
+val options = OrcInputFormat.buildOptions(conf, reader, 
fileSplit.getStart, fileSplit.getLength)
+recordReader = reader.rows(options)
+totalRowCount = reader.getNumberOfRows
+  }
+
+  /**
+   * Set required schema and partition information.
+   * With this information, this creates ColumnarBatch with the full 
schema.
+   */
+  def setRequiredSchema(
+  orcSchema: TypeDescription,
+  requestedColIds: Array[Int],
+  resultSchema: StructType,
+  requiredSchema: 

[GitHub] spark issue #20076: [SPARK-21786][SQL] When acquiring 'compressionCodecClass...

2018-01-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20076
  
Merged build finished. Test FAILed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20076: [SPARK-21786][SQL] When acquiring 'compressionCodecClass...

2018-01-05 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20076
  
**[Test build #85741 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85741/testReport)**
 for PR 20076 at commit 
[`1a8c654`](https://github.com/apache/spark/commit/1a8c654805656d3e143c2e63355c7b6365dac471).
 * This patch **fails Scala style tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20076: [SPARK-21786][SQL] When acquiring 'compressionCodecClass...

2018-01-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20076
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/85741/
Test FAILed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20076: [SPARK-21786][SQL] When acquiring 'compressionCodecClass...

2018-01-05 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20076
  
**[Test build #85741 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85741/testReport)**
 for PR 20076 at commit 
[`1a8c654`](https://github.com/apache/spark/commit/1a8c654805656d3e143c2e63355c7b6365dac471).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19943: [SPARK-16060][SQL] Support Vectorized ORC Reader

2018-01-05 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19943#discussion_r160016334
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.scala
 ---
@@ -0,0 +1,493 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.orc
+
+import org.apache.hadoop.mapreduce.{InputSplit, RecordReader, 
TaskAttemptContext}
+import org.apache.hadoop.mapreduce.lib.input.FileSplit
+import org.apache.orc._
+import org.apache.orc.mapred.OrcInputFormat
+import org.apache.orc.storage.ql.exec.vector._
+import org.apache.orc.storage.serde2.io.HiveDecimalWritable
+
+import org.apache.spark.memory.MemoryMode
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.vectorized._
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.vectorized._
+
+
+/**
+ * To support vectorization in WholeStageCodeGen, this reader returns 
ColumnarBatch.
+ * After creating, `initialize` and `setRequiredSchema` should be called 
sequentially.
+ */
+private[orc] class OrcColumnarBatchReader extends RecordReader[Void, 
ColumnarBatch] {
+  import OrcColumnarBatchReader._
+
+  /**
+   * ORC File Reader.
+   */
+  private var reader: Reader = _
+
+  /**
+   * Vectorized Row Batch.
+   */
+  private var batch: VectorizedRowBatch = _
+
+  /**
+   * Requested Column IDs.
+   */
+  private var requestedColIds: Array[Int] = _
+
+  /**
+   * Record reader from row batch.
+   */
+  private var recordReader: org.apache.orc.RecordReader = _
+
+  /**
+   * Required Schema.
+   */
+  private var requiredSchema: StructType = _
+
+  /**
+   * ColumnarBatch for vectorized execution by whole-stage codegen.
+   */
+  private var columnarBatch: ColumnarBatch = _
+
+  /**
+   * Writable column vectors of ColumnarBatch.
+   */
+  private var columnVectors: Seq[WritableColumnVector] = _
+
+  /**
+   * The number of rows read and considered to be returned.
+   */
+  private var rowsReturned: Long = 0L
+
+  /**
+   * Total number of rows.
+   */
+  private var totalRowCount: Long = 0L
+
+  override def getCurrentKey: Void = null
+
+  override def getCurrentValue: ColumnarBatch = columnarBatch
+
+  override def getProgress: Float = rowsReturned.toFloat / totalRowCount
+
+  override def nextKeyValue(): Boolean = nextBatch()
+
+  override def close(): Unit = {
+if (columnarBatch != null) {
+  columnarBatch.close()
+  columnarBatch = null
+}
+if (recordReader != null) {
+  recordReader.close()
+  recordReader = null
+}
+  }
+
+  /**
+   * Initialize ORC file reader and batch record reader.
+   * Please note that `setRequiredSchema` is needed to be called after 
this.
+   */
+  override def initialize(inputSplit: InputSplit, taskAttemptContext: 
TaskAttemptContext): Unit = {
+val fileSplit = inputSplit.asInstanceOf[FileSplit]
+val conf = taskAttemptContext.getConfiguration
+reader = OrcFile.createReader(
+  fileSplit.getPath,
+  OrcFile.readerOptions(conf)
+.maxLength(OrcConf.MAX_FILE_LENGTH.getLong(conf))
+.filesystem(fileSplit.getPath.getFileSystem(conf)))
+
+val options = OrcInputFormat.buildOptions(conf, reader, 
fileSplit.getStart, fileSplit.getLength)
+recordReader = reader.rows(options)
+totalRowCount = reader.getNumberOfRows
+  }
+
+  /**
+   * Set required schema and partition information.
+   * With this information, this creates ColumnarBatch with the full 
schema.
+   */
+  def setRequiredSchema(
+  orcSchema: TypeDescription,
+  requestedColIds: Array[Int],
+  resultSchema: StructType,
+  requiredSchema: 

[GitHub] spark pull request #19943: [SPARK-16060][SQL] Support Vectorized ORC Reader

2018-01-05 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19943#discussion_r160016341
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.scala
 ---
@@ -0,0 +1,493 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.orc
+
+import org.apache.hadoop.mapreduce.{InputSplit, RecordReader, 
TaskAttemptContext}
+import org.apache.hadoop.mapreduce.lib.input.FileSplit
+import org.apache.orc._
+import org.apache.orc.mapred.OrcInputFormat
+import org.apache.orc.storage.ql.exec.vector._
+import org.apache.orc.storage.serde2.io.HiveDecimalWritable
+
+import org.apache.spark.memory.MemoryMode
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.vectorized._
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.vectorized._
+
+
+/**
+ * To support vectorization in WholeStageCodeGen, this reader returns 
ColumnarBatch.
+ * After creating, `initialize` and `setRequiredSchema` should be called 
sequentially.
+ */
+private[orc] class OrcColumnarBatchReader extends RecordReader[Void, 
ColumnarBatch] {
+  import OrcColumnarBatchReader._
+
+  /**
+   * ORC File Reader.
+   */
+  private var reader: Reader = _
+
+  /**
+   * Vectorized Row Batch.
+   */
+  private var batch: VectorizedRowBatch = _
+
+  /**
+   * Requested Column IDs.
+   */
+  private var requestedColIds: Array[Int] = _
+
+  /**
+   * Record reader from row batch.
+   */
+  private var recordReader: org.apache.orc.RecordReader = _
+
+  /**
+   * Required Schema.
+   */
+  private var requiredSchema: StructType = _
+
+  /**
+   * ColumnarBatch for vectorized execution by whole-stage codegen.
+   */
+  private var columnarBatch: ColumnarBatch = _
+
+  /**
+   * Writable column vectors of ColumnarBatch.
+   */
+  private var columnVectors: Seq[WritableColumnVector] = _
+
+  /**
+   * The number of rows read and considered to be returned.
+   */
+  private var rowsReturned: Long = 0L
+
+  /**
+   * Total number of rows.
+   */
+  private var totalRowCount: Long = 0L
+
+  override def getCurrentKey: Void = null
+
+  override def getCurrentValue: ColumnarBatch = columnarBatch
+
+  override def getProgress: Float = rowsReturned.toFloat / totalRowCount
+
+  override def nextKeyValue(): Boolean = nextBatch()
+
+  override def close(): Unit = {
+if (columnarBatch != null) {
+  columnarBatch.close()
+  columnarBatch = null
+}
+if (recordReader != null) {
+  recordReader.close()
+  recordReader = null
+}
+  }
+
+  /**
+   * Initialize ORC file reader and batch record reader.
+   * Please note that `setRequiredSchema` is needed to be called after 
this.
+   */
+  override def initialize(inputSplit: InputSplit, taskAttemptContext: 
TaskAttemptContext): Unit = {
+val fileSplit = inputSplit.asInstanceOf[FileSplit]
+val conf = taskAttemptContext.getConfiguration
+reader = OrcFile.createReader(
+  fileSplit.getPath,
+  OrcFile.readerOptions(conf)
+.maxLength(OrcConf.MAX_FILE_LENGTH.getLong(conf))
+.filesystem(fileSplit.getPath.getFileSystem(conf)))
+
+val options = OrcInputFormat.buildOptions(conf, reader, 
fileSplit.getStart, fileSplit.getLength)
+recordReader = reader.rows(options)
+totalRowCount = reader.getNumberOfRows
+  }
+
+  /**
+   * Set required schema and partition information.
+   * With this information, this creates ColumnarBatch with the full 
schema.
+   */
+  def setRequiredSchema(
+  orcSchema: TypeDescription,
+  requestedColIds: Array[Int],
+  resultSchema: StructType,
+  requiredSchema: 

[GitHub] spark issue #20076: [SPARK-21786][SQL] When acquiring 'compressionCodecClass...

2018-01-05 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20076
  
**[Test build #85740 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85740/testReport)**
 for PR 20076 at commit 
[`9466797`](https://github.com/apache/spark/commit/946679745f16838932e74fabb70f2ad702fa4640).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20029: [SPARK-22793][SQL]Memory leak in Spark Thrift Server

2018-01-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20029
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/85736/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20076: [SPARK-21786][SQL] When acquiring 'compressionCodecClass...

2018-01-05 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20076
  
**[Test build #85739 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85739/testReport)**
 for PR 20076 at commit 
[`26c1c61`](https://github.com/apache/spark/commit/26c1c61ffd5742b71aefdec33ddcb69ba944).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20029: [SPARK-22793][SQL]Memory leak in Spark Thrift Server

2018-01-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20029
  
Merged build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20029: [SPARK-22793][SQL]Memory leak in Spark Thrift Server

2018-01-05 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20029
  
**[Test build #85736 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85736/testReport)**
 for PR 20029 at commit 
[`2b1e166`](https://github.com/apache/spark/commit/2b1e166f4f43b300d272fc7ce1d9d7997f7ae3cd).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20166: [SPARK-22973][SQL] Fix incorrect results of Casting Map ...

2018-01-05 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/20166
  
LGTM except one minor comment


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20166: [SPARK-22973][SQL] Fix incorrect results of Casti...

2018-01-05 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20166#discussion_r160016007
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
 ---
@@ -228,6 +228,35 @@ case class Cast(child: Expression, dataType: DataType, 
timeZoneId: Option[String
 builder.append("]")
 builder.build()
   })
+case MapType(kt, vt, _) =>
+  buildCast[MapData](_, map => {
+val builder = new UTF8StringBuilder
+builder.append("[")
+if (map.numElements > 0) {
+  val keyToUTF8String = castToString(kt)
+  val valueToUTF8String = castToString(vt)
+  builder.append(keyToUTF8String(map.keyArray().get(0, 
kt)).asInstanceOf[UTF8String])
--- End diff --

`map.keyArray()` and `map.valueArray()` appear many times; we can create two 
local variables for them.
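    
    A minimal sketch of the suggested refactor, shown as a fragment of the quoted
    `case MapType` branch (the per-element loop is elided here): hoist the two
    lookups into local vals and reuse them.
    
        case MapType(kt, vt, _) =>
          buildCast[MapData](_, map => {
            val builder = new UTF8StringBuilder
            val keyArray = map.keyArray()      // hoisted: looked up once
            val valueArray = map.valueArray()  // hoisted: looked up once
            val keyToUTF8String = castToString(kt)
            val valueToUTF8String = castToString(vt)
            builder.append("[")
            if (map.numElements > 0) {
              builder.append(keyToUTF8String(keyArray.get(0, kt)).asInstanceOf[UTF8String])
              // ... remaining key/value pairs appended the same way, via keyArray/valueArray ...
            }
            builder.append("]")
            builder.build()
          })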


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20029: [SPARK-22793][SQL]Memory leak in Spark Thrift Server

2018-01-05 Thread viirya
Github user viirya commented on the issue:

https://github.com/apache/spark/pull/20029
  
> The hiveClient created for the resourceLoader is only used to addJar, 
which is, in turn, to add Jar to the shared IsolatedClientLoader. Then we can 
just use the shared hive client for this purpose.

Shouldn't `addJar` be session-based? At least it seems to be in Hive: 
https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Cli#LanguageManualCli-HiveResources

Although it looks like `addJar` in `SessionResourceLoader` for `SessionState` 
isn't session-based either, so at least we have consistent behavior.







---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20163: [SPARK-22966][PySpark] Spark SQL should handle Python UD...

2018-01-05 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/20163
  
Hey @rednaxelafx, that's fine. We all make mistakes, and I usually think it's 
always better than not trying. I also made a mistake the first time. It was 
easier to debug this with your comments and the details in the PR description. 
Thank you.

> I'd like to wait for more discussions / suggestions on whether or not we 
want a behavior change that makes this reproducer work, or a simple document 
change that'll just say PySpark doesn't support mismatching returnType.

So, a few options might be ...

1. Simply document this

2. `str` logic in `type.StringType` - in this case, I think we should do a 
small benchmark. It shouldn't be too hard, and I think you could reuse the commands I 
used here - https://github.com/apache/spark/pull/19246#discussion_r139874732

3. Investigate a way to register a custom Pyrolite unpickler that 
converts `datetime.date*` to `Timestamp` or `Date`. I believe we already have 
some custom fixes there.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20010: [SPARK-22826][SQL] findWiderTypeForTwo Fails over Struct...

2018-01-05 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/20010
  
Overall, it is reasonable. What is the current behavior in Hive?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20170: [SPARK-22960][K8S] Revert use of ARG base_image in image...

2018-01-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20170
  
Merged build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20170: [SPARK-22960][K8S] Revert use of ARG base_image in image...

2018-01-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20170
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/85734/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20170: [SPARK-22960][K8S] Revert use of ARG base_image in image...

2018-01-05 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20170
  
**[Test build #85734 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85734/testReport)**
 for PR 20170 at commit 
[`2f8d0b9`](https://github.com/apache/spark/commit/2f8d0b96babb47abce2f2af8d36c33c429eb7257).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #17968: [SPARK-9792] Make DenseMatrix equality semantical

2018-01-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/17968
  
Merged build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #17968: [SPARK-9792] Make DenseMatrix equality semantical

2018-01-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/17968
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/85738/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #17968: [SPARK-9792] Make DenseMatrix equality semantical

2018-01-05 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/17968
  
**[Test build #85738 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85738/testReport)**
 for PR 17968 at commit 
[`311c94a`](https://github.com/apache/spark/commit/311c94a3d608b0b86f3ce39415639ec260e5af37).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #17968: [SPARK-9792] Make DenseMatrix equality semantical

2018-01-05 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/17968
  
**[Test build #85738 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85738/testReport)**
 for PR 17968 at commit 
[`311c94a`](https://github.com/apache/spark/commit/311c94a3d608b0b86f3ce39415639ec260e5af37).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #17968: [SPARK-9792] Make DenseMatrix equality semantical

2018-01-05 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/17968
  
cc @WeichenXu123 @yanboliang 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #17968: [SPARK-9792] Make DenseMatrix equality semantical

2018-01-05 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/17968
  
ok to test


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20170: [SPARK-22960][K8S] Revert use of ARG base_image i...

2018-01-05 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/20170


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20170: [SPARK-22960][K8S] Revert use of ARG base_image in image...

2018-01-05 Thread vanzin
Github user vanzin commented on the issue:

https://github.com/apache/spark/pull/20170
  
Tests are taking too long...

Merging to master / 2.3.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20098: [SPARK-22914][DEPLOY] Register history.ui.port

2018-01-05 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/20098


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20135: [SPARK-22937][SQL] SQL elt output binary for bina...

2018-01-05 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/20135


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20135: [SPARK-22937][SQL] SQL elt output binary for bina...

2018-01-05 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/20135#discussion_r160012524
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala
 ---
@@ -271,33 +271,45 @@ case class ConcatWs(children: Seq[Expression])
   }
 }
 
+/**
+ * An expression that returns the `n`-th input in given inputs.
+ * If all inputs are binary, `elt` returns an output as binary. Otherwise, 
it returns as string.
+ * If any input is null, `elt` returns null.
+ */
 // scalastyle:off line.size.limit
 @ExpressionDescription(
-  usage = "_FUNC_(n, str1, str2, ...) - Returns the `n`-th string, e.g., 
returns `str2` when `n` is 2.",
+  usage = "_FUNC_(n, input1, input2, ...) - Returns the `n`-th input, 
e.g., returns `input2` when `n` is 2.",
   examples = """
 Examples:
   > SELECT _FUNC_(1, 'scala', 'java');
scala
   """)
 // scalastyle:on line.size.limit
-case class Elt(children: Seq[Expression])
-  extends Expression with ImplicitCastInputTypes {
+case class Elt(children: Seq[Expression]) extends Expression {
 
   private lazy val indexExpr = children.head
-  private lazy val stringExprs = children.tail.toArray
+  private lazy val inputExprs = children.tail.toArray
 
   /** This expression is always nullable because it returns null if index 
is out of range. */
   override def nullable: Boolean = true
 
-  override def dataType: DataType = StringType
-
-  override def inputTypes: Seq[DataType] = IntegerType +: 
Seq.fill(children.size - 1)(StringType)
+  override def dataType: DataType = 
inputExprs.map(_.dataType).headOption.getOrElse(StringType)
--- End diff --

We issue an exception when there is only one input argument.
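    
    For context, a hedged sketch of such a guard (the method body and message are
    assumptions, not necessarily the merged code): reject the call during analysis
    when only the index argument is present, instead of silently defaulting
    `dataType` to `StringType`.
    
        import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
    
        override def checkInputDataTypes(): TypeCheckResult = {
          if (children.size < 2) {
            // only the index expression was supplied; there is no input to return
            TypeCheckResult.TypeCheckFailure("elt should have at least two arguments")
          } else {
            TypeCheckResult.TypeCheckSuccess
          }
        }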


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20135: [SPARK-22937][SQL] SQL elt output binary for binary inpu...

2018-01-05 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/20135
  
Thanks! Merged to master/2.3


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20135: [SPARK-22937][SQL] SQL elt output binary for binary inpu...

2018-01-05 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/20135
  
LGTM


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20098: [SPARK-22914][DEPLOY] Register history.ui.port

2018-01-05 Thread vanzin
Github user vanzin commented on the issue:

https://github.com/apache/spark/pull/20098
  
Merging to master / 2.3.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20013: [SPARK-20657][core] Speed up rendering of the stages pag...

2018-01-05 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20013
  
**[Test build #85737 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85737/testReport)**
 for PR 20013 at commit 
[`c4e7f61`](https://github.com/apache/spark/commit/c4e7f6149fbff39c9f3955f536095ed4fd5df2ff).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20029: [SPARK-22793][SQL]Memory leak in Spark Thrift Server

2018-01-05 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20029
  
**[Test build #85736 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85736/testReport)**
 for PR 20029 at commit 
[`2b1e166`](https://github.com/apache/spark/commit/2b1e166f4f43b300d272fc7ce1d9d7997f7ae3cd).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20029: [SPARK-22793][SQL]Memory leak in Spark Thrift Server

2018-01-05 Thread liufengdb
Github user liufengdb commented on the issue:

https://github.com/apache/spark/pull/20029
  
lgtm!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20029: [SPARK-22793][SQL]Memory leak in Spark Thrift Server

2018-01-05 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/20029
  
ok to test


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20097: [SPARK-22912] v2 data source support in MicroBatchExecut...

2018-01-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20097
  
Merged build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20097: [SPARK-22912] v2 data source support in MicroBatchExecut...

2018-01-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20097
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/85730/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20097: [SPARK-22912] v2 data source support in MicroBatchExecut...

2018-01-05 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20097
  
**[Test build #85730 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85730/testReport)**
 for PR 20097 at commit 
[`5f0a6e2`](https://github.com/apache/spark/commit/5f0a6e271bec953c79b2298190cb848129f5b84e).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20096: [SPARK-22908] Add kafka source and sink for conti...

2018-01-05 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20096#discussion_r160009573
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceOffset.scala
 ---
@@ -19,7 +19,8 @@ package org.apache.spark.sql.kafka010
 
 import org.apache.kafka.common.TopicPartition
 
-import org.apache.spark.sql.execution.streaming.{Offset, SerializedOffset}
+import org.apache.spark.sql.execution.streaming.{Offset => OffsetV1, 
SerializedOffset}
--- End diff --

Ummm.. I think it's better to rename the new Offset to OffsetV2 than to rename the 
old one to OffsetV1. This will keep it more consistent with other APIs which 
have V2 in them. Also, MicroBatchExecution in the other PR also uses 
OffsetV2. Sorry for not thinking ahead on this.
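    
    In other words, roughly this import shape (a sketch only; both classes appear
    elsewhere in this thread):
    
        import org.apache.spark.sql.execution.streaming.{Offset, SerializedOffset}
        import org.apache.spark.sql.sources.v2.streaming.reader.{Offset => OffsetV2}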


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20169: [SPARK-17088][hive] Fix 'sharesHadoopClasses' option whe...

2018-01-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20169
  
Merged build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20169: [SPARK-17088][hive] Fix 'sharesHadoopClasses' option whe...

2018-01-05 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20169
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/85733/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20169: [SPARK-17088][hive] Fix 'sharesHadoopClasses' option whe...

2018-01-05 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20169
  
**[Test build #85733 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85733/testReport)**
 for PR 20169 at commit 
[`668fcba`](https://github.com/apache/spark/commit/668fcbac6b24e6c1e4b6c720459654ca8e88f03c).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20096: [SPARK-22908] Add kafka source and sink for conti...

2018-01-05 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20096#discussion_r160004815
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
 ---
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, 
UnsafeRowWriter}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import 
org.apache.spark.sql.kafka010.KafkaSource.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE,
 INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE}
+import org.apache.spark.sql.sources.v2.reader._
+import 
org.apache.spark.sql.sources.v2.streaming.reader.{ContinuousDataReader, 
ContinuousReader, Offset, PartitionOffset}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.unsafe.types.UTF8String
+
+/**
+ * A [[ContinuousReader]] for data from kafka.
+ *
+ * @param offsetReader  a reader used to get kafka offsets. Note that the 
actual data will be
+ *  read by per-task consumers generated later.
+ * @param kafkaParams   String params for per-task Kafka consumers.
+ * @param sourceOptions The 
[[org.apache.spark.sql.sources.v2.DataSourceV2Options]] params which
+ *  are not Kafka consumer params.
+ * @param metadataPath Path to a directory this reader can use for writing 
metadata.
+ * @param initialOffsets The Kafka offsets to start reading data at.
+ * @param failOnDataLoss Flag indicating whether reading should fail in 
data loss
+ *   scenarios, where some offsets after the specified 
initial ones can't be
+ *   properly read.
+ */
+class KafkaContinuousReader(
+offsetReader: KafkaOffsetReader,
+kafkaParams: java.util.Map[String, Object],
--- End diff --

Since there are lots of different uses of java.util.* here, you can probably 
alias java.util to ju. That's what the KafkaSourceProvider class does.
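
A minimal, self-contained sketch (not from this PR) of the import-aliasing 
pattern being suggested; the object and method names are illustrative only:

    import java.{util => ju}

    object JavaUtilAliasExample {
      // `ju.Map` means exactly java.util.Map, but is shorter to repeat.
      def sizeOf(params: ju.Map[String, Object]): Int = params.size()

      def main(args: Array[String]): Unit = {
        val params = new ju.HashMap[String, Object]()
        params.put("bootstrap.servers", "localhost:9092")
        println(sizeOf(params)) // prints 1
      }
    }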


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20096: [SPARK-22908] Add kafka source and sink for conti...

2018-01-05 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20096#discussion_r160007884
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 ---
@@ -418,11 +418,16 @@ abstract class StreamExecution(
* Blocks the current thread until processing for data from the given 
`source` has reached at
* least the given `Offset`. This method is intended for use primarily 
when writing tests.
*/
-  private[sql] def awaitOffset(source: Source, newOffset: Offset): Unit = {
+  private[sql] def awaitOffset(sourceIndex: Int, newOffset: Offset): Unit 
= {
 assertAwaitThread()
 def notDone = {
   val localCommittedOffsets = committedOffsets
-  !localCommittedOffsets.contains(source) || 
localCommittedOffsets(source) != newOffset
+  if (sources.length <= sourceIndex) {
+false
--- End diff --

The race condition is present because `sources` is initialized to Seq.empty 
and then assigned to the actual sources. You can instead initialize `sources` 
to null, and then return `notDone = false` while `sources` is still null. Any 
other mismatch should throw an error. I don't like the current code, which 
hides erroneous situations.
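
A self-contained sketch of the shape being suggested here; `Source`, `Offset`, 
and the field names are simplified stand-ins for the actual StreamExecution 
members, not the real patch:

    object AwaitOffsetSketch {
      type Source = String
      type Offset = Long

      // `sources` starts as null and is assigned once the real sources are known.
      @volatile var sources: Seq[Source] = null
      @volatile var committedOffsets: Map[Source, Offset] = Map.empty

      def notDone(sourceIndex: Int, newOffset: Offset): Boolean = {
        val localCommittedOffsets = committedOffsets
        if (sources == null) {
          // Sources not assigned yet; per the suggestion above, report notDone = false.
          false
        } else if (sourceIndex >= sources.length) {
          // Any other mismatch is a caller bug, not a transient race, so fail loudly.
          throw new IllegalStateException(
            s"Source index $sourceIndex out of range (${sources.length} sources)")
        } else {
          val source = sources(sourceIndex)
          !localCommittedOffsets.contains(source) ||
            localCommittedOffsets(source) != newOffset
        }
      }
    }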
  


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20096: [SPARK-22908] Add kafka source and sink for conti...

2018-01-05 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20096#discussion_r160006676
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousWriter.scala
 ---
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import org.apache.kafka.clients.producer.{Callback, ProducerRecord, 
RecordMetadata}
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, 
Literal, UnsafeProjection}
+import 
org.apache.spark.sql.kafka010.KafkaSourceProvider.{kafkaParamsForProducer, 
TOPIC_OPTION_KEY}
+import org.apache.spark.sql.sources.v2.streaming.writer.ContinuousWriter
+import org.apache.spark.sql.sources.v2.writer._
+import org.apache.spark.sql.streaming.OutputMode
+import org.apache.spark.sql.types.{BinaryType, StringType, StructType}
+
+/**
+ * Dummy commit message. The DataSourceV2 framework requires a commit 
message implementation but we
+ * don't need to really send one.
+ */
+case object KafkaWriterCommitMessage extends WriterCommitMessage
+
+/**
+ * A [[ContinuousWriter]] for Kafka writing. Responsible for generating 
the writer factory.
+ * @param topic The topic this writer is responsible for. If None, topic 
will be inferred from
+ *  a `topic` field in the incoming data.
+ * @param producerParams Parameters for Kafka producers in each task.
+ * @param schema The schema of the input data.
+ */
+class KafkaContinuousWriter(
+topic: Option[String], producerParams: Map[String, String], schema: 
StructType)
+  extends ContinuousWriter with SupportsWriteInternalRow {
+
+  override def createInternalRowWriterFactory(): 
KafkaContinuousWriterFactory =
+KafkaContinuousWriterFactory(topic, producerParams, schema)
+
+  override def commit(epochId: Long, messages: 
Array[WriterCommitMessage]): Unit = {}
+  override def abort(messages: Array[WriterCommitMessage]): Unit = {}
+}
+
+/**
+ * A [[DataWriterFactory]] for Kafka writing. Will be serialized and sent 
to executors to generate
+ * the per-task data writers.
+ * @param topic The topic that should be written to. If None, topic will 
be inferred from
+ *  a `topic` field in the incoming data.
+ * @param producerParams Parameters for Kafka producers in each task.
+ * @param schema The schema of the input data.
+ */
+case class KafkaContinuousWriterFactory(
+topic: Option[String], producerParams: Map[String, String], schema: 
StructType)
+  extends DataWriterFactory[InternalRow] {
+
+  override def createDataWriter(partitionId: Int, attemptNumber: Int): 
DataWriter[InternalRow] = {
+new KafkaContinuousDataWriter(topic, producerParams, 
schema.toAttributes)
+  }
+}
+
+/**
+ * A [[DataWriter]] for Kafka writing. One data writer will be created in 
each partition to
+ * process incoming rows.
+ *
+ * @param targetTopic The topic that this data writer is targeting. If 
None, topic will be inferred
+ *from a `topic` field in the incoming data.
+ * @param producerParams Parameters to use for the Kafka producer.
+ * @param inputSchema The attributes in the input data.
+ */
+class KafkaContinuousDataWriter(
+targetTopic: Option[String], producerParams: Map[String, String], 
inputSchema: Seq[Attribute])
+  extends KafkaRowWriter(inputSchema, targetTopic) with 
DataWriter[InternalRow] {
+  import scala.collection.JavaConverters._
+
+  private lazy val producer = CachedKafkaProducer.getOrCreate(
+new java.util.HashMap[String, Object](producerParams.asJava))
+
+  def write(row: InternalRow): Unit = {
+checkForErrors()
+sendRow(row, producer)
+  }
+
+  def commit(): 
