[GitHub] spark issue #18538: [SPARK-14516][ML] Adding ClusteringEvaluator with the im...

2017-09-11 Thread yanboliang
Github user yanboliang commented on the issue:

https://github.com/apache/spark/pull/18538
  
@mgaido91 These are my last comments; it should be ready to merge once they are addressed. Thanks for your contribution.


---




[GitHub] spark pull request #18538: [SPARK-14516][ML] Adding ClusteringEvaluator with...

2017-09-11 Thread yanboliang
Github user yanboliang commented on a diff in the pull request:

https://github.com/apache/spark/pull/18538#discussion_r138255937
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/evaluation/ClusteringEvaluator.scala 
---
@@ -0,0 +1,438 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.evaluation
+
+import org.apache.spark.SparkContext
+import org.apache.spark.annotation.{Experimental, Since}
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.ml.linalg.{BLAS, DenseVector, Vector, Vectors, 
VectorUDT}
+import org.apache.spark.ml.param.{Param, ParamMap, ParamValidators}
+import org.apache.spark.ml.param.shared.{HasFeaturesCol, HasPredictionCol}
+import org.apache.spark.ml.util.{DefaultParamsReadable, 
DefaultParamsWritable, Identifiable, SchemaUtils}
+import org.apache.spark.sql.{DataFrame, Dataset}
+import org.apache.spark.sql.functions.{avg, col, udf}
+import org.apache.spark.sql.types.IntegerType
+
+/**
+ * :: Experimental ::
+ * Evaluator for clustering results.
+ * The metric computes the Silhouette measure
+ * using the squared Euclidean distance.
+ *
+ * The Silhouette is a measure for the validation
+ * of the consistency within clusters. It ranges
+ * between 1 and -1, where a value close to 1
+ * means that the points in a cluster are close
+ * to the other points in the same cluster and
+ * far from the points of the other clusters.
+ */
+@Experimental
+@Since("2.3.0")
+class ClusteringEvaluator @Since("2.3.0") (@Since("2.3.0") override val 
uid: String)
+  extends Evaluator with HasPredictionCol with HasFeaturesCol with 
DefaultParamsWritable {
+
+  @Since("2.3.0")
+  def this() = this(Identifiable.randomUID("cluEval"))
+
+  @Since("2.3.0")
+  override def copy(pMap: ParamMap): ClusteringEvaluator = 
this.defaultCopy(pMap)
+
+  @Since("2.3.0")
+  override def isLargerBetter: Boolean = true
+
+  /** @group setParam */
+  @Since("2.3.0")
+  def setPredictionCol(value: String): this.type = set(predictionCol, 
value)
+
+  /** @group setParam */
+  @Since("2.3.0")
+  def setFeaturesCol(value: String): this.type = set(featuresCol, value)
+
+  /**
+   * param for metric name in evaluation
+   * (supports `"silhouette"` (default))
+   * @group param
+   */
+  @Since("2.3.0")
+  val metricName: Param[String] = {
+val allowedParams = ParamValidators.inArray(Array("silhouette"))
+new Param(
+  this,
+  "metricName",
+  "metric name in evaluation (silhouette)",
+  allowedParams
+)
+  }
+
+  /** @group getParam */
+  @Since("2.3.0")
+  def getMetricName: String = $(metricName)
+
+  /** @group setParam */
+  @Since("2.3.0")
+  def setMetricName(value: String): this.type = set(metricName, value)
+
+  setDefault(metricName -> "silhouette")
+
+  @Since("2.3.0")
+  override def evaluate(dataset: Dataset[_]): Double = {
+SchemaUtils.checkColumnType(dataset.schema, $(featuresCol), new 
VectorUDT)
+SchemaUtils.checkColumnType(dataset.schema, $(predictionCol), 
IntegerType)
--- End diff --

We should support all numeric types for the prediction column, not only integer, e.g.:
```
SchemaUtils.checkNumericType(dataset.schema, $(predictionCol))
```
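A minimal sketch of how `evaluate` could then read the column regardless of its concrete numeric type (the cast is an assumption for illustration, not part of this diff):
```
// Cast the prediction column to double so any numeric input type is accepted
// (assumes org.apache.spark.sql.types.DoubleType is imported).
val predictions = dataset.select(col($(predictionCol)).cast(DoubleType), col($(featuresCol)))
```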


---




[GitHub] spark pull request #18538: [SPARK-14516][ML] Adding ClusteringEvaluator with...

2017-09-11 Thread yanboliang
Github user yanboliang commented on a diff in the pull request:

https://github.com/apache/spark/pull/18538#discussion_r138256035
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/evaluation/ClusteringEvaluator.scala 
---
@@ -0,0 +1,438 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.evaluation
+
+import org.apache.spark.SparkContext
+import org.apache.spark.annotation.{Experimental, Since}
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.ml.linalg.{BLAS, DenseVector, Vector, Vectors, 
VectorUDT}
+import org.apache.spark.ml.param.{Param, ParamMap, ParamValidators}
+import org.apache.spark.ml.param.shared.{HasFeaturesCol, HasPredictionCol}
+import org.apache.spark.ml.util.{DefaultParamsReadable, 
DefaultParamsWritable, Identifiable, SchemaUtils}
+import org.apache.spark.sql.{DataFrame, Dataset}
+import org.apache.spark.sql.functions.{avg, col, udf}
+import org.apache.spark.sql.types.IntegerType
+
+/**
+ * :: Experimental ::
+ * Evaluator for clustering results.
+ * The metric computes the Silhouette measure
+ * using the squared Euclidean distance.
+ *
+ * The Silhouette is a measure for the validation
+ * of the consistency within clusters. It ranges
+ * between 1 and -1, where a value close to 1
+ * means that the points in a cluster are close
+ * to the other points in the same cluster and
+ * far from the points of the other clusters.
+ */
+@Experimental
+@Since("2.3.0")
+class ClusteringEvaluator @Since("2.3.0") (@Since("2.3.0") override val 
uid: String)
+  extends Evaluator with HasPredictionCol with HasFeaturesCol with 
DefaultParamsWritable {
+
+  @Since("2.3.0")
+  def this() = this(Identifiable.randomUID("cluEval"))
+
+  @Since("2.3.0")
+  override def copy(pMap: ParamMap): ClusteringEvaluator = 
this.defaultCopy(pMap)
+
+  @Since("2.3.0")
+  override def isLargerBetter: Boolean = true
+
+  /** @group setParam */
+  @Since("2.3.0")
+  def setPredictionCol(value: String): this.type = set(predictionCol, 
value)
+
+  /** @group setParam */
+  @Since("2.3.0")
+  def setFeaturesCol(value: String): this.type = set(featuresCol, value)
+
+  /**
+   * param for metric name in evaluation
+   * (supports `"silhouette"` (default))
+   * @group param
+   */
+  @Since("2.3.0")
+  val metricName: Param[String] = {
+val allowedParams = ParamValidators.inArray(Array("silhouette"))
+new Param(
+  this,
+  "metricName",
+  "metric name in evaluation (silhouette)",
+  allowedParams
+)
--- End diff --

Reorg as:
```
val metricName: Param[String] = {
  val allowedParams = ParamValidators.inArray(Array("squaredSilhouette"))
  new Param(this, "metricName", "metric name in evaluation (squaredSilhouette)", allowedParams)
}
```


---




[GitHub] spark pull request #18538: [SPARK-14516][ML] Adding ClusteringEvaluator with...

2017-09-11 Thread yanboliang
Github user yanboliang commented on a diff in the pull request:

https://github.com/apache/spark/pull/18538#discussion_r138255648
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/evaluation/ClusteringEvaluator.scala 
---
@@ -0,0 +1,438 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.evaluation
+
+import org.apache.spark.SparkContext
+import org.apache.spark.annotation.{Experimental, Since}
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.ml.linalg.{BLAS, DenseVector, Vector, Vectors, 
VectorUDT}
+import org.apache.spark.ml.param.{Param, ParamMap, ParamValidators}
+import org.apache.spark.ml.param.shared.{HasFeaturesCol, HasPredictionCol}
+import org.apache.spark.ml.util.{DefaultParamsReadable, 
DefaultParamsWritable, Identifiable, SchemaUtils}
+import org.apache.spark.sql.{DataFrame, Dataset}
+import org.apache.spark.sql.functions.{avg, col, udf}
+import org.apache.spark.sql.types.IntegerType
+
+/**
+ * :: Experimental ::
--- End diff --

Usually we leave a blank line under ```:: Experimental ::```.
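For example, applied to this class's Scaladoc header:
```
/**
 * :: Experimental ::
 *
 * Evaluator for clustering results.
 */
```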


---




[GitHub] spark pull request #18538: [SPARK-14516][ML] Adding ClusteringEvaluator with...

2017-09-11 Thread yanboliang
Github user yanboliang commented on a diff in the pull request:

https://github.com/apache/spark/pull/18538#discussion_r138255474
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/evaluation/ClusteringEvaluator.scala 
---
@@ -0,0 +1,438 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.evaluation
+
+import org.apache.spark.SparkContext
+import org.apache.spark.annotation.{Experimental, Since}
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.ml.linalg.{BLAS, DenseVector, Vector, Vectors, 
VectorUDT}
+import org.apache.spark.ml.param.{Param, ParamMap, ParamValidators}
+import org.apache.spark.ml.param.shared.{HasFeaturesCol, HasPredictionCol}
+import org.apache.spark.ml.util.{DefaultParamsReadable, 
DefaultParamsWritable, Identifiable, SchemaUtils}
+import org.apache.spark.sql.{DataFrame, Dataset}
+import org.apache.spark.sql.functions.{avg, col, udf}
+import org.apache.spark.sql.types.IntegerType
+
+/**
+ * :: Experimental ::
+ * Evaluator for clustering results.
+ * The metric computes the Silhouette measure
+ * using the squared Euclidean distance.
+ *
+ * The Silhouette is a measure for the validation
+ * of the consistency within clusters. It ranges
+ * between 1 and -1, where a value close to 1
+ * means that the points in a cluster are close
+ * to the other points in the same cluster and
+ * far from the points of the other clusters.
+ */
+@Experimental
+@Since("2.3.0")
+class ClusteringEvaluator @Since("2.3.0") (@Since("2.3.0") override val 
uid: String)
+  extends Evaluator with HasPredictionCol with HasFeaturesCol with 
DefaultParamsWritable {
+
+  @Since("2.3.0")
+  def this() = this(Identifiable.randomUID("cluEval"))
+
+  @Since("2.3.0")
+  override def copy(pMap: ParamMap): ClusteringEvaluator = 
this.defaultCopy(pMap)
+
+  @Since("2.3.0")
+  override def isLargerBetter: Boolean = true
+
+  /** @group setParam */
+  @Since("2.3.0")
+  def setPredictionCol(value: String): this.type = set(predictionCol, 
value)
+
+  /** @group setParam */
+  @Since("2.3.0")
+  def setFeaturesCol(value: String): this.type = set(featuresCol, value)
+
+  /**
+   * param for metric name in evaluation
+   * (supports `"silhouette"` (default))
+   * @group param
+   */
+  @Since("2.3.0")
+  val metricName: Param[String] = {
+val allowedParams = ParamValidators.inArray(Array("silhouette"))
+new Param(
+  this,
+  "metricName",
+  "metric name in evaluation (silhouette)",
+  allowedParams
+)
+  }
+
+  /** @group getParam */
+  @Since("2.3.0")
+  def getMetricName: String = $(metricName)
+
+  /** @group setParam */
+  @Since("2.3.0")
+  def setMetricName(value: String): this.type = set(metricName, value)
+
+  setDefault(metricName -> "silhouette")
+
+  @Since("2.3.0")
+  override def evaluate(dataset: Dataset[_]): Double = {
+SchemaUtils.checkColumnType(dataset.schema, $(featuresCol), new 
VectorUDT)
+SchemaUtils.checkColumnType(dataset.schema, $(predictionCol), 
IntegerType)
+
+$(metricName) match {
+  case "silhouette" => 
SquaredEuclideanSilhouette.computeSilhouetteScore(
+dataset,
+$(predictionCol),
+$(featuresCol)
+  )
--- End diff --

Reorg as:
```
$(metricName) match {
  case "squaredSilhouette" =>
    SquaredEuclideanSilhouette.computeSilhouetteScore(
      dataset, $(predictionCol), $(featuresCol))
}
```


---




[GitHub] spark issue #19110: [SPARK-21027][ML][PYTHON] Added tunable parallelism to o...

2017-09-11 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19110
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #19110: [SPARK-21027][ML][PYTHON] Added tunable parallelism to o...

2017-09-11 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19110
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/81653/
Test PASSed.


---




[GitHub] spark issue #19110: [SPARK-21027][ML][PYTHON] Added tunable parallelism to o...

2017-09-11 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19110
  
**[Test build #81653 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81653/testReport)**
 for PR 19110 at commit 
[`c24d4e2`](https://github.com/apache/spark/commit/c24d4e261550fea22e4ee95909f4b078186bc83c).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #19196: [SPARK-21977] SinglePartition optimizations break certai...

2017-09-11 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19196
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/81656/
Test FAILed.


---




[GitHub] spark issue #19196: [SPARK-21977] SinglePartition optimizations break certai...

2017-09-11 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19196
  
Merged build finished. Test FAILed.


---




[GitHub] spark issue #19196: [SPARK-21977] SinglePartition optimizations break certai...

2017-09-11 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19196
  
**[Test build #81656 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81656/testReport)**
 for PR 19196 at commit 
[`2f94951`](https://github.com/apache/spark/commit/2f949517ea1d667aee8ca6838a374e222492c0c7).
 * This patch **fails to generate documentation**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #19196: [SPARK-21977] SinglePartition optimizations break certai...

2017-09-11 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19196
  
**[Test build #81656 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81656/testReport)**
 for PR 19196 at commit 
[`2f94951`](https://github.com/apache/spark/commit/2f949517ea1d667aee8ca6838a374e222492c0c7).


---




[GitHub] spark issue #19197: [SPARK-18608][ML] Fix double caching

2017-09-11 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19197
  
**[Test build #81654 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81654/testReport)**
 for PR 19197 at commit 
[`b485614`](https://github.com/apache/spark/commit/b4856147e04c3d57f2bfc70c70e3f136f46fa873).


---




[GitHub] spark issue #16422: [SPARK-17642] [SQL] support DESC EXTENDED/FORMATTED tabl...

2017-09-11 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/16422
  
**[Test build #81655 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81655/testReport)**
 for PR 16422 at commit 
[`0d49ee9`](https://github.com/apache/spark/commit/0d49ee91508c908daef672a04768c15a9e5c5dba).


---




[GitHub] spark pull request #19197: [SPARK-18608][ML] Fix double caching

2017-09-11 Thread zhengruifeng
GitHub user zhengruifeng opened a pull request:

https://github.com/apache/spark/pull/19197

[SPARK-18608][ML] Fix double caching

## What changes were proposed in this pull request?
`df.rdd.getStorageLevel` => `df.storageLevel`

I used the command `find . -name '*.scala' | xargs -i bash -c 'egrep -in "\.rdd\.getStorageLevel" {} && echo {}'` to make sure all algorithms involved in this issue are fixed.
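A minimal sketch of the pattern being replaced (illustrative only; the concrete call sites live in the affected estimators):
```
import org.apache.spark.sql.Dataset
import org.apache.spark.storage.StorageLevel

object PersistenceHelper {
  // Hypothetical helper illustrating the change: rely on Dataset.storageLevel
  // instead of dataset.rdd.getStorageLevel to detect whether the input is cached.
  def shouldHandlePersistence(dataset: Dataset[_]): Boolean =
    dataset.storageLevel == StorageLevel.NONE
}
```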

Previous discussion in other PRs: 
https://github.com/apache/spark/pull/19107, 
https://github.com/apache/spark/pull/17014

## How was this patch tested?
existing tests


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zhengruifeng/spark double_caching

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19197.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19197


commit b4856147e04c3d57f2bfc70c70e3f136f46fa873
Author: Zheng RuiFeng 
Date:   2017-09-12T04:53:35Z

recreate pr




---




[GitHub] spark issue #16422: [SPARK-17642] [SQL] support DESC EXTENDED/FORMATTED tabl...

2017-09-11 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/16422
  
retest this please


---




[GitHub] spark issue #19196: [SPARK-21977] SinglePartition optimizations break certai...

2017-09-11 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19196
  
Merged build finished. Test FAILed.


---




[GitHub] spark issue #19196: [SPARK-21977] SinglePartition optimizations break certai...

2017-09-11 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19196
  
**[Test build #81652 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81652/testReport)**
 for PR 19196 at commit 
[`090044c`](https://github.com/apache/spark/commit/090044ca089870befff464d37f098c4d4fd19657).
 * This patch **fails to generate documentation**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #19196: [SPARK-21977] SinglePartition optimizations break certai...

2017-09-11 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19196
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/81652/
Test FAILed.


---




[GitHub] spark pull request #19122: [SPARK-21911][ML][PySpark] Parallel Model Evaluat...

2017-09-11 Thread WeichenXu123
Github user WeichenXu123 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19122#discussion_r138249937
  
--- Diff: python/pyspark/ml/param/_shared_params_code_gen.py ---
@@ -152,6 +152,8 @@ def get$Name(self):
 ("varianceCol", "column name for the biased sample variance of 
prediction.",
  None, "TypeConverters.toString"),
 ("aggregationDepth", "suggested depth for treeAggregate (>= 2).", 
"2",
+ "TypeConverters.toInt"),
+("parallelism", "number of threads to use when fitting models in 
parallel (>= 1).", "1",
--- End diff --

No worries, #19110 will be merged first and then I will merge it into this PR.


---




[GitHub] spark issue #19196: [SPARK-21977] SinglePartition optimizations break certai...

2017-09-11 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19196
  
**[Test build #81652 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81652/testReport)**
 for PR 19196 at commit 
[`090044c`](https://github.com/apache/spark/commit/090044ca089870befff464d37f098c4d4fd19657).


---




[GitHub] spark issue #19110: [SPARK-21027][ML][PYTHON] Added tunable parallelism to o...

2017-09-11 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19110
  
**[Test build #81653 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81653/testReport)**
 for PR 19110 at commit 
[`c24d4e2`](https://github.com/apache/spark/commit/c24d4e261550fea22e4ee95909f4b078186bc83c).


---




[GitHub] spark pull request #19196: [SPARK-21977] SinglePartition optimizations break...

2017-09-11 Thread brkyvz
GitHub user brkyvz opened a pull request:

https://github.com/apache/spark/pull/19196

[SPARK-21977] SinglePartition optimizations break certain Streaming 
Stateful Aggregation requirements

## What changes were proposed in this pull request?

This is a bit hard to explain as there are several issues here; I'll try my best. Here are the requirements:
1. A StructuredStreaming Source that can generate empty RDDs with 0 partitions
2. A StructuredStreaming query that uses the above source, performs a stateful aggregation (mapGroupsWithState, groupBy.count, ...), and coalesces to 1 partition

The crux of the problem is that when a dataset has a `coalesce(1)` call, it receives a `SinglePartition` partitioning scheme. This scheme satisfies most required distributions used for aggregations such as HashAggregateExec. This causes a world of problems:

Symptom 1. If the input RDD has 0 partitions, the whole lineage will receive 0 partitions, nothing will be executed, and the state store will not create any delta files. When this happens, the next trigger fails, because the StateStore fails to load the delta file for the previous trigger.

Symptom 2. Let's say that there was data. In this case, if you stop your stream, replace `coalesce(1)` with `coalesce(2)`, and then restart your stream, the stream will fail, because `spark.sql.shuffle.partitions - 1` of the StateStores will fail to find their delta files.

To fix the issues above, we must check that the partitioning of the child of a `StatefulOperator` satisfies the following, whether or not coalesce(1) exists in the plan, and whether or not the input RDD for the trigger has any data:

If the grouping expressions are empty:
a) AllTuples distribution
b) A single physical partition

If the grouping expressions are non-empty:
a) Clustered distribution
b) `spark.sql.shuffle.partitions` # of partitions

Once you fix the above problem by adding an Exchange to the plan, you come across the following bug: if you call `coalesce(1).groupBy().count()` on a Streaming DataFrame and you have a trigger with no data, `StateStoreRestoreExec` doesn't return the prior state. However, for this specific aggregation, `HashAggregateExec` after the restore returns a (0, 0) row, since we're performing a count and there is no data. Then this data gets stored in `StateStoreSaveExec`, causing the previous counts to be overwritten and lost.
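For reference, a minimal sketch of the kind of query described above (illustrative only; the source and names are assumptions, not the PR's regression tests):
```
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("single-partition-repro-sketch").getOrCreate()

// A streaming source; the rate source is used here only for illustration.
// The scenario described above requires a source that can also produce empty micro-batches.
val input = spark.readStream.format("rate").load()

// coalesce(1) gives the plan a SinglePartition partitioning, and the empty
// grouping expressions make groupBy().count() a stateful aggregation backed by a StateStore.
val counts = input.coalesce(1).groupBy().count()
```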

## How was this patch tested?

Regression tests

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/brkyvz/spark sa-0

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19196.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19196


commit b7aeed6af2aaf6eb347dd0a492a62e6530900eb5
Author: Burak Yavuz 
Date:   2017-09-08T18:36:02Z

couldn't repro

commit 4a7d1240196cc4660d33aef33d893526da5f0ceb
Author: Burak Yavuz 
Date:   2017-09-11T17:44:15Z

save

commit 00fa5923c7663f58df72937626bfadac5dc2f1fd
Author: Burak Yavuz 
Date:   2017-09-12T04:32:30Z

ready for review

commit 090044ca089870befff464d37f098c4d4fd19657
Author: Burak Yavuz 
Date:   2017-09-12T04:33:05Z

uncomment




---




[GitHub] spark pull request #19195: [DOCS] Fix unreachable links in the document

2017-09-11 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/19195#discussion_r138249514
  
--- Diff: docs/building-spark.md ---
@@ -111,7 +111,7 @@ should run continuous compilation (i.e. wait for 
changes). However, this has not
 extensively. A couple of gotchas to note:
 
 * it only scans the paths `src/main` and `src/test` (see
-[docs](http://scala-tools.org/mvnsites/maven-scala-plugin/usage_cc.html)), 
so it will only work
+[docs](http://davidb.github.io/scala-maven-plugin/example_compile.html)), 
so it will only work
--- End diff --

@sarutak, what do you think about using http://davidb.github.io/scala-maven-plugin/example_cc.html instead? (I [checked it](https://github.com/davidB/scala-maven-plugin/blob/2.9.1/src/site/xdoc/usage_cc.xml); it's not a big deal though, as they have similar information.)


---




[GitHub] spark issue #19107: [SPARK-21799][ML] Fix `KMeans` performance regression ca...

2017-09-11 Thread WeichenXu123
Github user WeichenXu123 commented on the issue:

https://github.com/apache/spark/pull/19107
  
OK. Thanks @zhengruifeng. I will close this PR.


---




[GitHub] spark pull request #19107: [SPARK-21799][ML] Fix `KMeans` performance regres...

2017-09-11 Thread WeichenXu123
Github user WeichenXu123 closed the pull request at:

https://github.com/apache/spark/pull/19107


---




[GitHub] spark issue #18317: [SPARK-21113][CORE] Read ahead input stream to amortize ...

2017-09-11 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/18317
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #18317: [SPARK-21113][CORE] Read ahead input stream to amortize ...

2017-09-11 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/18317
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/81649/
Test PASSed.


---




[GitHub] spark issue #18317: [SPARK-21113][CORE] Read ahead input stream to amortize ...

2017-09-11 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/18317
  
**[Test build #81649 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81649/testReport)**
 for PR 18317 at commit 
[`36ae448`](https://github.com/apache/spark/commit/36ae4482d3193509c6dc06a79b8ebfe5a2b88752).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #18317: [SPARK-21113][CORE] Read ahead input stream to amortize ...

2017-09-11 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/18317
  
Merged build finished. Test FAILed.


---




[GitHub] spark issue #18317: [SPARK-21113][CORE] Read ahead input stream to amortize ...

2017-09-11 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/18317
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/81648/
Test FAILed.


---




[GitHub] spark issue #18317: [SPARK-21113][CORE] Read ahead input stream to amortize ...

2017-09-11 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/18317
  
**[Test build #81648 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81648/testReport)**
 for PR 18317 at commit 
[`ed426f3`](https://github.com/apache/spark/commit/ed426f33f1d858c74f380218e520127b66f43453).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark pull request #19132: [SPARK-21922] Fix duration always updating when t...

2017-09-11 Thread caneGuy
Github user caneGuy commented on a diff in the pull request:

https://github.com/apache/spark/pull/19132#discussion_r138243757
  
--- Diff: core/src/main/scala/org/apache/spark/ui/SparkUI.scala ---
@@ -50,6 +50,7 @@ private[spark] class SparkUI private (
 val operationGraphListener: RDDOperationGraphListener,
 var appName: String,
 val basePath: String,
+val lastUpdateTime: Long = -1L,
--- End diff --

Updated, @jerryshao. Thanks for your time.


---




[GitHub] spark issue #19107: [SPARK-21799][ML] Fix `KMeans` performance regression ca...

2017-09-11 Thread zhengruifeng
Github user zhengruifeng commented on the issue:

https://github.com/apache/spark/pull/19107
  
I am OK to resubmit the original PR if needed.


---




[GitHub] spark pull request #19186: [SPARK-21972][ML] Add param handlePersistence

2017-09-11 Thread zhengruifeng
Github user zhengruifeng commented on a diff in the pull request:

https://github.com/apache/spark/pull/19186#discussion_r138243247
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala
 ---
@@ -444,13 +444,13 @@ class LogisticRegressionWithLBFGS
 lr.setFitIntercept(addIntercept)
 lr.setMaxIter(optimizer.getNumIterations())
 lr.setTol(optimizer.getConvergenceTol())
+// Determine if we should cache the DF
+lr.setHandlePersistence(input.getStorageLevel == StorageLevel.NONE)
--- End diff --

`mllib.LoR` does not expose `handlePersistence` to users now, and I think it may be better to keep it that way.


---




[GitHub] spark issue #19132: [SPARK-21922] Fix duration always updating when task fai...

2017-09-11 Thread caneGuy
Github user caneGuy commented on the issue:

https://github.com/apache/spark/pull/19132
  
Yes, I have confirmed. Below is a testing job, @jerryshao:

![_001](https://user-images.githubusercontent.com/26762018/30306401-7c682822-97aa-11e7-82ae-9c5802680dd5.png)

![_002](https://user-images.githubusercontent.com/26762018/30306402-7c6d7cfa-97aa-11e7-9788-33774ee06c4d.png)

![_003](https://user-images.githubusercontent.com/26762018/30306403-7c94e10a-97aa-11e7-899e-a1190c0f09da.png)



---




[GitHub] spark issue #19186: [SPARK-21972][ML] Add param handlePersistence

2017-09-11 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19186
  
**[Test build #81651 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81651/testReport)**
 for PR 19186 at commit 
[`9e53579`](https://github.com/apache/spark/commit/9e53579b4e8e69761f5a6c89cc60ab179ff78ea6).


---




[GitHub] spark issue #19186: [SPARK-21972][ML] Add param handlePersistence

2017-09-11 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19186
  
**[Test build #81650 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81650/testReport)**
 for PR 19186 at commit 
[`e112b42`](https://github.com/apache/spark/commit/e112b42a2df231f6b200bcb0cd3759cc143c8c80).


---




[GitHub] spark issue #16422: [SPARK-17642] [SQL] support DESC EXTENDED/FORMATTED tabl...

2017-09-11 Thread wzhfy
Github user wzhfy commented on the issue:

https://github.com/apache/spark/pull/16422
  
@gatorsmile @cloud-fan I've addressed the comments. Do you have time to take another look?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19132: [SPARK-21922] Fix duration always updating when task fai...

2017-09-11 Thread jerryshao
Github user jerryshao commented on the issue:

https://github.com/apache/spark/pull/19132
  
Looks like I don't have the Jenkins permission to trigger UT 😞. Let me ping @srowen to trigger the test.


---




[GitHub] spark pull request #19132: [SPARK-21922] Fix duration always updating when t...

2017-09-11 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/19132#discussion_r138240053
  
--- Diff: core/src/main/scala/org/apache/spark/ui/SparkUI.scala ---
@@ -50,6 +50,7 @@ private[spark] class SparkUI private (
 val operationGraphListener: RDDOperationGraphListener,
 var appName: String,
 val basePath: String,
+val lastUpdateTime: Long = -1L,
--- End diff --

I would like to use `Option[Long] = None` as the default value to reflect that there's no update time.
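A small sketch of the suggested alternative, contrasting the `Option` default with a `-1L` sentinel (the class name here is hypothetical, not the actual `SparkUI` constructor):
```
// Hypothetical holder mirroring the constructor parameter under discussion.
case class UIMeta(appName: String, basePath: String, lastUpdateTime: Option[Long] = None)

val ui = UIMeta("app", "/")
// None reads naturally as "no update time", with no magic -1L to special-case.
val shown = ui.lastUpdateTime.map(_.toString).getOrElse("never updated")
```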


---




[GitHub] spark issue #19132: [SPARK-21922] Fix duration always updating when task fai...

2017-09-11 Thread jerryshao
Github user jerryshao commented on the issue:

https://github.com/apache/spark/pull/19132
  
ok to test.


---




[GitHub] spark pull request #19118: [SPARK-21882][CORE] OutputMetrics doesn't count w...

2017-09-11 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19118#discussion_r138238318
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/io/SparkHadoopWriter.scala ---
@@ -112,11 +112,12 @@ object SparkHadoopWriter extends Logging {
   jobTrackerId, sparkStageId, sparkPartitionId, sparkAttemptNumber)
 committer.setupTask(taskContext)
 
-val (outputMetrics, callback) = initHadoopOutputMetrics(context)
-
 // Initiate the writer.
 config.initWriter(taskContext, sparkPartitionId)
 var recordsWritten = 0L
+
+// Initialize callback function after the writer.
--- End diff --

The comment should explain why we must initialize the callback here, e.g., `We must initialize the callback for calculating bytes written after the statistics table is initialized in FileSystem`.


---




[GitHub] spark pull request #19186: [SPARK-21972][ML] Add param handlePersistence

2017-09-11 Thread zhengruifeng
Github user zhengruifeng commented on a diff in the pull request:

https://github.com/apache/spark/pull/19186#discussion_r138237760
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
 ---
@@ -483,24 +488,17 @@ class LogisticRegression @Since("1.2.0") (
 this
   }
 
-  override protected[spark] def train(dataset: Dataset[_]): 
LogisticRegressionModel = {
-val handlePersistence = dataset.rdd.getStorageLevel == 
StorageLevel.NONE
-train(dataset, handlePersistence)
-  }
-
-  protected[spark] def train(
-  dataset: Dataset[_],
-  handlePersistence: Boolean): LogisticRegressionModel = {
+  protected[spark] def train(dataset: Dataset[_]): LogisticRegressionModel 
= {
 val w = if (!isDefined(weightCol) || $(weightCol).isEmpty) lit(1.0) 
else col($(weightCol))
 val instances: RDD[Instance] =
   dataset.select(col($(labelCol)), w, col($(featuresCol))).rdd.map {
 case Row(label: Double, weight: Double, features: Vector) =>
   Instance(label, weight, features)
   }
 
-if (handlePersistence) instances.persist(StorageLevel.MEMORY_AND_DISK)
+if ($(handlePersistence)) 
instances.persist(StorageLevel.MEMORY_AND_DISK)
--- End diff --

+1. I supposed that it was up to the users to check the `storageLevel` to avoid double caching. But I now agree with checking it in the algorithms, and it may be better to log a warning if the dataset is already cached and `handlePersistence` is set to `true`.
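A minimal sketch of such a warning (hypothetical helper; the object and method names are not from this PR, and it mirrors Spark-internal usage of the `Logging` trait):
```
import org.apache.spark.internal.Logging
import org.apache.spark.sql.Dataset
import org.apache.spark.storage.StorageLevel

object PersistenceWarning extends Logging {
  // Warn when the caller asks the algorithm to cache an already-cached dataset.
  def checkDoubleCaching(dataset: Dataset[_], handlePersistence: Boolean): Unit = {
    if (handlePersistence && dataset.storageLevel != StorageLevel.NONE) {
      logWarning("Input dataset is already persisted; handlePersistence=true may cause double caching.")
    }
  }
}
```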


---




[GitHub] spark pull request #18875: [SPARK-21513][SQL] Allow UDF to_json support conv...

2017-09-11 Thread goldmedal
Github user goldmedal commented on a diff in the pull request:

https://github.com/apache/spark/pull/18875#discussion_r138237482
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala
 ---
@@ -193,14 +223,35 @@ private[sql] class JacksonGenerator(
*
* @param row The row to convert
*/
-  def write(row: InternalRow): Unit = writeObject(writeFields(row, schema, 
rootFieldWriters))
+  def write(row: InternalRow): Unit = {
+writeObject(writeFields(
+  fieldWriters = rootFieldWriters,
+  row = row,
+  schema = dataType.asInstanceOf[StructType]))
+  }
+
+
+  /**
+   * Transforms multiple `InternalRow`s or `MapData`s to JSON array using 
Jackson
+   *
+   * @param array The array of rows or maps to convert
+   */
+  def write(array: ArrayData): Unit = writeArray(writeArrayData(
+fieldWriter = arrElementWriter,
+array = array
+  ))
 
   /**
-   * Transforms multiple `InternalRow`s to JSON array using Jackson
+   * Transforms a single `MapData` to JSON object using Jackson
*
-   * @param array The array of rows to convert
+   * @param map a map to convert
*/
-  def write(array: ArrayData): Unit = writeArray(writeArrayData(array, 
arrElementWriter))
+  def write(map: MapData): Unit = {
+writeObject(writeMapData(
+  fieldWriter = mapElementWriter,
--- End diff --

OK, thanks for the review :) I'll update it tonight.


---




[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...

2017-09-11 Thread sitalkedia
Github user sitalkedia commented on a diff in the pull request:

https://github.com/apache/spark/pull/18317#discussion_r138230590
  
--- Diff: 
core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java
 ---
@@ -72,10 +72,15 @@ public UnsafeSorterSpillReader(
   bufferSizeBytes = DEFAULT_BUFFER_SIZE_BYTES;
 }
 
+final Double readAheadFraction =
+SparkEnv.get() == null ? 0.5 :
+ 
SparkEnv.get().conf().getDouble("spark.unsafe.sorter.spill.read.ahead.fraction",
 0.5);
+
 final InputStream bs =
 new NioBufferedFileInputStream(file, (int) bufferSizeBytes);
 try {
-  this.in = serializerManager.wrapStream(blockId, bs);
+  this.in = new 
ReadAheadInputStream(serializerManager.wrapStream(blockId, bs),
--- End diff --

sure, done.


---




[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...

2017-09-11 Thread sitalkedia
Github user sitalkedia commented on a diff in the pull request:

https://github.com/apache/spark/pull/18317#discussion_r138231043
  
--- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java 
---
@@ -0,0 +1,315 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.io;
+
+import com.google.common.base.Preconditions;
+import org.apache.spark.storage.StorageUtils;
+import org.apache.spark.util.ThreadUtils;
+
+import javax.annotation.concurrent.GuardedBy;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * {@link InputStream} implementation which asynchronously reads ahead 
from the underlying input
+ * stream when specified amount of data has been read from the current 
buffer. It does it by maintaining
+ * two buffer - active buffer and read ahead buffer. Active buffer 
contains data which should be returned
+ * when a read() call is issued. The read ahead buffer is used to 
asynchronously read from the underlying
+ * input stream and once the current active buffer is exhausted, we flip 
the two buffers so that we can
+ * start reading from the read ahead buffer without being blocked in disk 
I/O.
+ */
+public class ReadAheadInputStream extends InputStream {
+
+  private ReentrantLock stateChangeLock = new ReentrantLock();
+
+  @GuardedBy("stateChangeLock")
+  private ByteBuffer activeBuffer;
+
+  @GuardedBy("stateChangeLock")
+  private ByteBuffer readAheadBuffer;
+
+  @GuardedBy("stateChangeLock")
+  private boolean endOfStream;
+
+  @GuardedBy("stateChangeLock")
+  // true if async read is in progress
+  private boolean readInProgress;
+
+  @GuardedBy("stateChangeLock")
+  // true if read is aborted due to an exception in reading from 
underlying input stream.
+  private boolean readAborted;
+
+  @GuardedBy("stateChangeLock")
+  private Exception readException;
+
+  // If the remaining data size in the current buffer is below this 
threshold,
+  // we issue an async read from the underlying input stream.
+  private final int readAheadThresholdInBytes;
+
+  private final InputStream underlyingInputStream;
+
+  private final ExecutorService executorService = 
ThreadUtils.newDaemonSingleThreadExecutor("read-ahead");
+
+  private final Condition asyncReadComplete = 
stateChangeLock.newCondition();
+
+  private static final ThreadLocal<byte[]> oneByte = ThreadLocal.withInitial(() -> new byte[1]);
+
+  /**
+   * Creates a ReadAheadInputStream with the specified buffer 
size and read-ahead
+   * threshold
+   *
+   * @param   inputStream The underlying input stream.
+   * @param   bufferSizeInBytes   The buffer size.
+   * @param   readAheadThresholdInBytes   If the active buffer has less 
data than the read-ahead
+   *  threshold, an async read is 
triggered.
+   */
+  public ReadAheadInputStream(InputStream inputStream, int 
bufferSizeInBytes, int readAheadThresholdInBytes) {
+Preconditions.checkArgument(bufferSizeInBytes > 0,
+"bufferSizeInBytes should be greater than 0");
+Preconditions.checkArgument(readAheadThresholdInBytes > 0 &&
+readAheadThresholdInBytes < bufferSizeInBytes,
+"readAheadThresholdInBytes should be greater than 0 and less 
than bufferSizeInBytes" );
+activeBuffer = ByteBuffer.allocate(bufferSizeInBytes);
+readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes);
+this.readAheadThresholdInBytes = readAheadThresholdInBytes;
+this.underlyingInputStream = inputStream;
+activeBuffer.flip();
+readAheadBuffer.flip();
+  }
+
+  private boolean isEndOfStream() {
+if (!activeBuffer.hasRemaining() && !readAheadBuffer.hasRemaining() && 
endOfStream) {
+  return true;
+}
+return  false;
+  }
+  private void readAsync(final 

[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...

2017-09-11 Thread sitalkedia
Github user sitalkedia commented on a diff in the pull request:

https://github.com/apache/spark/pull/18317#discussion_r138230901
  
--- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java 
---
@@ -0,0 +1,315 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.io;
+
+import com.google.common.base.Preconditions;
+import org.apache.spark.storage.StorageUtils;
+import org.apache.spark.util.ThreadUtils;
+
+import javax.annotation.concurrent.GuardedBy;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * {@link InputStream} implementation which asynchronously reads ahead 
from the underlying input
+ * stream when specified amount of data has been read from the current 
buffer. It does it by maintaining
+ * two buffer - active buffer and read ahead buffer. Active buffer 
contains data which should be returned
+ * when a read() call is issued. The read ahead buffer is used to 
asynchronously read from the underlying
+ * input stream and once the current active buffer is exhausted, we flip 
the two buffers so that we can
+ * start reading from the read ahead buffer without being blocked in disk 
I/O.
+ */
+public class ReadAheadInputStream extends InputStream {
+
+  private ReentrantLock stateChangeLock = new ReentrantLock();
+
+  @GuardedBy("stateChangeLock")
+  private ByteBuffer activeBuffer;
+
+  @GuardedBy("stateChangeLock")
+  private ByteBuffer readAheadBuffer;
+
+  @GuardedBy("stateChangeLock")
+  private boolean endOfStream;
+
+  @GuardedBy("stateChangeLock")
+  // true if async read is in progress
+  private boolean readInProgress;
+
+  @GuardedBy("stateChangeLock")
+  // true if read is aborted due to an exception in reading from 
underlying input stream.
+  private boolean readAborted;
+
+  @GuardedBy("stateChangeLock")
+  private Exception readException;
+
+  // If the remaining data size in the current buffer is below this 
threshold,
+  // we issue an async read from the underlying input stream.
+  private final int readAheadThresholdInBytes;
+
+  private final InputStream underlyingInputStream;
+
+  private final ExecutorService executorService = 
ThreadUtils.newDaemonSingleThreadExecutor("read-ahead");
+
+  private final Condition asyncReadComplete = 
stateChangeLock.newCondition();
+
+  private static final ThreadLocal<byte[]> oneByte = ThreadLocal.withInitial(() -> new byte[1]);
+
+  /**
+   * Creates a ReadAheadInputStream with the specified buffer 
size and read-ahead
+   * threshold
+   *
+   * @param   inputStream The underlying input stream.
+   * @param   bufferSizeInBytes   The buffer size.
+   * @param   readAheadThresholdInBytes   If the active buffer has less 
data than the read-ahead
+   *  threshold, an async read is 
triggered.
+   */
+  public ReadAheadInputStream(InputStream inputStream, int 
bufferSizeInBytes, int readAheadThresholdInBytes) {
+Preconditions.checkArgument(bufferSizeInBytes > 0,
+"bufferSizeInBytes should be greater than 0");
+Preconditions.checkArgument(readAheadThresholdInBytes > 0 &&
+readAheadThresholdInBytes < bufferSizeInBytes,
+"readAheadThresholdInBytes should be greater than 0 and less 
than bufferSizeInBytes" );
+activeBuffer = ByteBuffer.allocate(bufferSizeInBytes);
+readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes);
+this.readAheadThresholdInBytes = readAheadThresholdInBytes;
+this.underlyingInputStream = inputStream;
+activeBuffer.flip();
+readAheadBuffer.flip();
+  }
+
+  private boolean isEndOfStream() {
+if (!activeBuffer.hasRemaining() && !readAheadBuffer.hasRemaining() && 
endOfStream) {
+  return true;
+}
+return  false;
+  }
+  private void readAsync(final 

[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...

2017-09-11 Thread sitalkedia
Github user sitalkedia commented on a diff in the pull request:

https://github.com/apache/spark/pull/18317#discussion_r138230596
  
--- Diff: 
core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java
 ---
@@ -72,10 +72,15 @@ public UnsafeSorterSpillReader(
   bufferSizeBytes = DEFAULT_BUFFER_SIZE_BYTES;
 }
 
+final Double readAheadFraction =
--- End diff --

done


---




[GitHub] spark issue #18875: [SPARK-21513][SQL] Allow UDF to_json support converting ...

2017-09-11 Thread viirya
Github user viirya commented on the issue:

https://github.com/apache/spark/pull/18875
  
Minor comments left, otherwise LGTM.


---




[GitHub] spark pull request #18875: [SPARK-21513][SQL] Allow UDF to_json support conv...

2017-09-11 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/18875#discussion_r138233095
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JacksonGeneratorSuite.scala
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.json
+
+import java.io.CharArrayWriter
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, 
DateTimeUtils, GenericArrayData}
+import org.apache.spark.sql.types._
+
+class JacksonGeneratorSuite extends SparkFunSuite {
+
+  val gmtId = DateTimeUtils.TimeZoneGMT.getID
+  val option = new JSONOptions(Map.empty, gmtId)
+
+  test("initial with StructType and write out a row") {
+val dataType = StructType(StructField("a", IntegerType) :: Nil)
+val input = InternalRow(1)
+val writer = new CharArrayWriter()
+val gen = new JacksonGenerator(dataType, writer, option)
+gen.write(input)
+gen.flush()
+assert(writer.toString === """{"a":1}""")
+  }
+
+  test("initial with StructType and write out rows") {
+val dataType = StructType(StructField("a", IntegerType) :: Nil)
+val input = new GenericArrayData(InternalRow(1) :: InternalRow(2) :: 
Nil)
+val writer = new CharArrayWriter()
+val gen = new JacksonGenerator(dataType, writer, option)
+gen.write(input)
+gen.flush()
+assert(writer.toString === """[{"a":1},{"a":2}]""")
+  }
+
+  test("initial with StructType and write out an array with single empty 
row") {
+val dataType = StructType(StructField("a", IntegerType) :: Nil)
+val input = new GenericArrayData(InternalRow(null) :: Nil)
+val writer = new CharArrayWriter()
+val gen = new JacksonGenerator(dataType, writer, option)
+gen.write(input)
+gen.flush()
+assert(writer.toString === """[{}]""")
+  }
+
+  test("initial with StructType and write out an empty array") {
+val dataType = StructType(StructField("a", IntegerType) :: Nil)
+val input = new GenericArrayData(Nil)
+val writer = new CharArrayWriter()
+val gen = new JacksonGenerator(dataType, writer, option)
+gen.write(input)
+gen.flush()
+assert(writer.toString === """[]""")
+  }
+
+  test("initial with Map and write out a map data") {
+val dataType = MapType(StringType, IntegerType)
+val input = ArrayBasedMapData(Map("a" -> 1))
+val writer = new CharArrayWriter()
+val gen = new JacksonGenerator(dataType, writer, option)
+gen.write(input)
+gen.flush()
+assert(writer.toString === """{"a":1}""")
+  }
+
+  test("initial with Map and write out an array of maps") {
+val dataType = MapType(StringType, IntegerType)
+val input = new GenericArrayData(
+  ArrayBasedMapData(Map("a" -> 1)) :: ArrayBasedMapData(Map("b" -> 2)) 
:: Nil)
+val writer = new CharArrayWriter()
+val gen = new JacksonGenerator(dataType, writer, option)
+gen.write(input)
+gen.flush()
+assert(writer.toString === """[{"a":1},{"b":2}]""")
+  }
+
+  test("error handling: initial with StructType but error calling write a 
map") {
+val dataType = StructType(StructField("a", IntegerType) :: Nil)
+val input = ArrayBasedMapData(Map("a" -> 1))
+val writer = new CharArrayWriter()
+val gen = new JacksonGenerator(dataType, writer, option)
+intercept[Exception] {
+  gen.write(input)
+}
+  }
+
+  test("error handling: initial with StructType but error calling write an 
array of maps") {
+val dataType = StructType(StructField("a", IntegerType) :: Nil)
+val input = new GenericArrayData(
+  ArrayBasedMapData(Map("a" -> 1)) :: ArrayBasedMapData(Map("b" -> 2)) 
:: Nil)
+val writer = new 

[GitHub] spark issue #18853: [SPARK-21646][SQL] CommonType for binary comparison

2017-09-11 Thread wangyum
Github user wangyum commented on the issue:

https://github.com/apache/spark/pull/18853
  
CC @gatorsmile, @cloud-fan


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18875: [SPARK-21513][SQL] Allow UDF to_json support conv...

2017-09-11 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/18875#discussion_r138232491
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala
 ---
@@ -193,14 +223,35 @@ private[sql] class JacksonGenerator(
*
* @param row The row to convert
*/
-  def write(row: InternalRow): Unit = writeObject(writeFields(row, schema, 
rootFieldWriters))
+  def write(row: InternalRow): Unit = {
+writeObject(writeFields(
+  fieldWriters = rootFieldWriters,
+  row = row,
+  schema = dataType.asInstanceOf[StructType]))
+  }
+
+
+  /**
+   * Transforms multiple `InternalRow`s or `MapData`s to JSON array using 
Jackson
+   *
+   * @param array The array of rows or maps to convert
+   */
+  def write(array: ArrayData): Unit = writeArray(writeArrayData(
+fieldWriter = arrElementWriter,
+array = array
+  ))
 
   /**
-   * Transforms multiple `InternalRow`s to JSON array using Jackson
+   * Transforms a single `MapData` to JSON object using Jackson
*
-   * @param array The array of rows to convert
+   * @param map a map to convert
*/
-  def write(array: ArrayData): Unit = writeArray(writeArrayData(array, 
arrElementWriter))
+  def write(map: MapData): Unit = {
+writeObject(writeMapData(
+  fieldWriter = mapElementWriter,
--- End diff --

And also add a similar comment in `write(row: InternalRow)`.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18317: [SPARK-21113][CORE] Read ahead input stream to amortize ...

2017-09-11 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/18317
  
**[Test build #81649 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81649/testReport)**
 for PR 18317 at commit 
[`36ae448`](https://github.com/apache/spark/commit/36ae4482d3193509c6dc06a79b8ebfe5a2b88752).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18317: [SPARK-21113][CORE] Read ahead input stream to amortize ...

2017-09-11 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/18317
  
**[Test build #81648 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81648/testReport)**
 for PR 18317 at commit 
[`ed426f3`](https://github.com/apache/spark/commit/ed426f33f1d858c74f380218e520127b66f43453).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19192: Quote table names in JDBC queries

2017-09-11 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/19192#discussion_r138228826
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala ---
@@ -100,7 +100,7 @@ abstract class JdbcDialect extends Serializable {
* @return The SQL query to use for checking the table.
*/
   def getTableExistsQuery(table: String): String = {
-s"SELECT * FROM $table WHERE 1=0"
+s"SELECT * FROM ${quoteIdentifier(table)} WHERE 1=0"
--- End diff --

This is a behavior change. After we quote the names, the table names become 
case-sensitive in most RDBMSs. We need at least a JDBC option to control it.
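
A minimal sketch of what such an option-gated quoting could look like (illustrative only; the `quoteTableNames` parameter name is invented here and is not part of the PR):

```scala
// Illustrative sketch: gate the new quoting behind a flag so existing
// case-insensitive lookups keep working. `quoteTableNames` is a made-up name.
def getTableExistsQuery(table: String, quoteTableNames: Boolean): String = {
  val tableRef = if (quoteTableNames) quoteIdentifier(table) else table
  s"SELECT * FROM $tableRef WHERE 1=0"
}
```

Defaulting such a flag to false would preserve today's case-insensitive behavior.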


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19192: Quote table names in JDBC queries

2017-09-11 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/19192
  
Without this PR change, can users add quotes in the JDBC option `dbtable`?
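
For reference, a sketch of what passing the quotes through `dbtable` looks like from the user side (the URL is a placeholder, and whether the quotes survive depends on how the dialect builds its queries):

```scala
// Quotes in the dbtable option are interpolated into the generated SQL,
// so a user could target a case-sensitive table name this way.
val df = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://host/db")   // placeholder URL
  .option("dbtable", "\"MixedCaseTable\"")
  .load()
```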


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18875: [SPARK-21513][SQL] Allow UDF to_json support conv...

2017-09-11 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/18875#discussion_r138228117
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala
 ---
@@ -193,14 +223,35 @@ private[sql] class JacksonGenerator(
*
* @param row The row to convert
*/
-  def write(row: InternalRow): Unit = writeObject(writeFields(row, schema, 
rootFieldWriters))
+  def write(row: InternalRow): Unit = {
+writeObject(writeFields(
+  fieldWriters = rootFieldWriters,
+  row = row,
+  schema = dataType.asInstanceOf[StructType]))
+  }
+
+
+  /**
+   * Transforms multiple `InternalRow`s or `MapData`s to JSON array using 
Jackson
+   *
+   * @param array The array of rows or maps to convert
+   */
+  def write(array: ArrayData): Unit = writeArray(writeArrayData(
+fieldWriter = arrElementWriter,
+array = array
+  ))
--- End diff --

Let's change this one back to:

```scala
def write(array: ArrayData): Unit = writeArray(writeArrayData(array, 
arrElementWriter))
```




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18875: [SPARK-21513][SQL] Allow UDF to_json support conv...

2017-09-11 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/18875#discussion_r138228185
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala
 ---
@@ -193,14 +223,35 @@ private[sql] class JacksonGenerator(
*
* @param row The row to convert
*/
-  def write(row: InternalRow): Unit = writeObject(writeFields(row, schema, 
rootFieldWriters))
+  def write(row: InternalRow): Unit = {
+writeObject(writeFields(
+  fieldWriters = rootFieldWriters,
+  row = row,
+  schema = dataType.asInstanceOf[StructType]))
+  }
+
+
+  /**
+   * Transforms multiple `InternalRow`s or `MapData`s to JSON array using 
Jackson
+   *
+   * @param array The array of rows or maps to convert
+   */
+  def write(array: ArrayData): Unit = writeArray(writeArrayData(
+fieldWriter = arrElementWriter,
+array = array
+  ))
 
   /**
-   * Transforms multiple `InternalRow`s to JSON array using Jackson
+   * Transforms a single `MapData` to JSON object using Jackson
*
-   * @param array The array of rows to convert
+   * @param map a map to convert
*/
-  def write(array: ArrayData): Unit = writeArray(writeArrayData(array, 
arrElementWriter))
+  def write(map: MapData): Unit = {
+writeObject(writeMapData(
+  fieldWriter = mapElementWriter,
--- End diff --

Let's add a small comment saying this actually triggers the proper 
validation.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18875: [SPARK-21513][SQL] Allow UDF to_json support conv...

2017-09-11 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/18875#discussion_r138228111
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala
 ---
@@ -193,14 +223,35 @@ private[sql] class JacksonGenerator(
*
* @param row The row to convert
*/
-  def write(row: InternalRow): Unit = writeObject(writeFields(row, schema, 
rootFieldWriters))
+  def write(row: InternalRow): Unit = {
+writeObject(writeFields(
+  fieldWriters = rootFieldWriters,
+  row = row,
+  schema = dataType.asInstanceOf[StructType]))
+  }
+
+
+  /**
+   * Transforms multiple `InternalRow`s or `MapData`s to JSON array using 
Jackson
+   *
+   * @param array The array of rows or maps to convert
+   */
+  def write(array: ArrayData): Unit = writeArray(writeArrayData(
+fieldWriter = arrElementWriter,
+array = array
+  ))
--- End diff --

Let's change this one back to:

```scala
def write(array: ArrayData): Unit = writeArray(writeArrayData(array, 
arrElementWriter))
```




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...

2017-09-11 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/18317#discussion_r138225592
  
--- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java 
---
@@ -0,0 +1,315 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.io;
+
+import com.google.common.base.Preconditions;
+import org.apache.spark.storage.StorageUtils;
+import org.apache.spark.util.ThreadUtils;
+
+import javax.annotation.concurrent.GuardedBy;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * {@link InputStream} implementation which asynchronously reads ahead 
from the underlying input
+ * stream when specified amount of data has been read from the current 
buffer. It does it by maintaining
+ * two buffer - active buffer and read ahead buffer. Active buffer 
contains data which should be returned
+ * when a read() call is issued. The read ahead buffer is used to 
asynchronously read from the underlying
+ * input stream and once the current active buffer is exhausted, we flip 
the two buffers so that we can
+ * start reading from the read ahead buffer without being blocked in disk 
I/O.
+ */
+public class ReadAheadInputStream extends InputStream {
+
+  private ReentrantLock stateChangeLock = new ReentrantLock();
+
+  @GuardedBy("stateChangeLock")
+  private ByteBuffer activeBuffer;
+
+  @GuardedBy("stateChangeLock")
+  private ByteBuffer readAheadBuffer;
+
+  @GuardedBy("stateChangeLock")
+  private boolean endOfStream;
+
+  @GuardedBy("stateChangeLock")
+  // true if async read is in progress
+  private boolean readInProgress;
+
+  @GuardedBy("stateChangeLock")
+  // true if read is aborted due to an exception in reading from 
underlying input stream.
+  private boolean readAborted;
+
+  @GuardedBy("stateChangeLock")
+  private Exception readException;
+
+  // If the remaining data size in the current buffer is below this 
threshold,
+  // we issue an async read from the underlying input stream.
+  private final int readAheadThresholdInBytes;
+
+  private final InputStream underlyingInputStream;
+
+  private final ExecutorService executorService = 
ThreadUtils.newDaemonSingleThreadExecutor("read-ahead");
+
+  private final Condition asyncReadComplete = 
stateChangeLock.newCondition();
+
+private static final ThreadLocal<byte[]> oneByte = 
ThreadLocal.withInitial(() -> new byte[1]);
+
+  /**
+   * Creates a ReadAheadInputStream with the specified buffer 
size and read-ahead
+   * threshold
+   *
+   * @param   inputStream The underlying input stream.
+   * @param   bufferSizeInBytes   The buffer size.
+   * @param   readAheadThresholdInBytes   If the active buffer has less 
data than the read-ahead
+   *  threshold, an async read is 
triggered.
+   */
+  public ReadAheadInputStream(InputStream inputStream, int 
bufferSizeInBytes, int readAheadThresholdInBytes) {
+Preconditions.checkArgument(bufferSizeInBytes > 0,
+"bufferSizeInBytes should be greater than 0");
+Preconditions.checkArgument(readAheadThresholdInBytes > 0 &&
+readAheadThresholdInBytes < bufferSizeInBytes,
+"readAheadThresholdInBytes should be greater than 0 and less 
than bufferSizeInBytes" );
+activeBuffer = ByteBuffer.allocate(bufferSizeInBytes);
+readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes);
+this.readAheadThresholdInBytes = readAheadThresholdInBytes;
+this.underlyingInputStream = inputStream;
+activeBuffer.flip();
+readAheadBuffer.flip();
+  }
+
+  private boolean isEndOfStream() {
+if (!activeBuffer.hasRemaining() && !readAheadBuffer.hasRemaining() && 
endOfStream) {
+  return true;
+}
+return  false;
+  }
+  private void readAsync(final 

[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...

2017-09-11 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/18317#discussion_r138201871
  
--- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java 
---
@@ -0,0 +1,315 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.io;
+
+import com.google.common.base.Preconditions;
+import org.apache.spark.storage.StorageUtils;
+import org.apache.spark.util.ThreadUtils;
+
+import javax.annotation.concurrent.GuardedBy;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * {@link InputStream} implementation which asynchronously reads ahead 
from the underlying input
+ * stream when specified amount of data has been read from the current 
buffer. It does it by maintaining
+ * two buffer - active buffer and read ahead buffer. Active buffer 
contains data which should be returned
+ * when a read() call is issued. The read ahead buffer is used to 
asynchronously read from the underlying
+ * input stream and once the current active buffer is exhausted, we flip 
the two buffers so that we can
+ * start reading from the read ahead buffer without being blocked in disk 
I/O.
+ */
+public class ReadAheadInputStream extends InputStream {
+
+  private ReentrantLock stateChangeLock = new ReentrantLock();
+
+  @GuardedBy("stateChangeLock")
+  private ByteBuffer activeBuffer;
+
+  @GuardedBy("stateChangeLock")
+  private ByteBuffer readAheadBuffer;
+
+  @GuardedBy("stateChangeLock")
+  private boolean endOfStream;
+
+  @GuardedBy("stateChangeLock")
+  // true if async read is in progress
+  private boolean readInProgress;
+
+  @GuardedBy("stateChangeLock")
+  // true if read is aborted due to an exception in reading from 
underlying input stream.
+  private boolean readAborted;
+
+  @GuardedBy("stateChangeLock")
+  private Exception readException;
+
+  // If the remaining data size in the current buffer is below this 
threshold,
+  // we issue an async read from the underlying input stream.
+  private final int readAheadThresholdInBytes;
+
+  private final InputStream underlyingInputStream;
+
+  private final ExecutorService executorService = 
ThreadUtils.newDaemonSingleThreadExecutor("read-ahead");
+
+  private final Condition asyncReadComplete = 
stateChangeLock.newCondition();
+
+private static final ThreadLocal<byte[]> oneByte = 
ThreadLocal.withInitial(() -> new byte[1]);
+
+  /**
+   * Creates a ReadAheadInputStream with the specified buffer 
size and read-ahead
+   * threshold
+   *
+   * @param   inputStream The underlying input stream.
+   * @param   bufferSizeInBytes   The buffer size.
+   * @param   readAheadThresholdInBytes   If the active buffer has less 
data than the read-ahead
+   *  threshold, an async read is 
triggered.
+   */
+  public ReadAheadInputStream(InputStream inputStream, int 
bufferSizeInBytes, int readAheadThresholdInBytes) {
+Preconditions.checkArgument(bufferSizeInBytes > 0,
+"bufferSizeInBytes should be greater than 0");
+Preconditions.checkArgument(readAheadThresholdInBytes > 0 &&
+readAheadThresholdInBytes < bufferSizeInBytes,
+"readAheadThresholdInBytes should be greater than 0 and less 
than bufferSizeInBytes" );
+activeBuffer = ByteBuffer.allocate(bufferSizeInBytes);
+readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes);
+this.readAheadThresholdInBytes = readAheadThresholdInBytes;
+this.underlyingInputStream = inputStream;
+activeBuffer.flip();
+readAheadBuffer.flip();
+  }
+
+  private boolean isEndOfStream() {
+if (!activeBuffer.hasRemaining() && !readAheadBuffer.hasRemaining() && 
endOfStream) {
+  return true;
+}
+return  false;
+  }
+  private void readAsync(final 

[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...

2017-09-11 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/18317#discussion_r138207519
  
--- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java 
---
@@ -0,0 +1,315 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.io;
+
+import com.google.common.base.Preconditions;
+import org.apache.spark.storage.StorageUtils;
+import org.apache.spark.util.ThreadUtils;
+
+import javax.annotation.concurrent.GuardedBy;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * {@link InputStream} implementation which asynchronously reads ahead 
from the underlying input
+ * stream when specified amount of data has been read from the current 
buffer. It does it by maintaining
+ * two buffer - active buffer and read ahead buffer. Active buffer 
contains data which should be returned
+ * when a read() call is issued. The read ahead buffer is used to 
asynchronously read from the underlying
+ * input stream and once the current active buffer is exhausted, we flip 
the two buffers so that we can
+ * start reading from the read ahead buffer without being blocked in disk 
I/O.
+ */
+public class ReadAheadInputStream extends InputStream {
+
+  private ReentrantLock stateChangeLock = new ReentrantLock();
+
+  @GuardedBy("stateChangeLock")
+  private ByteBuffer activeBuffer;
+
+  @GuardedBy("stateChangeLock")
+  private ByteBuffer readAheadBuffer;
+
+  @GuardedBy("stateChangeLock")
+  private boolean endOfStream;
+
+  @GuardedBy("stateChangeLock")
+  // true if async read is in progress
+  private boolean readInProgress;
+
+  @GuardedBy("stateChangeLock")
+  // true if read is aborted due to an exception in reading from 
underlying input stream.
+  private boolean readAborted;
+
+  @GuardedBy("stateChangeLock")
+  private Exception readException;
+
+  // If the remaining data size in the current buffer is below this 
threshold,
+  // we issue an async read from the underlying input stream.
+  private final int readAheadThresholdInBytes;
+
+  private final InputStream underlyingInputStream;
+
+  private final ExecutorService executorService = 
ThreadUtils.newDaemonSingleThreadExecutor("read-ahead");
+
+  private final Condition asyncReadComplete = 
stateChangeLock.newCondition();
+
+private static final ThreadLocal<byte[]> oneByte = 
ThreadLocal.withInitial(() -> new byte[1]);
+
+  /**
+   * Creates a ReadAheadInputStream with the specified buffer 
size and read-ahead
+   * threshold
+   *
+   * @param   inputStream The underlying input stream.
+   * @param   bufferSizeInBytes   The buffer size.
+   * @param   readAheadThresholdInBytes   If the active buffer has less 
data than the read-ahead
+   *  threshold, an async read is 
triggered.
+   */
+  public ReadAheadInputStream(InputStream inputStream, int 
bufferSizeInBytes, int readAheadThresholdInBytes) {
+Preconditions.checkArgument(bufferSizeInBytes > 0,
+"bufferSizeInBytes should be greater than 0");
+Preconditions.checkArgument(readAheadThresholdInBytes > 0 &&
+readAheadThresholdInBytes < bufferSizeInBytes,
+"readAheadThresholdInBytes should be greater than 0 and less 
than bufferSizeInBytes" );
+activeBuffer = ByteBuffer.allocate(bufferSizeInBytes);
+readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes);
+this.readAheadThresholdInBytes = readAheadThresholdInBytes;
+this.underlyingInputStream = inputStream;
+activeBuffer.flip();
+readAheadBuffer.flip();
+  }
+
+  private boolean isEndOfStream() {
+if (!activeBuffer.hasRemaining() && !readAheadBuffer.hasRemaining() && 
endOfStream) {
+  return true;
+}
+return  false;
+  }
+  private void readAsync(final 

[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...

2017-09-11 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/18317#discussion_r138193555
  
--- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java 
---
@@ -0,0 +1,315 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.io;
+
+import com.google.common.base.Preconditions;
+import org.apache.spark.storage.StorageUtils;
+import org.apache.spark.util.ThreadUtils;
+
+import javax.annotation.concurrent.GuardedBy;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * {@link InputStream} implementation which asynchronously reads ahead 
from the underlying input
+ * stream when specified amount of data has been read from the current 
buffer. It does it by maintaining
+ * two buffer - active buffer and read ahead buffer. Active buffer 
contains data which should be returned
+ * when a read() call is issued. The read ahead buffer is used to 
asynchronously read from the underlying
+ * input stream and once the current active buffer is exhausted, we flip 
the two buffers so that we can
+ * start reading from the read ahead buffer without being blocked in disk 
I/O.
+ */
+public class ReadAheadInputStream extends InputStream {
+
+  private ReentrantLock stateChangeLock = new ReentrantLock();
+
+  @GuardedBy("stateChangeLock")
+  private ByteBuffer activeBuffer;
+
+  @GuardedBy("stateChangeLock")
+  private ByteBuffer readAheadBuffer;
+
+  @GuardedBy("stateChangeLock")
+  private boolean endOfStream;
+
+  @GuardedBy("stateChangeLock")
+  // true if async read is in progress
+  private boolean readInProgress;
+
+  @GuardedBy("stateChangeLock")
+  // true if read is aborted due to an exception in reading from 
underlying input stream.
+  private boolean readAborted;
+
+  @GuardedBy("stateChangeLock")
+  private Exception readException;
+
+  // If the remaining data size in the current buffer is below this 
threshold,
+  // we issue an async read from the underlying input stream.
+  private final int readAheadThresholdInBytes;
+
+  private final InputStream underlyingInputStream;
+
+  private final ExecutorService executorService = 
ThreadUtils.newDaemonSingleThreadExecutor("read-ahead");
+
+  private final Condition asyncReadComplete = 
stateChangeLock.newCondition();
+
+private static final ThreadLocal<byte[]> oneByte = 
ThreadLocal.withInitial(() -> new byte[1]);
+
+  /**
+   * Creates a ReadAheadInputStream with the specified buffer 
size and read-ahead
+   * threshold
+   *
+   * @param   inputStream The underlying input stream.
+   * @param   bufferSizeInBytes   The buffer size.
+   * @param   readAheadThresholdInBytes   If the active buffer has less 
data than the read-ahead
+   *  threshold, an async read is 
triggered.
+   */
+  public ReadAheadInputStream(InputStream inputStream, int 
bufferSizeInBytes, int readAheadThresholdInBytes) {
+Preconditions.checkArgument(bufferSizeInBytes > 0,
+"bufferSizeInBytes should be greater than 0");
+Preconditions.checkArgument(readAheadThresholdInBytes > 0 &&
+readAheadThresholdInBytes < bufferSizeInBytes,
+"readAheadThresholdInBytes should be greater than 0 and less 
than bufferSizeInBytes" );
+activeBuffer = ByteBuffer.allocate(bufferSizeInBytes);
+readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes);
+this.readAheadThresholdInBytes = readAheadThresholdInBytes;
+this.underlyingInputStream = inputStream;
+activeBuffer.flip();
+readAheadBuffer.flip();
+  }
+
+  private boolean isEndOfStream() {
+if (!activeBuffer.hasRemaining() && !readAheadBuffer.hasRemaining() && 
endOfStream) {
+  return true;
+}
+return  false;
+  }
+  private void readAsync(final 

[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...

2017-09-11 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/18317#discussion_r138225438
  
--- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java 
---
@@ -0,0 +1,315 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.io;
+
+import com.google.common.base.Preconditions;
+import org.apache.spark.storage.StorageUtils;
+import org.apache.spark.util.ThreadUtils;
+
+import javax.annotation.concurrent.GuardedBy;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * {@link InputStream} implementation which asynchronously reads ahead 
from the underlying input
+ * stream when specified amount of data has been read from the current 
buffer. It does it by maintaining
+ * two buffer - active buffer and read ahead buffer. Active buffer 
contains data which should be returned
+ * when a read() call is issued. The read ahead buffer is used to 
asynchronously read from the underlying
+ * input stream and once the current active buffer is exhausted, we flip 
the two buffers so that we can
+ * start reading from the read ahead buffer without being blocked in disk 
I/O.
+ */
+public class ReadAheadInputStream extends InputStream {
+
+  private ReentrantLock stateChangeLock = new ReentrantLock();
+
+  @GuardedBy("stateChangeLock")
+  private ByteBuffer activeBuffer;
+
+  @GuardedBy("stateChangeLock")
+  private ByteBuffer readAheadBuffer;
+
+  @GuardedBy("stateChangeLock")
+  private boolean endOfStream;
+
+  @GuardedBy("stateChangeLock")
+  // true if async read is in progress
+  private boolean readInProgress;
+
+  @GuardedBy("stateChangeLock")
+  // true if read is aborted due to an exception in reading from 
underlying input stream.
+  private boolean readAborted;
+
+  @GuardedBy("stateChangeLock")
+  private Exception readException;
+
+  // If the remaining data size in the current buffer is below this 
threshold,
+  // we issue an async read from the underlying input stream.
+  private final int readAheadThresholdInBytes;
+
+  private final InputStream underlyingInputStream;
+
+  private final ExecutorService executorService = 
ThreadUtils.newDaemonSingleThreadExecutor("read-ahead");
+
+  private final Condition asyncReadComplete = 
stateChangeLock.newCondition();
+
+private static final ThreadLocal<byte[]> oneByte = 
ThreadLocal.withInitial(() -> new byte[1]);
+
+  /**
+   * Creates a ReadAheadInputStream with the specified buffer 
size and read-ahead
+   * threshold
+   *
+   * @param   inputStream The underlying input stream.
+   * @param   bufferSizeInBytes   The buffer size.
+   * @param   readAheadThresholdInBytes   If the active buffer has less 
data than the read-ahead
+   *  threshold, an async read is 
triggered.
+   */
+  public ReadAheadInputStream(InputStream inputStream, int 
bufferSizeInBytes, int readAheadThresholdInBytes) {
+Preconditions.checkArgument(bufferSizeInBytes > 0,
+"bufferSizeInBytes should be greater than 0");
+Preconditions.checkArgument(readAheadThresholdInBytes > 0 &&
+readAheadThresholdInBytes < bufferSizeInBytes,
+"readAheadThresholdInBytes should be greater than 0 and less 
than bufferSizeInBytes" );
+activeBuffer = ByteBuffer.allocate(bufferSizeInBytes);
+readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes);
+this.readAheadThresholdInBytes = readAheadThresholdInBytes;
+this.underlyingInputStream = inputStream;
+activeBuffer.flip();
+readAheadBuffer.flip();
+  }
+
+  private boolean isEndOfStream() {
+if (!activeBuffer.hasRemaining() && !readAheadBuffer.hasRemaining() && 
endOfStream) {
+  return true;
+}
+return  false;
+  }
+  private void readAsync(final 

[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...

2017-09-11 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/18317#discussion_r138198506
  
--- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java 
---
@@ -0,0 +1,315 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.io;
+
+import com.google.common.base.Preconditions;
+import org.apache.spark.storage.StorageUtils;
+import org.apache.spark.util.ThreadUtils;
+
+import javax.annotation.concurrent.GuardedBy;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * {@link InputStream} implementation which asynchronously reads ahead 
from the underlying input
+ * stream when specified amount of data has been read from the current 
buffer. It does it by maintaining
+ * two buffer - active buffer and read ahead buffer. Active buffer 
contains data which should be returned
+ * when a read() call is issued. The read ahead buffer is used to 
asynchronously read from the underlying
+ * input stream and once the current active buffer is exhausted, we flip 
the two buffers so that we can
+ * start reading from the read ahead buffer without being blocked in disk 
I/O.
+ */
+public class ReadAheadInputStream extends InputStream {
+
+  private ReentrantLock stateChangeLock = new ReentrantLock();
+
+  @GuardedBy("stateChangeLock")
+  private ByteBuffer activeBuffer;
+
+  @GuardedBy("stateChangeLock")
+  private ByteBuffer readAheadBuffer;
+
+  @GuardedBy("stateChangeLock")
+  private boolean endOfStream;
+
+  @GuardedBy("stateChangeLock")
+  // true if async read is in progress
+  private boolean readInProgress;
+
+  @GuardedBy("stateChangeLock")
+  // true if read is aborted due to an exception in reading from 
underlying input stream.
+  private boolean readAborted;
+
+  @GuardedBy("stateChangeLock")
+  private Exception readException;
+
+  // If the remaining data size in the current buffer is below this 
threshold,
+  // we issue an async read from the underlying input stream.
+  private final int readAheadThresholdInBytes;
+
+  private final InputStream underlyingInputStream;
+
+  private final ExecutorService executorService = 
ThreadUtils.newDaemonSingleThreadExecutor("read-ahead");
+
+  private final Condition asyncReadComplete = 
stateChangeLock.newCondition();
+
+private static final ThreadLocal<byte[]> oneByte = 
ThreadLocal.withInitial(() -> new byte[1]);
+
+  /**
+   * Creates a ReadAheadInputStream with the specified buffer 
size and read-ahead
+   * threshold
+   *
+   * @param   inputStream The underlying input stream.
+   * @param   bufferSizeInBytes   The buffer size.
+   * @param   readAheadThresholdInBytes   If the active buffer has less 
data than the read-ahead
+   *  threshold, an async read is 
triggered.
+   */
+  public ReadAheadInputStream(InputStream inputStream, int 
bufferSizeInBytes, int readAheadThresholdInBytes) {
+Preconditions.checkArgument(bufferSizeInBytes > 0,
+"bufferSizeInBytes should be greater than 0");
+Preconditions.checkArgument(readAheadThresholdInBytes > 0 &&
+readAheadThresholdInBytes < bufferSizeInBytes,
+"readAheadThresholdInBytes should be greater than 0 and less 
than bufferSizeInBytes" );
+activeBuffer = ByteBuffer.allocate(bufferSizeInBytes);
+readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes);
+this.readAheadThresholdInBytes = readAheadThresholdInBytes;
+this.underlyingInputStream = inputStream;
+activeBuffer.flip();
+readAheadBuffer.flip();
+  }
+
+  private boolean isEndOfStream() {
+if (!activeBuffer.hasRemaining() && !readAheadBuffer.hasRemaining() && 
endOfStream) {
+  return true;
+}
+return  false;
--- End diff --

nit: double spaces
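
A sketch of the same check with the extra space removed; the body can also collapse to a single return:

```java
private boolean isEndOfStream() {
  return !activeBuffer.hasRemaining() && !readAheadBuffer.hasRemaining() && endOfStream;
}
```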



[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...

2017-09-11 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/18317#discussion_r138207456
  
--- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java 
---
@@ -0,0 +1,315 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.io;
+
+import com.google.common.base.Preconditions;
+import org.apache.spark.storage.StorageUtils;
+import org.apache.spark.util.ThreadUtils;
+
+import javax.annotation.concurrent.GuardedBy;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * {@link InputStream} implementation which asynchronously reads ahead 
from the underlying input
+ * stream when specified amount of data has been read from the current 
buffer. It does it by maintaining
+ * two buffer - active buffer and read ahead buffer. Active buffer 
contains data which should be returned
+ * when a read() call is issued. The read ahead buffer is used to 
asynchronously read from the underlying
+ * input stream and once the current active buffer is exhausted, we flip 
the two buffers so that we can
+ * start reading from the read ahead buffer without being blocked in disk 
I/O.
+ */
+public class ReadAheadInputStream extends InputStream {
+
+  private ReentrantLock stateChangeLock = new ReentrantLock();
+
+  @GuardedBy("stateChangeLock")
+  private ByteBuffer activeBuffer;
+
+  @GuardedBy("stateChangeLock")
+  private ByteBuffer readAheadBuffer;
+
+  @GuardedBy("stateChangeLock")
+  private boolean endOfStream;
+
+  @GuardedBy("stateChangeLock")
+  // true if async read is in progress
+  private boolean readInProgress;
+
+  @GuardedBy("stateChangeLock")
+  // true if read is aborted due to an exception in reading from 
underlying input stream.
+  private boolean readAborted;
+
+  @GuardedBy("stateChangeLock")
+  private Exception readException;
+
+  // If the remaining data size in the current buffer is below this 
threshold,
+  // we issue an async read from the underlying input stream.
+  private final int readAheadThresholdInBytes;
+
+  private final InputStream underlyingInputStream;
+
+  private final ExecutorService executorService = 
ThreadUtils.newDaemonSingleThreadExecutor("read-ahead");
+
+  private final Condition asyncReadComplete = 
stateChangeLock.newCondition();
+
+private static final ThreadLocal<byte[]> oneByte = 
ThreadLocal.withInitial(() -> new byte[1]);
+
+  /**
+   * Creates a ReadAheadInputStream with the specified buffer 
size and read-ahead
+   * threshold
+   *
+   * @param   inputStream The underlying input stream.
+   * @param   bufferSizeInBytes   The buffer size.
+   * @param   readAheadThresholdInBytes   If the active buffer has less 
data than the read-ahead
+   *  threshold, an async read is 
triggered.
+   */
+  public ReadAheadInputStream(InputStream inputStream, int 
bufferSizeInBytes, int readAheadThresholdInBytes) {
+Preconditions.checkArgument(bufferSizeInBytes > 0,
+"bufferSizeInBytes should be greater than 0");
+Preconditions.checkArgument(readAheadThresholdInBytes > 0 &&
+readAheadThresholdInBytes < bufferSizeInBytes,
+"readAheadThresholdInBytes should be greater than 0 and less 
than bufferSizeInBytes" );
+activeBuffer = ByteBuffer.allocate(bufferSizeInBytes);
+readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes);
+this.readAheadThresholdInBytes = readAheadThresholdInBytes;
+this.underlyingInputStream = inputStream;
+activeBuffer.flip();
+readAheadBuffer.flip();
+  }
+
+  private boolean isEndOfStream() {
+if (!activeBuffer.hasRemaining() && !readAheadBuffer.hasRemaining() && 
endOfStream) {
+  return true;
+}
+return  false;
+  }
+  private void readAsync(final 

[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...

2017-09-11 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/18317#discussion_r138195259
  
--- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java 
---
@@ -0,0 +1,315 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.io;
+
+import com.google.common.base.Preconditions;
+import org.apache.spark.storage.StorageUtils;
+import org.apache.spark.util.ThreadUtils;
+
+import javax.annotation.concurrent.GuardedBy;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * {@link InputStream} implementation which asynchronously reads ahead 
from the underlying input
+ * stream when specified amount of data has been read from the current 
buffer. It does it by maintaining
+ * two buffer - active buffer and read ahead buffer. Active buffer 
contains data which should be returned
+ * when a read() call is issued. The read ahead buffer is used to 
asynchronously read from the underlying
+ * input stream and once the current active buffer is exhausted, we flip 
the two buffers so that we can
+ * start reading from the read ahead buffer without being blocked in disk 
I/O.
+ */
+public class ReadAheadInputStream extends InputStream {
+
+  private ReentrantLock stateChangeLock = new ReentrantLock();
+
+  @GuardedBy("stateChangeLock")
+  private ByteBuffer activeBuffer;
+
+  @GuardedBy("stateChangeLock")
+  private ByteBuffer readAheadBuffer;
+
+  @GuardedBy("stateChangeLock")
+  private boolean endOfStream;
+
+  @GuardedBy("stateChangeLock")
+  // true if async read is in progress
+  private boolean readInProgress;
+
+  @GuardedBy("stateChangeLock")
+  // true if read is aborted due to an exception in reading from 
underlying input stream.
+  private boolean readAborted;
+
+  @GuardedBy("stateChangeLock")
+  private Exception readException;
+
+  // If the remaining data size in the current buffer is below this 
threshold,
+  // we issue an async read from the underlying input stream.
+  private final int readAheadThresholdInBytes;
+
+  private final InputStream underlyingInputStream;
+
+  private final ExecutorService executorService = 
ThreadUtils.newDaemonSingleThreadExecutor("read-ahead");
+
+  private final Condition asyncReadComplete = 
stateChangeLock.newCondition();
+
+private static final ThreadLocal<byte[]> oneByte = 
ThreadLocal.withInitial(() -> new byte[1]);
+
+  /**
+   * Creates a ReadAheadInputStream with the specified buffer 
size and read-ahead
+   * threshold
+   *
+   * @param   inputStream The underlying input stream.
+   * @param   bufferSizeInBytes   The buffer size.
+   * @param   readAheadThresholdInBytes   If the active buffer has less 
data than the read-ahead
+   *  threshold, an async read is 
triggered.
+   */
+  public ReadAheadInputStream(InputStream inputStream, int 
bufferSizeInBytes, int readAheadThresholdInBytes) {
+Preconditions.checkArgument(bufferSizeInBytes > 0,
+"bufferSizeInBytes should be greater than 0");
--- End diff --

nit: could you add the `bufferSizeInBytes` value to the error message?
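
A sketch of what the nit asks for, using Guava's templated `checkArgument` overload (the message wording is only a suggestion):

```java
// Include the offending value so a failed precondition is easier to diagnose.
Preconditions.checkArgument(bufferSizeInBytes > 0,
    "bufferSizeInBytes should be greater than 0, but was %s", bufferSizeInBytes);
```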


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...

2017-09-11 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/18317#discussion_r138194732
  
--- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java 
---
@@ -0,0 +1,315 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.io;
+
+import com.google.common.base.Preconditions;
+import org.apache.spark.storage.StorageUtils;
+import org.apache.spark.util.ThreadUtils;
+
+import javax.annotation.concurrent.GuardedBy;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * {@link InputStream} implementation which asynchronously reads ahead 
from the underlying input
+ * stream when specified amount of data has been read from the current 
buffer. It does it by maintaining
+ * two buffer - active buffer and read ahead buffer. Active buffer 
contains data which should be returned
+ * when a read() call is issued. The read ahead buffer is used to 
asynchronously read from the underlying
+ * input stream and once the current active buffer is exhausted, we flip 
the two buffers so that we can
+ * start reading from the read ahead buffer without being blocked in disk 
I/O.
+ */
+public class ReadAheadInputStream extends InputStream {
+
+  private ReentrantLock stateChangeLock = new ReentrantLock();
+
+  @GuardedBy("stateChangeLock")
+  private ByteBuffer activeBuffer;
+
+  @GuardedBy("stateChangeLock")
+  private ByteBuffer readAheadBuffer;
+
+  @GuardedBy("stateChangeLock")
+  private boolean endOfStream;
+
+  @GuardedBy("stateChangeLock")
+  // true if async read is in progress
+  private boolean readInProgress;
+
+  @GuardedBy("stateChangeLock")
+  // true if read is aborted due to an exception in reading from 
underlying input stream.
+  private boolean readAborted;
+
+  @GuardedBy("stateChangeLock")
+  private Exception readException;
+
+  // If the remaining data size in the current buffer is below this 
threshold,
+  // we issue an async read from the underlying input stream.
+  private final int readAheadThresholdInBytes;
+
+  private final InputStream underlyingInputStream;
+
+  private final ExecutorService executorService = 
ThreadUtils.newDaemonSingleThreadExecutor("read-ahead");
+
+  private final Condition asyncReadComplete = 
stateChangeLock.newCondition();
+
+private static final ThreadLocal<byte[]> oneByte = 
ThreadLocal.withInitial(() -> new byte[1]);
+
+  /**
+   * Creates a ReadAheadInputStream with the specified buffer 
size and read-ahead
+   * threshold
+   *
+   * @param   inputStream The underlying input stream.
+   * @param   bufferSizeInBytes   The buffer size.
+   * @param   readAheadThresholdInBytes   If the active buffer has less 
data than the read-ahead
+   *  threshold, an async read is 
triggered.
+   */
+  public ReadAheadInputStream(InputStream inputStream, int 
bufferSizeInBytes, int readAheadThresholdInBytes) {
+Preconditions.checkArgument(bufferSizeInBytes > 0,
+"bufferSizeInBytes should be greater than 0");
+Preconditions.checkArgument(readAheadThresholdInBytes > 0 &&
+readAheadThresholdInBytes < bufferSizeInBytes,
+"readAheadThresholdInBytes should be greater than 0 and less 
than bufferSizeInBytes" );
+activeBuffer = ByteBuffer.allocate(bufferSizeInBytes);
+readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes);
+this.readAheadThresholdInBytes = readAheadThresholdInBytes;
+this.underlyingInputStream = inputStream;
+activeBuffer.flip();
+readAheadBuffer.flip();
+  }
+
+  private boolean isEndOfStream() {
+if (!activeBuffer.hasRemaining() && !readAheadBuffer.hasRemaining() && 
endOfStream) {
+  return true;
+}
+return  false;
+  }
+  private void readAsync(final 

[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...

2017-09-11 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/18317#discussion_r138204793
  
--- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java 
---
@@ -0,0 +1,292 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.io;
+
+import com.google.common.base.Preconditions;
+import org.apache.spark.storage.StorageUtils;
+
+import javax.annotation.concurrent.GuardedBy;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * {@link InputStream} implementation which asynchronously reads ahead 
from the underlying input
+ * stream when specified amount of data has been read from the current 
buffer. It does it by maintaining
+ * two buffer - active buffer and read ahead buffer. Active buffer 
contains data which should be returned
+ * when a read() call is issued. The read ahead buffer is used to 
asynchronously read from the underlying
+ * input stream and once the current active buffer is exhausted, we flip 
the two buffers so that we can
+ * start reading from the read ahead buffer without being blocked in disk 
I/O.
+ */
+public class ReadAheadInputStream extends InputStream {
+
+  private ReentrantLock stateChangeLock = new ReentrantLock();
+
+  @GuardedBy("stateChangeLock")
+  private ByteBuffer activeBuffer;
+
+  @GuardedBy("stateChangeLock")
+  private ByteBuffer readAheadBuffer;
+
+  @GuardedBy("stateChangeLock")
+  private boolean endOfStream;
+
+  @GuardedBy("stateChangeLock")
+  // true if async read is in progress
+  private boolean isReadInProgress;
+
+  @GuardedBy("stateChangeLock")
+  // true if read is aborted due to an exception in reading from 
underlying input stream.
+  private boolean isReadAborted;
+
+  @GuardedBy("stateChangeLock")
+  private Exception readException;
+
+  // If the remaining data size in the current buffer is below this 
threshold,
+  // we issue an async read from the underlying input stream.
+  private final int readAheadThresholdInBytes;
+
+  private final InputStream underlyingInputStream;
+
+  private final ExecutorService executorService = 
Executors.newSingleThreadExecutor();
+
+  private final Condition asyncReadComplete = 
stateChangeLock.newCondition();
+
+  private final byte[] oneByte = new byte[1];
+
+  /**
+   * Creates a ReadAheadInputStream with the specified buffer 
size and read-ahead
+   * threshold
+   *
+   * @param   inputStream The underlying input stream.
+   * @param   bufferSizeInBytes   The buffer size.
+   * @param   readAheadThresholdInBytes   If the active buffer has less 
data than the read-ahead
+   *  threshold, an async read is 
triggered.
+   */
+  public ReadAheadInputStream(InputStream inputStream, int 
bufferSizeInBytes, int readAheadThresholdInBytes) {
+Preconditions.checkArgument(bufferSizeInBytes > 0,
+"bufferSizeInBytes should be greater than 0");
+Preconditions.checkArgument(readAheadThresholdInBytes > 0 &&
+readAheadThresholdInBytes < bufferSizeInBytes,
+"readAheadThresholdInBytes should be greater than 0 and less 
than bufferSizeInBytes" );
+activeBuffer = ByteBuffer.allocate(bufferSizeInBytes);
+readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes);
+this.readAheadThresholdInBytes = readAheadThresholdInBytes;
+this.underlyingInputStream = inputStream;
+activeBuffer.flip();
+readAheadBuffer.flip();
+  }
+
+  private boolean isEndOfStream() {
+if(activeBuffer.remaining() == 0 && readAheadBuffer.remaining() == 0 
&& endOfStream) {
+  return true;
+}
+return  false;
+  }
+
+
+  private void readAsync(final ByteBuffer byteBuffer) throws IOException {
+stateChangeLock.lock();
+if (endOfStream || isReadInProgress) {
+  

[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...

2017-09-11 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/18317#discussion_r138200321
  
--- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java 
---
@@ -0,0 +1,317 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.io;
+
+import com.google.common.base.Preconditions;
+import org.apache.spark.storage.StorageUtils;
+import org.apache.spark.util.ThreadUtils;
+
+import javax.annotation.concurrent.GuardedBy;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * {@link InputStream} implementation which asynchronously reads ahead from the underlying
+ * input stream when a specified amount of data has been read from the current buffer. It does
+ * so by maintaining two buffers - an active buffer and a read-ahead buffer. The active buffer
+ * contains the data that should be returned when a read() call is issued. The read-ahead
+ * buffer is used to asynchronously read from the underlying input stream; once the current
+ * active buffer is exhausted, we flip the two buffers so that we can start reading from the
+ * read-ahead buffer without being blocked by disk I/O.
+ */
+public class ReadAheadInputStream extends InputStream {
+
+  private ReentrantLock stateChangeLock = new ReentrantLock();
+
+  @GuardedBy("stateChangeLock")
+  private ByteBuffer activeBuffer;
+
+  @GuardedBy("stateChangeLock")
+  private ByteBuffer readAheadBuffer;
+
+  @GuardedBy("stateChangeLock")
+  private boolean endOfStream;
+
+  @GuardedBy("stateChangeLock")
+  // true if async read is in progress
+  private boolean readInProgress;
+
+  @GuardedBy("stateChangeLock")
+  // true if read is aborted due to an exception in reading from 
underlying input stream.
+  private boolean readAborted;
+
+  @GuardedBy("stateChangeLock")
+  private Exception readException;
+
+  // If the remaining data size in the current buffer is below this 
threshold,
+  // we issue an async read from the underlying input stream.
+  private final int readAheadThresholdInBytes;
+
+  private final InputStream underlyingInputStream;
+
+  private final ExecutorService executorService = 
ThreadUtils.newDaemonSingleThreadExecutor("read-ahead");
+
+  private final Condition asyncReadComplete = 
stateChangeLock.newCondition();
+
+  private final byte[] oneByte = new byte[1];
+
+  /**
+   * Creates a ReadAheadInputStream with the specified buffer 
size and read-ahead
+   * threshold
+   *
+   * @param   inputStream The underlying input stream.
+   * @param   bufferSizeInBytes   The buffer size.
+   * @param   readAheadThresholdInBytes   If the active buffer has less 
data than the read-ahead
+   *  threshold, an async read is 
triggered.
+   */
+  public ReadAheadInputStream(InputStream inputStream, int 
bufferSizeInBytes, int readAheadThresholdInBytes) {
+Preconditions.checkArgument(bufferSizeInBytes > 0,
+"bufferSizeInBytes should be greater than 0");
+Preconditions.checkArgument(readAheadThresholdInBytes > 0 &&
+readAheadThresholdInBytes < bufferSizeInBytes,
+"readAheadThresholdInBytes should be greater than 0 and less 
than bufferSizeInBytes" );
+activeBuffer = ByteBuffer.allocate(bufferSizeInBytes);
+readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes);
+this.readAheadThresholdInBytes = readAheadThresholdInBytes;
+this.underlyingInputStream = inputStream;
+activeBuffer.flip();
+readAheadBuffer.flip();
+  }
+
+  private boolean hasRemaining() {
+if(activeBuffer.remaining() == 0 && readAheadBuffer.remaining() == 0 
&& endOfStream) {
+  return true;
+}
+return  false;
+  }
+  private void readAsync(final ByteBuffer byteBuffer) throws IOException {
+

[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...

2017-09-11 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/18317#discussion_r138194101
  
--- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java 
---
@@ -0,0 +1,315 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.io;
+
+import com.google.common.base.Preconditions;
+import org.apache.spark.storage.StorageUtils;
+import org.apache.spark.util.ThreadUtils;
+
+import javax.annotation.concurrent.GuardedBy;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * {@link InputStream} implementation which asynchronously reads ahead from the underlying
+ * input stream when a specified amount of data has been read from the current buffer. It does
+ * so by maintaining two buffers - an active buffer and a read-ahead buffer. The active buffer
+ * contains the data that should be returned when a read() call is issued. The read-ahead
+ * buffer is used to asynchronously read from the underlying input stream; once the current
+ * active buffer is exhausted, we flip the two buffers so that we can start reading from the
+ * read-ahead buffer without being blocked by disk I/O.
+ */
+public class ReadAheadInputStream extends InputStream {
+
+  private ReentrantLock stateChangeLock = new ReentrantLock();
+
+  @GuardedBy("stateChangeLock")
+  private ByteBuffer activeBuffer;
+
+  @GuardedBy("stateChangeLock")
+  private ByteBuffer readAheadBuffer;
+
+  @GuardedBy("stateChangeLock")
+  private boolean endOfStream;
+
+  @GuardedBy("stateChangeLock")
+  // true if async read is in progress
+  private boolean readInProgress;
+
+  @GuardedBy("stateChangeLock")
+  // true if read is aborted due to an exception in reading from 
underlying input stream.
+  private boolean readAborted;
+
+  @GuardedBy("stateChangeLock")
+  private Exception readException;
+
+  // If the remaining data size in the current buffer is below this 
threshold,
+  // we issue an async read from the underlying input stream.
+  private final int readAheadThresholdInBytes;
+
+  private final InputStream underlyingInputStream;
+
+  private final ExecutorService executorService = 
ThreadUtils.newDaemonSingleThreadExecutor("read-ahead");
+
+  private final Condition asyncReadComplete = 
stateChangeLock.newCondition();
+
+  private static final ThreadLocal<byte[]> oneByte =
ThreadLocal.withInitial(() -> new byte[1]);
+
+  /**
+   * Creates a ReadAheadInputStream with the specified buffer 
size and read-ahead
+   * threshold
+   *
+   * @param   inputStream The underlying input stream.
+   * @param   bufferSizeInBytes   The buffer size.
+   * @param   readAheadThresholdInBytes   If the active buffer has less 
data than the read-ahead
+   *  threshold, an async read is 
triggered.
+   */
+  public ReadAheadInputStream(InputStream inputStream, int 
bufferSizeInBytes, int readAheadThresholdInBytes) {
+Preconditions.checkArgument(bufferSizeInBytes > 0,
+"bufferSizeInBytes should be greater than 0");
+Preconditions.checkArgument(readAheadThresholdInBytes > 0 &&
+readAheadThresholdInBytes < bufferSizeInBytes,
+"readAheadThresholdInBytes should be greater than 0 and less 
than bufferSizeInBytes" );
+activeBuffer = ByteBuffer.allocate(bufferSizeInBytes);
+readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes);
+this.readAheadThresholdInBytes = readAheadThresholdInBytes;
+this.underlyingInputStream = inputStream;
+activeBuffer.flip();
+readAheadBuffer.flip();
+  }
+
+  private boolean isEndOfStream() {
+if (!activeBuffer.hasRemaining() && !readAheadBuffer.hasRemaining() && 
endOfStream) {
+  return true;
+}
+return  false;
+  }
+  private void readAsync(final 

[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...

2017-09-11 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/18317#discussion_r138189593
  
--- Diff: 
core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java
 ---
@@ -72,10 +72,15 @@ public UnsafeSorterSpillReader(
   bufferSizeBytes = DEFAULT_BUFFER_SIZE_BYTES;
 }
 
+final Double readAheadFraction =
+SparkEnv.get() == null ? 0.5 :
+ 
SparkEnv.get().conf().getDouble("spark.unsafe.sorter.spill.read.ahead.fraction",
 0.5);
+
 final InputStream bs =
 new NioBufferedFileInputStream(file, (int) bufferSizeBytes);
 try {
-  this.in = serializerManager.wrapStream(blockId, bs);
+  this.in = new 
ReadAheadInputStream(serializerManager.wrapStream(blockId, bs),
--- End diff --

Could you add an internal conf to disable it? That would allow users to 
disable it if the new feature causes a regression.
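
A minimal sketch of what such an internal flag could look like, using Spark's 
`ConfigBuilder` DSL (the key name and the default are illustrative only, not 
something decided in this thread):

```scala
package org.apache.spark.internal.config

// Hypothetical internal config entry; the key name and default are illustrative.
private[spark] object UnsafeSorterSpill {
  val SPILL_READ_AHEAD_ENABLED =
    ConfigBuilder("spark.unsafe.sorter.spill.read.ahead.enabled")
      .internal()                  // not part of the user-facing documentation
      .booleanConf
      .createWithDefault(true)     // keep the new read-ahead path on by default
}
```

`UnsafeSorterSpillReader` could then consult this flag and fall back to the plain 
buffered stream when it is set to false.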


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...

2017-09-11 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/18317#discussion_r138189733
  
--- Diff: 
core/src/test/java/org/apache/spark/io/GenericFileInputStreamSuite.java ---
@@ -50,17 +52,16 @@ public void tearDown() {
 inputFile.delete();
   }
 
+
--- End diff --

nit: extra empty line


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...

2017-09-11 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/18317#discussion_r138201264
  
--- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java 
---
@@ -0,0 +1,315 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.io;
+
+import com.google.common.base.Preconditions;
+import org.apache.spark.storage.StorageUtils;
+import org.apache.spark.util.ThreadUtils;
+
+import javax.annotation.concurrent.GuardedBy;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * {@link InputStream} implementation which asynchronously reads ahead from the underlying
+ * input stream when a specified amount of data has been read from the current buffer. It does
+ * so by maintaining two buffers - an active buffer and a read-ahead buffer. The active buffer
+ * contains the data that should be returned when a read() call is issued. The read-ahead
+ * buffer is used to asynchronously read from the underlying input stream; once the current
+ * active buffer is exhausted, we flip the two buffers so that we can start reading from the
+ * read-ahead buffer without being blocked by disk I/O.
+ */
+public class ReadAheadInputStream extends InputStream {
+
+  private ReentrantLock stateChangeLock = new ReentrantLock();
+
+  @GuardedBy("stateChangeLock")
+  private ByteBuffer activeBuffer;
+
+  @GuardedBy("stateChangeLock")
+  private ByteBuffer readAheadBuffer;
+
+  @GuardedBy("stateChangeLock")
+  private boolean endOfStream;
+
+  @GuardedBy("stateChangeLock")
+  // true if async read is in progress
+  private boolean readInProgress;
+
+  @GuardedBy("stateChangeLock")
+  // true if read is aborted due to an exception in reading from 
underlying input stream.
+  private boolean readAborted;
+
+  @GuardedBy("stateChangeLock")
+  private Exception readException;
+
+  // If the remaining data size in the current buffer is below this 
threshold,
+  // we issue an async read from the underlying input stream.
+  private final int readAheadThresholdInBytes;
+
+  private final InputStream underlyingInputStream;
+
+  private final ExecutorService executorService = 
ThreadUtils.newDaemonSingleThreadExecutor("read-ahead");
+
+  private final Condition asyncReadComplete = 
stateChangeLock.newCondition();
+
+  private static final ThreadLocal<byte[]> oneByte =
ThreadLocal.withInitial(() -> new byte[1]);
+
+  /**
+   * Creates a ReadAheadInputStream with the specified buffer 
size and read-ahead
+   * threshold
+   *
+   * @param   inputStream The underlying input stream.
+   * @param   bufferSizeInBytes   The buffer size.
+   * @param   readAheadThresholdInBytes   If the active buffer has less 
data than the read-ahead
+   *  threshold, an async read is 
triggered.
+   */
+  public ReadAheadInputStream(InputStream inputStream, int 
bufferSizeInBytes, int readAheadThresholdInBytes) {
+Preconditions.checkArgument(bufferSizeInBytes > 0,
+"bufferSizeInBytes should be greater than 0");
+Preconditions.checkArgument(readAheadThresholdInBytes > 0 &&
+readAheadThresholdInBytes < bufferSizeInBytes,
+"readAheadThresholdInBytes should be greater than 0 and less 
than bufferSizeInBytes" );
+activeBuffer = ByteBuffer.allocate(bufferSizeInBytes);
+readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes);
+this.readAheadThresholdInBytes = readAheadThresholdInBytes;
+this.underlyingInputStream = inputStream;
+activeBuffer.flip();
+readAheadBuffer.flip();
+  }
+
+  private boolean isEndOfStream() {
+if (!activeBuffer.hasRemaining() && !readAheadBuffer.hasRemaining() && 
endOfStream) {
+  return true;
+}
+return  false;
+  }
+  private void readAsync(final 

[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...

2017-09-11 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/18317#discussion_r138188910
  
--- Diff: 
core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java
 ---
@@ -72,10 +72,15 @@ public UnsafeSorterSpillReader(
   bufferSizeBytes = DEFAULT_BUFFER_SIZE_BYTES;
 }
 
+final Double readAheadFraction =
--- End diff --

nit: Double -> double


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...

2017-09-11 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/18317#discussion_r138195292
  
--- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java 
---
@@ -0,0 +1,315 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.io;
+
+import com.google.common.base.Preconditions;
+import org.apache.spark.storage.StorageUtils;
+import org.apache.spark.util.ThreadUtils;
+
+import javax.annotation.concurrent.GuardedBy;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * {@link InputStream} implementation which asynchronously reads ahead from the underlying
+ * input stream when a specified amount of data has been read from the current buffer. It does
+ * so by maintaining two buffers - an active buffer and a read-ahead buffer. The active buffer
+ * contains the data that should be returned when a read() call is issued. The read-ahead
+ * buffer is used to asynchronously read from the underlying input stream; once the current
+ * active buffer is exhausted, we flip the two buffers so that we can start reading from the
+ * read-ahead buffer without being blocked by disk I/O.
+ */
+public class ReadAheadInputStream extends InputStream {
+
+  private ReentrantLock stateChangeLock = new ReentrantLock();
+
+  @GuardedBy("stateChangeLock")
+  private ByteBuffer activeBuffer;
+
+  @GuardedBy("stateChangeLock")
+  private ByteBuffer readAheadBuffer;
+
+  @GuardedBy("stateChangeLock")
+  private boolean endOfStream;
+
+  @GuardedBy("stateChangeLock")
+  // true if async read is in progress
+  private boolean readInProgress;
+
+  @GuardedBy("stateChangeLock")
+  // true if read is aborted due to an exception in reading from 
underlying input stream.
+  private boolean readAborted;
+
+  @GuardedBy("stateChangeLock")
+  private Exception readException;
+
+  // If the remaining data size in the current buffer is below this 
threshold,
+  // we issue an async read from the underlying input stream.
+  private final int readAheadThresholdInBytes;
+
+  private final InputStream underlyingInputStream;
+
+  private final ExecutorService executorService = 
ThreadUtils.newDaemonSingleThreadExecutor("read-ahead");
+
+  private final Condition asyncReadComplete = 
stateChangeLock.newCondition();
+
+  private static final ThreadLocal<byte[]> oneByte =
ThreadLocal.withInitial(() -> new byte[1]);
+
+  /**
+   * Creates a ReadAheadInputStream with the specified buffer 
size and read-ahead
+   * threshold
+   *
+   * @param   inputStream The underlying input stream.
+   * @param   bufferSizeInBytes   The buffer size.
+   * @param   readAheadThresholdInBytes   If the active buffer has less 
data than the read-ahead
+   *  threshold, an async read is 
triggered.
+   */
+  public ReadAheadInputStream(InputStream inputStream, int 
bufferSizeInBytes, int readAheadThresholdInBytes) {
+Preconditions.checkArgument(bufferSizeInBytes > 0,
+"bufferSizeInBytes should be greater than 0");
+Preconditions.checkArgument(readAheadThresholdInBytes > 0 &&
+readAheadThresholdInBytes < bufferSizeInBytes,
+"readAheadThresholdInBytes should be greater than 0 and less 
than bufferSizeInBytes" );
--- End diff --

ditto


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...

2017-09-11 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/18317#discussion_r138200201
  
--- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java 
---
@@ -0,0 +1,315 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.io;
+
+import com.google.common.base.Preconditions;
+import org.apache.spark.storage.StorageUtils;
+import org.apache.spark.util.ThreadUtils;
+
+import javax.annotation.concurrent.GuardedBy;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * {@link InputStream} implementation which asynchronously reads ahead from the underlying
+ * input stream when a specified amount of data has been read from the current buffer. It does
+ * so by maintaining two buffers - an active buffer and a read-ahead buffer. The active buffer
+ * contains the data that should be returned when a read() call is issued. The read-ahead
+ * buffer is used to asynchronously read from the underlying input stream; once the current
+ * active buffer is exhausted, we flip the two buffers so that we can start reading from the
+ * read-ahead buffer without being blocked by disk I/O.
+ */
+public class ReadAheadInputStream extends InputStream {
+
+  private ReentrantLock stateChangeLock = new ReentrantLock();
+
+  @GuardedBy("stateChangeLock")
+  private ByteBuffer activeBuffer;
+
+  @GuardedBy("stateChangeLock")
+  private ByteBuffer readAheadBuffer;
+
+  @GuardedBy("stateChangeLock")
+  private boolean endOfStream;
+
+  @GuardedBy("stateChangeLock")
+  // true if async read is in progress
+  private boolean readInProgress;
+
+  @GuardedBy("stateChangeLock")
+  // true if read is aborted due to an exception in reading from 
underlying input stream.
+  private boolean readAborted;
+
+  @GuardedBy("stateChangeLock")
+  private Exception readException;
+
+  // If the remaining data size in the current buffer is below this 
threshold,
+  // we issue an async read from the underlying input stream.
+  private final int readAheadThresholdInBytes;
+
+  private final InputStream underlyingInputStream;
+
+  private final ExecutorService executorService = 
ThreadUtils.newDaemonSingleThreadExecutor("read-ahead");
+
+  private final Condition asyncReadComplete = 
stateChangeLock.newCondition();
+
+  private static final ThreadLocal<byte[]> oneByte =
ThreadLocal.withInitial(() -> new byte[1]);
+
+  /**
+   * Creates a ReadAheadInputStream with the specified buffer 
size and read-ahead
+   * threshold
+   *
+   * @param   inputStream The underlying input stream.
+   * @param   bufferSizeInBytes   The buffer size.
+   * @param   readAheadThresholdInBytes   If the active buffer has less 
data than the read-ahead
+   *  threshold, an async read is 
triggered.
+   */
+  public ReadAheadInputStream(InputStream inputStream, int 
bufferSizeInBytes, int readAheadThresholdInBytes) {
+Preconditions.checkArgument(bufferSizeInBytes > 0,
+"bufferSizeInBytes should be greater than 0");
+Preconditions.checkArgument(readAheadThresholdInBytes > 0 &&
+readAheadThresholdInBytes < bufferSizeInBytes,
+"readAheadThresholdInBytes should be greater than 0 and less 
than bufferSizeInBytes" );
+activeBuffer = ByteBuffer.allocate(bufferSizeInBytes);
+readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes);
+this.readAheadThresholdInBytes = readAheadThresholdInBytes;
+this.underlyingInputStream = inputStream;
+activeBuffer.flip();
+readAheadBuffer.flip();
+  }
+
+  private boolean isEndOfStream() {
+if (!activeBuffer.hasRemaining() && !readAheadBuffer.hasRemaining() && 
endOfStream) {
+  return true;
+}
+return  false;
+  }
+  private void readAsync(final 

[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...

2017-09-11 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/18317#discussion_r138194341
  
--- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java 
---
@@ -0,0 +1,315 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.io;
+
+import com.google.common.base.Preconditions;
+import org.apache.spark.storage.StorageUtils;
+import org.apache.spark.util.ThreadUtils;
+
+import javax.annotation.concurrent.GuardedBy;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * {@link InputStream} implementation which asynchronously reads ahead from the underlying
+ * input stream when a specified amount of data has been read from the current buffer. It does
+ * so by maintaining two buffers - an active buffer and a read-ahead buffer. The active buffer
+ * contains the data that should be returned when a read() call is issued. The read-ahead
+ * buffer is used to asynchronously read from the underlying input stream; once the current
+ * active buffer is exhausted, we flip the two buffers so that we can start reading from the
+ * read-ahead buffer without being blocked by disk I/O.
+ */
+public class ReadAheadInputStream extends InputStream {
+
+  private ReentrantLock stateChangeLock = new ReentrantLock();
+
+  @GuardedBy("stateChangeLock")
+  private ByteBuffer activeBuffer;
+
+  @GuardedBy("stateChangeLock")
+  private ByteBuffer readAheadBuffer;
+
+  @GuardedBy("stateChangeLock")
+  private boolean endOfStream;
+
+  @GuardedBy("stateChangeLock")
+  // true if async read is in progress
+  private boolean readInProgress;
+
+  @GuardedBy("stateChangeLock")
+  // true if read is aborted due to an exception in reading from 
underlying input stream.
+  private boolean readAborted;
+
+  @GuardedBy("stateChangeLock")
+  private Exception readException;
+
+  // If the remaining data size in the current buffer is below this 
threshold,
+  // we issue an async read from the underlying input stream.
+  private final int readAheadThresholdInBytes;
+
+  private final InputStream underlyingInputStream;
+
+  private final ExecutorService executorService = 
ThreadUtils.newDaemonSingleThreadExecutor("read-ahead");
+
+  private final Condition asyncReadComplete = 
stateChangeLock.newCondition();
+
+  private static final ThreadLocal<byte[]> oneByte =
ThreadLocal.withInitial(() -> new byte[1]);
+
+  /**
+   * Creates a ReadAheadInputStream with the specified buffer 
size and read-ahead
+   * threshold
+   *
+   * @param   inputStream The underlying input stream.
+   * @param   bufferSizeInBytes   The buffer size.
+   * @param   readAheadThresholdInBytes   If the active buffer has less 
data than the read-ahead
+   *  threshold, an async read is 
triggered.
+   */
+  public ReadAheadInputStream(InputStream inputStream, int 
bufferSizeInBytes, int readAheadThresholdInBytes) {
+Preconditions.checkArgument(bufferSizeInBytes > 0,
+"bufferSizeInBytes should be greater than 0");
+Preconditions.checkArgument(readAheadThresholdInBytes > 0 &&
+readAheadThresholdInBytes < bufferSizeInBytes,
+"readAheadThresholdInBytes should be greater than 0 and less 
than bufferSizeInBytes" );
+activeBuffer = ByteBuffer.allocate(bufferSizeInBytes);
+readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes);
+this.readAheadThresholdInBytes = readAheadThresholdInBytes;
+this.underlyingInputStream = inputStream;
+activeBuffer.flip();
+readAheadBuffer.flip();
+  }
+
+  private boolean isEndOfStream() {
+if (!activeBuffer.hasRemaining() && !readAheadBuffer.hasRemaining() && 
endOfStream) {
+  return true;
+}
+return  false;
+  }
+  private void readAsync(final 

[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...

2017-09-11 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/18317#discussion_r138208239
  
--- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java 
---
@@ -0,0 +1,315 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.io;
+
+import com.google.common.base.Preconditions;
+import org.apache.spark.storage.StorageUtils;
+import org.apache.spark.util.ThreadUtils;
+
+import javax.annotation.concurrent.GuardedBy;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * {@link InputStream} implementation which asynchronously reads ahead from the underlying
+ * input stream when a specified amount of data has been read from the current buffer. It does
+ * so by maintaining two buffers - an active buffer and a read-ahead buffer. The active buffer
+ * contains the data that should be returned when a read() call is issued. The read-ahead
+ * buffer is used to asynchronously read from the underlying input stream; once the current
+ * active buffer is exhausted, we flip the two buffers so that we can start reading from the
+ * read-ahead buffer without being blocked by disk I/O.
+ */
+public class ReadAheadInputStream extends InputStream {
+
+  private ReentrantLock stateChangeLock = new ReentrantLock();
+
+  @GuardedBy("stateChangeLock")
+  private ByteBuffer activeBuffer;
+
+  @GuardedBy("stateChangeLock")
+  private ByteBuffer readAheadBuffer;
+
+  @GuardedBy("stateChangeLock")
+  private boolean endOfStream;
+
+  @GuardedBy("stateChangeLock")
+  // true if async read is in progress
+  private boolean readInProgress;
+
+  @GuardedBy("stateChangeLock")
+  // true if read is aborted due to an exception in reading from 
underlying input stream.
+  private boolean readAborted;
+
+  @GuardedBy("stateChangeLock")
+  private Exception readException;
+
+  // If the remaining data size in the current buffer is below this 
threshold,
+  // we issue an async read from the underlying input stream.
+  private final int readAheadThresholdInBytes;
+
+  private final InputStream underlyingInputStream;
+
+  private final ExecutorService executorService = 
ThreadUtils.newDaemonSingleThreadExecutor("read-ahead");
+
+  private final Condition asyncReadComplete = 
stateChangeLock.newCondition();
+
+  private static final ThreadLocal<byte[]> oneByte =
ThreadLocal.withInitial(() -> new byte[1]);
+
+  /**
+   * Creates a ReadAheadInputStream with the specified buffer 
size and read-ahead
+   * threshold
+   *
+   * @param   inputStream The underlying input stream.
+   * @param   bufferSizeInBytes   The buffer size.
+   * @param   readAheadThresholdInBytes   If the active buffer has less 
data than the read-ahead
+   *  threshold, an async read is 
triggered.
+   */
+  public ReadAheadInputStream(InputStream inputStream, int 
bufferSizeInBytes, int readAheadThresholdInBytes) {
+Preconditions.checkArgument(bufferSizeInBytes > 0,
+"bufferSizeInBytes should be greater than 0");
+Preconditions.checkArgument(readAheadThresholdInBytes > 0 &&
+readAheadThresholdInBytes < bufferSizeInBytes,
+"readAheadThresholdInBytes should be greater than 0 and less 
than bufferSizeInBytes" );
+activeBuffer = ByteBuffer.allocate(bufferSizeInBytes);
+readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes);
+this.readAheadThresholdInBytes = readAheadThresholdInBytes;
+this.underlyingInputStream = inputStream;
+activeBuffer.flip();
+readAheadBuffer.flip();
+  }
+
+  private boolean isEndOfStream() {
+if (!activeBuffer.hasRemaining() && !readAheadBuffer.hasRemaining() && 
endOfStream) {
+  return true;
+}
+return  false;
+  }
+  private void readAsync(final 

[GitHub] spark pull request #18317: [SPARK-21113][CORE] Read ahead input stream to am...

2017-09-11 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/18317#discussion_r138198579
  
--- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java 
---
@@ -0,0 +1,315 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.io;
+
+import com.google.common.base.Preconditions;
+import org.apache.spark.storage.StorageUtils;
+import org.apache.spark.util.ThreadUtils;
+
+import javax.annotation.concurrent.GuardedBy;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * {@link InputStream} implementation which asynchronously reads ahead from the underlying
+ * input stream when a specified amount of data has been read from the current buffer. It does
+ * so by maintaining two buffers - an active buffer and a read-ahead buffer. The active buffer
+ * contains the data that should be returned when a read() call is issued. The read-ahead
+ * buffer is used to asynchronously read from the underlying input stream; once the current
+ * active buffer is exhausted, we flip the two buffers so that we can start reading from the
+ * read-ahead buffer without being blocked by disk I/O.
+ */
+public class ReadAheadInputStream extends InputStream {
+
+  private ReentrantLock stateChangeLock = new ReentrantLock();
+
+  @GuardedBy("stateChangeLock")
+  private ByteBuffer activeBuffer;
+
+  @GuardedBy("stateChangeLock")
+  private ByteBuffer readAheadBuffer;
+
+  @GuardedBy("stateChangeLock")
+  private boolean endOfStream;
+
+  @GuardedBy("stateChangeLock")
+  // true if async read is in progress
+  private boolean readInProgress;
+
+  @GuardedBy("stateChangeLock")
+  // true if read is aborted due to an exception in reading from 
underlying input stream.
+  private boolean readAborted;
+
+  @GuardedBy("stateChangeLock")
+  private Exception readException;
+
+  // If the remaining data size in the current buffer is below this 
threshold,
+  // we issue an async read from the underlying input stream.
+  private final int readAheadThresholdInBytes;
+
+  private final InputStream underlyingInputStream;
+
+  private final ExecutorService executorService = 
ThreadUtils.newDaemonSingleThreadExecutor("read-ahead");
+
+  private final Condition asyncReadComplete = 
stateChangeLock.newCondition();
+
+  private static final ThreadLocal<byte[]> oneByte =
ThreadLocal.withInitial(() -> new byte[1]);
+
+  /**
+   * Creates a ReadAheadInputStream with the specified buffer 
size and read-ahead
+   * threshold
+   *
+   * @param   inputStream The underlying input stream.
+   * @param   bufferSizeInBytes   The buffer size.
+   * @param   readAheadThresholdInBytes   If the active buffer has less 
data than the read-ahead
+   *  threshold, an async read is 
triggered.
+   */
+  public ReadAheadInputStream(InputStream inputStream, int 
bufferSizeInBytes, int readAheadThresholdInBytes) {
+Preconditions.checkArgument(bufferSizeInBytes > 0,
+"bufferSizeInBytes should be greater than 0");
+Preconditions.checkArgument(readAheadThresholdInBytes > 0 &&
+readAheadThresholdInBytes < bufferSizeInBytes,
+"readAheadThresholdInBytes should be greater than 0 and less 
than bufferSizeInBytes" );
+activeBuffer = ByteBuffer.allocate(bufferSizeInBytes);
+readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes);
+this.readAheadThresholdInBytes = readAheadThresholdInBytes;
+this.underlyingInputStream = inputStream;
+activeBuffer.flip();
+readAheadBuffer.flip();
+  }
+
+  private boolean isEndOfStream() {
+if (!activeBuffer.hasRemaining() && !readAheadBuffer.hasRemaining() && 
endOfStream) {
+  return true;
+}
+return  false;
--- End diff --

why not just `return 

[GitHub] spark pull request #19136: [SPARK-15689][SQL] data source v2

2017-09-11 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19136#discussion_r138224219
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
 ---
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.spark.sql.Strategy
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SparkPlan}
+import org.apache.spark.sql.execution.datasources.DataSourceStrategy
+import org.apache.spark.sql.sources.Filter
+import 
org.apache.spark.sql.sources.v2.reader.downward.{CatalystFilterPushDownSupport, 
ColumnPruningSupport, FilterPushDownSupport}
+
+object DataSourceV2Strategy extends Strategy {
+  // TODO: write path
+  override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+case PhysicalOperation(projects, filters, DataSourceV2Relation(output, 
reader)) =>
+  val attrMap = AttributeMap(output.zip(output))
+
+  val projectSet = AttributeSet(projects.flatMap(_.references))
+  val filterSet = AttributeSet(filters.flatMap(_.references))
+
+  // Match original case of attributes.
+  // TODO: nested fields pruning
+  val requiredColumns = (projectSet ++ filterSet).toSeq.map(attrMap)
+  reader match {
+case r: ColumnPruningSupport =>
+  r.pruneColumns(requiredColumns.toStructType)
+case _ =>
+  }
+
+  val stayUpFilters: Seq[Expression] = reader match {
+case r: CatalystFilterPushDownSupport =>
+  r.pushCatalystFilters(filters.toArray)
+
+case r: FilterPushDownSupport =>
--- End diff --

Yeah, we can't prevent users from implementing both, and we will pick 
`CatalystFilterPushDownSupport` over `FilterPushDownSupport`. Let me document 
it.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19136: [SPARK-15689][SQL] data source v2

2017-09-11 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19136#discussion_r138224082
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
 ---
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
+import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader
+import org.apache.spark.sql.sources.v2.reader.upward.StatisticsSupport
+
+case class DataSourceV2Relation(
+output: Seq[AttributeReference],
+reader: DataSourceV2Reader) extends LeafNode {
+
+  override def computeStats(): Statistics = reader match {
+case r: StatisticsSupport => Statistics(sizeInBytes = 
r.getStatistics.sizeInBytes())
+case _ => Statistics(sizeInBytes = conf.defaultSizeInBytes)
+  }
+}
+
+object DataSourceV2Relation {
+  def apply(reader: DataSourceV2Reader): DataSourceV2Relation = {
+new DataSourceV2Relation(reader.readSchema().toAttributes, reader)
--- End diff --

I think users can write a special `ReadTask` to do it, but we can't save 
memory by doing this. When an operator (the scan operator) transfers data to 
another operator, the data must be `UnsafeRow`s. So even if users return a joined 
row in `ReadTask`, Spark needs to convert it to `UnsafeRow`.
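
To make the point concrete, here is a small sketch of that conversion step; the 
schema and values are made up, and `UnsafeProjection` is simply the existing 
catalyst utility a scan could use for the copy:

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
import org.apache.spark.sql.types._

// Whatever row shape a ReadTask hands back, the scan still has to materialize
// it in UnsafeRow format before passing it to the next operator.
val schema = StructType(Seq(StructField("a", IntegerType), StructField("b", LongType)))
val toUnsafe = UnsafeProjection.create(schema)
val row: InternalRow = InternalRow(1, 2L)   // e.g. a "joined" row built by the ReadTask
val unsafeRow = toUnsafe(row)               // copied into UnsafeRow, so no memory is saved
```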


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19185: [Spark-21854] Added LogisticRegressionTrainingSum...

2017-09-11 Thread sethah
Github user sethah commented on a diff in the pull request:

https://github.com/apache/spark/pull/19185#discussion_r138220213
  
--- Diff: python/pyspark/ml/tests.py ---
@@ -1473,11 +1473,59 @@ def test_logistic_regression_summary(self):
 self.assertTrue(isinstance(s.fMeasureByThreshold, DataFrame))
 self.assertTrue(isinstance(s.precisionByThreshold, DataFrame))
 self.assertTrue(isinstance(s.recallByThreshold, DataFrame))
+
+self.assertAlmostEqual(s.accuracy, 1.0, 2)
--- End diff --

also nit, but should probably add tests for all the new attributes, like 
`falsePositiveRateByLabel` as below.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19185: [Spark-21854] Added LogisticRegressionTrainingSum...

2017-09-11 Thread sethah
Github user sethah commented on a diff in the pull request:

https://github.com/apache/spark/pull/19185#discussion_r138219915
  
--- Diff: python/pyspark/ml/tests.py ---
@@ -1473,11 +1473,59 @@ def test_logistic_regression_summary(self):
 self.assertTrue(isinstance(s.fMeasureByThreshold, DataFrame))
 self.assertTrue(isinstance(s.precisionByThreshold, DataFrame))
 self.assertTrue(isinstance(s.recallByThreshold, DataFrame))
+
+self.assertAlmostEqual(s.accuracy, 1.0, 2)
+self.assertAlmostEqual(s.weightedTruePositiveRate, 1.0, 2)
+self.assertAlmostEqual(s.weightedFalsePositiveRate, 0.0, 2)
+self.assertAlmostEqual(s.weightedRecall, 1.0, 2)
+self.assertAlmostEqual(s.weightedPrecision, 1.0, 2)
+self.assertAlmostEqual(s.weightedFMeasure(), 1.0, 2)
 # test evaluation (with training dataset) produces a summary with 
same values
 # one check is enough to verify a summary is returned, Scala 
version runs full test
 sameSummary = model.evaluate(df)
 self.assertAlmostEqual(sameSummary.areaUnderROC, s.areaUnderROC)
 
+def test_multiclass_logistic_regression_summary(self):
+df = self.spark.createDataFrame([(1.0, 2.0, Vectors.dense(1.0)),
+ (0.0, 2.0, Vectors.sparse(1, [], 
[])),
+ (2.0, 2.0, Vectors.dense(2.0)),
+ (2.0, 2.0, Vectors.dense(1.9))],
+["label", "weight", "features"])
+lr = LogisticRegression(maxIter=5, regParam=0.01, 
weightCol="weight", fitIntercept=False)
+model = lr.fit(df)
+self.assertTrue(model.hasSummary)
+s = model.summary
+# test that api is callable and returns expected types
+self.assertTrue(isinstance(s.predictions, DataFrame))
+self.assertEqual(s.probabilityCol, "probability")
+self.assertEqual(s.labelCol, "label")
+self.assertEqual(s.featuresCol, "features")
+self.assertEqual(s.predictionCol, "prediction")
+objHist = s.objectiveHistory
+self.assertTrue(isinstance(objHist, list) and 
isinstance(objHist[0], float))
+self.assertGreater(s.totalIterations, 0)
+self.assertTrue(isinstance(s.labels, list))
+self.assertTrue(isinstance(s.truePositiveRateByLabel, list))
+self.assertTrue(isinstance(s.falsePositiveRateByLabel, list))
+self.assertTrue(isinstance(s.precisionByLabel, list))
+self.assertTrue(isinstance(s.recallByLabel, list))
+self.assertTrue(isinstance(s.fMeasureByLabel(), list))
+self.assertAlmostEqual(s.accuracy, 0.75, 2)
+self.assertAlmostEqual(s.weightedTruePositiveRate, 0.75, 2)
+self.assertAlmostEqual(s.weightedFalsePositiveRate, 0.25, 2)
+self.assertAlmostEqual(s.weightedRecall, 0.75, 2)
+self.assertAlmostEqual(s.weightedPrecision, 0.583, 2)
+self.assertAlmostEqual(s.weightedFMeasure(), 0.65, 2)
--- End diff --

maybe add `beta=1.0` to the methods that take beta as a parameter.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19185: [Spark-21854] Added LogisticRegressionTrainingSum...

2017-09-11 Thread sethah
Github user sethah commented on a diff in the pull request:

https://github.com/apache/spark/pull/19185#discussion_r138220005
  
--- Diff: python/pyspark/ml/tests.py ---
@@ -1473,11 +1473,59 @@ def test_logistic_regression_summary(self):
 self.assertTrue(isinstance(s.fMeasureByThreshold, DataFrame))
 self.assertTrue(isinstance(s.precisionByThreshold, DataFrame))
 self.assertTrue(isinstance(s.recallByThreshold, DataFrame))
+
+self.assertAlmostEqual(s.accuracy, 1.0, 2)
--- End diff --

care to add these to the scala unit test for binary summary as well?
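
For reference, a rough sketch of the kind of assertions this could add on the 
Scala side (the expected values and tolerances are illustrative and depend on 
the existing binary-summary test fixture; the method names simply mirror the 
Python attributes exercised above):

```scala
import org.apache.spark.ml.util.TestingUtils._

// Illustrative values only; use whatever the binary fixture actually produces.
assert(summary.accuracy ~== 1.0 relTol 1e-2)
assert(summary.weightedTruePositiveRate ~== 1.0 relTol 1e-2)
assert(summary.weightedFalsePositiveRate ~== 0.0 absTol 1e-2)
assert(summary.weightedRecall ~== 1.0 relTol 1e-2)
assert(summary.weightedPrecision ~== 1.0 relTol 1e-2)
assert(summary.weightedFMeasure() ~== 1.0 relTol 1e-2)
```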


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19185: [Spark-21854] Added LogisticRegressionTrainingSum...

2017-09-11 Thread sethah
Github user sethah commented on a diff in the pull request:

https://github.com/apache/spark/pull/19185#discussion_r138220297
  
--- Diff: python/pyspark/ml/classification.py ---
@@ -528,9 +528,11 @@ def summary(self):
 trained on the training set. An exception is thrown if 
`trainingSummary is None`.
 """
 if self.hasSummary:
-java_blrt_summary = self._call_java("summary")
-# Note: Once multiclass is added, update this to return 
correct summary
-return 
BinaryLogisticRegressionTrainingSummary(java_blrt_summary)
+java_lrt_summary = self._call_java("summary")
+if (self.numClasses <= 2):
--- End diff --

nit: remove parentheses


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19188: [SPARK-21973][SQL] Add an new option to filter qu...

2017-09-11 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/19188#discussion_r138221684
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala
 ---
@@ -113,12 +114,39 @@ object TPCDSQueryBenchmark {
   "q81", "q82", "q83", "q84", "q85", "q86", "q87", "q88", "q89", "q90",
   "q91", "q92", "q93", "q94", "q95", "q96", "q97", "q98", "q99")
 
+val sparkConf = new SparkConf()
--- End diff --

ok


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18592: [SPARK-21368][SQL] TPCDSQueryBenchmark can't refe...

2017-09-11 Thread sarutak
Github user sarutak commented on a diff in the pull request:

https://github.com/apache/spark/pull/18592#discussion_r138220942
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala
 ---
@@ -99,6 +95,20 @@ object TPCDSQueryBenchmark {
   }
 
   def main(args: Array[String]): Unit = {
+if (args.length < 1) {
--- End diff --

Good idea. I'll add `TPCDSQueryBenchmarkArguments`.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19141: [SPARK-21384] [YARN] Spark + YARN fails with Loca...

2017-09-11 Thread devaraj-kavali
Github user devaraj-kavali commented on a diff in the pull request:

https://github.com/apache/spark/pull/19141#discussion_r138219530
  
--- Diff: 
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala 
---
@@ -565,7 +565,6 @@ private[spark] class Client(
   distribute(jarsArchive.toURI.getPath,
 resType = LocalResourceType.ARCHIVE,
 destName = Some(LOCALIZED_LIB_DIR))
-  jarsArchive.delete()
--- End diff --

Thanks @vanzin for the pointer. It was my mistake; I missed the reason for the 
change while looking at the history of the file.

I still see that SPARK-20741 fixed the issue only partially; it leaves the 
\_\_spark_conf\_\_*.zip file to be deleted as part of the shutdown hook.

I see these approaches to fix it further:

1. Delete the \_\_spark_conf\_\_*.zip and \_\_spark_libs\_\_*.zip files after the 
application completes, similar to cleanupStagingDir. 
(Or)
2. Add a configuration that controls whether to delete the \_\_spark_conf\_\_*.zip 
and \_\_spark_libs\_\_*.zip files after copying them to the destination dir, so that 
users can decide whether to delete them immediately or as part of process exit. In 
the SPARK-20741 case, this new configuration can be enabled to delete these files 
immediately (see the sketch below).

@vanzin & @jerryshao Please let me know your thoughts on this or if you have 
any other way to do this. Thanks
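
For illustration, a rough sketch of option 2 against the Client.scala snippet 
quoted above (the conf key is hypothetical, and the surrounding `distribute` / 
`jarsArchive` context is assumed from that snippet):

```scala
// Hypothetical flag; the actual name and default would need review.
val deleteArchivesEagerly =
  sparkConf.getBoolean("spark.yarn.archives.deleteEagerly", false)

distribute(jarsArchive.toURI.getPath,
  resType = LocalResourceType.ARCHIVE,
  destName = Some(LOCALIZED_LIB_DIR))

if (deleteArchivesEagerly) {
  jarsArchive.delete()        // reclaim local disk right away (SPARK-20741 behavior)
} else {
  jarsArchive.deleteOnExit()  // defer cleanup to JVM shutdown
}
```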



---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18945: Add option to convert nullable int columns to float colu...

2017-09-11 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/18945
  
Hey @logannc, have you had some time to work on this? I want to fix this 
issue ASAP. Otherwise, would anyone here be interested in submitting another PR 
for the other approach?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19188: [SPARK-21973][SQL] Add an new option to filter qu...

2017-09-11 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/19188#discussion_r138217212
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala
 ---
@@ -113,12 +114,39 @@ object TPCDSQueryBenchmark {
   "q81", "q82", "q83", "q84", "q85", "q86", "q87", "q88", "q89", "q90",
   "q91", "q92", "q93", "q94", "q95", "q96", "q97", "q98", "q99")
 
+val sparkConf = new SparkConf()
--- End diff --

Could we add an argument, instead of using the SQLConf? See 
https://github.com/apache/spark/pull/18592#discussion_r138217049


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19106: [SPARK-21770][ML] ProbabilisticClassificationModel fix c...

2017-09-11 Thread sethah
Github user sethah commented on the issue:

https://github.com/apache/spark/pull/19106
  
I'm confused how this issue was discovered in the first place. Did someone 
actually train an RF/DT and receive all zero probabilities? If so, shouldn't 
there be a unit test that recreates that scenario? 

Anyway, AFAICT the only way this could happen is if the RandomForest were 
trained on an empty DataFrame, which couldn't happen because it would fail 
before the train method was called anyway.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18592: [SPARK-21368][SQL] TPCDSQueryBenchmark can't refe...

2017-09-11 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/18592#discussion_r138217049
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/TPCDSQueryBenchmark.scala
 ---
@@ -99,6 +95,20 @@ object TPCDSQueryBenchmark {
   }
 
   def main(args: Array[String]): Unit = {
+if (args.length < 1) {
--- End diff --

@sarutak @maropu Could we do something like 
https://github.com/apache/spark/blob/master/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala?
 

We could also add another option for outputting the plans of the TPC-DS 
queries, instead of running all the queries.
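
For illustration, a minimal sketch of such an argument class, modeled on 
ApplicationMasterArguments (the option names and fields are illustrative, not an 
existing API):

```scala
// Illustrative sketch only; flag names and usage text are placeholders.
class TPCDSQueryBenchmarkArguments(val args: Array[String]) {
  var dataLocation: String = null
  var queryFilter: Set[String] = Set.empty

  parseArgs(args.toList)

  private def parseArgs(inputArgs: List[String]): Unit = {
    var remaining = inputArgs
    while (remaining.nonEmpty) {
      remaining match {
        case "--data-location" :: value :: tail =>
          dataLocation = value
          remaining = tail
        case "--query-filter" :: value :: tail =>
          queryFilter = value.toLowerCase.split(",").map(_.trim).toSet
          remaining = tail
        case _ =>
          System.err.println("Unknown/unsupported param " + remaining)
          printUsageAndExit(1)
      }
    }
    if (dataLocation == null) printUsageAndExit(1)
  }

  private def printUsageAndExit(exitCode: Int): Unit = {
    System.err.println(
      """Usage: spark-submit --class <this class> <spark sql test jar> [Options]
        |Options:
        |  --data-location  Path to the TPC-DS data
        |  --query-filter   Comma-separated queries to run, e.g. "q3,q5"
      """.stripMargin)
    System.exit(exitCode)
  }
}
```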


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19147: [WIP][SPARK-21190][SQL][PYTHON] Vectorized UDFs i...

2017-09-11 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/19147#discussion_r138215300
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/VectorizedPythonRunner.scala
 ---
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.python
+
+import java.io.{BufferedInputStream, BufferedOutputStream, DataInputStream, DataOutputStream}
+import java.net.Socket
+import java.nio.charset.StandardCharsets
+
+import scala.collection.JavaConverters._
+
+import org.apache.arrow.vector.VectorSchemaRoot
+import org.apache.arrow.vector.stream.{ArrowStreamReader, ArrowStreamWriter}
+
+import org.apache.spark.{SparkEnv, SparkFiles, TaskContext}
+import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType, PythonException, PythonRDD, SpecialLengths}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.arrow.{ArrowUtils, ArrowWriter}
+import org.apache.spark.sql.execution.vectorized.{ArrowColumnVector, ColumnarBatch, ColumnVector}
+import org.apache.spark.sql.types._
+import org.apache.spark.util.Utils
+
+/**
+ * Similar to `PythonRunner`, but exchange data with Python worker via columnar format.
+ */
+class VectorizedPythonRunner(
+    funcs: Seq[ChainedPythonFunctions],
+    batchSize: Int,
+    bufferSize: Int,
+    reuse_worker: Boolean,
+    argOffsets: Array[Array[Int]]) extends Logging {
+
+  require(funcs.length == argOffsets.length, "argOffsets should have the same length as funcs")
+
+  // All the Python functions should have the same exec, version and envvars.
+  private val envVars = funcs.head.funcs.head.envVars
+  private val pythonExec = funcs.head.funcs.head.pythonExec
+  private val pythonVer = funcs.head.funcs.head.pythonVer
+
+  // TODO: support accumulator in multiple UDF
+  private val accumulator = funcs.head.funcs.head.accumulator
+
+  // todo: return column batch?
+  def compute(
--- End diff --

I was referring to the protocol between Scala and Python, which is changed 
here and could behave differently under some circumstances. Here is the behavior 
of the existing `PythonRunner` protocol versus the `VectorizedPythonRunner` 
protocol introduced in this PR:

**PythonRunner**
Data blocks are framed by a special length integer. Scala reads each data 
block one at a time and checks the length code. If the code signals a 
`PythonException`, the error is read from Python and a `SparkException` is 
thrown with it as the cause.

**VectorizedPythonRunner**
A data stream is opened in Scala with `ArrowStreamReader` and batches are 
transferred until `ArrowStreamReader` reports that there is no more data. 
Only at that point are the special length codes checked to handle an 
error from Python.

This behavior change would probably only cause problems when things are not 
working normally. For example, what would happen if `pyarrow` were not 
installed on an executor? With `PythonRunner`, the `ImportError` would cause a 
`PythonException` to be transferred and thrown in Scala. In 
`VectorizedPythonRunner`, I believe `ArrowStreamReader` would try to read 
the special length code as part of the Arrow stream and then fail somewhere 
internal to Arrow, never surfacing the `ImportError`.

My point was that this type of behavior change should probably be 
implemented in a separate JIRA where we could make sure to handle all of these 
cases.
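
To make the concern concrete, here is a rough sketch (not code from this PR) 
of how the framing integer could be checked before the stream is handed to 
`ArrowStreamReader`. It assumes the input stream is buffered (supports 
mark/reset) and that the worker still writes 
`SpecialLengths.PYTHON_EXCEPTION_THROWN` before any Arrow data when it fails:

```scala
package org.apache.spark.sql.execution.python

import java.io.DataInputStream
import java.nio.charset.StandardCharsets

import org.apache.spark.SparkException
import org.apache.spark.api.python.SpecialLengths

private[python] object ArrowStreamErrorCheck {
  /** Rethrows a Python-side error, if one was written instead of Arrow data. */
  def rethrowPythonErrorIfAny(in: DataInputStream): Unit = {
    in.mark(4)
    val code = in.readInt()
    if (code == SpecialLengths.PYTHON_EXCEPTION_THROWN) {
      // The worker wrote its traceback instead of an Arrow stream; surface it.
      val msg = new Array[Byte](in.readInt())
      in.readFully(msg)
      throw new SparkException(
        "Python worker failed before writing any Arrow data:\n" +
          new String(msg, StandardCharsets.UTF_8))
    } else {
      // Not an error frame: rewind so ArrowStreamReader sees the untouched stream.
      in.reset()
    }
  }
}
```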


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19107: [SPARK-21799][ML] Fix `KMeans` performance regression ca...

2017-09-11 Thread jkbradley
Github user jkbradley commented on the issue:

https://github.com/apache/spark/pull/19107
  
@WeichenXu123 I just commented on 
https://issues.apache.org/jira/browse/SPARK-18608 to clarify our efforts here.  
Can you please either retarget this for SPARK-18608 and update it, or ask 
@zhengruifeng to submit his original PR as the fix?  Please coordinate, thanks!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18887: [SPARK-20642][core] Store FsHistoryProvider listi...

2017-09-11 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/18887#discussion_r138191841
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/history/config.scala 
---
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.util.concurrent.TimeUnit
+
+import scala.annotation.meta.getter
+
+import org.apache.spark.internal.config.ConfigBuilder
+import org.apache.spark.util.kvstore.KVIndex
+
+private[spark] object config {
+
+  /** Use this to annotate constructor params to be used as KVStore indices. */
+  type KVIndexParam = KVIndex @getter
+
+  val DEFAULT_LOG_DIR = "file:/tmp/spark-events"
+
+  val EVENT_LOG_DIR = ConfigBuilder("spark.history.fs.logDirectory")
+    .stringConf
+    .createWithDefault(DEFAULT_LOG_DIR)
+
+  val MAX_LOG_AGE_S = ConfigBuilder("spark.history.fs.cleaner.maxAge")
+    .timeConf(TimeUnit.SECONDS)
+    .createWithDefaultString("7d")
+
+  val LOCAL_STORE_DIR = ConfigBuilder("spark.history.store.path")
--- End diff --

It'd be better to document that the default, when this is not set, is an in-memory store.
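
For example (just a sketch, assuming the entry stays optional and the 
in-memory store is used when it is unset):

```scala
val LOCAL_STORE_DIR = ConfigBuilder("spark.history.store.path")
  .doc("Local directory where application history data is cached. If not set, " +
    "history data is kept in an in-memory store.")
  .stringConf
  .createOptional
```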


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18887: [SPARK-20642][core] Store FsHistoryProvider listi...

2017-09-11 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/18887#discussion_r137942598
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -229,10 +254,22 @@ private[history] class FsHistoryProvider(conf: 
SparkConf, clock: Clock)
 }
   }
 
-  override def getListing(): Iterator[FsApplicationHistoryInfo] = 
applications.values.iterator
+  override def getListing(): Iterator[ApplicationHistoryInfo] = {
--- End diff --

The returned order is `descending`, right? This is not straightforward from 
the code. Please add a comment.
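
Something along these lines would help (assuming the order really is 
descending; the wording is just a sketch):

```scala
/**
 * Returns the listing of all known applications, sorted in descending order,
 * i.e. the most recently updated applications come first.
 */
```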


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18887: [SPARK-20642][core] Store FsHistoryProvider listi...

2017-09-11 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/18887#discussion_r137942907
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -316,25 +353,21 @@ private[history] class FsHistoryProvider(conf: 
SparkConf, clock: Clock)
 try {
   val newLastScanTime = getNewLastScanTime()
   logDebug(s"Scanning $logDir with lastScanTime==$lastScanTime")
-  val statusList = Option(fs.listStatus(new Path(logDir))).map(_.toSeq)
-    .getOrElse(Seq.empty[FileStatus])
+  val statusList = Option(fs.listStatus(new Path(logDir))).map(_.toSeq).getOrElse(Nil)
   // scan for modified applications, replay and merge them
-  val logInfos: Seq[FileStatus] = statusList
+  val logInfos = statusList
     .filter { entry =>
-      val fileInfo = fileToAppInfo.get(entry.getPath())
-      val prevFileSize = if (fileInfo != null) fileInfo.fileSize else 0L
       !entry.isDirectory() &&
         // FsHistoryProvider generates a hidden file which can't be read.  Accidentally
         // reading a garbage file is safe, but we would log an error which can be scary to
         // the end-user.
         !entry.getPath().getName().startsWith(".") &&
-        prevFileSize < entry.getLen() &&
-        SparkHadoopUtil.get.checkAccessPermission(entry, FsAction.READ)
+        SparkHadoopUtil.get.checkAccessPermission(entry, FsAction.READ) &&
+        recordedFileSize(entry.getPath()) < entry.getLen()
--- End diff --

Can we add a comment to explain what `recordedFileSize(entry.getPath())` 
returns? In the original code the variable name was self-descriptive; the new 
change no longer has it.
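
For example (a sketch only, assuming the method keeps the semantics of the old 
`prevFileSize`, i.e. the size recorded at the last parse or 0L if never parsed):

```scala
/**
 * Returns the size of the given event log as recorded when it was last parsed,
 * or 0L if it has never been parsed; comparing this against the current file
 * length tells whether the log has new data and needs to be scanned again.
 */
```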


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18887: [SPARK-20642][core] Store FsHistoryProvider listi...

2017-09-11 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/18887#discussion_r137940658
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/history/config.scala 
---
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.history
+
+import java.util.concurrent.TimeUnit
+
+import scala.annotation.meta.getter
+
+import org.apache.spark.internal.config.ConfigBuilder
+import org.apache.spark.util.kvstore.KVIndex
+
+private[spark] object config {
+
+  /** Use this to annotate constructor params to be used as KVStore indices. */
+  type KVIndexParam = KVIndex @getter
+
+  val DEFAULT_LOG_DIR = "file:/tmp/spark-events"
+
+  val EVENT_LOG_DIR = ConfigBuilder("spark.history.fs.logDirectory")
+    .stringConf
+    .createWithDefault(DEFAULT_LOG_DIR)
+
+  val MAX_LOG_AGE_S = ConfigBuilder("spark.history.fs.cleaner.maxAge")
+    .timeConf(TimeUnit.SECONDS)
+    .createWithDefaultString("7d")
+
+  val LOCAL_STORE_DIR = ConfigBuilder("spark.history.store.path")
--- End diff --

Just to confirm: apart from this one, there are no changes to the other 
parameters, right?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org


