[GitHub] spark issue #18538: [SPARK-14516][ML] Adding ClusteringEvaluator with the im...
Github user yanboliang commented on the issue: https://github.com/apache/spark/pull/18538 @mgaido91 These are my last comments, it should be ready to merge once they are addressed. Thanks for your contribution. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18538: [SPARK-14516][ML] Adding ClusteringEvaluator with...
Github user yanboliang commented on a diff in the pull request: https://github.com/apache/spark/pull/18538#discussion_r138255937 --- Diff: mllib/src/main/scala/org/apache/spark/ml/evaluation/ClusteringEvaluator.scala --- @@ -0,0 +1,438 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.evaluation + +import org.apache.spark.SparkContext +import org.apache.spark.annotation.{Experimental, Since} +import org.apache.spark.broadcast.Broadcast +import org.apache.spark.ml.linalg.{BLAS, DenseVector, Vector, Vectors, VectorUDT} +import org.apache.spark.ml.param.{Param, ParamMap, ParamValidators} +import org.apache.spark.ml.param.shared.{HasFeaturesCol, HasPredictionCol} +import org.apache.spark.ml.util.{DefaultParamsReadable, DefaultParamsWritable, Identifiable, SchemaUtils} +import org.apache.spark.sql.{DataFrame, Dataset} +import org.apache.spark.sql.functions.{avg, col, udf} +import org.apache.spark.sql.types.IntegerType + +/** + * :: Experimental :: + * Evaluator for clustering results. + * The metric computes the Silhouette measure + * using the squared Euclidean distance. + * + * The Silhouette is a measure for the validation + * of the consistency within clusters. It ranges + * between 1 and -1, where a value close to 1 + * means that the points in a cluster are close + * to the other points in the same cluster and + * far from the points of the other clusters. + */ +@Experimental +@Since("2.3.0") +class ClusteringEvaluator @Since("2.3.0") (@Since("2.3.0") override val uid: String) + extends Evaluator with HasPredictionCol with HasFeaturesCol with DefaultParamsWritable { + + @Since("2.3.0") + def this() = this(Identifiable.randomUID("cluEval")) + + @Since("2.3.0") + override def copy(pMap: ParamMap): ClusteringEvaluator = this.defaultCopy(pMap) + + @Since("2.3.0") + override def isLargerBetter: Boolean = true + + /** @group setParam */ + @Since("2.3.0") + def setPredictionCol(value: String): this.type = set(predictionCol, value) + + /** @group setParam */ + @Since("2.3.0") + def setFeaturesCol(value: String): this.type = set(featuresCol, value) + + /** + * param for metric name in evaluation + * (supports `"silhouette"` (default)) + * @group param + */ + @Since("2.3.0") + val metricName: Param[String] = { +val allowedParams = ParamValidators.inArray(Array("silhouette")) +new Param( + this, + "metricName", + "metric name in evaluation (silhouette)", + allowedParams +) + } + + /** @group getParam */ + @Since("2.3.0") + def getMetricName: String = $(metricName) + + /** @group setParam */ + @Since("2.3.0") + def setMetricName(value: String): this.type = set(metricName, value) + + setDefault(metricName -> "silhouette") + + @Since("2.3.0") + override def evaluate(dataset: Dataset[_]): Double = { +SchemaUtils.checkColumnType(dataset.schema, $(featuresCol), new VectorUDT) +SchemaUtils.checkColumnType(dataset.schema, $(predictionCol), IntegerType) --- End diff -- We should support all numeric type for prediction column, not only integer. ``` SchemaUtils.checkNumericType(schema, $(labelCol)) ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18538: [SPARK-14516][ML] Adding ClusteringEvaluator with...
Github user yanboliang commented on a diff in the pull request: https://github.com/apache/spark/pull/18538#discussion_r138256035 --- Diff: mllib/src/main/scala/org/apache/spark/ml/evaluation/ClusteringEvaluator.scala --- @@ -0,0 +1,438 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.evaluation + +import org.apache.spark.SparkContext +import org.apache.spark.annotation.{Experimental, Since} +import org.apache.spark.broadcast.Broadcast +import org.apache.spark.ml.linalg.{BLAS, DenseVector, Vector, Vectors, VectorUDT} +import org.apache.spark.ml.param.{Param, ParamMap, ParamValidators} +import org.apache.spark.ml.param.shared.{HasFeaturesCol, HasPredictionCol} +import org.apache.spark.ml.util.{DefaultParamsReadable, DefaultParamsWritable, Identifiable, SchemaUtils} +import org.apache.spark.sql.{DataFrame, Dataset} +import org.apache.spark.sql.functions.{avg, col, udf} +import org.apache.spark.sql.types.IntegerType + +/** + * :: Experimental :: + * Evaluator for clustering results. + * The metric computes the Silhouette measure + * using the squared Euclidean distance. + * + * The Silhouette is a measure for the validation + * of the consistency within clusters. It ranges + * between 1 and -1, where a value close to 1 + * means that the points in a cluster are close + * to the other points in the same cluster and + * far from the points of the other clusters. + */ +@Experimental +@Since("2.3.0") +class ClusteringEvaluator @Since("2.3.0") (@Since("2.3.0") override val uid: String) + extends Evaluator with HasPredictionCol with HasFeaturesCol with DefaultParamsWritable { + + @Since("2.3.0") + def this() = this(Identifiable.randomUID("cluEval")) + + @Since("2.3.0") + override def copy(pMap: ParamMap): ClusteringEvaluator = this.defaultCopy(pMap) + + @Since("2.3.0") + override def isLargerBetter: Boolean = true + + /** @group setParam */ + @Since("2.3.0") + def setPredictionCol(value: String): this.type = set(predictionCol, value) + + /** @group setParam */ + @Since("2.3.0") + def setFeaturesCol(value: String): this.type = set(featuresCol, value) + + /** + * param for metric name in evaluation + * (supports `"silhouette"` (default)) + * @group param + */ + @Since("2.3.0") + val metricName: Param[String] = { +val allowedParams = ParamValidators.inArray(Array("silhouette")) +new Param( + this, + "metricName", + "metric name in evaluation (silhouette)", + allowedParams +) --- End diff -- Reorg as: ``` val metricName: Param[String] = { val allowedParams = ParamValidators.inArray(Array("squaredSilhouette")) new Param( this, "metricName", "metric name in evaluation (squaredSilhouette)", allowedParams) } ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18538: [SPARK-14516][ML] Adding ClusteringEvaluator with...
Github user yanboliang commented on a diff in the pull request: https://github.com/apache/spark/pull/18538#discussion_r138255648 --- Diff: mllib/src/main/scala/org/apache/spark/ml/evaluation/ClusteringEvaluator.scala --- @@ -0,0 +1,438 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.evaluation + +import org.apache.spark.SparkContext +import org.apache.spark.annotation.{Experimental, Since} +import org.apache.spark.broadcast.Broadcast +import org.apache.spark.ml.linalg.{BLAS, DenseVector, Vector, Vectors, VectorUDT} +import org.apache.spark.ml.param.{Param, ParamMap, ParamValidators} +import org.apache.spark.ml.param.shared.{HasFeaturesCol, HasPredictionCol} +import org.apache.spark.ml.util.{DefaultParamsReadable, DefaultParamsWritable, Identifiable, SchemaUtils} +import org.apache.spark.sql.{DataFrame, Dataset} +import org.apache.spark.sql.functions.{avg, col, udf} +import org.apache.spark.sql.types.IntegerType + +/** + * :: Experimental :: --- End diff -- Usually we leave a blank line under ```:: Experimental ::```. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18538: [SPARK-14516][ML] Adding ClusteringEvaluator with...
Github user yanboliang commented on a diff in the pull request: https://github.com/apache/spark/pull/18538#discussion_r138255474 --- Diff: mllib/src/main/scala/org/apache/spark/ml/evaluation/ClusteringEvaluator.scala --- @@ -0,0 +1,438 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.evaluation + +import org.apache.spark.SparkContext +import org.apache.spark.annotation.{Experimental, Since} +import org.apache.spark.broadcast.Broadcast +import org.apache.spark.ml.linalg.{BLAS, DenseVector, Vector, Vectors, VectorUDT} +import org.apache.spark.ml.param.{Param, ParamMap, ParamValidators} +import org.apache.spark.ml.param.shared.{HasFeaturesCol, HasPredictionCol} +import org.apache.spark.ml.util.{DefaultParamsReadable, DefaultParamsWritable, Identifiable, SchemaUtils} +import org.apache.spark.sql.{DataFrame, Dataset} +import org.apache.spark.sql.functions.{avg, col, udf} +import org.apache.spark.sql.types.IntegerType + +/** + * :: Experimental :: + * Evaluator for clustering results. + * The metric computes the Silhouette measure + * using the squared Euclidean distance. + * + * The Silhouette is a measure for the validation + * of the consistency within clusters. It ranges + * between 1 and -1, where a value close to 1 + * means that the points in a cluster are close + * to the other points in the same cluster and + * far from the points of the other clusters. + */ +@Experimental +@Since("2.3.0") +class ClusteringEvaluator @Since("2.3.0") (@Since("2.3.0") override val uid: String) + extends Evaluator with HasPredictionCol with HasFeaturesCol with DefaultParamsWritable { + + @Since("2.3.0") + def this() = this(Identifiable.randomUID("cluEval")) + + @Since("2.3.0") + override def copy(pMap: ParamMap): ClusteringEvaluator = this.defaultCopy(pMap) + + @Since("2.3.0") + override def isLargerBetter: Boolean = true + + /** @group setParam */ + @Since("2.3.0") + def setPredictionCol(value: String): this.type = set(predictionCol, value) + + /** @group setParam */ + @Since("2.3.0") + def setFeaturesCol(value: String): this.type = set(featuresCol, value) + + /** + * param for metric name in evaluation + * (supports `"silhouette"` (default)) + * @group param + */ + @Since("2.3.0") + val metricName: Param[String] = { +val allowedParams = ParamValidators.inArray(Array("silhouette")) +new Param( + this, + "metricName", + "metric name in evaluation (silhouette)", + allowedParams +) + } + + /** @group getParam */ + @Since("2.3.0") + def getMetricName: String = $(metricName) + + /** @group setParam */ + @Since("2.3.0") + def setMetricName(value: String): this.type = set(metricName, value) + + setDefault(metricName -> "silhouette") + + @Since("2.3.0") + override def evaluate(dataset: Dataset[_]): Double = { +SchemaUtils.checkColumnType(dataset.schema, $(featuresCol), new VectorUDT) +SchemaUtils.checkColumnType(dataset.schema, $(predictionCol), IntegerType) + +$(metricName) match { + case "silhouette" => SquaredEuclideanSilhouette.computeSilhouetteScore( +dataset, +$(predictionCol), +$(featuresCol) + ) --- End diff -- Reorg as: ``` $(metricName) match { case "squaredSilhouette" => SquaredEuclideanSilhouette.computeSilhouetteScore( dataset, $(predictionCol), $(featuresCol)) } ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19110: [SPARK-21027][ML][PYTHON] Added tunable parallelism to o...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19110 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19110: [SPARK-21027][ML][PYTHON] Added tunable parallelism to o...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19110 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/81653/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19110: [SPARK-21027][ML][PYTHON] Added tunable parallelism to o...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19110 **[Test build #81653 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81653/testReport)** for PR 19110 at commit [`c24d4e2`](https://github.com/apache/spark/commit/c24d4e261550fea22e4ee95909f4b078186bc83c). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19196: [SPARK-21977] SinglePartition optimizations break certai...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19196 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/81656/ Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19196: [SPARK-21977] SinglePartition optimizations break certai...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19196 Merged build finished. Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19196: [SPARK-21977] SinglePartition optimizations break certai...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19196 **[Test build #81656 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81656/testReport)** for PR 19196 at commit [`2f94951`](https://github.com/apache/spark/commit/2f949517ea1d667aee8ca6838a374e222492c0c7). * This patch **fails to generate documentation**. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19196: [SPARK-21977] SinglePartition optimizations break certai...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19196 **[Test build #81656 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81656/testReport)** for PR 19196 at commit [`2f94951`](https://github.com/apache/spark/commit/2f949517ea1d667aee8ca6838a374e222492c0c7). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19197: [SPARK-18608][ML] Fix double caching
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19197 **[Test build #81654 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81654/testReport)** for PR 19197 at commit [`b485614`](https://github.com/apache/spark/commit/b4856147e04c3d57f2bfc70c70e3f136f46fa873). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #16422: [SPARK-17642] [SQL] support DESC EXTENDED/FORMATTED tabl...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/16422 **[Test build #81655 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81655/testReport)** for PR 16422 at commit [`0d49ee9`](https://github.com/apache/spark/commit/0d49ee91508c908daef672a04768c15a9e5c5dba). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19197: [SPARK-18608][ML] Fix double caching
GitHub user zhengruifeng opened a pull request: https://github.com/apache/spark/pull/19197 [SPARK-18608][ML] Fix double caching ## What changes were proposed in this pull request? `df.rdd.getStorageLevel` => `df.storageLevel` using cmd `find . -name '*.scala' | xargs -i bash -c 'egrep -in "\.rdd\.getStorageLevel" {} && echo {}'` to make sure all algs involved in this issue are fixed. Previous discussion in other PRs: https://github.com/apache/spark/pull/19107, https://github.com/apache/spark/pull/17014 ## How was this patch tested? existing tests You can merge this pull request into a Git repository by running: $ git pull https://github.com/zhengruifeng/spark double_caching Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/19197.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #19197 commit b4856147e04c3d57f2bfc70c70e3f136f46fa873 Author: Zheng RuiFengDate: 2017-09-12T04:53:35Z recreate pr --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #16422: [SPARK-17642] [SQL] support DESC EXTENDED/FORMATTED tabl...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/16422 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19196: [SPARK-21977] SinglePartition optimizations break certai...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19196 Merged build finished. Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19196: [SPARK-21977] SinglePartition optimizations break certai...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19196 **[Test build #81652 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81652/testReport)** for PR 19196 at commit [`090044c`](https://github.com/apache/spark/commit/090044ca089870befff464d37f098c4d4fd19657). * This patch **fails to generate documentation**. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19196: [SPARK-21977] SinglePartition optimizations break certai...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19196 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/81652/ Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19122: [SPARK-21911][ML][PySpark] Parallel Model Evaluat...
Github user WeichenXu123 commented on a diff in the pull request: https://github.com/apache/spark/pull/19122#discussion_r138249937 --- Diff: python/pyspark/ml/param/_shared_params_code_gen.py --- @@ -152,6 +152,8 @@ def get$Name(self): ("varianceCol", "column name for the biased sample variance of prediction.", None, "TypeConverters.toString"), ("aggregationDepth", "suggested depth for treeAggregate (>= 2).", "2", + "TypeConverters.toInt"), +("parallelism", "number of threads to use when fitting models in parallel (>= 1).", "1", --- End diff -- No worry, #19110 will merge first and then I will merge it to this PR. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19196: [SPARK-21977] SinglePartition optimizations break certai...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19196 **[Test build #81652 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81652/testReport)** for PR 19196 at commit [`090044c`](https://github.com/apache/spark/commit/090044ca089870befff464d37f098c4d4fd19657). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19110: [SPARK-21027][ML][PYTHON] Added tunable parallelism to o...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19110 **[Test build #81653 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81653/testReport)** for PR 19110 at commit [`c24d4e2`](https://github.com/apache/spark/commit/c24d4e261550fea22e4ee95909f4b078186bc83c). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19196: [SPARK-21977] SinglePartition optimizations break...
GitHub user brkyvz opened a pull request: https://github.com/apache/spark/pull/19196 [SPARK-21977] SinglePartition optimizations break certain Streaming Stateful Aggregation requirements ## What changes were proposed in this pull request? This is a bit hard to explain as there are several issues here, I'll try my best. Here are the requirements: 1. A StructuredStreaming Source that can generate empty RDDs with 0 partitions 2. A StructuredStreaming query that uses the above source, performs a stateful aggregation (mapGroupsWithState, groupBy.count, ...), and coalesce's by 1 The crux of the problem is that when a dataset has a `coalesce(1)` call, it receives a `SinglePartition` partitioning scheme. This scheme satisfies most required distributions used for aggregations such as HashAggregateExec. This causes a world of problems: Symptom 1. If the input RDD has 0 partitions, the whole lineage will receive 0 partitions, nothing will be executed, the state store will not create any delta files. When this happens, the next trigger fails, because the StateStore fails to load the delta file for the previous trigger Symptom 2. Let's say that there was data. Then in this case, if you stop your stream, and change `coalesce(1)` with `coalesce(2)`, then restart your stream, your stream will fail, because `spark.sql.shuffle.partitions - 1` number of StateStores will fail to find its delta files. To fix the issues above, we must check that the partitioning of the child of a `StatefulOperator` satisfies: If the grouping expressions are empty: a) AllTuple distribution b) Single physical partition If the grouping expressions are non empty: a) Clustered distribution b) spark.sql.shuffle.partition # of partitions whether or not coalesce(1) exists in the plan, and whether or not the input RDD for the trigger has any data. Once you fix the above problem by adding an Exchange to the plan, you come across the following bug: If you call `coalesce(1).groupBy().count()` on a Streaming DataFrame, and if you have a trigger with no data, `StateStoreRestoreExec` doesn't return the prior state. However, for this specific aggregation, `HashAggregateExec` after the restore returns a (0, 0) row, since we're performing a count, and there is no data. Then this data gets stored in `StateStoreSaveExec` causing the previous counts to be overwritten and lost. ## How was this patch tested? Regression tests You can merge this pull request into a Git repository by running: $ git pull https://github.com/brkyvz/spark sa-0 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/19196.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #19196 commit b7aeed6af2aaf6eb347dd0a492a62e6530900eb5 Author: Burak YavuzDate: 2017-09-08T18:36:02Z couldn't repro commit 4a7d1240196cc4660d33aef33d893526da5f0ceb Author: Burak Yavuz Date: 2017-09-11T17:44:15Z save commit 00fa5923c7663f58df72937626bfadac5dc2f1fd Author: Burak Yavuz Date: 2017-09-12T04:32:30Z ready for review commit 090044ca089870befff464d37f098c4d4fd19657 Author: Burak Yavuz Date: 2017-09-12T04:33:05Z uncomment --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19195: [DOCS] Fix unreachable links in the document
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/19195#discussion_r138249514 --- Diff: docs/building-spark.md --- @@ -111,7 +111,7 @@ should run continuous compilation (i.e. wait for changes). However, this has not extensively. A couple of gotchas to note: * it only scans the paths `src/main` and `src/test` (see -[docs](http://scala-tools.org/mvnsites/maven-scala-plugin/usage_cc.html)), so it will only work +[docs](http://davidb.github.io/scala-maven-plugin/example_compile.html)), so it will only work --- End diff -- @sarutak, what do you think about using http://davidb.github.io/scala-maven-plugin/example_cc.html instead? (I [checked it](https://github.com/davidB/scala-maven-plugin/blob/2.9.1/src/site/xdoc/usage_cc.xml) although it looks not a big deal though as they have similar information). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19107: [SPARK-21799][ML] Fix `KMeans` performance regression ca...
Github user WeichenXu123 commented on the issue: https://github.com/apache/spark/pull/19107 OK. Thanks @zhengruifeng .I will close this PR. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19107: [SPARK-21799][ML] Fix `KMeans` performance regres...
Github user WeichenXu123 closed the pull request at: https://github.com/apache/spark/pull/19107 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18317: [SPARK-21113][CORE] Read ahead input stream to amortize ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/18317 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18317: [SPARK-21113][CORE] Read ahead input stream to amortize ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/18317 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/81649/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18317: [SPARK-21113][CORE] Read ahead input stream to amortize ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/18317 **[Test build #81649 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81649/testReport)** for PR 18317 at commit [`36ae448`](https://github.com/apache/spark/commit/36ae4482d3193509c6dc06a79b8ebfe5a2b88752). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18317: [SPARK-21113][CORE] Read ahead input stream to amortize ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/18317 Merged build finished. Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18317: [SPARK-21113][CORE] Read ahead input stream to amortize ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/18317 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/81648/ Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18317: [SPARK-21113][CORE] Read ahead input stream to amortize ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/18317 **[Test build #81648 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81648/testReport)** for PR 18317 at commit [`ed426f3`](https://github.com/apache/spark/commit/ed426f33f1d858c74f380218e520127b66f43453). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19132: [SPARK-21922] Fix duration always updating when t...
Github user caneGuy commented on a diff in the pull request: https://github.com/apache/spark/pull/19132#discussion_r138243757 --- Diff: core/src/main/scala/org/apache/spark/ui/SparkUI.scala --- @@ -50,6 +50,7 @@ private[spark] class SparkUI private ( val operationGraphListener: RDDOperationGraphListener, var appName: String, val basePath: String, +val lastUpdateTime: Long = -1L, --- End diff -- Update @jerryshao Thanks for your time. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19107: [SPARK-21799][ML] Fix `KMeans` performance regression ca...
Github user zhengruifeng commented on the issue: https://github.com/apache/spark/pull/19107 I am OK to resubmit the original PR if needed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19186: [SPARK-21972][ML] Add param handlePersistence
Github user zhengruifeng commented on a diff in the pull request: https://github.com/apache/spark/pull/19186#discussion_r138243247 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala --- @@ -444,13 +444,13 @@ class LogisticRegressionWithLBFGS lr.setFitIntercept(addIntercept) lr.setMaxIter(optimizer.getNumIterations()) lr.setTol(optimizer.getConvergenceTol()) +// Determine if we should cache the DF +lr.setHandlePersistence(input.getStorageLevel == StorageLevel.NONE) --- End diff -- `mlllib.LoR` do not expose `HandlePersistence` to users now, and I think it maybe better to keep it. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19132: [SPARK-21922] Fix duration always updating when task fai...
Github user caneGuy commented on the issue: https://github.com/apache/spark/pull/19132 Yes i have confirmed.Below is a testing job: @jerryshao ![_001](https://user-images.githubusercontent.com/26762018/30306401-7c682822-97aa-11e7-82ae-9c5802680dd5.png) ![_002](https://user-images.githubusercontent.com/26762018/30306402-7c6d7cfa-97aa-11e7-9788-33774ee06c4d.png) ![_003](https://user-images.githubusercontent.com/26762018/30306403-7c94e10a-97aa-11e7-899e-a1190c0f09da.png) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19186: [SPARK-21972][ML] Add param handlePersistence
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19186 **[Test build #81651 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81651/testReport)** for PR 19186 at commit [`9e53579`](https://github.com/apache/spark/commit/9e53579b4e8e69761f5a6c89cc60ab179ff78ea6). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19186: [SPARK-21972][ML] Add param handlePersistence
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19186 **[Test build #81650 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81650/testReport)** for PR 19186 at commit [`e112b42`](https://github.com/apache/spark/commit/e112b42a2df231f6b200bcb0cd3759cc143c8c80). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #16422: [SPARK-17642] [SQL] support DESC EXTENDED/FORMATTED tabl...
Github user wzhfy commented on the issue: https://github.com/apache/spark/pull/16422 @gatorsmile @cloud-fan Fixed comments. Do you have time to take another look? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19132: [SPARK-21922] Fix duration always updating when task fai...
Github user jerryshao commented on the issue: https://github.com/apache/spark/pull/19132 Looks like I don't have the Jenkins permission to trigger UT 😞 . Let me ping @srowen to trigger the test. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19132: [SPARK-21922] Fix duration always updating when t...
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/19132#discussion_r138240053 --- Diff: core/src/main/scala/org/apache/spark/ui/SparkUI.scala --- @@ -50,6 +50,7 @@ private[spark] class SparkUI private ( val operationGraphListener: RDDOperationGraphListener, var appName: String, val basePath: String, +val lastUpdateTime: Long = -1L, --- End diff -- I would like to user `Option[Long] = None` as default value to reflect there's no update time. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19132: [SPARK-21922] Fix duration always updating when task fai...
Github user jerryshao commented on the issue: https://github.com/apache/spark/pull/19132 ok to test. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19118: [SPARK-21882][CORE] OutputMetrics doesn't count w...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19118#discussion_r138238318 --- Diff: core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala --- @@ -112,11 +112,12 @@ object SparkHadoopWriter extends Logging { jobTrackerId, sparkStageId, sparkPartitionId, sparkAttemptNumber) committer.setupTask(taskContext) -val (outputMetrics, callback) = initHadoopOutputMetrics(context) - // Initiate the writer. config.initWriter(taskContext, sparkPartitionId) var recordsWritten = 0L + +// Initialize callback function after the writer. --- End diff -- The comment is better to explain why we must initialize the callback here. E.g., `We must initialize the callback for calculating bytes written after the statistic table is initialized in FileSystem`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19186: [SPARK-21972][ML] Add param handlePersistence
Github user zhengruifeng commented on a diff in the pull request: https://github.com/apache/spark/pull/19186#discussion_r138237760 --- Diff: mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala --- @@ -483,24 +488,17 @@ class LogisticRegression @Since("1.2.0") ( this } - override protected[spark] def train(dataset: Dataset[_]): LogisticRegressionModel = { -val handlePersistence = dataset.rdd.getStorageLevel == StorageLevel.NONE -train(dataset, handlePersistence) - } - - protected[spark] def train( - dataset: Dataset[_], - handlePersistence: Boolean): LogisticRegressionModel = { + protected[spark] def train(dataset: Dataset[_]): LogisticRegressionModel = { val w = if (!isDefined(weightCol) || $(weightCol).isEmpty) lit(1.0) else col($(weightCol)) val instances: RDD[Instance] = dataset.select(col($(labelCol)), w, col($(featuresCol))).rdd.map { case Row(label: Double, weight: Double, features: Vector) => Instance(label, weight, features) } -if (handlePersistence) instances.persist(StorageLevel.MEMORY_AND_DISK) +if ($(handlePersistence)) instances.persist(StorageLevel.MEMORY_AND_DISK) --- End diff -- +1. I supposed that it's up to the users to check the `storageLevel` to avoid double caching. But I now approve to check in the algs, and it may be better to log a warning if the dataset is already cached and the `handlePersistence` is set `true`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18875: [SPARK-21513][SQL] Allow UDF to_json support conv...
Github user goldmedal commented on a diff in the pull request: https://github.com/apache/spark/pull/18875#discussion_r138237482 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala --- @@ -193,14 +223,35 @@ private[sql] class JacksonGenerator( * * @param row The row to convert */ - def write(row: InternalRow): Unit = writeObject(writeFields(row, schema, rootFieldWriters)) + def write(row: InternalRow): Unit = { +writeObject(writeFields( + fieldWriters = rootFieldWriters, + row = row, + schema = dataType.asInstanceOf[StructType])) + } + + + /** + * Transforms multiple `InternalRow`s or `MapData`s to JSON array using Jackson + * + * @param array The array of rows or maps to convert + */ + def write(array: ArrayData): Unit = writeArray(writeArrayData( +fieldWriter = arrElementWriter, +array = array + )) /** - * Transforms multiple `InternalRow`s to JSON array using Jackson + * Transforms a single `MapData` to JSON object using Jackson * - * @param array The array of rows to convert + * @param map a map to convert */ - def write(array: ArrayData): Unit = writeArray(writeArrayData(array, arrElementWriter)) + def write(map: MapData): Unit = { +writeObject(writeMapData( + fieldWriter = mapElementWriter, --- End diff -- Ok Thanks for review :) I'll update it tonight. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...
Github user sitalkedia commented on a diff in the pull request: https://github.com/apache/spark/pull/18317#discussion_r138230590 --- Diff: core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java --- @@ -72,10 +72,15 @@ public UnsafeSorterSpillReader( bufferSizeBytes = DEFAULT_BUFFER_SIZE_BYTES; } +final Double readAheadFraction = +SparkEnv.get() == null ? 0.5 : + SparkEnv.get().conf().getDouble("spark.unsafe.sorter.spill.read.ahead.fraction", 0.5); + final InputStream bs = new NioBufferedFileInputStream(file, (int) bufferSizeBytes); try { - this.in = serializerManager.wrapStream(blockId, bs); + this.in = new ReadAheadInputStream(serializerManager.wrapStream(blockId, bs), --- End diff -- sure, done. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...
Github user sitalkedia commented on a diff in the pull request: https://github.com/apache/spark/pull/18317#discussion_r138231043 --- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java --- @@ -0,0 +1,315 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.io; + +import com.google.common.base.Preconditions; +import org.apache.spark.storage.StorageUtils; +import org.apache.spark.util.ThreadUtils; + +import javax.annotation.concurrent.GuardedBy; +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +/** + * {@link InputStream} implementation which asynchronously reads ahead from the underlying input + * stream when specified amount of data has been read from the current buffer. It does it by maintaining + * two buffer - active buffer and read ahead buffer. Active buffer contains data which should be returned + * when a read() call is issued. The read ahead buffer is used to asynchronously read from the underlying + * input stream and once the current active buffer is exhausted, we flip the two buffers so that we can + * start reading from the read ahead buffer without being blocked in disk I/O. + */ +public class ReadAheadInputStream extends InputStream { + + private ReentrantLock stateChangeLock = new ReentrantLock(); + + @GuardedBy("stateChangeLock") + private ByteBuffer activeBuffer; + + @GuardedBy("stateChangeLock") + private ByteBuffer readAheadBuffer; + + @GuardedBy("stateChangeLock") + private boolean endOfStream; + + @GuardedBy("stateChangeLock") + // true if async read is in progress + private boolean readInProgress; + + @GuardedBy("stateChangeLock") + // true if read is aborted due to an exception in reading from underlying input stream. + private boolean readAborted; + + @GuardedBy("stateChangeLock") + private Exception readException; + + // If the remaining data size in the current buffer is below this threshold, + // we issue an async read from the underlying input stream. + private final int readAheadThresholdInBytes; + + private final InputStream underlyingInputStream; + + private final ExecutorService executorService = ThreadUtils.newDaemonSingleThreadExecutor("read-ahead"); + + private final Condition asyncReadComplete = stateChangeLock.newCondition(); + + private static final ThreadLocaloneByte = ThreadLocal.withInitial(() -> new byte[1]); + + /** + * Creates a ReadAheadInputStream with the specified buffer size and read-ahead + * threshold + * + * @param inputStream The underlying input stream. + * @param bufferSizeInBytes The buffer size. + * @param readAheadThresholdInBytes If the active buffer has less data than the read-ahead + * threshold, an async read is triggered. + */ + public ReadAheadInputStream(InputStream inputStream, int bufferSizeInBytes, int readAheadThresholdInBytes) { +Preconditions.checkArgument(bufferSizeInBytes > 0, +"bufferSizeInBytes should be greater than 0"); +Preconditions.checkArgument(readAheadThresholdInBytes > 0 && +readAheadThresholdInBytes < bufferSizeInBytes, +"readAheadThresholdInBytes should be greater than 0 and less than bufferSizeInBytes" ); +activeBuffer = ByteBuffer.allocate(bufferSizeInBytes); +readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes); +this.readAheadThresholdInBytes = readAheadThresholdInBytes; +this.underlyingInputStream = inputStream; +activeBuffer.flip(); +readAheadBuffer.flip(); + } + + private boolean isEndOfStream() { +if (!activeBuffer.hasRemaining() && !readAheadBuffer.hasRemaining() && endOfStream) { + return true; +} +return false; + } + private void readAsync(final
[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...
Github user sitalkedia commented on a diff in the pull request: https://github.com/apache/spark/pull/18317#discussion_r138230901 --- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java --- @@ -0,0 +1,315 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.io; + +import com.google.common.base.Preconditions; +import org.apache.spark.storage.StorageUtils; +import org.apache.spark.util.ThreadUtils; + +import javax.annotation.concurrent.GuardedBy; +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +/** + * {@link InputStream} implementation which asynchronously reads ahead from the underlying input + * stream when specified amount of data has been read from the current buffer. It does it by maintaining + * two buffer - active buffer and read ahead buffer. Active buffer contains data which should be returned + * when a read() call is issued. The read ahead buffer is used to asynchronously read from the underlying + * input stream and once the current active buffer is exhausted, we flip the two buffers so that we can + * start reading from the read ahead buffer without being blocked in disk I/O. + */ +public class ReadAheadInputStream extends InputStream { + + private ReentrantLock stateChangeLock = new ReentrantLock(); + + @GuardedBy("stateChangeLock") + private ByteBuffer activeBuffer; + + @GuardedBy("stateChangeLock") + private ByteBuffer readAheadBuffer; + + @GuardedBy("stateChangeLock") + private boolean endOfStream; + + @GuardedBy("stateChangeLock") + // true if async read is in progress + private boolean readInProgress; + + @GuardedBy("stateChangeLock") + // true if read is aborted due to an exception in reading from underlying input stream. + private boolean readAborted; + + @GuardedBy("stateChangeLock") + private Exception readException; + + // If the remaining data size in the current buffer is below this threshold, + // we issue an async read from the underlying input stream. + private final int readAheadThresholdInBytes; + + private final InputStream underlyingInputStream; + + private final ExecutorService executorService = ThreadUtils.newDaemonSingleThreadExecutor("read-ahead"); + + private final Condition asyncReadComplete = stateChangeLock.newCondition(); + + private static final ThreadLocaloneByte = ThreadLocal.withInitial(() -> new byte[1]); + + /** + * Creates a ReadAheadInputStream with the specified buffer size and read-ahead + * threshold + * + * @param inputStream The underlying input stream. + * @param bufferSizeInBytes The buffer size. + * @param readAheadThresholdInBytes If the active buffer has less data than the read-ahead + * threshold, an async read is triggered. + */ + public ReadAheadInputStream(InputStream inputStream, int bufferSizeInBytes, int readAheadThresholdInBytes) { +Preconditions.checkArgument(bufferSizeInBytes > 0, +"bufferSizeInBytes should be greater than 0"); +Preconditions.checkArgument(readAheadThresholdInBytes > 0 && +readAheadThresholdInBytes < bufferSizeInBytes, +"readAheadThresholdInBytes should be greater than 0 and less than bufferSizeInBytes" ); +activeBuffer = ByteBuffer.allocate(bufferSizeInBytes); +readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes); +this.readAheadThresholdInBytes = readAheadThresholdInBytes; +this.underlyingInputStream = inputStream; +activeBuffer.flip(); +readAheadBuffer.flip(); + } + + private boolean isEndOfStream() { +if (!activeBuffer.hasRemaining() && !readAheadBuffer.hasRemaining() && endOfStream) { + return true; +} +return false; + } + private void readAsync(final
[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...
Github user sitalkedia commented on a diff in the pull request: https://github.com/apache/spark/pull/18317#discussion_r138230596 --- Diff: core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java --- @@ -72,10 +72,15 @@ public UnsafeSorterSpillReader( bufferSizeBytes = DEFAULT_BUFFER_SIZE_BYTES; } +final Double readAheadFraction = --- End diff -- done --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18875: [SPARK-21513][SQL] Allow UDF to_json support converting ...
Github user viirya commented on the issue: https://github.com/apache/spark/pull/18875 Minor comments left, otherwise LGTM. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18875: [SPARK-21513][SQL] Allow UDF to_json support conv...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/18875#discussion_r138233095 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JacksonGeneratorSuite.scala --- @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.json + +import java.io.CharArrayWriter + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, DateTimeUtils, GenericArrayData} +import org.apache.spark.sql.types._ + +class JacksonGeneratorSuite extends SparkFunSuite { + + val gmtId = DateTimeUtils.TimeZoneGMT.getID + val option = new JSONOptions(Map.empty, gmtId) + + test("initial with StructType and write out a row") { +val dataType = StructType(StructField("a", IntegerType) :: Nil) +val input = InternalRow(1) +val writer = new CharArrayWriter() +val gen = new JacksonGenerator(dataType, writer, option) +gen.write(input) +gen.flush() +assert(writer.toString === """{"a":1}""") + } + + test("initial with StructType and write out rows") { +val dataType = StructType(StructField("a", IntegerType) :: Nil) +val input = new GenericArrayData(InternalRow(1) :: InternalRow(2) :: Nil) +val writer = new CharArrayWriter() +val gen = new JacksonGenerator(dataType, writer, option) +gen.write(input) +gen.flush() +assert(writer.toString === """[{"a":1},{"a":2}]""") + } + + test("initial with StructType and write out an array with single empty row") { +val dataType = StructType(StructField("a", IntegerType) :: Nil) +val input = new GenericArrayData(InternalRow(null) :: Nil) +val writer = new CharArrayWriter() +val gen = new JacksonGenerator(dataType, writer, option) +gen.write(input) +gen.flush() +assert(writer.toString === """[{}]""") + } + + test("initial with StructType and write out an empty array") { +val dataType = StructType(StructField("a", IntegerType) :: Nil) +val input = new GenericArrayData(Nil) +val writer = new CharArrayWriter() +val gen = new JacksonGenerator(dataType, writer, option) +gen.write(input) +gen.flush() +assert(writer.toString === """[]""") + } + + test("initial with Map and write out a map data") { +val dataType = MapType(StringType, IntegerType) +val input = ArrayBasedMapData(Map("a" -> 1)) +val writer = new CharArrayWriter() +val gen = new JacksonGenerator(dataType, writer, option) +gen.write(input) +gen.flush() +assert(writer.toString === """{"a":1}""") + } + + test("initial with Map and write out an array of maps") { +val dataType = MapType(StringType, IntegerType) +val input = new GenericArrayData( + ArrayBasedMapData(Map("a" -> 1)) :: ArrayBasedMapData(Map("b" -> 2)) :: Nil) +val writer = new CharArrayWriter() +val gen = new JacksonGenerator(dataType, writer, option) +gen.write(input) +gen.flush() +assert(writer.toString === """[{"a":1},{"b":2}]""") + } + + test("error handling: initial with StructType but error calling write a map") { +val dataType = StructType(StructField("a", IntegerType) :: Nil) +val input = ArrayBasedMapData(Map("a" -> 1)) +val writer = new CharArrayWriter() +val gen = new JacksonGenerator(dataType, writer, option) +intercept[Exception] { + gen.write(input) +} + } + + test("error handling: initial with StructType but error calling write an array of maps") { +val dataType = StructType(StructField("a", IntegerType) :: Nil) +val input = new GenericArrayData( + ArrayBasedMapData(Map("a" -> 1)) :: ArrayBasedMapData(Map("b" -> 2)) :: Nil) +val writer = new
[GitHub] spark issue #18853: [SPARK-21646][SQL] CommonType for binary comparison
Github user wangyum commented on the issue: https://github.com/apache/spark/pull/18853 CC @gatorsmile, @cloud-fan --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18875: [SPARK-21513][SQL] Allow UDF to_json support conv...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/18875#discussion_r138232491 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala --- @@ -193,14 +223,35 @@ private[sql] class JacksonGenerator( * * @param row The row to convert */ - def write(row: InternalRow): Unit = writeObject(writeFields(row, schema, rootFieldWriters)) + def write(row: InternalRow): Unit = { +writeObject(writeFields( + fieldWriters = rootFieldWriters, + row = row, + schema = dataType.asInstanceOf[StructType])) + } + + + /** + * Transforms multiple `InternalRow`s or `MapData`s to JSON array using Jackson + * + * @param array The array of rows or maps to convert + */ + def write(array: ArrayData): Unit = writeArray(writeArrayData( +fieldWriter = arrElementWriter, +array = array + )) /** - * Transforms multiple `InternalRow`s to JSON array using Jackson + * Transforms a single `MapData` to JSON object using Jackson * - * @param array The array of rows to convert + * @param map a map to convert */ - def write(array: ArrayData): Unit = writeArray(writeArrayData(array, arrElementWriter)) + def write(map: MapData): Unit = { +writeObject(writeMapData( + fieldWriter = mapElementWriter, --- End diff -- And also add a similar comment in `write(row: InternalRow)`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18317: [SPARK-21113][CORE] Read ahead input stream to amortize ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/18317 **[Test build #81649 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81649/testReport)** for PR 18317 at commit [`36ae448`](https://github.com/apache/spark/commit/36ae4482d3193509c6dc06a79b8ebfe5a2b88752). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18317: [SPARK-21113][CORE] Read ahead input stream to amortize ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/18317 **[Test build #81648 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81648/testReport)** for PR 18317 at commit [`ed426f3`](https://github.com/apache/spark/commit/ed426f33f1d858c74f380218e520127b66f43453). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19192: Quote table names in JDBC queries
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19192#discussion_r138228826 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala --- @@ -100,7 +100,7 @@ abstract class JdbcDialect extends Serializable { * @return The SQL query to use for checking the table. */ def getTableExistsQuery(table: String): String = { -s"SELECT * FROM $table WHERE 1=0" +s"SELECT * FROM ${quoteIdentifier(table)} WHERE 1=0" --- End diff -- This is a behavior change. After we quote the names, the table names become case sensitive in most RDBMS. We need at least a JDBC option to control it. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19192: Quote table names in JDBC queries
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/19192 Without this PR change, can users add quotes in the JDBC option `dbtable`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18875: [SPARK-21513][SQL] Allow UDF to_json support conv...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/18875#discussion_r138228117 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala --- @@ -193,14 +223,35 @@ private[sql] class JacksonGenerator( * * @param row The row to convert */ - def write(row: InternalRow): Unit = writeObject(writeFields(row, schema, rootFieldWriters)) + def write(row: InternalRow): Unit = { +writeObject(writeFields( + fieldWriters = rootFieldWriters, + row = row, + schema = dataType.asInstanceOf[StructType])) + } + + + /** + * Transforms multiple `InternalRow`s or `MapData`s to JSON array using Jackson + * + * @param array The array of rows or maps to convert + */ + def write(array: ArrayData): Unit = writeArray(writeArrayData( +fieldWriter = arrElementWriter, +array = array + )) --- End diff -- Let's change this one back to: ```scala def write(array: ArrayData): Unit = writeArray(writeArrayData(array, arrElementWriter)) ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18875: [SPARK-21513][SQL] Allow UDF to_json support conv...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/18875#discussion_r138228185 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala --- @@ -193,14 +223,35 @@ private[sql] class JacksonGenerator( * * @param row The row to convert */ - def write(row: InternalRow): Unit = writeObject(writeFields(row, schema, rootFieldWriters)) + def write(row: InternalRow): Unit = { +writeObject(writeFields( + fieldWriters = rootFieldWriters, + row = row, + schema = dataType.asInstanceOf[StructType])) + } + + + /** + * Transforms multiple `InternalRow`s or `MapData`s to JSON array using Jackson + * + * @param array The array of rows or maps to convert + */ + def write(array: ArrayData): Unit = writeArray(writeArrayData( +fieldWriter = arrElementWriter, +array = array + )) /** - * Transforms multiple `InternalRow`s to JSON array using Jackson + * Transforms a single `MapData` to JSON object using Jackson * - * @param array The array of rows to convert + * @param map a map to convert */ - def write(array: ArrayData): Unit = writeArray(writeArrayData(array, arrElementWriter)) + def write(map: MapData): Unit = { +writeObject(writeMapData( + fieldWriter = mapElementWriter, --- End diff -- Let's add a small comment saying this actually triggers the proper validation. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18875: [SPARK-21513][SQL] Allow UDF to_json support conv...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/18875#discussion_r138228111 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala --- @@ -193,14 +223,35 @@ private[sql] class JacksonGenerator( * * @param row The row to convert */ - def write(row: InternalRow): Unit = writeObject(writeFields(row, schema, rootFieldWriters)) + def write(row: InternalRow): Unit = { +writeObject(writeFields( + fieldWriters = rootFieldWriters, + row = row, + schema = dataType.asInstanceOf[StructType])) + } + + + /** + * Transforms multiple `InternalRow`s or `MapData`s to JSON array using Jackson + * + * @param array The array of rows or maps to convert + */ + def write(array: ArrayData): Unit = writeArray(writeArrayData( +fieldWriter = arrElementWriter, +array = array + )) --- End diff -- Let's change this one back to: ```scala def write(array: ArrayData): Unit = writeArray(writeArrayData(array, arrElementWriter)) ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/18317#discussion_r138225592 --- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java --- @@ -0,0 +1,315 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.io; + +import com.google.common.base.Preconditions; +import org.apache.spark.storage.StorageUtils; +import org.apache.spark.util.ThreadUtils; + +import javax.annotation.concurrent.GuardedBy; +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +/** + * {@link InputStream} implementation which asynchronously reads ahead from the underlying input + * stream when specified amount of data has been read from the current buffer. It does it by maintaining + * two buffer - active buffer and read ahead buffer. Active buffer contains data which should be returned + * when a read() call is issued. The read ahead buffer is used to asynchronously read from the underlying + * input stream and once the current active buffer is exhausted, we flip the two buffers so that we can + * start reading from the read ahead buffer without being blocked in disk I/O. + */ +public class ReadAheadInputStream extends InputStream { + + private ReentrantLock stateChangeLock = new ReentrantLock(); + + @GuardedBy("stateChangeLock") + private ByteBuffer activeBuffer; + + @GuardedBy("stateChangeLock") + private ByteBuffer readAheadBuffer; + + @GuardedBy("stateChangeLock") + private boolean endOfStream; + + @GuardedBy("stateChangeLock") + // true if async read is in progress + private boolean readInProgress; + + @GuardedBy("stateChangeLock") + // true if read is aborted due to an exception in reading from underlying input stream. + private boolean readAborted; + + @GuardedBy("stateChangeLock") + private Exception readException; + + // If the remaining data size in the current buffer is below this threshold, + // we issue an async read from the underlying input stream. + private final int readAheadThresholdInBytes; + + private final InputStream underlyingInputStream; + + private final ExecutorService executorService = ThreadUtils.newDaemonSingleThreadExecutor("read-ahead"); + + private final Condition asyncReadComplete = stateChangeLock.newCondition(); + + private static final ThreadLocaloneByte = ThreadLocal.withInitial(() -> new byte[1]); + + /** + * Creates a ReadAheadInputStream with the specified buffer size and read-ahead + * threshold + * + * @param inputStream The underlying input stream. + * @param bufferSizeInBytes The buffer size. + * @param readAheadThresholdInBytes If the active buffer has less data than the read-ahead + * threshold, an async read is triggered. + */ + public ReadAheadInputStream(InputStream inputStream, int bufferSizeInBytes, int readAheadThresholdInBytes) { +Preconditions.checkArgument(bufferSizeInBytes > 0, +"bufferSizeInBytes should be greater than 0"); +Preconditions.checkArgument(readAheadThresholdInBytes > 0 && +readAheadThresholdInBytes < bufferSizeInBytes, +"readAheadThresholdInBytes should be greater than 0 and less than bufferSizeInBytes" ); +activeBuffer = ByteBuffer.allocate(bufferSizeInBytes); +readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes); +this.readAheadThresholdInBytes = readAheadThresholdInBytes; +this.underlyingInputStream = inputStream; +activeBuffer.flip(); +readAheadBuffer.flip(); + } + + private boolean isEndOfStream() { +if (!activeBuffer.hasRemaining() && !readAheadBuffer.hasRemaining() && endOfStream) { + return true; +} +return false; + } + private void readAsync(final
[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/18317#discussion_r138201871 --- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java --- @@ -0,0 +1,315 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.io; + +import com.google.common.base.Preconditions; +import org.apache.spark.storage.StorageUtils; +import org.apache.spark.util.ThreadUtils; + +import javax.annotation.concurrent.GuardedBy; +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +/** + * {@link InputStream} implementation which asynchronously reads ahead from the underlying input + * stream when specified amount of data has been read from the current buffer. It does it by maintaining + * two buffer - active buffer and read ahead buffer. Active buffer contains data which should be returned + * when a read() call is issued. The read ahead buffer is used to asynchronously read from the underlying + * input stream and once the current active buffer is exhausted, we flip the two buffers so that we can + * start reading from the read ahead buffer without being blocked in disk I/O. + */ +public class ReadAheadInputStream extends InputStream { + + private ReentrantLock stateChangeLock = new ReentrantLock(); + + @GuardedBy("stateChangeLock") + private ByteBuffer activeBuffer; + + @GuardedBy("stateChangeLock") + private ByteBuffer readAheadBuffer; + + @GuardedBy("stateChangeLock") + private boolean endOfStream; + + @GuardedBy("stateChangeLock") + // true if async read is in progress + private boolean readInProgress; + + @GuardedBy("stateChangeLock") + // true if read is aborted due to an exception in reading from underlying input stream. + private boolean readAborted; + + @GuardedBy("stateChangeLock") + private Exception readException; + + // If the remaining data size in the current buffer is below this threshold, + // we issue an async read from the underlying input stream. + private final int readAheadThresholdInBytes; + + private final InputStream underlyingInputStream; + + private final ExecutorService executorService = ThreadUtils.newDaemonSingleThreadExecutor("read-ahead"); + + private final Condition asyncReadComplete = stateChangeLock.newCondition(); + + private static final ThreadLocaloneByte = ThreadLocal.withInitial(() -> new byte[1]); + + /** + * Creates a ReadAheadInputStream with the specified buffer size and read-ahead + * threshold + * + * @param inputStream The underlying input stream. + * @param bufferSizeInBytes The buffer size. + * @param readAheadThresholdInBytes If the active buffer has less data than the read-ahead + * threshold, an async read is triggered. + */ + public ReadAheadInputStream(InputStream inputStream, int bufferSizeInBytes, int readAheadThresholdInBytes) { +Preconditions.checkArgument(bufferSizeInBytes > 0, +"bufferSizeInBytes should be greater than 0"); +Preconditions.checkArgument(readAheadThresholdInBytes > 0 && +readAheadThresholdInBytes < bufferSizeInBytes, +"readAheadThresholdInBytes should be greater than 0 and less than bufferSizeInBytes" ); +activeBuffer = ByteBuffer.allocate(bufferSizeInBytes); +readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes); +this.readAheadThresholdInBytes = readAheadThresholdInBytes; +this.underlyingInputStream = inputStream; +activeBuffer.flip(); +readAheadBuffer.flip(); + } + + private boolean isEndOfStream() { +if (!activeBuffer.hasRemaining() && !readAheadBuffer.hasRemaining() && endOfStream) { + return true; +} +return false; + } + private void readAsync(final
[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/18317#discussion_r138207519 --- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java --- @@ -0,0 +1,315 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.io; + +import com.google.common.base.Preconditions; +import org.apache.spark.storage.StorageUtils; +import org.apache.spark.util.ThreadUtils; + +import javax.annotation.concurrent.GuardedBy; +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +/** + * {@link InputStream} implementation which asynchronously reads ahead from the underlying input + * stream when specified amount of data has been read from the current buffer. It does it by maintaining + * two buffer - active buffer and read ahead buffer. Active buffer contains data which should be returned + * when a read() call is issued. The read ahead buffer is used to asynchronously read from the underlying + * input stream and once the current active buffer is exhausted, we flip the two buffers so that we can + * start reading from the read ahead buffer without being blocked in disk I/O. + */ +public class ReadAheadInputStream extends InputStream { + + private ReentrantLock stateChangeLock = new ReentrantLock(); + + @GuardedBy("stateChangeLock") + private ByteBuffer activeBuffer; + + @GuardedBy("stateChangeLock") + private ByteBuffer readAheadBuffer; + + @GuardedBy("stateChangeLock") + private boolean endOfStream; + + @GuardedBy("stateChangeLock") + // true if async read is in progress + private boolean readInProgress; + + @GuardedBy("stateChangeLock") + // true if read is aborted due to an exception in reading from underlying input stream. + private boolean readAborted; + + @GuardedBy("stateChangeLock") + private Exception readException; + + // If the remaining data size in the current buffer is below this threshold, + // we issue an async read from the underlying input stream. + private final int readAheadThresholdInBytes; + + private final InputStream underlyingInputStream; + + private final ExecutorService executorService = ThreadUtils.newDaemonSingleThreadExecutor("read-ahead"); + + private final Condition asyncReadComplete = stateChangeLock.newCondition(); + + private static final ThreadLocaloneByte = ThreadLocal.withInitial(() -> new byte[1]); + + /** + * Creates a ReadAheadInputStream with the specified buffer size and read-ahead + * threshold + * + * @param inputStream The underlying input stream. + * @param bufferSizeInBytes The buffer size. + * @param readAheadThresholdInBytes If the active buffer has less data than the read-ahead + * threshold, an async read is triggered. + */ + public ReadAheadInputStream(InputStream inputStream, int bufferSizeInBytes, int readAheadThresholdInBytes) { +Preconditions.checkArgument(bufferSizeInBytes > 0, +"bufferSizeInBytes should be greater than 0"); +Preconditions.checkArgument(readAheadThresholdInBytes > 0 && +readAheadThresholdInBytes < bufferSizeInBytes, +"readAheadThresholdInBytes should be greater than 0 and less than bufferSizeInBytes" ); +activeBuffer = ByteBuffer.allocate(bufferSizeInBytes); +readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes); +this.readAheadThresholdInBytes = readAheadThresholdInBytes; +this.underlyingInputStream = inputStream; +activeBuffer.flip(); +readAheadBuffer.flip(); + } + + private boolean isEndOfStream() { +if (!activeBuffer.hasRemaining() && !readAheadBuffer.hasRemaining() && endOfStream) { + return true; +} +return false; + } + private void readAsync(final
[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/18317#discussion_r138193555 --- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java --- @@ -0,0 +1,315 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.io; + +import com.google.common.base.Preconditions; +import org.apache.spark.storage.StorageUtils; +import org.apache.spark.util.ThreadUtils; + +import javax.annotation.concurrent.GuardedBy; +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +/** + * {@link InputStream} implementation which asynchronously reads ahead from the underlying input + * stream when specified amount of data has been read from the current buffer. It does it by maintaining + * two buffer - active buffer and read ahead buffer. Active buffer contains data which should be returned + * when a read() call is issued. The read ahead buffer is used to asynchronously read from the underlying + * input stream and once the current active buffer is exhausted, we flip the two buffers so that we can + * start reading from the read ahead buffer without being blocked in disk I/O. + */ +public class ReadAheadInputStream extends InputStream { + + private ReentrantLock stateChangeLock = new ReentrantLock(); + + @GuardedBy("stateChangeLock") + private ByteBuffer activeBuffer; + + @GuardedBy("stateChangeLock") + private ByteBuffer readAheadBuffer; + + @GuardedBy("stateChangeLock") + private boolean endOfStream; + + @GuardedBy("stateChangeLock") + // true if async read is in progress + private boolean readInProgress; + + @GuardedBy("stateChangeLock") + // true if read is aborted due to an exception in reading from underlying input stream. + private boolean readAborted; + + @GuardedBy("stateChangeLock") + private Exception readException; + + // If the remaining data size in the current buffer is below this threshold, + // we issue an async read from the underlying input stream. + private final int readAheadThresholdInBytes; + + private final InputStream underlyingInputStream; + + private final ExecutorService executorService = ThreadUtils.newDaemonSingleThreadExecutor("read-ahead"); + + private final Condition asyncReadComplete = stateChangeLock.newCondition(); + + private static final ThreadLocaloneByte = ThreadLocal.withInitial(() -> new byte[1]); + + /** + * Creates a ReadAheadInputStream with the specified buffer size and read-ahead + * threshold + * + * @param inputStream The underlying input stream. + * @param bufferSizeInBytes The buffer size. + * @param readAheadThresholdInBytes If the active buffer has less data than the read-ahead + * threshold, an async read is triggered. + */ + public ReadAheadInputStream(InputStream inputStream, int bufferSizeInBytes, int readAheadThresholdInBytes) { +Preconditions.checkArgument(bufferSizeInBytes > 0, +"bufferSizeInBytes should be greater than 0"); +Preconditions.checkArgument(readAheadThresholdInBytes > 0 && +readAheadThresholdInBytes < bufferSizeInBytes, +"readAheadThresholdInBytes should be greater than 0 and less than bufferSizeInBytes" ); +activeBuffer = ByteBuffer.allocate(bufferSizeInBytes); +readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes); +this.readAheadThresholdInBytes = readAheadThresholdInBytes; +this.underlyingInputStream = inputStream; +activeBuffer.flip(); +readAheadBuffer.flip(); + } + + private boolean isEndOfStream() { +if (!activeBuffer.hasRemaining() && !readAheadBuffer.hasRemaining() && endOfStream) { + return true; +} +return false; + } + private void readAsync(final
[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/18317#discussion_r138225438 --- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java --- @@ -0,0 +1,315 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.io; + +import com.google.common.base.Preconditions; +import org.apache.spark.storage.StorageUtils; +import org.apache.spark.util.ThreadUtils; + +import javax.annotation.concurrent.GuardedBy; +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +/** + * {@link InputStream} implementation which asynchronously reads ahead from the underlying input + * stream when specified amount of data has been read from the current buffer. It does it by maintaining + * two buffer - active buffer and read ahead buffer. Active buffer contains data which should be returned + * when a read() call is issued. The read ahead buffer is used to asynchronously read from the underlying + * input stream and once the current active buffer is exhausted, we flip the two buffers so that we can + * start reading from the read ahead buffer without being blocked in disk I/O. + */ +public class ReadAheadInputStream extends InputStream { + + private ReentrantLock stateChangeLock = new ReentrantLock(); + + @GuardedBy("stateChangeLock") + private ByteBuffer activeBuffer; + + @GuardedBy("stateChangeLock") + private ByteBuffer readAheadBuffer; + + @GuardedBy("stateChangeLock") + private boolean endOfStream; + + @GuardedBy("stateChangeLock") + // true if async read is in progress + private boolean readInProgress; + + @GuardedBy("stateChangeLock") + // true if read is aborted due to an exception in reading from underlying input stream. + private boolean readAborted; + + @GuardedBy("stateChangeLock") + private Exception readException; + + // If the remaining data size in the current buffer is below this threshold, + // we issue an async read from the underlying input stream. + private final int readAheadThresholdInBytes; + + private final InputStream underlyingInputStream; + + private final ExecutorService executorService = ThreadUtils.newDaemonSingleThreadExecutor("read-ahead"); + + private final Condition asyncReadComplete = stateChangeLock.newCondition(); + + private static final ThreadLocaloneByte = ThreadLocal.withInitial(() -> new byte[1]); + + /** + * Creates a ReadAheadInputStream with the specified buffer size and read-ahead + * threshold + * + * @param inputStream The underlying input stream. + * @param bufferSizeInBytes The buffer size. + * @param readAheadThresholdInBytes If the active buffer has less data than the read-ahead + * threshold, an async read is triggered. + */ + public ReadAheadInputStream(InputStream inputStream, int bufferSizeInBytes, int readAheadThresholdInBytes) { +Preconditions.checkArgument(bufferSizeInBytes > 0, +"bufferSizeInBytes should be greater than 0"); +Preconditions.checkArgument(readAheadThresholdInBytes > 0 && +readAheadThresholdInBytes < bufferSizeInBytes, +"readAheadThresholdInBytes should be greater than 0 and less than bufferSizeInBytes" ); +activeBuffer = ByteBuffer.allocate(bufferSizeInBytes); +readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes); +this.readAheadThresholdInBytes = readAheadThresholdInBytes; +this.underlyingInputStream = inputStream; +activeBuffer.flip(); +readAheadBuffer.flip(); + } + + private boolean isEndOfStream() { +if (!activeBuffer.hasRemaining() && !readAheadBuffer.hasRemaining() && endOfStream) { + return true; +} +return false; + } + private void readAsync(final
[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/18317#discussion_r138198506 --- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java --- @@ -0,0 +1,315 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.io; + +import com.google.common.base.Preconditions; +import org.apache.spark.storage.StorageUtils; +import org.apache.spark.util.ThreadUtils; + +import javax.annotation.concurrent.GuardedBy; +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +/** + * {@link InputStream} implementation which asynchronously reads ahead from the underlying input + * stream when specified amount of data has been read from the current buffer. It does it by maintaining + * two buffer - active buffer and read ahead buffer. Active buffer contains data which should be returned + * when a read() call is issued. The read ahead buffer is used to asynchronously read from the underlying + * input stream and once the current active buffer is exhausted, we flip the two buffers so that we can + * start reading from the read ahead buffer without being blocked in disk I/O. + */ +public class ReadAheadInputStream extends InputStream { + + private ReentrantLock stateChangeLock = new ReentrantLock(); + + @GuardedBy("stateChangeLock") + private ByteBuffer activeBuffer; + + @GuardedBy("stateChangeLock") + private ByteBuffer readAheadBuffer; + + @GuardedBy("stateChangeLock") + private boolean endOfStream; + + @GuardedBy("stateChangeLock") + // true if async read is in progress + private boolean readInProgress; + + @GuardedBy("stateChangeLock") + // true if read is aborted due to an exception in reading from underlying input stream. + private boolean readAborted; + + @GuardedBy("stateChangeLock") + private Exception readException; + + // If the remaining data size in the current buffer is below this threshold, + // we issue an async read from the underlying input stream. + private final int readAheadThresholdInBytes; + + private final InputStream underlyingInputStream; + + private final ExecutorService executorService = ThreadUtils.newDaemonSingleThreadExecutor("read-ahead"); + + private final Condition asyncReadComplete = stateChangeLock.newCondition(); + + private static final ThreadLocaloneByte = ThreadLocal.withInitial(() -> new byte[1]); + + /** + * Creates a ReadAheadInputStream with the specified buffer size and read-ahead + * threshold + * + * @param inputStream The underlying input stream. + * @param bufferSizeInBytes The buffer size. + * @param readAheadThresholdInBytes If the active buffer has less data than the read-ahead + * threshold, an async read is triggered. + */ + public ReadAheadInputStream(InputStream inputStream, int bufferSizeInBytes, int readAheadThresholdInBytes) { +Preconditions.checkArgument(bufferSizeInBytes > 0, +"bufferSizeInBytes should be greater than 0"); +Preconditions.checkArgument(readAheadThresholdInBytes > 0 && +readAheadThresholdInBytes < bufferSizeInBytes, +"readAheadThresholdInBytes should be greater than 0 and less than bufferSizeInBytes" ); +activeBuffer = ByteBuffer.allocate(bufferSizeInBytes); +readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes); +this.readAheadThresholdInBytes = readAheadThresholdInBytes; +this.underlyingInputStream = inputStream; +activeBuffer.flip(); +readAheadBuffer.flip(); + } + + private boolean isEndOfStream() { +if (!activeBuffer.hasRemaining() && !readAheadBuffer.hasRemaining() && endOfStream) { + return true; +} +return false; --- End diff -- nit: double spaces
[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/18317#discussion_r138207456 --- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java --- @@ -0,0 +1,315 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.io; + +import com.google.common.base.Preconditions; +import org.apache.spark.storage.StorageUtils; +import org.apache.spark.util.ThreadUtils; + +import javax.annotation.concurrent.GuardedBy; +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +/** + * {@link InputStream} implementation which asynchronously reads ahead from the underlying input + * stream when specified amount of data has been read from the current buffer. It does it by maintaining + * two buffer - active buffer and read ahead buffer. Active buffer contains data which should be returned + * when a read() call is issued. The read ahead buffer is used to asynchronously read from the underlying + * input stream and once the current active buffer is exhausted, we flip the two buffers so that we can + * start reading from the read ahead buffer without being blocked in disk I/O. + */ +public class ReadAheadInputStream extends InputStream { + + private ReentrantLock stateChangeLock = new ReentrantLock(); + + @GuardedBy("stateChangeLock") + private ByteBuffer activeBuffer; + + @GuardedBy("stateChangeLock") + private ByteBuffer readAheadBuffer; + + @GuardedBy("stateChangeLock") + private boolean endOfStream; + + @GuardedBy("stateChangeLock") + // true if async read is in progress + private boolean readInProgress; + + @GuardedBy("stateChangeLock") + // true if read is aborted due to an exception in reading from underlying input stream. + private boolean readAborted; + + @GuardedBy("stateChangeLock") + private Exception readException; + + // If the remaining data size in the current buffer is below this threshold, + // we issue an async read from the underlying input stream. + private final int readAheadThresholdInBytes; + + private final InputStream underlyingInputStream; + + private final ExecutorService executorService = ThreadUtils.newDaemonSingleThreadExecutor("read-ahead"); + + private final Condition asyncReadComplete = stateChangeLock.newCondition(); + + private static final ThreadLocaloneByte = ThreadLocal.withInitial(() -> new byte[1]); + + /** + * Creates a ReadAheadInputStream with the specified buffer size and read-ahead + * threshold + * + * @param inputStream The underlying input stream. + * @param bufferSizeInBytes The buffer size. + * @param readAheadThresholdInBytes If the active buffer has less data than the read-ahead + * threshold, an async read is triggered. + */ + public ReadAheadInputStream(InputStream inputStream, int bufferSizeInBytes, int readAheadThresholdInBytes) { +Preconditions.checkArgument(bufferSizeInBytes > 0, +"bufferSizeInBytes should be greater than 0"); +Preconditions.checkArgument(readAheadThresholdInBytes > 0 && +readAheadThresholdInBytes < bufferSizeInBytes, +"readAheadThresholdInBytes should be greater than 0 and less than bufferSizeInBytes" ); +activeBuffer = ByteBuffer.allocate(bufferSizeInBytes); +readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes); +this.readAheadThresholdInBytes = readAheadThresholdInBytes; +this.underlyingInputStream = inputStream; +activeBuffer.flip(); +readAheadBuffer.flip(); + } + + private boolean isEndOfStream() { +if (!activeBuffer.hasRemaining() && !readAheadBuffer.hasRemaining() && endOfStream) { + return true; +} +return false; + } + private void readAsync(final
[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/18317#discussion_r138195259 --- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java --- @@ -0,0 +1,315 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.io; + +import com.google.common.base.Preconditions; +import org.apache.spark.storage.StorageUtils; +import org.apache.spark.util.ThreadUtils; + +import javax.annotation.concurrent.GuardedBy; +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +/** + * {@link InputStream} implementation which asynchronously reads ahead from the underlying input + * stream when specified amount of data has been read from the current buffer. It does it by maintaining + * two buffer - active buffer and read ahead buffer. Active buffer contains data which should be returned + * when a read() call is issued. The read ahead buffer is used to asynchronously read from the underlying + * input stream and once the current active buffer is exhausted, we flip the two buffers so that we can + * start reading from the read ahead buffer without being blocked in disk I/O. + */ +public class ReadAheadInputStream extends InputStream { + + private ReentrantLock stateChangeLock = new ReentrantLock(); + + @GuardedBy("stateChangeLock") + private ByteBuffer activeBuffer; + + @GuardedBy("stateChangeLock") + private ByteBuffer readAheadBuffer; + + @GuardedBy("stateChangeLock") + private boolean endOfStream; + + @GuardedBy("stateChangeLock") + // true if async read is in progress + private boolean readInProgress; + + @GuardedBy("stateChangeLock") + // true if read is aborted due to an exception in reading from underlying input stream. + private boolean readAborted; + + @GuardedBy("stateChangeLock") + private Exception readException; + + // If the remaining data size in the current buffer is below this threshold, + // we issue an async read from the underlying input stream. + private final int readAheadThresholdInBytes; + + private final InputStream underlyingInputStream; + + private final ExecutorService executorService = ThreadUtils.newDaemonSingleThreadExecutor("read-ahead"); + + private final Condition asyncReadComplete = stateChangeLock.newCondition(); + + private static final ThreadLocaloneByte = ThreadLocal.withInitial(() -> new byte[1]); + + /** + * Creates a ReadAheadInputStream with the specified buffer size and read-ahead + * threshold + * + * @param inputStream The underlying input stream. + * @param bufferSizeInBytes The buffer size. + * @param readAheadThresholdInBytes If the active buffer has less data than the read-ahead + * threshold, an async read is triggered. + */ + public ReadAheadInputStream(InputStream inputStream, int bufferSizeInBytes, int readAheadThresholdInBytes) { +Preconditions.checkArgument(bufferSizeInBytes > 0, +"bufferSizeInBytes should be greater than 0"); --- End diff -- nit: could you add the `bufferSizeInBytes` value to the error message? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/18317#discussion_r138194732 --- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java --- @@ -0,0 +1,315 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.io; + +import com.google.common.base.Preconditions; +import org.apache.spark.storage.StorageUtils; +import org.apache.spark.util.ThreadUtils; + +import javax.annotation.concurrent.GuardedBy; +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +/** + * {@link InputStream} implementation which asynchronously reads ahead from the underlying input + * stream when specified amount of data has been read from the current buffer. It does it by maintaining + * two buffer - active buffer and read ahead buffer. Active buffer contains data which should be returned + * when a read() call is issued. The read ahead buffer is used to asynchronously read from the underlying + * input stream and once the current active buffer is exhausted, we flip the two buffers so that we can + * start reading from the read ahead buffer without being blocked in disk I/O. + */ +public class ReadAheadInputStream extends InputStream { + + private ReentrantLock stateChangeLock = new ReentrantLock(); + + @GuardedBy("stateChangeLock") + private ByteBuffer activeBuffer; + + @GuardedBy("stateChangeLock") + private ByteBuffer readAheadBuffer; + + @GuardedBy("stateChangeLock") + private boolean endOfStream; + + @GuardedBy("stateChangeLock") + // true if async read is in progress + private boolean readInProgress; + + @GuardedBy("stateChangeLock") + // true if read is aborted due to an exception in reading from underlying input stream. + private boolean readAborted; + + @GuardedBy("stateChangeLock") + private Exception readException; + + // If the remaining data size in the current buffer is below this threshold, + // we issue an async read from the underlying input stream. + private final int readAheadThresholdInBytes; + + private final InputStream underlyingInputStream; + + private final ExecutorService executorService = ThreadUtils.newDaemonSingleThreadExecutor("read-ahead"); + + private final Condition asyncReadComplete = stateChangeLock.newCondition(); + + private static final ThreadLocaloneByte = ThreadLocal.withInitial(() -> new byte[1]); + + /** + * Creates a ReadAheadInputStream with the specified buffer size and read-ahead + * threshold + * + * @param inputStream The underlying input stream. + * @param bufferSizeInBytes The buffer size. + * @param readAheadThresholdInBytes If the active buffer has less data than the read-ahead + * threshold, an async read is triggered. + */ + public ReadAheadInputStream(InputStream inputStream, int bufferSizeInBytes, int readAheadThresholdInBytes) { +Preconditions.checkArgument(bufferSizeInBytes > 0, +"bufferSizeInBytes should be greater than 0"); +Preconditions.checkArgument(readAheadThresholdInBytes > 0 && +readAheadThresholdInBytes < bufferSizeInBytes, +"readAheadThresholdInBytes should be greater than 0 and less than bufferSizeInBytes" ); +activeBuffer = ByteBuffer.allocate(bufferSizeInBytes); +readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes); +this.readAheadThresholdInBytes = readAheadThresholdInBytes; +this.underlyingInputStream = inputStream; +activeBuffer.flip(); +readAheadBuffer.flip(); + } + + private boolean isEndOfStream() { +if (!activeBuffer.hasRemaining() && !readAheadBuffer.hasRemaining() && endOfStream) { + return true; +} +return false; + } + private void readAsync(final
[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/18317#discussion_r138204793 --- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java --- @@ -0,0 +1,292 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.io; + +import com.google.common.base.Preconditions; +import org.apache.spark.storage.StorageUtils; + +import javax.annotation.concurrent.GuardedBy; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +/** + * {@link InputStream} implementation which asynchronously reads ahead from the underlying input + * stream when specified amount of data has been read from the current buffer. It does it by maintaining + * two buffer - active buffer and read ahead buffer. Active buffer contains data which should be returned + * when a read() call is issued. The read ahead buffer is used to asynchronously read from the underlying + * input stream and once the current active buffer is exhausted, we flip the two buffers so that we can + * start reading from the read ahead buffer without being blocked in disk I/O. + */ +public class ReadAheadInputStream extends InputStream { + + private ReentrantLock stateChangeLock = new ReentrantLock(); + + @GuardedBy("stateChangeLock") + private ByteBuffer activeBuffer; + + @GuardedBy("stateChangeLock") + private ByteBuffer readAheadBuffer; + + @GuardedBy("stateChangeLock") + private boolean endOfStream; + + @GuardedBy("stateChangeLock") + // true if async read is in progress + private boolean isReadInProgress; + + @GuardedBy("stateChangeLock") + // true if read is aborted due to an exception in reading from underlying input stream. + private boolean isReadAborted; + + @GuardedBy("stateChangeLock") + private Exception readException; + + // If the remaining data size in the current buffer is below this threshold, + // we issue an async read from the underlying input stream. + private final int readAheadThresholdInBytes; + + private final InputStream underlyingInputStream; + + private final ExecutorService executorService = Executors.newSingleThreadExecutor(); + + private final Condition asyncReadComplete = stateChangeLock.newCondition(); + + private final byte[] oneByte = new byte[1]; + + /** + * Creates a ReadAheadInputStream with the specified buffer size and read-ahead + * threshold + * + * @param inputStream The underlying input stream. + * @param bufferSizeInBytes The buffer size. + * @param readAheadThresholdInBytes If the active buffer has less data than the read-ahead + * threshold, an async read is triggered. + */ + public ReadAheadInputStream(InputStream inputStream, int bufferSizeInBytes, int readAheadThresholdInBytes) { +Preconditions.checkArgument(bufferSizeInBytes > 0, +"bufferSizeInBytes should be greater than 0"); +Preconditions.checkArgument(readAheadThresholdInBytes > 0 && +readAheadThresholdInBytes < bufferSizeInBytes, +"readAheadThresholdInBytes should be greater than 0 and less than bufferSizeInBytes" ); +activeBuffer = ByteBuffer.allocate(bufferSizeInBytes); +readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes); +this.readAheadThresholdInBytes = readAheadThresholdInBytes; +this.underlyingInputStream = inputStream; +activeBuffer.flip(); +readAheadBuffer.flip(); + } + + private boolean isEndOfStream() { +if(activeBuffer.remaining() == 0 && readAheadBuffer.remaining() == 0 && endOfStream) { + return true; +} +return false; + } + + + private void readAsync(final ByteBuffer byteBuffer) throws IOException { +stateChangeLock.lock(); +if (endOfStream || isReadInProgress) { +
[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/18317#discussion_r138200321 --- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java --- @@ -0,0 +1,317 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.io; + +import com.google.common.base.Preconditions; +import org.apache.spark.storage.StorageUtils; +import org.apache.spark.util.ThreadUtils; + +import javax.annotation.concurrent.GuardedBy; +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +/** + * {@link InputStream} implementation which asynchronously reads ahead from the underlying input + * stream when specified amount of data has been read from the current buffer. It does it by maintaining + * two buffer - active buffer and read ahead buffer. Active buffer contains data which should be returned + * when a read() call is issued. The read ahead buffer is used to asynchronously read from the underlying + * input stream and once the current active buffer is exhausted, we flip the two buffers so that we can + * start reading from the read ahead buffer without being blocked in disk I/O. + */ +public class ReadAheadInputStream extends InputStream { + + private ReentrantLock stateChangeLock = new ReentrantLock(); + + @GuardedBy("stateChangeLock") + private ByteBuffer activeBuffer; + + @GuardedBy("stateChangeLock") + private ByteBuffer readAheadBuffer; + + @GuardedBy("stateChangeLock") + private boolean endOfStream; + + @GuardedBy("stateChangeLock") + // true if async read is in progress + private boolean readInProgress; + + @GuardedBy("stateChangeLock") + // true if read is aborted due to an exception in reading from underlying input stream. + private boolean readAborted; + + @GuardedBy("stateChangeLock") + private Exception readException; + + // If the remaining data size in the current buffer is below this threshold, + // we issue an async read from the underlying input stream. + private final int readAheadThresholdInBytes; + + private final InputStream underlyingInputStream; + + private final ExecutorService executorService = ThreadUtils.newDaemonSingleThreadExecutor("read-ahead"); + + private final Condition asyncReadComplete = stateChangeLock.newCondition(); + + private final byte[] oneByte = new byte[1]; + + /** + * Creates a ReadAheadInputStream with the specified buffer size and read-ahead + * threshold + * + * @param inputStream The underlying input stream. + * @param bufferSizeInBytes The buffer size. + * @param readAheadThresholdInBytes If the active buffer has less data than the read-ahead + * threshold, an async read is triggered. + */ + public ReadAheadInputStream(InputStream inputStream, int bufferSizeInBytes, int readAheadThresholdInBytes) { +Preconditions.checkArgument(bufferSizeInBytes > 0, +"bufferSizeInBytes should be greater than 0"); +Preconditions.checkArgument(readAheadThresholdInBytes > 0 && +readAheadThresholdInBytes < bufferSizeInBytes, +"readAheadThresholdInBytes should be greater than 0 and less than bufferSizeInBytes" ); +activeBuffer = ByteBuffer.allocate(bufferSizeInBytes); +readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes); +this.readAheadThresholdInBytes = readAheadThresholdInBytes; +this.underlyingInputStream = inputStream; +activeBuffer.flip(); +readAheadBuffer.flip(); + } + + private boolean hasRemaining() { +if(activeBuffer.remaining() == 0 && readAheadBuffer.remaining() == 0 && endOfStream) { + return true; +} +return false; + } + private void readAsync(final ByteBuffer byteBuffer) throws IOException { +
[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/18317#discussion_r138194101 --- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java --- @@ -0,0 +1,315 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.io; + +import com.google.common.base.Preconditions; +import org.apache.spark.storage.StorageUtils; +import org.apache.spark.util.ThreadUtils; + +import javax.annotation.concurrent.GuardedBy; +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +/** + * {@link InputStream} implementation which asynchronously reads ahead from the underlying input + * stream when specified amount of data has been read from the current buffer. It does it by maintaining + * two buffer - active buffer and read ahead buffer. Active buffer contains data which should be returned + * when a read() call is issued. The read ahead buffer is used to asynchronously read from the underlying + * input stream and once the current active buffer is exhausted, we flip the two buffers so that we can + * start reading from the read ahead buffer without being blocked in disk I/O. + */ +public class ReadAheadInputStream extends InputStream { + + private ReentrantLock stateChangeLock = new ReentrantLock(); + + @GuardedBy("stateChangeLock") + private ByteBuffer activeBuffer; + + @GuardedBy("stateChangeLock") + private ByteBuffer readAheadBuffer; + + @GuardedBy("stateChangeLock") + private boolean endOfStream; + + @GuardedBy("stateChangeLock") + // true if async read is in progress + private boolean readInProgress; + + @GuardedBy("stateChangeLock") + // true if read is aborted due to an exception in reading from underlying input stream. + private boolean readAborted; + + @GuardedBy("stateChangeLock") + private Exception readException; + + // If the remaining data size in the current buffer is below this threshold, + // we issue an async read from the underlying input stream. + private final int readAheadThresholdInBytes; + + private final InputStream underlyingInputStream; + + private final ExecutorService executorService = ThreadUtils.newDaemonSingleThreadExecutor("read-ahead"); + + private final Condition asyncReadComplete = stateChangeLock.newCondition(); + + private static final ThreadLocaloneByte = ThreadLocal.withInitial(() -> new byte[1]); + + /** + * Creates a ReadAheadInputStream with the specified buffer size and read-ahead + * threshold + * + * @param inputStream The underlying input stream. + * @param bufferSizeInBytes The buffer size. + * @param readAheadThresholdInBytes If the active buffer has less data than the read-ahead + * threshold, an async read is triggered. + */ + public ReadAheadInputStream(InputStream inputStream, int bufferSizeInBytes, int readAheadThresholdInBytes) { +Preconditions.checkArgument(bufferSizeInBytes > 0, +"bufferSizeInBytes should be greater than 0"); +Preconditions.checkArgument(readAheadThresholdInBytes > 0 && +readAheadThresholdInBytes < bufferSizeInBytes, +"readAheadThresholdInBytes should be greater than 0 and less than bufferSizeInBytes" ); +activeBuffer = ByteBuffer.allocate(bufferSizeInBytes); +readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes); +this.readAheadThresholdInBytes = readAheadThresholdInBytes; +this.underlyingInputStream = inputStream; +activeBuffer.flip(); +readAheadBuffer.flip(); + } + + private boolean isEndOfStream() { +if (!activeBuffer.hasRemaining() && !readAheadBuffer.hasRemaining() && endOfStream) { + return true; +} +return false; + } + private void readAsync(final
[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/18317#discussion_r138189593 --- Diff: core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java --- @@ -72,10 +72,15 @@ public UnsafeSorterSpillReader( bufferSizeBytes = DEFAULT_BUFFER_SIZE_BYTES; } +final Double readAheadFraction = +SparkEnv.get() == null ? 0.5 : + SparkEnv.get().conf().getDouble("spark.unsafe.sorter.spill.read.ahead.fraction", 0.5); + final InputStream bs = new NioBufferedFileInputStream(file, (int) bufferSizeBytes); try { - this.in = serializerManager.wrapStream(blockId, bs); + this.in = new ReadAheadInputStream(serializerManager.wrapStream(blockId, bs), --- End diff -- Could you add an internal conf to disable it? It will allow the user to disable it when the new feature causes a regression. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/18317#discussion_r138189733 --- Diff: core/src/test/java/org/apache/spark/io/GenericFileInputStreamSuite.java --- @@ -50,17 +52,16 @@ public void tearDown() { inputFile.delete(); } + --- End diff -- nit: extra empty line --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/18317#discussion_r138201264 --- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java --- @@ -0,0 +1,315 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.io; + +import com.google.common.base.Preconditions; +import org.apache.spark.storage.StorageUtils; +import org.apache.spark.util.ThreadUtils; + +import javax.annotation.concurrent.GuardedBy; +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +/** + * {@link InputStream} implementation which asynchronously reads ahead from the underlying input + * stream when specified amount of data has been read from the current buffer. It does it by maintaining + * two buffer - active buffer and read ahead buffer. Active buffer contains data which should be returned + * when a read() call is issued. The read ahead buffer is used to asynchronously read from the underlying + * input stream and once the current active buffer is exhausted, we flip the two buffers so that we can + * start reading from the read ahead buffer without being blocked in disk I/O. + */ +public class ReadAheadInputStream extends InputStream { + + private ReentrantLock stateChangeLock = new ReentrantLock(); + + @GuardedBy("stateChangeLock") + private ByteBuffer activeBuffer; + + @GuardedBy("stateChangeLock") + private ByteBuffer readAheadBuffer; + + @GuardedBy("stateChangeLock") + private boolean endOfStream; + + @GuardedBy("stateChangeLock") + // true if async read is in progress + private boolean readInProgress; + + @GuardedBy("stateChangeLock") + // true if read is aborted due to an exception in reading from underlying input stream. + private boolean readAborted; + + @GuardedBy("stateChangeLock") + private Exception readException; + + // If the remaining data size in the current buffer is below this threshold, + // we issue an async read from the underlying input stream. + private final int readAheadThresholdInBytes; + + private final InputStream underlyingInputStream; + + private final ExecutorService executorService = ThreadUtils.newDaemonSingleThreadExecutor("read-ahead"); + + private final Condition asyncReadComplete = stateChangeLock.newCondition(); + + private static final ThreadLocaloneByte = ThreadLocal.withInitial(() -> new byte[1]); + + /** + * Creates a ReadAheadInputStream with the specified buffer size and read-ahead + * threshold + * + * @param inputStream The underlying input stream. + * @param bufferSizeInBytes The buffer size. + * @param readAheadThresholdInBytes If the active buffer has less data than the read-ahead + * threshold, an async read is triggered. + */ + public ReadAheadInputStream(InputStream inputStream, int bufferSizeInBytes, int readAheadThresholdInBytes) { +Preconditions.checkArgument(bufferSizeInBytes > 0, +"bufferSizeInBytes should be greater than 0"); +Preconditions.checkArgument(readAheadThresholdInBytes > 0 && +readAheadThresholdInBytes < bufferSizeInBytes, +"readAheadThresholdInBytes should be greater than 0 and less than bufferSizeInBytes" ); +activeBuffer = ByteBuffer.allocate(bufferSizeInBytes); +readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes); +this.readAheadThresholdInBytes = readAheadThresholdInBytes; +this.underlyingInputStream = inputStream; +activeBuffer.flip(); +readAheadBuffer.flip(); + } + + private boolean isEndOfStream() { +if (!activeBuffer.hasRemaining() && !readAheadBuffer.hasRemaining() && endOfStream) { + return true; +} +return false; + } + private void readAsync(final
[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/18317#discussion_r138188910 --- Diff: core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java --- @@ -72,10 +72,15 @@ public UnsafeSorterSpillReader( bufferSizeBytes = DEFAULT_BUFFER_SIZE_BYTES; } +final Double readAheadFraction = --- End diff -- nit: Double -> double --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/18317#discussion_r138195292 --- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java --- @@ -0,0 +1,315 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.io; + +import com.google.common.base.Preconditions; +import org.apache.spark.storage.StorageUtils; +import org.apache.spark.util.ThreadUtils; + +import javax.annotation.concurrent.GuardedBy; +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +/** + * {@link InputStream} implementation which asynchronously reads ahead from the underlying input + * stream when specified amount of data has been read from the current buffer. It does it by maintaining + * two buffer - active buffer and read ahead buffer. Active buffer contains data which should be returned + * when a read() call is issued. The read ahead buffer is used to asynchronously read from the underlying + * input stream and once the current active buffer is exhausted, we flip the two buffers so that we can + * start reading from the read ahead buffer without being blocked in disk I/O. + */ +public class ReadAheadInputStream extends InputStream { + + private ReentrantLock stateChangeLock = new ReentrantLock(); + + @GuardedBy("stateChangeLock") + private ByteBuffer activeBuffer; + + @GuardedBy("stateChangeLock") + private ByteBuffer readAheadBuffer; + + @GuardedBy("stateChangeLock") + private boolean endOfStream; + + @GuardedBy("stateChangeLock") + // true if async read is in progress + private boolean readInProgress; + + @GuardedBy("stateChangeLock") + // true if read is aborted due to an exception in reading from underlying input stream. + private boolean readAborted; + + @GuardedBy("stateChangeLock") + private Exception readException; + + // If the remaining data size in the current buffer is below this threshold, + // we issue an async read from the underlying input stream. + private final int readAheadThresholdInBytes; + + private final InputStream underlyingInputStream; + + private final ExecutorService executorService = ThreadUtils.newDaemonSingleThreadExecutor("read-ahead"); + + private final Condition asyncReadComplete = stateChangeLock.newCondition(); + + private static final ThreadLocaloneByte = ThreadLocal.withInitial(() -> new byte[1]); + + /** + * Creates a ReadAheadInputStream with the specified buffer size and read-ahead + * threshold + * + * @param inputStream The underlying input stream. + * @param bufferSizeInBytes The buffer size. + * @param readAheadThresholdInBytes If the active buffer has less data than the read-ahead + * threshold, an async read is triggered. + */ + public ReadAheadInputStream(InputStream inputStream, int bufferSizeInBytes, int readAheadThresholdInBytes) { +Preconditions.checkArgument(bufferSizeInBytes > 0, +"bufferSizeInBytes should be greater than 0"); +Preconditions.checkArgument(readAheadThresholdInBytes > 0 && +readAheadThresholdInBytes < bufferSizeInBytes, +"readAheadThresholdInBytes should be greater than 0 and less than bufferSizeInBytes" ); --- End diff -- ditto --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/18317#discussion_r138200201 --- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java --- @@ -0,0 +1,315 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.io; + +import com.google.common.base.Preconditions; +import org.apache.spark.storage.StorageUtils; +import org.apache.spark.util.ThreadUtils; + +import javax.annotation.concurrent.GuardedBy; +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +/** + * {@link InputStream} implementation which asynchronously reads ahead from the underlying input + * stream when specified amount of data has been read from the current buffer. It does it by maintaining + * two buffer - active buffer and read ahead buffer. Active buffer contains data which should be returned + * when a read() call is issued. The read ahead buffer is used to asynchronously read from the underlying + * input stream and once the current active buffer is exhausted, we flip the two buffers so that we can + * start reading from the read ahead buffer without being blocked in disk I/O. + */ +public class ReadAheadInputStream extends InputStream { + + private ReentrantLock stateChangeLock = new ReentrantLock(); + + @GuardedBy("stateChangeLock") + private ByteBuffer activeBuffer; + + @GuardedBy("stateChangeLock") + private ByteBuffer readAheadBuffer; + + @GuardedBy("stateChangeLock") + private boolean endOfStream; + + @GuardedBy("stateChangeLock") + // true if async read is in progress + private boolean readInProgress; + + @GuardedBy("stateChangeLock") + // true if read is aborted due to an exception in reading from underlying input stream. + private boolean readAborted; + + @GuardedBy("stateChangeLock") + private Exception readException; + + // If the remaining data size in the current buffer is below this threshold, + // we issue an async read from the underlying input stream. + private final int readAheadThresholdInBytes; + + private final InputStream underlyingInputStream; + + private final ExecutorService executorService = ThreadUtils.newDaemonSingleThreadExecutor("read-ahead"); + + private final Condition asyncReadComplete = stateChangeLock.newCondition(); + + private static final ThreadLocaloneByte = ThreadLocal.withInitial(() -> new byte[1]); + + /** + * Creates a ReadAheadInputStream with the specified buffer size and read-ahead + * threshold + * + * @param inputStream The underlying input stream. + * @param bufferSizeInBytes The buffer size. + * @param readAheadThresholdInBytes If the active buffer has less data than the read-ahead + * threshold, an async read is triggered. + */ + public ReadAheadInputStream(InputStream inputStream, int bufferSizeInBytes, int readAheadThresholdInBytes) { +Preconditions.checkArgument(bufferSizeInBytes > 0, +"bufferSizeInBytes should be greater than 0"); +Preconditions.checkArgument(readAheadThresholdInBytes > 0 && +readAheadThresholdInBytes < bufferSizeInBytes, +"readAheadThresholdInBytes should be greater than 0 and less than bufferSizeInBytes" ); +activeBuffer = ByteBuffer.allocate(bufferSizeInBytes); +readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes); +this.readAheadThresholdInBytes = readAheadThresholdInBytes; +this.underlyingInputStream = inputStream; +activeBuffer.flip(); +readAheadBuffer.flip(); + } + + private boolean isEndOfStream() { +if (!activeBuffer.hasRemaining() && !readAheadBuffer.hasRemaining() && endOfStream) { + return true; +} +return false; + } + private void readAsync(final
[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/18317#discussion_r138194341 --- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java --- @@ -0,0 +1,315 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.io; + +import com.google.common.base.Preconditions; +import org.apache.spark.storage.StorageUtils; +import org.apache.spark.util.ThreadUtils; + +import javax.annotation.concurrent.GuardedBy; +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +/** + * {@link InputStream} implementation which asynchronously reads ahead from the underlying input + * stream when specified amount of data has been read from the current buffer. It does it by maintaining + * two buffer - active buffer and read ahead buffer. Active buffer contains data which should be returned + * when a read() call is issued. The read ahead buffer is used to asynchronously read from the underlying + * input stream and once the current active buffer is exhausted, we flip the two buffers so that we can + * start reading from the read ahead buffer without being blocked in disk I/O. + */ +public class ReadAheadInputStream extends InputStream { + + private ReentrantLock stateChangeLock = new ReentrantLock(); + + @GuardedBy("stateChangeLock") + private ByteBuffer activeBuffer; + + @GuardedBy("stateChangeLock") + private ByteBuffer readAheadBuffer; + + @GuardedBy("stateChangeLock") + private boolean endOfStream; + + @GuardedBy("stateChangeLock") + // true if async read is in progress + private boolean readInProgress; + + @GuardedBy("stateChangeLock") + // true if read is aborted due to an exception in reading from underlying input stream. + private boolean readAborted; + + @GuardedBy("stateChangeLock") + private Exception readException; + + // If the remaining data size in the current buffer is below this threshold, + // we issue an async read from the underlying input stream. + private final int readAheadThresholdInBytes; + + private final InputStream underlyingInputStream; + + private final ExecutorService executorService = ThreadUtils.newDaemonSingleThreadExecutor("read-ahead"); + + private final Condition asyncReadComplete = stateChangeLock.newCondition(); + + private static final ThreadLocaloneByte = ThreadLocal.withInitial(() -> new byte[1]); + + /** + * Creates a ReadAheadInputStream with the specified buffer size and read-ahead + * threshold + * + * @param inputStream The underlying input stream. + * @param bufferSizeInBytes The buffer size. + * @param readAheadThresholdInBytes If the active buffer has less data than the read-ahead + * threshold, an async read is triggered. + */ + public ReadAheadInputStream(InputStream inputStream, int bufferSizeInBytes, int readAheadThresholdInBytes) { +Preconditions.checkArgument(bufferSizeInBytes > 0, +"bufferSizeInBytes should be greater than 0"); +Preconditions.checkArgument(readAheadThresholdInBytes > 0 && +readAheadThresholdInBytes < bufferSizeInBytes, +"readAheadThresholdInBytes should be greater than 0 and less than bufferSizeInBytes" ); +activeBuffer = ByteBuffer.allocate(bufferSizeInBytes); +readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes); +this.readAheadThresholdInBytes = readAheadThresholdInBytes; +this.underlyingInputStream = inputStream; +activeBuffer.flip(); +readAheadBuffer.flip(); + } + + private boolean isEndOfStream() { +if (!activeBuffer.hasRemaining() && !readAheadBuffer.hasRemaining() && endOfStream) { + return true; +} +return false; + } + private void readAsync(final
[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/18317#discussion_r138208239 --- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java --- @@ -0,0 +1,315 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.io; + +import com.google.common.base.Preconditions; +import org.apache.spark.storage.StorageUtils; +import org.apache.spark.util.ThreadUtils; + +import javax.annotation.concurrent.GuardedBy; +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +/** + * {@link InputStream} implementation which asynchronously reads ahead from the underlying input + * stream when specified amount of data has been read from the current buffer. It does it by maintaining + * two buffer - active buffer and read ahead buffer. Active buffer contains data which should be returned + * when a read() call is issued. The read ahead buffer is used to asynchronously read from the underlying + * input stream and once the current active buffer is exhausted, we flip the two buffers so that we can + * start reading from the read ahead buffer without being blocked in disk I/O. + */ +public class ReadAheadInputStream extends InputStream { + + private ReentrantLock stateChangeLock = new ReentrantLock(); + + @GuardedBy("stateChangeLock") + private ByteBuffer activeBuffer; + + @GuardedBy("stateChangeLock") + private ByteBuffer readAheadBuffer; + + @GuardedBy("stateChangeLock") + private boolean endOfStream; + + @GuardedBy("stateChangeLock") + // true if async read is in progress + private boolean readInProgress; + + @GuardedBy("stateChangeLock") + // true if read is aborted due to an exception in reading from underlying input stream. + private boolean readAborted; + + @GuardedBy("stateChangeLock") + private Exception readException; + + // If the remaining data size in the current buffer is below this threshold, + // we issue an async read from the underlying input stream. + private final int readAheadThresholdInBytes; + + private final InputStream underlyingInputStream; + + private final ExecutorService executorService = ThreadUtils.newDaemonSingleThreadExecutor("read-ahead"); + + private final Condition asyncReadComplete = stateChangeLock.newCondition(); + + private static final ThreadLocaloneByte = ThreadLocal.withInitial(() -> new byte[1]); + + /** + * Creates a ReadAheadInputStream with the specified buffer size and read-ahead + * threshold + * + * @param inputStream The underlying input stream. + * @param bufferSizeInBytes The buffer size. + * @param readAheadThresholdInBytes If the active buffer has less data than the read-ahead + * threshold, an async read is triggered. + */ + public ReadAheadInputStream(InputStream inputStream, int bufferSizeInBytes, int readAheadThresholdInBytes) { +Preconditions.checkArgument(bufferSizeInBytes > 0, +"bufferSizeInBytes should be greater than 0"); +Preconditions.checkArgument(readAheadThresholdInBytes > 0 && +readAheadThresholdInBytes < bufferSizeInBytes, +"readAheadThresholdInBytes should be greater than 0 and less than bufferSizeInBytes" ); +activeBuffer = ByteBuffer.allocate(bufferSizeInBytes); +readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes); +this.readAheadThresholdInBytes = readAheadThresholdInBytes; +this.underlyingInputStream = inputStream; +activeBuffer.flip(); +readAheadBuffer.flip(); + } + + private boolean isEndOfStream() { +if (!activeBuffer.hasRemaining() && !readAheadBuffer.hasRemaining() && endOfStream) { + return true; +} +return false; + } + private void readAsync(final
[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/18317#discussion_r138198579 --- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java --- @@ -0,0 +1,315 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.io; + +import com.google.common.base.Preconditions; +import org.apache.spark.storage.StorageUtils; +import org.apache.spark.util.ThreadUtils; + +import javax.annotation.concurrent.GuardedBy; +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +/** + * {@link InputStream} implementation which asynchronously reads ahead from the underlying input + * stream when specified amount of data has been read from the current buffer. It does it by maintaining + * two buffer - active buffer and read ahead buffer. Active buffer contains data which should be returned + * when a read() call is issued. The read ahead buffer is used to asynchronously read from the underlying + * input stream and once the current active buffer is exhausted, we flip the two buffers so that we can + * start reading from the read ahead buffer without being blocked in disk I/O. + */ +public class ReadAheadInputStream extends InputStream { + + private ReentrantLock stateChangeLock = new ReentrantLock(); + + @GuardedBy("stateChangeLock") + private ByteBuffer activeBuffer; + + @GuardedBy("stateChangeLock") + private ByteBuffer readAheadBuffer; + + @GuardedBy("stateChangeLock") + private boolean endOfStream; + + @GuardedBy("stateChangeLock") + // true if async read is in progress + private boolean readInProgress; + + @GuardedBy("stateChangeLock") + // true if read is aborted due to an exception in reading from underlying input stream. + private boolean readAborted; + + @GuardedBy("stateChangeLock") + private Exception readException; + + // If the remaining data size in the current buffer is below this threshold, + // we issue an async read from the underlying input stream. + private final int readAheadThresholdInBytes; + + private final InputStream underlyingInputStream; + + private final ExecutorService executorService = ThreadUtils.newDaemonSingleThreadExecutor("read-ahead"); + + private final Condition asyncReadComplete = stateChangeLock.newCondition(); + + private static final ThreadLocaloneByte = ThreadLocal.withInitial(() -> new byte[1]); + + /** + * Creates a ReadAheadInputStream with the specified buffer size and read-ahead + * threshold + * + * @param inputStream The underlying input stream. + * @param bufferSizeInBytes The buffer size. + * @param readAheadThresholdInBytes If the active buffer has less data than the read-ahead + * threshold, an async read is triggered. + */ + public ReadAheadInputStream(InputStream inputStream, int bufferSizeInBytes, int readAheadThresholdInBytes) { +Preconditions.checkArgument(bufferSizeInBytes > 0, +"bufferSizeInBytes should be greater than 0"); +Preconditions.checkArgument(readAheadThresholdInBytes > 0 && +readAheadThresholdInBytes < bufferSizeInBytes, +"readAheadThresholdInBytes should be greater than 0 and less than bufferSizeInBytes" ); +activeBuffer = ByteBuffer.allocate(bufferSizeInBytes); +readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes); +this.readAheadThresholdInBytes = readAheadThresholdInBytes; +this.underlyingInputStream = inputStream; +activeBuffer.flip(); +readAheadBuffer.flip(); + } + + private boolean isEndOfStream() { +if (!activeBuffer.hasRemaining() && !readAheadBuffer.hasRemaining() && endOfStream) { + return true; +} +return false; --- End diff -- why not just `return
[GitHub] spark pull request #19136: [SPARK-15689][SQL] data source v2
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19136#discussion_r138224219 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala --- @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.v2 + +import org.apache.spark.sql.Strategy +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.planning.PhysicalOperation +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SparkPlan} +import org.apache.spark.sql.execution.datasources.DataSourceStrategy +import org.apache.spark.sql.sources.Filter +import org.apache.spark.sql.sources.v2.reader.downward.{CatalystFilterPushDownSupport, ColumnPruningSupport, FilterPushDownSupport} + +object DataSourceV2Strategy extends Strategy { + // TODO: write path + override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { +case PhysicalOperation(projects, filters, DataSourceV2Relation(output, reader)) => + val attrMap = AttributeMap(output.zip(output)) + + val projectSet = AttributeSet(projects.flatMap(_.references)) + val filterSet = AttributeSet(filters.flatMap(_.references)) + + // Match original case of attributes. + // TODO: nested fields pruning + val requiredColumns = (projectSet ++ filterSet).toSeq.map(attrMap) + reader match { +case r: ColumnPruningSupport => + r.pruneColumns(requiredColumns.toStructType) +case _ => + } + + val stayUpFilters: Seq[Expression] = reader match { +case r: CatalystFilterPushDownSupport => + r.pushCatalystFilters(filters.toArray) + +case r: FilterPushDownSupport => --- End diff -- yea we can't prevent users to implement them both, and we will pick `CatalystFilterPushDownSupport` over `FilterPushDownSupport`. Let me document it. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19136: [SPARK-15689][SQL] data source v2
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19136#discussion_r138224082 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala --- @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.v2 + +import org.apache.spark.sql.catalyst.expressions.AttributeReference +import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics} +import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader +import org.apache.spark.sql.sources.v2.reader.upward.StatisticsSupport + +case class DataSourceV2Relation( +output: Seq[AttributeReference], +reader: DataSourceV2Reader) extends LeafNode { + + override def computeStats(): Statistics = reader match { +case r: StatisticsSupport => Statistics(sizeInBytes = r.getStatistics.sizeInBytes()) +case _ => Statistics(sizeInBytes = conf.defaultSizeInBytes) + } +} + +object DataSourceV2Relation { + def apply(reader: DataSourceV2Reader): DataSourceV2Relation = { +new DataSourceV2Relation(reader.readSchema().toAttributes, reader) --- End diff -- I think users can write a special `ReadTask` to do it, but we can't save memory by doing this. When an operator(the scan operator) transfers data to another operator, the data must be `UnsafeRow`s. So even users return a joined row in `ReadTask`, Spark need to convert it to `UnsafeRow`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19185: [Spark-21854] Added LogisticRegressionTrainingSum...
Github user sethah commented on a diff in the pull request: https://github.com/apache/spark/pull/19185#discussion_r138220213 --- Diff: python/pyspark/ml/tests.py --- @@ -1473,11 +1473,59 @@ def test_logistic_regression_summary(self): self.assertTrue(isinstance(s.fMeasureByThreshold, DataFrame)) self.assertTrue(isinstance(s.precisionByThreshold, DataFrame)) self.assertTrue(isinstance(s.recallByThreshold, DataFrame)) + +self.assertAlmostEqual(s.accuracy, 1.0, 2) --- End diff -- also nit, but should probably add tests for all the new attributes, like `falsePositiveRateByLabel` as below. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19185: [Spark-21854] Added LogisticRegressionTrainingSum...
Github user sethah commented on a diff in the pull request: https://github.com/apache/spark/pull/19185#discussion_r138219915 --- Diff: python/pyspark/ml/tests.py --- @@ -1473,11 +1473,59 @@ def test_logistic_regression_summary(self): self.assertTrue(isinstance(s.fMeasureByThreshold, DataFrame)) self.assertTrue(isinstance(s.precisionByThreshold, DataFrame)) self.assertTrue(isinstance(s.recallByThreshold, DataFrame)) + +self.assertAlmostEqual(s.accuracy, 1.0, 2) +self.assertAlmostEqual(s.weightedTruePositiveRate, 1.0, 2) +self.assertAlmostEqual(s.weightedFalsePositiveRate, 0.0, 2) +self.assertAlmostEqual(s.weightedRecall, 1.0, 2) +self.assertAlmostEqual(s.weightedPrecision, 1.0, 2) +self.assertAlmostEqual(s.weightedFMeasure(), 1.0, 2) # test evaluation (with training dataset) produces a summary with same values # one check is enough to verify a summary is returned, Scala version runs full test sameSummary = model.evaluate(df) self.assertAlmostEqual(sameSummary.areaUnderROC, s.areaUnderROC) +def test_multiclass_logistic_regression_summary(self): +df = self.spark.createDataFrame([(1.0, 2.0, Vectors.dense(1.0)), + (0.0, 2.0, Vectors.sparse(1, [], [])), + (2.0, 2.0, Vectors.dense(2.0)), + (2.0, 2.0, Vectors.dense(1.9))], +["label", "weight", "features"]) +lr = LogisticRegression(maxIter=5, regParam=0.01, weightCol="weight", fitIntercept=False) +model = lr.fit(df) +self.assertTrue(model.hasSummary) +s = model.summary +# test that api is callable and returns expected types +self.assertTrue(isinstance(s.predictions, DataFrame)) +self.assertEqual(s.probabilityCol, "probability") +self.assertEqual(s.labelCol, "label") +self.assertEqual(s.featuresCol, "features") +self.assertEqual(s.predictionCol, "prediction") +objHist = s.objectiveHistory +self.assertTrue(isinstance(objHist, list) and isinstance(objHist[0], float)) +self.assertGreater(s.totalIterations, 0) +self.assertTrue(isinstance(s.labels, list)) +self.assertTrue(isinstance(s.truePositiveRateByLabel, list)) +self.assertTrue(isinstance(s.falsePositiveRateByLabel, list)) +self.assertTrue(isinstance(s.precisionByLabel, list)) +self.assertTrue(isinstance(s.recallByLabel, list)) +self.assertTrue(isinstance(s.fMeasureByLabel(), list)) +self.assertAlmostEqual(s.accuracy, 0.75, 2) +self.assertAlmostEqual(s.weightedTruePositiveRate, 0.75, 2) +self.assertAlmostEqual(s.weightedFalsePositiveRate, 0.25, 2) +self.assertAlmostEqual(s.weightedRecall, 0.75, 2) +self.assertAlmostEqual(s.weightedPrecision, 0.583, 2) +self.assertAlmostEqual(s.weightedFMeasure(), 0.65, 2) --- End diff -- maybe add `beta=1.0` to the methods that take beta as a parameter. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19185: [Spark-21854] Added LogisticRegressionTrainingSum...
Github user sethah commented on a diff in the pull request: https://github.com/apache/spark/pull/19185#discussion_r138220005 --- Diff: python/pyspark/ml/tests.py --- @@ -1473,11 +1473,59 @@ def test_logistic_regression_summary(self): self.assertTrue(isinstance(s.fMeasureByThreshold, DataFrame)) self.assertTrue(isinstance(s.precisionByThreshold, DataFrame)) self.assertTrue(isinstance(s.recallByThreshold, DataFrame)) + +self.assertAlmostEqual(s.accuracy, 1.0, 2) --- End diff -- care to add these to the scala unit test for binary summary as well? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19185: [Spark-21854] Added LogisticRegressionTrainingSum...
Github user sethah commented on a diff in the pull request: https://github.com/apache/spark/pull/19185#discussion_r138220297 --- Diff: python/pyspark/ml/classification.py --- @@ -528,9 +528,11 @@ def summary(self): trained on the training set. An exception is thrown if `trainingSummary is None`. """ if self.hasSummary: -java_blrt_summary = self._call_java("summary") -# Note: Once multiclass is added, update this to return correct summary -return BinaryLogisticRegressionTrainingSummary(java_blrt_summary) +java_lrt_summary = self._call_java("summary") +if (self.numClasses <= 2): --- End diff -- nit: remove parentheses --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19188: [SPARK-21973][SQL] Add an new option to filter qu...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/19188#discussion_r138221684 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala --- @@ -113,12 +114,39 @@ object TPCDSQueryBenchmark { "q81", "q82", "q83", "q84", "q85", "q86", "q87", "q88", "q89", "q90", "q91", "q92", "q93", "q94", "q95", "q96", "q97", "q98", "q99") +val sparkConf = new SparkConf() --- End diff -- ok --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18592: [SPARK-21368][SQL] TPCDSQueryBenchmark can't refe...
Github user sarutak commented on a diff in the pull request: https://github.com/apache/spark/pull/18592#discussion_r138220942 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala --- @@ -99,6 +95,20 @@ object TPCDSQueryBenchmark { } def main(args: Array[String]): Unit = { +if (args.length < 1) { --- End diff -- Good idea. I'll add `TPCDSQueryBenchmarkArguments`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19141: [SPARK-21384] [YARN] Spark + YARN fails with Loca...
Github user devaraj-kavali commented on a diff in the pull request: https://github.com/apache/spark/pull/19141#discussion_r138219530 --- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala --- @@ -565,7 +565,6 @@ private[spark] class Client( distribute(jarsArchive.toURI.getPath, resType = LocalResourceType.ARCHIVE, destName = Some(LOCALIZED_LIB_DIR)) - jarsArchive.delete() --- End diff -- Thanks @vanzin for the pointer. It was my mistake, I missed the change reason while looking at the history of the file. I still see that SPARK-20741 has fixed the issue partially, it leaves \_\_spark_conf\_\_*.zip file to delete as part of shutdownhook. I see these approaches to fix it further, 1. Delete \_\_spark_conf\_\_*.zip and \_\_spark_libs\_\_*.zip files after completing the application similar to cleanupStagingDir. (Or) 2. Add a configuration whether to delete \_\_spark_conf\_\_*.zip and \_\_spark_libs\_\_*.zip files after copying to dest dir, so that users can decide whether to delete these immediately or as part of process exit. In case of SPARK-20741, this new configuration can be enabled to delete these files immediately. @vanzin & @jerryshao Please let me know your thoughts on this or if you have any other way to do this. Thanks --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18945: Add option to convert nullable int columns to float colu...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/18945 Hey @logannc, have you had some time to work on this? I want to fix this issue asap. Ortherwise, would anyone here be interested in submitimg another PR for the another approach? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19188: [SPARK-21973][SQL] Add an new option to filter qu...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19188#discussion_r138217212 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala --- @@ -113,12 +114,39 @@ object TPCDSQueryBenchmark { "q81", "q82", "q83", "q84", "q85", "q86", "q87", "q88", "q89", "q90", "q91", "q92", "q93", "q94", "q95", "q96", "q97", "q98", "q99") +val sparkConf = new SparkConf() --- End diff -- Could we add an argument, instead of using the SQLConf? See https://github.com/apache/spark/pull/18592#discussion_r138217049 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19106: [SPARK-21770][ML] ProbabilisticClassificationModel fix c...
Github user sethah commented on the issue: https://github.com/apache/spark/pull/19106 I'm confused how this issue was discovered in the first place. Did someone actually train an RF/DT and receive all zero probabilities? If so, shouldn't there be a unit test that recreates that scenario? Anyway, AFAICT the only way this could happen is if the RandomForest was trained on an empty DataFrame which couldn't happen because it would fail before the train method was called anyway. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18592: [SPARK-21368][SQL] TPCDSQueryBenchmark can't refe...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/18592#discussion_r138217049 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala --- @@ -99,6 +95,20 @@ object TPCDSQueryBenchmark { } def main(args: Array[String]): Unit = { +if (args.length < 1) { --- End diff -- @sarutak @maropu Could we do something like https://github.com/apache/spark/blob/master/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala? We also can use add another option for outputing the plans of TPC-DS queries, instead of running all the queries. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19147: [WIP][SPARK-21190][SQL][PYTHON] Vectorized UDFs i...
Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/19147#discussion_r138215300 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/VectorizedPythonRunner.scala --- @@ -0,0 +1,329 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.python + +import java.io.{BufferedInputStream, BufferedOutputStream, DataInputStream, DataOutputStream} +import java.net.Socket +import java.nio.charset.StandardCharsets + +import scala.collection.JavaConverters._ + +import org.apache.arrow.vector.VectorSchemaRoot +import org.apache.arrow.vector.stream.{ArrowStreamReader, ArrowStreamWriter} + +import org.apache.spark.{SparkEnv, SparkFiles, TaskContext} +import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType, PythonException, PythonRDD, SpecialLengths} +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.arrow.{ArrowUtils, ArrowWriter} +import org.apache.spark.sql.execution.vectorized.{ArrowColumnVector, ColumnarBatch, ColumnVector} +import org.apache.spark.sql.types._ +import org.apache.spark.util.Utils + +/** + * Similar to `PythonRunner`, but exchange data with Python worker via columnar format. + */ +class VectorizedPythonRunner( +funcs: Seq[ChainedPythonFunctions], +batchSize: Int, +bufferSize: Int, +reuse_worker: Boolean, +argOffsets: Array[Array[Int]]) extends Logging { + + require(funcs.length == argOffsets.length, "argOffsets should have the same length as funcs") + + // All the Python functions should have the same exec, version and envvars. + private val envVars = funcs.head.funcs.head.envVars + private val pythonExec = funcs.head.funcs.head.pythonExec + private val pythonVer = funcs.head.funcs.head.pythonVer + + // TODO: support accumulator in multiple UDF + private val accumulator = funcs.head.funcs.head.accumulator + + // todo: return column batch? + def compute( --- End diff -- I was referring to the protocol between Scala and Python that is changed here and could act differently under some circumstances. Here is the behavior of the `PythonRunner` protocol and `VectorizedPythonRunner` protocol that you introduce here: **PythonRunner** Data blocks are framed by a special length integer. Scala reads each data block one at a time and checks the length code. If the code is a `PythonException`, the error is read from Python and a `SparkException` is thrown with that being the cause. **VectorizedPythonRunner** A data stream is opened in Scala with `ArrowStreamReader` and batches are transferred until `ArrowStreamReader` returns False indicating there is no more data. Only at this point are the special length codes checked to handle an error from Python. This behavior change would probably only cause problems if things are not working normally. For example, what would happen if `pyarrow` was not installed on an executor? With `PythonRunner` the ImportError would cause a `PythonException` to be transferred and thrown in Scala. In `VectorizedPythonRunner` I believe the `ArrowStreamReader` would try to read the special length code and then fail somewhere internally to Arrow, not showing the ImportError. My point was that this type of behavior change should probably be implemented in a separate JIRA where we could make sure to handle all of these cases. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19107: [SPARK-21799][ML] Fix `KMeans` performance regression ca...
Github user jkbradley commented on the issue: https://github.com/apache/spark/pull/19107 @WeichenXu123 I just commented on https://issues.apache.org/jira/browse/SPARK-18608 to clarify our efforts here. Can you please either retarget this for SPARK-18608 and update it, or ask @zhengruifeng to submit his original PR as the fix? Please coordinate, thanks! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18887: [SPARK-20642][core] Store FsHistoryProvider listi...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/18887#discussion_r138191841 --- Diff: core/src/main/scala/org/apache/spark/deploy/history/config.scala --- @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.history + +import java.util.concurrent.TimeUnit + +import scala.annotation.meta.getter + +import org.apache.spark.internal.config.ConfigBuilder +import org.apache.spark.util.kvstore.KVIndex + +private[spark] object config { + + /** Use this to annotate constructor params to be used as KVStore indices. */ + type KVIndexParam = KVIndex @getter + + val DEFAULT_LOG_DIR = "file:/tmp/spark-events" + + val EVENT_LOG_DIR = ConfigBuilder("spark.history.fs.logDirectory") +.stringConf +.createWithDefault(DEFAULT_LOG_DIR) + + val MAX_LOG_AGE_S = ConfigBuilder("spark.history.fs.cleaner.maxAge") +.timeConf(TimeUnit.SECONDS) +.createWithDefaultString("7d") + + val LOCAL_STORE_DIR = ConfigBuilder("spark.history.store.path") --- End diff -- It'd better to document the default one is an in-memory store. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18887: [SPARK-20642][core] Store FsHistoryProvider listi...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/18887#discussion_r137942598 --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala --- @@ -229,10 +254,22 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) } } - override def getListing(): Iterator[FsApplicationHistoryInfo] = applications.values.iterator + override def getListing(): Iterator[ApplicationHistoryInfo] = { --- End diff -- The returned order is `descending`, right? This is not straightforward from the codes. Please add a comment --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18887: [SPARK-20642][core] Store FsHistoryProvider listi...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/18887#discussion_r137942907 --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala --- @@ -316,25 +353,21 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) try { val newLastScanTime = getNewLastScanTime() logDebug(s"Scanning $logDir with lastScanTime==$lastScanTime") - val statusList = Option(fs.listStatus(new Path(logDir))).map(_.toSeq) -.getOrElse(Seq.empty[FileStatus]) + val statusList = Option(fs.listStatus(new Path(logDir))).map(_.toSeq).getOrElse(Nil) // scan for modified applications, replay and merge them - val logInfos: Seq[FileStatus] = statusList + val logInfos = statusList .filter { entry => - val fileInfo = fileToAppInfo.get(entry.getPath()) - val prevFileSize = if (fileInfo != null) fileInfo.fileSize else 0L !entry.isDirectory() && // FsHistoryProvider generates a hidden file which can't be read. Accidentally // reading a garbage file is safe, but we would log an error which can be scary to // the end-user. !entry.getPath().getName().startsWith(".") && -prevFileSize < entry.getLen() && -SparkHadoopUtil.get.checkAccessPermission(entry, FsAction.READ) +SparkHadoopUtil.get.checkAccessPermission(entry, FsAction.READ) && +recordedFileSize(entry.getPath()) < entry.getLen() --- End diff -- Can we add a comment to explain what `recordedFileSize(entry.getPath())` returns? In the original code, the variable name is self descriptive. The new change does not have it any more. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18887: [SPARK-20642][core] Store FsHistoryProvider listi...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/18887#discussion_r137940658 --- Diff: core/src/main/scala/org/apache/spark/deploy/history/config.scala --- @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.history + +import java.util.concurrent.TimeUnit + +import scala.annotation.meta.getter + +import org.apache.spark.internal.config.ConfigBuilder +import org.apache.spark.util.kvstore.KVIndex + +private[spark] object config { + + /** Use this to annotate constructor params to be used as KVStore indices. */ + type KVIndexParam = KVIndex @getter + + val DEFAULT_LOG_DIR = "file:/tmp/spark-events" + + val EVENT_LOG_DIR = ConfigBuilder("spark.history.fs.logDirectory") +.stringConf +.createWithDefault(DEFAULT_LOG_DIR) + + val MAX_LOG_AGE_S = ConfigBuilder("spark.history.fs.cleaner.maxAge") +.timeConf(TimeUnit.SECONDS) +.createWithDefaultString("7d") + + val LOCAL_STORE_DIR = ConfigBuilder("spark.history.store.path") --- End diff -- Just want to confirm it. Except this, no change on the other parameters. Right? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org