This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new a745381 [SPARK-25382][SQL][PYSPARK] Remove ImageSchema.readImages in 3.0 a745381 is described below commit a745381b9d3dd290057ef3089de7fdb9264f1f8b Author: WeichenXu <weichen...@databricks.com> AuthorDate: Wed Jul 31 14:26:18 2019 +0900 [SPARK-25382][SQL][PYSPARK] Remove ImageSchema.readImages in 3.0 ## What changes were proposed in this pull request? I remove the deprecated `ImageSchema.readImages`. Move some useful methods from class `ImageSchema` into class `ImageFileFormat`. In pyspark, I rename `ImageSchema` class to be `ImageUtils`, and keep some useful python methods in it. ## How was this patch tested? UT. Please review https://spark.apache.org/contributing.html before opening a pull request. Closes #25245 from WeichenXu123/remove_image_schema. Authored-by: WeichenXu <weichen...@databricks.com> Signed-off-by: HyukjinKwon <gurwls...@apache.org> --- .../org/apache/spark/ml/image/ImageSchema.scala | 72 --------- .../apache/spark/ml/image/ImageSchemaSuite.scala | 171 --------------------- .../ml/source/image/ImageFileFormatSuite.scala | 18 +++ project/MimaExcludes.scala | 6 +- python/pyspark/ml/image.py | 38 ----- python/pyspark/ml/tests/test_image.py | 29 ++-- 6 files changed, 42 insertions(+), 292 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/image/ImageSchema.scala b/mllib/src/main/scala/org/apache/spark/ml/image/ImageSchema.scala index a7ddf2f..0313626 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/image/ImageSchema.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/image/ImageSchema.scala @@ -191,76 +191,4 @@ object ImageSchema { Some(Row(Row(origin, height, width, nChannels, mode, decoded))) } } - - /** - * Read the directory of images from the local or remote source - * - * @note If multiple jobs are run in parallel with different sampleRatio or recursive flag, - * there may be a race condition where one job overwrites the hadoop configs of another. 
- * @note If sample ratio is less than 1, sampling uses a PathFilter that is efficient but - * potentially non-deterministic. - * - * @param path Path to the image directory - * @return DataFrame with a single column "image" of images; - * see ImageSchema for the details - */ - @deprecated("use `spark.read.format(\"image\").load(path)` and this `readImages` will be " + - "removed in 3.0.0.", "2.4.0") - def readImages(path: String): DataFrame = readImages(path, null, false, -1, false, 1.0, 0) - - /** - * Read the directory of images from the local or remote source - * - * @note If multiple jobs are run in parallel with different sampleRatio or recursive flag, - * there may be a race condition where one job overwrites the hadoop configs of another. - * @note If sample ratio is less than 1, sampling uses a PathFilter that is efficient but - * potentially non-deterministic. - * - * @param path Path to the image directory - * @param sparkSession Spark Session, if omitted gets or creates the session - * @param recursive Recursive path search flag - * @param numPartitions Number of the DataFrame partitions, - * if omitted uses defaultParallelism instead - * @param dropImageFailures Drop the files that are not valid images from the result - * @param sampleRatio Fraction of the files loaded - * @return DataFrame with a single column "image" of images; - * see ImageSchema for the details - */ - @deprecated("use `spark.read.format(\"image\").load(path)` and this `readImages` will be " + - "removed in 3.0.0.", "2.4.0") - def readImages( - path: String, - sparkSession: SparkSession, - recursive: Boolean, - numPartitions: Int, - dropImageFailures: Boolean, - sampleRatio: Double, - seed: Long): DataFrame = { - require(sampleRatio <= 1.0 && sampleRatio >= 0, "sampleRatio should be between 0 and 1") - - val session = if (sparkSession != null) sparkSession else SparkSession.builder().getOrCreate - val partitions = - if (numPartitions > 0) { - numPartitions - } else { - 
session.sparkContext.defaultParallelism - } - - RecursiveFlag.withRecursiveFlag(recursive, session) { - SamplePathFilter.withPathFilter(sampleRatio, session, seed) { - val binResult = session.sparkContext.binaryFiles(path, partitions) - val streams = if (numPartitions == -1) binResult else binResult.repartition(partitions) - val convert = (origin: String, bytes: PortableDataStream) => - decode(origin, bytes.toArray()) - val images = if (dropImageFailures) { - streams.flatMap { case (origin, bytes) => convert(origin, bytes) } - } else { - streams.map { case (origin, bytes) => - convert(origin, bytes).getOrElse(invalidImageRow(origin)) - } - } - session.createDataFrame(images, imageSchema) - } - } - } } diff --git a/mllib/src/test/scala/org/apache/spark/ml/image/ImageSchemaSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/image/ImageSchemaSuite.scala deleted file mode 100644 index e16ec906..0000000 --- a/mllib/src/test/scala/org/apache/spark/ml/image/ImageSchemaSuite.scala +++ /dev/null @@ -1,171 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.ml.image - -import java.nio.file.Paths -import java.util.Arrays - -import org.apache.spark.SparkFunSuite -import org.apache.spark.ml.image.ImageSchema._ -import org.apache.spark.mllib.util.MLlibTestSparkContext -import org.apache.spark.sql.Row -import org.apache.spark.sql.types._ - -class ImageSchemaSuite extends SparkFunSuite with MLlibTestSparkContext { - // Single column of images named "image" - private lazy val imagePath = "../data/mllib/images/origin" - - test("Smoke test: create basic ImageSchema dataframe") { - val origin = "path" - val width = 1 - val height = 1 - val nChannels = 3 - val data = Array[Byte](0, 0, 0) - val mode = ocvTypes("CV_8UC3") - - // Internal Row corresponds to image StructType - val rows = Seq(Row(Row(origin, height, width, nChannels, mode, data)), - Row(Row(null, height, width, nChannels, mode, data))) - val rdd = sc.makeRDD(rows) - val df = spark.createDataFrame(rdd, ImageSchema.imageSchema) - - assert(df.count === 2, "incorrect image count") - assert(df.schema("image").dataType == columnSchema, "data do not fit ImageSchema") - } - - test("readImages count test") { - var df = readImages(imagePath) - assert(df.count === 1) - - df = readImages(imagePath, null, true, -1, false, 1.0, 0) - assert(df.count === 10) - - df = readImages(imagePath, null, true, -1, true, 1.0, 0) - val countTotal = df.count - assert(countTotal === 8) - - df = readImages(imagePath, null, true, -1, true, 0.5, 0) - // Random number about half of the size of the original dataset - val count50 = df.count - assert(count50 > 0 && count50 < countTotal) - } - - test("readImages test: recursive = false") { - val df = readImages(imagePath, null, false, 3, true, 1.0, 0) - assert(df.count() === 0) - } - - test("readImages test: read jpg image") { - val df = readImages(imagePath + "/kittens/DP153539.jpg", null, false, 3, true, 1.0, 0) - assert(df.count() === 1) - } - - test("readImages test: read png image") { - val df = readImages(imagePath 
+ "/multi-channel/BGRA.png", null, false, 3, true, 1.0, 0) - assert(df.count() === 1) - } - - test("readImages test: read non image") { - val df = readImages(imagePath + "/kittens/not-image.txt", null, false, 3, true, 1.0, 0) - assert(df.schema("image").dataType == columnSchema, "data do not fit ImageSchema") - assert(df.count() === 0) - } - - test("readImages test: read non image and dropImageFailures is false") { - val df = readImages(imagePath + "/kittens/not-image.txt", null, false, 3, false, 1.0, 0) - assert(df.count() === 1) - } - - test("readImages test: sampleRatio > 1") { - val e = intercept[IllegalArgumentException] { - readImages(imagePath, null, true, 3, true, 1.1, 0) - } - assert(e.getMessage.contains("sampleRatio")) - } - - test("readImages test: sampleRatio < 0") { - val e = intercept[IllegalArgumentException] { - readImages(imagePath, null, true, 3, true, -0.1, 0) - } - assert(e.getMessage.contains("sampleRatio")) - } - - test("readImages test: sampleRatio = 0") { - val df = readImages(imagePath, null, true, 3, true, 0.0, 0) - assert(df.count() === 0) - } - - test("readImages test: with sparkSession") { - val df = readImages(imagePath, sparkSession = spark, true, 3, true, 1.0, 0) - assert(df.count() === 8) - } - - test("readImages partition test") { - val df = readImages(imagePath, null, true, 3, true, 1.0, 0) - assert(df.rdd.getNumPartitions === 3) - } - - test("readImages partition test: < 0") { - val df = readImages(imagePath, null, true, -3, true, 1.0, 0) - assert(df.rdd.getNumPartitions === spark.sparkContext.defaultParallelism) - } - - test("readImages partition test: = 0") { - val df = readImages(imagePath, null, true, 0, true, 1.0, 0) - assert(df.rdd.getNumPartitions === spark.sparkContext.defaultParallelism) - } - - // Images with the different number of channels - test("readImages pixel values test") { - - val images = readImages(imagePath + "/multi-channel/").collect - - images.foreach { rrow => - val row = rrow.getAs[Row](0) - val 
filename = Paths.get(getOrigin(row)).getFileName().toString() - if (firstBytes20.contains(filename)) { - val mode = getMode(row) - val bytes20 = getData(row).slice(0, 20) - - val (expectedMode, expectedBytes) = firstBytes20(filename) - assert(ocvTypes(expectedMode) === mode, "mode of the image is not read correctly") - assert(Arrays.equals(expectedBytes, bytes20), "incorrect numeric value for flattened image") - } - } - } - - // number of channels and first 20 bytes of OpenCV representation - // - default representation for 3-channel RGB images is BGR row-wise: - // (B00, G00, R00, B10, G10, R10, ...) - // - default representation for 4-channel RGB images is BGRA row-wise: - // (B00, G00, R00, A00, B10, G10, R10, A10, ...) - private val firstBytes20 = Map( - "grayscale.jpg" -> - (("CV_8UC1", Array[Byte](-2, -33, -61, -60, -59, -59, -64, -59, -66, -67, -73, -73, -62, - -57, -60, -63, -53, -49, -55, -69))), - "chr30.4.184.jpg" -> (("CV_8UC3", - Array[Byte](-9, -3, -1, -43, -32, -28, -75, -60, -57, -78, -59, -56, -74, -59, -57, - -71, -58, -56, -73, -64))), - "BGRA.png" -> (("CV_8UC4", - Array[Byte](-128, -128, -8, -1, -128, -128, -8, -1, -128, - -128, -8, -1, 127, 127, -9, -1, 127, 127, -9, -1))), - "BGRA_alpha_60.png" -> (("CV_8UC4", - Array[Byte](-128, -128, -8, 60, -128, -128, -8, 60, -128, - -128, -8, 60, 127, 127, -9, 60, 127, 127, -9, 60))) - ) -} diff --git a/mllib/src/test/scala/org/apache/spark/ml/source/image/ImageFileFormatSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/source/image/ImageFileFormatSuite.scala index 38bb246..0ec2747 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/source/image/ImageFileFormatSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/source/image/ImageFileFormatSuite.scala @@ -32,6 +32,24 @@ class ImageFileFormatSuite extends SparkFunSuite with MLlibTestSparkContext { private lazy val imagePath = "../data/mllib/images/partitioned" private lazy val recursiveImagePath = "../data/mllib/images" + test("Smoke test: 
create basic ImageSchema dataframe") { + val origin = "path" + val width = 1 + val height = 1 + val nChannels = 3 + val data = Array[Byte](0, 0, 0) + val mode = ocvTypes("CV_8UC3") + + // Internal Row corresponds to image StructType + val rows = Seq(Row(Row(origin, height, width, nChannels, mode, data)), + Row(Row(null, height, width, nChannels, mode, data))) + val rdd = sc.makeRDD(rows) + val df = spark.createDataFrame(rdd, imageSchema) + + assert(df.count === 2, "incorrect image count") + assert(df.schema("image").dataType == columnSchema, "data do not fit ImageSchema") + } + test("image datasource count test") { val df1 = spark.read.format("image").load(imagePath) assert(df1.count === 9) diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 51d5861..4afca5a 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -380,7 +380,11 @@ object MimaExcludes { // [SPARK-28556][SQL] QueryExecutionListener should also notify Error ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.util.QueryExecutionListener.onFailure"), - ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.util.QueryExecutionListener.onFailure") + ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.util.QueryExecutionListener.onFailure"), + + // [SPARK-25382][SQL][PYSPARK] Remove ImageSchema.readImages in 3.0 + ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ml.image.ImageSchema.readImages"), + ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ml.image.ImageSchema.readImages") ) // Exclude rules for 2.4.x diff --git a/python/pyspark/ml/image.py b/python/pyspark/ml/image.py index a1aacea..4fb1036 100644 --- a/python/pyspark/ml/image.py +++ b/python/pyspark/ml/image.py @@ -203,44 +203,6 @@ class _ImageSchema(object): return _create_row(self.imageFields, [origin, height, width, nChannels, mode, data]) - def readImages(self, path, recursive=False, 
numPartitions=-1, - dropImageFailures=False, sampleRatio=1.0, seed=0): - """ - Reads the directory of images from the local or remote source. - - .. note:: If multiple jobs are run in parallel with different sampleRatio or recursive flag, - there may be a race condition where one job overwrites the hadoop configs of another. - - .. note:: If sample ratio is less than 1, sampling uses a PathFilter that is efficient but - potentially non-deterministic. - - .. note:: Deprecated in 2.4.0. Use `spark.read.format("image").load(path)` instead and - this `readImages` will be removed in 3.0.0. - - :param str path: Path to the image directory. - :param bool recursive: Recursive search flag. - :param int numPartitions: Number of DataFrame partitions. - :param bool dropImageFailures: Drop the files that are not valid images. - :param float sampleRatio: Fraction of the images loaded. - :param int seed: Random number seed. - :return: a :class:`DataFrame` with a single column of "images", - see ImageSchema for details. - - >>> df = ImageSchema.readImages('data/mllib/images/origin/kittens', recursive=True) - >>> df.count() - 5 - - .. versionadded:: 2.3.0 - """ - warnings.warn("`ImageSchema.readImage` is deprecated. 
" + - "Use `spark.read.format(\"image\").load(path)` instead.", DeprecationWarning) - spark = SparkSession.builder.getOrCreate() - image_schema = spark._jvm.org.apache.spark.ml.image.ImageSchema - jsession = spark._jsparkSession - jresult = image_schema.readImages(path, jsession, recursive, numPartitions, - dropImageFailures, float(sampleRatio), seed) - return DataFrame(jresult, spark._wrapped) - ImageSchema = _ImageSchema() diff --git a/python/pyspark/ml/tests/test_image.py b/python/pyspark/ml/tests/test_image.py index 95efa73..0008b0b 100644 --- a/python/pyspark/ml/tests/test_image.py +++ b/python/pyspark/ml/tests/test_image.py @@ -24,18 +24,24 @@ from pyspark.sql import HiveContext, Row from pyspark.testing.utils import QuietTest -class ImageReaderTest(SparkSessionTestCase): +class ImageFileFormatTest(SparkSessionTestCase): def test_read_images(self): data_path = 'data/mllib/images/origin/kittens' - df = ImageSchema.readImages(data_path, recursive=True, dropImageFailures=True) + df = self.spark.read.format("image") \ + .option("dropInvalid", True) \ + .option("recursiveFileLookup", True) \ + .load(data_path) self.assertEqual(df.count(), 4) first_row = df.take(1)[0][0] + # compare `schema.simpleString()` instead of directly compare schema, + # because the df loaded from datasouce may change schema column nullability. 
+ self.assertEqual(df.schema.simpleString(), ImageSchema.imageSchema.simpleString()) + self.assertEqual(df.schema["image"].dataType.simpleString(), + ImageSchema.columnSchema.simpleString()) array = ImageSchema.toNDArray(first_row) self.assertEqual(len(array), first_row[1]) self.assertEqual(ImageSchema.toImage(array, origin=first_row[0]), first_row) - self.assertEqual(df.schema, ImageSchema.imageSchema) - self.assertEqual(df.schema["image"].dataType, ImageSchema.columnSchema) expected = {'CV_8UC3': 16, 'Undefined': -1, 'CV_8U': 0, 'CV_8UC1': 0, 'CV_8UC4': 24} self.assertEqual(ImageSchema.ocvTypes, expected) expected = ['origin', 'height', 'width', 'nChannels', 'mode', 'data'] @@ -61,11 +67,11 @@ class ImageReaderTest(SparkSessionTestCase): lambda: ImageSchema.toImage("a")) -class ImageReaderTest2(PySparkTestCase): +class ImageFileFormatOnHiveContextTest(PySparkTestCase): @classmethod def setUpClass(cls): - super(ImageReaderTest2, cls).setUpClass() + super(ImageFileFormatOnHiveContextTest, cls).setUpClass() cls.hive_available = True # Note that here we enable Hive's support. cls.spark = None @@ -86,17 +92,20 @@ class ImageReaderTest2(PySparkTestCase): @classmethod def tearDownClass(cls): - super(ImageReaderTest2, cls).tearDownClass() + super(ImageFileFormatOnHiveContextTest, cls).tearDownClass() if cls.spark is not None: cls.spark.sparkSession.stop() cls.spark = None def test_read_images_multiple_times(self): - # This test case is to check if `ImageSchema.readImages` tries to + # This test case is to check if ImageFileFormat tries to # initiate Hive client multiple times. See SPARK-22651. 
data_path = 'data/mllib/images/origin/kittens' - ImageSchema.readImages(data_path, recursive=True, dropImageFailures=True) - ImageSchema.readImages(data_path, recursive=True, dropImageFailures=True) + for i in range(2): + self.spark.read.format("image") \ + .option("dropInvalid", True) \ + .option("recursiveFileLookup", True) \ + .load(data_path) if __name__ == "__main__": --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org