[GitHub] spark issue #23152: [SPARK-26181][SQL] the `hasMinMaxStats` method of `Colum...

2018-12-03 Thread juliuszsompolski
Github user juliuszsompolski commented on the issue:

https://github.com/apache/spark/pull/23152
  
LGTM, thanks for the change!


---




[GitHub] spark issue #23152: [SPARK-26181][SQL] the `hasMinMaxStats` method of `Colum...

2018-12-03 Thread juliuszsompolski
Github user juliuszsompolski commented on the issue:

https://github.com/apache/spark/pull/23152
  
jenkins retest this please


---




[GitHub] spark issue #23152: [SPARK-26181][SQL] the `hasMinMaxStats` method of `Colum...

2018-11-30 Thread juliuszsompolski
Github user juliuszsompolski commented on the issue:

https://github.com/apache/spark/pull/23152
  
This reproes it:
```
sql("create table all_null (attr1 int, attr2 int)")
sql("insert into all_null values (null, null)")
sql("analyze table all_null compute statistics for columns attr1, attr2")
// check if the stats can be calculated without Cast exception.
sql("select * from all_null where attr1 < attr2").queryExecution.stringWithStats
```
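For anyone reproducing this interactively, a roughly equivalent way to force the estimation path is the following sketch (it assumes CBO is enabled and that `explain cost` output is acceptable instead of reaching into `queryExecution`):
```
sql("set spark.sql.cbo.enabled=true")
// EXPLAIN COST prints the optimized plan with statistics, which exercises the
// same estimation code path as .queryExecution.stringWithStats.
sql("explain cost select * from all_null where attr1 < attr2").show(truncate = false)
```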

Could we piggy back this fix here?
Sorry for posting last minute after lgtm.


---




[GitHub] spark issue #23152: [SPARK-26181][SQL] the `hasMinMaxStats` method of `Colum...

2018-11-30 Thread juliuszsompolski
Github user juliuszsompolski commented on the issue:

https://github.com/apache/spark/pull/23152
  
While at it, could we kill one more potential for a bug?
In `FilterEstimation.evaluateBinaryForTwoColumns` there is a
```
attrLeft.dataType match {
  case StringType | BinaryType =>
    // TODO: It is difficult to support other binary comparisons for String/Binary
    // type without min/max and advanced statistics like histogram.
    logDebug("[CBO] No range comparison statistics for String/Binary type " + attrLeft)
    return None
  case _ =>
}
```
Could we change the `case _ =>` branch to
```
  case _ =>
    if (!colStatsMap.hasMinMaxStats(attrLeft)) {
      logDebug("[CBO] No min/max statistics " + attrLeft)
      return None
    }
    if (!colStatsMap.hasMinMaxStats(attrRight)) {
      logDebug("[CBO] No min/max statistics " + attrRight)
      return None
    }
```
This is one more place that later does
```
val statsIntervalLeft = ValueInterval(colStatLeft.min, colStatLeft.max, attrLeft.dataType)
  .asInstanceOf[NumericValueInterval]
...
val statsIntervalRight = ValueInterval(colStatRight.min, colStatRight.max, attrRight.dataType)
  .asInstanceOf[NumericValueInterval]
```

assuming that min/maxes are present, and could therefore also hit the ClassCastException.
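To make the failure mode concrete, here is a minimal self-contained sketch (illustrative names only, not Spark's actual `ValueInterval` internals) of why the unconditional cast blows up on an all-null column:
```
sealed trait Interval
case class NumericInterval(min: Double, max: Double) extends Interval
case object EmptyInterval extends Interval

// A factory in the spirit of ValueInterval(min, max, dataType): when min/max
// are absent (e.g. a column with only nulls), there is nothing numeric to return.
def makeInterval(min: Option[Double], max: Option[Double]): Interval =
  (min, max) match {
    case (Some(lo), Some(hi)) => NumericInterval(lo, hi)
    case _                    => EmptyInterval
  }

val interval = makeInterval(None, None)
// Guarding with a hasMinMaxStats-style check first avoids ever reaching the cast:
interval match {
  case NumericInterval(lo, hi) => println(s"range = [$lo, $hi]")
  case EmptyInterval           => println("no min/max statistics, skip the estimation")
}
// interval.asInstanceOf[NumericInterval]  // would throw ClassCastException
```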


---




[GitHub] spark issue #23152: [SPARK-26181][SQL] the `hasMinMaxStats` method of `Colum...

2018-11-30 Thread juliuszsompolski
Github user juliuszsompolski commented on the issue:

https://github.com/apache/spark/pull/23152
  
LGTM, thanks!


---




[GitHub] spark pull request #23152: [SPARK-26181][SQL] the `hasMinMaxStats` method of...

2018-11-30 Thread juliuszsompolski
Github user juliuszsompolski commented on a diff in the pull request:

https://github.com/apache/spark/pull/23152#discussion_r237789881
  
--- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala ---
@@ -2276,4 +2276,16 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
     }
   }
 
+
+  test("SPARK-26181 hasMinMaxStats method of ColumnStatsMap is not correct") {
+    withSQLConf(SQLConf.CBO_ENABLED.key -> "true") {
+      withTable("all_null") {
+        sql("create table all_null (attrInt int)")
+        sql("insert into all_null values (null)")
+        sql("analyze table all_null compute statistics for columns attrInt")
+        checkAnswer(sql("select * from all_null where attrInt < 1"), Nil)
--- End diff --

Normal query execution doesn't trigger it here, because it doesn't need stats, so they never get lazily evaluated.
Putting a join over it would probably trigger it without having to force it with `.queryExecution.stringWithStats`.
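A sketch of that join-based variant (hedged: not verified to reproduce the ClassCastException; it just removes the need for `stringWithStats` by making the planner consult the stats):
```
withSQLConf(SQLConf.CBO_ENABLED.key -> "true") {
  withTable("all_null") {
    sql("create table all_null (attrInt int)")
    sql("insert into all_null values (null)")
    sql("analyze table all_null compute statistics for columns attrInt")
    // A self-join should make the planner estimate statistics during normal
    // planning, without forcing .queryExecution.stringWithStats.
    checkAnswer(
      sql("select * from all_null t1, all_null t2 where t1.attrInt < t2.attrInt"),
      Nil)
  }
}
```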


---




[GitHub] spark issue #23127: [SPARK-26159] Codegen for LocalTableScanExec and RDDScan...

2018-11-27 Thread juliuszsompolski
Github user juliuszsompolski commented on the issue:

https://github.com/apache/spark/pull/23127
  
jenkins retest this please


---




[GitHub] spark issue #23127: [SPARK-26159] Codegen for LocalTableScanExec and RDDScan...

2018-11-27 Thread juliuszsompolski
Github user juliuszsompolski commented on the issue:

https://github.com/apache/spark/pull/23127
  
jenkins retest this


---




[GitHub] spark issue #23127: [SPARK-26159] Codegen for LocalTableScanExec and RDDScan...

2018-11-27 Thread juliuszsompolski
Github user juliuszsompolski commented on the issue:

https://github.com/apache/spark/pull/23127
  
Talked with @hvanhovell offline and set `LocalTableScanExec` and 
`InputAdapter` to not create an unsafe projection, and `RDDScanExec` and 
`RowDataSourceScanExec` to always do so, which replicates previous behaviour.
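A condensed sketch of that arrangement (the trait and flag come from this PR; the two concrete classes below are illustrative stand-ins, not the real operators):
```
trait InputRDDCodegen {
  // whether doProduce should wrap rows in an UnsafeProjection
  protected def createUnsafeProjection: Boolean
}

// stands in for LocalTableScanExec / InputAdapter: no projection added
class LocalTableScanLike extends InputRDDCodegen {
  override protected val createUnsafeProjection: Boolean = false
}

// stands in for RDDScanExec / RowDataSourceScanExec: always project, as before this PR
class RDDScanLike extends InputRDDCodegen {
  override protected val createUnsafeProjection: Boolean = true
}
```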


---




[GitHub] spark issue #23127: [SPARK-26159] Codegen for LocalTableScanExec and RDDScan...

2018-11-27 Thread juliuszsompolski
Github user juliuszsompolski commented on the issue:

https://github.com/apache/spark/pull/23127
  
@cloud-fan Thanks. Actually, I had to revert earlier updates because the 
plan no longer changes for LocalTableScanExec that is alone in a 
wholestagecodegen.


---




[GitHub] spark pull request #23127: [SPARK-26159] Codegen for LocalTableScanExec and ...

2018-11-26 Thread juliuszsompolski
Github user juliuszsompolski commented on a diff in the pull request:

https://github.com/apache/spark/pull/23127#discussion_r236395764
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala ---
@@ -406,14 +415,62 @@ trait BlockingOperatorWithCodegen extends CodegenSupport {
   override def limitNotReachedChecks: Seq[String] = Nil
 }
 
+/**
+ * Leaf codegen node reading from a single RDD.
+ */
+trait InputRDDCodegen extends CodegenSupport {
+
+  def inputRDD: RDD[InternalRow]
+
+  // If the input is an RDD of InternalRow which are potentially not UnsafeRow,
+  // and there is no parent to consume it, it needs an UnsafeProjection.
+  protected val createUnsafeProjection: Boolean = (parent == null)
+
+  override def inputRDDs(): Seq[RDD[InternalRow]] = {
+    inputRDD :: Nil
+  }
+
+  override def doProduce(ctx: CodegenContext): String = {
--- End diff --

> This assumes that the parent operator would always result in some UnsafeProjection being eventually added, and hence the output of the WholeStageCodegen unit will be UnsafeRows.

I think it's quite a hack in my patch, and that there should be some nicer interface to tell the codegened operators whether they're dealing with UnsafeRows input, or InternalRows that may not be UnsafeRows...


---




[GitHub] spark pull request #23127: [SPARK-26159] Codegen for LocalTableScanExec and ...

2018-11-26 Thread juliuszsompolski
Github user juliuszsompolski commented on a diff in the pull request:

https://github.com/apache/spark/pull/23127#discussion_r236393985
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala ---
@@ -406,14 +415,62 @@ trait BlockingOperatorWithCodegen extends CodegenSupport {
   override def limitNotReachedChecks: Seq[String] = Nil
 }
 
+/**
+ * Leaf codegen node reading from a single RDD.
+ */
+trait InputRDDCodegen extends CodegenSupport {
+
+  def inputRDD: RDD[InternalRow]
+
+  // If the input is an RDD of InternalRow which are potentially not UnsafeRow,
+  // and there is no parent to consume it, it needs an UnsafeProjection.
+  protected val createUnsafeProjection: Boolean = (parent == null)
+
+  override def inputRDDs(): Seq[RDD[InternalRow]] = {
+    inputRDD :: Nil
+  }
+
+  override def doProduce(ctx: CodegenContext): String = {
--- End diff --

updated description.


---




[GitHub] spark pull request #23127: [SPARK-26159] Codegen for LocalTableScanExec and ...

2018-11-26 Thread juliuszsompolski
Github user juliuszsompolski commented on a diff in the pull request:

https://github.com/apache/spark/pull/23127#discussion_r236391673
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala ---
@@ -406,14 +415,62 @@ trait BlockingOperatorWithCodegen extends CodegenSupport {
   override def limitNotReachedChecks: Seq[String] = Nil
 }
 
+/**
+ * Leaf codegen node reading from a single RDD.
+ */
+trait InputRDDCodegen extends CodegenSupport {
+
+  def inputRDD: RDD[InternalRow]
+
+  // If the input is an RDD of InternalRow which are potentially not UnsafeRow,
+  // and there is no parent to consume it, it needs an UnsafeProjection.
+  protected val createUnsafeProjection: Boolean = (parent == null)
+
+  override def inputRDDs(): Seq[RDD[InternalRow]] = {
+    inputRDD :: Nil
+  }
+
+  override def doProduce(ctx: CodegenContext): String = {
--- End diff --

The new one should be the same as the previous `RowDataSourceScanExec.doProduce` and `RDDScanExec.doProduce` when createUnsafeProjection == true, and the same as the previous `InputAdapter.doProduce` and `LocalTableScanExec.doProduce` when createUnsafeProjection == false.

From the fact that `InputAdapter` was not doing an explicit unsafe projection, even though its input could be InternalRows that are not UnsafeRows, I derived the assumption that it is safe not to do so as long as there is a parent operator. This assumes that the parent operator will always result in an UnsafeProjection being eventually added, and hence the output of the WholeStageCodegen will be UnsafeRows.


---




[GitHub] spark issue #23127: [SPARK-26159] Codegen for LocalTableScanExec and RDDScan...

2018-11-26 Thread juliuszsompolski
Github user juliuszsompolski commented on the issue:

https://github.com/apache/spark/pull/23127
  
@cloud-fan @rednaxelafx
Actually, the input to a codegen stage can be an InternalRow, so I can't make the inputRDD be `RDD[UnsafeRow]`, but the output needs to be UnsafeRow.
Doing it like `InputAdapter` did just passed the internal row through to the output.
For `InputAdapter` there is always some parent operator to consume it, create an unsafe projection somewhere in what it does, and then output UnsafeRows.
But an `RDDScanExec` or `RowDataSourceScanExec` could be alone in a WholeStageCodegenExec, and then just doing `${consume(ctx, null, row)}` passed the InternalRow from the input straight through to the output.

WDYT about how I patched it up?


---




[GitHub] spark issue #23127: [SPARK-26159] Codegen for LocalTableScanExec and Existin...

2018-11-23 Thread juliuszsompolski
Github user juliuszsompolski commented on the issue:

https://github.com/apache/spark/pull/23127
  
cc @hvanhovell @rednaxelafx


---




[GitHub] spark pull request #23127: [SPARK-26159] Codegen for LocalTableScanExec and ...

2018-11-23 Thread juliuszsompolski
GitHub user juliuszsompolski opened a pull request:

https://github.com/apache/spark/pull/23127

[SPARK-26159] Codegen for LocalTableScanExec and ExistingRDDExec

## What changes were proposed in this pull request?

Implement codegen for LocalTableScanExec and ExistingRDDExec. Refactor to 
share code with InputAdapter.


## How was this patch tested?

Covered and used in existing tests.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/juliuszsompolski/apache-spark SPARK-26159

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/23127.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #23127


commit 23c2d9111f1cff9059746bb7b48bb8ef7ad7027b
Author: Juliusz Sompolski 
Date:   2018-11-13T09:19:09Z

localtablescanexec codegen




---




[GitHub] spark issue #23022: [SPARK-26038] Decimal toScalaBigInt/toJavaBigInteger for...

2018-11-23 Thread juliuszsompolski
Github user juliuszsompolski commented on the issue:

https://github.com/apache/spark/pull/23022
  
ping @hvanhovell 


---




[GitHub] spark issue #23022: [SPARK-26038] Decimal toScalaBigInt/toJavaBigInteger for...

2018-11-13 Thread juliuszsompolski
Github user juliuszsompolski commented on the issue:

https://github.com/apache/spark/pull/23022
  
cc @hvanhovell 


---




[GitHub] spark pull request #23022: [SPARK-26038] Decimal toScalaBigInt/toJavaBigInte...

2018-11-13 Thread juliuszsompolski
GitHub user juliuszsompolski opened a pull request:

https://github.com/apache/spark/pull/23022

[SPARK-26038] Decimal toScalaBigInt/toJavaBigInteger for decimals not 
fitting in long

## What changes were proposed in this pull request?

Fix Decimal `toScalaBigInt` and `toJavaBigInteger`, which used to work only for decimals fitting in a long.
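A hedged illustration of the conversion in question (the value is just an example wider than a `Long`; the exact pre-fix behaviour is inferred from the title, not reproduced here):
```
import org.apache.spark.sql.types.Decimal

// 30 digits: well outside the Long range (Long.MaxValue is about 9.2e18).
val big = Decimal("123456789012345678901234567890")
// After the fix these should return the full value rather than a result
// derived from an overflowing toLong.
val scalaBigInt: BigInt = big.toScalaBigInt
val javaBigInt: java.math.BigInteger = big.toJavaBigInteger
```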

## How was this patch tested?

Added test to DecimalSuite.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/juliuszsompolski/apache-spark SPARK-26038

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/23022.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #23022


commit 9c52e09357531f081801c25a0c2533fff10aa1d8
Author: Juliusz Sompolski 
Date:   2018-11-13T10:58:26Z

SPARK-26038




---




[GitHub] spark issue #22029: [SPARK-24395][SQL] IN operator should return NULL when c...

2018-10-22 Thread juliuszsompolski
Github user juliuszsompolski commented on the issue:

https://github.com/apache/spark/pull/22029
  
IMHO if a new wrapper was justifiable for the IN-subquery in #21403, then 
it is also justifiable to add one here for the IN-literal-list case.


---




[GitHub] spark issue #22566: [SPARK-25458][SQL] Support FOR ALL COLUMNS in ANALYZE TA...

2018-09-28 Thread juliuszsompolski
Github user juliuszsompolski commented on the issue:

https://github.com/apache/spark/pull/22566
  
LGTM. Thanks!


---




[GitHub] spark issue #22566: [SPARK-25458][SQL] Support FOR ALL COLUMNS in ANALYZE TA...

2018-09-28 Thread juliuszsompolski
Github user juliuszsompolski commented on the issue:

https://github.com/apache/spark/pull/22566
  
jenkins retest this


---




[GitHub] spark pull request #22209: [SPARK-24415][Core] Fixed the aggregated stage me...

2018-08-31 Thread juliuszsompolski
Github user juliuszsompolski commented on a diff in the pull request:

https://github.com/apache/spark/pull/22209#discussion_r214381992
  
--- Diff: 
core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
@@ -350,11 +350,20 @@ private[spark] class AppStatusListener(
 val e = it.next()
 if (job.stageIds.contains(e.getKey()._1)) {
   val stage = e.getValue()
-  stage.status = v1.StageStatus.SKIPPED
-  job.skippedStages += stage.info.stageId
-  job.skippedTasks += stage.info.numTasks
-  it.remove()
-  update(stage, now)
+  if (v1.StageStatus.PENDING.equals(stage.status)) {
--- End diff --

In the previous behaviour, it would have marked the stages that were ACTIVE as SKIPPED, which now will not happen here anymore.
It looks like the `stage.status = event.stageInfo.failureReason match {` code in `onStageCompleted` may handle that. Is that the case?



---




[GitHub] spark pull request #22209: [SPARK-24415][Core] Fixed the aggregated stage me...

2018-08-31 Thread juliuszsompolski
Github user juliuszsompolski commented on a diff in the pull request:

https://github.com/apache/spark/pull/22209#discussion_r214320305
  
--- Diff: 
core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
@@ -350,11 +350,20 @@ private[spark] class AppStatusListener(
 val e = it.next()
 if (job.stageIds.contains(e.getKey()._1)) {
   val stage = e.getValue()
-  stage.status = v1.StageStatus.SKIPPED
-  job.skippedStages += stage.info.stageId
-  job.skippedTasks += stage.info.numTasks
-  it.remove()
-  update(stage, now)
+  if (v1.StageStatus.PENDING.equals(stage.status)) {
--- End diff --

Is there nothing to be done here if StageStatus is ACTIVE?


---




[GitHub] spark issue #22286: [SPARK-25284] Spark UI: make sure skipped stages are upd...

2018-08-31 Thread juliuszsompolski
Github user juliuszsompolski commented on the issue:

https://github.com/apache/spark/pull/22286
  
Thanks for pointing to #22209. Closing this one.


---




[GitHub] spark pull request #22286: [SPARK-25284] Spark UI: make sure skipped stages ...

2018-08-31 Thread juliuszsompolski
Github user juliuszsompolski closed the pull request at:

https://github.com/apache/spark/pull/22286


---




[GitHub] spark issue #22286: [SPARK-25284] Spark UI: make sure skipped stages are upd...

2018-08-30 Thread juliuszsompolski
Github user juliuszsompolski commented on the issue:

https://github.com/apache/spark/pull/22286
  
cc @gatorsmile 
fyi @dbkerkela 


---




[GitHub] spark pull request #22286: [SPARK-25284] Spark UI: make sure skipped stages ...

2018-08-30 Thread juliuszsompolski
GitHub user juliuszsompolski opened a pull request:

https://github.com/apache/spark/pull/22286

[SPARK-25284] Spark UI: make sure skipped stages are updated onJobEnd

## What changes were proposed in this pull request?

Tiny bug in onJobEnd, not forcing the refresh of skipped stages it removes.

## How was this patch tested?

YOLO

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/juliuszsompolski/apache-spark SPARK-25284

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22286.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22286


commit 3be439c324f3d2f4fd304a97f39913940de98c56
Author: Juliusz Sompolski 
Date:   2018-08-30T16:41:33Z

SPARK-25284




---




[GitHub] spark issue #22268: [DOC] Fix comment on SparkPlanGraphEdge

2018-08-29 Thread juliuszsompolski
Github user juliuszsompolski commented on the issue:

https://github.com/apache/spark/pull/22268
  
cc @gatorsmile just a tiny nit...


---




[GitHub] spark pull request #22268: [DOC] Fix comment on SparkPlanGraphEdge

2018-08-29 Thread juliuszsompolski
GitHub user juliuszsompolski opened a pull request:

https://github.com/apache/spark/pull/22268

[DOC] Fix comment on SparkPlanGraphEdge

## What changes were proposed in this pull request?

`fromId` is the child, and `toId` is the parent, see line 127 in 
`buildSparkPlanGraphNode` above.
The edges in Spark UI also go from child to parent.
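In other words, the intent is roughly the following documentation (a sketch; the exact wording in the patch may differ):
```
/**
 * Represents an edge in the SparkPlan tree. `fromId` is the id of the child
 * node and `toId` is the id of the parent node, i.e. edges point from child
 * to parent, matching buildSparkPlanGraphNode and the arrows in the Spark UI.
 */
case class SparkPlanGraphEdge(fromId: Long, toId: Long)
```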

## How was this patch tested?

Comment change only. Inspected the code above. Inspected how the edges in the Spark UI look.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/juliuszsompolski/apache-spark 
sparkplangraphedgedoc

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22268.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22268


commit e405f699e51058f8f61a2243862bcd8c6a6013c4
Author: Juliusz Sompolski 
Date:   2018-08-29T12:19:10Z

update doc




---




[GitHub] spark issue #21403: [SPARK-24341][SQL] Support only IN subqueries with the s...

2018-07-24 Thread juliuszsompolski
Github user juliuszsompolski commented on the issue:

https://github.com/apache/spark/pull/21403
  
Looks good to me, though I'm not very familiar with analyzer.
@cloud-fan, @hvanhovell ?


---




[GitHub] spark issue #21403: [SPARK-24341][WIP][SQL] Support IN subqueries with struc...

2018-05-29 Thread juliuszsompolski
Github user juliuszsompolski commented on the issue:

https://github.com/apache/spark/pull/21403
  
@mgaido91 BTW: In SPARK-24395 I would consider the cases to still be valid, because I believe there is no other syntactic way to do a multi-column IN/NOT IN with a list of literals.
The question is whether it should be treated as structs, or unpacked.
If like structs, then the current behavior is correct, I think.


---




[GitHub] spark issue #21403: [SPARK-24341][WIP][SQL] Support IN subqueries with struc...

2018-05-29 Thread juliuszsompolski
Github user juliuszsompolski commented on the issue:

https://github.com/apache/spark/pull/21403
  
@mgaido91 This also works, +1.
What about `a in (select (b, c) from ...)` when `a` is a struct? I guess allow it, but it's a potential gotcha during implementation.


---




[GitHub] spark issue #21403: [SPARK-24341][WIP][SQL] Support IN subqueries with struc...

2018-05-29 Thread juliuszsompolski
Github user juliuszsompolski commented on the issue:

https://github.com/apache/spark/pull/21403
  
I think that the way the columns are defined in the subquery should define the semantics. E.g. (sketched below):
`(a, b) IN (select c, d from ...)` - unpack (a, b) and treat it as a multi-column comparison, as in the current semantics.
`(a, b) IN (select (c, d) from ..)` - keep it packed and treat it as a single-column IN.
`(a, b, c) IN (select (d, e), f from ..)` or similar combinations - catch it in analysis as ambiguous.
`(a, b, c) IN (select (d, e), f, g from ..)` - but this is valid as long as `a` matches the type of `(d, e)`.
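Spelled out as queries (illustrative only - this shows the proposed semantics, not current Spark behaviour; tables `t(a, b, c)` and `s(c, d, e, f, g)` are assumed):
```
sql("select * from t where (a, b) in (select c, d from s)")           // unpacked multi-column IN
sql("select * from t where (a, b) in (select (c, d) from s)")         // packed: single-column struct IN
sql("select * from t where (a, b, c) in (select (d, e), f from s)")   // ambiguous -> analysis error
```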


---




[GitHub] spark pull request #21228: [SPARK-24171] Adding a note for non-deterministic...

2018-05-03 Thread juliuszsompolski
Github user juliuszsompolski commented on a diff in the pull request:

https://github.com/apache/spark/pull/21228#discussion_r185792840
  
--- Diff: R/pkg/R/functions.R ---
@@ -3184,6 +3191,7 @@ setMethod("create_map",
 #' collect(select(df2, collect_list(df2$gear)))
 #' collect(select(df2, collect_set(df2$gear)))}
 #' @note collect_list since 2.3.0
+#' @note the function is non-deterministic because its result depends on order of rows.
--- End diff --

for collect_list, collect_set maybe word it:
"the function is non-deterministic, because the order of collected results depends on order of rows, which may be non-deterministic after a shuffle"


---




[GitHub] spark pull request #21228: [SPARK-24171] Adding a note for non-deterministic...

2018-05-03 Thread juliuszsompolski
Github user juliuszsompolski commented on a diff in the pull request:

https://github.com/apache/spark/pull/21228#discussion_r185791719
  
--- Diff: R/pkg/R/functions.R ---
@@ -963,6 +964,7 @@ setMethod("kurtosis",
 #' last(df$c, TRUE)
 #' }
 #' @note last since 1.4.0
+#' @note the function is non-deterministic because its result depends on order of rows.
--- End diff --

for the first/last maybe word it:
"the function is non-deterministic, because its result depends on order of rows, which may be non-deterministic after a shuffle"



---




[GitHub] spark issue #21133: [SPARK-24013][SQL] Remove unneeded compress in Approxima...

2018-04-30 Thread juliuszsompolski
Github user juliuszsompolski commented on the issue:

https://github.com/apache/spark/pull/21133
  
Maybe we could add the former test as a benchmark to `AggregateBenchmark`?


---




[GitHub] spark pull request #21133: [SPARK-24013][SQL] Remove unneeded compress in Ap...

2018-04-27 Thread juliuszsompolski
Github user juliuszsompolski commented on a diff in the pull request:

https://github.com/apache/spark/pull/21133#discussion_r184662359
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/ApproximatePercentileQuerySuite.scala
 ---
@@ -279,4 +282,11 @@ class ApproximatePercentileQuerySuite extends 
QueryTest with SharedSQLContext {
   checkAnswer(query, expected)
 }
   }
+
+  test("SPARK-24013: unneeded compress can cause performance issues with 
sorted input") {
+failAfter(30 seconds) {
+  checkAnswer(sql("select approx_percentile(id, array(0.1)) from 
range(1000)"),
+Row(Array(999160)))
--- End diff --

Ok. Yeah, looking at the other tests in this suite it's definitely fine :-).


---




[GitHub] spark pull request #21133: [SPARK-24013][SQL] Remove unneeded compress in Ap...

2018-04-27 Thread juliuszsompolski
Github user juliuszsompolski commented on a diff in the pull request:

https://github.com/apache/spark/pull/21133#discussion_r184656896
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/ApproximatePercentileQuerySuite.scala
 ---
@@ -279,4 +282,11 @@ class ApproximatePercentileQuerySuite extends 
QueryTest with SharedSQLContext {
   checkAnswer(query, expected)
 }
   }
+
+  test("SPARK-24013: unneeded compress can cause performance issues with 
sorted input") {
+failAfter(30 seconds) {
+  checkAnswer(sql("select approx_percentile(id, array(0.1)) from 
range(1000)"),
+Row(Array(999160)))
--- End diff --

nit:
With the approximate nature of the algorithm, could the exact answer not get flaky through some small changes in code or config? (e.g. the split of the range into tasks, and then different merging of partial aggregations producing slightly different results)
Maybe just asserting on collect().length == 1 would do?
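A minimal sketch of that looser assertion, reusing the query and timeout from the diff (inside the same suite, which already has `failAfter` available):
```
failAfter(30 seconds) {
  // Only check that a single row comes back within the time limit, rather
  // than pinning the exact approximate value.
  val rows = sql("select approx_percentile(id, array(0.1)) from range(1000)").collect()
  assert(rows.length == 1)
}
```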


---




[GitHub] spark pull request #21133: [SPARK-24013][SQL] Remove unneeded compress in Ap...

2018-04-27 Thread juliuszsompolski
Github user juliuszsompolski commented on a diff in the pull request:

https://github.com/apache/spark/pull/21133#discussion_r184654132
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproximatePercentile.scala
 ---
@@ -238,12 +238,6 @@ object ApproximatePercentile {
   summaries = summaries.insert(value)
   // The result of QuantileSummaries.insert is un-compressed
   isCompressed = false
-
-  // Currently, QuantileSummaries ignores the construction parameter compressThresHold,
-  // which may cause QuantileSummaries to occupy unbounded memory. We have to hack around here
-  // to make sure QuantileSummaries doesn't occupy infinite memory.
-  // TODO: Figure out why QuantileSummaries ignores construction parameter compressThresHold
-  if (summaries.sampled.length >= compressThresHoldBufferLength) compress()
--- End diff --

Sorry, it's my fault of not reading the description attentively :-).


---




[GitHub] spark issue #21171: [SPARK-24104] SQLAppStatusListener overwrites metrics on...

2018-04-26 Thread juliuszsompolski
Github user juliuszsompolski commented on the issue:

https://github.com/apache/spark/pull/21171
  
cc @gengliangwang @vanzin 


---




[GitHub] spark pull request #21171: [SPARK-24104] SQLAppStatusListener overwrites met...

2018-04-26 Thread juliuszsompolski
GitHub user juliuszsompolski opened a pull request:

https://github.com/apache/spark/pull/21171

[SPARK-24104] SQLAppStatusListener overwrites metrics onDriverAccumUpdates 
instead of updating them

## What changes were proposed in this pull request?

Event `SparkListenerDriverAccumUpdates` may happen multiple times in a 
query - e.g. every `FileSourceScanExec` and `BroadcastExchangeExec` call 
`postDriverMetricUpdates`.
In Spark 2.2 `SQLListener` updated the map with new values. 
`SQLAppStatusListener` overwrites it.
Unless `update` preserved it in the KV store (dependent on `exec.lastWriteTime`), only the metrics from the last operator that does `postDriverMetricUpdates` are preserved.
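A hedged sketch of the intended behaviour (names simplified; not the exact listener code): accumulate driver metric updates across events instead of overwriting the map.
```
var driverAccumUpdates = Map.empty[Long, Long]

def onDriverAccumUpdates(accumUpdates: Seq[(Long, Long)]): Unit = {
  driverAccumUpdates = driverAccumUpdates ++ accumUpdates   // update, as SQLListener did in 2.2
  // previously: driverAccumUpdates = accumUpdates.toMap    // overwrite, losing earlier operators
}
```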

## How was this patch tested?

Unit test added.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/juliuszsompolski/apache-spark SPARK-24104

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21171.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21171


commit 69e09cc770514bdb7964b8552456bf7a83df7588
Author: Juliusz Sompolski <julek@...>
Date:   2018-04-26T17:52:04Z

onDriverAccumUpdates




---




[GitHub] spark pull request #21133: [SPARK-24013][SQL] Remove unneeded compress in Ap...

2018-04-26 Thread juliuszsompolski
Github user juliuszsompolski commented on a diff in the pull request:

https://github.com/apache/spark/pull/21133#discussion_r184343803
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/ApproximatePercentileQuerySuite.scala
 ---
@@ -279,4 +282,10 @@ class ApproximatePercentileQuerySuite extends 
QueryTest with SharedSQLContext {
   checkAnswer(query, expected)
 }
   }
+
+  test("SPARK-24013: unneeded compress can cause performance issues with 
sorted input") {
+failAfter(20 seconds) {
+  assert(sql("select approx_percentile(id, array(0.1)) from 
range(1000)").count() == 1)
--- End diff --

When you do .count(), column pruning removes the approx_percentile from the 
query, so the test does not execute approx_percentile.
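A small illustration of the difference (hedged: it assumes the optimizer prunes the aggregate's input under `.count()` as described above):
```
val df = sql("select approx_percentile(id, array(0.1)) from range(1000)")
df.count()    // column pruning can remove the approx_percentile work entirely
df.collect()  // actually evaluates approx_percentile, exercising the slow path
```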


---




[GitHub] spark pull request #21133: [SPARK-24013][SQL] Remove unneeded compress in Ap...

2018-04-26 Thread juliuszsompolski
Github user juliuszsompolski commented on a diff in the pull request:

https://github.com/apache/spark/pull/21133#discussion_r184347998
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproximatePercentile.scala
 ---
@@ -238,12 +238,6 @@ object ApproximatePercentile {
   summaries = summaries.insert(value)
   // The result of QuantileSummaries.insert is un-compressed
   isCompressed = false
-
-  // Currently, QuantileSummaries ignores the construction parameter compressThresHold,
-  // which may cause QuantileSummaries to occupy unbounded memory. We have to hack around here
-  // to make sure QuantileSummaries doesn't occupy infinite memory.
-  // TODO: Figure out why QuantileSummaries ignores construction parameter compressThresHold
-  if (summaries.sampled.length >= compressThresHoldBufferLength) compress()
--- End diff --

I tested that this change doesn't cause `compress()` to never be called at all, with memory consumption growing unbounded, but it appears to be working well - the mem usage through jmap -histo:live when running `sql("select approx_percentile(id, array(0.1)) from range(100L)").collect()` remains stable.
The compress() is being called from `QuantileSummaries.insert()`, so it seems that the above TODO got resolved at some point.


---




[GitHub] spark issue #20718: [SPARK-23514][FOLLOW-UP] Remove more places using sparkC...

2018-03-02 Thread juliuszsompolski
Github user juliuszsompolski commented on the issue:

https://github.com/apache/spark/pull/20718
  
jenkins retest this please


---




[GitHub] spark issue #20718: [SPARK-23514][FOLLOW-UP] Remove more places using sparkC...

2018-03-02 Thread juliuszsompolski
Github user juliuszsompolski commented on the issue:

https://github.com/apache/spark/pull/20718
  
jenkins retest this please
```
org.apache.spark.sql.FileBasedDataSourceSuite.(It is not a test it is a sbt.testing.SuiteSelector)

org.scalatest.exceptions.TestFailedDueToTimeoutException: The code passed to eventually never returned normally. Attempted 15 times over 10.01441841301 seconds. Last failure message: There are 1 possibly leaked file streams..
```
Could be a problem, but I don't see how it's related.


---




[GitHub] spark issue #20718: [SPARK-23514][FOLLOW-UP] Remove more places using sparkC...

2018-03-02 Thread juliuszsompolski
Github user juliuszsompolski commented on the issue:

https://github.com/apache/spark/pull/20718
  
jenkins retest this please
`hudson.plugins.git.GitException: Failed to fetch from https://github.com/apache/spark.git`


---




[GitHub] spark issue #20718: [SPARK-23514][FOLLOW-UP] Remove more places using sparkC...

2018-03-02 Thread juliuszsompolski
Github user juliuszsompolski commented on the issue:

https://github.com/apache/spark/pull/20718
  
cc @gatorsmile 


---




[GitHub] spark pull request #20718: [SPARK-23514][FOLLOW-UP] Remove more places using...

2018-03-02 Thread juliuszsompolski
GitHub user juliuszsompolski opened a pull request:

https://github.com/apache/spark/pull/20718

[SPARK-23514][FOLLOW-UP] Remove more places using 
sparkContext.hadoopConfiguration directly

## What changes were proposed in this pull request?

In https://github.com/apache/spark/pull/20679 I missed a few places in SQL 
tests.
For hygiene, they should also use the sessionState interface where possible.

## How was this patch tested?

Modified existing tests.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/juliuszsompolski/apache-spark 
SPARK-23514-followup

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/20718.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #20718


commit 358a76ae781016586a892aa077d88f0c27876d76
Author: Juliusz Sompolski <julek@...>
Date:   2018-03-02T17:05:58Z

[SPARK-23514][FOLLOW-UP] Remove more places using 
sparkContext.hadoopConfiguration directly




---




[GitHub] spark issue #20679: [SPARK-23514] Use SessionState.newHadoopConf() to propag...

2018-02-28 Thread juliuszsompolski
Github user juliuszsompolski commented on the issue:

https://github.com/apache/spark/pull/20679
  
jenkins retest this please


---




[GitHub] spark issue #20679: [SPARK-23514] Use SessionState.newHadoopConf() to propag...

2018-02-27 Thread juliuszsompolski
Github user juliuszsompolski commented on the issue:

https://github.com/apache/spark/pull/20679
  
jenkins retest this please
Flaky: `sbt.ForkMain$ForkError: java.io.IOException: Failed to delete: /home/jenkins/workspace/SparkPullRequestBuilder/target/tmp/spark-c0f7c3f9-f48a-4bb8-91f7-e9c710e78b00`


---




[GitHub] spark pull request #20679: [SPARK-23514] Use SessionState.newHadoopConf() to...

2018-02-27 Thread juliuszsompolski
Github user juliuszsompolski commented on a diff in the pull request:

https://github.com/apache/spark/pull/20679#discussion_r170981204
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala ---
@@ -115,7 +115,9 @@ private[sql] class SessionState(
 private[sql] object SessionState {
   def newHadoopConf(hadoopConf: Configuration, sqlConf: SQLConf): Configuration = {
     val newHadoopConf = new Configuration(hadoopConf)
-    sqlConf.getAllConfs.foreach { case (k, v) => if (v ne null) newHadoopConf.set(k, v) }
+    sqlConf.getAllConfs.foreach { case (k, v) =>
+      if (v ne null) newHadoopConf.set(k, v.stripPrefix("spark.hadoop"))
--- End diff --

I just reverted this part, it's not really related to the rest.


---




[GitHub] spark issue #20679: [SPARK-23514] Use SessionState.newHadoopConf() to propag...

2018-02-26 Thread juliuszsompolski
Github user juliuszsompolski commented on the issue:

https://github.com/apache/spark/pull/20679
  
cc @dongjoon-hyun 


---




[GitHub] spark issue #20679: [SPARK-23514] Use SessionState.newHadoopConf() to propag...

2018-02-26 Thread juliuszsompolski
Github user juliuszsompolski commented on the issue:

https://github.com/apache/spark/pull/20679
  
cc @gatorsmile @rxin 
We had a chat about whether to implement something that would catch direct misuses of sc.hadoopConfiguration in the sql module, but it seems that it's not very common, so maybe just fixing it where it happened is enough.

@liancheng suggested stripping the "spark.hadoop" prefix for better compatibility with users specifying or not specifying that prefix.


---




[GitHub] spark pull request #20679: [SPARK-23514] Use SessionState.newHadoopConf() to...

2018-02-26 Thread juliuszsompolski
GitHub user juliuszsompolski opened a pull request:

https://github.com/apache/spark/pull/20679

[SPARK-23514] Use SessionState.newHadoopConf() to propagate hadoop configs set in SQLConf.

## What changes were proposed in this pull request?

A few places in `spark-sql` were using `sc.hadoopConfiguration` directly. 
They should be using `sessionState.newHadoopConf()` to blend in configs that 
were set through `SQLConf`.

Also, for better UX, for these configs blended in from `SQLConf`, we should consider removing the `spark.hadoop` prefix, so that the settings are recognized whether or not the user specified them with that prefix.
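A hedged before/after sketch of the change described above (assuming code inside the sql module, where `sessionState` is accessible):
```
// Before: only the SparkContext-level Hadoop configuration; Hadoop settings
// that the user put into SQLConf are not visible here.
val hadoopConfDirect = spark.sparkContext.hadoopConfiguration

// After: blends SQLConf entries into a fresh Hadoop Configuration, so settings
// set via spark.conf.set(...) reach the FileSystem as well.
val hadoopConfBlended = spark.sessionState.newHadoopConf()
```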

## How was this patch tested?

Tested that AlterTableRecoverPartitions now correctly recognizes settings 
that are passed in to the FileSystem through SQLConf.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/juliuszsompolski/apache-spark SPARK-23514

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/20679.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #20679


commit 2c070fcd053acb47d8a8c3214d67e106b5683376
Author: Juliusz Sompolski <julek@...>
Date:   2018-02-26T15:13:23Z

spark-23514




---




[GitHub] spark pull request #20624: [SPARK-23445] ColumnStat refactoring

2018-02-26 Thread juliuszsompolski
Github user juliuszsompolski commented on a diff in the pull request:

https://github.com/apache/spark/pull/20624#discussion_r170567874
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/StarSchemaDetection.scala ---
@@ -187,11 +187,11 @@ object StarSchemaDetection extends PredicateHelper {
       stats.rowCount match {
         case Some(rowCount) if rowCount >= 0 =>
           if (stats.attributeStats.nonEmpty && stats.attributeStats.contains(col)) {
-            val colStats = stats.attributeStats.get(col)
-            if (colStats.get.nullCount > 0) {
+            val colStats = stats.attributeStats.get(col).get
+            if (!colStats.hasCountStats || colStats.nullCount.get > 0) {
--- End diff --

`hasCountStats == distinctCount.isDefined && nullCount.isDefined`
So if it passed to the second part of the ||, then `hasCountStats == true -> nullCount.isDefined`
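A tiny self-contained restatement of that short-circuit argument (illustrative types, not the real `ColumnStat`):
```
case class Stats(distinctCount: Option[BigInt], nullCount: Option[BigInt]) {
  def hasCountStats: Boolean = distinctCount.isDefined && nullCount.isDefined
}

val colStats = Stats(distinctCount = Some(BigInt(10)), nullCount = Some(BigInt(0)))
// The right-hand side of || is only evaluated when hasCountStats is true,
// which implies nullCount.isDefined, so nullCount.get cannot throw here.
if (!colStats.hasCountStats || colStats.nullCount.get > 0) {
  // skip this column: either no count stats at all, or it contains nulls
}
```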


---




[GitHub] spark pull request #20624: [SPARK-23445] ColumnStat refactoring

2018-02-25 Thread juliuszsompolski
Github user juliuszsompolski commented on a diff in the pull request:

https://github.com/apache/spark/pull/20624#discussion_r170465836
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
 ---
@@ -387,6 +390,101 @@ case class CatalogStatistics(
   }
 }
 
+/**
+ * This class of statistics for a column is used in [[CatalogTable]] to 
interact with metastore.
+ */
+case class CatalogColumnStat(
+  distinctCount: Option[BigInt] = None,
+  min: Option[String] = None,
+  max: Option[String] = None,
+  nullCount: Option[BigInt] = None,
+  avgLen: Option[Long] = None,
+  maxLen: Option[Long] = None,
+  histogram: Option[Histogram] = None) {
+
+  /**
+   * Returns a map from string to string that can be used to serialize the 
column stats.
+   * The key is the name of the column and name of the field (e.g. 
"colName.distinctCount"),
+   * and the value is the string representation for the value.
+   * min/max values are stored as Strings. They can be deserialized using
+   * [[ColumnStat.fromExternalString]].
+   *
+   * As part of the protocol, the returned map always contains a key 
called "version".
+   * In the case min/max values are null (None), they won't appear in the 
map.
+   */
+  def toMap(colName: String): Map[String, String] = {
+val map = new scala.collection.mutable.HashMap[String, String]
+map.put(s"${colName}.${CatalogColumnStat.KEY_VERSION}", "1")
+distinctCount.foreach { v =>
+  map.put(s"${colName}.${CatalogColumnStat.KEY_DISTINCT_COUNT}", 
v.toString)
+}
+nullCount.foreach { v =>
+  map.put(s"${colName}.${CatalogColumnStat.KEY_NULL_COUNT}", 
v.toString)
+}
+avgLen.foreach { v => 
map.put(s"${colName}.${CatalogColumnStat.KEY_AVG_LEN}", v.toString) }
+maxLen.foreach { v => 
map.put(s"${colName}.${CatalogColumnStat.KEY_MAX_LEN}", v.toString) }
+min.foreach { v => 
map.put(s"${colName}.${CatalogColumnStat.KEY_MIN_VALUE}", v) }
+max.foreach { v => 
map.put(s"${colName}.${CatalogColumnStat.KEY_MAX_VALUE}", v) }
+histogram.foreach { h =>
+  map.put(s"${colName}.${CatalogColumnStat.KEY_HISTOGRAM}", 
HistogramSerializer.serialize(h))
+}
+map.toMap
+  }
+
+  /** Convert [[CatalogColumnStat]] to [[ColumnStat]]. */
+  def toPlanStat(
+  colName: String,
+  dataType: DataType): ColumnStat =
+ColumnStat(
+  distinctCount = distinctCount,
+  min = min.map(ColumnStat.fromExternalString(_, colName, dataType)),
+  max = max.map(ColumnStat.fromExternalString(_, colName, dataType)),
+  nullCount = nullCount,
+  avgLen = avgLen,
+  maxLen = maxLen,
+  histogram = histogram)
+}
+
+object CatalogColumnStat extends Logging {
+
+  // List of string keys used to serialize CatalogColumnStat
+  val KEY_VERSION = "version"
+  private val KEY_DISTINCT_COUNT = "distinctCount"
+  private val KEY_MIN_VALUE = "min"
+  private val KEY_MAX_VALUE = "max"
+  private val KEY_NULL_COUNT = "nullCount"
+  private val KEY_AVG_LEN = "avgLen"
+  private val KEY_MAX_LEN = "maxLen"
+  private val KEY_HISTOGRAM = "histogram"
+
+  /**
+   * Creates a [[CatalogColumnStat]] object from the given map.
+   * This is used to deserialize column stats from some external storage.
+   * The serialization side is defined in [[CatalogColumnStat.toMap]].
+   */
+  def fromMap(
+table: String,
+colName: String,
+map: Map[String, String]): Option[CatalogColumnStat] = {
+
+try {
+  Some(CatalogColumnStat(
+distinctCount = map.get(s"${colName}.${KEY_DISTINCT_COUNT}").map(v 
=> BigInt(v.toLong)),
--- End diff --

Added `"verify column stats can be deserialized from tblproperties"` test.


---




[GitHub] spark pull request #20624: [SPARK-23445] ColumnStat refactoring

2018-02-25 Thread juliuszsompolski
Github user juliuszsompolski commented on a diff in the pull request:

https://github.com/apache/spark/pull/20624#discussion_r170465726
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
 ---
@@ -305,15 +260,15 @@ object ColumnStat extends Logging {
   percentiles: Option[ArrayData]): ColumnStat = {
 // The first 6 fields are basic column stats, the 7th is ndvs for 
histogram bins.
 val cs = ColumnStat(
-  distinctCount = BigInt(row.getLong(0)),
+  distinctCount = Option(BigInt(row.getLong(0))),
--- End diff --

I'd keep it an Option, just to be prepared for more flexibility and more 
optionality, unless you have a strong opinion. (note: this code has moved to 
AnalyzeColumnCommand)


---




[GitHub] spark pull request #20624: [SPARK-23445] ColumnStat refactoring

2018-02-25 Thread juliuszsompolski
Github user juliuszsompolski commented on a diff in the pull request:

https://github.com/apache/spark/pull/20624#discussion_r170465448
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
 ---
@@ -387,6 +390,101 @@ case class CatalogStatistics(
   }
 }
 
+/**
+ * This class of statistics for a column is used in [[CatalogTable]] to 
interact with metastore.
+ */
+case class CatalogColumnStat(
+  distinctCount: Option[BigInt] = None,
+  min: Option[String] = None,
+  max: Option[String] = None,
+  nullCount: Option[BigInt] = None,
+  avgLen: Option[Long] = None,
+  maxLen: Option[Long] = None,
+  histogram: Option[Histogram] = None) {
+
+  /**
+   * Returns a map from string to string that can be used to serialize the 
column stats.
+   * The key is the name of the column and name of the field (e.g. 
"colName.distinctCount"),
+   * and the value is the string representation for the value.
+   * min/max values are stored as Strings. They can be deserialized using
+   * [[ColumnStat.fromExternalString]].
--- End diff --

I think that actually everything from ColumnStat object should move.
`fromExternalString` / `toExternalString` -> `CatalogColumnStat`

And also:
`supportsDatatype` / `supportsHistogram` -> `AnalyzeColumnCommand`
`statExprs` / `rowToColumnStat` -> `AnalyzeColumnCommand`
because they are tied to that specific method of stats collection.


---




[GitHub] spark pull request #20624: [SPARK-23445] ColumnStat refactoring

2018-02-25 Thread juliuszsompolski
Github user juliuszsompolski commented on a diff in the pull request:

https://github.com/apache/spark/pull/20624#discussion_r170463362
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
 ---
@@ -387,6 +390,101 @@ case class CatalogStatistics(
   }
 }
 
+/**
+ * This class of statistics for a column is used in [[CatalogTable]] to 
interact with metastore.
+ */
+case class CatalogColumnStat(
+  distinctCount: Option[BigInt] = None,
+  min: Option[String] = None,
+  max: Option[String] = None,
+  nullCount: Option[BigInt] = None,
+  avgLen: Option[Long] = None,
+  maxLen: Option[Long] = None,
+  histogram: Option[Histogram] = None) {
+
+  /**
+   * Returns a map from string to string that can be used to serialize the 
column stats.
+   * The key is the name of the column and name of the field (e.g. 
"colName.distinctCount"),
+   * and the value is the string representation for the value.
+   * min/max values are stored as Strings. They can be deserialized using
+   * [[ColumnStat.fromExternalString]].
+   *
+   * As part of the protocol, the returned map always contains a key 
called "version".
+   * In the case min/max values are null (None), they won't appear in the 
map.
+   */
+  def toMap(colName: String): Map[String, String] = {
+val map = new scala.collection.mutable.HashMap[String, String]
+map.put(s"${colName}.${CatalogColumnStat.KEY_VERSION}", "1")
+distinctCount.foreach { v =>
+  map.put(s"${colName}.${CatalogColumnStat.KEY_DISTINCT_COUNT}", 
v.toString)
+}
+nullCount.foreach { v =>
+  map.put(s"${colName}.${CatalogColumnStat.KEY_NULL_COUNT}", 
v.toString)
+}
+avgLen.foreach { v => 
map.put(s"${colName}.${CatalogColumnStat.KEY_AVG_LEN}", v.toString) }
+maxLen.foreach { v => 
map.put(s"${colName}.${CatalogColumnStat.KEY_MAX_LEN}", v.toString) }
+min.foreach { v => 
map.put(s"${colName}.${CatalogColumnStat.KEY_MIN_VALUE}", v) }
+max.foreach { v => 
map.put(s"${colName}.${CatalogColumnStat.KEY_MAX_VALUE}", v) }
+histogram.foreach { h =>
+  map.put(s"${colName}.${CatalogColumnStat.KEY_HISTOGRAM}", 
HistogramSerializer.serialize(h))
+}
+map.toMap
+  }
+
+  /** Convert [[CatalogColumnStat]] to [[ColumnStat]]. */
+  def toPlanStat(
+  colName: String,
+  dataType: DataType): ColumnStat =
+ColumnStat(
+  distinctCount = distinctCount,
+  min = min.map(ColumnStat.fromExternalString(_, colName, dataType)),
+  max = max.map(ColumnStat.fromExternalString(_, colName, dataType)),
+  nullCount = nullCount,
+  avgLen = avgLen,
+  maxLen = maxLen,
+  histogram = histogram)
+}
+
+object CatalogColumnStat extends Logging {
+
+  // List of string keys used to serialize CatalogColumnStat
+  val KEY_VERSION = "version"
+  private val KEY_DISTINCT_COUNT = "distinctCount"
+  private val KEY_MIN_VALUE = "min"
+  private val KEY_MAX_VALUE = "max"
+  private val KEY_NULL_COUNT = "nullCount"
+  private val KEY_AVG_LEN = "avgLen"
+  private val KEY_MAX_LEN = "maxLen"
+  private val KEY_HISTOGRAM = "histogram"
+
+  /**
+   * Creates a [[CatalogColumnStat]] object from the given map.
+   * This is used to deserialize column stats from some external storage.
+   * The serialization side is defined in [[CatalogColumnStat.toMap]].
+   */
+  def fromMap(
+table: String,
+colName: String,
+map: Map[String, String]): Option[CatalogColumnStat] = {
+
+try {
+  Some(CatalogColumnStat(
+distinctCount = map.get(s"${colName}.${KEY_DISTINCT_COUNT}").map(v 
=> BigInt(v.toLong)),
--- End diff --

The format doesn't change.
There is an existing test, `StatisticsSuite."verify serialized column stats after analyzing columns"`, that checks the format of the serialized stats in the metastore doesn't change, by comparing it to a manual map of properties.
I will add a test that verifies it the other way - it adds the properties manually as TBLPROPERTIES, and verifies that they are successfully parsed.
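A hedged sketch of that reverse-direction check, using only the `CatalogColumnStat` API shown in the diff above (the exact shape of the final test may differ):
```
val stat = CatalogColumnStat(
  distinctCount = Some(BigInt(10)),
  min = Some("1"),
  max = Some("100"),
  nullCount = Some(BigInt(0)),
  avgLen = Some(4L),
  maxLen = Some(4L))

// Serialize to the TBLPROPERTIES-style map (keys like "col.min", "col.version")
// and verify that parsing those properties back yields the same stats.
val props = stat.toMap("col")
assert(CatalogColumnStat.fromMap("tbl", "col", props) == Some(stat))
```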


---




[GitHub] spark pull request #20624: [SPARK-23445] ColumnStat refactoring

2018-02-25 Thread juliuszsompolski
Github user juliuszsompolski commented on a diff in the pull request:

https://github.com/apache/spark/pull/20624#discussion_r170462960
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
 ---
@@ -387,6 +390,101 @@ case class CatalogStatistics(
   }
 }
 
+/**
+ * This class of statistics for a column is used in [[CatalogTable]] to 
interact with metastore.
+ */
+case class CatalogColumnStat(
+  distinctCount: Option[BigInt] = None,
+  min: Option[String] = None,
+  max: Option[String] = None,
+  nullCount: Option[BigInt] = None,
+  avgLen: Option[Long] = None,
+  maxLen: Option[Long] = None,
+  histogram: Option[Histogram] = None) {
+
+  /**
+   * Returns a map from string to string that can be used to serialize the 
column stats.
+   * The key is the name of the column and name of the field (e.g. 
"colName.distinctCount"),
+   * and the value is the string representation for the value.
+   * min/max values are stored as Strings. They can be deserialized using
+   * [[ColumnStat.fromExternalString]].
+   *
+   * As part of the protocol, the returned map always contains a key 
called "version".
+   * In the case min/max values are null (None), they won't appear in the 
map.
+   */
+  def toMap(colName: String): Map[String, String] = {
+val map = new scala.collection.mutable.HashMap[String, String]
+map.put(s"${colName}.${CatalogColumnStat.KEY_VERSION}", "1")
+distinctCount.foreach { v =>
+  map.put(s"${colName}.${CatalogColumnStat.KEY_DISTINCT_COUNT}", 
v.toString)
+}
+nullCount.foreach { v =>
+  map.put(s"${colName}.${CatalogColumnStat.KEY_NULL_COUNT}", 
v.toString)
+}
+avgLen.foreach { v => 
map.put(s"${colName}.${CatalogColumnStat.KEY_AVG_LEN}", v.toString) }
+maxLen.foreach { v => 
map.put(s"${colName}.${CatalogColumnStat.KEY_MAX_LEN}", v.toString) }
+min.foreach { v => 
map.put(s"${colName}.${CatalogColumnStat.KEY_MIN_VALUE}", v) }
+max.foreach { v => 
map.put(s"${colName}.${CatalogColumnStat.KEY_MAX_VALUE}", v) }
+histogram.foreach { h =>
+  map.put(s"${colName}.${CatalogColumnStat.KEY_HISTOGRAM}", 
HistogramSerializer.serialize(h))
+}
+map.toMap
+  }
+
+  /** Convert [[CatalogColumnStat]] to [[ColumnStat]]. */
+  def toPlanStat(
--- End diff --

I intentionally made the name the same.
`CatalogStatistics.toPlanStat` converts it to `Statistics`, and 
`CatalogColumnStat.toPlanStat` converts it to `ColumnStat`. The shared name signals 
that both methods convert a catalog-side object to its counterpart that is 
used in the query plan.
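
For illustration, using the signature quoted in this diff, the column-level conversion reads like this (here `catalogColStat` is assumed to be a `CatalogColumnStat` instance and the column name/type are made up):
```
import org.apache.spark.sql.types.IntegerType

// Catalog-side stat (min/max kept as Strings) becomes the plan-side ColumnStat,
// with min/max parsed according to the column's data type.
val planStat: ColumnStat = catalogColStat.toPlanStat("attr1", IntegerType)
```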


---




[GitHub] spark pull request #20624: [SPARK-23445] ColumnStat refactoring

2018-02-22 Thread juliuszsompolski
Github user juliuszsompolski commented on a diff in the pull request:

https://github.com/apache/spark/pull/20624#discussion_r170086812
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
 ---
@@ -387,6 +390,101 @@ case class CatalogStatistics(
   }
 }
 
+/**
+ * This class of statistics for a column is used in [[CatalogTable]] to 
interact with metastore.
+ */
+case class CatalogColumnStat(
+  distinctCount: Option[BigInt] = None,
+  min: Option[String] = None,
+  max: Option[String] = None,
+  nullCount: Option[BigInt] = None,
+  avgLen: Option[Long] = None,
+  maxLen: Option[Long] = None,
+  histogram: Option[Histogram] = None) {
+
+  /**
+   * Returns a map from string to string that can be used to serialize the 
column stats.
+   * The key is the name of the column and name of the field (e.g. 
"colName.distinctCount"),
+   * and the value is the string representation for the value.
+   * min/max values are stored as Strings. They can be deserialized using
+   * [[ColumnStat.fromExternalString]].
+   *
+   * As part of the protocol, the returned map always contains a key 
called "version".
+   * In the case min/max values are null (None), they won't appear in the 
map.
+   */
+  def toMap(colName: String): Map[String, String] = {
+val map = new scala.collection.mutable.HashMap[String, String]
+map.put(s"${colName}.${CatalogColumnStat.KEY_VERSION}", "1")
+distinctCount.foreach { v =>
+  map.put(s"${colName}.${CatalogColumnStat.KEY_DISTINCT_COUNT}", 
v.toString)
+}
+nullCount.foreach { v =>
+  map.put(s"${colName}.${CatalogColumnStat.KEY_NULL_COUNT}", 
v.toString)
+}
+avgLen.foreach { v => 
map.put(s"${colName}.${CatalogColumnStat.KEY_AVG_LEN}", v.toString) }
+maxLen.foreach { v => 
map.put(s"${colName}.${CatalogColumnStat.KEY_MAX_LEN}", v.toString) }
+min.foreach { v => 
map.put(s"${colName}.${CatalogColumnStat.KEY_MIN_VALUE}", v) }
+max.foreach { v => 
map.put(s"${colName}.${CatalogColumnStat.KEY_MAX_VALUE}", v) }
+histogram.foreach { h =>
+  map.put(s"${colName}.${CatalogColumnStat.KEY_HISTOGRAM}", 
HistogramSerializer.serialize(h))
+}
+map.toMap
+  }
+
+  /** Convert [[CatalogColumnStat]] to [[ColumnStat]]. */
+  def toPlanStat(
+  colName: String,
+  dataType: DataType): ColumnStat =
+ColumnStat(
+  distinctCount = distinctCount,
+  min = min.map(ColumnStat.fromExternalString(_, colName, dataType)),
+  max = max.map(ColumnStat.fromExternalString(_, colName, dataType)),
+  nullCount = nullCount,
+  avgLen = avgLen,
+  maxLen = maxLen,
+  histogram = histogram)
+}
+
+object CatalogColumnStat extends Logging {
+
+  // List of string keys used to serialize CatalogColumnStat
+  val KEY_VERSION = "version"
+  private val KEY_DISTINCT_COUNT = "distinctCount"
+  private val KEY_MIN_VALUE = "min"
+  private val KEY_MAX_VALUE = "max"
+  private val KEY_NULL_COUNT = "nullCount"
+  private val KEY_AVG_LEN = "avgLen"
+  private val KEY_MAX_LEN = "maxLen"
+  private val KEY_HISTOGRAM = "histogram"
+
+  /**
+   * Creates a [[CatalogColumnStat]] object from the given map.
+   * This is used to deserialize column stats from some external storage.
+   * The serialization side is defined in [[CatalogColumnStat.toMap]].
+   */
+  def fromMap(
+table: String,
+colName: String,
+map: Map[String, String]): Option[CatalogColumnStat] = {
+
+try {
+  Some(CatalogColumnStat(
+distinctCount = map.get(s"${colName}.${KEY_DISTINCT_COUNT}").map(v 
=> BigInt(v.toLong)),
--- End diff --

Neither the keys nor the format of the stats in the metastore changed. After this 
patch it remains backwards compatible with stats created before.

What did change is the `map` passed in here: it used to contain the stats for 
just one column, with the columnName prefix stripped, whereas now I'm passing a `map` 
that holds the statistics for all columns, with keys prefixed by the columnName.

This reduces complexity in `statsFromProperties`, see 
https://github.com/apache/spark/pull/20624/files#diff-159191585e10542f013cb3a714f26075R1057
It used to build a filtered map for every column, stripping the column-name 
prefix along the way.
Now it just passes the map of all columns' stat properties, and each 
column picks up what it needs.

I'll add a bit of doc / comments about that.
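
A hypothetical illustration of the contract described above (column names and values are made up): one map holds the stat properties of all columns, and each `fromMap` call only picks up the keys carrying its own column-name prefix.
```
val allProps = Map(
  "a.version" -> "1", "a.distinctCount" -> "10", "a.nullCount" -> "0",
  "b.version" -> "1", "b.distinctCount" -> "42", "b.nullCount" -> "3")

// Each call reads only the "<colName>." keys it cares about; the rest are ignored.
val statA = CatalogColumnStat.fromMap("tbl", "a", allProps)
val statB = CatalogColumnStat.fromMap("tbl", "b", allProps)
```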


---




[GitHub] spark issue #20624: [SPARK-23445] ColumnStat refactoring

2018-02-15 Thread juliuszsompolski
Github user juliuszsompolski commented on the issue:

https://github.com/apache/spark/pull/20624
  
cc @gatorsmile @cloud-fan @marmbrus 


---




[GitHub] spark pull request #20624: [SPARK-23445] ColumnStat refactoring

2018-02-15 Thread juliuszsompolski
GitHub user juliuszsompolski opened a pull request:

https://github.com/apache/spark/pull/20624

[SPARK-23445] ColumnStat refactoring

## What changes were proposed in this pull request?

Refactor ColumnStat to be more flexible.

* Split `ColumnStat` and `CatalogColumnStat` just like `CatalogStatistics` 
is split from `Statistics`. This detaches how the statistics are stored from 
how they are processed in the query plan. `CatalogColumnStat` keeps `min` and 
`max` as `String`, making it not depend on dataType information.
* For `CatalogColumnStat`, parse column names from property names in the 
metastore (the `KEY_VERSION` property), not from the metastore schema. This means that 
`CatalogColumnStat`s can be created for columns even if the schema itself is 
not stored in the metastore (see the sketch below).
* Make all fields optional. `min`, `max` and `histogram` for columns were 
optional already. Having them all optional is more consistent, and gives 
flexibility to e.g. drop some of the fields through transformations if they are 
difficult / impossible to calculate.

The added flexibility makes it possible to have alternative 
implementations for stats, and it separates stats collection from how stats and 
estimation are processed in plans.
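
As a rough sketch of the "parse column names from property names" bullet above (not the actual implementation; `tableProperties` is an assumed stand-in for the raw properties read from the metastore), the set of columns that have stats can be derived from the property keys alone, using the `<colName>.version` key that `toMap` always writes:
```
val statProps: Map[String, String] = tableProperties

// Every column with stats has a "<colName>.version" key, independently of the
// schema stored in the metastore.
val colNames = statProps.keys.collect {
  case key if key.endsWith(".version") => key.stripSuffix(".version")
}.toSet

val colStats = colNames.flatMap { col =>
  CatalogColumnStat.fromMap("some_table", col, statProps).map(col -> _)
}.toMap
```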

## How was this patch tested?

Refactored existing tests to work with refactored `ColumnStat` and 
`CatalogColumnStat`.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/juliuszsompolski/apache-spark SPARK-23445

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/20624.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #20624


commit cf3602075dcee35494c72975e361b739939079b4
Author: Juliusz Sompolski <julek@...>
Date:   2018-01-19T13:57:46Z

column stat refactoring




---




[GitHub] spark pull request #20555: [SPARK-23366] Improve hot reading path in ReadAhe...

2018-02-13 Thread juliuszsompolski
Github user juliuszsompolski commented on a diff in the pull request:

https://github.com/apache/spark/pull/20555#discussion_r168048937
  
--- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java 
---
@@ -78,9 +79,8 @@
   // whether there is a read ahead task running,
   private boolean isReading;
 
-  // If the remaining data size in the current buffer is below this 
threshold,
-  // we issue an async read from the underlying input stream.
-  private final int readAheadThresholdInBytes;
+  // whether there is a reader waiting for data.
+  private AtomicBoolean isWaiting = new AtomicBoolean(false);
--- End diff --

I'll leave it as is - it should compile to basically the same code, and with 
`AtomicBoolean` the intent seems more readable to me.


---




[GitHub] spark pull request #20555: [SPARK-23366] Improve hot reading path in ReadAhe...

2018-02-13 Thread juliuszsompolski
Github user juliuszsompolski commented on a diff in the pull request:

https://github.com/apache/spark/pull/20555#discussion_r168048795
  
--- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java 
---
@@ -230,24 +227,32 @@ private void signalAsyncReadComplete() {
 
   private void waitForAsyncReadComplete() throws IOException {
 stateChangeLock.lock();
+isWaiting.set(true);
 try {
-  while (readInProgress) {
+  if (readInProgress) {
--- End diff --

Good catch, thanks!


---




[GitHub] spark issue #20555: [SPARK-23366] Improve hot reading path in ReadAheadInput...

2018-02-13 Thread juliuszsompolski
Github user juliuszsompolski commented on the issue:

https://github.com/apache/spark/pull/20555
  
@jiangxb1987 there is a ReadAheadInputStreamSuite that extends 
GenericFileInputStreamSuite.
I updated it and added more combination testing with different buffer sizes, 
which should exercise more interactions between the wrapped and outer buffers.


---




[GitHub] spark pull request #20555: [SPARK-23366] Improve hot reading path in ReadAhe...

2018-02-13 Thread juliuszsompolski
Github user juliuszsompolski commented on a diff in the pull request:

https://github.com/apache/spark/pull/20555#discussion_r167971273
  
--- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java 
---
@@ -258,54 +263,43 @@ public int read(byte[] b, int offset, int len) throws 
IOException {
 if (len == 0) {
   return 0;
 }
-stateChangeLock.lock();
-try {
-  return readInternal(b, offset, len);
-} finally {
-  stateChangeLock.unlock();
-}
-  }
 
-  /**
-   * flip the active and read ahead buffer
-   */
-  private void swapBuffers() {
-ByteBuffer temp = activeBuffer;
-activeBuffer = readAheadBuffer;
-readAheadBuffer = temp;
-  }
-
-  /**
-   * Internal read function which should be called only from read() api. 
The assumption is that
-   * the stateChangeLock is already acquired in the caller before calling 
this function.
-   */
-  private int readInternal(byte[] b, int offset, int len) throws 
IOException {
-assert (stateChangeLock.isLocked());
 if (!activeBuffer.hasRemaining()) {
-  waitForAsyncReadComplete();
-  if (readAheadBuffer.hasRemaining()) {
-swapBuffers();
-  } else {
-// The first read or activeBuffer is skipped.
-readAsync();
+  // No remaining in active buffer - lock and switch to write ahead 
buffer.
+  stateChangeLock.lock();
+  try {
 waitForAsyncReadComplete();
-if (isEndOfStream()) {
-  return -1;
+if (!readAheadBuffer.hasRemaining()) {
+  // The first read or activeBuffer is skipped.
--- End diff --

Skipped using `skip()`.
I moved the comment over from a few lines above, but looking at `skip()` 
now I don't think that can happen - the skip would trigger a `readAsync` read in 
that case.
I'll update the comment.


---




[GitHub] spark pull request #20555: [SPARK-23366] Improve hot reading path in ReadAhe...

2018-02-12 Thread juliuszsompolski
Github user juliuszsompolski commented on a diff in the pull request:

https://github.com/apache/spark/pull/20555#discussion_r167651037
  
--- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java 
---
@@ -232,7 +229,9 @@ private void waitForAsyncReadComplete() throws 
IOException {
 stateChangeLock.lock();
 try {
   while (readInProgress) {
+isWaiting.set(true);
 asyncReadComplete.await();
+isWaiting.set(false);
--- End diff --

Good catch, I added `isWaiting.set(false)` to the finally branch.
Actually, since the whole implementation assumes that there is only one 
reader, I removed the while() loop, since there is no other reader to race with 
us to trigger another read.

In practice I think not updating `isWaiting` would have been benign: after 
the exception the query would be going down with an `InterruptedException`, or 
otherwise anyone upstream handling that exception would most probably declare 
the stream unusable afterwards anyway.


---




[GitHub] spark pull request #20555: [SPARK-23366] Improve hot reading path in ReadAhe...

2018-02-12 Thread juliuszsompolski
Github user juliuszsompolski commented on a diff in the pull request:

https://github.com/apache/spark/pull/20555#discussion_r167646954
  
--- Diff: core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java 
---
@@ -258,54 +262,43 @@ public int read(byte[] b, int offset, int len) throws 
IOException {
 if (len == 0) {
   return 0;
 }
-stateChangeLock.lock();
-try {
-  return readInternal(b, offset, len);
-} finally {
-  stateChangeLock.unlock();
-}
-  }
-
-  /**
-   * flip the active and read ahead buffer
-   */
-  private void swapBuffers() {
-ByteBuffer temp = activeBuffer;
-activeBuffer = readAheadBuffer;
-readAheadBuffer = temp;
-  }
 
-  /**
-   * Internal read function which should be called only from read() api. 
The assumption is that
-   * the stateChangeLock is already acquired in the caller before calling 
this function.
-   */
-  private int readInternal(byte[] b, int offset, int len) throws 
IOException {
-assert (stateChangeLock.isLocked());
 if (!activeBuffer.hasRemaining()) {
-  waitForAsyncReadComplete();
-  if (readAheadBuffer.hasRemaining()) {
-swapBuffers();
-  } else {
-// The first read or activeBuffer is skipped.
-readAsync();
+  // No remaining in active buffer - lock and switch to write ahead 
buffer.
+  stateChangeLock.lock();
+  try {
 waitForAsyncReadComplete();
-if (isEndOfStream()) {
-  return -1;
+if (!readAheadBuffer.hasRemaining()) {
+  // The first read or activeBuffer is skipped.
+  readAsync();
+  waitForAsyncReadComplete();
+  if (isEndOfStream()) {
+return -1;
+  }
 }
+// Swap the newly read read ahead buffer in place of empty active 
buffer.
--- End diff --

Other existing comments in the file use `read ahead`.


---




[GitHub] spark issue #20555: [SPARK-23366] Improve hot reading path in ReadAheadInput...

2018-02-08 Thread juliuszsompolski
Github user juliuszsompolski commented on the issue:

https://github.com/apache/spark/pull/20555
  
cc @kiszk @sitalkedia @zsxwing 


---




[GitHub] spark pull request #20555: [SPARK-23366] Improve hot reading path in ReadAhe...

2018-02-08 Thread juliuszsompolski
GitHub user juliuszsompolski opened a pull request:

https://github.com/apache/spark/pull/20555

[SPARK-23366] Improve hot reading path in ReadAheadInputStream

## What changes were proposed in this pull request?

`ReadAheadInputStream` was introduced in 
https://github.com/apache/spark/pull/18317/ to optimize reading spill files 
from disk.
However, from the profiles it seems that the hot path of reading small 
amounts of data (like readInt) is inefficient - it involves taking locks, and 
multiple checks.

Optimize locking: Lock is not needed when simply accessing the active 
buffer. Only lock when needing to swap buffers or trigger async reading, or get 
information about the async state.

Optimize the short-path single-byte reads that are used e.g. by the Java library's 
DataInputStream.readInt.

The asyncReader used to call "read" only once on the underlying stream, 
which never filled the underlying buffer when it was wrapping an 
LZ4BlockInputStream. If the buffer came back unfilled, the async reader 
would be triggered to fill the read ahead buffer on every call, 
because the reader would see the active buffer below the refill 
threshold all the time.

However, filling the full buffer all the time could introduce increased 
latency, so also add an `AtomicBoolean` flag for the async reader to return 
earlier if there is a reader waiting for data.

Remove `readAheadThresholdInBytes` and instead immediately trigger an async 
read when switching the buffers. This simplifies the code paths, especially 
the hot one, which then only has to check whether there is data available in the 
active buffer, without worrying about whether it needs to retrigger an async read. 
It seems to have a positive effect on perf.
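
To make the reshaped hot path concrete, here is a conceptual sketch (written as simplified Scala, not the actual Java class; it ignores `skip()`, the multi-byte `read(byte[], int, int)` variant and the full EOF bookkeeping): reads served from the active buffer take no lock at all, and the state-change lock is only taken when the active buffer is exhausted.
```
class ReadAheadSketch(
    lock: java.util.concurrent.locks.ReentrantLock,
    var activeBuffer: java.nio.ByteBuffer,
    var readAheadBuffer: java.nio.ByteBuffer) {

  def read(): Int = {
    if (activeBuffer.hasRemaining) {
      // Hot path: no locking, just consume a byte from the active buffer.
      activeBuffer.get() & 0xFF
    } else {
      lock.lock()
      try {
        waitForAsyncReadComplete()   // stand-in for the real async machinery
        if (!readAheadBuffer.hasRemaining) {
          return -1                  // nothing was read ahead: end of stream
        }
        swapBuffers()                // the read-ahead buffer becomes active
        triggerAsyncRead()           // immediately start refilling the other buffer
      } finally {
        lock.unlock()
      }
      read()                         // retry, now served from the new active buffer
    }
  }

  private def swapBuffers(): Unit = {
    val tmp = activeBuffer
    activeBuffer = readAheadBuffer
    readAheadBuffer = tmp
  }

  // Stubs standing in for the real async read-ahead machinery.
  private def waitForAsyncReadComplete(): Unit = ()
  private def triggerAsyncRead(): Unit = ()
}
```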

## How was this patch tested?

It was noticed as a regression in some workloads after upgrading to Spark 
2.3. 

It was particularly visible on TPCDS Q95 running on instances with fast 
disk (i3 AWS instances).
Running with profiling:
* Spark 2.2 - 5.2-5.3 minutes 9.5% in LZ4BlockInputStream.read
* Spark 2.3 - 6.4-6.6 minutes 31.1% in ReadAheadInputStream.read
* Spark 2.3 + fix - 5.3-5.4 minutes 13.3% in ReadAheadInputStream.read - 
very slightly slower, practically within noise.

We didn't see other regressions, and many workloads in general seem to be 
faster with Spark 2.3 (we did not investigate whether that is thanks to the async 
reader or unrelated).

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/juliuszsompolski/apache-spark SPARK-23366

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/20555.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #20555


commit 987f15ccb01b6c0351fbfdd49d6930b929c50a74
Author: Juliusz Sompolski <julek@...>
Date:   2018-01-30T20:54:47Z

locking tweak

commit b26ffce6780078dbc38bff658e1ef7e9c56c3dd8
Author: Juliusz Sompolski <julek@...>
Date:   2018-02-01T14:27:09Z

fill the read ahead buffer




---




[GitHub] spark issue #20152: [SPARK-22957] ApproxQuantile breaks if the number of row...

2018-01-04 Thread juliuszsompolski
Github user juliuszsompolski commented on the issue:

https://github.com/apache/spark/pull/20152
  
If the change to the serialized form is a problem, that part can probably be 
reverted - it's far less likely that a single compressed stats chunk will 
overflow an Int.
The bug I hit was in the global rank counter; I changed the other part only 
because, while reviewing the surrounding code, it looked like another place that 
could conceivably need a Long instead of an Int.


---




[GitHub] spark pull request #20152: [SPARK-22957] ApproxQuantile breaks if the number...

2018-01-04 Thread juliuszsompolski
GitHub user juliuszsompolski opened a pull request:

https://github.com/apache/spark/pull/20152

[SPARK-22957] ApproxQuantile breaks if the number of rows exceeds MaxInt

## What changes were proposed in this pull request?

A 32-bit Int was used for the row rank.
That overflowed on a dataframe with more than 2B rows.
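
A minimal illustration of the failure mode (plain Scala, just showing the arithmetic):
```
val rank: Int = Int.MaxValue     // ~2.1B, the largest value a 32-bit counter can hold
println(rank + 1)                // -2147483648: the rank silently wraps to a negative number
println(rank.toLong + 1)         // 2147483648: a 64-bit counter keeps counting correctly
```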

## How was this patch tested?

Added test, but ignored, as it takes 4 minutes.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/juliuszsompolski/apache-spark SPARK-22957

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/20152.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #20152


commit 324218b6065f1ad57479d5ee582694826c1309f9
Author: Juliusz Sompolski <julek@...>
Date:   2018-01-04T13:22:49Z

SPARK-22957




---




[GitHub] spark pull request #20136: [SPARK-22938] Assert that SQLConf.get is accessed...

2018-01-03 Thread juliuszsompolski
Github user juliuszsompolski commented on a diff in the pull request:

https://github.com/apache/spark/pull/20136#discussion_r159408791
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -70,7 +72,7 @@ object SQLConf {
* Default config. Only used when there is no active SparkSession for 
the thread.
* See [[get]] for more information.
*/
-  private val fallbackConf = new ThreadLocal[SQLConf] {
+  private lazy val fallbackConf = new ThreadLocal[SQLConf] {
--- End diff --

When I checked (which was before I moved the assertion from here to the SQLConf 
constructor, but it shouldn't matter), not making it lazy resulted in it 
being instantiated eagerly, as a static member of the SQLConf object, before SparkEnv 
was set, and hence hitting a null SparkEnv.
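
A simplified sketch of the initialization-order issue (not the actual SQLConf code; the ThreadLocal wrapper is omitted): a plain `val` on a singleton object is evaluated as soon as the object is first loaded, which can happen before SparkEnv is set, while a `lazy val` is only evaluated on first access.
```
import org.apache.spark.SparkEnv
import org.apache.spark.sql.internal.SQLConf

object ConfHolder {
  // Eager version: runs during object initialization, possibly before SparkEnv exists.
  // private val fallbackConf = { assert(SparkEnv.get != null); new SQLConf }

  // Lazy version: runs on first access, by which point SparkEnv has been set.
  private lazy val fallbackConf = { assert(SparkEnv.get != null); new SQLConf }
}
```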


---




[GitHub] spark issue #20136: [SPARK-22938] Assert that SQLConf.get is accessed only o...

2018-01-02 Thread juliuszsompolski
Github user juliuszsompolski commented on the issue:

https://github.com/apache/spark/pull/20136
  
cc @cloud-fan 


---




[GitHub] spark pull request #20136: [SPARK-22938] Assert that SQLConf.get is accessed...

2018-01-02 Thread juliuszsompolski
GitHub user juliuszsompolski opened a pull request:

https://github.com/apache/spark/pull/20136

[SPARK-22938] Assert that SQLConf.get is accessed only on the driver.

## What changes were proposed in this pull request?

Assert if code tries to access SQLConf.get on executor.
This can lead to hard to detect bugs, where the executor will read 
fallbackConf, falling back to default config values, ignoring potentially 
changed non-default configs.
If a config is to be passed to executor code, it needs to be read on the 
driver, and passed explicitly.
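
A minimal sketch of the kind of guard described above (an assumption, not necessarily the exact check that was merged; a non-null `TaskContext.get()` is used here as a proxy for "running inside a task on an executor"):
```
import org.apache.spark.TaskContext

object DriverOnlyCheck {
  def assertSQLConfReadOnDriver(): Unit = {
    // TaskContext.get() returns null outside of a running task, i.e. on the driver.
    assert(TaskContext.get() == null,
      "SQLConf.get should only be read on the driver; read the needed values there " +
      "and pass them to executor-side code explicitly.")
  }
}
```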

## How was this patch tested?

Checked by existing tests.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/juliuszsompolski/apache-spark SPARK-22938

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/20136.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #20136


commit d2b3bc6374cde4f8d4ebeedd7612f51d18f13806
Author: Juliusz Sompolski <julek@...>
Date:   2017-12-14T17:50:23Z

[SPARK-22938] Assert that SQLConf.get is accessed only on the driver.




---




[GitHub] spark issue #19923: [SPARK-22721] BytesToBytesMap peak memory not updated.

2017-12-07 Thread juliuszsompolski
Github user juliuszsompolski commented on the issue:

https://github.com/apache/spark/pull/19923
  
jenkins retest this please


---




[GitHub] spark issue #19923: [SPARK-22721] BytesToBytesMap peak memory not updated.

2017-12-07 Thread juliuszsompolski
Github user juliuszsompolski commented on the issue:

https://github.com/apache/spark/pull/19923
  
Sorry @hvanhovell for not getting it fully right the first time...


---




[GitHub] spark pull request #19923: [SPARK-22721] BytesToBytesMap peak memory not upd...

2017-12-07 Thread juliuszsompolski
GitHub user juliuszsompolski opened a pull request:

https://github.com/apache/spark/pull/19923

[SPARK-22721] BytesToBytesMap peak memory not updated.

## What changes were proposed in this pull request?

Follow-up to an earlier commit.
The peak memory of BytesToBytesMap is not updated in two more places - spill() 
and destructiveIterator().

## How was this patch tested?

Manually.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/juliuszsompolski/apache-spark SPARK-22721cd

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19923.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19923


commit 12de708a3ba894b9568be12402f561b767355acc
Author: Juliusz Sompolski <ju...@databricks.com>
Date:   2017-12-07T21:27:56Z

SPARK-22721 cd




---




[GitHub] spark issue #19915: [SPARK-22721] BytesToBytesMap peak memory usage not accu...

2017-12-06 Thread juliuszsompolski
Github user juliuszsompolski commented on the issue:

https://github.com/apache/spark/pull/19915
  
cc @cloud-fan 


---




[GitHub] spark pull request #19915: [SPARK-22721] BytesToBytesMap peak memory usage n...

2017-12-06 Thread juliuszsompolski
GitHub user juliuszsompolski opened a pull request:

https://github.com/apache/spark/pull/19915

[SPARK-22721] BytesToBytesMap peak memory usage not accurate after reset()

## What changes were proposed in this pull request?

BytesToBytesMap doesn't update the peak memory usage before shrinking back to 
the initial capacity in reset(), so after a disk spill one never knows what the 
size of the hash table was before spilling.
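
The general pattern of the fix, sketched with illustrative names (this is not the exact BytesToBytesMap code): the current footprint has to be folded into the peak counter before the backing memory is released or shrunk.
```
object PeakTrackingSketch {
  private var peakMemoryUsedBytes = 0L

  private def totalMemoryConsumption(): Long = ???   // current size of the hash table

  private def updatePeakMemoryUsed(): Unit = {
    peakMemoryUsedBytes = math.max(peakMemoryUsedBytes, totalMemoryConsumption())
  }

  def reset(): Unit = {
    updatePeakMemoryUsed()   // record the pre-spill size first...
    // ...and only then free the pages and shrink back to the initial capacity.
  }
}
```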

## How was this patch tested?

Checked manually.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/juliuszsompolski/apache-spark SPARK-22721

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19915.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19915


commit eeb0f31da08e6bf609b8ca6cc6509b949dcbac6e
Author: Juliusz Sompolski <ju...@databricks.com>
Date:   2017-12-06T20:25:35Z

SPARK-22721




---




[GitHub] spark issue #19689: [SPARK-22462][SQL] Make rdd-based actions in Dataset tra...

2017-11-09 Thread juliuszsompolski
Github user juliuszsompolski commented on the issue:

https://github.com/apache/spark/pull/19689
  
Thanks for the fix @viirya! But I'm not a Spark committer, so I can't approve it :-).


---




[GitHub] spark issue #19656: [SPARK-22445][SQL] move CodegenContext.copyResult to Cod...

2017-11-06 Thread juliuszsompolski
Github user juliuszsompolski commented on the issue:

https://github.com/apache/spark/pull/19656
  
`isShouldStopRequired` is currently not respected by most operators (they 
insert `shouldStop()` code regardless of this setting). If you're refactoring 
this, maybe make sure that all places that emit `shouldStop()` respect it?


---




[GitHub] spark issue #19181: [SPARK-21907][CORE] oom during spill

2017-10-10 Thread juliuszsompolski
Github user juliuszsompolski commented on the issue:

https://github.com/apache/spark/pull/19181
  
Looks good to me.
What do you think @hvanhovell ?


---




[GitHub] spark issue #19386: [SPARK-22161] [SQL] Add Impala-modified TPC-DS queries

2017-09-29 Thread juliuszsompolski
Github user juliuszsompolski commented on the issue:

https://github.com/apache/spark/pull/19386
  
LGTM


---




[GitHub] spark pull request #19181: [SPARK-21907][CORE] oom during spill

2017-09-29 Thread juliuszsompolski
Github user juliuszsompolski commented on a diff in the pull request:

https://github.com/apache/spark/pull/19181#discussion_r141843276
  
--- Diff: 
core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java
 ---
@@ -139,4 +139,44 @@ public int compare(
 }
 assertEquals(dataToSort.length, iterLength);
   }
+
+  @Test
+  public void freeAfterOOM() {
+final TestMemoryManager testMemoryManager = new TestMemoryManager(new 
SparkConf().set("spark.memory.offHeap.enabled", "false"));
+final TaskMemoryManager memoryManager = new TaskMemoryManager(
+testMemoryManager, 0);
+final TestMemoryConsumer consumer = new 
TestMemoryConsumer(memoryManager);
+final MemoryBlock dataPage = memoryManager.allocatePage(2048, 
consumer);
+final Object baseObject = dataPage.getBaseObject();
+// Write the records into the data page:
+long position = dataPage.getBaseOffset();
+
+final HashPartitioner hashPartitioner = new HashPartitioner(4);
+// Use integer comparison for comparing prefixes (which are partition 
ids, in this case)
+final PrefixComparator prefixComparator = PrefixComparators.LONG;
+final RecordComparator recordComparator = new RecordComparator() {
+  @Override
+  public int compare(
+  Object leftBaseObject,
+  long leftBaseOffset,
+  Object rightBaseObject,
+  long rightBaseOffset) {
+return 0;
+  }
+};
+UnsafeInMemorySorter sorter = new UnsafeInMemorySorter(consumer, 
memoryManager,
+recordComparator, prefixComparator, 100, shouldUseRadixSort());
+
+testMemoryManager.markExecutionAsOutOfMemoryOnce();
+try {
+  sorter.reset();
+} catch( OutOfMemoryError oom ) {
+  //as expected
+}
+// this currently fails on NPE at 
org.apache.spark.memory.MemoryConsumer.freeArray(MemoryConsumer.java:108)
+sorter.free();
+//simulate a 'back to back' free.
--- End diff --

nit: ws: `// simulate ...`


---




[GitHub] spark pull request #19181: [SPARK-21907][CORE] oom during spill

2017-09-29 Thread juliuszsompolski
Github user juliuszsompolski commented on a diff in the pull request:

https://github.com/apache/spark/pull/19181#discussion_r141843447
  
--- Diff: 
core/src/test/scala/org/apache/spark/memory/TestMemoryManager.scala ---
@@ -58,11 +58,15 @@ class TestMemoryManager(conf: SparkConf)
 
   override def maxOffHeapStorageMemory: Long = 0L
 
-  private var oomOnce = false
+  private var conseqOOM = 0
   private var available = Long.MaxValue
 
   def markExecutionAsOutOfMemoryOnce(): Unit = {
-oomOnce = true
+markConseqOOM(1)
+  }
+
+  def markConseqOOM( n : Int) : Unit = {
--- End diff --

nit: markConsequentOOM


---




[GitHub] spark pull request #19181: [SPARK-21907][CORE] oom during spill

2017-09-29 Thread juliuszsompolski
Github user juliuszsompolski commented on a diff in the pull request:

https://github.com/apache/spark/pull/19181#discussion_r141843414
  
--- Diff: 
core/src/test/scala/org/apache/spark/memory/TestMemoryManager.scala ---
@@ -58,11 +58,15 @@ class TestMemoryManager(conf: SparkConf)
 
   override def maxOffHeapStorageMemory: Long = 0L
 
-  private var oomOnce = false
+  private var conseqOOM = 0
--- End diff --

nit: conseq -> consequent


---




[GitHub] spark pull request #19181: [SPARK-21907][CORE] oom during spill

2017-09-29 Thread juliuszsompolski
Github user juliuszsompolski commented on a diff in the pull request:

https://github.com/apache/spark/pull/19181#discussion_r141843193
  
--- Diff: 
core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java
 ---
@@ -139,4 +139,44 @@ public int compare(
 }
 assertEquals(dataToSort.length, iterLength);
   }
+
+  @Test
+  public void freeAfterOOM() {
+final TestMemoryManager testMemoryManager = new TestMemoryManager(new 
SparkConf().set("spark.memory.offHeap.enabled", "false"));
+final TaskMemoryManager memoryManager = new TaskMemoryManager(
+testMemoryManager, 0);
+final TestMemoryConsumer consumer = new 
TestMemoryConsumer(memoryManager);
+final MemoryBlock dataPage = memoryManager.allocatePage(2048, 
consumer);
+final Object baseObject = dataPage.getBaseObject();
+// Write the records into the data page:
+long position = dataPage.getBaseOffset();
+
+final HashPartitioner hashPartitioner = new HashPartitioner(4);
+// Use integer comparison for comparing prefixes (which are partition 
ids, in this case)
+final PrefixComparator prefixComparator = PrefixComparators.LONG;
+final RecordComparator recordComparator = new RecordComparator() {
+  @Override
+  public int compare(
+  Object leftBaseObject,
+  long leftBaseOffset,
+  Object rightBaseObject,
+  long rightBaseOffset) {
+return 0;
+  }
+};
+UnsafeInMemorySorter sorter = new UnsafeInMemorySorter(consumer, 
memoryManager,
+recordComparator, prefixComparator, 100, shouldUseRadixSort());
+
+testMemoryManager.markExecutionAsOutOfMemoryOnce();
+try {
+  sorter.reset();
+} catch( OutOfMemoryError oom ) {
+  //as expected
+}
+// this currently fails on NPE at 
org.apache.spark.memory.MemoryConsumer.freeArray(MemoryConsumer.java:108)
--- End diff --

nit: tense: "this currently fails" -> "[SPARK-21907] this failed ..."
By the time anyone reads it, it will hopefully no longer fail :-)


---




[GitHub] spark pull request #19181: [SPARK-21907][CORE] oom during spill

2017-09-29 Thread juliuszsompolski
Github user juliuszsompolski commented on a diff in the pull request:

https://github.com/apache/spark/pull/19181#discussion_r141842424
  
--- Diff: 
core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
 ---
@@ -85,7 +85,7 @@
   private final LinkedList spillWriters = new 
LinkedList<>();
 
   // These variables are reset after spilling:
-  @Nullable private volatile UnsafeInMemorySorter inMemSorter;
+  private @Nullable volatile UnsafeInMemorySorter inMemSorter;
--- End diff --

nit: unnecessary change.


---




[GitHub] spark pull request #19181: [SPARK-21907][CORE] oom during spill

2017-09-29 Thread juliuszsompolski
Github user juliuszsompolski commented on a diff in the pull request:

https://github.com/apache/spark/pull/19181#discussion_r141842918
  
--- Diff: 
core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
 ---
@@ -503,6 +511,39 @@ public void testGetIterator() throws Exception {
 verifyIntIterator(sorter.getIterator(279), 279, 300);
   }
 
+  @Test
+  public void testOOMDuringSpill() throws Exception {
+final UnsafeExternalSorter sorter = newSorter();
+for (int i = 0; sorter.hasSpaceForAnotherRecord(); ++i) {
+  insertNumber(sorter, i);
+}
+// todo: this might actually not be zero if pageSize is somehow 
configured differently,
+// so we actually have to compute the expected spill size according to 
the configured page size
+assertEquals(0,  sorter.getSpillSize());
--- End diff --

If this might actually not be zero, maybe don't test this assertion?


---




[GitHub] spark pull request #19181: [SPARK-21907][CORE] oom during spill

2017-09-29 Thread juliuszsompolski
Github user juliuszsompolski commented on a diff in the pull request:

https://github.com/apache/spark/pull/19181#discussion_r141842522
  
--- Diff: 
core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
 ---
@@ -162,14 +162,25 @@ private int getUsableCapacity() {
*/
   public void free() {
 if (consumer != null) {
-  consumer.freeArray(array);
+  if (null != array) {
--- End diff --

nit: RHS literal (array != null)


---




[GitHub] spark pull request #19181: [SPARK-21907][CORE] oom during spill

2017-09-29 Thread juliuszsompolski
Github user juliuszsompolski commented on a diff in the pull request:

https://github.com/apache/spark/pull/19181#discussion_r141842730
  
--- Diff: 
core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
 ---
@@ -162,14 +162,25 @@ private int getUsableCapacity() {
*/
   public void free() {
 if (consumer != null) {
-  consumer.freeArray(array);
+  if (null != array) {
+consumer.freeArray(array);
+  }
   array = null;
 }
   }
 
   public void reset() {
 if (consumer != null) {
   consumer.freeArray(array);
+  // this is needed to prevent a 'nested' spill,
--- End diff --

nit: it doesn't prevent a nested spill, it only renders it harmless.
Remove this line - the rest of the comment is true.


---




[GitHub] spark pull request #19181: [SPARK-21907][CORE] oom during spill

2017-09-29 Thread juliuszsompolski
Github user juliuszsompolski commented on a diff in the pull request:

https://github.com/apache/spark/pull/19181#discussion_r141843914
  
--- Diff: 
core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
 ---
@@ -19,10 +19,18 @@
 
 import java.io.File;
 import java.io.IOException;
+import java.io.PrintWriter;
 import java.util.Arrays;
 import java.util.LinkedList;
 import java.util.UUID;
 
+import jodd.io.StringOutputStream;
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+import org.hamcrest.Matchers;
+import org.hamcrest.TypeSafeMatcher;
+import org.junit.Rule;
+import org.junit.rules.ExpectedException;
--- End diff --

nit: I think you don't use most of these imports anymore.


---




[GitHub] spark pull request #19181: [SPARK-21907][CORE] oom during spill

2017-09-29 Thread juliuszsompolski
Github user juliuszsompolski commented on a diff in the pull request:

https://github.com/apache/spark/pull/19181#discussion_r141843227
  
--- Diff: 
core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java
 ---
@@ -139,4 +139,44 @@ public int compare(
 }
 assertEquals(dataToSort.length, iterLength);
   }
+
+  @Test
+  public void freeAfterOOM() {
+final TestMemoryManager testMemoryManager = new TestMemoryManager(new 
SparkConf().set("spark.memory.offHeap.enabled", "false"));
+final TaskMemoryManager memoryManager = new TaskMemoryManager(
+testMemoryManager, 0);
+final TestMemoryConsumer consumer = new 
TestMemoryConsumer(memoryManager);
+final MemoryBlock dataPage = memoryManager.allocatePage(2048, 
consumer);
+final Object baseObject = dataPage.getBaseObject();
+// Write the records into the data page:
+long position = dataPage.getBaseOffset();
+
+final HashPartitioner hashPartitioner = new HashPartitioner(4);
+// Use integer comparison for comparing prefixes (which are partition 
ids, in this case)
+final PrefixComparator prefixComparator = PrefixComparators.LONG;
+final RecordComparator recordComparator = new RecordComparator() {
+  @Override
+  public int compare(
+  Object leftBaseObject,
+  long leftBaseOffset,
+  Object rightBaseObject,
+  long rightBaseOffset) {
+return 0;
+  }
+};
+UnsafeInMemorySorter sorter = new UnsafeInMemorySorter(consumer, 
memoryManager,
+recordComparator, prefixComparator, 100, shouldUseRadixSort());
+
+testMemoryManager.markExecutionAsOutOfMemoryOnce();
+try {
+  sorter.reset();
+} catch( OutOfMemoryError oom ) {
+  //as expected
--- End diff --

nit: ws: `// as expected`


---




[GitHub] spark pull request #19181: [SPARK-21907][CORE] oom during spill

2017-09-29 Thread juliuszsompolski
Github user juliuszsompolski commented on a diff in the pull request:

https://github.com/apache/spark/pull/19181#discussion_r141843046
  
--- Diff: 
core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
 ---
@@ -503,6 +511,39 @@ public void testGetIterator() throws Exception {
 verifyIntIterator(sorter.getIterator(279), 279, 300);
   }
 
+  @Test
+  public void testOOMDuringSpill() throws Exception {
+final UnsafeExternalSorter sorter = newSorter();
+for (int i = 0; sorter.hasSpaceForAnotherRecord(); ++i) {
+  insertNumber(sorter, i);
+}
+// todo: this might actually not be zero if pageSize is somehow 
configured differently,
+// so we actually have to compute the expected spill size according to 
the configured page size
+assertEquals(0,  sorter.getSpillSize());
+// we expect the next insert to attempt growing the pointerssArray
+// first allocation is expected to fail, then a spill is triggered 
which attempts another allocation
+// which also fails and we expect to see this OOM here.
+// the original code messed with a released array within the spill code
+// and ended up with a failed assertion.
+// we also expect the location of the OOM to be 
org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.reset
+memoryManager.markConseqOOM(2);
+OutOfMemoryError expectedOOM = null;
+try {
+  insertNumber(sorter, 1024);
+}
+// we expect an OutOfMemoryError here, anything else (i.e the original 
NPE is a failure)
+catch( OutOfMemoryError oom ){
+  expectedOOM = oom;
+}
+
+assertNotNull("expected OutOfMmoryError but it seems operation 
surprisingly succeeded"
+,expectedOOM);
+String oomStackTrace = Utils.exceptionString(expectedOOM);
+assertThat("expected OutOfMemoryError in 
org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.reset"
+, oomStackTrace
+, 
Matchers.containsString("org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.reset"));
--- End diff --

nit: move commas to end of line (3x)


---




[GitHub] spark pull request #19181: [SPARK-21907][CORE] oom during spill

2017-09-29 Thread juliuszsompolski
Github user juliuszsompolski commented on a diff in the pull request:

https://github.com/apache/spark/pull/19181#discussion_r141842948
  
--- Diff: 
core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
 ---
@@ -503,6 +511,39 @@ public void testGetIterator() throws Exception {
 verifyIntIterator(sorter.getIterator(279), 279, 300);
   }
 
+  @Test
+  public void testOOMDuringSpill() throws Exception {
+final UnsafeExternalSorter sorter = newSorter();
+for (int i = 0; sorter.hasSpaceForAnotherRecord(); ++i) {
+  insertNumber(sorter, i);
+}
+// todo: this might actually not be zero if pageSize is somehow 
configured differently,
+// so we actually have to compute the expected spill size according to 
the configured page size
+assertEquals(0,  sorter.getSpillSize());
+// we expect the next insert to attempt growing the pointerssArray
+// first allocation is expected to fail, then a spill is triggered 
which attempts another allocation
+// which also fails and we expect to see this OOM here.
+// the original code messed with a released array within the spill code
+// and ended up with a failed assertion.
+// we also expect the location of the OOM to be 
org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.reset
+memoryManager.markConseqOOM(2);
+OutOfMemoryError expectedOOM = null;
+try {
+  insertNumber(sorter, 1024);
+}
+// we expect an OutOfMemoryError here, anything else (i.e the original 
NPE is a failure)
+catch( OutOfMemoryError oom ){
--- End diff --

nit: ws: catch (OutOfMemoryError oom) {


---




[GitHub] spark pull request #19181: [SPARK-21907][CORE] oom during spill

2017-09-29 Thread juliuszsompolski
Github user juliuszsompolski commented on a diff in the pull request:

https://github.com/apache/spark/pull/19181#discussion_r141843494
  
--- Diff: 
core/src/test/scala/org/apache/spark/memory/TestMemoryManager.scala ---
@@ -58,11 +58,15 @@ class TestMemoryManager(conf: SparkConf)
 
   override def maxOffHeapStorageMemory: Long = 0L
 
-  private var oomOnce = false
+  private var conseqOOM = 0
   private var available = Long.MaxValue
 
   def markExecutionAsOutOfMemoryOnce(): Unit = {
-oomOnce = true
+markConseqOOM(1)
+  }
+
+  def markConseqOOM( n : Int) : Unit = {
--- End diff --

nit: ws: `(n: Int): Unit`


---




[GitHub] spark issue #19353: [SPARK-22103][FOLLOWUP] Rename addExtraCode to addInnerC...

2017-09-26 Thread juliuszsompolski
Github user juliuszsompolski commented on the issue:

https://github.com/apache/spark/pull/19353
  
jenkins retest this please


---



