Repository: spark
Updated Branches:
  refs/heads/master 30b18e693 -> cabe1df86
[SPARK-12334][SQL][PYSPARK] Support read from multiple input paths for orc file in DataFrameReader.orc

Besides the issue in the Spark API, this also fixes two minor issues in PySpark:
- support read from multiple input paths for orc
- support read from multiple input paths for text

Author: Jeff Zhang <zjf...@apache.org>

Closes #10307 from zjffdu/SPARK-12334.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cabe1df8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cabe1df8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cabe1df8

Branch: refs/heads/master
Commit: cabe1df8606e7e5b9e6efb106045deb3f39f5f13
Parents: 30b18e6
Author: Jeff Zhang <zjf...@apache.org>
Authored: Thu Mar 9 11:44:34 2017 -0800
Committer: Holden Karau <hol...@us.ibm.com>
Committed: Thu Mar 9 11:44:34 2017 -0800

----------------------------------------------------------------------
 python/pyspark/sql/readwriter.py                      | 14 ++++++++------
 python/pyspark/sql/tests.py                           |  5 +++++
 .../scala/org/apache/spark/sql/DataFrameReader.scala  |  6 +++---
 .../org/apache/spark/sql/hive/orc/OrcQuerySuite.scala |  9 +++++++++
 4 files changed, 25 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/cabe1df8/python/pyspark/sql/readwriter.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 45fb9b7..4354345 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -161,7 +161,7 @@ class DataFrameReader(OptionUtils):
              mode=None, columnNameOfCorruptRecord=None, dateFormat=None, timestampFormat=None,
              timeZone=None, wholeFile=None):
         """
-        Loads a JSON file and returns the results as a :class:`DataFrame`.
+        Loads JSON files and returns the results as a :class:`DataFrame`.
 
         `JSON Lines <http://jsonlines.org/>`_(newline-delimited JSON) is supported by default.
         For JSON (one record per file), set the `wholeFile` parameter to ``true``.
@@ -169,7 +169,7 @@ class DataFrameReader(OptionUtils):
         If the ``schema`` parameter is not specified, this function goes
         through the input once to determine the input schema.
 
-        :param path: string represents path to the JSON dataset,
+        :param path: string represents path to the JSON dataset, or a list of paths,
                      or RDD of Strings storing JSON objects.
         :param schema: an optional :class:`pyspark.sql.types.StructType` for the input schema.
         :param primitivesAsString: infers all primitive values as a string type. If None is set,
@@ -252,7 +252,7 @@ class DataFrameReader(OptionUtils):
             jrdd = keyed._jrdd.map(self._spark._jvm.BytesToString())
             return self._df(self._jreader.json(jrdd))
         else:
-            raise TypeError("path can be only string or RDD")
+            raise TypeError("path can be only string, list or RDD")
 
     @since(1.4)
     def table(self, tableName):
@@ -269,7 +269,7 @@
 
     @since(1.4)
     def parquet(self, *paths):
-        """Loads a Parquet file, returning the result as a :class:`DataFrame`.
+        """Loads Parquet files, returning the result as a :class:`DataFrame`.
 
         You can set the following Parquet-specific option(s) for reading Parquet files:
             * ``mergeSchema``: sets whether we should merge schemas collected from all \
@@ -407,7 +407,7 @@
 
     @since(1.5)
     def orc(self, path):
-        """Loads an ORC file, returning the result as a :class:`DataFrame`.
+        """Loads ORC files, returning the result as a :class:`DataFrame`.
 
         .. note:: Currently ORC support is only available together with Hive support.
 
@@ -415,7 +415,9 @@
         >>> df.dtypes
         [('a', 'bigint'), ('b', 'int'), ('c', 'int')]
         """
-        return self._df(self._jreader.orc(path))
+        if isinstance(path, basestring):
+            path = [path]
+        return self._df(self._jreader.orc(_to_seq(self._spark._sc, path)))
 
     @since(1.4)
     def jdbc(self, url, table, column=None, lowerBound=None, upperBound=None, numPartitions=None,
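The heart of the PySpark change is the string-vs-list normalization that
orc() now performs before handing the paths to the JVM reader. Below is a
minimal, self-contained sketch of that idiom; the helper name and the
Python 3 fallback are illustrative and not part of the patch (the patch
itself uses the Python 2 `basestring` check shown in the diff above):

    # Accept a single path string or a list of path strings; always pass
    # a list downstream. The error message mirrors the json() change above.
    try:
        string_types = basestring  # Python 2, as in the patch
    except NameError:
        string_types = str  # Python 3 fallback (assumption, not in the patch)

    def normalize_paths(path):
        if isinstance(path, string_types):
            path = [path]
        if not isinstance(path, list):
            raise TypeError("path can be only string, list or RDD")
        return path

    assert normalize_paths("a.orc") == ["a.orc"]
    assert normalize_paths(["a.orc", "b.orc"]) == ["a.orc", "b.orc"]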
+ """Loads ORC files, returning the result as a :class:`DataFrame`. .. note:: Currently ORC support is only available together with Hive support. @@ -415,7 +415,9 @@ class DataFrameReader(OptionUtils): >>> df.dtypes [('a', 'bigint'), ('b', 'int'), ('c', 'int')] """ - return self._df(self._jreader.orc(path)) + if isinstance(path, basestring): + path = [path] + return self._df(self._jreader.orc(_to_seq(self._spark._sc, path))) @since(1.4) def jdbc(self, url, table, column=None, lowerBound=None, upperBound=None, numPartitions=None, http://git-wip-us.apache.org/repos/asf/spark/blob/cabe1df8/python/pyspark/sql/tests.py ---------------------------------------------------------------------- diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 1b873e9..f0a9a04 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -450,6 +450,11 @@ class SQLTests(ReusedPySparkTestCase): Row(_c0=u'Hyukjin', _c1=u'25', _c2=u'I am Hyukjin\n\nI love Spark!')] self.assertEqual(ages_newlines.collect(), expected) + def test_read_multiple_orc_file(self): + df = self.spark.read.orc(["python/test_support/sql/orc_partitioned/b=0/c=0", + "python/test_support/sql/orc_partitioned/b=1/c=1"]) + self.assertEqual(2, df.count()) + def test_udf_with_input_file_name(self): from pyspark.sql.functions import udf, input_file_name from pyspark.sql.types import StringType http://git-wip-us.apache.org/repos/asf/spark/blob/cabe1df8/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index a5e38e2..4f4cc93 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -262,7 +262,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { } /** - * Loads a JSON file and returns the results as a `DataFrame`. + * Loads JSON files and returns the results as a `DataFrame`. * * <a href="http://jsonlines.org/">JSON Lines</a> (newline-delimited JSON) is supported by * default. For JSON (one record per file), set the `wholeFile` option to true. @@ -438,7 +438,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { } /** - * Loads a CSV file and returns the result as a `DataFrame`. + * Loads CSV files and returns the result as a `DataFrame`. * * This function will go through the input once to determine the input schema if `inferSchema` * is enabled. To avoid going through the entire data once, disable `inferSchema` option or @@ -549,7 +549,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { } /** - * Loads an ORC file and returns the result as a `DataFrame`. + * Loads ORC files and returns the result as a `DataFrame`. 
http://git-wip-us.apache.org/repos/asf/spark/blob/cabe1df8/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
index 38a5477..5d8ba9d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
@@ -33,6 +33,7 @@ import org.apache.spark.sql.hive.test.TestHive._
 import org.apache.spark.sql.hive.test.TestHive.implicits._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{IntegerType, StructType}
+import org.apache.spark.util.Utils
 
 case class AllDataTypesWithNonPrimitiveType(
     stringField: String,
@@ -611,4 +612,12 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
       }
     }
   }
+
+  test("read from multiple orc input paths") {
+    val path1 = Utils.createTempDir()
+    val path2 = Utils.createTempDir()
+    makeOrcFile((1 to 10).map(Tuple1.apply), path1)
+    makeOrcFile((1 to 10).map(Tuple1.apply), path2)
+    assertResult(20)(read.orc(path1.getCanonicalPath, path2.getCanonicalPath).count())
+  }
 }
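A rough PySpark analogue of the new Scala test, as a sketch: it assumes a
SparkSession with Hive support already bound to `spark`, and substitutes
tempfile directories for Utils.createTempDir().

    import shutil
    import tempfile

    path1 = tempfile.mkdtemp()
    path2 = tempfile.mkdtemp()
    try:
        # Two ORC directories with 10 rows each, like makeOrcFile((1 to 10)...)
        spark.range(1, 11).write.mode("overwrite").orc(path1)
        spark.range(1, 11).write.mode("overwrite").orc(path2)
        # Reading both paths in one call should yield all 20 rows
        assert spark.read.orc([path1, path2]).count() == 20
    finally:
        shutil.rmtree(path1, ignore_errors=True)
        shutil.rmtree(path2, ignore_errors=True)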