[GitHub] spark issue #20167: Allow providing Mesos principal & secret via files (SPAR...
Github user felixcheung commented on the issue: https://github.com/apache/spark/pull/20167 Is putting secrets in plain-text files a good practice? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20154: [SPARK-22960][k8s] Make build-push-docker-images.sh more...
Github user felixcheung commented on the issue: https://github.com/apache/spark/pull/20154 That's good. I think we should still address the finer point of https://github.com/apache/spark/pull/20154#pullrequestreview-86833216 - if Docker Hub can't build spark-base, then we are pretty much ruling out the possibility of releasing the Docker images as ASF with the 2.3.0 release. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19690: [SPARK-22467]Added a switch to support whether `stdout_s...
Github user jiangxb1987 commented on the issue: https://github.com/apache/spark/pull/19690 Under most conditions users shouldn't ignore the printed error/warning messages - have you observed a lot of redundant noise in the output? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20076: [SPARK-21786][SQL] When acquiring 'compressionCodecClass...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20076 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/85740/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20076: [SPARK-21786][SQL] When acquiring 'compressionCodecClass...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20076 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20076: [SPARK-21786][SQL] When acquiring 'compressionCodecClass...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20076 **[Test build #85740 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85740/testReport)** for PR 20076 at commit [`9466797`](https://github.com/apache/spark/commit/946679745f16838932e74fabb70f2ad702fa4640). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20076: [SPARK-21786][SQL] When acquiring 'compressionCodecClass...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20076 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/85739/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20076: [SPARK-21786][SQL] When acquiring 'compressionCodecClass...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20076 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20076: [SPARK-21786][SQL] When acquiring 'compressionCodecClass...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20076 **[Test build #85739 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85739/testReport)** for PR 20076 at commit [`26c1c61`](https://github.com/apache/spark/commit/26c1c61ffd5742b71aefdec33ddcb69ba944). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19943: [SPARK-16060][SQL] Support Vectorized ORC Reader
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19943 **[Test build #85744 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85744/testReport)** for PR 19943 at commit [`aeb6abd`](https://github.com/apache/spark/commit/aeb6abd66ee3338635edf9dca85894c14a05fb72). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19943: [SPARK-16060][SQL] Support Vectorized ORC Reader
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19943 **[Test build #85743 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85743/testReport)** for PR 19943 at commit [`fca6a5f`](https://github.com/apache/spark/commit/fca6a5fe83c46d9cb1c2bdf163920a82fcd0b7a2). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19080: [SPARK-21865][SQL] simplify the distribution sema...
Github user sameeragarwal commented on a diff in the pull request: https://github.com/apache/spark/pull/19080#discussion_r160017940 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala --- @@ -73,46 +127,31 @@ case class OrderedDistribution(ordering: Seq[SortOrder]) extends Distribution { "An AllTuples should be used to represent a distribution that only has " + "a single partition.") - // TODO: This is not really valid... - def clustering: Set[Expression] = ordering.map(_.child).toSet + override def requiredNumPartitions: Option[Int] = None + + override def createPartitioning(numPartitions: Int): Partitioning = { +RangePartitioning(ordering, numPartitions) + } } /** * Represents data where tuples are broadcasted to every node. It is quite common that the * entire set of tuples is transformed into different data structure. */ -case class BroadcastDistribution(mode: BroadcastMode) extends Distribution +case class BroadcastDistribution(mode: BroadcastMode) extends Distribution { --- End diff -- Similarly, how about `BroadcastPartitioning` just satisfying the `AllTuples` distribution? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
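As a minimal, self-contained sketch of the suggestion above (simplified local stand-in types, not Spark's real classes in org.apache.spark.sql.catalyst.plans.physical, and `mode` reduced to a `String` in place of `BroadcastMode`), a broadcast partitioning could simply report that it also satisfies `AllTuples`, since every node holds the complete data set:

```scala
// Simplified stand-ins; the real traits carry richer contracts than shown here.
sealed trait Distribution
case object AllTuples extends Distribution
case class BroadcastDistribution(mode: String) extends Distribution

trait Partitioning { def satisfies(required: Distribution): Boolean }

case class BroadcastPartitioning(mode: String) extends Partitioning {
  override def satisfies(required: Distribution): Boolean = required match {
    case AllTuples                => true       // a broadcast copy has all tuples on every node
    case BroadcastDistribution(m) => m == mode  // existing behaviour: only the matching broadcast mode
    case _                        => false
  }
}
```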
[GitHub] spark pull request #19080: [SPARK-21865][SQL] simplify the distribution sema...
Github user sameeragarwal commented on a diff in the pull request: https://github.com/apache/spark/pull/19080#discussion_r160017879 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala --- @@ -51,12 +76,41 @@ case object AllTuples extends Distribution */ case class ClusteredDistribution( clustering: Seq[Expression], -numPartitions: Option[Int] = None) extends Distribution { +requiredNumPartitions: Option[Int] = None) extends Distribution { require( clustering != Nil, "The clustering expressions of a ClusteredDistribution should not be Nil. " + "An AllTuples should be used to represent a distribution that only has " + "a single partition.") + + override def createPartitioning(numPartitions: Int): Partitioning = { +assert(requiredNumPartitions.isEmpty || requiredNumPartitions.get == numPartitions, + s"This ClusteredDistribution requires ${requiredNumPartitions.get} partitions, but " + +s"the actual number of partitions is $numPartitions.") +HashPartitioning(clustering, numPartitions) + } +} + +/** + * Represents data where tuples have been partitioned according to the hash of the given + * `expressions`. The hash function is defined as `HashPartitioning.partitionIdExpression`, so only + * [[HashPartitioning]] can satisfy this distribution. + * + * This is a strictly stronger guarantee than [[ClusteredDistribution]]. Given a tuple and the + * number of partitions, this distribution strictly requires which partition the tuple should be in. + */ +case class HashPartitionedDistribution(expressions: Seq[Expression]) extends Distribution { --- End diff -- Semantically, a `Partitioning` satisfies a `Distribution` so it'd be better not to call this `HashPartitioned`. How about we call this `DeterminsticClusteredDistribution` or `HashClusteredDistribution`? Also perhaps this can just extend `ClusteredDistribution`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
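To make the naming suggestion concrete, here is a minimal, self-contained sketch using local stand-in types that only mirror the `requiredNumPartitions`/`createPartitioning` shape from the diff above; it is illustrative only, not necessarily what the PR ended up merging:

```scala
// Simplified stand-ins mirroring the shape of the diff above.
sealed trait Expression
trait Partitioning
case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int) extends Partitioning

trait Distribution {
  def requiredNumPartitions: Option[Int]
  def createPartitioning(numPartitions: Int): Partitioning
}

// The suggested name reads as a requirement on the data ("clustered by the hash of these
// expressions") rather than as a Partitioning, while keeping the strict guarantee that only
// HashPartitioning can satisfy it.
case class HashClusteredDistribution(expressions: Seq[Expression]) extends Distribution {
  override def requiredNumPartitions: Option[Int] = None
  override def createPartitioning(numPartitions: Int): Partitioning =
    HashPartitioning(expressions, numPartitions)
}
```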
[GitHub] spark pull request #19080: [SPARK-21865][SQL] simplify the distribution sema...
Github user sameeragarwal commented on a diff in the pull request: https://github.com/apache/spark/pull/19080#discussion_r160018028 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala --- @@ -73,46 +127,31 @@ case class OrderedDistribution(ordering: Seq[SortOrder]) extends Distribution { "An AllTuples should be used to represent a distribution that only has " + "a single partition.") - // TODO: This is not really valid... - def clustering: Set[Expression] = ordering.map(_.child).toSet + override def requiredNumPartitions: Option[Int] = None --- End diff -- Out of curiosity, should an `OrderedDistribution` make any guarantees around clustering? Do we care if "tuples that share the same value for the ordering expressions will never be split across partitions"? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20013: [SPARK-20657][core] Speed up rendering of the sta...
Github user gengliangwang commented on a diff in the pull request: https://github.com/apache/spark/pull/20013#discussion_r160017934 --- Diff: core/src/main/scala/org/apache/spark/status/LiveEntity.scala --- @@ -119,118 +121,115 @@ private class LiveTask( import LiveEntityHelpers._ - private var recordedMetrics: v1.TaskMetrics = null + private var metrics: MetricsTracker = new MetricsTracker() var errorMessage: Option[String] = None /** * Update the metrics for the task and return the difference between the previous and new * values. */ - def updateMetrics(metrics: TaskMetrics): v1.TaskMetrics = { + def updateMetrics(metrics: TaskMetrics): MetricsTracker = { if (metrics != null) { - val old = recordedMetrics - recordedMetrics = new v1.TaskMetrics( -metrics.executorDeserializeTime, -metrics.executorDeserializeCpuTime, -metrics.executorRunTime, -metrics.executorCpuTime, -metrics.resultSize, -metrics.jvmGCTime, -metrics.resultSerializationTime, -metrics.memoryBytesSpilled, -metrics.diskBytesSpilled, -metrics.peakExecutionMemory, -new v1.InputMetrics( - metrics.inputMetrics.bytesRead, - metrics.inputMetrics.recordsRead), -new v1.OutputMetrics( - metrics.outputMetrics.bytesWritten, - metrics.outputMetrics.recordsWritten), -new v1.ShuffleReadMetrics( - metrics.shuffleReadMetrics.remoteBlocksFetched, - metrics.shuffleReadMetrics.localBlocksFetched, - metrics.shuffleReadMetrics.fetchWaitTime, - metrics.shuffleReadMetrics.remoteBytesRead, - metrics.shuffleReadMetrics.remoteBytesReadToDisk, - metrics.shuffleReadMetrics.localBytesRead, - metrics.shuffleReadMetrics.recordsRead), -new v1.ShuffleWriteMetrics( - metrics.shuffleWriteMetrics.bytesWritten, - metrics.shuffleWriteMetrics.writeTime, - metrics.shuffleWriteMetrics.recordsWritten)) - if (old != null) calculateMetricsDelta(recordedMetrics, old) else recordedMetrics + val old = this.metrics + val newMetrics = new MetricsTracker() --- End diff -- Changing so many fields here seems ugly..But I respect you preference --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20013: [SPARK-20657][core] Speed up rendering of the stages pag...
Github user squito commented on the issue: https://github.com/apache/spark/pull/20013 lgtm --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20013: [SPARK-20657][core] Speed up rendering of the sta...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/20013#discussion_r160017859 --- Diff: core/src/main/scala/org/apache/spark/status/LiveEntity.scala --- @@ -119,118 +121,115 @@ private class LiveTask( import LiveEntityHelpers._ - private var recordedMetrics: v1.TaskMetrics = null + private var metrics: MetricsTracker = new MetricsTracker() var errorMessage: Option[String] = None /** * Update the metrics for the task and return the difference between the previous and new * values. */ - def updateMetrics(metrics: TaskMetrics): v1.TaskMetrics = { + def updateMetrics(metrics: TaskMetrics): MetricsTracker = { if (metrics != null) { - val old = recordedMetrics - recordedMetrics = new v1.TaskMetrics( -metrics.executorDeserializeTime, -metrics.executorDeserializeCpuTime, -metrics.executorRunTime, -metrics.executorCpuTime, -metrics.resultSize, -metrics.jvmGCTime, -metrics.resultSerializationTime, -metrics.memoryBytesSpilled, -metrics.diskBytesSpilled, -metrics.peakExecutionMemory, -new v1.InputMetrics( - metrics.inputMetrics.bytesRead, - metrics.inputMetrics.recordsRead), -new v1.OutputMetrics( - metrics.outputMetrics.bytesWritten, - metrics.outputMetrics.recordsWritten), -new v1.ShuffleReadMetrics( - metrics.shuffleReadMetrics.remoteBlocksFetched, - metrics.shuffleReadMetrics.localBlocksFetched, - metrics.shuffleReadMetrics.fetchWaitTime, - metrics.shuffleReadMetrics.remoteBytesRead, - metrics.shuffleReadMetrics.remoteBytesReadToDisk, - metrics.shuffleReadMetrics.localBytesRead, - metrics.shuffleReadMetrics.recordsRead), -new v1.ShuffleWriteMetrics( - metrics.shuffleWriteMetrics.bytesWritten, - metrics.shuffleWriteMetrics.writeTime, - metrics.shuffleWriteMetrics.recordsWritten)) - if (old != null) calculateMetricsDelta(recordedMetrics, old) else recordedMetrics + val old = this.metrics + val newMetrics = new MetricsTracker() + newMetrics.executorDeserializeTime = metrics.executorDeserializeTime + newMetrics.executorDeserializeCpuTime = metrics.executorDeserializeCpuTime + newMetrics.executorRunTime = metrics.executorRunTime + newMetrics.executorCpuTime = metrics.executorCpuTime + newMetrics.resultSize = metrics.resultSize + newMetrics.jvmGcTime = metrics.jvmGCTime + newMetrics.resultSerializationTime = metrics.resultSerializationTime + newMetrics.memoryBytesSpilled = metrics.memoryBytesSpilled + newMetrics.diskBytesSpilled = metrics.diskBytesSpilled + newMetrics.peakExecutionMemory = metrics.peakExecutionMemory + newMetrics.inputBytesRead = metrics.inputMetrics.bytesRead + newMetrics.inputRecordsRead = metrics.inputMetrics.recordsRead + newMetrics.outputBytesWritten = metrics.outputMetrics.bytesWritten + newMetrics.outputRecordsWritten = metrics.outputMetrics.recordsWritten + newMetrics.shuffleRemoteBlocksFetched = metrics.shuffleReadMetrics.remoteBlocksFetched + newMetrics.shuffleLocalBlocksFetched = metrics.shuffleReadMetrics.localBlocksFetched + newMetrics.shuffleFetchWaitTime = metrics.shuffleReadMetrics.fetchWaitTime + newMetrics.shuffleRemoteBytesRead = metrics.shuffleReadMetrics.remoteBytesRead + newMetrics.shuffleRemoteBytesReadToDisk = metrics.shuffleReadMetrics.remoteBytesReadToDisk + newMetrics.shuffleLocalBytesRead = metrics.shuffleReadMetrics.localBytesRead + newMetrics.shuffleRecordsRead = metrics.shuffleReadMetrics.recordsRead + newMetrics.shuffleBytesWritten = metrics.shuffleWriteMetrics.bytesWritten + newMetrics.shuffleWriteTime = metrics.shuffleWriteMetrics.writeTime + newMetrics.shuffleRecordsWritten = 
metrics.shuffleWriteMetrics.recordsWritten + + this.metrics = newMetrics + if (old.executorDeserializeTime >= 0L) { +old.subtract(newMetrics) +old + } else { +newMetrics + } } else { null } } - /** - * Return a new TaskMetrics object containing the delta of the various fields of the given - * metrics objects. This is currently targeted at updating stage data, so it does not - * necessarily calculate deltas for all the fields. - */ - private def calculateMetricsDelta( - metrics: v1.TaskMetrics, - old: v1.TaskMetrics): v1.TaskMetrics = { -val shuffleWri
[GitHub] spark issue #20013: [SPARK-20657][core] Speed up rendering of the stages pag...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20013 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20013: [SPARK-20657][core] Speed up rendering of the stages pag...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20013 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/85737/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20013: [SPARK-20657][core] Speed up rendering of the stages pag...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20013 **[Test build #85737 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85737/testReport)** for PR 20013 at commit [`c4e7f61`](https://github.com/apache/spark/commit/c4e7f6149fbff39c9f3955f536095ed4fd5df2ff). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20163: [SPARK-22966][PySpark] Spark SQL should handle Py...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/20163#discussion_r160017637 --- Diff: python/pyspark/sql/udf.py --- @@ -26,6 +26,28 @@ def _wrap_function(sc, func, returnType): +def coerce_to_str(v): +import datetime +if type(v) == datetime.date or type(v) == datetime.datetime: +return str(v) +else: +return v + +# Pyrolite will unpickle both Python datetime.date and datetime.datetime objects +# into java.util.Calendar objects, so the type information on the Python side is lost. +# This is problematic when Spark SQL needs to cast such objects into Spark SQL string type, +# because the format of the string should be different, depending on the type of the input +# object. So for those two specific types we eagerly convert them to string here, where the +# Python type information is still intact. +if returnType == StringType(): --- End diff -- I have a question, why we need to handle this type conversion? If we expect correct string format, isn't it more reasonable to convert the date/datetime to strings in the udf, instead of adding this conversion implicitly? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20163: [SPARK-22966][PySpark] Spark SQL should handle Py...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/20163#discussion_r160017370 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala --- @@ -193,6 +193,24 @@ object DateTimeUtils { millisToDays(date.getTime) } + /** + * Returns the number of days since epoch from java.util.Calendar + */ + def fromJavaCalendarForDate(cal: Calendar): SQLDate = { +val ms = cal.getTimeInMillis +cal.getTimeZone match { + case null => millisToDays(ms) + case tz => millisToDays(ms, tz) +} + } + + /** + * Returns SQLTimestamp from java.util.Calendar (microseconds since epoch) --- End diff -- (Matching the comment of fromJavaCalendarForDate.) nit: Returns the number of microseconds since epoch from java.util.Calendar. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19943: [SPARK-16060][SQL] Support Vectorized ORC Reader
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/19943#discussion_r160017579 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.scala --- @@ -0,0 +1,493 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.orc + +import org.apache.hadoop.mapreduce.{InputSplit, RecordReader, TaskAttemptContext} +import org.apache.hadoop.mapreduce.lib.input.FileSplit +import org.apache.orc._ +import org.apache.orc.mapred.OrcInputFormat +import org.apache.orc.storage.ql.exec.vector._ +import org.apache.orc.storage.serde2.io.HiveDecimalWritable + +import org.apache.spark.memory.MemoryMode +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.vectorized._ +import org.apache.spark.sql.types._ +import org.apache.spark.sql.vectorized._ + + +/** + * To support vectorization in WholeStageCodeGen, this reader returns ColumnarBatch. + * After creating, `initialize` and `setRequiredSchema` should be called sequentially. + */ +private[orc] class OrcColumnarBatchReader extends RecordReader[Void, ColumnarBatch] { + import OrcColumnarBatchReader._ + + /** + * ORC File Reader. + */ + private var reader: Reader = _ + + /** + * Vectorized Row Batch. + */ + private var batch: VectorizedRowBatch = _ + + /** + * Requested Column IDs. + */ + private var requestedColIds: Array[Int] = _ + + /** + * Record reader from row batch. + */ + private var recordReader: org.apache.orc.RecordReader = _ + + /** + * Required Schema. + */ + private var requiredSchema: StructType = _ + + /** + * ColumnarBatch for vectorized execution by whole-stage codegen. + */ + private var columnarBatch: ColumnarBatch = _ + + /** + * Writable column vectors of ColumnarBatch. + */ + private var columnVectors: Seq[WritableColumnVector] = _ + + /** + * The number of rows read and considered to be returned. + */ + private var rowsReturned: Long = 0L + + /** + * Total number of rows. + */ + private var totalRowCount: Long = 0L + + override def getCurrentKey: Void = null + + override def getCurrentValue: ColumnarBatch = columnarBatch + + override def getProgress: Float = rowsReturned.toFloat / totalRowCount + + override def nextKeyValue(): Boolean = nextBatch() + + override def close(): Unit = { +if (columnarBatch != null) { + columnarBatch.close() + columnarBatch = null +} +if (recordReader != null) { + recordReader.close() + recordReader = null +} + } + + /** + * Initialize ORC file reader and batch record reader. + * Please note that `setRequiredSchema` is needed to be called after this. 
+ */ + override def initialize(inputSplit: InputSplit, taskAttemptContext: TaskAttemptContext): Unit = { +val fileSplit = inputSplit.asInstanceOf[FileSplit] +val conf = taskAttemptContext.getConfiguration +reader = OrcFile.createReader( + fileSplit.getPath, + OrcFile.readerOptions(conf) +.maxLength(OrcConf.MAX_FILE_LENGTH.getLong(conf)) +.filesystem(fileSplit.getPath.getFileSystem(conf))) + +val options = OrcInputFormat.buildOptions(conf, reader, fileSplit.getStart, fileSplit.getLength) +recordReader = reader.rows(options) +totalRowCount = reader.getNumberOfRows + } + + /** + * Set required schema and partition information. + * With this information, this creates ColumnarBatch with the full schema. + */ + def setRequiredSchema( + orcSchema: TypeDescription, + requestedColIds: Array[Int], + resultSchema: StructType, + requiredSchema
[GitHub] spark issue #19943: [SPARK-16060][SQL] Support Vectorized ORC Reader
Github user dongjoon-hyun commented on the issue: https://github.com/apache/spark/pull/19943 I answered in the comment above. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19943: [SPARK-16060][SQL] Support Vectorized ORC Reader
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/19943#discussion_r160017549 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.scala --- @@ -0,0 +1,432 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.orc + +import org.apache.hadoop.mapreduce.{InputSplit, RecordReader, TaskAttemptContext} +import org.apache.hadoop.mapreduce.lib.input.FileSplit +import org.apache.orc._ +import org.apache.orc.mapred.OrcInputFormat +import org.apache.orc.storage.ql.exec.vector._ +import org.apache.orc.storage.serde2.io.HiveDecimalWritable + +import org.apache.spark.internal.Logging +import org.apache.spark.memory.MemoryMode +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.vectorized._ +import org.apache.spark.sql.types._ + + +/** + * To support vectorization in WholeStageCodeGen, this reader returns ColumnarBatch. + */ +private[orc] class OrcColumnarBatchReader extends RecordReader[Void, ColumnarBatch] with Logging { + import OrcColumnarBatchReader._ + + /** + * ORC File Reader. + */ + private var reader: Reader = _ + + /** + * Vectorized Row Batch. + */ + private var batch: VectorizedRowBatch = _ + + /** + * Requested Column IDs. + */ + private var requestedColIds: Array[Int] = _ + + /** + * Record reader from row batch. + */ + private var rows: org.apache.orc.RecordReader = _ + + /** + * Required Schema. + */ + private var requiredSchema: StructType = _ + + /** + * ColumnarBatch for vectorized execution by whole-stage codegen. + */ + private var columnarBatch: ColumnarBatch = _ + + /** + * Writable columnVectors of ColumnarBatch. + */ + private var columnVectors: Seq[WritableColumnVector] = _ + + /** + * The number of rows read and considered to be returned. + */ + private var rowsReturned: Long = 0L + + /** + * Total number of rows. + */ + private var totalRowCount: Long = 0L + + override def getCurrentKey: Void = null + + override def getCurrentValue: ColumnarBatch = columnarBatch + + override def getProgress: Float = rowsReturned.toFloat / totalRowCount + + override def nextKeyValue(): Boolean = nextBatch() + + override def close(): Unit = { +if (columnarBatch != null) { + columnarBatch.close() + columnarBatch = null +} +if (rows != null) { + rows.close() + rows = null +} + } + + /** + * Initialize ORC file reader and batch record reader. + * Please note that `setRequiredSchema` is needed to be called after this. 
+ */ + override def initialize(inputSplit: InputSplit, taskAttemptContext: TaskAttemptContext): Unit = { +val fileSplit = inputSplit.asInstanceOf[FileSplit] +val conf = taskAttemptContext.getConfiguration +reader = OrcFile.createReader( + fileSplit.getPath, + OrcFile.readerOptions(conf) +.maxLength(OrcConf.MAX_FILE_LENGTH.getLong(conf)) +.filesystem(fileSplit.getPath.getFileSystem(conf))) + +val options = OrcInputFormat.buildOptions(conf, reader, fileSplit.getStart, fileSplit.getLength) +rows = reader.rows(options) + } + + /** + * Set required schema and partition information. + * With this information, this creates ColumnarBatch with the full schema. + */ + def setRequiredSchema( + orcSchema: TypeDescription, + requestedColIds: Array[Int], + resultSchema: StructType, + requiredSchema: StructType, + partitionValues: InternalRow): Unit = { +batch = orcSchema.createRowBatch(DEFAULT_SIZE) +assert(!batch.selectedInUse) +t
[GitHub] spark pull request #19943: [SPARK-16060][SQL] Support Vectorized ORC Reader
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/19943#discussion_r160017493 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcReadBenchmark.scala --- @@ -0,0 +1,435 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive.orc + +import java.io.File + +import scala.util.{Random, Try} + +import org.apache.spark.SparkConf +import org.apache.spark.sql.{DataFrame, SparkSession} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types._ +import org.apache.spark.util.{Benchmark, Utils} + + +/** + * Benchmark to measure ORC read performance. + * + * This is in `sql/hive` module in order to compare `sql/core` and `sql/hive` ORC data sources. + */ +// scalastyle:off line.size.limit +object OrcReadBenchmark { + val conf = new SparkConf() + conf.set("orc.compression", "snappy") + + private val spark = SparkSession.builder() +.master("local[1]") +.appName("OrcReadBenchmark") +.config(conf) +.getOrCreate() + + // Set default configs. Individual cases will change them if necessary. 
+ spark.conf.set(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key, "true") + + def withTempPath(f: File => Unit): Unit = { +val path = Utils.createTempDir() +path.delete() +try f(path) finally Utils.deleteRecursively(path) + } + + def withTempTable(tableNames: String*)(f: => Unit): Unit = { +try f finally tableNames.foreach(spark.catalog.dropTempView) + } + + def withSQLConf(pairs: (String, String)*)(f: => Unit): Unit = { +val (keys, values) = pairs.unzip +val currentValues = keys.map(key => Try(spark.conf.get(key)).toOption) +(keys, values).zipped.foreach(spark.conf.set) +try f finally { + keys.zip(currentValues).foreach { +case (key, Some(value)) => spark.conf.set(key, value) +case (key, None) => spark.conf.unset(key) + } +} + } + + private val NATIVE_ORC_FORMAT = "org.apache.spark.sql.execution.datasources.orc.OrcFileFormat" + private val HIVE_ORC_FORMAT = "org.apache.spark.sql.hive.orc.OrcFileFormat" + + private def prepareTable(dir: File, df: DataFrame, partition: Option[String] = None): Unit = { +val dirORC = dir.getCanonicalPath + +if (partition.isDefined) { + df.write.partitionBy(partition.get).orc(dirORC) +} else { + df.write.orc(dirORC) +} + + spark.read.format(NATIVE_ORC_FORMAT).load(dirORC).createOrReplaceTempView("nativeOrcTable") + spark.read.format(HIVE_ORC_FORMAT).load(dirORC).createOrReplaceTempView("hiveOrcTable") + } + + def numericScanBenchmark(values: Int, dataType: DataType): Unit = { +val sqlBenchmark = new Benchmark(s"SQL Single ${dataType.sql} Column Scan", values) + +withTempPath { dir => + withTempTable("t1", "nativeOrcTable", "hiveOrcTable") { +import spark.implicits._ +spark.range(values).map(_ => Random.nextLong).createOrReplaceTempView("t1") + +prepareTable(dir, spark.sql(s"SELECT CAST(value as ${dataType.sql}) id FROM t1")) + +sqlBenchmark.addCase("Native ORC Vectorized") { _ => + spark.sql("SELECT sum(id) FROM nativeOrcTable").collect() +} + +sqlBenchmark.addCase("Native ORC MR") { _ => + withSQLConf(SQLConf.ORC_VECTORIZED_READER_ENABLED.key -> "false") { +spark.sql("SELECT sum(id) FROM nativeOrcTable").collect() + } +} + +sqlBenchmark.addCase("Hive built-in ORC") { _ => + spark.sql("SELECT sum(id) FROM hiveOrcTable").collect() +} + +/* +Java HotSpot(TM) 64-Bit Server VM 1.8.0_152-b16 on Mac OS X 10.13.1 +Intel(R) Core(TM) i7-4770HQ CPU @ 2.20GHz + +SQL Single TINYINT Colum
[GitHub] spark pull request #19943: [SPARK-16060][SQL] Support Vectorized ORC Reader
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/19943#discussion_r160017477 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala --- @@ -110,4 +107,23 @@ object OrcUtils extends Logging { } } } + + /** + * Return a fixed ORC schema with data schema information, if needed. + * The schema inside old ORC files might consist of invalid column names like '_col0'. + */ + def getFixedTypeDescription( --- End diff -- Okay. I'll remove this from this PR. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20168: SPARK-22730 Add ImageSchema support for non-integer imag...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/20168 Let's fix the PR title to `[SPARK-22730][ML] ...` BTW. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19943: [SPARK-16060][SQL] Support Vectorized ORC Reader
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/19943#discussion_r160017338 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala --- @@ -139,15 +146,25 @@ class OrcFileFormat } } +val resultSchema = StructType(requiredSchema.fields ++ partitionSchema.fields) +val enableVectorizedReader = sparkSession.sessionState.conf.orcVectorizedReaderEnabled && + supportBatch(sparkSession, resultSchema) --- End diff -- Right. It's fixed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19943: [SPARK-16060][SQL] Support Vectorized ORC Reader
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/19943#discussion_r160017293 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.scala --- @@ -0,0 +1,493 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.orc + +import org.apache.hadoop.mapreduce.{InputSplit, RecordReader, TaskAttemptContext} +import org.apache.hadoop.mapreduce.lib.input.FileSplit +import org.apache.orc._ +import org.apache.orc.mapred.OrcInputFormat +import org.apache.orc.storage.ql.exec.vector._ +import org.apache.orc.storage.serde2.io.HiveDecimalWritable + +import org.apache.spark.memory.MemoryMode +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.vectorized._ +import org.apache.spark.sql.types._ +import org.apache.spark.sql.vectorized._ + + +/** + * To support vectorization in WholeStageCodeGen, this reader returns ColumnarBatch. + * After creating, `initialize` and `setRequiredSchema` should be called sequentially. + */ +private[orc] class OrcColumnarBatchReader extends RecordReader[Void, ColumnarBatch] { + import OrcColumnarBatchReader._ + + /** + * ORC File Reader. + */ + private var reader: Reader = _ + + /** + * Vectorized Row Batch. + */ + private var batch: VectorizedRowBatch = _ + + /** + * Requested Column IDs. + */ + private var requestedColIds: Array[Int] = _ + + /** + * Record reader from row batch. + */ + private var recordReader: org.apache.orc.RecordReader = _ + + /** + * Required Schema. + */ + private var requiredSchema: StructType = _ + + /** + * ColumnarBatch for vectorized execution by whole-stage codegen. + */ + private var columnarBatch: ColumnarBatch = _ + + /** + * Writable column vectors of ColumnarBatch. + */ + private var columnVectors: Seq[WritableColumnVector] = _ + + /** + * The number of rows read and considered to be returned. + */ + private var rowsReturned: Long = 0L + + /** + * Total number of rows. + */ + private var totalRowCount: Long = 0L + + override def getCurrentKey: Void = null + + override def getCurrentValue: ColumnarBatch = columnarBatch + + override def getProgress: Float = rowsReturned.toFloat / totalRowCount + + override def nextKeyValue(): Boolean = nextBatch() + + override def close(): Unit = { +if (columnarBatch != null) { + columnarBatch.close() + columnarBatch = null +} +if (recordReader != null) { + recordReader.close() + recordReader = null +} + } + + /** + * Initialize ORC file reader and batch record reader. + * Please note that `setRequiredSchema` is needed to be called after this. 
+ */ + override def initialize(inputSplit: InputSplit, taskAttemptContext: TaskAttemptContext): Unit = { +val fileSplit = inputSplit.asInstanceOf[FileSplit] +val conf = taskAttemptContext.getConfiguration +reader = OrcFile.createReader( + fileSplit.getPath, + OrcFile.readerOptions(conf) +.maxLength(OrcConf.MAX_FILE_LENGTH.getLong(conf)) +.filesystem(fileSplit.getPath.getFileSystem(conf))) + +val options = OrcInputFormat.buildOptions(conf, reader, fileSplit.getStart, fileSplit.getLength) +recordReader = reader.rows(options) +totalRowCount = reader.getNumberOfRows + } + + /** + * Set required schema and partition information. + * With this information, this creates ColumnarBatch with the full schema. + */ + def setRequiredSchema( + orcSchema: TypeDescription, + requestedColIds: Array[Int], + resultSchema: StructType, + requiredSchema:
[GitHub] spark issue #20168: SPARK-22730 Add ImageSchema support for non-integer imag...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20168 Merged build finished. Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20142: [SPARK-22930][PYTHON][SQL] Improve the description of Ve...
Github user viirya commented on the issue: https://github.com/apache/spark/pull/20142 LGTM with minor comments regarding naming. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20168: SPARK-22730 Add ImageSchema support for non-integer imag...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20168 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/85742/ Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20168: SPARK-22730 Add ImageSchema support for non-integer imag...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20168 **[Test build #85742 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85742/testReport)** for PR 20168 at commit [`70bae2f`](https://github.com/apache/spark/commit/70bae2f7e9d85a5f464f1bfc3a9426136259d5d1). * This patch **fails PySpark unit tests**. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20142: [SPARK-22930][PYTHON][SQL] Improve the descriptio...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/20142#discussion_r160017182 --- Diff: python/pyspark/sql/tests.py --- @@ -3950,6 +3975,33 @@ def test_vectorized_udf_timestamps_respect_session_timezone(self): finally: self.spark.conf.set("spark.sql.session.timeZone", orig_tz) +def test_nondeterministic_udf(self): +# Test that nondeterministic UDFs are evaluated only once in chained UDF evaluations +from pyspark.sql.functions import udf, pandas_udf, col + +@pandas_udf('double') +def plus_ten(v): +return v + 10 +random_udf = self.random_udf + +df = self.spark.range(10).withColumn('rand', random_udf(col('id'))) +result1 = df.withColumn('plus_ten(rand)', plus_ten(df['rand'])).toPandas() + +self.assertEqual(random_udf.deterministic, False) +self.assertTrue(result1['plus_ten(rand)'].equals(result1['rand'] + 10)) + +def test_nondeterministic_udf_in_aggregate(self): --- End diff -- test_vectorized_nondeterministic_udf_in_aggregate --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20142: [SPARK-22930][PYTHON][SQL] Improve the descriptio...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/20142#discussion_r160017215 --- Diff: python/pyspark/sql/tests.py --- @@ -3567,6 +3580,18 @@ def tearDownClass(cls): time.tzset() ReusedSQLTestCase.tearDownClass() +@property +def random_udf(self): --- End diff -- Maybe `nondeterministic_udf`, so we don't duplicate the `random_udf` name either. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20142: [SPARK-22930][PYTHON][SQL] Improve the descriptio...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/20142#discussion_r160017176 --- Diff: python/pyspark/sql/tests.py --- @@ -3950,6 +3975,33 @@ def test_vectorized_udf_timestamps_respect_session_timezone(self): finally: self.spark.conf.set("spark.sql.session.timeZone", orig_tz) +def test_nondeterministic_udf(self): --- End diff -- test_vectorized_nondeterministic_udf --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20142: [SPARK-22930][PYTHON][SQL] Improve the description of Ve...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/20142 LGTM except for the one minor comment --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19943: [SPARK-16060][SQL] Support Vectorized ORC Reader
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/19943#discussion_r160017124 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.scala --- @@ -0,0 +1,493 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.orc + +import org.apache.hadoop.mapreduce.{InputSplit, RecordReader, TaskAttemptContext} +import org.apache.hadoop.mapreduce.lib.input.FileSplit +import org.apache.orc._ +import org.apache.orc.mapred.OrcInputFormat +import org.apache.orc.storage.ql.exec.vector._ +import org.apache.orc.storage.serde2.io.HiveDecimalWritable + +import org.apache.spark.memory.MemoryMode +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.vectorized._ +import org.apache.spark.sql.types._ +import org.apache.spark.sql.vectorized._ + + +/** + * To support vectorization in WholeStageCodeGen, this reader returns ColumnarBatch. + * After creating, `initialize` and `setRequiredSchema` should be called sequentially. + */ +private[orc] class OrcColumnarBatchReader extends RecordReader[Void, ColumnarBatch] { + import OrcColumnarBatchReader._ + + /** + * ORC File Reader. + */ + private var reader: Reader = _ + + /** + * Vectorized Row Batch. + */ + private var batch: VectorizedRowBatch = _ + + /** + * Requested Column IDs. + */ + private var requestedColIds: Array[Int] = _ + + /** + * Record reader from row batch. + */ + private var recordReader: org.apache.orc.RecordReader = _ + + /** + * Required Schema. + */ + private var requiredSchema: StructType = _ + + /** + * ColumnarBatch for vectorized execution by whole-stage codegen. + */ + private var columnarBatch: ColumnarBatch = _ + + /** + * Writable column vectors of ColumnarBatch. + */ + private var columnVectors: Seq[WritableColumnVector] = _ + + /** + * The number of rows read and considered to be returned. + */ + private var rowsReturned: Long = 0L + + /** + * Total number of rows. + */ + private var totalRowCount: Long = 0L + + override def getCurrentKey: Void = null + + override def getCurrentValue: ColumnarBatch = columnarBatch + + override def getProgress: Float = rowsReturned.toFloat / totalRowCount + + override def nextKeyValue(): Boolean = nextBatch() + + override def close(): Unit = { +if (columnarBatch != null) { + columnarBatch.close() + columnarBatch = null +} +if (recordReader != null) { + recordReader.close() + recordReader = null +} + } + + /** + * Initialize ORC file reader and batch record reader. + * Please note that `setRequiredSchema` is needed to be called after this. 
+ */ + override def initialize(inputSplit: InputSplit, taskAttemptContext: TaskAttemptContext): Unit = { +val fileSplit = inputSplit.asInstanceOf[FileSplit] +val conf = taskAttemptContext.getConfiguration +reader = OrcFile.createReader( + fileSplit.getPath, + OrcFile.readerOptions(conf) +.maxLength(OrcConf.MAX_FILE_LENGTH.getLong(conf)) +.filesystem(fileSplit.getPath.getFileSystem(conf))) + +val options = OrcInputFormat.buildOptions(conf, reader, fileSplit.getStart, fileSplit.getLength) +recordReader = reader.rows(options) +totalRowCount = reader.getNumberOfRows + } + + /** + * Set required schema and partition information. + * With this information, this creates ColumnarBatch with the full schema. + */ + def setRequiredSchema( + orcSchema: TypeDescription, + requestedColIds: Array[Int], + resultSchema: StructType, + requiredSchema
[GitHub] spark pull request #19943: [SPARK-16060][SQL] Support Vectorized ORC Reader
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/19943#discussion_r160017101 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.scala --- @@ -0,0 +1,493 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.orc + +import org.apache.hadoop.mapreduce.{InputSplit, RecordReader, TaskAttemptContext} +import org.apache.hadoop.mapreduce.lib.input.FileSplit +import org.apache.orc._ +import org.apache.orc.mapred.OrcInputFormat +import org.apache.orc.storage.ql.exec.vector._ +import org.apache.orc.storage.serde2.io.HiveDecimalWritable + +import org.apache.spark.memory.MemoryMode +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.vectorized._ +import org.apache.spark.sql.types._ +import org.apache.spark.sql.vectorized._ + + +/** + * To support vectorization in WholeStageCodeGen, this reader returns ColumnarBatch. + * After creating, `initialize` and `setRequiredSchema` should be called sequentially. + */ +private[orc] class OrcColumnarBatchReader extends RecordReader[Void, ColumnarBatch] { + import OrcColumnarBatchReader._ + + /** + * ORC File Reader. + */ + private var reader: Reader = _ + + /** + * Vectorized Row Batch. + */ + private var batch: VectorizedRowBatch = _ + + /** + * Requested Column IDs. + */ + private var requestedColIds: Array[Int] = _ + + /** + * Record reader from row batch. + */ + private var recordReader: org.apache.orc.RecordReader = _ + + /** + * Required Schema. + */ + private var requiredSchema: StructType = _ + + /** + * ColumnarBatch for vectorized execution by whole-stage codegen. + */ + private var columnarBatch: ColumnarBatch = _ + + /** + * Writable column vectors of ColumnarBatch. + */ + private var columnVectors: Seq[WritableColumnVector] = _ + + /** + * The number of rows read and considered to be returned. + */ + private var rowsReturned: Long = 0L + + /** + * Total number of rows. + */ + private var totalRowCount: Long = 0L + + override def getCurrentKey: Void = null + + override def getCurrentValue: ColumnarBatch = columnarBatch + + override def getProgress: Float = rowsReturned.toFloat / totalRowCount + + override def nextKeyValue(): Boolean = nextBatch() + + override def close(): Unit = { +if (columnarBatch != null) { + columnarBatch.close() + columnarBatch = null +} +if (recordReader != null) { + recordReader.close() + recordReader = null +} + } + + /** + * Initialize ORC file reader and batch record reader. + * Please note that `setRequiredSchema` is needed to be called after this. 
+ */ + override def initialize(inputSplit: InputSplit, taskAttemptContext: TaskAttemptContext): Unit = { +val fileSplit = inputSplit.asInstanceOf[FileSplit] +val conf = taskAttemptContext.getConfiguration +reader = OrcFile.createReader( + fileSplit.getPath, + OrcFile.readerOptions(conf) +.maxLength(OrcConf.MAX_FILE_LENGTH.getLong(conf)) +.filesystem(fileSplit.getPath.getFileSystem(conf))) + +val options = OrcInputFormat.buildOptions(conf, reader, fileSplit.getStart, fileSplit.getLength) +recordReader = reader.rows(options) +totalRowCount = reader.getNumberOfRows + } + + /** + * Set required schema and partition information. + * With this information, this creates ColumnarBatch with the full schema. + */ + def setRequiredSchema( + orcSchema: TypeDescription, + requestedColIds: Array[Int], + resultSchema: StructType, + requiredSchema
[GitHub] spark pull request #20142: [SPARK-22930][PYTHON][SQL] Improve the descriptio...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20142#discussion_r160017026 --- Diff: python/pyspark/sql/tests.py --- @@ -3567,6 +3580,18 @@ def tearDownClass(cls): time.tzset() ReusedSQLTestCase.tearDownClass() +@property +def random_udf(self): --- End diff -- Could we add "nondeterministic" in its name somehow? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20168: SPARK-22730 Add ImageSchema support for non-integ...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20168#discussion_r160016767 --- Diff: python/pyspark/ml/image.py --- @@ -71,9 +88,30 @@ def ocvTypes(self): """ if self._ocvTypes is None: -ctx = SparkContext._active_spark_context -self._ocvTypes = dict(ctx._jvm.org.apache.spark.ml.image.ImageSchema.javaOcvTypes()) -return self._ocvTypes +ctx = SparkContext.getOrCreate() +ocvTypeList = ctx._jvm.org.apache.spark.ml.image.ImageSchema.javaOcvTypes() +self._ocvTypes = [self._OcvType(name=x.name(), +mode=x.mode(), +nChannels=x.nChannels(), +dataType=x.dataType(), + nptype=self._ocvToNumpyMap[x.dataType()]) + for x in ocvTypeList] +return self._ocvTypes[:] + +def ocvTypeByName(self, name): +if self._ocvTypesByName is None: +self._ocvTypesByName = {x.name: x for x in self.ocvTypes} +if name not in self._ocvTypesByName: +raise ValueError( +"Can not find matching OpenCvFormat for type = '%s'; supported formats are = %s" % +(name, str( +self._ocvTypesByName.keys( +return self._ocvTypesByName[name] + +def ocvTypeByMode(self, mode): --- End diff -- Is it meant to be public? Seems doc is missing and this one doesn't look consistent with Scala side? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20168: SPARK-22730 Add ImageSchema support for non-integ...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20168#discussion_r160016719 --- Diff: python/pyspark/ml/image.py --- @@ -71,9 +88,30 @@ def ocvTypes(self): """ if self._ocvTypes is None: -ctx = SparkContext._active_spark_context -self._ocvTypes = dict(ctx._jvm.org.apache.spark.ml.image.ImageSchema.javaOcvTypes()) -return self._ocvTypes +ctx = SparkContext.getOrCreate() +ocvTypeList = ctx._jvm.org.apache.spark.ml.image.ImageSchema.javaOcvTypes() +self._ocvTypes = [self._OcvType(name=x.name(), +mode=x.mode(), +nChannels=x.nChannels(), +dataType=x.dataType(), + nptype=self._ocvToNumpyMap[x.dataType()]) + for x in ocvTypeList] +return self._ocvTypes[:] --- End diff -- Is it for copy? I usually do `list(self._ocvTypes)` tho. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20168: SPARK-22730 Add ImageSchema support for non-integ...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/20168#discussion_r160016683 --- Diff: mllib/src/main/scala/org/apache/spark/ml/image/ImageSchema.scala --- @@ -37,20 +37,51 @@ import org.apache.spark.sql.types._ @Since("2.3.0") object ImageSchema { - val undefinedImageType = "Undefined" - /** - * (Scala-specific) OpenCV type mapping supported + * OpenCv type representation + * @param mode ordinal for the type + * @param dataType open cv data type + * @param nChannels number of color channels */ - val ocvTypes: Map[String, Int] = Map( -undefinedImageType -> -1, -"CV_8U" -> 0, "CV_8UC1" -> 0, "CV_8UC3" -> 16, "CV_8UC4" -> 24 - ) + case class OpenCvType(mode: Int, dataType: String, nChannels: Int) { +def name: String = "CV_" + dataType + "C" + nChannels +override def toString: String = "OpenCvType(mode = " + mode + ", name = " + name + ")" + } + + object OpenCvType { +def get(name: String): OpenCvType = { + ocvTypes.find(x => x.name == name).getOrElse( +throw new IllegalArgumentException("Unknown open cv type " + name)) +} +def get(mode: Int): OpenCvType = { + ocvTypes.find(x => x.mode == mode).getOrElse( +throw new IllegalArgumentException("Unknown open cv mode " + mode)) +} +val undefinedType = OpenCvType(-1, "N/A", -1) + } /** - * (Java-specific) OpenCV type mapping supported + * A Mapping of Type to Numbers in OpenCV + * + *C1 C2 C3 C4 + * CV_8U 0 8 16 24 + * CV_8S 1 9 17 25 + * CV_16U 2 10 18 26 + * CV_16S 3 11 19 27 + * CV_32S 4 12 20 28 + * CV_32F 5 13 21 29 + * CV_64F 6 14 22 30 */ - val javaOcvTypes: java.util.Map[String, Int] = ocvTypes.asJava + val ocvTypes = { +val types = + for (nc <- Array(1, 2, 3, 4); + dt <- Array("8U", "8S", "16U", "16S", "32S", "32F", "64F")) +yield (dt, nc) +val ordinals = for (i <- 0 to 3; j <- 0 to 6) yield ( i * 8 + j) +OpenCvType.undefinedType +: (ordinals zip types).map(x => OpenCvType(x._1, x._2._1, x._2._2)) + } + + val javaOcvTypes = ocvTypes.asJava --- End diff -- Hm .. why did we remove the doc here? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
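A short usage sketch of the `OpenCvType` helpers exactly as proposed in the diff above (this is the PR's API, not the released `ImageSchema`, so treat it as an assumption): a type can be looked up by its name or by its numeric OpenCV mode, following the mapping table in the diff.

```scala
import org.apache.spark.ml.image.ImageSchema.OpenCvType

// CV_8UC3 is an unsigned 8-bit, 3-channel image; per the table above its OpenCV mode is 16.
val rgb  = OpenCvType.get("CV_8UC3")  // OpenCvType(mode = 16, name = CV_8UC3)
val same = OpenCvType.get(16)         // the same entry, resolved from the mode number
assert(rgb == same)
```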
[GitHub] spark issue #20168: SPARK-22730 Add ImageSchema support for non-integer imag...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20168 **[Test build #85742 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85742/testReport)** for PR 20168 at commit [`70bae2f`](https://github.com/apache/spark/commit/70bae2f7e9d85a5f464f1bfc3a9426136259d5d1). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20168: SPARK-22730 Add ImageSchema support for non-integer imag...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/20168 cc @jkbradley, @imatiach-msft, @MrBago and @thunterdb. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20168: SPARK-22730 Add ImageSchema support for non-integer imag...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/20168 ok to test --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20163: [SPARK-22966][PySpark] Spark SQL should handle Python UD...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/20163 @ueshin @icexelloss @cloud-fan @rednaxelafx, which one would you prefer? Personally, I like option 1 the most. If the perf diff is trivial, option 2 is also fine. If option 3 works, I'm fine with that too. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19943: [SPARK-16060][SQL] Support Vectorized ORC Reader
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/19943 Overall looks good; my major concern is https://github.com/apache/spark/pull/19943/files#r159221758, do you have an answer? This may be a big drawback compared to the wrapper solution. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19943: [SPARK-16060][SQL] Support Vectorized ORC Reader
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19943#discussion_r160016468 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcReadBenchmark.scala --- @@ -0,0 +1,435 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive.orc + +import java.io.File + +import scala.util.{Random, Try} + +import org.apache.spark.SparkConf +import org.apache.spark.sql.{DataFrame, SparkSession} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types._ +import org.apache.spark.util.{Benchmark, Utils} + + +/** + * Benchmark to measure ORC read performance. + * + * This is in `sql/hive` module in order to compare `sql/core` and `sql/hive` ORC data sources. + */ +// scalastyle:off line.size.limit +object OrcReadBenchmark { + val conf = new SparkConf() + conf.set("orc.compression", "snappy") + + private val spark = SparkSession.builder() +.master("local[1]") +.appName("OrcReadBenchmark") +.config(conf) +.getOrCreate() + + // Set default configs. Individual cases will change them if necessary. 
+ spark.conf.set(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key, "true") + + def withTempPath(f: File => Unit): Unit = { +val path = Utils.createTempDir() +path.delete() +try f(path) finally Utils.deleteRecursively(path) + } + + def withTempTable(tableNames: String*)(f: => Unit): Unit = { +try f finally tableNames.foreach(spark.catalog.dropTempView) + } + + def withSQLConf(pairs: (String, String)*)(f: => Unit): Unit = { +val (keys, values) = pairs.unzip +val currentValues = keys.map(key => Try(spark.conf.get(key)).toOption) +(keys, values).zipped.foreach(spark.conf.set) +try f finally { + keys.zip(currentValues).foreach { +case (key, Some(value)) => spark.conf.set(key, value) +case (key, None) => spark.conf.unset(key) + } +} + } + + private val NATIVE_ORC_FORMAT = "org.apache.spark.sql.execution.datasources.orc.OrcFileFormat" + private val HIVE_ORC_FORMAT = "org.apache.spark.sql.hive.orc.OrcFileFormat" + + private def prepareTable(dir: File, df: DataFrame, partition: Option[String] = None): Unit = { +val dirORC = dir.getCanonicalPath + +if (partition.isDefined) { + df.write.partitionBy(partition.get).orc(dirORC) +} else { + df.write.orc(dirORC) +} + + spark.read.format(NATIVE_ORC_FORMAT).load(dirORC).createOrReplaceTempView("nativeOrcTable") + spark.read.format(HIVE_ORC_FORMAT).load(dirORC).createOrReplaceTempView("hiveOrcTable") + } + + def numericScanBenchmark(values: Int, dataType: DataType): Unit = { +val sqlBenchmark = new Benchmark(s"SQL Single ${dataType.sql} Column Scan", values) + +withTempPath { dir => + withTempTable("t1", "nativeOrcTable", "hiveOrcTable") { +import spark.implicits._ +spark.range(values).map(_ => Random.nextLong).createOrReplaceTempView("t1") + +prepareTable(dir, spark.sql(s"SELECT CAST(value as ${dataType.sql}) id FROM t1")) + +sqlBenchmark.addCase("Native ORC Vectorized") { _ => + spark.sql("SELECT sum(id) FROM nativeOrcTable").collect() +} + +sqlBenchmark.addCase("Native ORC MR") { _ => + withSQLConf(SQLConf.ORC_VECTORIZED_READER_ENABLED.key -> "false") { +spark.sql("SELECT sum(id) FROM nativeOrcTable").collect() + } +} + +sqlBenchmark.addCase("Hive built-in ORC") { _ => + spark.sql("SELECT sum(id) FROM hiveOrcTable").collect() +} + +/* +Java HotSpot(TM) 64-Bit Server VM 1.8.0_152-b16 on Mac OS X 10.13.1 +Intel(R) Core(TM) i7-4770HQ CPU @ 2.20GHz + +SQL Single TINYINT Column Sc
[GitHub] spark pull request #19943: [SPARK-16060][SQL] Support Vectorized ORC Reader
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19943#discussion_r160016446 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala --- @@ -110,4 +107,23 @@ object OrcUtils extends Logging { } } } + + /** + * Return a fixed ORC schema with data schema information, if needed. + * The schema inside old ORC files might consist of invalid column names like '_col0'. + */ + def getFixedTypeDescription( --- End diff -- Do we really need this? The ORC schema is used to create the ORC batch; for the batch data, I think we only care about the data types, not the field names. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19943: [SPARK-16060][SQL] Support Vectorized ORC Reader
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19943#discussion_r160016431 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.scala --- @@ -0,0 +1,493 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.orc + +import org.apache.hadoop.mapreduce.{InputSplit, RecordReader, TaskAttemptContext} +import org.apache.hadoop.mapreduce.lib.input.FileSplit +import org.apache.orc._ +import org.apache.orc.mapred.OrcInputFormat +import org.apache.orc.storage.ql.exec.vector._ +import org.apache.orc.storage.serde2.io.HiveDecimalWritable + +import org.apache.spark.memory.MemoryMode +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.vectorized._ +import org.apache.spark.sql.types._ +import org.apache.spark.sql.vectorized._ + + +/** + * To support vectorization in WholeStageCodeGen, this reader returns ColumnarBatch. + * After creating, `initialize` and `setRequiredSchema` should be called sequentially. + */ +private[orc] class OrcColumnarBatchReader extends RecordReader[Void, ColumnarBatch] { + import OrcColumnarBatchReader._ + + /** + * ORC File Reader. + */ + private var reader: Reader = _ + + /** + * Vectorized Row Batch. + */ + private var batch: VectorizedRowBatch = _ + + /** + * Requested Column IDs. + */ + private var requestedColIds: Array[Int] = _ + + /** + * Record reader from row batch. + */ + private var recordReader: org.apache.orc.RecordReader = _ + + /** + * Required Schema. + */ + private var requiredSchema: StructType = _ + + /** + * ColumnarBatch for vectorized execution by whole-stage codegen. + */ + private var columnarBatch: ColumnarBatch = _ + + /** + * Writable column vectors of ColumnarBatch. + */ + private var columnVectors: Seq[WritableColumnVector] = _ + + /** + * The number of rows read and considered to be returned. + */ + private var rowsReturned: Long = 0L + + /** + * Total number of rows. + */ + private var totalRowCount: Long = 0L + + override def getCurrentKey: Void = null + + override def getCurrentValue: ColumnarBatch = columnarBatch + + override def getProgress: Float = rowsReturned.toFloat / totalRowCount + + override def nextKeyValue(): Boolean = nextBatch() + + override def close(): Unit = { +if (columnarBatch != null) { + columnarBatch.close() + columnarBatch = null +} +if (recordReader != null) { + recordReader.close() + recordReader = null +} + } + + /** + * Initialize ORC file reader and batch record reader. + * Please note that `setRequiredSchema` is needed to be called after this. 
+ */ + override def initialize(inputSplit: InputSplit, taskAttemptContext: TaskAttemptContext): Unit = { +val fileSplit = inputSplit.asInstanceOf[FileSplit] +val conf = taskAttemptContext.getConfiguration +reader = OrcFile.createReader( + fileSplit.getPath, + OrcFile.readerOptions(conf) +.maxLength(OrcConf.MAX_FILE_LENGTH.getLong(conf)) +.filesystem(fileSplit.getPath.getFileSystem(conf))) + +val options = OrcInputFormat.buildOptions(conf, reader, fileSplit.getStart, fileSplit.getLength) +recordReader = reader.rows(options) +totalRowCount = reader.getNumberOfRows + } + + /** + * Set required schema and partition information. + * With this information, this creates ColumnarBatch with the full schema. + */ + def setRequiredSchema( + orcSchema: TypeDescription, + requestedColIds: Array[Int], + resultSchema: StructType, + requiredSchema: St
[GitHub] spark pull request #19943: [SPARK-16060][SQL] Support Vectorized ORC Reader
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19943#discussion_r160016423 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.scala --- @@ -0,0 +1,493 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.orc + +import org.apache.hadoop.mapreduce.{InputSplit, RecordReader, TaskAttemptContext} +import org.apache.hadoop.mapreduce.lib.input.FileSplit +import org.apache.orc._ +import org.apache.orc.mapred.OrcInputFormat +import org.apache.orc.storage.ql.exec.vector._ +import org.apache.orc.storage.serde2.io.HiveDecimalWritable + +import org.apache.spark.memory.MemoryMode +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.vectorized._ +import org.apache.spark.sql.types._ +import org.apache.spark.sql.vectorized._ + + +/** + * To support vectorization in WholeStageCodeGen, this reader returns ColumnarBatch. + * After creating, `initialize` and `setRequiredSchema` should be called sequentially. + */ +private[orc] class OrcColumnarBatchReader extends RecordReader[Void, ColumnarBatch] { + import OrcColumnarBatchReader._ + + /** + * ORC File Reader. + */ + private var reader: Reader = _ + + /** + * Vectorized Row Batch. + */ + private var batch: VectorizedRowBatch = _ + + /** + * Requested Column IDs. + */ + private var requestedColIds: Array[Int] = _ + + /** + * Record reader from row batch. + */ + private var recordReader: org.apache.orc.RecordReader = _ + + /** + * Required Schema. + */ + private var requiredSchema: StructType = _ + + /** + * ColumnarBatch for vectorized execution by whole-stage codegen. + */ + private var columnarBatch: ColumnarBatch = _ + + /** + * Writable column vectors of ColumnarBatch. + */ + private var columnVectors: Seq[WritableColumnVector] = _ + + /** + * The number of rows read and considered to be returned. + */ + private var rowsReturned: Long = 0L + + /** + * Total number of rows. + */ + private var totalRowCount: Long = 0L + + override def getCurrentKey: Void = null + + override def getCurrentValue: ColumnarBatch = columnarBatch + + override def getProgress: Float = rowsReturned.toFloat / totalRowCount + + override def nextKeyValue(): Boolean = nextBatch() + + override def close(): Unit = { +if (columnarBatch != null) { + columnarBatch.close() + columnarBatch = null +} +if (recordReader != null) { + recordReader.close() + recordReader = null +} + } + + /** + * Initialize ORC file reader and batch record reader. + * Please note that `setRequiredSchema` is needed to be called after this. 
+ */ + override def initialize(inputSplit: InputSplit, taskAttemptContext: TaskAttemptContext): Unit = { +val fileSplit = inputSplit.asInstanceOf[FileSplit] +val conf = taskAttemptContext.getConfiguration +reader = OrcFile.createReader( + fileSplit.getPath, + OrcFile.readerOptions(conf) +.maxLength(OrcConf.MAX_FILE_LENGTH.getLong(conf)) +.filesystem(fileSplit.getPath.getFileSystem(conf))) + +val options = OrcInputFormat.buildOptions(conf, reader, fileSplit.getStart, fileSplit.getLength) +recordReader = reader.rows(options) +totalRowCount = reader.getNumberOfRows + } + + /** + * Set required schema and partition information. + * With this information, this creates ColumnarBatch with the full schema. + */ + def setRequiredSchema( + orcSchema: TypeDescription, + requestedColIds: Array[Int], + resultSchema: StructType, + requiredSchema: St
[GitHub] spark issue #20076: [SPARK-21786][SQL] When acquiring 'compressionCodecClass...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20076 Merged build finished. Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20076: [SPARK-21786][SQL] When acquiring 'compressionCodecClass...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20076 **[Test build #85741 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85741/testReport)** for PR 20076 at commit [`1a8c654`](https://github.com/apache/spark/commit/1a8c654805656d3e143c2e63355c7b6365dac471). * This patch **fails Scala style tests**. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20076: [SPARK-21786][SQL] When acquiring 'compressionCodecClass...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20076 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/85741/ Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20076: [SPARK-21786][SQL] When acquiring 'compressionCodecClass...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20076 **[Test build #85741 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85741/testReport)** for PR 20076 at commit [`1a8c654`](https://github.com/apache/spark/commit/1a8c654805656d3e143c2e63355c7b6365dac471). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19943: [SPARK-16060][SQL] Support Vectorized ORC Reader
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19943#discussion_r160016334 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.scala --- @@ -0,0 +1,493 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.orc + +import org.apache.hadoop.mapreduce.{InputSplit, RecordReader, TaskAttemptContext} +import org.apache.hadoop.mapreduce.lib.input.FileSplit +import org.apache.orc._ +import org.apache.orc.mapred.OrcInputFormat +import org.apache.orc.storage.ql.exec.vector._ +import org.apache.orc.storage.serde2.io.HiveDecimalWritable + +import org.apache.spark.memory.MemoryMode +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.vectorized._ +import org.apache.spark.sql.types._ +import org.apache.spark.sql.vectorized._ + + +/** + * To support vectorization in WholeStageCodeGen, this reader returns ColumnarBatch. + * After creating, `initialize` and `setRequiredSchema` should be called sequentially. + */ +private[orc] class OrcColumnarBatchReader extends RecordReader[Void, ColumnarBatch] { + import OrcColumnarBatchReader._ + + /** + * ORC File Reader. + */ + private var reader: Reader = _ + + /** + * Vectorized Row Batch. + */ + private var batch: VectorizedRowBatch = _ + + /** + * Requested Column IDs. + */ + private var requestedColIds: Array[Int] = _ + + /** + * Record reader from row batch. + */ + private var recordReader: org.apache.orc.RecordReader = _ + + /** + * Required Schema. + */ + private var requiredSchema: StructType = _ + + /** + * ColumnarBatch for vectorized execution by whole-stage codegen. + */ + private var columnarBatch: ColumnarBatch = _ + + /** + * Writable column vectors of ColumnarBatch. + */ + private var columnVectors: Seq[WritableColumnVector] = _ + + /** + * The number of rows read and considered to be returned. + */ + private var rowsReturned: Long = 0L + + /** + * Total number of rows. + */ + private var totalRowCount: Long = 0L + + override def getCurrentKey: Void = null + + override def getCurrentValue: ColumnarBatch = columnarBatch + + override def getProgress: Float = rowsReturned.toFloat / totalRowCount + + override def nextKeyValue(): Boolean = nextBatch() + + override def close(): Unit = { +if (columnarBatch != null) { + columnarBatch.close() + columnarBatch = null +} +if (recordReader != null) { + recordReader.close() + recordReader = null +} + } + + /** + * Initialize ORC file reader and batch record reader. + * Please note that `setRequiredSchema` is needed to be called after this. 
+ */ + override def initialize(inputSplit: InputSplit, taskAttemptContext: TaskAttemptContext): Unit = { +val fileSplit = inputSplit.asInstanceOf[FileSplit] +val conf = taskAttemptContext.getConfiguration +reader = OrcFile.createReader( + fileSplit.getPath, + OrcFile.readerOptions(conf) +.maxLength(OrcConf.MAX_FILE_LENGTH.getLong(conf)) +.filesystem(fileSplit.getPath.getFileSystem(conf))) + +val options = OrcInputFormat.buildOptions(conf, reader, fileSplit.getStart, fileSplit.getLength) +recordReader = reader.rows(options) +totalRowCount = reader.getNumberOfRows + } + + /** + * Set required schema and partition information. + * With this information, this creates ColumnarBatch with the full schema. + */ + def setRequiredSchema( + orcSchema: TypeDescription, + requestedColIds: Array[Int], + resultSchema: StructType, + requiredSchema: St
[GitHub] spark pull request #19943: [SPARK-16060][SQL] Support Vectorized ORC Reader
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19943#discussion_r160016341 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.scala --- @@ -0,0 +1,493 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.orc + +import org.apache.hadoop.mapreduce.{InputSplit, RecordReader, TaskAttemptContext} +import org.apache.hadoop.mapreduce.lib.input.FileSplit +import org.apache.orc._ +import org.apache.orc.mapred.OrcInputFormat +import org.apache.orc.storage.ql.exec.vector._ +import org.apache.orc.storage.serde2.io.HiveDecimalWritable + +import org.apache.spark.memory.MemoryMode +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.vectorized._ +import org.apache.spark.sql.types._ +import org.apache.spark.sql.vectorized._ + + +/** + * To support vectorization in WholeStageCodeGen, this reader returns ColumnarBatch. + * After creating, `initialize` and `setRequiredSchema` should be called sequentially. + */ +private[orc] class OrcColumnarBatchReader extends RecordReader[Void, ColumnarBatch] { + import OrcColumnarBatchReader._ + + /** + * ORC File Reader. + */ + private var reader: Reader = _ + + /** + * Vectorized Row Batch. + */ + private var batch: VectorizedRowBatch = _ + + /** + * Requested Column IDs. + */ + private var requestedColIds: Array[Int] = _ + + /** + * Record reader from row batch. + */ + private var recordReader: org.apache.orc.RecordReader = _ + + /** + * Required Schema. + */ + private var requiredSchema: StructType = _ + + /** + * ColumnarBatch for vectorized execution by whole-stage codegen. + */ + private var columnarBatch: ColumnarBatch = _ + + /** + * Writable column vectors of ColumnarBatch. + */ + private var columnVectors: Seq[WritableColumnVector] = _ + + /** + * The number of rows read and considered to be returned. + */ + private var rowsReturned: Long = 0L + + /** + * Total number of rows. + */ + private var totalRowCount: Long = 0L + + override def getCurrentKey: Void = null + + override def getCurrentValue: ColumnarBatch = columnarBatch + + override def getProgress: Float = rowsReturned.toFloat / totalRowCount + + override def nextKeyValue(): Boolean = nextBatch() + + override def close(): Unit = { +if (columnarBatch != null) { + columnarBatch.close() + columnarBatch = null +} +if (recordReader != null) { + recordReader.close() + recordReader = null +} + } + + /** + * Initialize ORC file reader and batch record reader. + * Please note that `setRequiredSchema` is needed to be called after this. 
+ */ + override def initialize(inputSplit: InputSplit, taskAttemptContext: TaskAttemptContext): Unit = { +val fileSplit = inputSplit.asInstanceOf[FileSplit] +val conf = taskAttemptContext.getConfiguration +reader = OrcFile.createReader( + fileSplit.getPath, + OrcFile.readerOptions(conf) +.maxLength(OrcConf.MAX_FILE_LENGTH.getLong(conf)) +.filesystem(fileSplit.getPath.getFileSystem(conf))) + +val options = OrcInputFormat.buildOptions(conf, reader, fileSplit.getStart, fileSplit.getLength) +recordReader = reader.rows(options) +totalRowCount = reader.getNumberOfRows + } + + /** + * Set required schema and partition information. + * With this information, this creates ColumnarBatch with the full schema. + */ + def setRequiredSchema( + orcSchema: TypeDescription, + requestedColIds: Array[Int], + resultSchema: StructType, + requiredSchema: St
[GitHub] spark issue #20076: [SPARK-21786][SQL] When acquiring 'compressionCodecClass...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20076 **[Test build #85740 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85740/testReport)** for PR 20076 at commit [`9466797`](https://github.com/apache/spark/commit/946679745f16838932e74fabb70f2ad702fa4640). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20029: [SPARK-22793][SQL]Memory leak in Spark Thrift Server
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20029 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/85736/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20076: [SPARK-21786][SQL] When acquiring 'compressionCodecClass...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20076 **[Test build #85739 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85739/testReport)** for PR 20076 at commit [`26c1c61`](https://github.com/apache/spark/commit/26c1c61ffd5742b71aefdec33ddcb69ba944). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20029: [SPARK-22793][SQL]Memory leak in Spark Thrift Server
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20029 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20029: [SPARK-22793][SQL]Memory leak in Spark Thrift Server
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20029 **[Test build #85736 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85736/testReport)** for PR 20029 at commit [`2b1e166`](https://github.com/apache/spark/commit/2b1e166f4f43b300d272fc7ce1d9d7997f7ae3cd). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20166: [SPARK-22973][SQL] Fix incorrect results of Casting Map ...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/20166 LGTM except one minor comment --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20166: [SPARK-22973][SQL] Fix incorrect results of Casti...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20166#discussion_r160016007 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala --- @@ -228,6 +228,35 @@ case class Cast(child: Expression, dataType: DataType, timeZoneId: Option[String builder.append("]") builder.build() }) +case MapType(kt, vt, _) => + buildCast[MapData](_, map => { +val builder = new UTF8StringBuilder +builder.append("[") +if (map.numElements > 0) { + val keyToUTF8String = castToString(kt) + val valueToUTF8String = castToString(vt) + builder.append(keyToUTF8String(map.keyArray().get(0, kt)).asInstanceOf[UTF8String]) --- End diff -- `map.keyArray()` and `map.valueArray` appear many times, we can create 2 local variables for them. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
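To make that suggestion concrete, here is a rough sketch of the quoted branch with `map.keyArray()` and `map.valueArray()` hoisted into local vals. This is only a fragment of the `castToString` match quoted in the diff, not a complete implementation, and the elided element loop is assumed to stay as in the PR:

```scala
case MapType(kt, vt, _) =>
  buildCast[MapData](_, map => {
    val builder = new UTF8StringBuilder
    builder.append("[")
    if (map.numElements > 0) {
      // Obtain the arrays and the per-type converters once, instead of once per element.
      val keyArray = map.keyArray()
      val valueArray = map.valueArray()
      val keyToUTF8String = castToString(kt)
      val valueToUTF8String = castToString(vt)
      builder.append(keyToUTF8String(keyArray.get(0, kt)).asInstanceOf[UTF8String])
      // ... the remaining entries are appended from keyArray/valueArray as in the original diff
    }
    builder.append("]")
    builder.build()
  })
```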
[GitHub] spark issue #20029: [SPARK-22793][SQL]Memory leak in Spark Thrift Server
Github user viirya commented on the issue: https://github.com/apache/spark/pull/20029 > The hiveClient created for the resourceLoader is only used to addJar, which is, in turn, to add Jar to the shared IsolatedClientLoader. Then we can just use the shared hive client for this purpose. Shouldn't `addJar` be session-based? At least it seems to be in Hive: https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Cli#LanguageManualCli-HiveResources Although it looks like `addJar` in `SessionResourceLoader` for `SessionState` isn't session-based either, so at least the behavior seems consistent. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20163: [SPARK-22966][PySpark] Spark SQL should handle Python UD...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/20163 Hey @rednaxelafx, that's fine. We all make mistakes, and I think it's always better than not trying. I also made a mistake the first time. Your comments and the details in the PR description made this easier to debug. Thank you. > I'd like to wait for more discussions / suggestions on whether or not we want a behavior change that makes this reproducer work, or a simple document change that'll just say PySpark doesn't support mismatching returnType. So, a few options might be ... 1. Simply document this. 2. Add `str` logic in `type.StringType` - in this case, I think we should do a small benchmark. It shouldn't be too hard, and I think you could reuse the commands I used here - https://github.com/apache/spark/pull/19246#discussion_r139874732 3. Investigate a way to register a custom Pyrolite unpickler that converts `datetime.date*` to `Timestamp` or `Date`. I believe we already have some custom fixes there. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20010: [SPARK-22826][SQL] findWiderTypeForTwo Fails over Struct...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/20010 Overall, it is reasonable. What is the current behavior in Hive? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20170: [SPARK-22960][K8S] Revert use of ARG base_image in image...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20170 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20170: [SPARK-22960][K8S] Revert use of ARG base_image in image...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20170 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/85734/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20170: [SPARK-22960][K8S] Revert use of ARG base_image in image...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20170 **[Test build #85734 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85734/testReport)** for PR 20170 at commit [`2f8d0b9`](https://github.com/apache/spark/commit/2f8d0b96babb47abce2f2af8d36c33c429eb7257). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #17968: [SPARK-9792] Make DenseMatrix equality semantical
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/17968 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #17968: [SPARK-9792] Make DenseMatrix equality semantical
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/17968 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/85738/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #17968: [SPARK-9792] Make DenseMatrix equality semantical
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/17968 **[Test build #85738 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85738/testReport)** for PR 17968 at commit [`311c94a`](https://github.com/apache/spark/commit/311c94a3d608b0b86f3ce39415639ec260e5af37). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #17968: [SPARK-9792] Make DenseMatrix equality semantical
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/17968 **[Test build #85738 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85738/testReport)** for PR 17968 at commit [`311c94a`](https://github.com/apache/spark/commit/311c94a3d608b0b86f3ce39415639ec260e5af37). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #17968: [SPARK-9792] Make DenseMatrix equality semantical
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/17968 cc @WeichenXu123 @yanboliang --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #17968: [SPARK-9792] Make DenseMatrix equality semantical
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/17968 ok to test --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20170: [SPARK-22960][K8S] Revert use of ARG base_image i...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/20170 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20170: [SPARK-22960][K8S] Revert use of ARG base_image in image...
Github user vanzin commented on the issue: https://github.com/apache/spark/pull/20170 Tests are taking too long... Merging to master / 2.3. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20098: [SPARK-22914][DEPLOY] Register history.ui.port
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/20098 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20135: [SPARK-22937][SQL] SQL elt output binary for bina...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/20135 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20135: [SPARK-22937][SQL] SQL elt output binary for bina...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/20135#discussion_r160012524 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala --- @@ -271,33 +271,45 @@ case class ConcatWs(children: Seq[Expression]) } } +/** + * An expression that returns the `n`-th input in given inputs. + * If all inputs are binary, `elt` returns an output as binary. Otherwise, it returns as string. + * If any input is null, `elt` returns null. + */ // scalastyle:off line.size.limit @ExpressionDescription( - usage = "_FUNC_(n, str1, str2, ...) - Returns the `n`-th string, e.g., returns `str2` when `n` is 2.", + usage = "_FUNC_(n, input1, input2, ...) - Returns the `n`-th input, e.g., returns `input2` when `n` is 2.", examples = """ Examples: > SELECT _FUNC_(1, 'scala', 'java'); scala """) // scalastyle:on line.size.limit -case class Elt(children: Seq[Expression]) - extends Expression with ImplicitCastInputTypes { +case class Elt(children: Seq[Expression]) extends Expression { private lazy val indexExpr = children.head - private lazy val stringExprs = children.tail.toArray + private lazy val inputExprs = children.tail.toArray /** This expression is always nullable because it returns null if index is out of range. */ override def nullable: Boolean = true - override def dataType: DataType = StringType - - override def inputTypes: Seq[DataType] = IntegerType +: Seq.fill(children.size - 1)(StringType) + override def dataType: DataType = inputExprs.map(_.dataType).headOption.getOrElse(StringType) --- End diff -- We issue an exception when the input argument is 1. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20135: [SPARK-22937][SQL] SQL elt output binary for binary inpu...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/20135 Thanks! Merged to master/2.3 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20135: [SPARK-22937][SQL] SQL elt output binary for binary inpu...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/20135 LGTM --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20098: [SPARK-22914][DEPLOY] Register history.ui.port
Github user vanzin commented on the issue: https://github.com/apache/spark/pull/20098 Merging to master / 2.3. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20013: [SPARK-20657][core] Speed up rendering of the stages pag...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20013 **[Test build #85737 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85737/testReport)** for PR 20013 at commit [`c4e7f61`](https://github.com/apache/spark/commit/c4e7f6149fbff39c9f3955f536095ed4fd5df2ff). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20029: [SPARK-22793][SQL]Memory leak in Spark Thrift Server
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20029 **[Test build #85736 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85736/testReport)** for PR 20029 at commit [`2b1e166`](https://github.com/apache/spark/commit/2b1e166f4f43b300d272fc7ce1d9d7997f7ae3cd). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20029: [SPARK-22793][SQL]Memory leak in Spark Thrift Server
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/20029 ok to test --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20029: [SPARK-22793][SQL]Memory leak in Spark Thrift Server
Github user liufengdb commented on the issue: https://github.com/apache/spark/pull/20029 lgtm! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20097: [SPARK-22912] v2 data source support in MicroBatchExecut...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20097 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20097: [SPARK-22912] v2 data source support in MicroBatchExecut...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20097 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/85730/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20097: [SPARK-22912] v2 data source support in MicroBatchExecut...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20097 **[Test build #85730 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85730/testReport)** for PR 20097 at commit [`5f0a6e2`](https://github.com/apache/spark/commit/5f0a6e271bec953c79b2298190cb848129f5b84e). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20096: [SPARK-22908] Add kafka source and sink for conti...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20096#discussion_r160009573 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceOffset.scala --- @@ -19,7 +19,8 @@ package org.apache.spark.sql.kafka010 import org.apache.kafka.common.TopicPartition -import org.apache.spark.sql.execution.streaming.{Offset, SerializedOffset} +import org.apache.spark.sql.execution.streaming.{Offset => OffsetV1, SerializedOffset} --- End diff -- Ummm.. I think it's better to rename the new Offset to OffsetV2 than to rename the old one to OffsetV1. That keeps it more consistent with the other APIs, which have V2 in their names. Also, the MicroBatchExecution in the other PR uses OffsetV2 as well. Sorry for not thinking ahead on this. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
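Under that suggestion, the import side of the rename could look like the sketch below; the v2 package path is taken from the imports quoted elsewhere in this PR, so treat the exact paths as an assumption rather than the final API location:

```scala
// KafkaSourceOffset.scala (sketch): keep the v1 Offset name unchanged and alias
// the new interface as OffsetV2, matching the other *V2 names.
import org.apache.spark.sql.execution.streaming.{Offset, SerializedOffset}
import org.apache.spark.sql.sources.v2.streaming.reader.{Offset => OffsetV2, PartitionOffset}
```

Existing code that refers to `Offset` then keeps compiling unchanged, and only the continuous-processing code needs to spell out `OffsetV2`.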
[GitHub] spark issue #20169: [SPARK-17088][hive] Fix 'sharesHadoopClasses' option whe...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20169 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20169: [SPARK-17088][hive] Fix 'sharesHadoopClasses' option whe...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20169 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/85733/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20169: [SPARK-17088][hive] Fix 'sharesHadoopClasses' option whe...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20169 **[Test build #85733 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85733/testReport)** for PR 20169 at commit [`668fcba`](https://github.com/apache/spark/commit/668fcbac6b24e6c1e4b6c720459654ca8e88f03c). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20096: [SPARK-22908] Add kafka source and sink for conti...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20096#discussion_r160004815 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala --- @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.kafka010 + +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.apache.kafka.common.TopicPartition + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, UnsafeRowWriter} +import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.kafka010.KafkaSource.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE} +import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.sources.v2.streaming.reader.{ContinuousDataReader, ContinuousReader, Offset, PartitionOffset} +import org.apache.spark.sql.types.StructType +import org.apache.spark.unsafe.types.UTF8String + +/** + * A [[ContinuousReader]] for data from kafka. + * + * @param offsetReader a reader used to get kafka offsets. Note that the actual data will be + * read by per-task consumers generated later. + * @param kafkaParams String params for per-task Kafka consumers. + * @param sourceOptions The [[org.apache.spark.sql.sources.v2.DataSourceV2Options]] params which + * are not Kafka consumer params. + * @param metadataPath Path to a directory this reader can use for writing metadata. + * @param initialOffsets The Kafka offsets to start reading data at. + * @param failOnDataLoss Flag indicating whether reading should fail in data loss + * scenarios, where some offsets after the specified initial ones can't be + * properly read. + */ +class KafkaContinuousReader( +offsetReader: KafkaOffsetReader, +kafkaParams: java.util.Map[String, Object], --- End diff -- Since there are lots of different uses of java.util.* ... you can probably rename java.util to ju. Thats what the file KafkaSourceProvider class does. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
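A minimal, self-contained sketch of that alias; the object and helper names here are only for illustration:

```scala
import java.{util => ju}

import scala.collection.JavaConverters._

object JuAliasSketch {
  // With the alias, java.util.Map[String, Object] can be written as ju.Map[String, Object].
  def toKafkaParams(params: Map[String, String]): ju.Map[String, Object] =
    new ju.HashMap[String, Object](params.asJava)
}
```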
[GitHub] spark pull request #20096: [SPARK-22908] Add kafka source and sink for conti...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20096#discussion_r160007884 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala --- @@ -418,11 +418,16 @@ abstract class StreamExecution( * Blocks the current thread until processing for data from the given `source` has reached at * least the given `Offset`. This method is intended for use primarily when writing tests. */ - private[sql] def awaitOffset(source: Source, newOffset: Offset): Unit = { + private[sql] def awaitOffset(sourceIndex: Int, newOffset: Offset): Unit = { assertAwaitThread() def notDone = { val localCommittedOffsets = committedOffsets - !localCommittedOffsets.contains(source) || localCommittedOffsets(source) != newOffset + if (sources.length <= sourceIndex) { +false --- End diff -- The race condition is present because `sources` is initialized to Seq.empty and only later assigned the actual sources. You could instead initialize `sources` to null and return `notDone = false` while `sources` is null; any other mismatch should throw an error. I don't like the current code, which hides erroneous situations. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
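A sketch of what that suggestion might look like against the quoted `awaitOffset`; this is not the change actually made in the PR, and `sources`, `committedOffsets`, and `assertAwaitThread()` are assumed from the surrounding StreamExecution code:

```scala
// Assumes `sources` is initialized to null and assigned only once the logical plan
// has been analyzed, instead of starting out as Seq.empty.
private[sql] def awaitOffset(sourceIndex: Int, newOffset: Offset): Unit = {
  assertAwaitThread()
  def notDone = {
    val localCommittedOffsets = committedOffsets
    if (sources == null) {
      // Sources not materialized yet: nothing to wait on.
      false
    } else if (sourceIndex >= sources.length) {
      // A genuine index mismatch should fail loudly rather than be hidden.
      throw new IllegalStateException(
        s"Source index $sourceIndex is out of range: only ${sources.length} sources")
    } else {
      val source = sources(sourceIndex)
      !localCommittedOffsets.contains(source) || localCommittedOffsets(source) != newOffset
    }
  }
  // ... the waiting loop below notDone stays as in the quoted diff
}
```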
[GitHub] spark pull request #20096: [SPARK-22908] Add kafka source and sink for conti...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20096#discussion_r160006676 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousWriter.scala --- @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.kafka010 + +import org.apache.kafka.clients.producer.{Callback, ProducerRecord, RecordMetadata} + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.{Row, SparkSession} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Literal, UnsafeProjection} +import org.apache.spark.sql.kafka010.KafkaSourceProvider.{kafkaParamsForProducer, TOPIC_OPTION_KEY} +import org.apache.spark.sql.sources.v2.streaming.writer.ContinuousWriter +import org.apache.spark.sql.sources.v2.writer._ +import org.apache.spark.sql.streaming.OutputMode +import org.apache.spark.sql.types.{BinaryType, StringType, StructType} + +/** + * Dummy commit message. The DataSourceV2 framework requires a commit message implementation but we + * don't need to really send one. + */ +case object KafkaWriterCommitMessage extends WriterCommitMessage + +/** + * A [[ContinuousWriter]] for Kafka writing. Responsible for generating the writer factory. + * @param topic The topic this writer is responsible for. If None, topic will be inferred from + * a `topic` field in the incoming data. + * @param producerParams Parameters for Kafka producers in each task. + * @param schema The schema of the input data. + */ +class KafkaContinuousWriter( +topic: Option[String], producerParams: Map[String, String], schema: StructType) + extends ContinuousWriter with SupportsWriteInternalRow { + + override def createInternalRowWriterFactory(): KafkaContinuousWriterFactory = +KafkaContinuousWriterFactory(topic, producerParams, schema) + + override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {} + override def abort(messages: Array[WriterCommitMessage]): Unit = {} +} + +/** + * A [[DataWriterFactory]] for Kafka writing. Will be serialized and sent to executors to generate + * the per-task data writers. + * @param topic The topic that should be written to. If None, topic will be inferred from + * a `topic` field in the incoming data. + * @param producerParams Parameters for Kafka producers in each task. + * @param schema The schema of the input data. 
+ */ +case class KafkaContinuousWriterFactory( +topic: Option[String], producerParams: Map[String, String], schema: StructType) + extends DataWriterFactory[InternalRow] { + + override def createDataWriter(partitionId: Int, attemptNumber: Int): DataWriter[InternalRow] = { +new KafkaContinuousDataWriter(topic, producerParams, schema.toAttributes) + } +} + +/** + * A [[DataWriter]] for Kafka writing. One data writer will be created in each partition to + * process incoming rows. + * + * @param targetTopic The topic that this data writer is targeting. If None, topic will be inferred + *from a `topic` field in the incoming data. + * @param producerParams Parameters to use for the Kafka producer. + * @param inputSchema The attributes in the input data. + */ +class KafkaContinuousDataWriter( +targetTopic: Option[String], producerParams: Map[String, String], inputSchema: Seq[Attribute]) + extends KafkaRowWriter(inputSchema, targetTopic) with DataWriter[InternalRow] { + import scala.collection.JavaConverters._ + + private lazy val producer = CachedKafkaProducer.getOrCreate( +new java.util.HashMap[String, Object](producerParams.asJava)) + + def write(row: InternalRow): Unit = { +checkForErrors() +sendRow(row, producer) + } + + def commit(): WriterCommitMessage