[GitHub] spark pull request #19943: [SPARK-16060][SQL] Support Vectorized ORC Reader
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19943#discussion_r160087111 --- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/JavaOrcColumnarBatchReader.java --- @@ -0,0 +1,482 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.datasources.orc; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; +import org.apache.orc.OrcConf; +import org.apache.orc.OrcFile; +import org.apache.orc.Reader; +import org.apache.orc.TypeDescription; +import org.apache.orc.mapred.OrcInputFormat; +import org.apache.orc.storage.common.type.HiveDecimal; +import org.apache.orc.storage.ql.exec.vector.*; +import org.apache.orc.storage.serde2.io.HiveDecimalWritable; + +import org.apache.spark.memory.MemoryMode; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.execution.vectorized.ColumnVectorUtils; +import org.apache.spark.sql.execution.vectorized.OffHeapColumnVector; +import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector; +import org.apache.spark.sql.execution.vectorized.WritableColumnVector; +import org.apache.spark.sql.types.*; +import org.apache.spark.sql.vectorized.ColumnarBatch; + + +/** + * To support vectorization in WholeStageCodeGen, this reader returns ColumnarBatch. + * After creating, `initialize` and `initBatch` should be called sequentially. + */ +public class JavaOrcColumnarBatchReader extends RecordReader<Void, ColumnarBatch> { + + // ORC File Reader + private Reader reader; + + // Vectorized ORC Row Batch + private VectorizedRowBatch batch; + + /** + * The column IDs of the physical ORC file schema which are required by this reader. + * -1 means this required column doesn't exist in the ORC file. + */ + private int[] requestedColIds; + + // Record reader from ORC row batch.
+ private org.apache.orc.RecordReader recordReader; + + private StructType requiredSchema; --- End diff -- this could be `StructField[] requiredFields` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19943: [SPARK-16060][SQL] Support Vectorized ORC Reader
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19943#discussion_r160087041 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -386,6 +386,16 @@ object SQLConf { .checkValues(Set("hive", "native")) .createWithDefault("native") + val ORC_VECTORIZED_READER_ENABLED = buildConf("spark.sql.orc.enableVectorizedReader") +.doc("Enables vectorized orc decoding.") +.booleanConf +.createWithDefault(true) + + val ORC_VECTORIZED_JAVA_READER_ENABLED = buildConf("spark.sql.orc.enableVectorizedJavaReader") --- End diff -- I don't think there will be a performance difference, but using Java for things like this is more conventional in Spark.
[GitHub] spark pull request #20171: [SPARK-22978] [PySpark] Register Vectorized UDFs ...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/20171#discussion_r160086985 --- Diff: python/pyspark/sql/catalog.py --- @@ -265,12 +267,23 @@ def registerFunction(self, name, f, returnType=StringType()): [Row(random_udf()=u'82')] >>> spark.range(1).select(newRandom_udf()).collect() # doctest: +SKIP [Row(random_udf()=u'62')] + +>>> import random +>>> from pyspark.sql.types import IntegerType +>>> from pyspark.sql.functions import pandas_udf +>>> random_pandas_udf = pandas_udf( +... lambda x: random.randint(0, 100) + x, IntegerType()) +... .asNondeterministic() # doctest: +SKIP +>>> _ = spark.catalog.registerFunction( +... "random_pandas_udf", random_pandas_udf, IntegerType()) # doctest: +SKIP +>>> spark.sql("SELECT random_pandas_udf(2)").collect() # doctest: +SKIP +[Row(random_pandas_udf(2)=84)] """ # This is to check whether the input function is a wrapped/native UserDefinedFunction if hasattr(f, 'asNondeterministic'): udf = UserDefinedFunction(f.func, returnType=returnType, name=name, - evalType=PythonEvalType.SQL_BATCHED_UDF, + evalType=f.evalType, --- End diff -- > when it's not a PythonEvalType.SQL_BATCHED_UDF -> > when it's neither a `PythonEvalType.SQL_BATCHED_UDF` nor `PythonEvalType.SQL_PANDAS_SCALAR_UDF`, right?
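The condition proposed in this comment can be sketched in plain Python. This is a minimal illustration, not the PySpark implementation: it assumes the sentence being reworded describes when registration is rejected, the integer constants are stand-ins for the members of `PythonEvalType` (their values are assumed here), and `check_registrable` is a hypothetical helper name.

```python
# Stand-ins for PythonEvalType members; the numeric values are assumptions
# made for this sketch and are not taken from the diff above.
SQL_BATCHED_UDF = 100
SQL_PANDAS_SCALAR_UDF = 200
SQL_PANDAS_GROUP_MAP_UDF = 201

# Eval types that the reworded sentence treats as acceptable.
REGISTRABLE_EVAL_TYPES = {SQL_BATCHED_UDF, SQL_PANDAS_SCALAR_UDF}


def check_registrable(eval_type):
    """Return eval_type if it is one of the two accepted kinds, else raise.

    Hypothetical helper: it only mirrors the "neither ... nor ..." wording.
    """
    if eval_type not in REGISTRABLE_EVAL_TYPES:
        raise ValueError(
            "Invalid evalType %r: expected SQL_BATCHED_UDF or "
            "SQL_PANDAS_SCALAR_UDF" % (eval_type,))
    return eval_type
```

With a set-membership test, accepting a further eval type later would only require adding it to `REGISTRABLE_EVAL_TYPES`.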
[GitHub] spark pull request #20163: [SPARK-22966][PySpark] Spark SQL should handle Py...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20163#discussion_r160086824 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvaluatePython.scala --- @@ -120,10 +121,18 @@ object EvaluatePython { case (c: java.math.BigDecimal, dt: DecimalType) => Decimal(c, dt.precision, dt.scale) case (c: Int, DateType) => c +// Pyrolite will unpickle a Python datetime.date to a java.util.Calendar +case (c: Calendar, DateType) => DateTimeUtils.fromJavaCalendarForDate(c) --- End diff -- It seems what we need is a `case (c: Calendar, StringType) => ...`
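For readers unfamiliar with the representation involved: the existing `case (c: Int, DateType) => c` branch passes an integer straight through, which matches Spark storing `DateType` values as a count of days since 1970-01-01. The sketch below shows that mapping in plain Python. It is only an illustration of the target representation; `date_to_epoch_days` and `epoch_days_to_date` are hypothetical names, and the body of `DateTimeUtils.fromJavaCalendarForDate` is not shown in the diff and is not reproduced here.

```python
from datetime import date

# Day zero of the integer representation.
EPOCH = date(1970, 1, 1)


def date_to_epoch_days(d):
    """Convert a datetime.date to the number of days since 1970-01-01."""
    return (d - EPOCH).days


def epoch_days_to_date(days):
    """Inverse of date_to_epoch_days."""
    return date.fromordinal(EPOCH.toordinal() + days)
```

For example, `date_to_epoch_days(date(1970, 1, 2))` is `1`, and dates before the epoch map to negative integers.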
[GitHub] spark pull request #20163: [SPARK-22966][PySpark] Spark SQL should handle Py...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20163#discussion_r160086772 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvaluatePython.scala --- @@ -120,10 +121,18 @@ object EvaluatePython { case (c: java.math.BigDecimal, dt: DecimalType) => Decimal(c, dt.precision, dt.scale) case (c: Int, DateType) => c +// Pyrolite will unpickle a Python datetime.date to a java.util.Calendar +case (c: Calendar, DateType) => DateTimeUtils.fromJavaCalendarForDate(c) --- End diff -- So we will never hit this?
[GitHub] spark pull request #20087: [SPARK-21786][SQL] The 'spark.sql.parquet.compres...
Github user fjh100456 commented on a diff in the pull request: https://github.com/apache/spark/pull/20087#discussion_r160086482 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala --- @@ -68,6 +68,10 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand { .get("mapreduce.output.fileoutputformat.compress.type")) } +// Set compression by priority +HiveOptions.getHiveWriteCompression(fileSinkConf.getTableInfo, sparkSession.sessionState.conf) + .foreach { case (compression, codec) => hadoopConf.set(compression, codec) } --- End diff -- For Parquet, without the changes in this PR, the precedence is table-level compression > `mapreduce.output.fileoutputformat.compress`, and `spark.sql.parquet.compression` never takes effect. With this PR, `mapreduce.output.fileoutputformat.compress` no longer takes effect; instead, `spark.sql.parquet.compression` always takes effect when there is no table-level compression. For ORC, `hive.exec.compress.output` does not take effect, as explained in the code comments. Shall we keep this precedence for Parquet? If so, how should we deal with ORC?
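The Parquet precedence described for the state after this PR can be sketched as a small lookup. This is an illustration under stated assumptions, not the code in `HiveOptions.getHiveWriteCompression`: `resolve_parquet_codec` and its parameters are hypothetical names, `table_codec` stands for a table-level compression setting, `session_codec` for the value of `spark.sql.parquet.compression`, and `mapreduce_compress` for `mapreduce.output.fileoutputformat.compress`.

```python
def resolve_parquet_codec(table_codec=None, session_codec=None,
                          mapreduce_compress=None):
    """Pick the codec for a Parquet write under the precedence in the comment.

    mapreduce_compress is accepted only to make explicit that, per the
    comment, it no longer influences the result.
    """
    # A table-level setting wins whenever it is present.
    if table_codec is not None:
        return table_codec
    # Otherwise the session-level setting applies (None if it is unset).
    return session_codec
```

For example, `resolve_parquet_codec("gzip", "snappy")` picks the table-level value, while `resolve_parquet_codec(None, "snappy", mapreduce_compress="true")` falls through to the session value. The ORC question raised in the comment is left open here.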
[GitHub] spark issue #19943: [SPARK-16060][SQL] Support Vectorized ORC Reader
Github user dongjoon-hyun commented on the issue: https://github.com/apache/spark/pull/19943 @henrify and @cloud-fan, I updated the PR with the put APIs. You can check the benchmark result.
[GitHub] spark pull request #19943: [SPARK-16060][SQL] Support Vectorized ORC Reader
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/19943#discussion_r160086248 --- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/JavaOrcColumnarBatchReader.java --- @@ -0,0 +1,503 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.datasources.orc; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; +import org.apache.orc.OrcConf; +import org.apache.orc.OrcFile; +import org.apache.orc.Reader; +import org.apache.orc.TypeDescription; +import org.apache.orc.mapred.OrcInputFormat; +import org.apache.orc.storage.common.type.HiveDecimal; +import org.apache.orc.storage.ql.exec.vector.*; +import org.apache.orc.storage.serde2.io.HiveDecimalWritable; + +import org.apache.spark.memory.MemoryMode; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.execution.vectorized.ColumnVectorUtils; +import org.apache.spark.sql.execution.vectorized.OffHeapColumnVector; +import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector; +import org.apache.spark.sql.execution.vectorized.WritableColumnVector; +import org.apache.spark.sql.types.*; +import org.apache.spark.sql.vectorized.ColumnarBatch; + + +/** + * To support vectorization in WholeStageCodeGen, this reader returns ColumnarBatch. + * After creating, `initialize` and `setRequiredSchema` should be called sequentially. + */ +public class JavaOrcColumnarBatchReader extends RecordReader<Void, ColumnarBatch> { + + /** + * ORC File Reader. + */ + private Reader reader; + + /** + * Vectorized Row Batch. + */ + private VectorizedRowBatch batch; + + /** + * Requested Column IDs. + */ + private int[] requestedColIds; + + /** + * Record reader from row batch. + */ + private org.apache.orc.RecordReader recordReader; + + /** + * Required Schema. + */ + private StructType requiredSchema; + + /** + * ColumnarBatch for vectorized execution by whole-stage codegen. + */ + private ColumnarBatch columnarBatch; + + /** + * Writable column vectors of ColumnarBatch.
+ */ + private WritableColumnVector[] columnVectors; + + /** + * The number of rows read and considered to be returned. + */ + private long rowsReturned = 0L; + + /** + * Total number of rows. + */ + private long totalRowCount = 0L; + + @Override + public Void getCurrentKey() throws IOException, InterruptedException { +return null; + } + + @Override + public ColumnarBatch getCurrentValue() throws IOException, InterruptedException { +return columnarBatch; + } + + @Override + public float getProgress() throws IOException, InterruptedException { +return (float) rowsReturned / totalRowCount; + } + + @Override + public boolean nextKeyValue() throws IOException, InterruptedException { +return nextBatch(); + } + + @Override + public void close() throws IOException { +if (columnarBatch != null) { + columnarBatch.close(); + columnarBatch = null; +} +if (recordReader != null) { + recordReader.close(); + recordReader = null; +} + } + + /** + * Initialize ORC file reader and batch record reader. + * Please note that `setRequiredSchema` is needed to be called after this. + */ + @Override + public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) + throws IOException, InterruptedException {
[GitHub] spark issue #20178: [Spark-22952][CORE] Deprecate stageAttemptId in favour o...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20178 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/85785/ Test PASSed.
[GitHub] spark issue #20178: [Spark-22952][CORE] Deprecate stageAttemptId in favour o...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20178 Merged build finished. Test PASSed.
[GitHub] spark issue #20178: [Spark-22952][CORE] Deprecate stageAttemptId in favour o...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20178 **[Test build #85785 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85785/testReport)** for PR 20178 at commit [`2ec9197`](https://github.com/apache/spark/commit/2ec919761ced379f00e1fa9804a66e3b15e9d2e9). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #19943: [SPARK-16060][SQL] Support Vectorized ORC Reader
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19943 **[Test build #85788 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85788/testReport)** for PR 19943 at commit [`3a0702a`](https://github.com/apache/spark/commit/3a0702ae0b31f762c9f3da06d267a02ec8d1a23b).
[GitHub] spark issue #20151: [SPARK-22959][PYTHON] Configuration to select the module...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/20151 Yup, thanks for all the reviews, @felixcheung and @ueshin, BTW.
[GitHub] spark issue #20151: [SPARK-22959][PYTHON] Configuration to select the module...
Github user ueshin commented on the issue: https://github.com/apache/spark/pull/20151 Looks good. Let's wait for @rxin's response.
[GitHub] spark issue #20185: Branch 2.3
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/20185 This seems to have been opened by mistake. Could you close it, @jimmy144?
[GitHub] spark issue #20185: Branch 2.3
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20185 Can one of the admins verify this patch?
[GitHub] spark pull request #20185: Branch 2.3
GitHub user jimmy144 opened a pull request: https://github.com/apache/spark/pull/20185 Branch 2.3 ## What changes were proposed in this pull request? (Please fill in changes proposed in this fix) ## How was this patch tested? (Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests) (If this patch involves UI changes, please attach a screenshot; otherwise, remove this) Please review http://spark.apache.org/contributing.html before opening a pull request. You can merge this pull request into a Git repository by running: $ git pull https://github.com/apache/spark branch-2.3 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/20185.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #20185 commit 5244aafc2d7945c11c96398b8d5b752b45fd148c Author: Xianjin YE Date: 2018-01-02T15:30:38Z [SPARK-22897][CORE] Expose stageAttemptId in TaskContext ## What changes were proposed in this pull request? stageAttemptId added in TaskContext and corresponding construction modification ## How was this patch tested? Added a new test in TaskContextSuite, two cases are tested: 1. Normal case without failure 2. Exception case with resubmitted stages Link to [SPARK-22897](https://issues.apache.org/jira/browse/SPARK-22897) Author: Xianjin YE Closes #20082 from advancedxy/SPARK-22897. (cherry picked from commit a6fc300e91273230e7134ac6db95ccb4436c6f8f) Signed-off-by: Wenchen Fan commit b96a2132413937c013e1099be3ec4bc420c947fd Author: Juliusz Sompolski Date: 2018-01-03T13:40:51Z [SPARK-22938] Assert that SQLConf.get is accessed only on the driver. ## What changes were proposed in this pull request? Assert if code tries to access SQLConf.get on executor. This can lead to hard-to-detect bugs, where the executor will read fallbackConf, falling back to default config values, ignoring potentially changed non-default configs.
If a config is to be passed to executor code, it needs to be read on the driver, and passed explicitly. ## How was this patch tested? Check in existing tests. Author: Juliusz Sompolski Closes #20136 from juliuszsompolski/SPARK-22938. (cherry picked from commit 247a08939d58405aef39b2a4e7773aa45474ad12) Signed-off-by: Wenchen Fan commit a05e85ecb76091567a26a3a14ad0879b4728addc Author: gatorsmile Date: 2018-01-03T14:09:30Z [SPARK-22934][SQL] Make optional clauses order insensitive for CREATE TABLE SQL statement ## What changes were proposed in this pull request? Currently, our CREATE TABLE syntax requires the EXACT order of clauses. It is pretty hard to remember the exact order. Thus, this PR is to make optional clauses order insensitive for `CREATE TABLE` SQL statement. ``` CREATE [TEMPORARY] TABLE [IF NOT EXISTS] [db_name.]table_name [(col_name1 col_type1 [COMMENT col_comment1], ...)] USING datasource [OPTIONS (key1=val1, key2=val2, ...)] [PARTITIONED BY (col_name1, col_name2, ...)] [CLUSTERED BY (col_name3, col_name4, ...) INTO num_buckets BUCKETS] [LOCATION path] [COMMENT table_comment] [TBLPROPERTIES (key1=val1, key2=val2, ...)] [AS select_statement] ``` The proposal is to make the following clauses order insensitive. ``` [OPTIONS (key1=val1, key2=val2, ...)] [PARTITIONED BY (col_name1, col_name2, ...)] [CLUSTERED BY (col_name3, col_name4, ...) INTO num_buckets BUCKETS] [LOCATION path] [COMMENT table_comment] [TBLPROPERTIES (key1=val1, key2=val2, ...)] ``` The same idea is also applicable to Create Hive Table. ``` CREATE [EXTERNAL] TABLE [IF NOT EXISTS] [db_name.]table_name [(col_name1[:] col_type1 [COMMENT col_comment1], ...)] [COMMENT table_comment] [PARTITIONED BY (col_name2[:] col_type2 [COMMENT col_comment2], ...)] [ROW FORMAT row_format] [STORED AS file_format] [LOCATION path] [TBLPROPERTIES (key1=val1, key2=val2, ...)] [AS select_statement] ``` The proposal is to make the following clauses order insensitive.
``` [COMMENT table_comment] [PARTITIONED BY (col_name2[:] col_type2 [COMMENT col_comment2], ...)] [ROW FORMAT row_format] [STORED AS file_format] [LOCATION path] [TBLPROPERTIES (key1=val1, key2=val2, ...)] ``` ## How was this patch tested? Added test cases Author: gatorsmile
[GitHub] spark pull request #19943: [SPARK-16060][SQL] Support Vectorized ORC Reader
Github user henrify commented on a diff in the pull request: https://github.com/apache/spark/pull/19943#discussion_r160084073 --- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/JavaOrcColumnarBatchReader.java --- @@ -0,0 +1,503 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.datasources.orc; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; +import org.apache.orc.OrcConf; +import org.apache.orc.OrcFile; +import org.apache.orc.Reader; +import org.apache.orc.TypeDescription; +import org.apache.orc.mapred.OrcInputFormat; +import org.apache.orc.storage.common.type.HiveDecimal; +import org.apache.orc.storage.ql.exec.vector.*; +import org.apache.orc.storage.serde2.io.HiveDecimalWritable; + +import org.apache.spark.memory.MemoryMode; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.execution.vectorized.ColumnVectorUtils; +import org.apache.spark.sql.execution.vectorized.OffHeapColumnVector; +import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector; +import org.apache.spark.sql.execution.vectorized.WritableColumnVector; +import org.apache.spark.sql.types.*; +import org.apache.spark.sql.vectorized.ColumnarBatch; + + +/** + * To support vectorization in WholeStageCodeGen, this reader returns ColumnarBatch. + * After creating, `initialize` and `setRequiredSchema` should be called sequentially. + */ +public class JavaOrcColumnarBatchReader extends RecordReader<Void, ColumnarBatch> { + + /** + * ORC File Reader. + */ + private Reader reader; + + /** + * Vectorized Row Batch. + */ + private VectorizedRowBatch batch; + + /** + * Requested Column IDs. + */ + private int[] requestedColIds; + + /** + * Record reader from row batch. + */ + private org.apache.orc.RecordReader recordReader; + + /** + * Required Schema. + */ + private StructType requiredSchema; + + /** + * ColumnarBatch for vectorized execution by whole-stage codegen. + */ + private ColumnarBatch columnarBatch; + + /** + * Writable column vectors of ColumnarBatch.
+ */ + private WritableColumnVector[] columnVectors; + + /** + * The number of rows read and considered to be returned. + */ + private long rowsReturned = 0L; + + /** + * Total number of rows. + */ + private long totalRowCount = 0L; + + @Override + public Void getCurrentKey() throws IOException, InterruptedException { +return null; + } + + @Override + public ColumnarBatch getCurrentValue() throws IOException, InterruptedException { +return columnarBatch; + } + + @Override + public float getProgress() throws IOException, InterruptedException { +return (float) rowsReturned / totalRowCount; + } + + @Override + public boolean nextKeyValue() throws IOException, InterruptedException { +return nextBatch(); + } + + @Override + public void close() throws IOException { +if (columnarBatch != null) { + columnarBatch.close(); + columnarBatch = null; +} +if (recordReader != null) { + recordReader.close(); + recordReader = null; +} + } + + /** + * Initialize ORC file reader and batch record reader. + * Please note that `setRequiredSchema` is needed to be called after this. + */ + @Override + public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) + throws IOException, InterruptedException { +
[GitHub] spark issue #20151: [SPARK-22959][PYTHON] Configuration to select the module...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/20151 I manually tested after setting `spark.python.daemon.module` to `nonexistantmodule`. It shows an error message like this: ```python >>> spark.range(1).rdd.map(lambda x: x).collect() ``` ``` ... Traceback (most recent call last): File "<stdin>", line 1, in <module> File "/.../spark/python/pyspark/rdd.py", line 824, in collect port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd()) File "/.../spark/python/lib/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 1160, in __call__ File "/.../spark/python/pyspark/sql/utils.py", line 63, in deco return f(*a, **kw) File "/.../spark/python/lib/py4j-0.10.6-src.zip/py4j/protocol.py", line 320, in get_return_value py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 0.0 failed 1 times, most recent failure: Lost task 1.0 in stage 0.0 (TID 1, localhost, executor driver): org.apache.spark.SparkException: Error from python worker: /usr/bin/python: No module named nonexistantmodule PYTHONPATH was: /.../spark/python/lib/pyspark.zip:/.../spark/python/lib/py4j-0.10.6-src.zip:/.../spark/assembly/target/scala-2.11/jars/spark-core_2.11-2.3.0-SNAPSHOT.jar:/.../spark/python/lib/py4j-0.10.6-src.zip:/.../spark/python/: java.io.EOFException ... Driver stacktrace: ... Caused by: org.apache.spark.SparkException: Error from python worker: /usr/bin/python: No module named nonexistantmodule PYTHONPATH was: /.../spark/python/lib/pyspark.zip:/.../spark/python/lib/py4j-0.10.6-src.zip:/.../spark/assembly/target/scala-2.11/jars/spark-core_2.11-2.3.0-SNAPSHOT.jar:/.../spark/python/lib/py4j-0.10.6-src.zip:/.../spark/python/: java.io.EOFException ... ... 1 more 18/01/08 15:54:06 WARN TaskSetManager: Lost task 6.0 in stage 0.0 (TID 6, localhost, executor driver): TaskKilled (Stage cancelled) ...
```
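The failure shown in this test only surfaces once a task runs on a worker. A pre-check along the following lines could flag a misspelled module name earlier. This is a sketch and not part of the PR: `module_is_importable` is a hypothetical helper, and it only inspects the Python environment it runs in, which may differ from the one the workers use.

```python
import importlib.util


def module_is_importable(name):
    """Return True if `name` can be located in the current Python environment."""
    try:
        return importlib.util.find_spec(name) is not None
    except (ImportError, ValueError):
        # find_spec raises for a dotted name whose parent package is missing,
        # and for some already-imported modules without a spec.
        return False
```

A caller could, for instance, check `module_is_importable("nonexistantmodule")` before setting `spark.python.daemon.module` and report the problem without waiting for a job to fail.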
[GitHub] spark pull request #19943: [SPARK-16060][SQL] Support Vectorized ORC Reader
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/19943#discussion_r160082910 --- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/JavaOrcColumnarBatchReader.java --- @@ -0,0 +1,503 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.datasources.orc; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; +import org.apache.orc.OrcConf; +import org.apache.orc.OrcFile; +import org.apache.orc.Reader; +import org.apache.orc.TypeDescription; +import org.apache.orc.mapred.OrcInputFormat; +import org.apache.orc.storage.common.type.HiveDecimal; +import org.apache.orc.storage.ql.exec.vector.*; +import org.apache.orc.storage.serde2.io.HiveDecimalWritable; + +import org.apache.spark.memory.MemoryMode; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.execution.vectorized.ColumnVectorUtils; +import org.apache.spark.sql.execution.vectorized.OffHeapColumnVector; +import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector; +import org.apache.spark.sql.execution.vectorized.WritableColumnVector; +import org.apache.spark.sql.types.*; +import org.apache.spark.sql.vectorized.ColumnarBatch; + + +/** + * To support vectorization in WholeStageCodeGen, this reader returns ColumnarBatch. + * After creating, `initialize` and `setRequiredSchema` should be called sequentially. + */ +public class JavaOrcColumnarBatchReader extends RecordReader<Void, ColumnarBatch> { + + /** + * ORC File Reader. + */ + private Reader reader; + + /** + * Vectorized Row Batch. + */ + private VectorizedRowBatch batch; + + /** + * Requested Column IDs. + */ + private int[] requestedColIds; + + /** + * Record reader from row batch. + */ + private org.apache.orc.RecordReader recordReader; + + /** + * Required Schema. + */ + private StructType requiredSchema; + + /** + * ColumnarBatch for vectorized execution by whole-stage codegen. + */ + private ColumnarBatch columnarBatch; + + /** + * Writable column vectors of ColumnarBatch.
+ */ + private WritableColumnVector[] columnVectors; + + /** + * The number of rows read and considered to be returned. + */ + private long rowsReturned = 0L; + + /** + * Total number of rows. + */ + private long totalRowCount = 0L; + + @Override + public Void getCurrentKey() throws IOException, InterruptedException { +return null; + } + + @Override + public ColumnarBatch getCurrentValue() throws IOException, InterruptedException { +return columnarBatch; + } + + @Override + public float getProgress() throws IOException, InterruptedException { +return (float) rowsReturned / totalRowCount; + } + + @Override + public boolean nextKeyValue() throws IOException, InterruptedException { +return nextBatch(); + } + + @Override + public void close() throws IOException { +if (columnarBatch != null) { + columnarBatch.close(); + columnarBatch = null; +} +if (recordReader != null) { + recordReader.close(); + recordReader = null; +} + } + + /** + * Initialize ORC file reader and batch record reader. + * Please note that `setRequiredSchema` is needed to be called after this. + */ + @Override + public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) + throws IOException, InterruptedException {
[GitHub] spark issue #14180: [SPARK-16367][PYSPARK] Support for deploying Anaconda an...
Github user ueshin commented on the issue: https://github.com/apache/spark/pull/14180 @gatorsmile @jiangxb1987 Maybe we should review and merge #13599 first because this pr is based on it. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19943: [SPARK-16060][SQL] Support Vectorized ORC Reader
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/19943#discussion_r160082708 --- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/JavaOrcColumnarBatchReader.java --- @@ -0,0 +1,503 @@
[GitHub] spark pull request #19943: [SPARK-16060][SQL] Support Vectorized ORC Reader
Github user henrify commented on a diff in the pull request: https://github.com/apache/spark/pull/19943#discussion_r160082127 --- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/JavaOrcColumnarBatchReader.java --- @@ -0,0 +1,503 @@
[GitHub] spark pull request #19943: [SPARK-16060][SQL] Support Vectorized ORC Reader
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19943#discussion_r160080614 --- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/JavaOrcColumnarBatchReader.java --- @@ -0,0 +1,503 @@
[GitHub] spark pull request #19943: [SPARK-16060][SQL] Support Vectorized ORC Reader
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19943#discussion_r160080462 --- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/JavaOrcColumnarBatchReader.java --- @@ -0,0 +1,503 @@
[GitHub] spark issue #19080: [SPARK-21865][SQL] simplify the distribution semantic of...
Github user sameeragarwal commented on the issue: https://github.com/apache/spark/pull/19080 LGTM
[GitHub] spark pull request #19080: [SPARK-21865][SQL] simplify the distribution sema...
Github user sameeragarwal commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19080#discussion_r160080116

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala ---
    @@ -51,12 +76,41 @@ case object AllTuples extends Distribution
      */
     case class ClusteredDistribution(
         clustering: Seq[Expression],
    -    numPartitions: Option[Int] = None) extends Distribution {
    +    requiredNumPartitions: Option[Int] = None) extends Distribution {
       require(
         clustering != Nil,
         "The clustering expressions of a ClusteredDistribution should not be Nil. " +
           "An AllTuples should be used to represent a distribution that only has " +
           "a single partition.")
    +
    +  override def createPartitioning(numPartitions: Int): Partitioning = {
    +    assert(requiredNumPartitions.isEmpty || requiredNumPartitions.get == numPartitions,
    +      s"This ClusteredDistribution requires ${requiredNumPartitions.get} partitions, but " +
    +        s"the actual number of partitions is $numPartitions.")
    +    HashPartitioning(clustering, numPartitions)
    +  }
    +}
    +
    +/**
    + * Represents data where tuples have been partitioned according to the hash of the given
    + * `expressions`. The hash function is defined as `HashPartitioning.partitionIdExpression`, so only
    + * [[HashPartitioning]] can satisfy this distribution.
    + *
    + * This is a strictly stronger guarantee than [[ClusteredDistribution]]. Given a tuple and the
    + * number of partitions, this distribution strictly requires which partition the tuple should be in.
    + */
    +case class HashPartitionedDistribution(expressions: Seq[Expression]) extends Distribution {
    --- End diff --

    sounds good
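For readers who do not read Scala daily, the `createPartitioning` check in the quoted diff can be modelled in Java roughly as below. This is a simplified sketch, not Spark code: `ClusteredDistributionSketch`, the string column names, and the string that stands in for a `HashPartitioning` value are all assumptions made for illustration. The Scala code uses `assert`; the sketch throws `IllegalStateException` so that it behaves the same with or without `-ea`.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

// Hypothetical Java model of the Scala ClusteredDistribution in the quoted diff.
// Column names are plain strings and the "partitioning" is only a description string.
final class ClusteredDistributionSketch {
  private final List<String> clustering;
  private final Optional<Integer> requiredNumPartitions;

  ClusteredDistributionSketch(List<String> clustering, Optional<Integer> requiredNumPartitions) {
    // Mirrors the require(clustering != Nil, ...) precondition.
    if (clustering.isEmpty()) {
      throw new IllegalArgumentException("clustering expressions must not be empty");
    }
    this.clustering = clustering;
    this.requiredNumPartitions = requiredNumPartitions;
  }

  // Mirrors createPartitioning: any partition count is accepted when no count is
  // required; otherwise the requested count must equal the required one.
  String createPartitioning(int numPartitions) {
    if (requiredNumPartitions.isPresent() && requiredNumPartitions.get() != numPartitions) {
      throw new IllegalStateException(
          "This ClusteredDistribution requires " + requiredNumPartitions.get()
              + " partitions, but the actual number of partitions is " + numPartitions + ".");
    }
    return "HashPartitioning(" + clustering + ", " + numPartitions + ")";
  }

  public static void main(String[] args) {
    ClusteredDistributionSketch unconstrained =
        new ClusteredDistributionSketch(Arrays.asList("a", "b"), Optional.empty());
    System.out.println(unconstrained.createPartitioning(8)); // HashPartitioning([a, b], 8)

    ClusteredDistributionSketch fixed =
        new ClusteredDistributionSketch(Arrays.asList("a", "b"), Optional.of(4));
    System.out.println(fixed.createPartitioning(4)); // HashPartitioning([a, b], 4)
  }
}
```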
[GitHub] spark issue #20087: [SPARK-21786][SQL] The 'spark.sql.parquet.compression.co...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20087 **[Test build #85787 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85787/testReport)** for PR 20087 at commit [`5b150bc`](https://github.com/apache/spark/commit/5b150bc643e301ec063704303aebeea87de02e83).
[GitHub] spark issue #19080: [SPARK-21865][SQL] simplify the distribution semantic of...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19080 **[Test build #85786 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85786/testReport)** for PR 19080 at commit [`a2f1bc1`](https://github.com/apache/spark/commit/a2f1bc1a077929e9d6bdb7e855debfdbac3ca03e).
[GitHub] spark pull request #19943: [SPARK-16060][SQL] Support Vectorized ORC Reader
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/19943#discussion_r160079405 --- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/JavaOrcColumnarBatchReader.java --- @@ -0,0 +1,503 @@
[GitHub] spark pull request #19080: [SPARK-21865][SQL] simplify the distribution sema...
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19080#discussion_r160079396

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala ---
    @@ -73,46 +127,31 @@ case class OrderedDistribution(ordering: Seq[SortOrder]) extends Distribution {
           "An AllTuples should be used to represent a distribution that only has " +
             "a single partition.")

    -  // TODO: This is not really valid...
    -  def clustering: Set[Expression] = ordering.map(_.child).toSet
    +  override def requiredNumPartitions: Option[Int] = None
    +
    +  override def createPartitioning(numPartitions: Int): Partitioning = {
    +    RangePartitioning(ordering, numPartitions)
    +  }
     }

     /**
      * Represents data where tuples are broadcasted to every node. It is quite common that the
      * entire set of tuples is transformed into different data structure.
      */
    -case class BroadcastDistribution(mode: BroadcastMode) extends Distribution
    +case class BroadcastDistribution(mode: BroadcastMode) extends Distribution {
    --- End diff --

    yea good idea, but again, this is an existing problem, let's fix it in another PR.
[GitHub] spark pull request #19080: [SPARK-21865][SQL] simplify the distribution sema...
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19080#discussion_r160079216

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala ---
    @@ -73,46 +127,31 @@ case class OrderedDistribution(ordering: Seq[SortOrder]) extends Distribution {
           "An AllTuples should be used to represent a distribution that only has " +
             "a single partition.")

    -  // TODO: This is not really valid...
    -  def clustering: Set[Expression] = ordering.map(_.child).toSet
    +  override def requiredNumPartitions: Option[Int] = None
    --- End diff --

    According to the comment `This is a strictly stronger guarantee than [[ClusteredDistribution]]`, we want to guarantee it. However we actually don't respect it, see https://github.com/apache/spark/pull/19080/files#r136419947

    Since it is an existing problem, I'd like to fix it in another PR.
[GitHub] spark issue #19988: [Spark-22795] [ML] Raise error when line search in First...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19988 Can one of the admins verify this patch?
[GitHub] spark pull request #19080: [SPARK-21865][SQL] simplify the distribution sema...
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19080#discussion_r160079142

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala ---
    @@ -51,12 +76,41 @@ case object AllTuples extends Distribution
      */
     case class ClusteredDistribution(
         clustering: Seq[Expression],
    -    numPartitions: Option[Int] = None) extends Distribution {
    +    requiredNumPartitions: Option[Int] = None) extends Distribution {
       require(
         clustering != Nil,
         "The clustering expressions of a ClusteredDistribution should not be Nil. " +
           "An AllTuples should be used to represent a distribution that only has " +
           "a single partition.")
    +
    +  override def createPartitioning(numPartitions: Int): Partitioning = {
    +    assert(requiredNumPartitions.isEmpty || requiredNumPartitions.get == numPartitions,
    +      s"This ClusteredDistribution requires ${requiredNumPartitions.get} partitions, but " +
    +        s"the actual number of partitions is $numPartitions.")
    +    HashPartitioning(clustering, numPartitions)
    +  }
    +}
    +
    +/**
    + * Represents data where tuples have been partitioned according to the hash of the given
    + * `expressions`. The hash function is defined as `HashPartitioning.partitionIdExpression`, so only
    + * [[HashPartitioning]] can satisfy this distribution.
    + *
    + * This is a strictly stronger guarantee than [[ClusteredDistribution]]. Given a tuple and the
    + * number of partitions, this distribution strictly requires which partition the tuple should be in.
    + */
    +case class HashPartitionedDistribution(expressions: Seq[Expression]) extends Distribution {
    --- End diff --

    Good idea, I'll rename it to `HashClusteredDistribution`. But I'd rather not extend `ClusteredDistribution`: a partitioning that satisfies `ClusteredDistribution` may not satisfy `HashClusteredDistribution`, so the child could not be used wherever the parent is expected, which would violate the Liskov Substitution Principle.
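The substitution argument in the comment above can be made concrete with a small model. The Java sketch below is hypothetical throughout: `PartitioningModel`, `RangePartitioningModel`, `HashPartitioningModel`, and the two `satisfies...` rules are simplified stand-ins, not Spark's classes or its actual matching logic. It assumes, as the quoted scaladoc states, that only hash partitioning can satisfy the hash-based distribution, and it additionally assumes that a range partitioning on a subset of the clustering keys is enough for the weaker clustered requirement.

```java
import java.util.Arrays;
import java.util.List;

// Simplified, hypothetical stand-ins for the Scala types under discussion.
interface PartitioningModel {}

final class HashPartitioningModel implements PartitioningModel {
  final List<String> expressions;
  HashPartitioningModel(List<String> expressions) { this.expressions = expressions; }
}

final class RangePartitioningModel implements PartitioningModel {
  final List<String> ordering;
  RangePartitioningModel(List<String> ordering) { this.ordering = ordering; }
}

final class DistributionModel {
  // Clustered requirement in this model: rows with equal clustering keys end up together,
  // which holds when the partitioning keys are a subset of the clustering keys.
  static boolean satisfiesClustered(PartitioningModel p, List<String> clustering) {
    if (p instanceof HashPartitioningModel) {
      return clustering.containsAll(((HashPartitioningModel) p).expressions);
    }
    if (p instanceof RangePartitioningModel) {
      return clustering.containsAll(((RangePartitioningModel) p).ordering);
    }
    return false;
  }

  // Hash-clustered requirement in this model: only a hash partitioning on exactly
  // the same expressions qualifies.
  static boolean satisfiesHashClustered(PartitioningModel p, List<String> expressions) {
    return p instanceof HashPartitioningModel
        && ((HashPartitioningModel) p).expressions.equals(expressions);
  }

  public static void main(String[] args) {
    List<String> keys = Arrays.asList("a", "b");
    PartitioningModel range = new RangePartitioningModel(Arrays.asList("a"));
    PartitioningModel hash = new HashPartitioningModel(keys);

    System.out.println(satisfiesClustered(range, keys));     // true
    System.out.println(satisfiesHashClustered(range, keys)); // false
    System.out.println(satisfiesHashClustered(hash, keys));  // true
  }
}
```

The range partitioning passes the clustered check but fails the hash-clustered one, which is the case the comment has in mind: code written against the parent requirement would get a different answer if handed the stricter child.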
[GitHub] spark pull request #19943: [SPARK-16060][SQL] Support Vectorized ORC Reader
Github user dongjoon-hyun commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19943#discussion_r160079119

    --- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/JavaOrcColumnarBatchReader.java ---
    @@ -0,0 +1,503 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.datasources.orc;
    +
    +import java.io.IOException;
    +
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.mapreduce.InputSplit;
    +import org.apache.hadoop.mapreduce.RecordReader;
    +import org.apache.hadoop.mapreduce.TaskAttemptContext;
    +import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    +import org.apache.orc.OrcConf;
    +import org.apache.orc.OrcFile;
    +import org.apache.orc.Reader;
    +import org.apache.orc.TypeDescription;
    +import org.apache.orc.mapred.OrcInputFormat;
    +import org.apache.orc.storage.common.type.HiveDecimal;
    +import org.apache.orc.storage.ql.exec.vector.*;
    +import org.apache.orc.storage.serde2.io.HiveDecimalWritable;
    +
    +import org.apache.spark.memory.MemoryMode;
    +import org.apache.spark.sql.catalyst.InternalRow;
    +import org.apache.spark.sql.execution.vectorized.ColumnVectorUtils;
    +import org.apache.spark.sql.execution.vectorized.OffHeapColumnVector;
    +import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
    +import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
    +import org.apache.spark.sql.types.*;
    +import org.apache.spark.sql.vectorized.ColumnarBatch;
    +
    +
    +/**
    + * To support vectorization in WholeStageCodeGen, this reader returns ColumnarBatch.
    + * After creating, `initialize` and `setRequiredSchema` should be called sequentially.
    + */
    +public class JavaOrcColumnarBatchReader extends RecordReader<Void, ColumnarBatch> {
    +
    +  /**
    +   * ORC File Reader.
    +   */
    +  private Reader reader;
    +
    +  /**
    +   * Vectorized Row Batch.
    +   */
    +  private VectorizedRowBatch batch;
    +
    +  /**
    +   * Requested Column IDs.
    +   */
    +  private int[] requestedColIds;
    +
    +  /**
    +   * Record reader from row batch.
    +   */
    +  private org.apache.orc.RecordReader recordReader;
    +
    +  /**
    +   * Required Schema.
    +   */
    +  private StructType requiredSchema;
    +
    +  /**
    +   * ColumnarBatch for vectorized execution by whole-stage codegen.
    +   */
    +  private ColumnarBatch columnarBatch;
    +
    +  /**
    +   * Writable column vectors of ColumnarBatch.
    +   */
    +  private WritableColumnVector[] columnVectors;
    +
    +  /**
    +   * The number of rows read and considered to be returned.
    +   */
    +  private long rowsReturned = 0L;
    +
    +  /**
    +   * Total number of rows.
    +   */
    +  private long totalRowCount = 0L;
    +
    +  @Override
    +  public Void getCurrentKey() throws IOException, InterruptedException {
    +    return null;
    +  }
    +
    +  @Override
    +  public ColumnarBatch getCurrentValue() throws IOException, InterruptedException {
    +    return columnarBatch;
    +  }
    +
    +  @Override
    +  public float getProgress() throws IOException, InterruptedException {
    +    return (float) rowsReturned / totalRowCount;
    +  }
    +
    +  @Override
    +  public boolean nextKeyValue() throws IOException, InterruptedException {
    +    return nextBatch();
    +  }
    +
    +  @Override
    +  public void close() throws IOException {
    +    if (columnarBatch != null) {
    +      columnarBatch.close();
    +      columnarBatch = null;
    +    }
    +    if (recordReader != null) {
    +      recordReader.close();
    +      recordReader = null;
    +    }
    +  }
    +
    +  /**
    +   * Initialize ORC file reader and batch record reader.
    +   * Please note that `setRequiredSchema` is needed to be called after this.
    +   */
    +  @Override
    +  public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
    +      throws IOException, InterruptedException {
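A side note on the `getProgress` method quoted in the diff above: `totalRowCount` is initialized to `0L`, so if `getProgress` happens to be called while the total is still zero, the float division evaluates to `NaN` (or `Infinity` for a non-zero numerator) rather than throwing. The following is only a minimal, self-contained sketch of one way such a case could be guarded. The class `ProgressSketch` and its `progress` helper are hypothetical names used for illustration and are not part of the pull request.

```java
// Hypothetical helper, not part of the quoted pull request:
// illustrates a progress ratio that is guarded against a zero total.
final class ProgressSketch {

  /**
   * Returns the fraction of rows returned so far, clamped to [0, 1].
   * When the total is not yet known (zero or negative), this reports 0
   * instead of the NaN that a float 0 / 0 division would produce.
   */
  static float progress(long rowsReturned, long totalRowCount) {
    if (totalRowCount <= 0L) {
      return 0.0f;
    }
    float ratio = (float) rowsReturned / totalRowCount;
    return Math.min(1.0f, Math.max(0.0f, ratio));
  }

  public static void main(String[] args) {
    // Unguarded form, as in the quoted diff, with both counters still at zero.
    long rows = 0L;
    long total = 0L;
    float unguarded = (float) rows / total;
    System.out.println("unguarded is NaN: " + Float.isNaN(unguarded));

    // Guarded form for the same inputs and for a partially read file.
    System.out.println("guarded, unknown total: " + progress(rows, total));
    System.out.println("guarded, 50 of 200 rows: " + progress(50L, 200L));
  }
}
```

Whether a guard like this is wanted depends on when callers are expected to poll progress; returning 0 for an unknown total is an assumption made for this sketch, not a decision taken in the review.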
[GitHub] spark issue #20174: [SPARK-22951][SQL] aggregate should not produce empty ro...
Github user liufengdb commented on the issue: https://github.com/apache/spark/pull/20174 test this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20172: [SPARK-22979][PYTHON][SQL] Avoid per-record type ...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/20172
[GitHub] spark pull request #19943: [SPARK-16060][SQL] Support Vectorized ORC Reader
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/19943#discussion_r160078819 --- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/JavaOrcColumnarBatchReader.java --- @@ -0,0 +1,503 @@
[GitHub] spark pull request #19943: [SPARK-16060][SQL] Support Vectorized ORC Reader
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/19943#discussion_r160078799 --- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/JavaOrcColumnarBatchReader.java --- @@ -0,0 +1,503 @@
[GitHub] spark issue #20172: [SPARK-22979][PYTHON][SQL] Avoid per-record type dispatc...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/20172 LGTM, merging to master/2.3
[GitHub] spark pull request #19989: [SPARK-22793][SQL][BACKPORT-2.0]Memory leak in Sp...
Github user zuotingbing closed the pull request at: https://github.com/apache/spark/pull/19989
[GitHub] spark pull request #19943: [SPARK-16060][SQL] Support Vectorized ORC Reader
Github user henrify commented on a diff in the pull request: https://github.com/apache/spark/pull/19943#discussion_r160078679 --- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/JavaOrcColumnarBatchReader.java --- @@ -0,0 +1,503 @@
[GitHub] spark pull request #19943: [SPARK-16060][SQL] Support Vectorized ORC Reader
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/19943#discussion_r160078425 --- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/JavaOrcColumnarBatchReader.java --- @@ -0,0 +1,503 @@
[GitHub] spark issue #20180: [SPARK-22983] Don't push filters beneath aggregates with...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20180 Merged build finished. Test PASSed.
[GitHub] spark issue #20180: [SPARK-22983] Don't push filters beneath aggregates with...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20180 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/85783/ Test PASSed.
[GitHub] spark issue #20180: [SPARK-22983] Don't push filters beneath aggregates with...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20180 **[Test build #85783 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85783/testReport)** for PR 20180 at commit [`5568d55`](https://github.com/apache/spark/commit/5568d55255cbdd5313f7391755c7c39d34390c30). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request #19943: [SPARK-16060][SQL] Support Vectorized ORC Reader
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/19943#discussion_r160078349 --- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/JavaOrcColumnarBatchReader.java --- @@ -0,0 +1,503 @@
[GitHub] spark issue #19884: [SPARK-22324][SQL][PYTHON] Upgrade Arrow to 0.8.0
Github user BryanCutler commented on the issue: https://github.com/apache/spark/pull/19884 @HyukjinKwon [ARROW-1949](https://issues.apache.org/jira/browse/ARROW-1949) was created to add an option to allow truncation when data will be lost. Once that is in Arrow, we can remove the workaround if we want.
[GitHub] spark pull request #19943: [SPARK-16060][SQL] Support Vectorized ORC Reader
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/19943#discussion_r160078288 --- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/JavaOrcColumnarBatchReader.java --- @@ -0,0 +1,503 @@
[GitHub] spark pull request #19943: [SPARK-16060][SQL] Support Vectorized ORC Reader
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/19943#discussion_r160078262 --- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/JavaOrcColumnarBatchReader.java --- @@ -0,0 +1,503 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.datasources.orc; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; +import org.apache.orc.OrcConf; +import org.apache.orc.OrcFile; +import org.apache.orc.Reader; +import org.apache.orc.TypeDescription; +import org.apache.orc.mapred.OrcInputFormat; +import org.apache.orc.storage.common.type.HiveDecimal; +import org.apache.orc.storage.ql.exec.vector.*; +import org.apache.orc.storage.serde2.io.HiveDecimalWritable; + +import org.apache.spark.memory.MemoryMode; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.execution.vectorized.ColumnVectorUtils; +import org.apache.spark.sql.execution.vectorized.OffHeapColumnVector; +import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector; +import org.apache.spark.sql.execution.vectorized.WritableColumnVector; +import org.apache.spark.sql.types.*; +import org.apache.spark.sql.vectorized.ColumnarBatch; + + +/** + * To support vectorization in WholeStageCodeGen, this reader returns ColumnarBatch. + * After creating, `initialize` and `setRequiredSchema` should be called sequentially. + */ +public class JavaOrcColumnarBatchReader extends RecordReader{ + + /** + * ORC File Reader. + */ + private Reader reader; + + /** + * Vectorized Row Batch. + */ + private VectorizedRowBatch batch; + + /** + * Requested Column IDs. + */ + private int[] requestedColIds; + + /** + * Record reader from row batch. + */ + private org.apache.orc.RecordReader recordReader; + + /** + * Required Schema. + */ + private StructType requiredSchema; + + /** + * ColumnarBatch for vectorized execution by whole-stage codegen. + */ + private ColumnarBatch columnarBatch; + + /** + * Writable column vectors of ColumnarBatch. 
+ */ + private WritableColumnVector[] columnVectors; + + /** + * The number of rows read and considered to be returned. + */ + private long rowsReturned = 0L; + + /** + * Total number of rows. + */ + private long totalRowCount = 0L; + + @Override + public Void getCurrentKey() throws IOException, InterruptedException { +return null; + } + + @Override + public ColumnarBatch getCurrentValue() throws IOException, InterruptedException { +return columnarBatch; + } + + @Override + public float getProgress() throws IOException, InterruptedException { +return (float) rowsReturned / totalRowCount; + } + + @Override + public boolean nextKeyValue() throws IOException, InterruptedException { +return nextBatch(); + } + + @Override + public void close() throws IOException { +if (columnarBatch != null) { + columnarBatch.close(); + columnarBatch = null; +} +if (recordReader != null) { + recordReader.close(); + recordReader = null; +} + } + + /** + * Initialize ORC file reader and batch record reader. + * Please note that `setRequiredSchema` is needed to be called after this. + */ + @Override + public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) + throws IOException, InterruptedException {
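A side note on the quoted `getProgress`: in `(float) rowsReturned / totalRowCount` the cast binds to `rowsReturned` first, so the division is done in floating point and yields `NaN` (rather than throwing) while `totalRowCount` is still 0. The stand-alone sketch below only illustrates that behaviour and one possible guard. The class `ProgressSketch` and both method names are hypothetical; they are not taken from the pull request, and the guard is an assumption, not necessarily what the patch ended up doing.

```java
// Hypothetical stand-alone sketch; not part of the Spark patch under review.
final class ProgressSketch {

    // Mirrors the quoted expression: the cast applies to rowsReturned only,
    // so this is float division and 0 / 0 produces NaN instead of an exception.
    static float unguardedProgress(long rowsReturned, long totalRowCount) {
        return (float) rowsReturned / totalRowCount;
    }

    // One possible guard (an assumption): report 0 until the total is known,
    // and never report more than 1.
    static float guardedProgress(long rowsReturned, long totalRowCount) {
        if (totalRowCount <= 0L) {
            return 0.0f;
        }
        return Math.min(1.0f, (float) rowsReturned / totalRowCount);
    }

    public static void main(String[] args) {
        System.out.println(Float.isNaN(unguardedProgress(0L, 0L))); // true
        System.out.println(guardedProgress(0L, 0L));                // 0.0
        System.out.println(guardedProgress(512L, 1024L));           // 0.5
    }
}
```

Whether a progress value of `NaN` matters depends on how the caller consumes it; the sketch makes no claim about that.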
[GitHub] spark issue #20181: [SPARK-22984] Fix incorrect bitmap copying and offset ad...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20181 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20181: [SPARK-22984] Fix incorrect bitmap copying and offset ad...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20181 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/85782/ Test PASSed.
[GitHub] spark pull request #19943: [SPARK-16060][SQL] Support Vectorized ORC Reader
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/19943#discussion_r160078213 --- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/JavaOrcColumnarBatchReader.java --- @@ -0,0 +1,503 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.datasources.orc; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; +import org.apache.orc.OrcConf; +import org.apache.orc.OrcFile; +import org.apache.orc.Reader; +import org.apache.orc.TypeDescription; +import org.apache.orc.mapred.OrcInputFormat; +import org.apache.orc.storage.common.type.HiveDecimal; +import org.apache.orc.storage.ql.exec.vector.*; +import org.apache.orc.storage.serde2.io.HiveDecimalWritable; + +import org.apache.spark.memory.MemoryMode; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.execution.vectorized.ColumnVectorUtils; +import org.apache.spark.sql.execution.vectorized.OffHeapColumnVector; +import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector; +import org.apache.spark.sql.execution.vectorized.WritableColumnVector; +import org.apache.spark.sql.types.*; +import org.apache.spark.sql.vectorized.ColumnarBatch; + + +/** + * To support vectorization in WholeStageCodeGen, this reader returns ColumnarBatch. + * After creating, `initialize` and `setRequiredSchema` should be called sequentially. + */ +public class JavaOrcColumnarBatchReader extends RecordReader{ + + /** + * ORC File Reader. + */ + private Reader reader; + + /** + * Vectorized Row Batch. + */ + private VectorizedRowBatch batch; + + /** + * Requested Column IDs. + */ + private int[] requestedColIds; + + /** + * Record reader from row batch. + */ + private org.apache.orc.RecordReader recordReader; + + /** + * Required Schema. + */ + private StructType requiredSchema; + + /** + * ColumnarBatch for vectorized execution by whole-stage codegen. + */ + private ColumnarBatch columnarBatch; + + /** + * Writable column vectors of ColumnarBatch. 
+ */ + private WritableColumnVector[] columnVectors; + + /** + * The number of rows read and considered to be returned. + */ + private long rowsReturned = 0L; + + /** + * Total number of rows. + */ + private long totalRowCount = 0L; + + @Override + public Void getCurrentKey() throws IOException, InterruptedException { +return null; + } + + @Override + public ColumnarBatch getCurrentValue() throws IOException, InterruptedException { +return columnarBatch; + } + + @Override + public float getProgress() throws IOException, InterruptedException { +return (float) rowsReturned / totalRowCount; + } + + @Override + public boolean nextKeyValue() throws IOException, InterruptedException { +return nextBatch(); + } + + @Override + public void close() throws IOException { +if (columnarBatch != null) { + columnarBatch.close(); + columnarBatch = null; +} +if (recordReader != null) { + recordReader.close(); + recordReader = null; +} + } + + /** + * Initialize ORC file reader and batch record reader. + * Please note that `setRequiredSchema` is needed to be called after this. + */ + @Override + public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) + throws IOException, InterruptedException {
[GitHub] spark pull request #19943: [SPARK-16060][SQL] Support Vectorized ORC Reader
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/19943#discussion_r160078135 --- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/JavaOrcColumnarBatchReader.java --- @@ -0,0 +1,503 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.datasources.orc; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; +import org.apache.orc.OrcConf; +import org.apache.orc.OrcFile; +import org.apache.orc.Reader; +import org.apache.orc.TypeDescription; +import org.apache.orc.mapred.OrcInputFormat; +import org.apache.orc.storage.common.type.HiveDecimal; +import org.apache.orc.storage.ql.exec.vector.*; +import org.apache.orc.storage.serde2.io.HiveDecimalWritable; + +import org.apache.spark.memory.MemoryMode; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.execution.vectorized.ColumnVectorUtils; +import org.apache.spark.sql.execution.vectorized.OffHeapColumnVector; +import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector; +import org.apache.spark.sql.execution.vectorized.WritableColumnVector; +import org.apache.spark.sql.types.*; +import org.apache.spark.sql.vectorized.ColumnarBatch; + + +/** + * To support vectorization in WholeStageCodeGen, this reader returns ColumnarBatch. + * After creating, `initialize` and `setRequiredSchema` should be called sequentially. + */ +public class JavaOrcColumnarBatchReader extends RecordReader{ + + /** + * ORC File Reader. + */ + private Reader reader; + + /** + * Vectorized Row Batch. + */ + private VectorizedRowBatch batch; + + /** + * Requested Column IDs. + */ + private int[] requestedColIds; + + /** + * Record reader from row batch. + */ + private org.apache.orc.RecordReader recordReader; + + /** + * Required Schema. + */ + private StructType requiredSchema; + + /** + * ColumnarBatch for vectorized execution by whole-stage codegen. + */ + private ColumnarBatch columnarBatch; + + /** + * Writable column vectors of ColumnarBatch. 
+ */ + private WritableColumnVector[] columnVectors; + + /** + * The number of rows read and considered to be returned. + */ + private long rowsReturned = 0L; + + /** + * Total number of rows. + */ + private long totalRowCount = 0L; + + @Override + public Void getCurrentKey() throws IOException, InterruptedException { +return null; + } + + @Override + public ColumnarBatch getCurrentValue() throws IOException, InterruptedException { +return columnarBatch; + } + + @Override + public float getProgress() throws IOException, InterruptedException { +return (float) rowsReturned / totalRowCount; + } + + @Override + public boolean nextKeyValue() throws IOException, InterruptedException { +return nextBatch(); + } + + @Override + public void close() throws IOException { +if (columnarBatch != null) { + columnarBatch.close(); + columnarBatch = null; +} +if (recordReader != null) { + recordReader.close(); + recordReader = null; +} + } + + /** + * Initialize ORC file reader and batch record reader. + * Please note that `setRequiredSchema` is needed to be called after this. + */ + @Override + public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) + throws IOException, InterruptedException {
[GitHub] spark issue #20181: [SPARK-22984] Fix incorrect bitmap copying and offset ad...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20181 **[Test build #85782 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85782/testReport)** for PR 20181 at commit [`6e879f1`](https://github.com/apache/spark/commit/6e879f17fc193b8e1d579fa9a28d64a53ba13321). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request #19943: [SPARK-16060][SQL] Support Vectorized ORC Reader
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/19943#discussion_r160078163 --- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/JavaOrcColumnarBatchReader.java --- @@ -0,0 +1,503 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.datasources.orc; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; +import org.apache.orc.OrcConf; +import org.apache.orc.OrcFile; +import org.apache.orc.Reader; +import org.apache.orc.TypeDescription; +import org.apache.orc.mapred.OrcInputFormat; +import org.apache.orc.storage.common.type.HiveDecimal; +import org.apache.orc.storage.ql.exec.vector.*; +import org.apache.orc.storage.serde2.io.HiveDecimalWritable; + +import org.apache.spark.memory.MemoryMode; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.execution.vectorized.ColumnVectorUtils; +import org.apache.spark.sql.execution.vectorized.OffHeapColumnVector; +import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector; +import org.apache.spark.sql.execution.vectorized.WritableColumnVector; +import org.apache.spark.sql.types.*; +import org.apache.spark.sql.vectorized.ColumnarBatch; + + +/** + * To support vectorization in WholeStageCodeGen, this reader returns ColumnarBatch. + * After creating, `initialize` and `setRequiredSchema` should be called sequentially. + */ +public class JavaOrcColumnarBatchReader extends RecordReader{ + + /** + * ORC File Reader. + */ + private Reader reader; + + /** + * Vectorized Row Batch. + */ + private VectorizedRowBatch batch; + + /** + * Requested Column IDs. + */ + private int[] requestedColIds; + + /** + * Record reader from row batch. + */ + private org.apache.orc.RecordReader recordReader; + + /** + * Required Schema. + */ + private StructType requiredSchema; + + /** + * ColumnarBatch for vectorized execution by whole-stage codegen. + */ + private ColumnarBatch columnarBatch; + + /** + * Writable column vectors of ColumnarBatch. 
+ */ + private WritableColumnVector[] columnVectors; + + /** + * The number of rows read and considered to be returned. + */ + private long rowsReturned = 0L; + + /** + * Total number of rows. + */ + private long totalRowCount = 0L; + + @Override + public Void getCurrentKey() throws IOException, InterruptedException { +return null; + } + + @Override + public ColumnarBatch getCurrentValue() throws IOException, InterruptedException { +return columnarBatch; + } + + @Override + public float getProgress() throws IOException, InterruptedException { +return (float) rowsReturned / totalRowCount; + } + + @Override + public boolean nextKeyValue() throws IOException, InterruptedException { +return nextBatch(); + } + + @Override + public void close() throws IOException { +if (columnarBatch != null) { + columnarBatch.close(); + columnarBatch = null; +} +if (recordReader != null) { + recordReader.close(); + recordReader = null; +} + } + + /** + * Initialize ORC file reader and batch record reader. + * Please note that `setRequiredSchema` is needed to be called after this. + */ + @Override + public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) + throws IOException, InterruptedException {
[GitHub] spark pull request #19943: [SPARK-16060][SQL] Support Vectorized ORC Reader
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/19943#discussion_r160077992 --- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/JavaOrcColumnarBatchReader.java --- @@ -0,0 +1,503 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.datasources.orc; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; +import org.apache.orc.OrcConf; +import org.apache.orc.OrcFile; +import org.apache.orc.Reader; +import org.apache.orc.TypeDescription; +import org.apache.orc.mapred.OrcInputFormat; +import org.apache.orc.storage.common.type.HiveDecimal; +import org.apache.orc.storage.ql.exec.vector.*; +import org.apache.orc.storage.serde2.io.HiveDecimalWritable; + +import org.apache.spark.memory.MemoryMode; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.execution.vectorized.ColumnVectorUtils; +import org.apache.spark.sql.execution.vectorized.OffHeapColumnVector; +import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector; +import org.apache.spark.sql.execution.vectorized.WritableColumnVector; +import org.apache.spark.sql.types.*; +import org.apache.spark.sql.vectorized.ColumnarBatch; + + +/** + * To support vectorization in WholeStageCodeGen, this reader returns ColumnarBatch. + * After creating, `initialize` and `setRequiredSchema` should be called sequentially. + */ +public class JavaOrcColumnarBatchReader extends RecordReader{ + + /** + * ORC File Reader. + */ + private Reader reader; + + /** + * Vectorized Row Batch. + */ + private VectorizedRowBatch batch; + + /** + * Requested Column IDs. + */ + private int[] requestedColIds; + + /** + * Record reader from row batch. + */ + private org.apache.orc.RecordReader recordReader; + + /** + * Required Schema. + */ + private StructType requiredSchema; + + /** + * ColumnarBatch for vectorized execution by whole-stage codegen. --- End diff -- Done. 
[GitHub] spark pull request #19943: [SPARK-16060][SQL] Support Vectorized ORC Reader
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/19943#discussion_r160078039 --- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/JavaOrcColumnarBatchReader.java --- @@ -0,0 +1,503 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.datasources.orc; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; +import org.apache.orc.OrcConf; +import org.apache.orc.OrcFile; +import org.apache.orc.Reader; +import org.apache.orc.TypeDescription; +import org.apache.orc.mapred.OrcInputFormat; +import org.apache.orc.storage.common.type.HiveDecimal; +import org.apache.orc.storage.ql.exec.vector.*; +import org.apache.orc.storage.serde2.io.HiveDecimalWritable; + +import org.apache.spark.memory.MemoryMode; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.execution.vectorized.ColumnVectorUtils; +import org.apache.spark.sql.execution.vectorized.OffHeapColumnVector; +import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector; +import org.apache.spark.sql.execution.vectorized.WritableColumnVector; +import org.apache.spark.sql.types.*; +import org.apache.spark.sql.vectorized.ColumnarBatch; + + +/** + * To support vectorization in WholeStageCodeGen, this reader returns ColumnarBatch. + * After creating, `initialize` and `setRequiredSchema` should be called sequentially. + */ +public class JavaOrcColumnarBatchReader extends RecordReader{ + + /** + * ORC File Reader. + */ + private Reader reader; + + /** + * Vectorized Row Batch. + */ + private VectorizedRowBatch batch; + + /** + * Requested Column IDs. + */ + private int[] requestedColIds; + + /** + * Record reader from row batch. + */ + private org.apache.orc.RecordReader recordReader; + + /** + * Required Schema. + */ + private StructType requiredSchema; + + /** + * ColumnarBatch for vectorized execution by whole-stage codegen. + */ + private ColumnarBatch columnarBatch; + + /** + * Writable column vectors of ColumnarBatch. 
+ */ + private WritableColumnVector[] columnVectors; + + /** + * The number of rows read and considered to be returned. + */ + private long rowsReturned = 0L; + + /** + * Total number of rows. --- End diff -- Yep. It's removed.
[GitHub] spark pull request #19943: [SPARK-16060][SQL] Support Vectorized ORC Reader
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/19943#discussion_r160078014 --- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/JavaOrcColumnarBatchReader.java --- @@ -0,0 +1,503 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.datasources.orc; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; +import org.apache.orc.OrcConf; +import org.apache.orc.OrcFile; +import org.apache.orc.Reader; +import org.apache.orc.TypeDescription; +import org.apache.orc.mapred.OrcInputFormat; +import org.apache.orc.storage.common.type.HiveDecimal; +import org.apache.orc.storage.ql.exec.vector.*; +import org.apache.orc.storage.serde2.io.HiveDecimalWritable; + +import org.apache.spark.memory.MemoryMode; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.execution.vectorized.ColumnVectorUtils; +import org.apache.spark.sql.execution.vectorized.OffHeapColumnVector; +import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector; +import org.apache.spark.sql.execution.vectorized.WritableColumnVector; +import org.apache.spark.sql.types.*; +import org.apache.spark.sql.vectorized.ColumnarBatch; + + +/** + * To support vectorization in WholeStageCodeGen, this reader returns ColumnarBatch. + * After creating, `initialize` and `setRequiredSchema` should be called sequentially. + */ +public class JavaOrcColumnarBatchReader extends RecordReader{ + + /** + * ORC File Reader. + */ + private Reader reader; + + /** + * Vectorized Row Batch. + */ + private VectorizedRowBatch batch; + + /** + * Requested Column IDs. + */ + private int[] requestedColIds; + + /** + * Record reader from row batch. + */ + private org.apache.orc.RecordReader recordReader; + + /** + * Required Schema. + */ + private StructType requiredSchema; + + /** + * ColumnarBatch for vectorized execution by whole-stage codegen. + */ + private ColumnarBatch columnarBatch; + + /** + * Writable column vectors of ColumnarBatch. --- End diff -- Done. 
[GitHub] spark pull request #19943: [SPARK-16060][SQL] Support Vectorized ORC Reader
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/19943#discussion_r160077944 --- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/JavaOrcColumnarBatchReader.java --- @@ -0,0 +1,503 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.datasources.orc; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; +import org.apache.orc.OrcConf; +import org.apache.orc.OrcFile; +import org.apache.orc.Reader; +import org.apache.orc.TypeDescription; +import org.apache.orc.mapred.OrcInputFormat; +import org.apache.orc.storage.common.type.HiveDecimal; +import org.apache.orc.storage.ql.exec.vector.*; +import org.apache.orc.storage.serde2.io.HiveDecimalWritable; + +import org.apache.spark.memory.MemoryMode; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.execution.vectorized.ColumnVectorUtils; +import org.apache.spark.sql.execution.vectorized.OffHeapColumnVector; +import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector; +import org.apache.spark.sql.execution.vectorized.WritableColumnVector; +import org.apache.spark.sql.types.*; +import org.apache.spark.sql.vectorized.ColumnarBatch; + + +/** + * To support vectorization in WholeStageCodeGen, this reader returns ColumnarBatch. + * After creating, `initialize` and `setRequiredSchema` should be called sequentially. + */ +public class JavaOrcColumnarBatchReader extends RecordReader{ + + /** + * ORC File Reader. + */ + private Reader reader; + + /** + * Vectorized Row Batch. + */ + private VectorizedRowBatch batch; + + /** + * Requested Column IDs. + */ + private int[] requestedColIds; + + /** + * Record reader from row batch. + */ + private org.apache.orc.RecordReader recordReader; + + /** + * Required Schema. --- End diff -- Yep. It's removed.
[GitHub] spark pull request #19943: [SPARK-16060][SQL] Support Vectorized ORC Reader
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/19943#discussion_r160077928 --- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/JavaOrcColumnarBatchReader.java --- @@ -0,0 +1,503 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.datasources.orc; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; +import org.apache.orc.OrcConf; +import org.apache.orc.OrcFile; +import org.apache.orc.Reader; +import org.apache.orc.TypeDescription; +import org.apache.orc.mapred.OrcInputFormat; +import org.apache.orc.storage.common.type.HiveDecimal; +import org.apache.orc.storage.ql.exec.vector.*; +import org.apache.orc.storage.serde2.io.HiveDecimalWritable; + +import org.apache.spark.memory.MemoryMode; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.execution.vectorized.ColumnVectorUtils; +import org.apache.spark.sql.execution.vectorized.OffHeapColumnVector; +import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector; +import org.apache.spark.sql.execution.vectorized.WritableColumnVector; +import org.apache.spark.sql.types.*; +import org.apache.spark.sql.vectorized.ColumnarBatch; + + +/** + * To support vectorization in WholeStageCodeGen, this reader returns ColumnarBatch. + * After creating, `initialize` and `setRequiredSchema` should be called sequentially. + */ +public class JavaOrcColumnarBatchReader extends RecordReader{ + + /** + * ORC File Reader. + */ + private Reader reader; + + /** + * Vectorized Row Batch. + */ + private VectorizedRowBatch batch; + + /** + * Requested Column IDs. + */ + private int[] requestedColIds; + + /** + * Record reader from row batch. --- End diff -- Yep.
[GitHub] spark pull request #19943: [SPARK-16060][SQL] Support Vectorized ORC Reader
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/19943#discussion_r160077922 --- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/JavaOrcColumnarBatchReader.java --- @@ -0,0 +1,503 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.datasources.orc; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; +import org.apache.orc.OrcConf; +import org.apache.orc.OrcFile; +import org.apache.orc.Reader; +import org.apache.orc.TypeDescription; +import org.apache.orc.mapred.OrcInputFormat; +import org.apache.orc.storage.common.type.HiveDecimal; +import org.apache.orc.storage.ql.exec.vector.*; +import org.apache.orc.storage.serde2.io.HiveDecimalWritable; + +import org.apache.spark.memory.MemoryMode; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.execution.vectorized.ColumnVectorUtils; +import org.apache.spark.sql.execution.vectorized.OffHeapColumnVector; +import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector; +import org.apache.spark.sql.execution.vectorized.WritableColumnVector; +import org.apache.spark.sql.types.*; +import org.apache.spark.sql.vectorized.ColumnarBatch; + + +/** + * To support vectorization in WholeStageCodeGen, this reader returns ColumnarBatch. + * After creating, `initialize` and `setRequiredSchema` should be called sequentially. + */ +public class JavaOrcColumnarBatchReader extends RecordReader{ + + /** + * ORC File Reader. + */ + private Reader reader; + + /** + * Vectorized Row Batch. --- End diff -- Yep. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19943: [SPARK-16060][SQL] Support Vectorized ORC Reader
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/19943#discussion_r160077924 --- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/JavaOrcColumnarBatchReader.java --- @@ -0,0 +1,503 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.datasources.orc; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; +import org.apache.orc.OrcConf; +import org.apache.orc.OrcFile; +import org.apache.orc.Reader; +import org.apache.orc.TypeDescription; +import org.apache.orc.mapred.OrcInputFormat; +import org.apache.orc.storage.common.type.HiveDecimal; +import org.apache.orc.storage.ql.exec.vector.*; +import org.apache.orc.storage.serde2.io.HiveDecimalWritable; + +import org.apache.spark.memory.MemoryMode; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.execution.vectorized.ColumnVectorUtils; +import org.apache.spark.sql.execution.vectorized.OffHeapColumnVector; +import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector; +import org.apache.spark.sql.execution.vectorized.WritableColumnVector; +import org.apache.spark.sql.types.*; +import org.apache.spark.sql.vectorized.ColumnarBatch; + + +/** + * To support vectorization in WholeStageCodeGen, this reader returns ColumnarBatch. + * After creating, `initialize` and `setRequiredSchema` should be called sequentially. + */ +public class JavaOrcColumnarBatchReader extends RecordReader{ + + /** + * ORC File Reader. + */ + private Reader reader; + + /** + * Vectorized Row Batch. + */ + private VectorizedRowBatch batch; + + /** + * Requested Column IDs. --- End diff -- Updated. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19943: [SPARK-16060][SQL] Support Vectorized ORC Reader
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/19943#discussion_r160077793 --- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/JavaOrcColumnarBatchReader.java --- @@ -0,0 +1,503 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.datasources.orc; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; +import org.apache.orc.OrcConf; +import org.apache.orc.OrcFile; +import org.apache.orc.Reader; +import org.apache.orc.TypeDescription; +import org.apache.orc.mapred.OrcInputFormat; +import org.apache.orc.storage.common.type.HiveDecimal; +import org.apache.orc.storage.ql.exec.vector.*; +import org.apache.orc.storage.serde2.io.HiveDecimalWritable; + +import org.apache.spark.memory.MemoryMode; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.execution.vectorized.ColumnVectorUtils; +import org.apache.spark.sql.execution.vectorized.OffHeapColumnVector; +import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector; +import org.apache.spark.sql.execution.vectorized.WritableColumnVector; +import org.apache.spark.sql.types.*; +import org.apache.spark.sql.vectorized.ColumnarBatch; + + +/** + * To support vectorization in WholeStageCodeGen, this reader returns ColumnarBatch. + * After creating, `initialize` and `setRequiredSchema` should be called sequentially. + */ +public class JavaOrcColumnarBatchReader extends RecordReader{ + + /** + * ORC File Reader. --- End diff -- Yep. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19943: [SPARK-16060][SQL] Support Vectorized ORC Reader
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/19943#discussion_r160077716 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -386,6 +386,16 @@ object SQLConf { .checkValues(Set("hive", "native")) .createWithDefault("native") + val ORC_VECTORIZED_READER_ENABLED = buildConf("spark.sql.orc.enableVectorizedReader") +.doc("Enables vectorized orc decoding.") +.booleanConf +.createWithDefault(true) + + val ORC_VECTORIZED_JAVA_READER_ENABLED = buildConf("spark.sql.orc.enableVectorizedJavaReader") --- End diff -- Yes, it does, but we need to choose the best one from both a performance and a maintenance perspective. So far, according to the performance results, the Java implementation doesn't outperform the Scala version. I'll remove it at the last minute, to be sure.
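A rough sketch of how two such boolean flags could gate the reader choice while both implementations are still being compared. Only the two key names and the `true` default of the first flag come from the diff above; the `Map`-based configuration, the `ReaderKind` enum, the `chooseReader` helper, and the `false` default assumed for the Java flag are hypothetical and are not Spark APIs.

```java
import java.util.HashMap;
import java.util.Map;

public class OrcReaderChoice {
    // Key names copied from the diff above.
    static final String VECTORIZED = "spark.sql.orc.enableVectorizedReader";
    static final String VECTORIZED_JAVA = "spark.sql.orc.enableVectorizedJavaReader";

    // Hypothetical labels for the candidate implementations.
    enum ReaderKind { ROW_BASED, VECTORIZED_SCALA, VECTORIZED_JAVA }

    // Reads a boolean flag, falling back to the given default when the key is unset.
    static boolean flag(Map<String, String> conf, String key, boolean dflt) {
        String value = conf.get(key);
        return value == null ? dflt : Boolean.parseBoolean(value);
    }

    static ReaderKind chooseReader(Map<String, String> conf) {
        if (!flag(conf, VECTORIZED, true)) {
            return ReaderKind.ROW_BASED;
        }
        // Assumption: the Java flag defaults to false; the diff does not show its default.
        return flag(conf, VECTORIZED_JAVA, false)
            ? ReaderKind.VECTORIZED_JAVA
            : ReaderKind.VECTORIZED_SCALA;
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        System.out.println(chooseReader(conf));
        conf.put(VECTORIZED_JAVA, "true");
        System.out.println(chooseReader(conf));
    }
}
```

Keeping the second flag separate makes it cheap to drop later: removing it only deletes one branch of `chooseReader`.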
[GitHub] spark pull request #19792: [SPARK-22566][PYTHON] Better error message for `_...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/19792
[GitHub] spark issue #19792: [SPARK-22566][PYTHON] Better error message for `_merge_t...
Github user ueshin commented on the issue: https://github.com/apache/spark/pull/19792 Thanks! merging to master/2.3.
[GitHub] spark pull request #20072: [SPARK-22790][SQL] add a configurable factor to d...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20072#discussion_r160077194 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala --- @@ -82,7 +82,11 @@ case class HadoopFsRelation( } } - override def sizeInBytes: Long = location.sizeInBytes + override def sizeInBytes: Long = { +val sizeFactor = sqlContext.conf.diskToMemorySizeFactor --- End diff -- compressionFactor
[GitHub] spark pull request #20072: [SPARK-22790][SQL] add a configurable factor to d...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20072#discussion_r160077189 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -263,6 +263,17 @@ object SQLConf { .booleanConf .createWithDefault(false) + val DISK_TO_MEMORY_SIZE_FACTOR = buildConf( +"spark.sql.sources.compressionFactor") +.internal() +.doc("The result of multiplying this factor with the size of data source files is propagated " + + "to serve as the stats to choose the best execution plan. In the case where the " + --- End diff -- `When estimating the output data size of a table scan, multiply the file size by this factor to get the estimated data size, in case the data is compressed in the file and leads to a heavily underestimated result.`
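The estimate described in this comment can be sketched as follows. This is a minimal illustration under stated assumptions, not the PR's code: the class and method names (`FileSizeEstimate`, `checkFactor`, `estimatedSizeInBytes`) are hypothetical, and only the "larger than 0" check and the multiply-by-factor idea come from the diff.

```java
public class FileSizeEstimate {
    // Mirrors the `_ > 0` check from the diff; written as !(factor > 0) so NaN is rejected too.
    static double checkFactor(double factor) {
        if (!(factor > 0)) {
            throw new IllegalArgumentException(
                "the compression factor must be larger than 0, got " + factor);
        }
        return factor;
    }

    // Estimated scan output size: on-disk file size scaled by the factor, truncated to a long.
    static long estimatedSizeInBytes(long fileSizeInBytes, double factor) {
        return (long) (fileSizeInBytes * checkFactor(factor));
    }

    public static void main(String[] args) {
        // A factor of 1.0 leaves the file size unchanged.
        System.out.println(estimatedSizeInBytes(1000L, 1.0));
        // A factor above 1.0 models data that expands when read into memory.
        System.out.println(estimatedSizeInBytes(1000L, 3.5));
    }
}
```

A larger estimate makes size-sensitive planning decisions more conservative for compressed files, which is the underestimation problem the comment refers to.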
[GitHub] spark pull request #20178: [Spark-22952][CORE] Deprecate stageAttemptId in f...
Github user advancedxy commented on a diff in the pull request: https://github.com/apache/spark/pull/20178#discussion_r160077159 --- Diff: core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala --- @@ -56,6 +56,8 @@ class StageInfo( completionTime = Some(System.currentTimeMillis) } + def attemptNumber(): Int = attemptId --- End diff -- h, maybe.. Let me try it out after work... --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20072: [SPARK-22790][SQL] add a configurable factor to d...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20072#discussion_r160077024 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -263,6 +263,17 @@ object SQLConf { .booleanConf .createWithDefault(false) + val DISK_TO_MEMORY_SIZE_FACTOR = buildConf( --- End diff -- Rename this too: `FILE_COMPRESSION_FACTOR`
[GitHub] spark pull request #20072: [SPARK-22790][SQL] add a configurable factor to d...
Github user CodingCat commented on a diff in the pull request: https://github.com/apache/spark/pull/20072#discussion_r160076999 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -263,6 +263,17 @@ object SQLConf { .booleanConf .createWithDefault(false) + val DISK_TO_MEMORY_SIZE_FACTOR = buildConf( +"spark.sql.sources.compressionFactor") +.internal() +.doc("The result of multiplying this factor with the size of data source files is propagated " + + "to serve as the stats to choose the best execution plan. In the case where the " + + "in-disk and in-memory size of data is significantly different, users can adjust this " + + "factor for a better choice of the execution plan. The default value is 1.0.") +.doubleConf +.checkValue(_ > 0, "the value of fileDataSizeFactor must be larger than 0") --- End diff -- It's not necessarily the case that the Parquet size is always smaller than the in-memory size. E.g., for some simple datasets (like the one used in the test), Parquet's overhead makes the overall size larger than the in-memory size, but with the TPCDS dataset I observed that the Parquet size is much smaller than the in-memory size.
[GitHub] spark pull request #20072: [SPARK-22790][SQL] add a configurable factor to d...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20072#discussion_r160077002 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -263,6 +263,17 @@ object SQLConf { .booleanConf .createWithDefault(false) + val DISK_TO_MEMORY_SIZE_FACTOR = buildConf( +"spark.sql.sources.compressionFactor") --- End diff -- BTW, how about `fileCompressionFactor`, since it only works for file-based data sources?
[GitHub] spark pull request #20072: [SPARK-22790][SQL] add a configurable factor to d...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20072#discussion_r160076965 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -263,6 +263,17 @@ object SQLConf { .booleanConf .createWithDefault(false) + val DISK_TO_MEMORY_SIZE_FACTOR = buildConf( +"spark.sql.sources.compressionFactor") +.internal() +.doc("The result of multiplying this factor with the size of data source files is propagated " + + "to serve as the stats to choose the best execution plan. In the case where the " + + "in-disk and in-memory size of data is significantly different, users can adjust this " + + "factor for a better choice of the execution plan. The default value is 1.0.") +.doubleConf +.checkValue(_ > 0, "the value of fileDataSizeFactor must be larger than 0") --- End diff -- BTW `fileDataSizeFactor` -> `compressionFactor`
[GitHub] spark pull request #20072: [SPARK-22790][SQL] add a configurable factor to d...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20072#discussion_r160076855 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -263,6 +263,17 @@ object SQLConf { .booleanConf .createWithDefault(false) + val DISK_TO_MEMORY_SIZE_FACTOR = buildConf( +"spark.sql.sources.compressionFactor") --- End diff -- merge this with the previous line
[GitHub] spark pull request #20072: [SPARK-22790][SQL] add a configurable factor to d...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20072#discussion_r160076809 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -263,6 +263,17 @@ object SQLConf { .booleanConf .createWithDefault(false) + val DISK_TO_MEMORY_SIZE_FACTOR = buildConf( +"spark.sql.sources.compressionFactor") +.internal() +.doc("The result of multiplying this factor with the size of data source files is propagated " + + "to serve as the stats to choose the best execution plan. In the case where the " + + "in-disk and in-memory size of data is significantly different, users can adjust this " + + "factor for a better choice of the execution plan. The default value is 1.0.") +.doubleConf +.checkValue(_ > 0, "the value of fileDataSizeFactor must be larger than 0") --- End diff -- maybe `>= 1.0`? it's weird to see a compression factor less than 1.
[GitHub] spark pull request #20178: [Spark-22952][CORE] Deprecate stageAttemptId in f...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20178#discussion_r160076700 --- Diff: core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala --- @@ -56,6 +56,8 @@ class StageInfo( completionTime = Some(System.currentTimeMillis) } + def attemptNumber(): Int = attemptId --- End diff -- hmmm, can we add a new constructor for `attemptId`?
[GitHub] spark pull request #20178: [Spark-22952][CORE] Deprecate stageAttemptId in f...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20178#discussion_r160076571 --- Diff: core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala --- @@ -56,6 +56,8 @@ class StageInfo( completionTime = Some(System.currentTimeMillis) } + def attemptNumber(): Int = attemptId --- End diff -- ah i see, let's keep it.
[GitHub] spark pull request #19943: [SPARK-16060][SQL] Support Vectorized ORC Reader
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19943#discussion_r160076528 --- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/JavaOrcColumnarBatchReader.java --- @@ -0,0 +1,503 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.datasources.orc; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; +import org.apache.orc.OrcConf; +import org.apache.orc.OrcFile; +import org.apache.orc.Reader; +import org.apache.orc.TypeDescription; +import org.apache.orc.mapred.OrcInputFormat; +import org.apache.orc.storage.common.type.HiveDecimal; +import org.apache.orc.storage.ql.exec.vector.*; +import org.apache.orc.storage.serde2.io.HiveDecimalWritable; + +import org.apache.spark.memory.MemoryMode; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.execution.vectorized.ColumnVectorUtils; +import org.apache.spark.sql.execution.vectorized.OffHeapColumnVector; +import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector; +import org.apache.spark.sql.execution.vectorized.WritableColumnVector; +import org.apache.spark.sql.types.*; +import org.apache.spark.sql.vectorized.ColumnarBatch; + + +/** + * To support vectorization in WholeStageCodeGen, this reader returns ColumnarBatch. + * After creating, `initialize` and `setRequiredSchema` should be called sequentially. + */ +public class JavaOrcColumnarBatchReader extends RecordReader{ + + /** + * ORC File Reader. + */ + private Reader reader; + + /** + * Vectorized Row Batch. + */ + private VectorizedRowBatch batch; + + /** + * Requested Column IDs. + */ + private int[] requestedColIds; + + /** + * Record reader from row batch. + */ + private org.apache.orc.RecordReader recordReader; + + /** + * Required Schema. + */ + private StructType requiredSchema; + + /** + * ColumnarBatch for vectorized execution by whole-stage codegen. + */ + private ColumnarBatch columnarBatch; + + /** + * Writable column vectors of ColumnarBatch. 
+ */ + private WritableColumnVector[] columnVectors; + + /** + * The number of rows read and considered to be returned. + */ + private long rowsReturned = 0L; + + /** + * Total number of rows. + */ + private long totalRowCount = 0L; + + @Override + public Void getCurrentKey() throws IOException, InterruptedException { +return null; + } + + @Override + public ColumnarBatch getCurrentValue() throws IOException, InterruptedException { +return columnarBatch; + } + + @Override + public float getProgress() throws IOException, InterruptedException { +return (float) rowsReturned / totalRowCount; + } + + @Override + public boolean nextKeyValue() throws IOException, InterruptedException { +return nextBatch(); + } + + @Override + public void close() throws IOException { +if (columnarBatch != null) { + columnarBatch.close(); + columnarBatch = null; +} +if (recordReader != null) { + recordReader.close(); + recordReader = null; +} + } + + /** + * Initialize ORC file reader and batch record reader. + * Please note that `setRequiredSchema` is needed to be called after this. + */ + @Override + public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) + throws IOException, InterruptedException { +
[GitHub] spark pull request #20178: [Spark-22952][CORE] Deprecate stageAttemptId in f...
Github user advancedxy commented on a diff in the pull request: https://github.com/apache/spark/pull/20178#discussion_r160076462 --- Diff: core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala --- @@ -56,6 +56,8 @@ class StageInfo( completionTime = Some(System.currentTimeMillis) } + def attemptNumber(): Int = attemptId --- End diff -- If we go this way, I believe it would break source compatibility for the Developer API. Code like `new StageInfo(stageId = xx, attemptId = yy, ...)` couldn't be compiled any more. Not sure about binary compatibility.
[GitHub] spark issue #20151: [SPARK-22959][PYTHON] Configuration to select the module...
Github user ueshin commented on the issue: https://github.com/apache/spark/pull/20151 The changes LGTM. Btw, what if the module is missing from the Python path? Can we tell from the exception message that the error is caused by the missing module?
[GitHub] spark pull request #20178: [Spark-22952][CORE] Deprecate stageAttemptId in f...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20178#discussion_r160076144 --- Diff: core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala --- @@ -56,6 +56,8 @@ class StageInfo( completionTime = Some(System.currentTimeMillis) } + def attemptNumber(): Int = attemptId --- End diff -- How about:
```
class StageInfo(
    ...
    val attemptNumber: Int,
    ...) {
  @deprecated def attemptId: Int = attemptNumber
}
```
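The rename-with-deprecated-alias idea proposed here can be sketched in Java as below. This is only an illustration of the pattern, assuming a much smaller class than the real one: `StageInfoSketch` and its two-field constructor are hypothetical and do not match Spark's `StageInfo`.

```java
public class StageInfoSketch {
    private final int stageId;
    private final int attemptNumber;

    public StageInfoSketch(int stageId, int attemptNumber) {
        this.stageId = stageId;
        this.attemptNumber = attemptNumber;
    }

    public int stageId() {
        return stageId;
    }

    // New, preferred accessor.
    public int attemptNumber() {
        return attemptNumber;
    }

    /**
     * Old name kept as a thin alias so existing callers keep compiling.
     *
     * @deprecated use {@link #attemptNumber()} instead.
     */
    @Deprecated
    public int attemptId() {
        return attemptNumber();
    }

    public static void main(String[] args) {
        StageInfoSketch info = new StageInfoSketch(3, 1);
        // Both accessors report the same value; the old one triggers a deprecation warning at compile time.
        System.out.println(info.attemptNumber() == info.attemptId());
    }
}
```

Java has no named arguments, so renaming a constructor parameter is invisible to Java callers. In Scala, callers that pass the parameter by name would be affected by such a rename, which is why the direction of the alias matters for source compatibility.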
[GitHub] spark issue #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pyspark
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/13599 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/85777/ Test PASSed.
[GitHub] spark issue #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pyspark
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/13599 Merged build finished. Test PASSed.
[GitHub] spark issue #13599: [SPARK-13587] [PYSPARK] Support virtualenv in pyspark
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/13599 **[Test build #85777 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85777/testReport)** for PR 13599 at commit [`05558b5`](https://github.com/apache/spark/commit/05558b5cccb2f47dca96913f1ce775b48cd9e260). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark issue #20165: [SPARK-22972] Couldn't find corresponding Hive SerDe for...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20165 Merged build finished. Test PASSed.
[GitHub] spark issue #20165: [SPARK-22972] Couldn't find corresponding Hive SerDe for...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20165 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/85779/ Test PASSed.
[GitHub] spark issue #20165: [SPARK-22972] Couldn't find corresponding Hive SerDe for...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20165 **[Test build #85779 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85779/testReport)** for PR 20165 at commit [`cc4dd13`](https://github.com/apache/spark/commit/cc4dd13c8d65aec3758126c7e163fde56bf6033f). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request #19943: [SPARK-16060][SQL] Support Vectorized ORC Reader
Github user henrify commented on a diff in the pull request: https://github.com/apache/spark/pull/19943#discussion_r160075503 --- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/JavaOrcColumnarBatchReader.java --- @@ -0,0 +1,503 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.datasources.orc; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.input.FileSplit; +import org.apache.orc.OrcConf; +import org.apache.orc.OrcFile; +import org.apache.orc.Reader; +import org.apache.orc.TypeDescription; +import org.apache.orc.mapred.OrcInputFormat; +import org.apache.orc.storage.common.type.HiveDecimal; +import org.apache.orc.storage.ql.exec.vector.*; +import org.apache.orc.storage.serde2.io.HiveDecimalWritable; + +import org.apache.spark.memory.MemoryMode; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.execution.vectorized.ColumnVectorUtils; +import org.apache.spark.sql.execution.vectorized.OffHeapColumnVector; +import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector; +import org.apache.spark.sql.execution.vectorized.WritableColumnVector; +import org.apache.spark.sql.types.*; +import org.apache.spark.sql.vectorized.ColumnarBatch; + + +/** + * To support vectorization in WholeStageCodeGen, this reader returns ColumnarBatch. + * After creating, `initialize` and `setRequiredSchema` should be called sequentially. + */ +public class JavaOrcColumnarBatchReader extends RecordReader{ + + /** + * ORC File Reader. + */ + private Reader reader; + + /** + * Vectorized Row Batch. + */ + private VectorizedRowBatch batch; + + /** + * Requested Column IDs. + */ + private int[] requestedColIds; + + /** + * Record reader from row batch. + */ + private org.apache.orc.RecordReader recordReader; + + /** + * Required Schema. + */ + private StructType requiredSchema; + + /** + * ColumnarBatch for vectorized execution by whole-stage codegen. + */ + private ColumnarBatch columnarBatch; + + /** + * Writable column vectors of ColumnarBatch. 
+ */ + private WritableColumnVector[] columnVectors; + + /** + * The number of rows read and considered to be returned. + */ + private long rowsReturned = 0L; + + /** + * Total number of rows. + */ + private long totalRowCount = 0L; + + @Override + public Void getCurrentKey() throws IOException, InterruptedException { +return null; + } + + @Override + public ColumnarBatch getCurrentValue() throws IOException, InterruptedException { +return columnarBatch; + } + + @Override + public float getProgress() throws IOException, InterruptedException { +return (float) rowsReturned / totalRowCount; + } + + @Override + public boolean nextKeyValue() throws IOException, InterruptedException { +return nextBatch(); + } + + @Override + public void close() throws IOException { +if (columnarBatch != null) { + columnarBatch.close(); + columnarBatch = null; +} +if (recordReader != null) { + recordReader.close(); + recordReader = null; +} + } + + /** + * Initialize ORC file reader and batch record reader. + * Please note that `setRequiredSchema` is needed to be called after this. + */ + @Override + public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) + throws IOException, InterruptedException { +
[GitHub] spark issue #20165: [SPARK-22972] Couldn't find corresponding Hive SerDe for...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20165 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/85778/ Test PASSed.
[GitHub] spark issue #20165: [SPARK-22972] Couldn't find corresponding Hive SerDe for...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20165 Merged build finished. Test PASSed.
[GitHub] spark issue #20165: [SPARK-22972] Couldn't find corresponding Hive SerDe for...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20165

**[Test build #85778 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85778/testReport)** for PR 20165 at commit [`1cfa72f`](https://github.com/apache/spark/commit/1cfa72f12c417ad949abcf3344be2a461c38b246).
* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark issue #19792: [SPARK-22566][PYTHON] Better error message for `_merge_t...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19792 Merged build finished. Test PASSed.
[GitHub] spark issue #19792: [SPARK-22566][PYTHON] Better error message for `_merge_t...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19792 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/85784/ Test PASSed.
[GitHub] spark issue #19792: [SPARK-22566][PYTHON] Better error message for `_merge_t...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19792

**[Test build #85784 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85784/testReport)** for PR 19792 at commit [`6d171dd`](https://github.com/apache/spark/commit/6d171dda179ecdbe95dbc959c961397e08b8b537).
* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark issue #20184: [SPARK-22987][Core] UnsafeExternalSorter cases OOM when ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20184 Can one of the admins verify this patch?
[GitHub] spark issue #19792: [SPARK-22566][PYTHON] Better error message for `_merge_t...
Github user ueshin commented on the issue: https://github.com/apache/spark/pull/19792 LGTM, pending Jenkins.
[GitHub] spark pull request #20184: [SPARK-22987][Core] UnsafeExternalSorter cases OO...
GitHub user liutang123 opened a pull request: https://github.com/apache/spark/pull/20184

[SPARK-22987][Core] UnsafeExternalSorter cases OOM when invoking `getIterator` function.

## What changes were proposed in this pull request?

ChainedIterator in UnsafeExternalSorter maintains a Queue of UnsafeSorterIterator. When the `getIterator` function of UnsafeExternalSorter is called, UnsafeExternalSorter passes an ArrayList of UnsafeSorterSpillReader to the constructor of ChainedIterator. However, each UnsafeSorterSpillReader maintains a byte array as a buffer, whose capacity is more than 1 MB, so all of these buffers are held at once. When spilling happens frequently, this may cause an OOM. This patch changes the Queue in ChainedIterator to an Iterator.

## How was this patch tested?

Existing tests.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/liutang123/spark SPARK-22987

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/20184.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #20184

commit d57ce865729ce4d0d84a0fee0edf4dd6febe54bc
Author: liutang123
Date: 2018-01-08T04:33:51Z

[SPARK-22987][Core] UnsafeExternalSorter cases OOM when invoking `getIterator` function.
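The idea behind the proposed change can be illustrated with a minimal, self-contained sketch. This is not the code from the patch: `LazyChainedIterator`, `LazyChainDemo`, `spill`, and `opened` are hypothetical names, and a `Supplier` stands in for whatever creates a spill reader. The sketch assumes only that each reader allocates its buffer when it is created, so deferring creation until the previous reader is exhausted keeps at most one reader's buffer live at a time.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Supplier;

/** Chains iterators that are created lazily, one at a time (hypothetical sketch). */
final class LazyChainedIterator<T> implements Iterator<T> {
  private final Iterator<Supplier<Iterator<T>>> sources;
  private Iterator<T> current = null;

  LazyChainedIterator(Iterator<Supplier<Iterator<T>>> sources) {
    this.sources = sources;
  }

  @Override
  public boolean hasNext() {
    // Open the next source only once the current one is exhausted,
    // skipping over sources that turn out to be empty.
    while ((current == null || !current.hasNext()) && sources.hasNext()) {
      current = sources.next().get();
    }
    return current != null && current.hasNext();
  }

  @Override
  public T next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    return current.next();
  }
}

final class LazyChainDemo {
  /** Counts how many hypothetical spill readers have been opened so far. */
  static int opened = 0;

  /** Builds a supplier that "opens a reader" over the given records on demand. */
  static Supplier<Iterator<Integer>> spill(Integer... records) {
    return () -> {
      opened++;  // stands in for allocating a reader and its buffer
      return Arrays.asList(records).iterator();
    };
  }

  public static void main(String[] args) {
    List<Supplier<Iterator<Integer>>> spills = new ArrayList<>();
    spills.add(spill(1, 2));
    spills.add(spill());
    spills.add(spill(3));

    Iterator<Integer> chained = new LazyChainedIterator<>(spills.iterator());
    System.out.println("opened before iteration: " + opened);
    while (chained.hasNext()) {
      System.out.println("record " + chained.next() + ", readers opened: " + opened);
    }
  }
}
```

In contrast, an eager variant would call every supplier up front and hold all resulting readers in a queue, which is the pattern the pull request description identifies as the source of memory pressure.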
[GitHub] spark issue #19876: [ML][SPARK-11171][SPARK-11239] Add PMML export to Spark ...
Github user holdenk commented on the issue: https://github.com/apache/spark/pull/19876 I'm going to update this tomorrow, but if no one has anything by EOW would folks be OK with this as an experimental developer API for 2.3? cc @JoshRosen ?
[GitHub] spark issue #20178: [Spark-22952][CORE] Deprecate stageAttemptId in favour o...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20178 **[Test build #85785 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85785/testReport)** for PR 20178 at commit [`2ec9197`](https://github.com/apache/spark/commit/2ec919761ced379f00e1fa9804a66e3b15e9d2e9).