[jira] [Created] (SPARK-18534) Datasets Aggregation with Maps

2016-11-21 Thread Anton Okolnychyi (JIRA)
Anton Okolnychyi created SPARK-18534:


 Summary: Datasets Aggregation with Maps
 Key: SPARK-18534
 URL: https://issues.apache.org/jira/browse/SPARK-18534
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 1.6.3
Reporter: Anton Okolnychyi


There is a problem with user-defined aggregations in the Dataset API in Spark 
1.6.3, while the identical code works fine in Spark 2.0. 

The problem appears only if {{ExpressionEncoder()}} is used for Maps. The same 
code with a Kryo-based alternative produces a correct result. If the encoder 
for a map is defined with the help of {{ExpressionEncoder()}}, Spark is not 
capable of reading the reduced values in the merge phase of the considered 
aggregation.

Code to reproduce:
{code}
  import org.apache.spark.{SparkConf, SparkContext}
  import org.apache.spark.sql.{Encoder, SQLContext}
  import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
  import org.apache.spark.sql.expressions.Aggregator

  case class TestStopPoint(line: String, sequenceNumber: Int, id: String)

  // Does not work with ExpressionEncoder() and produces an empty map as a result
  implicit val intStringMapEncoder: Encoder[Map[Int, String]] = ExpressionEncoder()
  // Will work if a Kryo-based encoder is used
  // implicit val intStringMapEncoder: Encoder[Map[Int, String]] = org.apache.spark.sql.Encoders.kryo[Map[Int, String]]

  val sparkConf = new SparkConf()
    .setAppName("DS Spark 1.6 Test")
    .setMaster("local[4]")
  val sparkContext = new SparkContext(sparkConf)
  val sparkSqlContext = new SQLContext(sparkContext)

  import sparkSqlContext.implicits._

  val stopPointDS = Seq(TestStopPoint("33", 1, "id#1"), TestStopPoint("33", 2, "id#2")).toDS()

  val stopPointSequenceMap = new Aggregator[TestStopPoint, Map[Int, String], Map[Int, String]] {
    override def zero = Map[Int, String]()
    override def reduce(map: Map[Int, String], stopPoint: TestStopPoint) = {
      map.updated(stopPoint.sequenceNumber, stopPoint.id)
    }
    override def merge(map: Map[Int, String], anotherMap: Map[Int, String]) = {
      map ++ anotherMap
    }
    override def finish(reduction: Map[Int, String]) = reduction
  }.toColumn

  val resultMap = stopPointDS
    .groupBy(_.line)
    .agg(stopPointSequenceMap)
    .collect()
    .toMap
{code}

The code above produces an empty map as a result if the Map encoder is defined 
as {{ExpressionEncoder()}}. The Kryo-based encoder works fine (commented in the 
code).
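For clarity, this is the expected versus observed outcome (a sketch of the check, not part of the original snippet):

{code}
// Line "33" has two stop points, so the aggregated map should contain both entries.
val expected = Map("33" -> Map(1 -> "id#1", 2 -> "id#2"))
assert(resultMap == expected)  // passes with the Kryo-based encoder
// With ExpressionEncoder(), resultMap is Map("33" -> Map()) instead.
{code}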

A preliminary investigation was done to find out possible reasons for this 
behavior. I am not a Spark expert but hope it will help. 

The Physical Plan looks like:
{noformat}
== Physical Plan ==
SortBasedAggregate(key=[value#55], functions=[(anon$1(line#4,sequenceNumber#5,id#6),mode=Final,isDistinct=false)], output=[value#55,anon$1(line,sequenceNumber,id)#64])
+- ConvertToSafe
   +- Sort [value#55 ASC], false, 0
      +- TungstenExchange hashpartitioning(value#55,1), None
         +- ConvertToUnsafe
            +- SortBasedAggregate(key=[value#55], functions=[(anon$1(line#4,sequenceNumber#5,id#6),mode=Partial,isDistinct=false)], output=[value#55,value#60])
               +- ConvertToSafe
                  +- Sort [value#55 ASC], false, 0
                     +- !AppendColumns , class[line[0]: string, sequenceNumber[0]: int, id[0]: string], class[value[0]: string], [value#55]
                        +- ConvertToUnsafe
                           +- LocalTableScan [line#4,sequenceNumber#5,id#6], [[0,22,1,280004,,31236469],[0,22,2,280004,,32236469]]
{noformat}

Everything including the first (from bottom) {{SortBasedAggregate}} step is 
handled correctly. In particular, I see that each row updates the mutable 
aggregation buffer correctly in the {{update()}} method of the 
{{org.apache.spark.sql.execution.aggregate.TypedAggregateExpression}} class. In 
my view, the problem appears in the {{ConvertToUnsafe}} step directly after the 
first {{SortBasedAggregate}}. If I take a look at the 
{{org.apache.spark.sql.execution.ConvertToUnsafe}} class, I can see that the 
first {{SortBasedAggregate}} returns a map with 2 elements (I call 
{{child.execute().collect()(0).getMap(1)}} in {{doExecute()}} of 
{{ConvertToUnsafe}} to see this). However, if I examine the output of this 
{{ConvertToUnsafe}} in the same way as its input, I see that the result map 
does not contain any elements. As a consequence, Spark operates on two empty 
maps in the {{merge()}} method of the {{TypedAggregateExpression}} class.
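
For reference, a minimal sketch of the probe described above (Spark 1.6 internal APIs; {{partialAgg}} and {{convertToUnsafe}} are hypothetical references to the two plan nodes, obtained e.g. from a debugging session):

{code}
import org.apache.spark.sql.execution.SparkPlan

// Returns the number of entries in the map column of the first result row.
def mapSizeOfFirstRow(plan: SparkPlan, ordinal: Int): Int =
  plan.execute().collect()(0).getMap(ordinal).numElements()

println(mapSizeOfFirstRow(partialAgg, 1))       // 2 - the partial aggregation is correct
println(mapSizeOfFirstRow(convertToUnsafe, 1))  // 0 - the map is lost after the conversion
{code}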






[jira] [Commented] (SPARK-18534) Datasets Aggregation with Maps

2016-12-04 Thread Anton Okolnychyi (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18534?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15720504#comment-15720504
 ] 

Anton Okolnychyi commented on SPARK-18534:
--

I have done a small investigation to see what is going on under the hood. 

I started by looking at the code that Spark generated for the second 
{{ConvertToUnsafe}} operation (right after the first {{SortBasedAggregate}}). 
Once I understood each step there, I came to the conclusion that it is fully 
correct (I can present the generated code and the structure of the result 
{{UnsafeRow}}). However, the problem from the ticket description was present: 
the result {{UnsafeRow}} contained a corrupted map. I took a detailed look at 
the incoming {{InternalRow}}, which contained the {{UnsafeMapData}} that was 
written without any modifications to the underlying buffer of the result 
{{UnsafeRow}}, and it was corrupted as well. Interestingly, only the keys were 
corrupted; the values in the incoming {{InternalRow}} could be retrieved correctly. 
Therefore, the problem appeared before this step, and the second 
{{ConvertToUnsafe}} operation itself was done correctly.
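
A minimal sketch of that key/value check, assuming the map sits at ordinal 1 of the incoming row (Spark 1.6 internal APIs; the helper is mine, not Spark code):

{code}
import org.apache.spark.sql.catalyst.InternalRow

def dumpMapColumn(row: InternalRow): Unit = {
  val map = row.getMap(1)
  val (keys, values) = (map.keyArray(), map.valueArray())
  (0 until map.numElements()).foreach { i =>
    // in the failing case the values print fine while the keys are garbage
    println(s"key=${keys.getInt(i)} value=${values.getUTF8String(i)}")
  }
}
{code}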

Then I decided to look at previous steps. In my view, the problem appears in 
the {{next()}} method of {{SortBasedAggregationIterator}}. Here is the code for 
it:

{code} 
  override final def next(): InternalRow = {
    if (hasNext) {
      // Process the current group.
      processCurrentSortedGroup()
      // Generate output row for the current group.
      val outputRow = generateOutput(currentGroupingKey, sortBasedAggregationBuffer)
      // Initialize buffer values for the next group.
      initializeBuffer(sortBasedAggregationBuffer)
      numOutputRows += 1
      outputRow
    } else {
      // no more result
      throw new NoSuchElementException
    }
  }
{code}

The {{outputRow}} contains correct results (I can retrieve the keys correctly), but 
only before the {{initializeBuffer(sortBasedAggregationBuffer)}} invocation. If 
I change this method slightly, I get the expected results:

{code}
  override final def next(): InternalRow = {
    if (hasNext) {
      // INITIALIZE BEFORE
      initializeBuffer(sortBasedAggregationBuffer)
      // Process the current group.
      processCurrentSortedGroup()
      // Generate output row for the current group.
      val outputRow = generateOutput(currentGroupingKey, sortBasedAggregationBuffer)
      numOutputRows += 1
      outputRow
    } else {
      // no more result
      throw new NoSuchElementException
    }
  }
{code}
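
As a side note, the hazard can be shown with a toy example that has nothing to do with Spark (just an illustration of handing out a view over a buffer that is re-initialised afterwards):

{code}
import scala.collection.mutable

val buffer = mutable.Map(1 -> "id#1", 2 -> "id#2") // aggregation buffer after processing a group
val outputRow = buffer                             // "output" that merely wraps the same buffer
buffer.clear()                                     // initializeBuffer(...) for the next group
println(outputRow)                                 // Map() - the consumer sees an empty result
{code}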

> Datasets Aggregation with Maps
> --
>
> Key: SPARK-18534
> URL: https://issues.apache.org/jira/browse/SPARK-18534
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.6.2, 1.6.3
>Reporter: Anton Okolnychyi
>
> There is a problem with user-defined aggregations in the Dataset API in Spark 
> 1.6.3, while the identical code works fine in Spark 2.0. 
> The problem appears only if {{ExpressionEncoder()}} is used for Maps. The 
> same code with a Kryo-based alternative produces a correct result. If the 
> encoder for a map is defined with the help of {{ExpressionEncoder()}}, Spark 
> is not capable of reading the reduced values in the merge phase of the 
> considered aggregation.
> Code to reproduce:
> {code}
>   case class TestStopPoint(line: String, sequenceNumber: Int, id: String)
>   // Does not work with ExpressionEncoder() and produces an empty map as a 
> result
>   implicit val intStringMapEncoder: Encoder[Map[Int, String]] = 
> ExpressionEncoder()
>   // Will work if a Kryo-based encoder is used
>   // implicit val intStringMapEncoder: Encoder[Map[Int, String]] = 
> org.apache.spark.sql.Encoders.kryo[Map[Int, String]]
>   val sparkConf = new SparkConf()
> .setAppName("DS Spark 1.6 Test")
> .setMaster("local[4]")
>   val sparkContext = new SparkContext(sparkConf)
>   val sparkSqlContext = new SQLContext(sparkContext)
>   import sparkSqlContext.implicits._
>   val stopPointDS = Seq(TestStopPoint("33", 1, "id#1"), TestStopPoint("33", 
> 2, "id#2")).toDS()
>   val stopPointSequenceMap = new Aggregator[TestStopPoint, Map[Int, String], 
> Map[Int, String]] {
> override def zero = Map[Int, String]()
> override def reduce(map: Map[Int, String], stopPoint: TestStopPoint) = {
>   map.updated(stopPoint.sequenceNumber, stopPoint.id)
> }
> override def merge(map: Map[Int, String], anotherMap: Map[Int, String]) = 
> {
>   map ++ anotherMap
> }
> override def finish(reduction: Map[Int, String]) = reduction
>   }.toColumn
>   val resultMap = stopPointDS
> .groupBy(_.line)
> .agg(stopPointSequenceMap)
> .collect()
> .toMap
> {code}
> The code above produces an empty map as a result if the Map encoder is 
> defined as {{ExpressionEncoder()}}. The Kryo-based encoder works fine (commented in the code).

[jira] [Updated] (SPARK-18534) Datasets Aggregation with Maps

2016-12-04 Thread Anton Okolnychyi (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18534?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Anton Okolnychyi updated SPARK-18534:
-
Affects Version/s: 1.6.2

> Datasets Aggregation with Maps
> --
>
> Key: SPARK-18534
> URL: https://issues.apache.org/jira/browse/SPARK-18534
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.6.2, 1.6.3
>Reporter: Anton Okolnychyi
>
> There is a problem with user-defined aggregations in the Dataset API in Spark 
> 1.6.3, while the identical code works fine in Spark 2.0. 
> The problem appears only if {{ExpressionEncoder()}} is used for Maps. The 
> same code with a Kryo-based alternative produces a correct result. If the 
> encoder for a map is defined with the help of {{ExpressionEncoder()}}, Spark 
> is not capable of reading the reduced values in the merge phase of the 
> considered aggregation.
> Code to reproduce:
> {code}
>   case class TestStopPoint(line: String, sequenceNumber: Int, id: String)
>   // Does not work with ExpressionEncoder() and produces an empty map as a 
> result
>   implicit val intStringMapEncoder: Encoder[Map[Int, String]] = 
> ExpressionEncoder()
>   // Will work if a Kryo-based encoder is used
>   // implicit val intStringMapEncoder: Encoder[Map[Int, String]] = 
> org.apache.spark.sql.Encoders.kryo[Map[Int, String]]
>   val sparkConf = new SparkConf()
> .setAppName("DS Spark 1.6 Test")
> .setMaster("local[4]")
>   val sparkContext = new SparkContext(sparkConf)
>   val sparkSqlContext = new SQLContext(sparkContext)
>   import sparkSqlContext.implicits._
>   val stopPointDS = Seq(TestStopPoint("33", 1, "id#1"), TestStopPoint("33", 
> 2, "id#2")).toDS()
>   val stopPointSequenceMap = new Aggregator[TestStopPoint, Map[Int, String], 
> Map[Int, String]] {
> override def zero = Map[Int, String]()
> override def reduce(map: Map[Int, String], stopPoint: TestStopPoint) = {
>   map.updated(stopPoint.sequenceNumber, stopPoint.id)
> }
> override def merge(map: Map[Int, String], anotherMap: Map[Int, String]) = 
> {
>   map ++ anotherMap
> }
> override def finish(reduction: Map[Int, String]) = reduction
>   }.toColumn
>   val resultMap = stopPointDS
> .groupBy(_.line)
> .agg(stopPointSequenceMap)
> .collect()
> .toMap
> {code}
> The code above produces an empty map as a result if the Map encoder is 
> defined as {{ExpressionEncoder()}}. The Kryo-based encoder works fine 
> (commented in the code).
> A preliminary investigation was done to find out possible reasons for this 
> behavior. I am not a Spark expert but hope it will help. 
> The Physical Plan looks like:
> {noformat}
> == Physical Plan ==
> SortBasedAggregate(key=[value#55], 
> functions=[(anon$1(line#4,sequenceNumber#5,id#6),mode=Final,isDistinct=false)],
>  output=[value#55,anon$1(line,sequenceNumber,id)#64])
> +- ConvertToSafe
>+- Sort [value#55 ASC], false, 0
>   +- TungstenExchange hashpartitioning(value#55,1), None
>  +- ConvertToUnsafe
> +- SortBasedAggregate(key=[value#55], 
> functions=[(anon$1(line#4,sequenceNumber#5,id#6),mode=Partial,isDistinct=false)],
>  output=[value#55,value#60])
>+- ConvertToSafe
>   +- Sort [value#55 ASC], false, 0
>  +- !AppendColumns , class[line[0]: string, 
> sequenceNumber[0]: int, id[0]: string], class[value[0]: string], [value#55]
> +- ConvertToUnsafe
>+- LocalTableScan [line#4,sequenceNumber#5,id#6], 
> [[0,22,1,280004,,31236469],[0,22,2,280004,,32236469]]
> {noformat}
> Everything including the first (from bottom) {{SortBasedAggregate}} step is 
> handled correctly. In particular, I see that each row updates the mutable 
> aggregation buffer correctly in the {{update()}} method of the 
> {{org.apache.spark.sql.execution.aggregate.TypedAggregateExpression}} class. 
> In my view, the problem appears in the {{ConvertToUnsafe}} step directly 
> after the first {{SortBasedAggregate}}. If I take a look at the 
> {{org.apache.spark.sql.execution.ConvertToUnsafe}} class, I can see that the 
> first {{SortBasedAggregate}} returns a map with 2 elements (I call 
> {{child.execute().collect()(0).getMap(1)}} in {{doExecute()}} of 
> {{ConvertToUnsafe}} to see this). However, if I examine the output of this 
> {{ConvertToUnsafe}} in the same way as its input, I see that the result map 
> does not contain any elements. As a consequence, Spark operates on two empty 
> maps in the {{merge()}} method of the {{TypedAggregateExpression}} class.




[jira] [Commented] (SPARK-18534) Datasets Aggregation with Maps

2016-12-04 Thread Anton Okolnychyi (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18534?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15720522#comment-15720522
 ] 

Anton Okolnychyi commented on SPARK-18534:
--

[~yhuai], it seems you are an expert in this area. Could you please take a look 
whenever you have some free time? I might be completely wrong.

> Datasets Aggregation with Maps
> --
>
> Key: SPARK-18534
> URL: https://issues.apache.org/jira/browse/SPARK-18534
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 1.6.2, 1.6.3
>Reporter: Anton Okolnychyi
>
> There is a problem with user-defined aggregations in the Dataset API in Spark 
> 1.6.3, while the identical code works fine in Spark 2.0. 
> The problem appears only if {{ExpressionEncoder()}} is used for Maps. The 
> same code with a Kryo-based alternative produces a correct result. If the 
> encoder for a map is defined with the help of {{ExpressionEncoder()}}, Spark 
> is not capable of reading the reduced values in the merge phase of the 
> considered aggregation.
> Code to reproduce:
> {code}
>   case class TestStopPoint(line: String, sequenceNumber: Int, id: String)
>   // Does not work with ExpressionEncoder() and produces an empty map as a 
> result
>   implicit val intStringMapEncoder: Encoder[Map[Int, String]] = 
> ExpressionEncoder()
>   // Will work if a Kryo-based encoder is used
>   // implicit val intStringMapEncoder: Encoder[Map[Int, String]] = 
> org.apache.spark.sql.Encoders.kryo[Map[Int, String]]
>   val sparkConf = new SparkConf()
> .setAppName("DS Spark 1.6 Test")
> .setMaster("local[4]")
>   val sparkContext = new SparkContext(sparkConf)
>   val sparkSqlContext = new SQLContext(sparkContext)
>   import sparkSqlContext.implicits._
>   val stopPointDS = Seq(TestStopPoint("33", 1, "id#1"), TestStopPoint("33", 
> 2, "id#2")).toDS()
>   val stopPointSequenceMap = new Aggregator[TestStopPoint, Map[Int, String], 
> Map[Int, String]] {
> override def zero = Map[Int, String]()
> override def reduce(map: Map[Int, String], stopPoint: TestStopPoint) = {
>   map.updated(stopPoint.sequenceNumber, stopPoint.id)
> }
> override def merge(map: Map[Int, String], anotherMap: Map[Int, String]) = 
> {
>   map ++ anotherMap
> }
> override def finish(reduction: Map[Int, String]) = reduction
>   }.toColumn
>   val resultMap = stopPointDS
> .groupBy(_.line)
> .agg(stopPointSequenceMap)
> .collect()
> .toMap
> {code}
> The code above produces an empty map as a result if the Map encoder is 
> defined as {{ExpressionEncoder()}}. The Kryo-based encoder works fine 
> (commented in the code).
> A preliminary investigation was done to find out possible reasons for this 
> behavior. I am not a Spark expert but hope it will help. 
> The Physical Plan looks like:
> {noformat}
> == Physical Plan ==
> SortBasedAggregate(key=[value#55], 
> functions=[(anon$1(line#4,sequenceNumber#5,id#6),mode=Final,isDistinct=false)],
>  output=[value#55,anon$1(line,sequenceNumber,id)#64])
> +- ConvertToSafe
>+- Sort [value#55 ASC], false, 0
>   +- TungstenExchange hashpartitioning(value#55,1), None
>  +- ConvertToUnsafe
> +- SortBasedAggregate(key=[value#55], 
> functions=[(anon$1(line#4,sequenceNumber#5,id#6),mode=Partial,isDistinct=false)],
>  output=[value#55,value#60])
>+- ConvertToSafe
>   +- Sort [value#55 ASC], false, 0
>  +- !AppendColumns , class[line[0]: string, 
> sequenceNumber[0]: int, id[0]: string], class[value[0]: string], [value#55]
> +- ConvertToUnsafe
>+- LocalTableScan [line#4,sequenceNumber#5,id#6], 
> [[0,22,1,280004,,31236469],[0,22,2,280004,,32236469]]
> {noformat}
> Everything including the first (from bottom) {{SortBasedAggregate}} step is 
> handled correctly. In particular, I see that each row updates the mutable 
> aggregation buffer correctly in the {{update()}} method of the 
> {{org.apache.spark.sql.execution.aggregate.TypedAggregateExpression}} class. 
> In my view, the problem appears in the {{ConvertToUnsafe}} step directly 
> after the first {{SortBasedAggregate}}. If I take a look at the 
> {{org.apache.spark.sql.execution.ConvertToUnsafe}} class, I can see that the 
> first {{SortBasedAggregate}} returns a map with 2 elements (I call 
> {{child.execute().collect()(0).getMap(1)}} in {{doExecute()}} of 
> {{ConvertToUnsafe}} to see this). However, if I examine the output of this 
> {{ConvertToUnsafe}} in the same way as its input, I see that the result map 
> does not contain any elements. As a consequence, Spark operates on two empty 
> maps in the {{merge()}} method of the {{TypedAggregateExpression}} class.




[jira] [Updated] (SPARK-18534) Datasets Aggregation with Maps

2016-12-04 Thread Anton Okolnychyi (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18534?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Anton Okolnychyi updated SPARK-18534:
-
Description: 
There is a problem with user-defined aggregations in the Dataset API in Spark 
1.6.3, while the identical code works fine in Spark 2.0. 

The problem appears only if {{ExpressionEncoder()}} is used for Maps. The same 
code with a Kryo-based alternative produces a correct result. If the encoder 
for a map is defined with the help of {{ExpressionEncoder()}}, Spark is not 
capable of reading the reduced values in the merge phase of the considered 
aggregation.

Code to reproduce:
{code}
  case class TestStopPoint(line: String, sequenceNumber: Int, id: String)

  // Does not work with ExpressionEncoder() and produces an empty map as a 
result
  implicit val intStringMapEncoder: Encoder[Map[Int, String]] = 
ExpressionEncoder()
  // Will work if a Kryo-based encoder is used
  // implicit val intStringMapEncoder: Encoder[Map[Int, String]] = 
org.apache.spark.sql.Encoders.kryo[Map[Int, String]]

  val sparkConf = new SparkConf()
.setAppName("DS Spark 1.6 Test")
.setMaster("local[4]")
  val sparkContext = new SparkContext(sparkConf)
  val sparkSqlContext = new SQLContext(sparkContext)

  import sparkSqlContext.implicits._

  val stopPointDS = Seq(TestStopPoint("33", 1, "id#1"), TestStopPoint("33", 2, 
"id#2")).toDS()

  val stopPointSequenceMap = new Aggregator[TestStopPoint, Map[Int, String], 
Map[Int, String]] {
override def zero = Map[Int, String]()
override def reduce(map: Map[Int, String], stopPoint: TestStopPoint) = {
  map.updated(stopPoint.sequenceNumber, stopPoint.id)
}
override def merge(map: Map[Int, String], anotherMap: Map[Int, String]) = {
  map ++ anotherMap
}
override def finish(reduction: Map[Int, String]) = reduction
  }.toColumn

  val resultMap = stopPointDS
.groupBy(_.line)
.agg(stopPointSequenceMap)
.collect()
.toMap
{code}

The code above produces an empty map as a result if the Map encoder is defined 
as {{ExpressionEncoder()}}. The Kryo-based encoder works fine (commented in the 
code).

A preliminary investigation was done to find out possible reasons for this 
behavior. I am not a Spark expert but hope it will help. 

The Physical Plan looks like:
{noformat}
== Physical Plan ==
SortBasedAggregate(key=[value#55], 
functions=[(anon$1(line#4,sequenceNumber#5,id#6),mode=Final,isDistinct=false)], 
output=[value#55,anon$1(line,sequenceNumber,id)#64])
+- ConvertToSafe
   +- Sort [value#55 ASC], false, 0
  +- TungstenExchange hashpartitioning(value#55,1), None
 +- ConvertToUnsafe
+- SortBasedAggregate(key=[value#55], 
functions=[(anon$1(line#4,sequenceNumber#5,id#6),mode=Partial,isDistinct=false)],
 output=[value#55,value#60])
   +- ConvertToSafe
  +- Sort [value#55 ASC], false, 0
 +- !AppendColumns , class[line[0]: string, 
sequenceNumber[0]: int, id[0]: string], class[value[0]: string], [value#55]
+- ConvertToUnsafe
   +- LocalTableScan [line#4,sequenceNumber#5,id#6], 
[[0,22,1,280004,,31236469],[0,22,2,280004,,32236469]]
{noformat}

*only the initial assumption, which is partially correct* 
 
Everything including the first (from bottom) {{SortBasedAggregate}} step is 
handled correctly. In particular, I see that each row updates the mutable 
aggregation buffer correctly in the {{update()}} method of the 
{{org.apache.spark.sql.execution.aggregate.TypedAggregateExpression}} class. In 
my view, the problem appears in the {{ConvertToUnsafe}} step directly after the 
first {{SortBasedAggregate}}. If I take a look at the 
{{org.apache.spark.sql.execution.ConvertToUnsafe}} class, I can see that the 
first {{SortBasedAggregate}} returns a map with 2 elements (I call 
{{child.execute().collect()(0).getMap(1)}} in {{doExecute()}} of 
{{ConvertToUnsafe}} to see this). However, if I examine the output of this 
{{ConvertToUnsafe}} in the same way as its input, I see that the result map 
does not contain any elements. As a consequence, Spark operates on two empty 
maps in the {{merge()}} method of the {{TypedAggregateExpression}} class.

  was:
There is a problem with user-defined aggregations in the Dataset API in Spark 
1.6.3, while the identical code works fine in Spark 2.0. 

The problem appears only if {{ExpressionEncoder()}} is used for Maps. The same 
code with a Kryo-based alternative produces a correct result. If the encoder 
for a map is defined with the help of {{ExpressionEncoder()}}, Spark is not 
capable of reading the reduced values in the merge phase of the considered 
aggregation.

Code to reproduce:
{code}
  case class TestStopPoint(line: String, sequenceNumber: Int, id: String)

  // Does not work with ExpressionEncoder() and produces an empty map as a 
result
  im

[jira] [Updated] (SPARK-18534) Datasets Aggregation with Maps

2016-12-04 Thread Anton Okolnychyi (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18534?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Anton Okolnychyi updated SPARK-18534:
-
Description: 
There is a problem with user-defined aggregations in the Dataset API in Spark 
1.6.3, while the identical code works fine in Spark 2.0. 

The problem appears only if {{ExpressionEncoder()}} is used for Maps. The same 
code with a Kryo-based alternative produces a correct result. If the encoder 
for a map is defined with the help of {{ExpressionEncoder()}}, Spark is not 
capable of reading the reduced values in the merge phase of the considered 
aggregation.

Code to reproduce:
{code}
  case class TestStopPoint(line: String, sequenceNumber: Int, id: String)

  // Does not work with ExpressionEncoder() and produces an empty map as a 
result
  implicit val intStringMapEncoder: Encoder[Map[Int, String]] = 
ExpressionEncoder()
  // Will work if a Kryo-based encoder is used
  // implicit val intStringMapEncoder: Encoder[Map[Int, String]] = 
org.apache.spark.sql.Encoders.kryo[Map[Int, String]]

  val sparkConf = new SparkConf()
.setAppName("DS Spark 1.6 Test")
.setMaster("local[4]")
  val sparkContext = new SparkContext(sparkConf)
  val sparkSqlContext = new SQLContext(sparkContext)

  import sparkSqlContext.implicits._

  val stopPointDS = Seq(TestStopPoint("33", 1, "id#1"), TestStopPoint("33", 2, 
"id#2")).toDS()

  val stopPointSequenceMap = new Aggregator[TestStopPoint, Map[Int, String], 
Map[Int, String]] {
override def zero = Map[Int, String]()
override def reduce(map: Map[Int, String], stopPoint: TestStopPoint) = {
  map.updated(stopPoint.sequenceNumber, stopPoint.id)
}
override def merge(map: Map[Int, String], anotherMap: Map[Int, String]) = {
  map ++ anotherMap
}
override def finish(reduction: Map[Int, String]) = reduction
  }.toColumn

  val resultMap = stopPointDS
.groupBy(_.line)
.agg(stopPointSequenceMap)
.collect()
.toMap
{code}

The code above produces an empty map as a result if the Map encoder is defined 
as {{ExpressionEncoder()}}. The Kryo-based encoder works fine (commented in the 
code).

A preliminary investigation was done to find out possible reasons for this 
behavior. I am not a Spark expert but hope it will help. 

The Physical Plan looks like:
{noformat}
== Physical Plan ==
SortBasedAggregate(key=[value#55], 
functions=[(anon$1(line#4,sequenceNumber#5,id#6),mode=Final,isDistinct=false)], 
output=[value#55,anon$1(line,sequenceNumber,id)#64])
+- ConvertToSafe
   +- Sort [value#55 ASC], false, 0
  +- TungstenExchange hashpartitioning(value#55,1), None
 +- ConvertToUnsafe
+- SortBasedAggregate(key=[value#55], 
functions=[(anon$1(line#4,sequenceNumber#5,id#6),mode=Partial,isDistinct=false)],
 output=[value#55,value#60])
   +- ConvertToSafe
  +- Sort [value#55 ASC], false, 0
 +- !AppendColumns , class[line[0]: string, 
sequenceNumber[0]: int, id[0]: string], class[value[0]: string], [value#55]
+- ConvertToUnsafe
   +- LocalTableScan [line#4,sequenceNumber#5,id#6], 
[[0,22,1,280004,,31236469],[0,22,2,280004,,32236469]]
{noformat}
 
Everything up to and including part of the first (from bottom) {{SortBasedAggregate}} 
step is handled correctly. In particular, I see that each row correctly 
updates the mutable aggregation buffer in the {{update()}} method of the 
{{TypedAggregateExpression}} class. My initial idea was that the problem 
appeared in the {{ConvertToUnsafe}} step directly after the first 
{{SortBasedAggregate}}. If I take a look at the {{ConvertToUnsafe}} class, I 
can see that the first {{SortBasedAggregate}} returns a map with 2 elements (I 
call {{child.execute().collect()(0).getMap(1)}} in {{doExecute()}} of 
{{ConvertToUnsafe}} to see this). However, if I examine the output of this 
{{ConvertToUnsafe}} in the same way as its input, I see that the result map 
does not contain any elements. As a consequence, Spark operates on two empty 
maps in the {{merge()}} method of the {{TypedAggregateExpression}} class. 
However, my assumption was only partially correct. I did a more detailed 
investigation and its outcomes are described in comments.

  was:
There is a problem with user-defined aggregations in the Dataset API in Spark 
1.6.3, while the identical code works fine in Spark 2.0. 

The problem appears only if {{ExpressionEncoder()}} is used for Maps. The same 
code with a Kryo-based alternative produces a correct result. If the encoder 
for a map is defined with the help of {{ExpressionEncoder()}}, Spark is not 
capable of reading the reduced values in the merge phase of the considered 
aggregation.

Code to reproduce:
{code}
  case class TestStopPoint(line: String, sequenceNumber: Int, id: String)

  // Does not work with ExpressionEncoder() and produces an e

[jira] [Updated] (SPARK-18534) Datasets Aggregation with Maps

2016-12-04 Thread Anton Okolnychyi (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-18534?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Anton Okolnychyi updated SPARK-18534:
-
Description: 
There is a problem with user-defined aggregations in the Dataset API in Spark 
1.6.3, while the identical code works fine in Spark 2.0. 

The problem appears only if {{ExpressionEncoder()}} is used for Maps. The same 
code with a Kryo-based alternative produces a correct result. If the encoder 
for a map is defined with the help of {{ExpressionEncoder()}}, Spark is not 
capable of reading the reduced values in the merge phase of the considered 
aggregation.

Code to reproduce:
{code}
  case class TestStopPoint(line: String, sequenceNumber: Int, id: String)

  // Does not work with ExpressionEncoder() and produces an empty map as a 
result
  implicit val intStringMapEncoder: Encoder[Map[Int, String]] = 
ExpressionEncoder()
  // Will work if a Kryo-based encoder is used
  // implicit val intStringMapEncoder: Encoder[Map[Int, String]] = 
org.apache.spark.sql.Encoders.kryo[Map[Int, String]]

  val sparkConf = new SparkConf()
.setAppName("DS Spark 1.6 Test")
.setMaster("local[4]")
  val sparkContext = new SparkContext(sparkConf)
  val sparkSqlContext = new SQLContext(sparkContext)

  import sparkSqlContext.implicits._

  val stopPointDS = Seq(TestStopPoint("33", 1, "id#1"), TestStopPoint("33", 2, 
"id#2")).toDS()

  val stopPointSequenceMap = new Aggregator[TestStopPoint, Map[Int, String], 
Map[Int, String]] {
override def zero = Map[Int, String]()
override def reduce(map: Map[Int, String], stopPoint: TestStopPoint) = {
  map.updated(stopPoint.sequenceNumber, stopPoint.id)
}
override def merge(map: Map[Int, String], anotherMap: Map[Int, String]) = {
  map ++ anotherMap
}
override def finish(reduction: Map[Int, String]) = reduction
  }.toColumn

  val resultMap = stopPointDS
.groupBy(_.line)
.agg(stopPointSequenceMap)
.collect()
.toMap
{code}

The code above produces an empty map as a result if the Map encoder is defined 
as {{ExpressionEncoder()}}. The Kryo-based encoder works fine (commented in the 
code).

A preliminary investigation was done to find out possible reasons for this 
behavior. I am not a Spark expert but hope it will help. 

The Physical Plan looks like:
{noformat}
== Physical Plan ==
SortBasedAggregate(key=[value#55], 
functions=[(anon$1(line#4,sequenceNumber#5,id#6),mode=Final,isDistinct=false)], 
output=[value#55,anon$1(line,sequenceNumber,id)#64])
+- ConvertToSafe
   +- Sort [value#55 ASC], false, 0
  +- TungstenExchange hashpartitioning(value#55,1), None
 +- ConvertToUnsafe
+- SortBasedAggregate(key=[value#55], 
functions=[(anon$1(line#4,sequenceNumber#5,id#6),mode=Partial,isDistinct=false)],
 output=[value#55,value#60])
   +- ConvertToSafe
  +- Sort [value#55 ASC], false, 0
 +- !AppendColumns , class[line[0]: string, 
sequenceNumber[0]: int, id[0]: string], class[value[0]: string], [value#55]
+- ConvertToUnsafe
   +- LocalTableScan [line#4,sequenceNumber#5,id#6], 
[[0,22,1,280004,,31236469],[0,22,2,280004,,32236469]]
{noformat}
 
Everything up to and including part of the first (from bottom) {{SortBasedAggregate}} 
step is handled correctly. In particular, I see that each row correctly 
updates the mutable aggregation buffer in the {{update()}} method of the 
{{TypedAggregateExpression}} class. My initial idea was that the problem 
appeared in the {{ConvertToUnsafe}} step directly after the first 
{{SortBasedAggregate}}. If I take a look at the {{ConvertToUnsafe}} class, I 
can see that the first {{SortBasedAggregate}} returns a map with 2 elements (I 
call {{child.execute().collect()(0).getMap(1)}} in {{doExecute()}} of 
{{ConvertToUnsafe}} to see this). At the same time, if I examine the output of 
this {{ConvertToUnsafe}} in the identical way as its input, I see that the 
result map does not contain any elements. As a consequence, Spark operates on 
two empty maps in the {{merge()}} method of the {{TypedAggregateExpression}} 
class. However, my assumption was only partially correct. I did a more detailed 
investigation and its outcomes are described in comments.

  was:
There is a problem with user-defined aggregations in the Dataset API in Spark 
1.6.3, while the identical code works fine in Spark 2.0. 

The problem appears only if {{ExpressionEncoder()}} is used for Maps. The same 
code with a Kryo-based alternative produces a correct result. If the encoder 
for a map is defined with the help of {{ExpressionEncoder()}}, Spark is not 
capable of reading the reduced values in the merge phase of the considered 
aggregation.

Code to reproduce:
{code}
  case class TestStopPoint(line: String, sequenceNumber: Int, id: String)

  // Does not work with ExpressionEncoder() and

[jira] [Commented] (SPARK-18330) SparkR 2.1 QA: Update user guide for new features & APIs

2016-12-05 Thread Anton Okolnychyi (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18330?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15721753#comment-15721753
 ] 

Anton Okolnychyi commented on SPARK-18330:
--

I assume the task is R-specific. Is any help with Java/Scala docs needed?

> SparkR 2.1 QA: Update user guide for new features & APIs
> 
>
> Key: SPARK-18330
> URL: https://issues.apache.org/jira/browse/SPARK-18330
> Project: Spark
>  Issue Type: Sub-task
>  Components: Documentation, SparkR
>Reporter: Joseph K. Bradley
>Priority: Critical
>
> Check the user guide vs. a list of new APIs (classes, methods, data members) 
> to see what items require updates to the user guide.
> For each feature missing user guide doc:
> * Create a JIRA for that feature, and assign it to the author of the feature
> * Link it to (a) the original JIRA which introduced that feature ("related 
> to") and (b) to this JIRA ("requires").
> If you would like to work on this task, please comment, and we can create & 
> link JIRAs for parts of this work.






[jira] [Commented] (SPARK-16899) Structured Streaming Checkpointing Example invalid

2016-12-07 Thread Anton Okolnychyi (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-16899?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15729734#comment-15729734
 ] 

Anton Okolnychyi commented on SPARK-16899:
--

Is the ticket still valid? I cannot reproduce the exception if I run the 
example using Spark 2.0.2. 

> Structured Streaming Checkpointing Example invalid
> --
>
> Key: SPARK-16899
> URL: https://issues.apache.org/jira/browse/SPARK-16899
> Project: Spark
>  Issue Type: Bug
>  Components: Documentation
>Reporter: Vladimir Feinberg
>Priority: Minor
>
> The structured streaming checkpointing example at the bottom of the page 
> (https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovering-from-failures-with-checkpointing)
>  has the following excerpt:
> {code}
> aggDF
>.writeStream
>.outputMode("complete")
>.option(“checkpointLocation”, “path/to/HDFS/dir”)
>.format("memory")
>.start()
> {code}
> But memory sinks are not fault-tolerant. Indeed, trying this out, I get the 
> following error: 
> {{This query does not support recovering from checkpoint location. Delete 
> /tmp/streaming.metadata-625631e5-baee-41da-acd1-f16c82f68a40/offsets to start 
> over.;}}
> The documentation should be changed to demonstrate checkpointing for a 
> non-aggregation streaming task, and explicitly mention there is no way to 
> checkpoint aggregates.






[jira] [Commented] (SPARK-21102) Refresh command is too aggressive in parsing

2017-06-19 Thread Anton Okolnychyi (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21102?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16054591#comment-16054591
 ] 

Anton Okolnychyi commented on SPARK-21102:
--

Hi [~rxin],

I took a look at this issue and have a prototype that fixes it. It is available 
[here|https://github.com/aokolnychyi/spark/commit/fc2b7c02fab7f570ae3ca080ae1c2c9502300de7]. 
I am not sure my current implementation is optimal, so any feedback is 
appreciated. My first idea was to make the grammar as strict as possible. 
Unfortunately, there were some problems. I tried the approach below:

SqlBase.g4

{noformat}
...
| REFRESH TABLE tableIdentifier                               #refreshTable
| REFRESH resourcePath                                        #refreshResource
...

resourcePath
    : STRING
    | (IDENTIFIER | number | nonReserved | '/' | '-')+ // other symbols can be added if needed
    ;
{noformat}

It is not flexible enough and requires explicitly listing all possible 
symbols. Therefore, I came up with the approach that was mentioned at the 
beginning.

Let me know your opinion on which one is better.
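
To illustrate why the strict grammar gets unwieldy, these are the kinds of statements it has to accept (a sketch, assuming a {{SparkSession}} named {{spark}}; the paths are made up):

{code}
spark.sql("REFRESH TABLE db.tbl")                          // matched by #refreshTable
spark.sql("REFRESH 'hdfs://host/data/events'")             // quoted path -> STRING
spark.sql("REFRESH /data/events/part-00000-c000.parquet")  // unquoted path: needs '/', '-', '.'
// every additional character class in unquoted paths means another alternative
// in the resourcePath rule above
{code}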

> Refresh command is too aggressive in parsing
> 
>
> Key: SPARK-21102
> URL: https://issues.apache.org/jira/browse/SPARK-21102
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: Reynold Xin
>  Labels: starter
>
> SQL REFRESH command parsing is way too aggressive:
> {code}
> | REFRESH TABLE tableIdentifier
> #refreshTable
> | REFRESH .*?  
> #refreshResource
> {code}
> We should change it so it takes the whole string (without space), or a quoted 
> string.






[jira] [Commented] (SPARK-18859) Catalyst codegen does not mark column as nullable when it should. Causes NPE

2017-07-05 Thread Anton Okolnychyi (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16075432#comment-16075432
 ] 

Anton Okolnychyi commented on SPARK-18859:
--

I took a look at this issue, and it seems this cannot be fixed on the Spark side. 
{{JdbcUtils#getSchema}} is responsible for identifying the schema in this case. 
Under the hood, it just uses the metadata from a {{ResultSet}} to create a 
{{Catalyst}} schema. The Optimizer is not involved in the schema extraction, 
everything is based on the {{ResultSet}} metadata. Therefore, this issue is 
only reproducible if the user specifies a query as the "dbtable" JDBC option.
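
A minimal sketch of what this means in plain JDBC terms (hypothetical connection URL, reusing the tables from the example below): the nullability Spark records is exactly what the driver's {{ResultSetMetaData}} reports for the user-supplied query.

{code}
import java.sql.{DriverManager, ResultSetMetaData}

val conn = DriverManager.getConnection("jdbc:postgresql://localhost/masterdata")
val rs = conn.createStatement().executeQuery(
  "select t.id, t.age, j.name from masterdata.testtable t " +
  "left join masterdata.jointable j on t.id = j.id")
val md = rs.getMetaData
(1 to md.getColumnCount).foreach { i =>
  // the driver reports columnNoNulls for j.name because the column is declared
  // NOT NULL, even though the LEFT JOIN can produce NULLs for it
  val nullable = md.isNullable(i) != ResultSetMetaData.columnNoNulls
  println(s"${md.getColumnName(i)} nullable=$nullable")
}
conn.close()
{code}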

> Catalyst codegen does not mark column as nullable when it should. Causes NPE
> 
>
> Key: SPARK-18859
> URL: https://issues.apache.org/jira/browse/SPARK-18859
> Project: Spark
>  Issue Type: Bug
>  Components: Optimizer, SQL
>Affects Versions: 2.0.2
>Reporter: Mykhailo Osypov
>Priority: Critical
>
> When joining two tables via LEFT JOIN, columns in right table may be NULLs, 
> however catalyst codegen cannot recognize it.
> Example:
> {code:title=schema.sql|borderStyle=solid}
> create table masterdata.testtable(
>   id int not null,
>   age int
> );
> create table masterdata.jointable(
>   id int not null,
>   name text not null
> );
> {code}
> {code:title=query_to_select.sql|borderStyle=solid}
> (select t.id, t.age, j.name from masterdata.testtable t left join 
> masterdata.jointable j on t.id = j.id) as testtable;
> {code}
> {code:title=master code|borderStyle=solid}
> val df = sqlContext
>   .read
>   .format("jdbc")
>   .option("dbTable", "query to select")
>   
>   .load
> //df generated schema
> /*
> root
>  |-- id: integer (nullable = false)
>  |-- age: integer (nullable = true)
>  |-- name: string (nullable = false)
> */
> {code}
> {code:title=Codegen|borderStyle=solid}
> /* 038 */   scan_rowWriter.write(0, scan_value);
> /* 039 */
> /* 040 */   if (scan_isNull1) {
> /* 041 */ scan_rowWriter.setNullAt(1);
> /* 042 */   } else {
> /* 043 */ scan_rowWriter.write(1, scan_value1);
> /* 044 */   }
> /* 045 */
> /* 046 */   scan_rowWriter.write(2, scan_value2);
> {code}
> Since *j.name* is from right table of *left join* query, it may be null. 
> However generated schema doesn't think so (probably because it defined as 
> *name text not null*)
> {code:title=StackTrace|borderStyle=solid}
> java.lang.NullPointerException
>   at 
> org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter.write(UnsafeRowWriter.java:210)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
>  Source)
>   at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>   at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
>   at 
> org.apache.spark.sql.execution.debug.package$DebugExec$$anonfun$3$$anon$1.hasNext(package.scala:146)
>   at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1763)
>   at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1134)
>   at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1134)
>   at 
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1899)
>   at 
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1899)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
>   at org.apache.spark.scheduler.Task.run(Task.scala:86)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> {code}






[jira] [Commented] (SPARK-18859) Catalyst codegen does not mark column as nullable when it should. Causes NPE

2017-07-06 Thread Anton Okolnychyi (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16076463#comment-16076463
 ] 

Anton Okolnychyi commented on SPARK-18859:
--

[~maropu] I think the approach you suggested is a better option here since 
it will avoid the exception. However, this behavior should also be documented 
and made clear to users. I can create a PR and we can discuss alternative ways to 
solve this issue there. What do you think?

> Catalyst codegen does not mark column as nullable when it should. Causes NPE
> 
>
> Key: SPARK-18859
> URL: https://issues.apache.org/jira/browse/SPARK-18859
> Project: Spark
>  Issue Type: Bug
>  Components: Optimizer, SQL
>Affects Versions: 2.0.2
>Reporter: Mykhailo Osypov
>Priority: Critical
>
> When joining two tables via LEFT JOIN, columns in right table may be NULLs, 
> however catalyst codegen cannot recognize it.
> Example:
> {code:title=schema.sql|borderStyle=solid}
> create table masterdata.testtable(
>   id int not null,
>   age int
> );
> create table masterdata.jointable(
>   id int not null,
>   name text not null
> );
> {code}
> {code:title=query_to_select.sql|borderStyle=solid}
> (select t.id, t.age, j.name from masterdata.testtable t left join 
> masterdata.jointable j on t.id = j.id) as testtable;
> {code}
> {code:title=master code|borderStyle=solid}
> val df = sqlContext
>   .read
>   .format("jdbc")
>   .option("dbTable", "query to select")
>   
>   .load
> //df generated schema
> /*
> root
>  |-- id: integer (nullable = false)
>  |-- age: integer (nullable = true)
>  |-- name: string (nullable = false)
> */
> {code}
> {code:title=Codegen|borderStyle=solid}
> /* 038 */   scan_rowWriter.write(0, scan_value);
> /* 039 */
> /* 040 */   if (scan_isNull1) {
> /* 041 */ scan_rowWriter.setNullAt(1);
> /* 042 */   } else {
> /* 043 */ scan_rowWriter.write(1, scan_value1);
> /* 044 */   }
> /* 045 */
> /* 046 */   scan_rowWriter.write(2, scan_value2);
> {code}
> Since *j.name* is from right table of *left join* query, it may be null. 
> However generated schema doesn't think so (probably because it defined as 
> *name text not null*)
> {code:title=StackTrace|borderStyle=solid}
> java.lang.NullPointerException
>   at 
> org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter.write(UnsafeRowWriter.java:210)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
>  Source)
>   at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>   at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
>   at 
> org.apache.spark.sql.execution.debug.package$DebugExec$$anonfun$3$$anon$1.hasNext(package.scala:146)
>   at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1763)
>   at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1134)
>   at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1134)
>   at 
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1899)
>   at 
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1899)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
>   at org.apache.spark.scheduler.Task.run(Task.scala:86)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> {code}






[jira] [Comment Edited] (SPARK-18859) Catalyst codegen does not mark column as nullable when it should. Causes NPE

2017-07-08 Thread Anton Okolnychyi (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-18859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16076463#comment-16076463
 ] 

Anton Okolnychyi edited comment on SPARK-18859 at 7/8/17 2:36 PM:
--

[~maropu] I think the approach you suggested is a better option here since 
it will avoid the exception. However, this behavior should also be documented 
and made clear to users. I can create a PR and we can discuss alternative ways to 
solve this issue there. What do you think?


was (Author: aokolnychyi):
[~maropu] I think the approached suggested by you is a better option here since 
it will avoid the exception. However, this behavior should be also documented 
and clear to users. I can create a PR and we can discuss alternative ways to 
solve this issue there. What do you think?

> Catalyst codegen does not mark column as nullable when it should. Causes NPE
> 
>
> Key: SPARK-18859
> URL: https://issues.apache.org/jira/browse/SPARK-18859
> Project: Spark
>  Issue Type: Bug
>  Components: Optimizer, SQL
>Affects Versions: 2.0.2
>Reporter: Mykhailo Osypov
>Priority: Critical
>
> When joining two tables via LEFT JOIN, columns in right table may be NULLs, 
> however catalyst codegen cannot recognize it.
> Example:
> {code:title=schema.sql|borderStyle=solid}
> create table masterdata.testtable(
>   id int not null,
>   age int
> );
> create table masterdata.jointable(
>   id int not null,
>   name text not null
> );
> {code}
> {code:title=query_to_select.sql|borderStyle=solid}
> (select t.id, t.age, j.name from masterdata.testtable t left join 
> masterdata.jointable j on t.id = j.id) as testtable;
> {code}
> {code:title=master code|borderStyle=solid}
> val df = sqlContext
>   .read
>   .format("jdbc")
>   .option("dbTable", "query to select")
>   
>   .load
> //df generated schema
> /*
> root
>  |-- id: integer (nullable = false)
>  |-- age: integer (nullable = true)
>  |-- name: string (nullable = false)
> */
> {code}
> {code:title=Codegen|borderStyle=solid}
> /* 038 */   scan_rowWriter.write(0, scan_value);
> /* 039 */
> /* 040 */   if (scan_isNull1) {
> /* 041 */ scan_rowWriter.setNullAt(1);
> /* 042 */   } else {
> /* 043 */ scan_rowWriter.write(1, scan_value1);
> /* 044 */   }
> /* 045 */
> /* 046 */   scan_rowWriter.write(2, scan_value2);
> {code}
> Since *j.name* is from right table of *left join* query, it may be null. 
> However generated schema doesn't think so (probably because it defined as 
> *name text not null*)
> {code:title=StackTrace|borderStyle=solid}
> java.lang.NullPointerException
>   at 
> org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter.write(UnsafeRowWriter.java:210)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
>  Source)
>   at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>   at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
>   at 
> org.apache.spark.sql.execution.debug.package$DebugExec$$anonfun$3$$anon$1.hasNext(package.scala:146)
>   at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1763)
>   at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1134)
>   at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1134)
>   at 
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1899)
>   at 
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1899)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
>   at org.apache.spark.scheduler.Task.run(Task.scala:86)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> {code}






[jira] [Commented] (SPARK-20660) Not able to merge Dataframes with different column orders

2017-07-08 Thread Anton Okolnychyi (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16079340#comment-16079340
 ] 

Anton Okolnychyi commented on SPARK-20660:
--

I used the following code to investigate this problem:


{code}
val sc = spark.sparkContext

val inputSchema1 = StructType(
  StructField("key", StringType) ::
  StructField("value", IntegerType) ::
  Nil)
val rdd1 = sc.parallelize(1 to 2).map(x => Row(x.toString, 555))
val df1 = spark.createDataFrame(rdd1, inputSchema1)

val inputSchema2 = StructType(
  StructField("value", IntegerType) ::
  StructField("key", StringType) ::
  Nil)
val rdd2 = sc.parallelize(1 to 2).map(x => Row(555, x.toString))
val df2 = spark.createDataFrame(rdd2, inputSchema2)

val result = df1.union(df2)
result.explain(true)
result.show()
{code}

and it gives the following output:


{noformat}
== Parsed Logical Plan ==
'Union
:- LogicalRDD [key#2, value#3]
+- LogicalRDD [value#9, key#10]

== Analyzed Logical Plan ==
key: string, value: string
Union
:- Project [key#2, cast(value#3 as string) AS value#20]
:  +- LogicalRDD [key#2, value#3]
+- Project [cast(value#9 as string) AS value#21, key#10]
   +- LogicalRDD [value#9, key#10]

== Optimized Logical Plan ==
Union
:- Project [key#2, cast(value#3 as string) AS value#20]
:  +- LogicalRDD [key#2, value#3]
+- Project [cast(value#9 as string) AS value#21, key#10]
   +- LogicalRDD [value#9, key#10]

== Physical Plan ==
Union
:- *Project [key#2, cast(value#3 as string) AS value#20]
:  +- Scan ExistingRDD[key#2,value#3]
+- *Project [cast(value#9 as string) AS value#21, key#10]
   +- Scan ExistingRDD[value#9,key#10]
{noformat}


{noformat}
+---+-+
|key|value|
+---+-+
|  1|  555|
|  2|  555|
|555|1|
|555|2|
+---+-+
{noformat}

It is important to notice that the result schema consists of two string columns even 
though the original schemas were different. This happens because of the 
{{WidenSetOperationTypes}} rule, which introduces casts to string in the 
analyzed logical plan (since one column has type String, the other is promoted 
to String as well). According to the Scala doc, this cast is done on purpose, 
but it leads to a confusing result in this particular scenario.

I am wondering what would be a better outcome in this case.
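
Written out by hand, the analyzed plan above corresponds to something like this (a sketch reusing {{df1}} and {{df2}} from the snippet):

{code}
import org.apache.spark.sql.functions.col

// Union resolves columns by position: df2's first column (value: Int) is lined up
// with df1's first column (key: String), so WidenSetOperationTypes widens the
// mismatched pairs to String - hence the (key: string, value: string) result schema.
val manualUnion = df1
  .select(col("key"), col("value").cast("string").as("value"))
  .union(df2.select(col("value").cast("string").as("value"), col("key")))
manualUnion.show()  // same (confusing) output as result.show() above
{code}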

> Not able to merge Dataframes with different column orders
> -
>
> Key: SPARK-20660
> URL: https://issues.apache.org/jira/browse/SPARK-20660
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.1.0
>Reporter: Michel Lemay
>Priority: Minor
>
> Union on two dataframes with different column orders is not supported and 
> lead to hard to find issues.
> Here is an example showing the issue.
> {code}
> import org.apache.spark.sql.types._
> import org.apache.spark.sql.Row
> var inputSchema = StructType(StructField("key", StringType, nullable=true) :: 
> StructField("value", IntegerType, nullable=true) :: Nil)
> var a = spark.createDataFrame(sc.parallelize((1 to 10)).map(x => 
> Row(x.toString, 555)), inputSchema)
> var b = a.select($"value" * 2 alias "value", $"key")  // any transformation 
> changing column order will show the problem.
> a.union(b).show
> // in order to make it work, we need to reorder columns
> val bCols = a.columns.map(aCol => b(aCol))
> a.union(b.select(bCols:_*)).show
> {code}






[jira] [Commented] (SPARK-21332) Incorrect result type inferred for some decimal expressions

2017-07-09 Thread Anton Okolnychyi (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21332?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16079525#comment-16079525
 ] 

Anton Okolnychyi commented on SPARK-21332:
--

I know the root cause and will submit a PR soon.

> Incorrect result type inferred for some decimal expressions
> ---
>
> Key: SPARK-21332
> URL: https://issues.apache.org/jira/browse/SPARK-21332
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.2
>Reporter: Alexander Shkapsky
>
> Decimal expressions do not always follow the type inference rules explained 
> in DecimalPrecision.scala.  An incorrect result type is produced when the 
> expressions contains  more than 2 decimals.
> For example:
> spark-sql> CREATE TABLE Decimals(decimal_26_6 DECIMAL(26,6)); 
> ...
> spark-sql> describe decimals;
> ...
> decimal_26_6  decimal(26,6)   NULL
> spark-sql> explain select decimal_26_6 * decimal_26_6 from decimals;
> ...
> == Physical Plan ==
> *Project [CheckOverflow((decimal_26_6#99 * decimal_26_6#99), 
> DecimalType(38,12)) AS (decimal_26_6 * decimal_26_6)#100]
> +- HiveTableScan [decimal_26_6#99], MetastoreRelation default, decimals
> However:
> spark-sql> explain select decimal_26_6 * decimal_26_6 * decimal_26_6 from 
> decimals;
> ...
> == Physical Plan ==
> *Project [CheckOverflow((cast(CheckOverflow((decimal_26_6#104 * 
> decimal_26_6#104), DecimalType(38,12)) as decimal(26,6)) * decimal_26_6#104), 
> DecimalType(38,12)) AS ((decimal_26_6 * decimal_26_6) * decimal_26_6)#105]
> +- HiveTableScan [decimal_26_6#104], MetastoreRelation default, decimals
> The expected result type is DecimalType(38,18).
> In Hive 1.1.0:
> hive> explain select decimal_26_6 * decimal_26_6 from decimals;
> OK
> STAGE DEPENDENCIES:
>   Stage-0 is a root stage
> STAGE PLANS:
>   Stage: Stage-0
> Fetch Operator
>   limit: -1
>   Processor Tree:
> TableScan
>   alias: decimals
>   Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column 
> stats: NONE
>   Select Operator
> expressions: (decimal_26_6 * decimal_26_6) (type: decimal(38,12))
> outputColumnNames: _col0
> Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column 
> stats: NONE
> ListSink
> Time taken: 0.772 seconds, Fetched: 17 row(s)
> hive> explain select decimal_26_6 * decimal_26_6 * decimal_26_6 from decimals;
> OK
> STAGE DEPENDENCIES:
>   Stage-0 is a root stage
> STAGE PLANS:
>   Stage: Stage-0
> Fetch Operator
>   limit: -1
>   Processor Tree:
> TableScan
>   alias: decimals
>   Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column 
> stats: NONE
>   Select Operator
> expressions: ((decimal_26_6 * decimal_26_6) * decimal_26_6) 
> (type: decimal(38,18))
> outputColumnNames: _col0
> Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column 
> stats: NONE
> ListSink
> Time taken: 0.064 seconds, Fetched: 17 row(s)






[jira] [Commented] (SPARK-21417) Detect transitive join conditions via expressions

2017-07-16 Thread Anton Okolnychyi (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16088878#comment-16088878
 ] 

Anton Okolnychyi commented on SPARK-21417:
--

[~Aklakan] Do you want to work on it? If not, I can submit a PR and we can 
discuss all details with others there.
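In the meantime, a possible workaround for the quoted self-contained example (a sketch, untested): stating the transitive equality explicitly gives the planner an equi-join condition, so the cartesian-product check no longer fires.

{code}
// Same as the failing "Second Query", but with the inferable condition spelled out.
spark.sql(
  """SELECT A.s FROM TRIPLES A, TRIPLES B
    |WHERE A.s = 'dbr:Leipzig' AND B.s = 'dbr:Leipzig' AND A.s = B.s""".stripMargin).show(10)
{code}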

> Detect transitive join conditions via expressions
> -
>
> Key: SPARK-21417
> URL: https://issues.apache.org/jira/browse/SPARK-21417
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: Claus Stadler
>
> _Disclaimer: The nature of this report is similar to that of 
> https://issues.apache.org/jira/browse/CALCITE-1887 - yet, as SPARK (to my 
> understanding) uses its own SQL implementation, the requested improvement has 
> to be treated as a separate issue._
> Given table aliases ta, tb, column names ca, cb, and an arbitrary 
> (deterministic) expression expr, Spark should be capable of inferring join 
> conditions by transitivity:
> {noformat}
> ta.ca = expr AND tb.cb = expr -> ta.ca = tb.cb
> {noformat}
> The use case for us stems from SPARQL to SQL rewriting, where SPARQL queries 
> such as
> {code:java}
> SELECT {
>   dbr:Leipzig a ?type .
>   dbr:Leipzig dbo:mayor ?mayor
> }
> {code}
> result in an SQL query similar to
> {noformat}
> SELECT s.rdf a, s.rdf b WHERE a.s = 'dbr:Leipzig' AND b.s = 'dbr:Leipzig'
> {noformat}
> A consequence of the join condition not being recognized is that Apache 
> Spark does not find an executable plan to process the query.
> Self contained example:
> {code:java}
> package my.pkg // renamed: "package" is a reserved word in Scala
> import org.apache.spark.sql.SparkSession
> import org.apache.spark.sql.Row
> import org.apache.spark.sql.types.{StringType, StructField, StructType}
> import org.scalatest._
> class TestSparkSqlJoin extends FlatSpec {
>   "SPARK SQL processor" should "be capable of handling transitive join 
> conditions" in {
> val spark = SparkSession
>   .builder()
>   .master("local[2]")
>   .appName("Spark SQL parser bug")
>   .getOrCreate()
> import spark.implicits._
> // The schema is encoded in a string
> val schemaString = "s p o"
> // Generate the schema based on the string of schema
> val fields = schemaString.split(" ")
>   .map(fieldName => StructField(fieldName, StringType, nullable = true))
> val schema = StructType(fields)
> val data = List(("s1", "p1", "o1"))
> val dataRDD = spark.sparkContext.parallelize(data).map(attributes => 
> Row(attributes._1, attributes._2, attributes._3))
> val df = spark.createDataFrame(dataRDD, schema).as("TRIPLES")
> df.createOrReplaceTempView("TRIPLES")
> println("First Query")
> spark.sql("SELECT A.s FROM TRIPLES A, TRIPLES B WHERE A.s = B.s AND A.s = 
> 'dbr:Leipzig'").show(10)
> println("Second Query")
> spark.sql("SELECT A.s FROM TRIPLES A, TRIPLES B WHERE A.s = 'dbr:Leipzig' 
> AND B.s = 'dbr:Leipzig'").show(10)
>   }
> }
> {code}
> Output (excerpt):
> {noformat}
> First Query
> ...
> +---+
> |  s|
> +---+
> +---+
> Second Query
> - should be capable of handling transitive join conditions *** FAILED ***
>   org.apache.spark.sql.AnalysisException: Detected cartesian product for 
> INNER join between logical plans
> Project [s#3]
> +- Filter (isnotnull(s#3) && (s#3 = dbr:Leipzig))
>+- LogicalRDD [s#3, p#4, o#5]
> and
> Project
> +- Filter (isnotnull(s#20) && (s#20 = dbr:Leipzig))
>+- LogicalRDD [s#20, p#21, o#22]
> Join condition is missing or trivial.
> Use the CROSS JOIN syntax to allow cartesian products between these 
> relations.;
>   at 
> org.apache.spark.sql.catalyst.optimizer.CheckCartesianProducts$$anonfun$apply$20.applyOrElse(Optimizer.scala:1080)
>   at 
> org.apache.spark.sql.catalyst.optimizer.CheckCartesianProducts$$anonfun$apply$20.applyOrElse(Optimizer.scala:1077)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
>   at 
> org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
>   ...
> Run completed in 6 seconds, 833 milliseconds.
> Total number of tests run: 1
> Suites: completed 1, aborted 0
> Tests: succeeded 0, failed 1, canceled 0, ignored 0, pending 0
> *** 1 TEST FAILED *

[jira] [Commented] (SPARK-21479) Outer join filter pushdown in null supplying table when condition is on one of the joined columns

2017-07-25 Thread Anton Okolnychyi (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16100691#comment-16100691
 ] 

Anton Okolnychyi commented on SPARK-21479:
--

I used the following code to investigate:

{code}
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val inputSchema1 = StructType(
  StructField("col1", StringType) ::
  StructField("col2", IntegerType) ::
  Nil)

val inputSchema2 = StructType(
  StructField("col1", StringType) ::
  StructField("col3", StringType) ::
  Nil)

val rdd1 = sc.parallelize(1 to 3).map(v => Row(s"value $v", v))
val df1 = spark.createDataFrame(rdd1, inputSchema1)
val rdd2 = sc.parallelize(1 to 3).map(v => Row(s"value $v", "some value"))
val df2 = spark.createDataFrame(rdd2, inputSchema2)

// 1st use case
df1.join(df2, Seq("col1"), "right_outer").where("col2 = 2").explain(true)
// 2nd use case
df1.join(df2, Seq("col1"), "right_outer").where("col1 = 'value 
2'").explain(true)
{code}

It is important to notice that the actual join type in the first case is 
`inner` and not `right_outer`. This happens due to the `EliminateOuterJoin` 
rule, which sees that `col2 = 2` filters out non-matching rows on the left side 
of the join. Once the join type is changed, the `PushPredicateThroughJoin` rule 
pushes `col2 = 2` to the left relation. The analyzed and optimized logical 
plans are:

{noformat}
== Analyzed Logical Plan ==
col1: string, col2: int, col3: string
Filter (col2#3 = 2)
+- Project [col1#9, col2#3, col3#10]
   +- Join RightOuter, (col1#2 = col1#9)
  :- LogicalRDD [col1#2, col2#3]
  +- LogicalRDD [col1#9, col3#10]

== Optimized Logical Plan ==
Project [col1#9, col2#3, col3#10]
+- Join Inner, (col1#2 = col1#9)
   :- Filter ((isnotnull(col2#3) && (col2#3 = 2)) && isnotnull(col1#2))
   :  +- LogicalRDD [col1#2, col2#3]
   +- Filter isnotnull(col1#9)
  +- LogicalRDD [col1#9, col3#10]

{noformat}

The second case is different. The join type stays the same (i.e., 
`right_outer`) and the analyzed logical plan looks like:

{noformat}
== Analyzed Logical Plan ==
col1: string, col2: int, col3: string
Filter (col1#9 = value 2)
+- Project [col1#9, col2#3, col3#10]
   +- Join RightOuter, (col1#2 = col1#9)
  :- LogicalRDD [col1#2, col2#3]
  +- LogicalRDD [col1#9, col3#10]
{noformat}

`col1#9` from the Filter belongs to the right relation. After 
`PushPredicateThroughJoin` we have:

{noformat}
Join RightOuter, (col1#2 = col1#9)
:- LogicalRDD [col1#2, col2#3]
+- Filter (isnotnull(col1#9) && (col1#9 = value 2))
   +- LogicalRDD [col1#9, col3#10]
{noformat}


In theory, `InferFiltersFromConstraints` is capable of inferring `(col1#2 = 
value 2)` from `(col1#9 = value 2, col1#2 = col1#9)`. However, not in this case 
since the join type is `right_outer` and `InferFiltersFromConstraints` will 
process only constraints from the right relation (i.e., `(isnotnull(col1#9) && 
(col1#9 = value 2))`), which is not enough to infer `(col1#2 = value 2)`.

It seems like this is done on purpose and it is expected behavior even though 
additional `(col1#2 = value 2)` on the left relation would be logically correct 
here (as far as I understand).
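If needed, the logically equivalent filter can be added by hand (a sketch based on the code above, not part of the original discussion): for this equality predicate, filtering both inputs on `col1` before the join keeps the right_outer semantics while still pruning the left relation early.

{code}
import org.apache.spark.sql.functions.col

// Manually add the predicate that InferFiltersFromConstraints does not infer here.
val prunedDf1 = df1.filter(col("col1") === "value 2")
val prunedDf2 = df2.filter(col("col1") === "value 2")
prunedDf1.join(prunedDf2, Seq("col1"), "right_outer").explain(true)
{code}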

> Outer join filter pushdown in null supplying table when condition is on one 
> of the joined columns
> -
>
> Key: SPARK-21479
> URL: https://issues.apache.org/jira/browse/SPARK-21479
> Project: Spark
>  Issue Type: Bug
>  Components: Optimizer, SQL
>Affects Versions: 2.1.0, 2.1.1, 2.2.0
>Reporter: Abhijit Bhole
>
> Here are two different query plans - 
> {code:java}
> df1 = spark.createDataFrame([{ "a": 1, "b" : 2}, { "a": 3, "b" : 4}])
> df2 = spark.createDataFrame([{ "a": 1, "c" : 5}, { "a": 3, "c" : 6}, { "a": 
> 5, "c" : 8}])
> df1.join(df2, ['a'], 'right_outer').where("b = 2").explain()
> == Physical Plan ==
> *Project [a#16299L, b#16295L, c#16300L]
> +- *SortMergeJoin [a#16294L], [a#16299L], Inner
>:- *Sort [a#16294L ASC NULLS FIRST], false, 0
>:  +- Exchange hashpartitioning(a#16294L, 4)
>: +- *Filter ((isnotnull(b#16295L) && (b#16295L = 2)) && 
> isnotnull(a#16294L))
>:+- Scan ExistingRDD[a#16294L,b#16295L]
>+- *Sort [a#16299L ASC NULLS FIRST], false, 0
>   +- Exchange hashpartitioning(a#16299L, 4)
>  +- *Filter isnotnull(a#16299L)
> +- Scan ExistingRDD[a#16299L,c#16300L]
> df1 = spark.createDataFrame([{ "a": 1, "b" : 2}, { "a": 3, "b" : 4}])
> df2 = spark.createDataFrame([{ "a": 1, "c" : 5}, { "a": 3, "c" : 6}, { "a": 
> 5, "c" : 8}])
> df1.join(df2, ['a'], 'right_outer').where("a = 1").explain()
> == Physical Plan ==
> *Project [a#16314L, b#16310L, c#16315L]
> +- SortMergeJoin [a#16309L], [a#16314L], RightOuter
>:- *Sort [a#16309L ASC NULLS FIRST], false, 0
>:  +- Exchange hashpartition

[jira] [Commented] (SPARK-21588) SQLContext.getConf(key, null) should return null, but it throws NPE

2017-08-03 Thread Anton Okolnychyi (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21588?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16113412#comment-16113412
 ] 

Anton Okolnychyi commented on SPARK-21588:
--

I did not manage to reproduce this. I tried:

{code}
spark.sqlContext.getConf("spark.sql.streaming.checkpointLocation", null) // null
spark.sqlContext.getConf("spark.sql.thriftserver.scheduler.pool", null) // null
spark.sqlContext.getConf("spark.sql.sources.outputCommitterClass", null) // null
spark.sqlContext.getConf("blabla", null) // null
spark.sqlContext.getConf("spark.sql.sources.outputCommitterClass") // 

{code}

I got an NPE only when I called getConf(key, null) for a parameter with a 
default value. For example, 
{code}
spark.sqlContext.getConf("spark.sql.thriftServer.incrementalCollect", 
"") // 
spark.sqlContext.getConf("spark.sql.thriftServer.incrementalCollect", null) // 
NPE
{code}


> SQLContext.getConf(key, null) should return null, but it throws NPE
> ---
>
> Key: SPARK-21588
> URL: https://issues.apache.org/jira/browse/SPARK-21588
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: Burak Yavuz
>Priority: Minor
>
> SQLContext.get(key) for a key that is not defined in the conf, and doesn't 
> have a default value defined, throws a NoSuchElementException. In order to 
> avoid that, I used a null as the default value, which threw a NPE instead. If 
> it is null, it shouldn't try to parse the default value in `getConfString`






[jira] [Commented] (SPARK-21588) SQLContext.getConf(key, null) should return null, but it throws NPE

2017-08-03 Thread Anton Okolnychyi (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21588?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16113515#comment-16113515
 ] 

Anton Okolnychyi commented on SPARK-21588:
--

Sure, but the converter will not be called if the default value that you pass 
is "". However, the check can be extended to `defaultValue != null 
&& defaultValue != ""` in the SQLConf#getConfString.
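A minimal sketch of that extended guard (hypothetical helper signature, not the actual SQLConf code): skip validating the supplied default when it is null (or the internal placeholder mentioned above), so that getConf(key, null) can simply return null.

{code}
// `validate` stands in for the registered value converter of the config entry.
def getConfString(key: String,
                  defaultValue: String,
                  validate: String => Unit,
                  settings: java.util.Map[String, String]): String = {
  if (defaultValue != null) {
    validate(defaultValue) // only parse/validate a real default value
  }
  Option(settings.get(key)).getOrElse(defaultValue)
}
{code}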

> SQLContext.getConf(key, null) should return null, but it throws NPE
> ---
>
> Key: SPARK-21588
> URL: https://issues.apache.org/jira/browse/SPARK-21588
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: Burak Yavuz
>Priority: Minor
>
> SQLContext.get(key) for a key that is not defined in the conf, and doesn't 
> have a default value defined, throws a NoSuchElementException. In order to 
> avoid that, I used a null as the default value, which threw a NPE instead. If 
> it is null, it shouldn't try to parse the default value in `getConfString`






[jira] [Created] (SPARK-21652) Optimizer cannot reach a fixed point on certain queries

2017-08-07 Thread Anton Okolnychyi (JIRA)
Anton Okolnychyi created SPARK-21652:


 Summary: Optimizer cannot reach a fixed point on certain queries
 Key: SPARK-21652
 URL: https://issues.apache.org/jira/browse/SPARK-21652
 Project: Spark
  Issue Type: Bug
  Components: Optimizer, SQL
Affects Versions: 2.2.0
Reporter: Anton Okolnychyi


The optimizer cannot reach a fixed point on the following query:

{code}
Seq((1, 2)).toDF("col1", "col2").write.saveAsTable("t1")
Seq(1, 2).toDF("col").write.saveAsTable("t2")
spark.sql("SELECT * FROM t1, t2 WHERE t1.col1 = 1 AND 1 = t1.col2 AND t1.col1 = 
t2.col AND t1.col2 = t2.col").explain(true)
{code}

At some point during the optimization, InferFiltersFromConstraints infers a new 
constraint '(col2#33 = col1#32)' that is appended to the join condition, then 
PushPredicateThroughJoin pushes it down, ConstantPropagation replaces '(col2#33 
= col1#32)' with '1 = 1' based on other propagated constraints, ConstantFolding 
replaces '1 = 1' with 'true' and BooleanSimplification finally removes this 
predicate. However, InferFiltersFromConstraints will again infer '(col2#33 = 
col1#32)' on the next iteration and the process will continue until the limit 
of iterations is reached. 

See below for more details

{noformat}
=== Applying Rule 
org.apache.spark.sql.catalyst.optimizer.InferFiltersFromConstraints ===
!Join Inner, ((col1#32 = col#34) && (col2#33 = col#34)) 
  Join Inner, ((col2#33 = col1#32) && ((col1#32 = col#34) && 
(col2#33 = col#34)))
 :- Filter ((isnotnull(col1#32) && isnotnull(col2#33)) && ((col1#32 = 1) && (1 
= col2#33)))   :- Filter ((isnotnull(col1#32) && isnotnull(col2#33)) && 
((col1#32 = 1) && (1 = col2#33)))
 :  +- Relation[col1#32,col2#33] parquet
  :  +- Relation[col1#32,col2#33] parquet
 +- Filter ((1 = col#34) && isnotnull(col#34))  
  +- Filter ((1 = col#34) && isnotnull(col#34))
+- Relation[col#34] parquet 
 +- Relation[col#34] parquet


=== Applying Rule 
org.apache.spark.sql.catalyst.optimizer.PushPredicateThroughJoin ===
!Join Inner, ((col2#33 = col1#32) && ((col1#32 = col#34) && (col2#33 = 
col#34)))  Join Inner, ((col1#32 = col#34) && (col2#33 = col#34))
!:- Filter ((isnotnull(col1#32) && isnotnull(col2#33)) && ((col1#32 = 1) && (1 
= col2#33)))   :- Filter (col2#33 = col1#32)
!:  +- Relation[col1#32,col2#33] parquet
  :  +- Filter ((isnotnull(col1#32) && isnotnull(col2#33)) && 
((col1#32 = 1) && (1 = col2#33)))
!+- Filter ((1 = col#34) && isnotnull(col#34))  
  : +- Relation[col1#32,col2#33] parquet
!   +- Relation[col#34] parquet 
  +- Filter ((1 = col#34) && isnotnull(col#34))
!   
 +- Relation[col#34] parquet


=== Applying Rule org.apache.spark.sql.catalyst.optimizer.CombineFilters ===
 Join Inner, ((col1#32 = col#34) && (col2#33 = col#34)) 
 Join Inner, ((col1#32 = col#34) && (col2#33 = col#34))
!:- Filter (col2#33 = col1#32)  
 :- Filter (((isnotnull(col1#32) && isnotnull(col2#33)) && 
((col1#32 = 1) && (1 = col2#33))) && (col2#33 = col1#32))
!:  +- Filter ((isnotnull(col1#32) && isnotnull(col2#33)) && ((col1#32 = 1) && 
(1 = col2#33)))   :  +- Relation[col1#32,col2#33] parquet
!: +- Relation[col1#32,col2#33] parquet 
 +- Filter ((1 = col#34) && isnotnull(col#34))
!+- Filter ((1 = col#34) && isnotnull(col#34))  
+- Relation[col#34] parquet
!   +- Relation[col#34] parquet 
 


=== Applying Rule org.apache.spark.sql.catalyst.optimizer.ConstantPropagation 
===
 Join Inner, ((col1#32 = col#34) && (col2#33 = col#34)) 
   Join Inner, ((col1#32 = col#34) && 
(col2#33 = col#34))
!:- Filter (((isnotnull(col1#32) && isnotnull(col2#33)) && ((col1#32 = 1) && (1 
= col2#33))) && (col2#33 = col1#32))   :- Filter (((isnotnull(col1#32) && 
isnotnull(col2#33)) && ((col1#32 = 1) && (1 = col2#33))) && (1 = 1))
 :  +- Relation[col1#32,col2#33] parquet
   :  +- Relation[col1#32,col2#33] parquet
 +- Filter ((1 = col#34) && isnotnull(col#34))  
   +- Filter ((1 = col#34) && 
isnotnull(col#34))
+- Rel

[jira] [Commented] (SPARK-21652) Optimizer cannot reach a fixed point on certain queries

2017-08-07 Thread Anton Okolnychyi (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21652?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16116192#comment-16116192
 ] 

Anton Okolnychyi commented on SPARK-21652:
--

One option to fix this is NOT to apply ConstantPropagation to such predicates 
as '(col1 = col2)' if both sides can be replaced with a constant value.
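A toy sketch of that idea outside of Catalyst (illustrative types only, not the real ConstantPropagation rule): substitute a constant only when exactly one side of the equality is known, so an equality whose both sides are constant stays intact and cannot be folded away and re-inferred in a loop.

{code}
sealed trait Pred
case class AttrEqAttr(a: String, b: String) extends Pred
case class AttrEqConst(a: String, v: Int) extends Pred

def propagateConstants(preds: Seq[Pred]): Seq[Pred] = {
  val constants = preds.collect { case AttrEqConst(a, v) => a -> v }.toMap
  preds.map {
    case AttrEqAttr(a, b) if constants.contains(a) ^ constants.contains(b) =>
      // exactly one side is a known constant: substitute it
      if (constants.contains(a)) AttrEqConst(b, constants(a)) else AttrEqConst(a, constants(b))
    case other => other // both sides constant (or neither): keep the predicate as-is
  }
}

// propagateConstants(Seq(AttrEqConst("col1", 1), AttrEqConst("col2", 1), AttrEqAttr("col1", "col2")))
// keeps 'col1 = col2' instead of rewriting it to '1 = 1'
{code}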

> Optimizer cannot reach a fixed point on certain queries
> ---
>
> Key: SPARK-21652
> URL: https://issues.apache.org/jira/browse/SPARK-21652
> Project: Spark
>  Issue Type: Bug
>  Components: Optimizer, SQL
>Affects Versions: 2.2.0
>Reporter: Anton Okolnychyi
>
> The optimizer cannot reach a fixed point on the following query:
> {code}
> Seq((1, 2)).toDF("col1", "col2").write.saveAsTable("t1")
> Seq(1, 2).toDF("col").write.saveAsTable("t2")
> spark.sql("SELECT * FROM t1, t2 WHERE t1.col1 = 1 AND 1 = t1.col2 AND t1.col1 
> = t2.col AND t1.col2 = t2.col").explain(true)
> {code}
> At some point during the optimization, InferFiltersFromConstraints infers a 
> new constraint '(col2#33 = col1#32)' that is appended to the join condition, 
> then PushPredicateThroughJoin pushes it down, ConstantPropagation replaces 
> '(col2#33 = col1#32)' with '1 = 1' based on other propagated constraints, 
> ConstantFolding replaces '1 = 1' with 'true' and BooleanSimplification finally 
> removes this predicate. However, InferFiltersFromConstraints will again infer 
> '(col2#33 = col1#32)' on the next iteration and the process will continue 
> until the limit of iterations is reached. 
> See below for more details
> {noformat}
> === Applying Rule 
> org.apache.spark.sql.catalyst.optimizer.InferFiltersFromConstraints ===
> !Join Inner, ((col1#32 = col#34) && (col2#33 = col#34))   
> Join Inner, ((col2#33 = col1#32) && ((col1#32 = col#34) && 
> (col2#33 = col#34)))
>  :- Filter ((isnotnull(col1#32) && isnotnull(col2#33)) && ((col1#32 = 1) && 
> (1 = col2#33)))   :- Filter ((isnotnull(col1#32) && isnotnull(col2#33)) && 
> ((col1#32 = 1) && (1 = col2#33)))
>  :  +- Relation[col1#32,col2#33] parquet  
> :  +- Relation[col1#32,col2#33] parquet
>  +- Filter ((1 = col#34) && isnotnull(col#34))
> +- Filter ((1 = col#34) && isnotnull(col#34))
> +- Relation[col#34] parquet   
>+- Relation[col#34] parquet
> 
> === Applying Rule 
> org.apache.spark.sql.catalyst.optimizer.PushPredicateThroughJoin ===
> !Join Inner, ((col2#33 = col1#32) && ((col1#32 = col#34) && (col2#33 = 
> col#34)))  Join Inner, ((col1#32 = col#34) && (col2#33 = col#34))
> !:- Filter ((isnotnull(col1#32) && isnotnull(col2#33)) && ((col1#32 = 1) && 
> (1 = col2#33)))   :- Filter (col2#33 = col1#32)
> !:  +- Relation[col1#32,col2#33] parquet  
> :  +- Filter ((isnotnull(col1#32) && isnotnull(col2#33)) && 
> ((col1#32 = 1) && (1 = col2#33)))
> !+- Filter ((1 = col#34) && isnotnull(col#34))
> : +- Relation[col1#32,col2#33] parquet
> !   +- Relation[col#34] parquet   
> +- Filter ((1 = col#34) && isnotnull(col#34))
> ! 
>+- Relation[col#34] parquet
> 
> === Applying Rule org.apache.spark.sql.catalyst.optimizer.CombineFilters ===
>  Join Inner, ((col1#32 = col#34) && (col2#33 = col#34))   
>Join Inner, ((col1#32 = col#34) && (col2#33 = col#34))
> !:- Filter (col2#33 = col1#32)
>:- Filter (((isnotnull(col1#32) && isnotnull(col2#33)) && 
> ((col1#32 = 1) && (1 = col2#33))) && (col2#33 = col1#32))
> !:  +- Filter ((isnotnull(col1#32) && isnotnull(col2#33)) && ((col1#32 = 1) 
> && (1 = col2#33)))   :  +- Relation[col1#32,col2#33] parquet
> !: +- Relation[col1#32,col2#33] parquet   
>+- Filter ((1 = col#34) && isnotnull(col#34))
> !+- Filter ((1 = col#34) && isnotnull(col#34))
>   +- Relation[col#34] parquet
> !   +- Relation[col#34] parquet   
>
> 
> === Applying Rule org.apache.spark.sql.catalyst.optimizer.ConstantPropagation 
> ===
>  Join Inner, ((col1#32 = col#34) && (col2#33 = col#34))   
>  Join Inner, ((col1#32 = col#34) && 
> (col2#33 = col#34))
> !:- Filter (((isnotnull(col1#32) && isnotnull(col

[jira] [Commented] (SPARK-21652) Optimizer cannot reach a fixed point on certain queries

2017-08-07 Thread Anton Okolnychyi (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21652?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16116936#comment-16116936
 ] 

Anton Okolnychyi commented on SPARK-21652:
--

Yes, disabling the constraint propagation helps because 
`InferFiltersFromConstraints` will not apply. I found several issues regarding 
the performance of InferFiltersFromConstraints but what about the logic of 
`ConstantPropagation` in the above example? Should it replace such predicates 
as `(a = b)` with `(1 = 1)` even if it is semantically correct?
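For completeness, the workaround discussed here can be applied per session (assuming the 2.2 configuration key spark.sql.constraintPropagation.enabled; it trades the inferred filters for a terminating optimizer run):

{code}
spark.conf.set("spark.sql.constraintPropagation.enabled", "false")
spark.sql("""SELECT * FROM t1, t2
            |WHERE t1.col1 = 1 AND 1 = t1.col2
            |  AND t1.col1 = t2.col AND t1.col2 = t2.col""".stripMargin).explain(true)
{code}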

> Optimizer cannot reach a fixed point on certain queries
> ---
>
> Key: SPARK-21652
> URL: https://issues.apache.org/jira/browse/SPARK-21652
> Project: Spark
>  Issue Type: Bug
>  Components: Optimizer, SQL
>Affects Versions: 2.2.0
>Reporter: Anton Okolnychyi
>
> The optimizer cannot reach a fixed point on the following query:
> {code}
> Seq((1, 2)).toDF("col1", "col2").write.saveAsTable("t1")
> Seq(1, 2).toDF("col").write.saveAsTable("t2")
> spark.sql("SELECT * FROM t1, t2 WHERE t1.col1 = 1 AND 1 = t1.col2 AND t1.col1 
> = t2.col AND t1.col2 = t2.col").explain(true)
> {code}
> At some point during the optimization, InferFiltersFromConstraints infers a 
> new constraint '(col2#33 = col1#32)' that is appended to the join condition, 
> then PushPredicateThroughJoin pushes it down, ConstantPropagation replaces 
> '(col2#33 = col1#32)' with '1 = 1' based on other propagated constraints, 
> ConstantFolding replaces '1 = 1' with 'true' and BooleanSimplification finally 
> removes this predicate. However, InferFiltersFromConstraints will again infer 
> '(col2#33 = col1#32)' on the next iteration and the process will continue 
> until the limit of iterations is reached. 
> See below for more details
> {noformat}
> === Applying Rule 
> org.apache.spark.sql.catalyst.optimizer.InferFiltersFromConstraints ===
> !Join Inner, ((col1#32 = col#34) && (col2#33 = col#34))   
> Join Inner, ((col2#33 = col1#32) && ((col1#32 = col#34) && 
> (col2#33 = col#34)))
>  :- Filter ((isnotnull(col1#32) && isnotnull(col2#33)) && ((col1#32 = 1) && 
> (1 = col2#33)))   :- Filter ((isnotnull(col1#32) && isnotnull(col2#33)) && 
> ((col1#32 = 1) && (1 = col2#33)))
>  :  +- Relation[col1#32,col2#33] parquet  
> :  +- Relation[col1#32,col2#33] parquet
>  +- Filter ((1 = col#34) && isnotnull(col#34))
> +- Filter ((1 = col#34) && isnotnull(col#34))
> +- Relation[col#34] parquet   
>+- Relation[col#34] parquet
> 
> === Applying Rule 
> org.apache.spark.sql.catalyst.optimizer.PushPredicateThroughJoin ===
> !Join Inner, ((col2#33 = col1#32) && ((col1#32 = col#34) && (col2#33 = 
> col#34)))  Join Inner, ((col1#32 = col#34) && (col2#33 = col#34))
> !:- Filter ((isnotnull(col1#32) && isnotnull(col2#33)) && ((col1#32 = 1) && 
> (1 = col2#33)))   :- Filter (col2#33 = col1#32)
> !:  +- Relation[col1#32,col2#33] parquet  
> :  +- Filter ((isnotnull(col1#32) && isnotnull(col2#33)) && 
> ((col1#32 = 1) && (1 = col2#33)))
> !+- Filter ((1 = col#34) && isnotnull(col#34))
> : +- Relation[col1#32,col2#33] parquet
> !   +- Relation[col#34] parquet   
> +- Filter ((1 = col#34) && isnotnull(col#34))
> ! 
>+- Relation[col#34] parquet
> 
> === Applying Rule org.apache.spark.sql.catalyst.optimizer.CombineFilters ===
>  Join Inner, ((col1#32 = col#34) && (col2#33 = col#34))   
>Join Inner, ((col1#32 = col#34) && (col2#33 = col#34))
> !:- Filter (col2#33 = col1#32)
>:- Filter (((isnotnull(col1#32) && isnotnull(col2#33)) && 
> ((col1#32 = 1) && (1 = col2#33))) && (col2#33 = col1#32))
> !:  +- Filter ((isnotnull(col1#32) && isnotnull(col2#33)) && ((col1#32 = 1) 
> && (1 = col2#33)))   :  +- Relation[col1#32,col2#33] parquet
> !: +- Relation[col1#32,col2#33] parquet   
>+- Filter ((1 = col#34) && isnotnull(col#34))
> !+- Filter ((1 = col#34) && isnotnull(col#34))
>   +- Relation[col#34] parquet
> !   +- Relation[col#34] parquet   
>
> 
> === Applying Rule org.apache.spark.sql.catalyst.optimizer.ConstantPropagation 
> ===
>  Join Inner, ((col1#32 = c

[jira] [Comment Edited] (SPARK-21652) Optimizer cannot reach a fixed point on certain queries

2017-08-07 Thread Anton Okolnychyi (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21652?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16116936#comment-16116936
 ] 

Anton Okolnychyi edited comment on SPARK-21652 at 8/7/17 8:06 PM:
--

Yes, disabling the constraint propagation helps because 
`InferFiltersFromConstraints` will not apply. I found several known issues 
regarding the performance of `InferFiltersFromConstraints` but what about the 
logic of `ConstantPropagation` in the above example? Should it replace such 
predicates as `(a = b)` with `(1 = 1)` even if it is semantically correct?


was (Author: aokolnychyi):
Yes, disabling the constraint propagation helps because 
`InferFiltersFromConstraints` will not apply. I found several issues regarding 
the performance of InferFiltersFromConstraints but what about the logic of 
`ConstantPropagation` in the above example? Should it replace such predicates 
as `(a = b)` with `(1 = 1)` even if it is semantically correct?

> Optimizer cannot reach a fixed point on certain queries
> ---
>
> Key: SPARK-21652
> URL: https://issues.apache.org/jira/browse/SPARK-21652
> Project: Spark
>  Issue Type: Bug
>  Components: Optimizer, SQL
>Affects Versions: 2.2.0
>Reporter: Anton Okolnychyi
>
> The optimizer cannot reach a fixed point on the following query:
> {code}
> Seq((1, 2)).toDF("col1", "col2").write.saveAsTable("t1")
> Seq(1, 2).toDF("col").write.saveAsTable("t2")
> spark.sql("SELECT * FROM t1, t2 WHERE t1.col1 = 1 AND 1 = t1.col2 AND t1.col1 
> = t2.col AND t1.col2 = t2.col").explain(true)
> {code}
> At some point during the optimization, InferFiltersFromConstraints infers a 
> new constraint '(col2#33 = col1#32)' that is appended to the join condition, 
> then PushPredicateThroughJoin pushes it down, ConstantPropagation replaces 
> '(col2#33 = col1#32)' with '1 = 1' based on other propagated constraints, 
> ConstantFolding replaces '1 = 1' with 'true' and BooleanSimplification finally 
> removes this predicate. However, InferFiltersFromConstraints will again infer 
> '(col2#33 = col1#32)' on the next iteration and the process will continue 
> until the limit of iterations is reached. 
> See below for more details
> {noformat}
> === Applying Rule 
> org.apache.spark.sql.catalyst.optimizer.InferFiltersFromConstraints ===
> !Join Inner, ((col1#32 = col#34) && (col2#33 = col#34))   
> Join Inner, ((col2#33 = col1#32) && ((col1#32 = col#34) && 
> (col2#33 = col#34)))
>  :- Filter ((isnotnull(col1#32) && isnotnull(col2#33)) && ((col1#32 = 1) && 
> (1 = col2#33)))   :- Filter ((isnotnull(col1#32) && isnotnull(col2#33)) && 
> ((col1#32 = 1) && (1 = col2#33)))
>  :  +- Relation[col1#32,col2#33] parquet  
> :  +- Relation[col1#32,col2#33] parquet
>  +- Filter ((1 = col#34) && isnotnull(col#34))
> +- Filter ((1 = col#34) && isnotnull(col#34))
> +- Relation[col#34] parquet   
>+- Relation[col#34] parquet
> 
> === Applying Rule 
> org.apache.spark.sql.catalyst.optimizer.PushPredicateThroughJoin ===
> !Join Inner, ((col2#33 = col1#32) && ((col1#32 = col#34) && (col2#33 = 
> col#34)))  Join Inner, ((col1#32 = col#34) && (col2#33 = col#34))
> !:- Filter ((isnotnull(col1#32) && isnotnull(col2#33)) && ((col1#32 = 1) && 
> (1 = col2#33)))   :- Filter (col2#33 = col1#32)
> !:  +- Relation[col1#32,col2#33] parquet  
> :  +- Filter ((isnotnull(col1#32) && isnotnull(col2#33)) && 
> ((col1#32 = 1) && (1 = col2#33)))
> !+- Filter ((1 = col#34) && isnotnull(col#34))
> : +- Relation[col1#32,col2#33] parquet
> !   +- Relation[col#34] parquet   
> +- Filter ((1 = col#34) && isnotnull(col#34))
> ! 
>+- Relation[col#34] parquet
> 
> === Applying Rule org.apache.spark.sql.catalyst.optimizer.CombineFilters ===
>  Join Inner, ((col1#32 = col#34) && (col2#33 = col#34))   
>Join Inner, ((col1#32 = col#34) && (col2#33 = col#34))
> !:- Filter (col2#33 = col1#32)
>:- Filter (((isnotnull(col1#32) && isnotnull(col2#33)) && 
> ((col1#32 = 1) && (1 = col2#33))) && (col2#33 = col1#32))
> !:  +- Filter ((isnotnull(col1#32) && isnotnull(col2#33)) && ((col1#32 = 1) 
> && (1 = col2#33)))   :  +- Relation[col1#32,col2#33] parquet
> !: +- Relation[col1#32,col2#33] parquet 

[jira] [Commented] (SPARK-21691) Accessing canonicalized plan for query with limit throws exception

2017-08-12 Thread Anton Okolnychyi (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21691?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16124532#comment-16124532
 ] 

Anton Okolnychyi commented on SPARK-21691:
--

The issue is related to `Project \[\*\]` but not to `Limit`. You will always 
have this exception if you have a non-top level `Project \[\*\]` in your 
logical plan. For instance, the following query will also produce the same 
exception:

{noformat}
spark.sql("select * from (select * from (values 0, 1)) as 
v").queryExecution.logical.canonicalized
{noformat}

In the failing example from the ticket description, the non-canonicalized 
logical plan looks like:

{noformat}
'GlobalLimit 1
+- 'LocalLimit 1
   +- 'Project [*]
  +- 'SubqueryAlias __auto_generated_subquery_name
 +- 'UnresolvedInlineTable [col1], [List(0), List(1)]
{noformat}

Once Spark tries to canonicalize it and processes `LocalLimit 1`, it will get 
all attributes by calling `children.flatMap(_.output)`, which triggers the 
problem. `Project#output` will try to convert its project list to attributes, 
which will fail for `UnresolvedStar` with the aforementioned exception.

I see that `UnresolvedRelation` and `UnresolvedInlineTable` return Nil as 
output. Therefore, one option to fix this problem is to return `Nil` as output 
from `Project` if it is unresolved.

{noformat}
override def output: Seq[Attribute] = if (resolved) 
projectList.map(_.toAttribute) else Nil
{noformat}

I can fix it once we agree on a solution.
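A toy illustration of the proposed change (simplified stand-ins, not the Catalyst classes): while a projection is unresolved, asking it for its output returns Nil instead of forcing toAttribute on an unresolved star.

{code}
sealed trait Node { def output: Seq[String] }

case object UnresolvedStar extends Node {
  def output: Seq[String] =
    sys.error("Invalid call to toAttribute on unresolved object, tree: *")
}

case class Project(projectList: Seq[Node], resolved: Boolean) extends Node {
  // Proposed behaviour: do not touch the project list until the plan is resolved.
  def output: Seq[String] = if (resolved) projectList.flatMap(_.output) else Nil
}

// Project(Seq(UnresolvedStar), resolved = false).output  // Nil, no exception
{code}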


> Accessing canonicalized plan for query with limit throws exception
> --
>
> Key: SPARK-21691
> URL: https://issues.apache.org/jira/browse/SPARK-21691
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: Bjoern Toldbod
>
> Accessing the logical, canonicalized plan fails for queries with limits.
> The following demonstrates the issue:
> {code:java}
> val session = SparkSession.builder.master("local").getOrCreate()
> // This works
> session.sql("select * from (values 0, 
> 1)").queryExecution.logical.canonicalized
> // This fails
> session.sql("select * from (values 0, 1) limit 
> 1").queryExecution.logical.canonicalized
> {code}
> The message in the thrown exception is somewhat confusing (or at least not 
> directly related to the limit):
> "Invalid call to toAttribute on unresolved object, tree: *"






[jira] [Commented] (SPARK-21691) Accessing canonicalized plan for query with limit throws exception

2017-08-24 Thread Anton Okolnychyi (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21691?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16141248#comment-16141248
 ] 

Anton Okolnychyi commented on SPARK-21691:
--

[~Robin Shao] My suggestion was to modify the output method in 'Project'. 

[~smilegator] Could you please provide your opinion on the proposed solution 
in the first comment?

> Accessing canonicalized plan for query with limit throws exception
> --
>
> Key: SPARK-21691
> URL: https://issues.apache.org/jira/browse/SPARK-21691
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: Bjoern Toldbod
>
> Accessing the logical, canonicalized plan fails for queries with limits.
> The following demonstrates the issue:
> {code:java}
> val session = SparkSession.builder.master("local").getOrCreate()
> // This works
> session.sql("select * from (values 0, 
> 1)").queryExecution.logical.canonicalized
> // This fails
> session.sql("select * from (values 0, 1) limit 
> 1").queryExecution.logical.canonicalized
> {code}
> The message in the thrown exception is somewhat confusing (or at least not 
> directly related to the limit):
> "Invalid call to toAttribute on unresolved object, tree: *"






[jira] [Commented] (SPARK-21850) SparkSQL cannot perform LIKE someColumn if someColumn's value contains a backslash \

2017-08-28 Thread Anton Okolnychyi (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16144833#comment-16144833
 ] 

Anton Okolnychyi commented on SPARK-21850:
--

I guess this is related to [PR#15398| 
https://github.com/apache/spark/pull/15398], which follows [this 
standard|https://www.ibm.com/support/knowledgecenter/en/SSGU8G_12.1.0/com.ibm.sqls.doc/ids_sqs_1388.html].
 As a result, the only supported wildcard characters are '%' and '_', which can 
be escaped with '\'. The logic is implemented in 'StringUtils#escapeLikeRegex'. 
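A toy version of that validation (a sketch, not the actual StringUtils#escapeLikeRegex): after '\', only '_', '%' or another '\' is accepted, which is exactly why a column value containing a literal backslash followed by 'n' is rejected as a pattern.

{code}
def validateLikePattern(pattern: String): Unit = {
  val it = pattern.iterator
  while (it.hasNext) {
    it.next() match {
      case '\\' =>
        if (!it.hasNext) sys.error(s"the pattern '$pattern' ends with the escape character")
        val c = it.next()
        if (c != '_' && c != '%' && c != '\\')
          sys.error(s"the pattern '$pattern' is invalid, the escape character is not allowed to precede '$c'")
      case _ => // ordinary characters (including '%' and '_') need no check here
    }
  }
}

// validateLikePattern("there is a \\n in this line%")  // throws, like the reported error
{code}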

> SparkSQL cannot perform LIKE someColumn if someColumn's value contains a 
> backslash \
> 
>
> Key: SPARK-21850
> URL: https://issues.apache.org/jira/browse/SPARK-21850
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: Adrien Lavoillotte
>
> I have a test table looking like this:
> {code:none}
> spark.sql("select * from `test`.`types_basic`").show()
> {code}
> ||id||c_tinyint|| [...] || c_string||
> |  0| -128| [...] |  string|
> |  1|0| [...] |string 'with' "qu...|
> |  2|  127| [...] |  unicod€ strĭng|
> |  3|   42| [...] |there is a \n in ...|
> |  4| null| [...] |null|
> Note the line with ID 3, which has a literal \n in c_string. I would like to 
> join another table using a LIKE condition (to join on prefix). If I do this:
> {code:none}
> spark.sql("select * from `test`.`types_basic` a where a.`c_string` LIKE 
> CONCAT(a.`c_string`, '%')").show()
> {code}
> I get the following error in spark 2.2 (but not in any earlier version):
> {noformat}
> 17/08/28 12:47:38 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 9.0 
> (TID 12, cdh5.local, executor 2): org.apache.spark.sql.AnalysisException: the 
> pattern 'there is a \n in this line%' is invalid, the escape character is not 
> allowed to precede 'n';
>   at 
> org.apache.spark.sql.catalyst.util.StringUtils$.fail$1(StringUtils.scala:42)
>   at 
> org.apache.spark.sql.catalyst.util.StringUtils$.escapeLikeRegex(StringUtils.scala:51)
>   at 
> org.apache.spark.sql.catalyst.util.StringUtils.escapeLikeRegex(StringUtils.scala)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
>  Source)
>   at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>   at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
>   at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:234)
>   at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:228)
>   at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
>   at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>   at org.apache.spark.scheduler.Task.run(Task.scala:108)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:748)
> {noformat}
> It seems to me that if LIKE requires special escaping there, then it should 
> be provided by SparkSQL on the value of the column.






[jira] [Commented] (SPARK-21850) SparkSQL cannot perform LIKE someColumn if someColumn's value contains a backslash \

2017-08-31 Thread Anton Okolnychyi (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16148705#comment-16148705
 ] 

Anton Okolnychyi commented on SPARK-21850:
--

Then we should not be bound to the LIKE case only. I am wondering what is 
expected in the code below?

{code}
Seq((1, "\\n")).toDF("col1", "col2").write.saveAsTable("t1")
spark.sql("SELECT * FROM t1").show()
+++
|col1|col2|
+++
|   1|  \n|
+++
spark.sql("SELECT * FROM t1 WHERE col2 = '\\n'").show() // does it give a wrong 
result?
+++
|col1|col2|
+++
+++
spark.sql("SELECT * FROM t1 WHERE col2 = '\n'").show()
+++
|col1|col2|
+++
+++
{code}

Just to mention, [PR#15398|https://github.com/apache/spark/pull/15398] was also 
merged in 2.1 (might not be publicly available yet). I got the same exception 
in the 2.1 branch using the following code:

{code}
Seq((1, "\\n")).toDF("col1", "col2").write.saveAsTable("t1")
spark.sql("SELECT * FROM t1").show()
spark.sql("SELECT * FROM t1 WHERE col2 LIKE CONCAT(col2, '%')").show()
{code}


> SparkSQL cannot perform LIKE someColumn if someColumn's value contains a 
> backslash \
> 
>
> Key: SPARK-21850
> URL: https://issues.apache.org/jira/browse/SPARK-21850
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: Adrien Lavoillotte
>
> I have a test table looking like this:
> {code:none}
> spark.sql("select * from `test`.`types_basic`").show()
> {code}
> ||id||c_tinyint|| [...] || c_string||
> |  0| -128| [...] |  string|
> |  1|0| [...] |string 'with' "qu...|
> |  2|  127| [...] |  unicod€ strĭng|
> |  3|   42| [...] |there is a \n in ...|
> |  4| null| [...] |null|
> Note the line with ID 3, which has a literal \n in c_string (e.g. "some \\n 
> string", not a line break). I would like to join another table using a LIKE 
> condition (to join on prefix). If I do this:
> {code:none}
> spark.sql("select * from `test`.`types_basic` a where a.`c_string` LIKE 
> CONCAT(a.`c_string`, '%')").show()
> {code}
> I get the following error in spark 2.2 (but not in any earlier version):
> {noformat}
> 17/08/28 12:47:38 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 9.0 
> (TID 12, cdh5.local, executor 2): org.apache.spark.sql.AnalysisException: the 
> pattern 'there is a \n in this line%' is invalid, the escape character is not 
> allowed to precede 'n';
>   at 
> org.apache.spark.sql.catalyst.util.StringUtils$.fail$1(StringUtils.scala:42)
>   at 
> org.apache.spark.sql.catalyst.util.StringUtils$.escapeLikeRegex(StringUtils.scala:51)
>   at 
> org.apache.spark.sql.catalyst.util.StringUtils.escapeLikeRegex(StringUtils.scala)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
>  Source)
>   at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>   at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
>   at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:234)
>   at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:228)
>   at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
>   at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>   at org.apache.spark.scheduler.Task.run(Task.scala:108)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:748)
> {noformat}
> It seems to me that if LIKE requires special escaping there, then it should 
> be provided by SparkSQL on the value of the column.






[jira] [Commented] (SPARK-21850) SparkSQL cannot perform LIKE someColumn if someColumn's value contains a backslash \

2017-08-31 Thread Anton Okolnychyi (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16149401#comment-16149401
 ] 

Anton Okolnychyi commented on SPARK-21850:
--

[~instanceof me] yeah, you are absolutely correct, I need to double escape it. 
This means that a Scala string "\\\\n" is "\\n" on the SQL layer and is "\n" as 
an actual column value, right? 

The LIKE pattern validation happens on actual values, which means that the last 
statement in the block below also fails with the same exception:
{code}
Seq((1, "\\nbc")).toDF("col1", "col2").write.saveAsTable("t1")
spark.sql("SELECT * FROM t1").show()
+++
|col1|col2|
+++
|   1|\nbc|
+++
spark.sql("SELECT * FROM t1 WHERE col2 = 'nbc'").show()
+++
|col1|col2|
+++
|   1|\nbc|
+++
spark.sql("SELECT * FROM t1 WHERE col2 LIKE 'nb_'").show() // fails
{code}

Since "\n" as a column value corresponds to "n" in a scala string and the 
exception occurs even with literals, the overall behavior looks logical. Do I 
miss anything? I am curious to understand so any explanation is welcome.

> SparkSQL cannot perform LIKE someColumn if someColumn's value contains a 
> backslash \
> 
>
> Key: SPARK-21850
> URL: https://issues.apache.org/jira/browse/SPARK-21850
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.0
>Reporter: Adrien Lavoillotte
>
> I have a test table looking like this:
> {code:none}
> spark.sql("select * from `test`.`types_basic`").show()
> {code}
> ||id||c_tinyint|| [...] || c_string||
> |  0| -128| [...] |  string|
> |  1|0| [...] |string 'with' "qu...|
> |  2|  127| [...] |  unicod€ strĭng|
> |  3|   42| [...] |there is a \n in ...|
> |  4| null| [...] |null|
> Note the line with ID 3, which has a literal \n in c_string (e.g. "some \\n 
> string", not a line break). I would like to join another table using a LIKE 
> condition (to join on prefix). If I do this:
> {code:none}
> spark.sql("select * from `test`.`types_basic` a where a.`c_string` LIKE 
> CONCAT(a.`c_string`, '%')").show()
> {code}
> I get the following error in spark 2.2 (but not in any earlier version):
> {noformat}
> 17/08/28 12:47:38 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 9.0 
> (TID 12, cdh5.local, executor 2): org.apache.spark.sql.AnalysisException: the 
> pattern 'there is a \n in this line%' is invalid, the escape character is not 
> allowed to precede 'n';
>   at 
> org.apache.spark.sql.catalyst.util.StringUtils$.fail$1(StringUtils.scala:42)
>   at 
> org.apache.spark.sql.catalyst.util.StringUtils$.escapeLikeRegex(StringUtils.scala:51)
>   at 
> org.apache.spark.sql.catalyst.util.StringUtils.escapeLikeRegex(StringUtils.scala)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
>  Source)
>   at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>   at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
>   at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:234)
>   at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:228)
>   at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
>   at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>   at org.apache.spark.scheduler.Task.run(Task.scala:108)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:748)
> {noformat}
> It seems to me that if LIKE requires special escaping there, then it should 
> be provided by SparkSQL on the value of the column.






[jira] [Commented] (SPARK-21652) Optimizer cannot reach a fixed point on certain queries

2017-08-31 Thread Anton Okolnychyi (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21652?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16149462#comment-16149462
 ] 

Anton Okolnychyi commented on SPARK-21652:
--

Is there anything I can help here? I see that some cost-based estimation is 
needed. If there is an example/guide of what should be done, I can try to fix 
the issue.

> Optimizer cannot reach a fixed point on certain queries
> ---
>
> Key: SPARK-21652
> URL: https://issues.apache.org/jira/browse/SPARK-21652
> Project: Spark
>  Issue Type: Bug
>  Components: Optimizer, SQL
>Affects Versions: 2.2.0
>Reporter: Anton Okolnychyi
>
> The optimizer cannot reach a fixed point on the following query:
> {code}
> Seq((1, 2)).toDF("col1", "col2").write.saveAsTable("t1")
> Seq(1, 2).toDF("col").write.saveAsTable("t2")
> spark.sql("SELECT * FROM t1, t2 WHERE t1.col1 = 1 AND 1 = t1.col2 AND t1.col1 
> = t2.col AND t1.col2 = t2.col").explain(true)
> {code}
> At some point during the optimization, InferFiltersFromConstraints infers a 
> new constraint '(col2#33 = col1#32)' that is appended to the join condition, 
> then PushPredicateThroughJoin pushes it down, ConstantPropagation replaces 
> '(col2#33 = col1#32)' with '1 = 1' based on other propagated constraints, 
> ConstantFolding replaces '1 = 1' with 'true' and BooleanSimplification finally 
> removes this predicate. However, InferFiltersFromConstraints will again infer 
> '(col2#33 = col1#32)' on the next iteration and the process will continue 
> until the limit of iterations is reached. 
> See below for more details
> {noformat}
> === Applying Rule 
> org.apache.spark.sql.catalyst.optimizer.InferFiltersFromConstraints ===
> !Join Inner, ((col1#32 = col#34) && (col2#33 = col#34))   
> Join Inner, ((col2#33 = col1#32) && ((col1#32 = col#34) && 
> (col2#33 = col#34)))
>  :- Filter ((isnotnull(col1#32) && isnotnull(col2#33)) && ((col1#32 = 1) && 
> (1 = col2#33)))   :- Filter ((isnotnull(col1#32) && isnotnull(col2#33)) && 
> ((col1#32 = 1) && (1 = col2#33)))
>  :  +- Relation[col1#32,col2#33] parquet  
> :  +- Relation[col1#32,col2#33] parquet
>  +- Filter ((1 = col#34) && isnotnull(col#34))
> +- Filter ((1 = col#34) && isnotnull(col#34))
> +- Relation[col#34] parquet   
>+- Relation[col#34] parquet
> 
> === Applying Rule 
> org.apache.spark.sql.catalyst.optimizer.PushPredicateThroughJoin ===
> !Join Inner, ((col2#33 = col1#32) && ((col1#32 = col#34) && (col2#33 = 
> col#34)))  Join Inner, ((col1#32 = col#34) && (col2#33 = col#34))
> !:- Filter ((isnotnull(col1#32) && isnotnull(col2#33)) && ((col1#32 = 1) && 
> (1 = col2#33)))   :- Filter (col2#33 = col1#32)
> !:  +- Relation[col1#32,col2#33] parquet  
> :  +- Filter ((isnotnull(col1#32) && isnotnull(col2#33)) && 
> ((col1#32 = 1) && (1 = col2#33)))
> !+- Filter ((1 = col#34) && isnotnull(col#34))
> : +- Relation[col1#32,col2#33] parquet
> !   +- Relation[col#34] parquet   
> +- Filter ((1 = col#34) && isnotnull(col#34))
> ! 
>+- Relation[col#34] parquet
> 
> === Applying Rule org.apache.spark.sql.catalyst.optimizer.CombineFilters ===
>  Join Inner, ((col1#32 = col#34) && (col2#33 = col#34))   
>Join Inner, ((col1#32 = col#34) && (col2#33 = col#34))
> !:- Filter (col2#33 = col1#32)
>:- Filter (((isnotnull(col1#32) && isnotnull(col2#33)) && 
> ((col1#32 = 1) && (1 = col2#33))) && (col2#33 = col1#32))
> !:  +- Filter ((isnotnull(col1#32) && isnotnull(col2#33)) && ((col1#32 = 1) 
> && (1 = col2#33)))   :  +- Relation[col1#32,col2#33] parquet
> !: +- Relation[col1#32,col2#33] parquet   
>+- Filter ((1 = col#34) && isnotnull(col#34))
> !+- Filter ((1 = col#34) && isnotnull(col#34))
>   +- Relation[col#34] parquet
> !   +- Relation[col#34] parquet   
>
> 
> === Applying Rule org.apache.spark.sql.catalyst.optimizer.ConstantPropagation 
> ===
>  Join Inner, ((col1#32 = col#34) && (col2#33 = col#34))   
>  Join Inner, ((col1#32 = col#34) && 
> (col2#33 = col#34))
> !:- Filter (((isnotnull(col1#32) 

[jira] [Issue Comment Deleted] (SPARK-21652) Optimizer cannot reach a fixed point on certain queries

2017-08-31 Thread Anton Okolnychyi (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-21652?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Anton Okolnychyi updated SPARK-21652:
-
Comment: was deleted

(was: Is there anything I can help here? I see that some cost-based estimation 
is needed. If there is an example/guide of what should be done, I can try to 
fix the issue.)

> Optimizer cannot reach a fixed point on certain queries
> ---
>
> Key: SPARK-21652
> URL: https://issues.apache.org/jira/browse/SPARK-21652
> Project: Spark
>  Issue Type: Bug
>  Components: Optimizer, SQL
>Affects Versions: 2.2.0
>Reporter: Anton Okolnychyi
>
> The optimizer cannot reach a fixed point on the following query:
> {code}
> Seq((1, 2)).toDF("col1", "col2").write.saveAsTable("t1")
> Seq(1, 2).toDF("col").write.saveAsTable("t2")
> spark.sql("SELECT * FROM t1, t2 WHERE t1.col1 = 1 AND 1 = t1.col2 AND t1.col1 
> = t2.col AND t1.col2 = t2.col").explain(true)
> {code}
> At some point during the optimization, InferFiltersFromConstraints infers a 
> new constraint '(col2#33 = col1#32)' that is appended to the join condition, 
> then PushPredicateThroughJoin pushes it down, ConstantPropagation replaces 
> '(col2#33 = col1#32)' with '1 = 1' based on other propagated constraints, 
> ConstantFolding replaces '1 = 1' with 'true' and BooleanSimplification finally 
> removes this predicate. However, InferFiltersFromConstraints will again infer 
> '(col2#33 = col1#32)' on the next iteration and the process will continue 
> until the limit of iterations is reached. 
> See below for more details
> {noformat}
> === Applying Rule 
> org.apache.spark.sql.catalyst.optimizer.InferFiltersFromConstraints ===
> !Join Inner, ((col1#32 = col#34) && (col2#33 = col#34))   
> Join Inner, ((col2#33 = col1#32) && ((col1#32 = col#34) && 
> (col2#33 = col#34)))
>  :- Filter ((isnotnull(col1#32) && isnotnull(col2#33)) && ((col1#32 = 1) && 
> (1 = col2#33)))   :- Filter ((isnotnull(col1#32) && isnotnull(col2#33)) && 
> ((col1#32 = 1) && (1 = col2#33)))
>  :  +- Relation[col1#32,col2#33] parquet  
> :  +- Relation[col1#32,col2#33] parquet
>  +- Filter ((1 = col#34) && isnotnull(col#34))
> +- Filter ((1 = col#34) && isnotnull(col#34))
> +- Relation[col#34] parquet   
>+- Relation[col#34] parquet
> 
> === Applying Rule 
> org.apache.spark.sql.catalyst.optimizer.PushPredicateThroughJoin ===
> !Join Inner, ((col2#33 = col1#32) && ((col1#32 = col#34) && (col2#33 = 
> col#34)))  Join Inner, ((col1#32 = col#34) && (col2#33 = col#34))
> !:- Filter ((isnotnull(col1#32) && isnotnull(col2#33)) && ((col1#32 = 1) && 
> (1 = col2#33)))   :- Filter (col2#33 = col1#32)
> !:  +- Relation[col1#32,col2#33] parquet  
> :  +- Filter ((isnotnull(col1#32) && isnotnull(col2#33)) && 
> ((col1#32 = 1) && (1 = col2#33)))
> !+- Filter ((1 = col#34) && isnotnull(col#34))
> : +- Relation[col1#32,col2#33] parquet
> !   +- Relation[col#34] parquet   
> +- Filter ((1 = col#34) && isnotnull(col#34))
> ! 
>+- Relation[col#34] parquet
> 
> === Applying Rule org.apache.spark.sql.catalyst.optimizer.CombineFilters ===
>  Join Inner, ((col1#32 = col#34) && (col2#33 = col#34))   
>Join Inner, ((col1#32 = col#34) && (col2#33 = col#34))
> !:- Filter (col2#33 = col1#32)
>:- Filter (((isnotnull(col1#32) && isnotnull(col2#33)) && 
> ((col1#32 = 1) && (1 = col2#33))) && (col2#33 = col1#32))
> !:  +- Filter ((isnotnull(col1#32) && isnotnull(col2#33)) && ((col1#32 = 1) 
> && (1 = col2#33)))   :  +- Relation[col1#32,col2#33] parquet
> !: +- Relation[col1#32,col2#33] parquet   
>+- Filter ((1 = col#34) && isnotnull(col#34))
> !+- Filter ((1 = col#34) && isnotnull(col#34))
>   +- Relation[col#34] parquet
> !   +- Relation[col#34] parquet   
>
> 
> === Applying Rule org.apache.spark.sql.catalyst.optimizer.ConstantPropagation 
> ===
>  Join Inner, ((col1#32 = col#34) && (col2#33 = col#34))   
>  Join Inner, ((col1#32 = col#34) && 
> (col2#33 = col#34))
> !:- Filter (((isnotnull(col1#32) && isnotnull(col2#33)) &&

[jira] [Commented] (SPARK-21896) Stack Overflow when window function nested inside aggregate function

2017-09-02 Thread Anton Okolnychyi (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21896?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16151555#comment-16151555
 ] 

Anton Okolnychyi commented on SPARK-21896:
--

The root cause of this issue is the inability of {{ExtractWindowExpressions}} 
to handle aggregations defined directly on top of window expressions (e.g., 
{{df.groupBy().agg(max(rank().over(window)))}}). The first place that has to be 
updated is {{ExtractWindowExpressions#extract}} and the second one is 
{{ExtractWindowExpressions#apply}} (only the aggregation case). The former does 
not extract present window expressions from aggregate functions (lines 
1771-1774 in Analyzer) while the latter does not handle cases when aggregations 
should be computed on top of window computations (lines 1877-1888 in Analyzer).

I did not dive too much into the code, but it seems like 
{{ExtractWindowExpressions}} does not assume such definitions as 
{{df.groupBy().agg(max(rank().over(window)))}}. The question is if 
{{ExtractWindowExpressions}} should be extended/fixed to handle this case as 
well or it is by design and some analysis exception should be thrown.

As it is mentioned in the ticket description, 
{{df.select(rank().over(window).alias("rank")).agg(max("rank"))}} works fine.
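
For illustration, here is a minimal Scala sketch of the two shapes discussed above (a sketch only; the app name and data are arbitrary, and the last statement reproduces the reported StackOverflowError at analysis time):
{code:scala}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{max, rank}

val spark = SparkSession.builder().appName("SPARK-21896 repro").master("local[*]").getOrCreate()
import spark.implicits._

val df = Seq((1, 2), (1, 3), (2, 4)).toDF("a", "b")
val window = Window.orderBy("b")

// Works: the window expression is materialized by select() first,
// so the aggregate only sees an ordinary column.
df.select(rank().over(window).alias("rank"))
  .agg(max("rank"))
  .show()

// Fails: the aggregate is defined directly on top of the window expression,
// which ExtractWindowExpressions does not handle.
df.groupBy().agg(max(rank().over(window))).show()
{code}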

> Stack Overflow when window function nested inside aggregate function
> 
>
> Key: SPARK-21896
> URL: https://issues.apache.org/jira/browse/SPARK-21896
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark, SQL
>Affects Versions: 2.1.0, 2.2.0
>Reporter: Luyao Yang
>Priority: Minor
>
> A minimal example: with the following simple test data
> {noformat}
> >>> df = spark.createDataFrame([(1, 2), (1, 3), (2, 4)], ['a', 'b'])
> >>> df.show()
> +---+---+
> |  a|  b|
> +---+---+
> |  1|  2|
> |  1|  3|
> |  2|  4|
> +---+---+
> {noformat}
> This works: 
> {noformat}
> >>> w = Window().orderBy('b')
> >>> result = (df.select(F.rank().over(w).alias('rk'))
> ....groupby()
> ....agg(F.max('rk'))
> ...  )
> >>> result.show()
> +---+
> |max(rk)|
> +---+
> |  3|
> +---+
> {noformat}
> But this equivalent gives an error. Note that the error is thrown right when 
> the operation is defined, not when an action is called later:
> {noformat}
> >>> result = (df.groupby()
> ....agg(F.max(F.rank().over(w)))
> ...  )
> Traceback (most recent call last):
>   File 
> "/usr/local/lib/python2.7/site-packages/IPython/core/interactiveshell.py", 
> line 2885, in run_code
> exec(code_obj, self.user_global_ns, self.user_ns)
>   File "", line 2, in 
> .agg(F.max(F.rank().over(w)))
>   File "/usr/lib/spark/python/pyspark/sql/group.py", line 91, in agg
> _to_seq(self.sql_ctx._sc, [c._jc for c in exprs[1:]]))
>   File "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", 
> line 1133, in __call__
> answer, self.gateway_client, self.target_id, self.name)
>   File "/usr/lib/spark/python/pyspark/sql/utils.py", line 63, in deco
> return f(*a, **kw)
>   File "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 
> 319, in get_return_value
> format(target_id, ".", name), value)
> Py4JJavaError: An error occurred while calling o10789.agg.
> : java.lang.StackOverflowError
>   at 
> org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:55)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.makeCopy(TreeNode.scala:400)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:381)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:293)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:277)
>   at 
> org.apache.spark.sql.catalyst.analysis.Analyzer$ExtractWindowExpressions$$anonfun$71.apply(Analyzer.scala:1688)
>   at 
> org.apache.spark.sql.catalyst.analysis.Analyzer$ExtractWindowExpressions$$anonfun$71.apply(Analyzer.scala:1724)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>   at 
> org.apache.spark.sql.catalyst.analysis.Analyzer$ExtractWindowExpressions$.org$apache$spark$sql$catalyst$analysis$Analyzer$ExtractWindowExpressions$$extract(Analyzer.scala:1687)
>   at 
> org.apache.spark.sql.catalyst.analysis.Analyzer$ExtractWindowExpressio

[jira] [Comment Edited] (SPARK-21896) Stack Overflow when window function nested inside aggregate function

2017-09-02 Thread Anton Okolnychyi (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-21896?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16151555#comment-16151555
 ] 

Anton Okolnychyi edited comment on SPARK-21896 at 9/2/17 7:21 PM:
--

The root cause of this issue is the inability of {{ExtractWindowExpressions}} 
to handle aggregations defined directly on top of window expressions (e.g., 
{{df.groupBy().agg(max(rank().over(window)))}}). 

The first place that has to be updated is {{ExtractWindowExpressions#extract}} 
and the second one is {{ExtractWindowExpressions#apply}} (only the aggregation 
case). The former does not extract window expressions that appear inside 
aggregate functions (lines 1771-1774 in Analyzer), while the latter does not 
handle cases in which aggregations should be computed on top of window 
computations (lines 1877-1888 in Analyzer).

I did not dive too deep into the code, but it seems that 
{{ExtractWindowExpressions}} does not anticipate definitions such as 
{{df.groupBy().agg(max(rank().over(window)))}}. The question is whether 
{{ExtractWindowExpressions}} should be extended to handle this case as well, 
or whether this is by design and an analysis exception should be thrown instead.

As mentioned in the ticket description, 
{{df.select(rank().over(window).alias("rank")).agg(max("rank"))}} works fine.


was (Author: aokolnychyi):
The root cause of this issue is the inability of {{ExtractWindowExpressions}} 
to handle aggregations defined directly on top of window expressions (e.g., 
{{df.groupBy().agg(max(rank().over(window)))}}). The first place that has to be 
updated is {{ExtractWindowExpressions#extract}} and the second one is 
{{ExtractWindowExpressions#apply}} (only the aggregation case). The former does 
not extract present window expressions from aggregate functions (lines 
1771-1774 in Analyzer) while the latter does not handle cases when aggregations 
should be computed on top of window computations (lines 1877-1888 in Analyzer).

I did not dive too much into the code, but it seems like 
{{ExtractWindowExpressions}} does not assume such definitions as 
{{df.groupBy().agg(max(rank().over(window)))}}. The question is if 
{{ExtractWindowExpressions}} should be extended/fixed to handle this case as 
well or it is by design and some analysis exception should be thrown.

As it is mentioned in the ticket description, 
{{df.select(rank().over(window).alias("rank")).agg(max("rank"))}} works fine.

> Stack Overflow when window function nested inside aggregate function
> 
>
> Key: SPARK-21896
> URL: https://issues.apache.org/jira/browse/SPARK-21896
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark, SQL
>Affects Versions: 2.1.0, 2.2.0
>Reporter: Luyao Yang
>Priority: Minor
>
> A minimal example: with the following simple test data
> {noformat}
> >>> df = spark.createDataFrame([(1, 2), (1, 3), (2, 4)], ['a', 'b'])
> >>> df.show()
> +---+---+
> |  a|  b|
> +---+---+
> |  1|  2|
> |  1|  3|
> |  2|  4|
> +---+---+
> {noformat}
> This works: 
> {noformat}
> >>> w = Window().orderBy('b')
> >>> result = (df.select(F.rank().over(w).alias('rk'))
> ....groupby()
> ....agg(F.max('rk'))
> ...  )
> >>> result.show()
> +---+
> |max(rk)|
> +---+
> |  3|
> +---+
> {noformat}
> But this equivalent gives an error. Note that the error is thrown right when 
> the operation is defined, not when an action is called later:
> {noformat}
> >>> result = (df.groupby()
> ....agg(F.max(F.rank().over(w)))
> ...  )
> Traceback (most recent call last):
>   File 
> "/usr/local/lib/python2.7/site-packages/IPython/core/interactiveshell.py", 
> line 2885, in run_code
> exec(code_obj, self.user_global_ns, self.user_ns)
>   File "", line 2, in 
> .agg(F.max(F.rank().over(w)))
>   File "/usr/lib/spark/python/pyspark/sql/group.py", line 91, in agg
> _to_seq(self.sql_ctx._sc, [c._jc for c in exprs[1:]]))
>   File "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", 
> line 1133, in __call__
> answer, self.gateway_client, self.target_id, self.name)
>   File "/usr/lib/spark/python/pyspark/sql/utils.py", line 63, in deco
> return f(*a, **kw)
>   File "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 
> 319, in get_return_value
> format(target_id, ".", name), value)
> Py4JJavaError: An error occurred while calling o10789.agg.
> : java.lang.StackOverflowError
>   at 
> org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:55)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.makeCopy(TreeNode.scala:400)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:381)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.

[jira] [Commented] (SPARK-16303) Update SQL examples and programming guide for Scala and Java language bindings

2016-07-07 Thread Anton Okolnychyi (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-16303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15366905#comment-15366905
 ] 

Anton Okolnychyi commented on SPARK-16303:
--

[~lian cheng] 
I would like to share some updates and get initial feedback on some points.

The code can be found here:
https://github.com/apache/spark/compare/master...aokolnychyi:sql_guide_update_prototype

1. On the one hand, it would be nice to split the whole SQL guide into several 
source files. On the other hand, too many source files are not good either. 
Based on what I have seen in the aforementioned prototype, I suggest keeping 
everything up to the 'Data Sources' section in a single source file. However, 
it is still nice to have some modularity within that file, so I suggest using a 
separate method for each meaningful block. This avoids naming clashes and makes 
navigation within the file easier. This idea is already present in the code 
that I shared (see the sketch after this list).

2. Imports. First of all, should the same style be used for Java imports as for 
Scala ones? I also noticed that imports are present in some examples. If a 
single source file covers several code snippets, we will have a problem: as far 
as I have read, the plugin we use to extract code snippets from source files 
does not allow examples to overlap.

3. I noticed that the current Java version in the parent pom is 1.7. Is it 
possible to update the examples submodule to 1.8? I believe lambdas would 
simplify the Java code and make it more readable.

4. What is the preferred way to obtain RDDs in the examples? There are 
different alternatives, for instance via spark.sparkContext or via 
DataFrames/Datasets. I assume the first approach makes more sense in the 
'Interoperating with RDDs' section, rather than creating DataFrames/Datasets, 
extracting their RDDs, and then converting back.

5. Is it fine to re-use encoders?

6. If I use the getValuesMap\[T\]() method, the result is a Dataset of 
Map\[String, T\]. It seems that Maps are currently unsupported in Datasets.
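
To make point 1 more concrete, here is a rough sketch of the structure I have in mind (the object name, method names, labels, file paths, and the {{// $example on$}}/{{// $example off$}} markers are only illustrative and depend on the snippet-extraction plugin we end up using):
{code:scala}
import org.apache.spark.sql.SparkSession

// One example source file for the guide, one method per meaningful block.
object SparkSQLGuideExample {

  def main(args: Array[String]): Unit = {
    // $example on:init_session$
    val spark = SparkSession.builder()
      .appName("Spark SQL basic example")
      .getOrCreate()
    // $example off:init_session$

    runBasicDataFrameExample(spark)
    runDatasetCreationExample(spark)

    spark.stop()
  }

  private def runBasicDataFrameExample(spark: SparkSession): Unit = {
    // $example on:create_df$
    val df = spark.read.json("examples/src/main/resources/people.json")
    df.show()
    // $example off:create_df$
  }

  private def runDatasetCreationExample(spark: SparkSession): Unit = {
    import spark.implicits._
    // $example on:create_ds$
    val ds = Seq(1, 2, 3).toDS()
    ds.show()
    // $example off:create_ds$
  }
}
{code}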

> Update SQL examples and programming guide for Scala and Java language bindings
> --
>
> Key: SPARK-16303
> URL: https://issues.apache.org/jira/browse/SPARK-16303
> Project: Spark
>  Issue Type: Sub-task
>  Components: Documentation, Examples
>Affects Versions: 2.0.0
>Reporter: Cheng Lian
>Assignee: Anton Okolnychyi
>
> We need to update SQL examples code under the {{examples}} sub-project, and 
> then replace hard-coded snippets in the SQL programming guide with snippets 
> automatically extracted from actual source files.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-16046) Add Spark SQL Dataset Tutorial

2016-07-19 Thread Anton Okolnychyi (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-16046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15383865#comment-15383865
 ] 

Anton Okolnychyi commented on SPARK-16046:
--

Hi, [~pedrorodriguez]. 

Is there anything I can help with here?
I have recently added a few examples of accessing columns via $ to the SQL 
guide.
In my view, it would also be useful to add window function examples. I noticed 
that the way window frames are resolved is neither obvious nor documented, and 
there have been related questions on the users mailing list.
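
A minimal sketch of the kind of window function example I have in mind (the data and names are arbitrary; the frame is spelled out explicitly because that is the part readers tend to find non-obvious, with Long.MinValue and 0 denoting "unbounded preceding" and "current row"):
{code:scala}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.sum

val spark = SparkSession.builder().appName("window frame example").master("local[*]").getOrCreate()
import spark.implicits._

val sales = Seq(("A", 1, 10), ("A", 2, 20), ("B", 1, 5)).toDF("shop", "day", "amount")

// An explicit frame: everything from the start of the partition up to the current row.
val w = Window
  .partitionBy($"shop")
  .orderBy($"day")
  .rowsBetween(Long.MinValue, 0)

sales.withColumn("runningTotal", sum($"amount").over(w)).show()
{code}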

> Add Spark SQL Dataset Tutorial
> --
>
> Key: SPARK-16046
> URL: https://issues.apache.org/jira/browse/SPARK-16046
> Project: Spark
>  Issue Type: Documentation
>  Components: Documentation, SQL
>Affects Versions: 2.0.0
>Reporter: Pedro Rodriguez
>
> Issue to update the Spark SQL guide to provide more content around using 
> Datasets. This would expand the Creating Datasets section of the Spark SQL 
> documentation.
> Goals
> 1. Add more examples of column access via $ and `
> 2. Add examples of aggregates
> 3. Add examples of using Spark SQL functions
> What else would be useful to have?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-16303) Update SQL examples and programming guide

2016-07-05 Thread Anton Okolnychyi (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-16303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15362291#comment-15362291
 ] 

Anton Okolnychyi commented on SPARK-16303:
--

Dear [~lian cheng],

I have seen your recent work on replacing hard-coded snippets in the SQL 
programming guide with actual code from the examples, for instance, the Spark 
session initialization based on the sql.RDDRelation.scala example. 
Is there anything I can help with here? 

> Update SQL examples and programming guide
> -
>
> Key: SPARK-16303
> URL: https://issues.apache.org/jira/browse/SPARK-16303
> Project: Spark
>  Issue Type: Sub-task
>  Components: Documentation, Examples
>Affects Versions: 2.0.0
>Reporter: Cheng Lian
>Assignee: Cheng Lian
>
> We need to update SQL examples code under the {{examples}} sub-project, and 
> then replace hard-coded snippets in the SQL programming guide with snippets 
> automatically extracted from actual source files.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-16303) Update SQL examples and programming guide

2016-07-05 Thread Anton Okolnychyi (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-16303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15362291#comment-15362291
 ] 

Anton Okolnychyi edited comment on SPARK-16303 at 7/5/16 9:48 AM:
--

Dear [~lian cheng],

I have seen your recent work on replacing hard-coded snippets in the SQL 
programming guide with actual code from the examples, for instance, the Spark 
session initialization based on the sql.RDDRelation.scala example. 
Is there anything I can help with here? For example, I could replace some 
hard-coded snippets based on the existing examples, create new examples to 
cover more functionality, etc.


was (Author: aokolnychyi):
Dear [~lian cheng],

I have seen your recent work in replacing hard-coded snippets in the SQL 
programming guide with the actual code from examples. For instance, Spark 
session initialization based on the sql.RDDRelation.scala example. 
Is anything that I can help here? 

> Update SQL examples and programming guide
> -
>
> Key: SPARK-16303
> URL: https://issues.apache.org/jira/browse/SPARK-16303
> Project: Spark
>  Issue Type: Sub-task
>  Components: Documentation, Examples
>Affects Versions: 2.0.0
>Reporter: Cheng Lian
>Assignee: Cheng Lian
>
> We need to update SQL examples code under the {{examples}} sub-project, and 
> then replace hard-coded snippets in the SQL programming guide with snippets 
> automatically extracted from actual source files.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-16303) Update SQL examples and programming guide

2016-07-05 Thread Anton Okolnychyi (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-16303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15362618#comment-15362618
 ] 

Anton Okolnychyi commented on SPARK-16303:
--

Thanks for the detailed response.

I also noticed that there are not many Spark SQL examples. For instance, I did 
not find any window function examples, which is why I created a small pull 
request yesterday: https://github.com/apache/spark/pull/14050

I do not know Python and R, but I would be glad to help with Java and Scala. I 
completely agree with the goals you mentioned, so let me know how we can split 
the task so that I can actually help. 

> Update SQL examples and programming guide
> -
>
> Key: SPARK-16303
> URL: https://issues.apache.org/jira/browse/SPARK-16303
> Project: Spark
>  Issue Type: Sub-task
>  Components: Documentation, Examples
>Affects Versions: 2.0.0
>Reporter: Cheng Lian
>Assignee: Cheng Lian
>
> We need to update SQL examples code under the {{examples}} sub-project, and 
> then replace hard-coded snippets in the SQL programming guide with snippets 
> automatically extracted from actual source files.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-16303) Update SQL examples and programming guide for Scala and Java language bindings

2016-07-05 Thread Anton Okolnychyi (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-16303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15362649#comment-15362649
 ] 

Anton Okolnychyi commented on SPARK-16303:
--

Thanks a lot! 

I have started working on this and will post all updates here.

> Update SQL examples and programming guide for Scala and Java language bindings
> --
>
> Key: SPARK-16303
> URL: https://issues.apache.org/jira/browse/SPARK-16303
> Project: Spark
>  Issue Type: Sub-task
>  Components: Documentation, Examples
>Affects Versions: 2.0.0
>Reporter: Cheng Lian
>Assignee: Anton Okolnychyi
>
> We need to update SQL examples code under the {{examples}} sub-project, and 
> then replace hard-coded snippets in the SQL programming guide with snippets 
> automatically extracted from actual source files.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26204) Optimize InSet expression

2019-02-13 Thread Anton Okolnychyi (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26204?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Anton Okolnychyi updated SPARK-26204:
-
Description: 
The {{InSet}} expression was introduced in SPARK-3711 to avoid O(n) time 
complexity in the {{In}} expression. As {{InSet}} relies on Scala 
{{immutable.Set}}, it introduces expensive autoboxing. As a consequence, the 
performance of {{InSet}} might be significantly slower than {{In}} even on 100+ 
values.

We need to find an approach to optimizing {{InSet}} expressions that avoids the 
cost of autoboxing.

There are a few approaches that we can use:
 * Collections for primitive values (e.g., FastUtil, HPPC)
 * Type specialization in Scala (e.g., OpenHashSet in Spark)

According to my local benchmarks, {{OpenHashSet}}, which is already available 
in Spark and uses type specialization, can significantly reduce the memory 
footprint. However, it slows down the computation even compared to the built-in 
Scala sets. On the other hand, FastUtil and HPPC did work and gave a 
substantial performance improvement, so it makes sense to evaluate primitive 
collections.

See the attached screenshot of what I experienced while testing.

  was:
The {{InSet}} expression was introduced in SPARK-3711 to avoid O\(n\) time 
complexity in the {{In}} expression. As {{InSet}} relies on Scala 
{{immutable.Set}}, it introduces expensive autoboxing. As a consequence, the 
performance of {{InSet}} might be significantly slower than {{In}} even on 100+ 
values.

We need to find an approach how to optimize {{InSet}} expressions and avoid the 
cost of autoboxing.

 There are a few approaches that we can use:
 * Collections for primitive values (e.g., FastUtil,  HPPC)
 * Type specialization in Scala (would it even work for code gen in Spark?)

I tried to use {{OpenHashSet}}, which is already available in Spark and uses 
type specialization. However, I did not manage to avoid autoboxing. On the 
other hand, FastUtil did work and I saw a substantial improvement in the 
performance.

See the attached screenshot of what I experienced while testing.
 


> Optimize InSet expression
> -
>
> Key: SPARK-26204
> URL: https://issues.apache.org/jira/browse/SPARK-26204
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Anton Okolnychyi
>Priority: Major
> Attachments: heap size.png
>
>
> The {{InSet}} expression was introduced in SPARK-3711 to avoid O(n) time 
> complexity in the {{In}} expression. As {{InSet}} relies on Scala 
> {{immutable.Set}}, it introduces expensive autoboxing. As a consequence, the 
> performance of {{InSet}} might be significantly slower than {{In}} even on 
> 100+ values.
> We need to find an approach how to optimize {{InSet}} expressions and avoid 
> the cost of autoboxing.
>  There are a few approaches that we can use:
>  * Collections for primitive values (e.g., FastUtil,  HPPC)
>  * Type specialization in Scala (e.g., OpenHashSet in Spark)
> According to my local benchmarks, {{OpenHashSet}}, which is already available 
> in Spark and uses type specialization, can significantly reduce the memory 
> footprint. However, it slows down the computation even compared to the 
> built-in Scala sets. On the other hand, FastUtil and HPPC did work and gave a 
> substantial improvement in the performance. So, it makes sense to evaluate 
> primitive collections.
> See the attached screenshot of what I experienced while testing.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26204) Optimize InSet expression

2019-02-13 Thread Anton Okolnychyi (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26204?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Anton Okolnychyi updated SPARK-26204:
-
Description: 
The {{InSet}} expression was introduced in SPARK-3711 to avoid O\(n\) time 
complexity in the {{In}} expression. As {{InSet}} relies on Scala 
{{immutable.Set}}, it introduces expensive autoboxing. As a consequence, the 
performance of {{InSet}} might be significantly slower than {{In}} even on 100+ 
values.

We need to find an approach to optimizing {{InSet}} expressions that avoids the 
cost of autoboxing.

There are a few approaches that we can use:
 * Collections for primitive values (e.g., FastUtil, HPPC)
 * Type specialization in Scala (e.g., OpenHashSet in Spark)

According to my local benchmarks, {{OpenHashSet}}, which is already available 
in Spark and uses type specialization, can significantly reduce the memory 
footprint. However, it slows down the computation even compared to the built-in 
Scala sets. On the other hand, FastUtil and HPPC did work and gave a 
substantial performance improvement, so it makes sense to evaluate primitive 
collections.

See the attached screenshot of what I experienced while testing.

  was:
The {{InSet}} expression was introduced in SPARK-3711 to avoid O(n) time 
complexity in the {{In}} expression. As {{InSet}} relies on Scala 
{{immutable.Set}}, it introduces expensive autoboxing. As a consequence, the 
performance of {{InSet}} might be significantly slower than {{In}} even on 100+ 
values.

We need to find an approach how to optimize {{InSet}} expressions and avoid the 
cost of autoboxing.

 There are a few approaches that we can use:
 * Collections for primitive values (e.g., FastUtil,  HPPC)
 * Type specialization in Scala (e.g., OpenHashSet in Spark)

According to my local benchmarks, {{OpenHashSet}}, which is already available 
in Spark and uses type specialization, can significantly reduce the memory 
footprint. However, it slows down the computation even compared to the built-in 
Scala sets. On the other hand, FastUtil and HPPC did work and gave a 
substantial improvement in the performance. So, it makes sense to evaluate 
primitive collections.

See the attached screenshot of what I experienced while testing.


> Optimize InSet expression
> -
>
> Key: SPARK-26204
> URL: https://issues.apache.org/jira/browse/SPARK-26204
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Anton Okolnychyi
>Priority: Major
> Attachments: heap size.png
>
>
> The {{InSet}} expression was introduced in SPARK-3711 to avoid O\(n\) time 
> complexity in the {{In}} expression. As {{InSet}} relies on Scala 
> {{immutable.Set}}, it introduces expensive autoboxing. As a consequence, the 
> performance of {{InSet}} might be significantly slower than {{In}} even on 
> 100+ values.
> We need to find an approach how to optimize {{InSet}} expressions and avoid 
> the cost of autoboxing.
>  There are a few approaches that we can use:
>  * Collections for primitive values (e.g., FastUtil,  HPPC)
>  * Type specialization in Scala (e.g., OpenHashSet in Spark)
> According to my local benchmarks, {{OpenHashSet}}, which is already available 
> in Spark and uses type specialization, can significantly reduce the memory 
> footprint. However, it slows down the computation even compared to the 
> built-in Scala sets. On the other hand, FastUtil and HPPC did work and gave a 
> substantial improvement in the performance. So, it makes sense to evaluate 
> primitive collections.
> See the attached screenshot of what I experienced while testing.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26205) Optimize InSet expression for bytes, shorts, ints, dates

2019-02-28 Thread Anton Okolnychyi (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26205?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Anton Okolnychyi updated SPARK-26205:
-
Summary: Optimize InSet expression for bytes, shorts, ints, dates  (was: 
Optimize In expression for bytes, shorts, ints)

> Optimize InSet expression for bytes, shorts, ints, dates
> 
>
> Key: SPARK-26205
> URL: https://issues.apache.org/jira/browse/SPARK-26205
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Anton Okolnychyi
>Priority: Major
>
> Currently, {{In}} expressions are compiled into a sequence of if-else 
> statements, which results in O\(n\) time complexity. {{InSet}} is an 
> optimized version of {{In}}, which is supposed to improve the performance if 
> the number of elements is big enough. However, {{InSet}} actually degrades 
> the performance in many cases due to various reasons (benchmarks will be 
> available in SPARK-26203 and solutions are discussed in SPARK-26204).
> The main idea of this JIRA is to make use of {{tableswitch}} and 
> {{lookupswitch}} bytecode instructions. In short, we can improve our time 
> complexity from O\(n\) to O\(1\) or at least O\(log n\) by using Java 
> {{switch}} statements. We will have O\(1\) time complexity if our case values 
> are compact and {{tableswitch}} can be used. Otherwise, {{lookupswitch}} will 
> give us O\(log n\). 
> An important benefit of the proposed approach is that we do not have to pay 
> an extra cost for autoboxing as in case of {{InSet}}. As a consequence, we 
> can substantially outperform {{InSet}} even on 250+ elements.
> See 
> [here|https://docs.oracle.com/javase/specs/jvms/se7/html/jvms-3.html#jvms-3.10]
>  and 
> [here|https://stackoverflow.com/questions/10287700/difference-between-jvms-lookupswitch-and-tableswitch]
>  for more information.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26205) Optimize InSet expression for bytes, shorts, ints, dates

2019-02-28 Thread Anton Okolnychyi (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26205?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Anton Okolnychyi updated SPARK-26205:
-
Description: 
{{In}} expressions are compiled into a sequence of if-else statements, which 
results in O\(n\) time complexity. {{InSet}} is an optimized version of {{In}}, 
which is supposed to improve the performance if the number of elements is big 
enough. However, {{InSet}} actually degrades the performance in many cases due 
to various reasons (benchmarks were created in SPARK-26203 and solutions to the 
boxing problem are discussed in SPARK-26204).

The main idea of this JIRA is to use Java {{switch}} statements to 
significantly improve the performance of {{InSet}} expressions for bytes, 
shorts, ints, dates. All {{switch}} statements are compiled into 
{{tableswitch}} and {{lookupswitch}} bytecode instructions. We will have O\(1\) 
time complexity if our case values are compact and {{tableswitch}} can be used. 
Otherwise, {{lookupswitch}} will give us O\(log n\). Our local benchmarks show 
that this logic is more than two times faster than using primitive collections 
in {{InSet}} expressions, even on 500+ elements. As Spark is using Scala 
{{HashSet}} right now, the performance gain will be even bigger.

See 
[here|https://docs.oracle.com/javase/specs/jvms/se7/html/jvms-3.html#jvms-3.10] 
and 
[here|https://stackoverflow.com/questions/10287700/difference-between-jvms-lookupswitch-and-tableswitch]
 for more information.

  was:
Currently, {{In}} expressions are compiled into a sequence of if-else 
statements, which results in O\(n\) time complexity. {{InSet}} is an optimized 
version of {{In}}, which is supposed to improve the performance if the number 
of elements is big enough. However, {{InSet}} actually degrades the performance 
in many cases due to various reasons (benchmarks will be available in 
SPARK-26203 and solutions are discussed in SPARK-26204).

The main idea of this JIRA is to make use of {{tableswitch}} and 
{{lookupswitch}} bytecode instructions. In short, we can improve our time 
complexity from O\(n\) to O\(1\) or at least O\(log n\) by using Java 
{{switch}} statements. We will have O\(1\) time complexity if our case values 
are compact and {{tableswitch}} can be used. Otherwise, {{lookupswitch}} will 
give us O\(log n\). 

An important benefit of the proposed approach is that we do not have to pay an 
extra cost for autoboxing as in case of {{InSet}}. As a consequence, we can 
substantially outperform {{InSet}} even on 250+ elements.

See 
[here|https://docs.oracle.com/javase/specs/jvms/se7/html/jvms-3.html#jvms-3.10] 
and 
[here|https://stackoverflow.com/questions/10287700/difference-between-jvms-lookupswitch-and-tableswitch]
 for more information.


> Optimize InSet expression for bytes, shorts, ints, dates
> 
>
> Key: SPARK-26205
> URL: https://issues.apache.org/jira/browse/SPARK-26205
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Anton Okolnychyi
>Priority: Major
>
> {{In}} expressions are compiled into a sequence of if-else statements, which 
> results in O\(n\) time complexity. {{InSet}} is an optimized version of 
> {{In}}, which is supposed to improve the performance if the number of 
> elements is big enough. However, {{InSet}} actually degrades the performance 
> in many cases due to various reasons (benchmarks were created in SPARK-26203 
> and solutions to the boxing problem are discussed in SPARK-26204).
> The main idea of this JIRA is to use Java {{switch}} statements to 
> significantly improve the performance of {{InSet}} expressions for bytes, 
> shorts, ints, dates. All {{switch}} statements are compiled into 
> {{tableswitch}} and {{lookupswitch}} bytecode instructions. We will have 
> O\(1\) time complexity if our case values are compact and {{tableswitch}} can 
> be used. Otherwise, {{lookupswitch}} will give us O\(log n\). Our local 
> benchmarks show that this logic is more than two times faster even on 500+ 
> elements than using primitive collections in {{InSet}} expressions. As Spark 
> is using Scala {{HashSet}} right now, the performance gain will be is even 
> bigger.
> See 
> [here|https://docs.oracle.com/javase/specs/jvms/se7/html/jvms-3.html#jvms-3.10]
>  and 
> [here|https://stackoverflow.com/questions/10287700/difference-between-jvms-lookupswitch-and-tableswitch]
>  for more information.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-24575) Prohibit window expressions inside WHERE and HAVING clauses

2018-06-17 Thread Anton Okolnychyi (JIRA)
Anton Okolnychyi created SPARK-24575:


 Summary: Prohibit window expressions inside WHERE and HAVING 
clauses
 Key: SPARK-24575
 URL: https://issues.apache.org/jira/browse/SPARK-24575
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 2.4.0
Reporter: Anton Okolnychyi
 Fix For: 2.4.0


Why window functions inside WHERE and HAVING clauses should be prohibited is 
described 
[here|https://stackoverflow.com/questions/13997177/why-no-windowed-functions-in-where-clauses].

Spark, on the other hand, does not handle this explicitly and will fail with 
non-descriptive exceptions.


{code:scala}
val df = Seq((1, 2), (1, 3), (2, 4), (5, 5)).toDF("a", "b")
df.createTempView("t1")

spark.sql("SELECT t1.a FROM t1 WHERE RANK() OVER(ORDER BY t1.b) = 
1").explain(true)
spark.sql("SELECT t1.a FROM t1 WHERE RANK() OVER(ORDER BY t1.b) = 
1").show(false)
{code}

{noformat}
Exception in thread "main" java.lang.UnsupportedOperationException: Cannot 
evaluate expression: rank(input[1, int, false]) windowspecdefinition(input[1, 
int, false] ASC NULLS FIRST, specifiedwindowframe(RowFrame, 
unboundedpreceding$(), currentrow$()))
at 
org.apache.spark.sql.catalyst.expressions.Unevaluable$class.doGenCode(Expression.scala:261)
at 
org.apache.spark.sql.catalyst.expressions.WindowExpression.doGenCode(windowExpressions.scala:278)
at 
org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:108)
at 
org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:105)
at scala.Option.getOrElse(Option.scala:121)
...
{noformat}
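
For reference, the usual way to express this kind of query is to compute the window function in a subquery and filter on its result; a sketch (the subquery alias is arbitrary):
{code:scala}
spark.sql(
  """SELECT a
    |FROM (SELECT t1.a, RANK() OVER (ORDER BY t1.b) AS rnk FROM t1) ranked
    |WHERE rnk = 1""".stripMargin).show(false)
{code}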




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-24575) Prohibit window expressions inside WHERE and HAVING clauses

2018-06-17 Thread Anton Okolnychyi (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-24575?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Anton Okolnychyi updated SPARK-24575:
-
Description: 
Why window functions inside WHERE and HAVING clauses should be prohibited is 
described 
[here|https://stackoverflow.com/questions/13997177/why-no-windowed-functions-in-where-clauses].

Spark, on the other hand, does not handle this explicitly and will fail with 
non-descriptive exceptions.
{code}
val df = Seq((1, 2), (1, 3), (2, 4), (5, 5)).toDF("a", "b")
df.createTempView("t1")

spark.sql("SELECT t1.a FROM t1 WHERE RANK() OVER(ORDER BY t1.b) = 
1").show(false)
{code}
{noformat}
Exception in thread "main" java.lang.UnsupportedOperationException: Cannot 
evaluate expression: rank(input[1, int, false]) windowspecdefinition(input[1, 
int, false] ASC NULLS FIRST, specifiedwindowframe(RowFrame, 
unboundedpreceding$(), currentrow$()))
at 
org.apache.spark.sql.catalyst.expressions.Unevaluable$class.doGenCode(Expression.scala:261)
at 
org.apache.spark.sql.catalyst.expressions.WindowExpression.doGenCode(windowExpressions.scala:278)
at 
org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:108)
at 
org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:105)
at scala.Option.getOrElse(Option.scala:121)
...
{noformat}

  was:
Why window functions inside WHERE and HAVING clauses should be prohibited is 
described 
[here|https://stackoverflow.com/questions/13997177/why-no-windowed-functions-in-where-clauses].

Spark, on the other hand, does not handle this explicitly and will fail with 
non-descriptive exceptions.


{code:scala}
val df = Seq((1, 2), (1, 3), (2, 4), (5, 5)).toDF("a", "b")
df.createTempView("t1")

spark.sql("SELECT t1.a FROM t1 WHERE RANK() OVER(ORDER BY t1.b) = 
1").explain(true)
spark.sql("SELECT t1.a FROM t1 WHERE RANK() OVER(ORDER BY t1.b) = 
1").show(false)
{code}

{noformat}
Exception in thread "main" java.lang.UnsupportedOperationException: Cannot 
evaluate expression: rank(input[1, int, false]) windowspecdefinition(input[1, 
int, false] ASC NULLS FIRST, specifiedwindowframe(RowFrame, 
unboundedpreceding$(), currentrow$()))
at 
org.apache.spark.sql.catalyst.expressions.Unevaluable$class.doGenCode(Expression.scala:261)
at 
org.apache.spark.sql.catalyst.expressions.WindowExpression.doGenCode(windowExpressions.scala:278)
at 
org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:108)
at 
org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:105)
at scala.Option.getOrElse(Option.scala:121)
...
{noformat}



> Prohibit window expressions inside WHERE and HAVING clauses
> ---
>
> Key: SPARK-24575
> URL: https://issues.apache.org/jira/browse/SPARK-24575
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Anton Okolnychyi
>Priority: Minor
> Fix For: 2.4.0
>
>
> Why window functions inside WHERE and HAVING clauses should be prohibited is 
> described 
> [here|https://stackoverflow.com/questions/13997177/why-no-windowed-functions-in-where-clauses].
> Spark, on the other hand, does not handle this explicitly and will fail with 
> non-descriptive exceptions.
> {code}
> val df = Seq((1, 2), (1, 3), (2, 4), (5, 5)).toDF("a", "b")
> df.createTempView("t1")
> spark.sql("SELECT t1.a FROM t1 WHERE RANK() OVER(ORDER BY t1.b) = 
> 1").show(false)
> {code}
> {noformat}
> Exception in thread "main" java.lang.UnsupportedOperationException: Cannot 
> evaluate expression: rank(input[1, int, false]) windowspecdefinition(input[1, 
> int, false] ASC NULLS FIRST, specifiedwindowframe(RowFrame, 
> unboundedpreceding$(), currentrow$()))
>   at 
> org.apache.spark.sql.catalyst.expressions.Unevaluable$class.doGenCode(Expression.scala:261)
>   at 
> org.apache.spark.sql.catalyst.expressions.WindowExpression.doGenCode(windowExpressions.scala:278)
>   at 
> org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:108)
>   at 
> org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:105)
>   at scala.Option.getOrElse(Option.scala:121)
>   ...
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-25860) Replace Literal(null, _) with FalseLiteral whenever possible

2018-10-27 Thread Anton Okolnychyi (JIRA)
Anton Okolnychyi created SPARK-25860:


 Summary: Replace Literal(null, _) with FalseLiteral whenever 
possible
 Key: SPARK-25860
 URL: https://issues.apache.org/jira/browse/SPARK-25860
 Project: Spark
  Issue Type: Improvement
  Components: Optimizer, SQL
Affects Versions: 3.0.0
Reporter: Anton Okolnychyi


We should have a new optimization rule that replaces {{Literal(null, _)}} with 
{{FalseLiteral}} in the conditions of {{Join}} and {{Filter}}, in the 
predicates of {{If}}, and in the conditions of {{CaseWhen}}.

The underlying idea is that all of these treat a {{null}} condition as 
{{false}} (as an example, see {{GeneratePredicate$create}} or the 
{{doGenCode}} and {{eval}} methods in {{If}} and {{CaseWhen}}). Therefore, we 
can replace {{Literal(null, _)}} with {{FalseLiteral}} in such places, which 
can enable more optimizations later on.
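
A toy sketch of the shape such a rule could take, just to illustrate the idea for the {{Filter}} case (this is not the actual rule; it relies on Catalyst internals, so names and packages may differ):
{code:scala}
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.catalyst.expressions.Literal.FalseLiteral
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.types.BooleanType

// Illustration only: a Filter whose whole condition is a null boolean literal
// can never keep a row, so the condition can be replaced with FalseLiteral.
object ReplaceNullWithFalseInFilter extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case f @ Filter(Literal(null, BooleanType), _) =>
      f.copy(condition = FalseLiteral)
  }
}
{code}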



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-26203) Benchmark performance of In and InSet expressions

2018-11-28 Thread Anton Okolnychyi (JIRA)
Anton Okolnychyi created SPARK-26203:


 Summary: Benchmark performance of In and InSet expressions
 Key: SPARK-26203
 URL: https://issues.apache.org/jira/browse/SPARK-26203
 Project: Spark
  Issue Type: Test
  Components: SQL
Affects Versions: 3.0.0
Reporter: Anton Okolnychyi


The {{OptimizeIn}} rule replaces {{In}} with {{InSet}} if the number of 
possible values exceeds "spark.sql.optimizer.inSetConversionThreshold" and all 
values are literals. This was done for performance reasons to avoid O(n) time 
complexity for {{In}}.

The original optimization was done in SPARK-3711. A lot has changed since then 
(e.g., generation of Java code to evaluate expressions), so it is worth 
measuring the performance of this optimization again.

According to my local benchmarks, {{InSet}} can be up to 10x slower than 
{{In}} due to autoboxing and other issues.

The scope of this JIRA is to benchmark every supported data type inside {{In}} 
and {{InSet}} and outline the existing bottlenecks. Once we have this 
information, we can come up with solutions. 

Based on my preliminary investigation, we can do quite a few optimizations, 
which frequently depend on a specific data type.
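
A rough sketch of the kind of local measurement behind the numbers above (illustrative only; the value count and row count are arbitrary, and the proper benchmarks should use the existing benchmark infrastructure):
{code:scala}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("In vs InSet").master("local[*]").getOrCreate()
import spark.implicits._

val df = spark.range(50000000L).toDF("id")

// With the default "spark.sql.optimizer.inSetConversionThreshold" (10),
// 300 literal values make the optimizer rewrite In into InSet.
val values = 1L to 300L

spark.time {
  df.filter($"id".isin(values: _*)).count()
}
{code}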




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26203) Benchmark performance of In and InSet expressions

2018-11-28 Thread Anton Okolnychyi (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26203?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Anton Okolnychyi updated SPARK-26203:
-
Description: 
{{OptimizeIn}} rule replaces {{In}} with {{InSet}} if the number of possible 
values exceeds "spark.sql.optimizer.inSetConversionThreshold" and all values 
are literals. This was done for performance reasons to avoid O(n) time 
complexity for {{In}}.

The original optimization was done in SPARK-3711. A lot has changed since then 
(e.g., generation of Java code to evaluate expressions), so it is worth 
measuring the performance of this optimization again.

According to my local benchmarks, {{InSet}} can be up to 10x slower than 
{{In}} due to autoboxing and other issues.

The scope of this JIRA is to benchmark every supported data type inside {{In}} 
and {{InSet}} and outline existing bottlenecks. Once we have this information, 
we can come up with solutions. 

Based on my preliminary investigation, we can do quite some optimizations, 
which quite frequently depend on a specific data type.


  was:
{{OptimizeIn}} rule that replaces {{In}} with {{InSet}} if the number of 
possible values exceeds "spark.sql.optimizer.inSetConversionThreshold" and all 
values are literals. This was done for performance reasons to avoid O(n) time 
complexity for {{In}}.

The original optimization was done in SPARK-3711. A lot has changed after that 
(e.g., generation of Java code to evaluate expressions), so it is worth to 
measure the performance of this optimization again.

According to my local benchmarks, {{InSet}} can be up to 10x time slower than 
{{In}} due to autoboxing and other issues.

The scope of this JIRA is to benchmark every supported data type inside {{In}} 
and {{InSet}} and outline existing bottlenecks. Once we have this information, 
we can come up with solutions. 

Based on my preliminary investigation, we can do quite some optimizations, 
which quite frequently depend on a specific data type.



> Benchmark performance of In and InSet expressions
> -
>
> Key: SPARK-26203
> URL: https://issues.apache.org/jira/browse/SPARK-26203
> Project: Spark
>  Issue Type: Test
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Anton Okolnychyi
>Priority: Major
>
> {{OptimizeIn}} rule replaces {{In}} with {{InSet}} if the number of possible 
> values exceeds "spark.sql.optimizer.inSetConversionThreshold" and all values 
> are literals. This was done for performance reasons to avoid O(n) time 
> complexity for {{In}}.
> The original optimization was done in SPARK-3711. A lot has changed after 
> that (e.g., generation of Java code to evaluate expressions), so it is worth 
> to measure the performance of this optimization again.
> According to my local benchmarks, {{InSet}} can be up to 10x time slower than 
> {{In}} due to autoboxing and other issues.
> The scope of this JIRA is to benchmark every supported data type inside 
> {{In}} and {{InSet}} and outline existing bottlenecks. Once we have this 
> information, we can come up with solutions. 
> Based on my preliminary investigation, we can do quite some optimizations, 
> which quite frequently depend on a specific data type.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26203) Benchmark performance of In and InSet expressions

2018-11-28 Thread Anton Okolnychyi (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26203?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Anton Okolnychyi updated SPARK-26203:
-
Description: 
{{OptimizeIn}} rule replaces {{In}} with {{InSet}} if the number of possible 
values exceeds "spark.sql.optimizer.inSetConversionThreshold" and all values 
are literals. This was done for performance reasons to avoid O\(n\) time 
complexity for {{In}}.

The original optimization was done in SPARK-3711. A lot has changed since then 
(e.g., generation of Java code to evaluate expressions), so it is worth 
measuring the performance of this optimization again.

According to my local benchmarks, {{InSet}} can be up to 10x slower than 
{{In}} due to autoboxing and other issues.

The scope of this JIRA is to benchmark every supported data type inside {{In}} 
and {{InSet}} and outline existing bottlenecks. Once we have this information, 
we can come up with solutions. 

Based on my preliminary investigation, we can do quite some optimizations, 
which quite frequently depend on a specific data type.


  was:
{{OptimizeIn}} rule replaces {{In}} with {{InSet}} if the number of possible 
values exceeds "spark.sql.optimizer.inSetConversionThreshold" and all values 
are literals. This was done for performance reasons to avoid O(n) time 
complexity for {{In}}.

The original optimization was done in SPARK-3711. A lot has changed after that 
(e.g., generation of Java code to evaluate expressions), so it is worth to 
measure the performance of this optimization again.

According to my local benchmarks, {{InSet}} can be up to 10x time slower than 
{{In}} due to autoboxing and other issues.

The scope of this JIRA is to benchmark every supported data type inside {{In}} 
and {{InSet}} and outline existing bottlenecks. Once we have this information, 
we can come up with solutions. 

Based on my preliminary investigation, we can do quite some optimizations, 
which quite frequently depend on a specific data type.



> Benchmark performance of In and InSet expressions
> -
>
> Key: SPARK-26203
> URL: https://issues.apache.org/jira/browse/SPARK-26203
> Project: Spark
>  Issue Type: Test
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Anton Okolnychyi
>Priority: Major
>
> {{OptimizeIn}} rule replaces {{In}} with {{InSet}} if the number of possible 
> values exceeds "spark.sql.optimizer.inSetConversionThreshold" and all values 
> are literals. This was done for performance reasons to avoid O\(n\) time 
> complexity for {{In}}.
> The original optimization was done in SPARK-3711. A lot has changed after 
> that (e.g., generation of Java code to evaluate expressions), so it is worth 
> to measure the performance of this optimization again.
> According to my local benchmarks, {{InSet}} can be up to 10x time slower than 
> {{In}} due to autoboxing and other issues.
> The scope of this JIRA is to benchmark every supported data type inside 
> {{In}} and {{InSet}} and outline existing bottlenecks. Once we have this 
> information, we can come up with solutions. 
> Based on my preliminary investigation, we can do quite some optimizations, 
> which quite frequently depend on a specific data type.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-26204) Optimize InSet expression

2018-11-28 Thread Anton Okolnychyi (JIRA)
Anton Okolnychyi created SPARK-26204:


 Summary: Optimize InSet expression
 Key: SPARK-26204
 URL: https://issues.apache.org/jira/browse/SPARK-26204
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 3.0.0
Reporter: Anton Okolnychyi
 Attachments: fastutils.png

The {{InSet}} expression was introduced in SPARK-3711 to avoid O\(n\) time 
complexity in the `In` expression. As {{InSet}} relies on Scala 
{{immutable.Set}}, it introduces expensive autoboxing. As a consequence, the 
performance of {{InSet}} might be significantly slower than {{In}} even on 100+ 
values.

We need to find an approach to optimizing {{InSet}} expressions that avoids the 
cost of autoboxing.

 There are a few approaches that we can use:
 * Collections for primitive values (e.g., FastUtil,  HPPC)
 * Type specialization in Scala (would it even work for code gen in Spark?)

I tried to use {{OpenHashSet}}, which is already available in Spark and uses 
type specialization. However, I did not manage to avoid autoboxing. On the 
other hand, FastUtil did work and I saw a substantial improvement in the 
performance.

See the attached screenshot of what I experienced while testing.
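
A toy illustration of the boxing difference (this assumes a fastutil dependency is available; {{LongOpenHashSet}} is a fastutil class and is used here purely as an example of a primitive-specialized set):
{code:scala}
import it.unimi.dsi.fastutil.longs.LongOpenHashSet

val values: Seq[Long] = 1L to 300L

// A Scala immutable Set[Long] stores boxed java.lang.Long values,
// and every contains() call boxes its argument as well.
val boxedSet: Set[Long] = values.toSet

// A primitive-specialized set keeps raw longs, so lookups avoid boxing entirely.
val primitiveSet = new LongOpenHashSet()
values.foreach(v => primitiveSet.add(v))

val probe = 150L
boxedSet.contains(probe)     // boxes `probe`
primitiveSet.contains(probe) // stays a primitive long
{code}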
 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26204) Optimize InSet expression

2018-11-28 Thread Anton Okolnychyi (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26204?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Anton Okolnychyi updated SPARK-26204:
-
Attachment: fastutils.png

> Optimize InSet expression
> -
>
> Key: SPARK-26204
> URL: https://issues.apache.org/jira/browse/SPARK-26204
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Anton Okolnychyi
>Priority: Major
> Attachments: fastutils.png
>
>
> The {{InSet}} expression was introduced in SPARK-3711 to avoid O\(n\) time 
> complexity in the `In` expression. As {{InSet}} relies on Scala 
> {{immutable.Set}}, it introduces expensive autoboxing. As a consequence, the 
> performance of {{InSet}} might be significantly slower than {{In}} even on 
> 100+ values.
> We need to find an approach how to optimize {{InSet}} expressions and avoid 
> the cost of autoboxing.
>  There are a few approaches that we can use:
>  * Collections for primitive values (e.g., FastUtil,  HPPC)
>  * Type specialization in Scala (would it even work for code gen in Spark?)
> I tried to use {{OpenHashSet}}, which is already available in Spark and uses 
> type specialization. However, I did not manage to avoid autoboxing. On the 
> other hand, FastUtil did work and I saw a substantial improvement in the 
> performance.
> See the attached screenshot of what I experienced while testing.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26204) Optimize InSet expression

2018-11-28 Thread Anton Okolnychyi (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26204?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Anton Okolnychyi updated SPARK-26204:
-
Attachment: (was: fastutils.png)

> Optimize InSet expression
> -
>
> Key: SPARK-26204
> URL: https://issues.apache.org/jira/browse/SPARK-26204
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Anton Okolnychyi
>Priority: Major
>
> The {{InSet}} expression was introduced in SPARK-3711 to avoid O\(n\) time 
> complexity in the `In` expression. As {{InSet}} relies on Scala 
> {{immutable.Set}}, it introduces expensive autoboxing. As a consequence, the 
> performance of {{InSet}} might be significantly slower than {{In}} even on 
> 100+ values.
> We need to find an approach how to optimize {{InSet}} expressions and avoid 
> the cost of autoboxing.
>  There are a few approaches that we can use:
>  * Collections for primitive values (e.g., FastUtil,  HPPC)
>  * Type specialization in Scala (would it even work for code gen in Spark?)
> I tried to use {{OpenHashSet}}, which is already available in Spark and uses 
> type specialization. However, I did not manage to avoid autoboxing. On the 
> other hand, FastUtil did work and I saw a substantial improvement in the 
> performance.
> See the attached screenshot of what I experienced while testing.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26204) Optimize InSet expression

2018-11-28 Thread Anton Okolnychyi (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26204?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Anton Okolnychyi updated SPARK-26204:
-
Attachment: heap size.png

> Optimize InSet expression
> -
>
> Key: SPARK-26204
> URL: https://issues.apache.org/jira/browse/SPARK-26204
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Anton Okolnychyi
>Priority: Major
> Attachments: heap size.png
>
>
> The {{InSet}} expression was introduced in SPARK-3711 to avoid O\(n\) time 
> complexity in the `In` expression. As {{InSet}} relies on Scala 
> {{immutable.Set}}, it introduces expensive autoboxing. As a consequence, the 
> performance of {{InSet}} might be significantly slower than {{In}} even on 
> 100+ values.
> We need to find an approach how to optimize {{InSet}} expressions and avoid 
> the cost of autoboxing.
>  There are a few approaches that we can use:
>  * Collections for primitive values (e.g., FastUtil,  HPPC)
>  * Type specialization in Scala (would it even work for code gen in Spark?)
> I tried to use {{OpenHashSet}}, which is already available in Spark and uses 
> type specialization. However, I did not manage to avoid autoboxing. On the 
> other hand, FastUtil did work and I saw a substantial improvement in the 
> performance.
> See the attached screenshot of what I experienced while testing.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26204) Optimize InSet expression

2018-11-28 Thread Anton Okolnychyi (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26204?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Anton Okolnychyi updated SPARK-26204:
-
Description: 
The {{InSet}} expression was introduced in SPARK-3711 to avoid O\(n\) time 
complexity in the {{In}} expression. As {{InSet}} relies on Scala 
{{immutable.Set}}, it introduces expensive autoboxing. As a consequence, the 
performance of {{InSet}} might be significantly slower than {{In}} even on 100+ 
values.

We need to find an approach to optimizing {{InSet}} expressions that avoids the 
cost of autoboxing.

 There are a few approaches that we can use:
 * Collections for primitive values (e.g., FastUtil,  HPPC)
 * Type specialization in Scala (would it even work for code gen in Spark?)

I tried to use {{OpenHashSet}}, which is already available in Spark and uses 
type specialization. However, I did not manage to avoid autoboxing. On the 
other hand, FastUtil did work and I saw a substantial improvement in the 
performance.

See the attached screenshot of what I experienced while testing.
 

  was:
The {{InSet}} expression was introduced in SPARK-3711 to avoid O\(n\) time 
complexity in the `In` expression. As {{InSet}} relies on Scala 
{{immutable.Set}}, it introduces expensive autoboxing. As a consequence, the 
performance of {{InSet}} might be significantly slower than {{In}} even on 100+ 
values.

We need to find a way to optimize {{InSet}} expressions and avoid the 
cost of autoboxing.

 There are a few approaches that we can use:
 * Collections for primitive values (e.g., FastUtil,  HPPC)
 * Type specialization in Scala (would it even work for code gen in Spark?)

I tried to use {{OpenHashSet}}, which is already available in Spark and uses 
type specialization. However, I did not manage to avoid autoboxing. On the 
other hand, FastUtil did work and I saw a substantial improvement in the 
performance.

See the attached screenshot of what I experienced while testing.
 


> Optimize InSet expression
> -
>
> Key: SPARK-26204
> URL: https://issues.apache.org/jira/browse/SPARK-26204
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Anton Okolnychyi
>Priority: Major
> Attachments: heap size.png
>
>
> The {{InSet}} expression was introduced in SPARK-3711 to avoid O\(n\) time 
> complexity in the {{In}} expression. As {{InSet}} relies on Scala 
> {{immutable.Set}}, it introduces expensive autoboxing. As a consequence, the 
> performance of {{InSet}} might be significantly slower than {{In}} even on 
> 100+ values.
> We need to find a way to optimize {{InSet}} expressions and avoid 
> the cost of autoboxing.
>  There are a few approaches that we can use:
>  * Collections for primitive values (e.g., FastUtil,  HPPC)
>  * Type specialization in Scala (would it even work for code gen in Spark?)
> I tried to use {{OpenHashSet}}, which is already available in Spark and uses 
> type specialization. However, I did not manage to avoid autoboxing. On the 
> other hand, FastUtil did work and I saw a substantial improvement in the 
> performance.
> See the attached screenshot of what I experienced while testing.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26205) Optimize In expression for bytes, shorts, ints

2018-11-28 Thread Anton Okolnychyi (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26205?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Anton Okolnychyi updated SPARK-26205:
-
Description: 
Currently, {{In}} expressions are compiled into a sequence of if-else 
statements, which results in O\(n\) time complexity. {{InSet}} is an optimized 
version of {{In}}, which is supposed to improve the performance if the number 
of elements is big enough. However, {{InSet}} actually degrades the performance 
in many cases due to various reasons (benchmarks will be available in 
SPARK-26203 and solutions are discussed in SPARK-26204).

The main idea of this JIRA is to make use of {{tableswitch}} and 
{{lookupswitch}} bytecode instructions. In short, we can improve our time 
complexity from O\(n\) to O\(1\) or at least O\(log n\) by using Java 
{{switch}} statements. We will have O\(1\) time complexity if our case values 
are compact and {{tableswitch}} can be used. Otherwise, {{lookupswitch}} will 
give us O\(log n\). 

An important benefit of the proposed approach is that we do not have to pay an 
extra cost for autoboxing as in the case of {{InSet}}. As a consequence, we can 
substantially outperform {{InSet}} even on 250+ elements.

See 
[here|https://docs.oracle.com/javase/specs/jvms/se7/html/jvms-3.html#jvms-3.10] 
and 
[here|https://stackoverflow.com/questions/10287700/difference-between-jvms-lookupswitch-and-tableswitch]
 for more information.

  was:
Currently, {{In}} expressions are compiled into a sequence of if-else 
statements, which results in O\(n\) time complexity. {{InSet}} is an optimized 
version of {{In}}, which is supposed to improve the performance if the number 
of elements is big enough. However, {{InSet}} actually degrades the performance 
in many cases due to various reasons (benchmarks will be available in 
SPARK-26203 and solutions are discussed in SPARK-26204).

The main idea of this JIRA is to make use of {{tableswitch}} and 
{{lookupswitch}} bytecode instructions. In short, we can improve our time 
complexity from O\(n\) to O\(1\) or at least O\(log n\) by using Java 
{{switch}} statements. We will have O\(1\) time complexity if our case values 
are compact and {{tableswitch}} can be used. Otherwise, {{lookupswitch}} will 
give us O\(log n\). 

An important benefit of the proposed approach is that we do not have to pay an 
extra cost for autoboxing as in the case of {{InSet}}. As a consequence, we can 
substantially outperform {{InSet}} even on 250+ elements.

See 
[here|https://docs.oracle.com/javase/specs/jvms/se7/html/jvms-3.html#jvms-3.10] 
and here for 
[more|https://stackoverflow.com/questions/10287700/difference-between-jvms-lookupswitch-and-tableswitch]
 information.


> Optimize In expression for bytes, shorts, ints
> --
>
> Key: SPARK-26205
> URL: https://issues.apache.org/jira/browse/SPARK-26205
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Anton Okolnychyi
>Priority: Major
>
> Currently, {{In}} expressions are compiled into a sequence of if-else 
> statements, which results in O\(n\) time complexity. {{InSet}} is an 
> optimized version of {{In}}, which is supposed to improve the performance if 
> the number of elements is big enough. However, {{InSet}} actually degrades 
> the performance in many cases due to various reasons (benchmarks will be 
> available in SPARK-26203 and solutions are discussed in SPARK-26204).
> The main idea of this JIRA is to make use of {{tableswitch}} and 
> {{lookupswitch}} bytecode instructions. In short, we can improve our time 
> complexity from O\(n\) to O\(1\) or at least O\(log n\) by using Java 
> {{switch}} statements. We will have O\(1\) time complexity if our case values 
> are compact and {{tableswitch}} can be used. Otherwise, {{lookupswitch}} will 
> give us O\(log n\). 
> An important benefit of the proposed approach is that we do not have to pay 
> an extra cost for autoboxing as in the case of {{InSet}}. As a consequence, we 
> can substantially outperform {{InSet}} even on 250+ elements.
> See 
> [here|https://docs.oracle.com/javase/specs/jvms/se7/html/jvms-3.html#jvms-3.10]
>  and 
> [here|https://stackoverflow.com/questions/10287700/difference-between-jvms-lookupswitch-and-tableswitch]
>  for more information.
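
As a rough illustration of the switch-based idea (this is a hand-written Scala sketch, not the Java code Spark would generate), a match on a primitive int with literal cases compiles to a single {{tableswitch}}/{{lookupswitch}} instruction. The method below is a hypothetical stand-in for {{value IN (1, 2, 3, 120)}}:

{code}
import scala.annotation.switch

object SwitchInSketch {
  // @switch guarantees this match compiles to a tableswitch/lookupswitch
  // instruction, so the membership check does not degrade into a chain of
  // if-else comparisons as the value list grows.
  def isIn(value: Int): Boolean = (value: @switch) match {
    case 1   => true
    case 2   => true
    case 3   => true
    case 120 => true
    case _   => false
  }

  def main(args: Array[String]): Unit = {
    println(isIn(2))   // true
    println(isIn(99))  // false
  }
}
{code}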



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-26205) Optimize In expression for bytes, shorts, ints

2018-11-28 Thread Anton Okolnychyi (JIRA)
Anton Okolnychyi created SPARK-26205:


 Summary: Optimize In expression for bytes, shorts, ints
 Key: SPARK-26205
 URL: https://issues.apache.org/jira/browse/SPARK-26205
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 3.0.0
Reporter: Anton Okolnychyi


Currently, {{In}} expressions are compiled into a sequence of if-else 
statements, which results in O\(n\) time complexity. {{InSet}} is an optimized 
version of {{In}}, which is supposed to improve the performance if the number 
of elements is big enough. However, {{InSet}} actually degrades the performance 
in many cases due to various reasons (benchmarks will be available in 
SPARK-26203 and solutions are discussed in SPARK-26204).

The main idea of this JIRA is to make use of {{tableswitch}} and 
{{lookupswitch}} bytecode instructions. In short, we can improve our time 
complexity from O\(n\) to O\(1\) or at least O\(log n\) by using Java 
{{switch}} statements. We will have O\(1\) time complexity if our case values 
are compact and {{tableswitch}} can be used. Otherwise, {{lookupswitch}} will 
give us O\(log n\). 

An important benefit of the proposed approach is that we do not have to pay an 
extra cost for autoboxing as in the case of {{InSet}}. As a consequence, we can 
substantially outperform {{InSet}} even on 250+ elements.

See 
[here|https://docs.oracle.com/javase/specs/jvms/se7/html/jvms-3.html#jvms-3.10] 
and 
[here|https://stackoverflow.com/questions/10287700/difference-between-jvms-lookupswitch-and-tableswitch]
 for more information.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-23638) Spark on k8s: spark.kubernetes.initContainer.image has no effect

2018-03-11 Thread Anton Okolnychyi (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16394615#comment-16394615
 ] 

Anton Okolnychyi commented on SPARK-23638:
--

Could you also share your spark-submit command? It looks like you are specifying a 
custom docker image for the init container (as 
{{spark.kubernetes.initContainer.image}} is different from 
{{spark.kubernetes.container.image}}). Are you sure you need a custom docker 
image for the init container?

In general, if you have a remote jar in --jars and specify 
{{spark.kubernetes.container.image}}, Spark will create an init container for 
you and you do not need to worry about it.

> Spark on k8s: spark.kubernetes.initContainer.image has no effect
> 
>
> Key: SPARK-23638
> URL: https://issues.apache.org/jira/browse/SPARK-23638
> Project: Spark
>  Issue Type: Bug
>  Components: Kubernetes
>Affects Versions: 2.3.0
> Environment: K8 server: Ubuntu 16.04
> Submission client: macOS Sierra 10.12.x
> Client Version: version.Info\{Major:"1", Minor:"9", GitVersion:"v1.9.3", 
> GitCommit:"d2835416544f298c919e2ead3be3d0864b52323b", GitTreeState:"clean", 
> BuildDate:"2018-02-07T12:22:21Z", GoVersion:"go1.9.2", Compiler:"gc", 
> Platform:"darwin/amd64"}
> Server Version: version.Info\{Major:"1", Minor:"8", GitVersion:"v1.8.3", 
> GitCommit:"f0efb3cb883751c5ffdbe6d515f3cb4fbe7b7acd", GitTreeState:"clean", 
> BuildDate:"2017-11-08T18:27:48Z", GoVersion:"go1.8.3", Compiler:"gc", 
> Platform:"linux/amd64"}
>Reporter: maheshvra
>Priority: Major
>
> Hi all - I am trying to use initContainer to download remote dependencies. To 
> begin with, I ran a test with initContainer which basically "echo hello 
> world". However, when i triggered the pod deployment via spark-submit, I did 
> not see any trace of initContainer execution in my kubernetes cluster.
>  
> {code:java}
> SPARK_DRIVER_MEMORY: 1g 
> SPARK_DRIVER_CLASS: com.bigdata.App SPARK_DRIVER_ARGS: -c 
> /opt/spark/work-dir/app/main/environments/int -w 
> ./../../workflows/workflow_main.json -e prod -n features -v off 
> SPARK_DRIVER_BIND_ADDRESS:  
> SPARK_JAVA_OPT_0: -Dspark.submit.deployMode=cluster 
> SPARK_JAVA_OPT_1: -Dspark.driver.blockManager.port=7079 
> SPARK_JAVA_OPT_2: -Dspark.app.name=fg-am00-raw12 
> SPARK_JAVA_OPT_3: 
> -Dspark.kubernetes.container.image=docker.com/cmapp/fg-am00-raw:1.0.0 
> SPARK_JAVA_OPT_4: -Dspark.app.id=spark-4fa9a5ce1b1d401fa9c1e413ff030d44 
> SPARK_JAVA_OPT_5: 
> -Dspark.jars=/opt/spark/jars/aws-java-sdk-1.7.4.jar,/opt/spark/jars/hadoop-aws-2.7.3.jar,/opt/spark/jars/guava-14.0.1.jar,/opt/spark/jars/SparkApp.jar,/opt/spark/jars/datacleanup-component-1.0-SNAPSHOT.jar
>  
> SPARK_JAVA_OPT_6: -Dspark.driver.port=7078 
> SPARK_JAVA_OPT_7: 
> -Dspark.kubernetes.initContainer.image=docker.com/cmapp/custombusybox:1.0.0 
> SPARK_JAVA_OPT_8: 
> -Dspark.kubernetes.executor.podNamePrefix=fg-am00-raw12-b1c8112b8536304ab0fc64fcc41e0615
>  
> SPARK_JAVA_OPT_9: 
> -Dspark.kubernetes.driver.pod.name=fg-am00-raw12-b1c8112b8536304ab0fc64fcc41e0615-driver
>  
> SPARK_JAVA_OPT_10: 
> -Dspark.driver.host=fg-am00-raw12-b1c8112b8536304ab0fc64fcc41e0615-driver-svc.experimental.svc
>  SPARK_JAVA_OPT_11: -Dspark.executor.instances=5 
> SPARK_JAVA_OPT_12: 
> -Dspark.hadoop.fs.s3a.server-side-encryption-algorithm=AES256 
> SPARK_JAVA_OPT_13: -Dspark.kubernetes.namespace=experimental 
> SPARK_JAVA_OPT_14: 
> -Dspark.kubernetes.authenticate.driver.serviceAccountName=experimental-service-account
>  SPARK_JAVA_OPT_15: -Dspark.master=k8s://https://bigdata
> {code}
>  
> Further, I did not see spec.initContainers section in the generated pod. 
> Please see the details below
>  
> {code:java}
>  
> {
> "kind": "Pod",
> "apiVersion": "v1",
> "metadata": {
> "name": "fg-am00-raw12-b1c8112b8536304ab0fc64fcc41e0615-driver",
> "namespace": "experimental",
> "selfLink": 
> "/api/v1/namespaces/experimental/pods/fg-am00-raw12-b1c8112b8536304ab0fc64fcc41e0615-driver",
> "uid": "adc5a50a-2342-11e8-87dc-12c5b3954044",
> "resourceVersion": "299054",
> "creationTimestamp": "2018-03-09T02:36:32Z",
> "labels": {
> "spark-app-selector": "spark-4fa9a5ce1b1d401fa9c1e413ff030d44",
> "spark-role": "driver"
> },
> "annotations": {
> "spark-app-name": "fg-am00-raw12"
> }
> },
> "spec": {
> "volumes": [
> {
> "name": "experimental-service-account-token-msmth",
> "secret": {
> "secretName": "experimental-service-account-token-msmth",
> "defaultMode": 420
> }
> }
> ],
> "containers": [
> {
> "name": "spark-kubernetes-driver",
> "image": "docker.com/cmapp/fg-am00-raw:1.0.0",
> "args": [
> "driver"
> ],
> "env": [
> {
> "name": "SPARK_DRIVER_MEMORY",
> "value": "1g"
> },
> {
> "name": "SPARK_DRIVER_CLASS",
> "value": "com.myapp.App"
> },
> {
> "name": "SPARK_DRIVER_ARGS",
> "value": "-c /

[jira] [Created] (SPARK-26706) Fix Cast$mayTruncate for bytes

2019-01-23 Thread Anton Okolnychyi (JIRA)
Anton Okolnychyi created SPARK-26706:


 Summary: Fix Cast$mayTruncate for bytes
 Key: SPARK-26706
 URL: https://issues.apache.org/jira/browse/SPARK-26706
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 2.3.3, 3.0.0
Reporter: Anton Okolnychyi


The logic in {{Cast$mayTruncate}} is broken for bytes.

Right now, {{mayTruncate(ByteType, LongType)}} returns {{false}} while 
{{mayTruncate(ShortType, LongType)}} returns {{true}}. Consequently, 
{{spark.range(1, 3).as[Byte]}} and {{spark.range(1, 3).as[Short]}} will behave 
differently.

Potentially, this bug can silently corrupt someone's data.
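
For reference, a condensed sketch of the asymmetry described above. It assumes an active {{SparkSession}} named {{spark}} and {{import spark.implicits._}}:

{code}
// Byte: passes analysis even though the underlying Long values are silently narrowed.
spark.range(Long.MaxValue - 10, Long.MaxValue).as[Byte].map(b => b - 1).show()

// Short: fails analysis with
// "Cannot up cast `id` from bigint to smallint as it may truncate".
spark.range(Long.MaxValue - 10, Long.MaxValue).as[Short].map(s => s - 1).show()
{code}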



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-26706) Fix Cast$mayTruncate for bytes

2019-01-23 Thread Anton Okolnychyi (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26706?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16750002#comment-16750002
 ] 

Anton Okolnychyi commented on SPARK-26706:
--

I am about to submit a fix.

> Fix Cast$mayTruncate for bytes
> --
>
> Key: SPARK-26706
> URL: https://issues.apache.org/jira/browse/SPARK-26706
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.3, 3.0.0
>Reporter: Anton Okolnychyi
>Priority: Major
>
> The logic in {{Cast$mayTruncate}} is broken for bytes.
> Right now, {{mayTruncate(ByteType, LongType)}} returns {{false}} while 
> {{mayTruncate(ShortType, LongType)}} returns {{true}}. Consequently, 
> {{spark.range(1, 3).as[Byte]}} and {{spark.range(1, 3).as[Short]}} will 
> behave differently.
> Potentially, this bug can silently corrupt someone's data.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26706) Fix Cast$mayTruncate for bytes

2019-01-23 Thread Anton Okolnychyi (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26706?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Anton Okolnychyi updated SPARK-26706:
-
Affects Version/s: 2.4.1

> Fix Cast$mayTruncate for bytes
> --
>
> Key: SPARK-26706
> URL: https://issues.apache.org/jira/browse/SPARK-26706
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.3, 2.4.1, 3.0.0
>Reporter: Anton Okolnychyi
>Priority: Major
>
> The logic in {{Cast$mayTruncate}} is broken for bytes.
> Right now, {{mayTruncate(ByteType, LongType)}} returns {{false}} while 
> {{mayTruncate(ShortType, LongType)}} returns {{true}}. Consequently, 
> {{spark.range(1, 3).as[Byte]}} and {{spark.range(1, 3).as[Short]}} will 
> behave differently.
> Potentially, this bug can lead to silently corrupting someone's data.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26706) Fix Cast$mayTruncate for bytes

2019-01-23 Thread Anton Okolnychyi (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26706?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Anton Okolnychyi updated SPARK-26706:
-
Description: 
The logic in {{Cast$mayTruncate}} is broken for bytes.

Right now, {{mayTruncate(ByteType, LongType)}} returns {{false}} while 
{{mayTruncate(ShortType, LongType)}} returns {{true}}. Consequently, 
{{spark.range(1, 3).as[Byte]}} and {{spark.range(1, 3).as[Short]}} will behave 
differently.

Potentially, this bug can lead to silently corrupting someone's data.

  was:
The logic in {{Cast$mayTruncate}} is broken for bytes.

Right now, {{mayTruncate(ByteType, LongType)}} returns {{false}} while 
{{mayTruncate(ShortType, LongType)}} returns {{true}}. Consequently, 
{{spark.range(1, 3).as[Byte]}} and {{spark.range(1, 3).as[Short]}} will behave 
differently.

Potentially, this bug can silently corrupt someone's data.


> Fix Cast$mayTruncate for bytes
> --
>
> Key: SPARK-26706
> URL: https://issues.apache.org/jira/browse/SPARK-26706
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.3, 3.0.0
>Reporter: Anton Okolnychyi
>Priority: Major
>
> The logic in {{Cast$mayTruncate}} is broken for bytes.
> Right now, {{mayTruncate(ByteType, LongType)}} returns {{false}} while 
> {{mayTruncate(ShortType, LongType)}} returns {{true}}. Consequently, 
> {{spark.range(1, 3).as[Byte]}} and {{spark.range(1, 3).as[Short]}} will 
> behave differently.
> Potentially, this bug can lead to silently corrupting someone's data.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-26706) Fix Cast$mayTruncate for bytes

2019-01-23 Thread Anton Okolnychyi (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26706?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Anton Okolnychyi updated SPARK-26706:
-
Description: 
The logic in {{Cast$mayTruncate}} is broken for bytes.

Right now, {{mayTruncate(ByteType, LongType)}} returns {{false}} while 
{{mayTruncate(ShortType, LongType)}} returns {{true}}. Consequently, 
{{spark.range(1, 3).as[Byte]}} and {{spark.range(1, 3).as[Short]}} will behave 
differently.

Potentially, this bug can lead to silently corrupting someone's data.

{code}
// executes silently even though Long is converted into Byte
spark.range(Long.MaxValue - 10, Long.MaxValue).as[Byte]
  .map(b => b - 1)
  .show()
+-----+
|value|
+-----+
|  -12|
|  -11|
|  -10|
|   -9|
|   -8|
|   -7|
|   -6|
|   -5|
|   -4|
|   -3|
+-----+
// throws an AnalysisException: Cannot up cast `id` from bigint to smallint
// as it may truncate
spark.range(Long.MaxValue - 10, Long.MaxValue).as[Short]
  .map(s => s - 1)
  .show()
{code}

  was:
The logic in {{Cast$mayTruncate}} is broken for bytes.

Right now, {{mayTruncate(ByteType, LongType)}} returns {{false}} while 
{{mayTruncate(ShortType, LongType)}} returns {{true}}. Consequently, 
{{spark.range(1, 3).as[Byte]}} and {{spark.range(1, 3).as[Short]}} will behave 
differently.

Potentially, this bug can lead to silently corrupting someone's data.


> Fix Cast$mayTruncate for bytes
> --
>
> Key: SPARK-26706
> URL: https://issues.apache.org/jira/browse/SPARK-26706
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.3, 2.4.1, 3.0.0
>Reporter: Anton Okolnychyi
>Priority: Major
>
> The logic in {{Cast$mayTruncate}} is broken for bytes.
> Right now, {{mayTruncate(ByteType, LongType)}} returns {{false}} while 
> {{mayTruncate(ShortType, LongType)}} returns {{true}}. Consequently, 
> {{spark.range(1, 3).as[Byte]}} and {{spark.range(1, 3).as[Short]}} will 
> behave differently.
> Potentially, this bug can lead to silently corrupting someone's data.
> {code}
> // executes silently even though Long is converted into Byte
> spark.range(Long.MaxValue - 10, Long.MaxValue).as[Byte]
>   .map(b => b - 1)
>   .show()
> +-----+
> |value|
> +-----+
> |  -12|
> |  -11|
> |  -10|
> |   -9|
> |   -8|
> |   -7|
> |   -6|
> |   -5|
> |   -4|
> |   -3|
> +-----+
> // throws an AnalysisException: Cannot up cast `id` from bigint to smallint
> // as it may truncate
> spark.range(Long.MaxValue - 10, Long.MaxValue).as[Short]
>   .map(s => s - 1)
>   .show()
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-30107) Expose nested schema pruning to all V2 sources

2019-12-03 Thread Anton Okolnychyi (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-30107?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Anton Okolnychyi updated SPARK-30107:
-
Description: I think it would be great to expose the existing logic for 
nested schema pruning to all V2 sources, which is in line with the description 
of {{SupportsPushDownRequiredColumns}} . That way, all sources that are capable 
of pruning nested columns will benefit from this.  (was: I think it would be 
great to expose the existing logic for nested schema pruning to all V2 sources, 
which is in line with the description of `SupportsPushDownRequiredColumns` . 
That way, all sources that are capable of pruning nested columns will benefit 
from this.)

> Expose nested schema pruning to all V2 sources
> --
>
> Key: SPARK-30107
> URL: https://issues.apache.org/jira/browse/SPARK-30107
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Anton Okolnychyi
>Priority: Major
>
> I think it would be great to expose the existing logic for nested schema 
> pruning to all V2 sources, which is in line with the description of 
> {{SupportsPushDownRequiredColumns}} . That way, all sources that are capable 
> of pruning nested columns will benefit from this.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-30107) Expose nested schema pruning to all V2 sources

2019-12-03 Thread Anton Okolnychyi (Jira)
Anton Okolnychyi created SPARK-30107:


 Summary: Expose nested schema pruning to all V2 sources
 Key: SPARK-30107
 URL: https://issues.apache.org/jira/browse/SPARK-30107
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 3.0.0
Reporter: Anton Okolnychyi


I think it would be great to expose the existing logic for nested schema 
pruning to all V2 sources, which is in line with the description of 
`SupportsPushDownRequiredColumns` . That way, all sources that are capable of 
pruning nested columns will benefit from this.
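
To make the intent concrete, here is a minimal hypothetical V2 scan builder (class and field names are illustrative). With nested schema pruning exposed to all V2 sources, the schema handed to {{pruneColumns}} could keep only the nested fields a query actually reads (e.g. just {{address.city}}) instead of whole top-level columns:

{code}
import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownRequiredColumns}
import org.apache.spark.sql.types.StructType

// Hypothetical source: it simply remembers whatever schema Spark asks it to read.
class ExampleScanBuilder(fullSchema: StructType) extends ScanBuilder
    with SupportsPushDownRequiredColumns {

  private var requiredSchema: StructType = fullSchema

  override def pruneColumns(requiredSchema: StructType): Unit = {
    this.requiredSchema = requiredSchema
  }

  override def build(): Scan = new Scan {
    override def readSchema(): StructType = requiredSchema
  }
}
{code}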



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-30107) Expose nested schema pruning to all V2 sources

2019-12-03 Thread Anton Okolnychyi (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-30107?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16986756#comment-16986756
 ] 

Anton Okolnychyi commented on SPARK-30107:
--

I'll submit a PR

> Expose nested schema pruning to all V2 sources
> --
>
> Key: SPARK-30107
> URL: https://issues.apache.org/jira/browse/SPARK-30107
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Anton Okolnychyi
>Priority: Major
>
> I think it would be great to expose the existing logic for nested schema 
> pruning to all V2 sources, which is in line with the description of 
> {{SupportsPushDownRequiredColumns}} . That way, all sources that are capable 
> of pruning nested columns will benefit from this.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-32276) Remove redundant sorts before repartition/repartitionByExpression/coalesce

2020-07-10 Thread Anton Okolnychyi (Jira)
Anton Okolnychyi created SPARK-32276:


 Summary: Remove redundant sorts before 
repartition/repartitionByExpression/coalesce
 Key: SPARK-32276
 URL: https://issues.apache.org/jira/browse/SPARK-32276
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 3.1.0
Reporter: Anton Okolnychyi


I think our {{EliminateSorts}} rule can be extended further to remove sorts 
before repartition, repartitionByExpression and coalesce nodes. Independently 
of whether we do a shuffle or not, each repartition operation will change the 
ordering and distribution of data.

That's why we should be able to rewrite {{Repartition -> Sort -> Scan}} as 
{{Repartition -> Scan}}.
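
A small self-contained example of the pattern in question (hypothetical job, local master): the {{orderBy}} below is wasted work because the subsequent {{repartition}} reshuffles the data and discards the ordering, so the extended rule could drop the Sort node:

{code}
import org.apache.spark.sql.SparkSession

object RedundantSortExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("redundant-sort-example")
      .master("local[*]")
      .getOrCreate()

    // Logical plan shape: Repartition -> Sort -> Range (scan).
    val df = spark.range(0, 100)
      .orderBy("id")      // Sort: its result is destroyed by the shuffle below
      .repartition(10)    // Repartition: changes ordering and distribution

    df.explain(true)      // with the proposed rule, the Sort node could be removed
    spark.stop()
  }
}
{code}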



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-32276) Remove redundant sorts before repartition nodes

2020-07-10 Thread Anton Okolnychyi (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-32276?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Anton Okolnychyi updated SPARK-32276:
-
Summary: Remove redundant sorts before repartition nodes  (was: Remove 
redundant sorts before repartition/repartitionByExpression/coalesce)

> Remove redundant sorts before repartition nodes
> ---
>
> Key: SPARK-32276
> URL: https://issues.apache.org/jira/browse/SPARK-32276
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.1.0
>Reporter: Anton Okolnychyi
>Priority: Major
>
> I think our {{EliminateSorts}} rule can be extended further to remove sorts 
> before repartition, repartitionByExpression and coalesce nodes. Independently 
> of whether we do a shuffle or not, each repartition operation will change the 
> ordering and distribution of data.
> That's why we should be able to rewrite {{Repartition -> Sort -> Scan}} as 
> {{Repartition -> Scan}}.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-23889) DataSourceV2: Add interfaces to pass required sorting and clustering for writes

2020-03-27 Thread Anton Okolnychyi (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-23889?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17069018#comment-17069018
 ] 

Anton Okolnychyi commented on SPARK-23889:
--

I've submitted [a 
proposal|[http://apache-spark-developers-list.1001551.n3.nabble.com/DISCUSS-SPARK-23889-DataSourceV2-required-sorting-and-clustering-for-writes-td28985.html]]
 how we can implement this to the dev list. It would be great if people 
interested in this could comment and we can start working on it.

> DataSourceV2: Add interfaces to pass required sorting and clustering for 
> writes
> ---
>
> Key: SPARK-23889
> URL: https://issues.apache.org/jira/browse/SPARK-23889
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Ryan Blue
>Priority: Major
>
> From a [discussion on the dev 
> list|https://lists.apache.org/thread.html/d8bb72fc9b4be8acc3f49367bfc99cbf029194a58333eba69df49717@%3Cdev.spark.apache.org%3E],
>  there is consensus around adding interfaces to pass required sorting and 
> clustering to Spark. The proposal is to add:
> {code:java}
> interface RequiresClustering {
>   Set requiredClustering();
> }
> interface RequiresSort {
>   List requiredOrdering();
> }
> {code}
> When only {{RequiresSort}} is present, the sort would produce a global sort. 
> The partitioning introduced by that sort would be overridden by 
> {{RequiresClustering}}, making the sort local to each partition.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-23889) DataSourceV2: Add interfaces to pass required sorting and clustering for writes

2020-03-27 Thread Anton Okolnychyi (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-23889?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17069018#comment-17069018
 ] 

Anton Okolnychyi edited comment on SPARK-23889 at 3/27/20, 7:59 PM:


I've submitted [a 
proposal|http://apache-spark-developers-list.1001551.n3.nabble.com/DISCUSS-SPARK-23889-DataSourceV2-required-sorting-and-clustering-for-writes-td28985.html]
 to the dev list on how we can implement this. It would be great if people 
 interested in this could comment so that we can start working on it.


was (Author: aokolnychyi):
I've submitted [a 
proposal|[http://apache-spark-developers-list.1001551.n3.nabble.com/DISCUSS-SPARK-23889-DataSourceV2-required-sorting-and-clustering-for-writes-td28985.html]]
 how we can implement this to the dev list. It would be great if people 
interested in this could comment and we can start working on it.

> DataSourceV2: Add interfaces to pass required sorting and clustering for 
> writes
> ---
>
> Key: SPARK-23889
> URL: https://issues.apache.org/jira/browse/SPARK-23889
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Ryan Blue
>Priority: Major
>
> From a [discussion on the dev 
> list|https://lists.apache.org/thread.html/d8bb72fc9b4be8acc3f49367bfc99cbf029194a58333eba69df49717@%3Cdev.spark.apache.org%3E],
>  there is consensus around adding interfaces to pass required sorting and 
> clustering to Spark. The proposal is to add:
> {code:java}
> interface RequiresClustering {
>   Set requiredClustering();
> }
> interface RequiresSort {
>   List requiredOrdering();
> }
> {code}
> When only {{RequiresSort}} is present, the sort would produce a global sort. 
> The partitioning introduced by that sort would be overridden by 
> {{RequiresClustering}}, making the sort local to each partition.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-33608) Handle DELETE/UPDATE/MERGE in PullupCorrelatedPredicates rule

2020-11-30 Thread Anton Okolnychyi (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-33608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17240744#comment-17240744
 ] 

Anton Okolnychyi commented on SPARK-33608:
--

I am going to submit a PR soon.

> Handle DELETE/UPDATE/MERGE in PullupCorrelatedPredicates rule
> -
>
> Key: SPARK-33608
> URL: https://issues.apache.org/jira/browse/SPARK-33608
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.1.0
>Reporter: Anton Okolnychyi
>Priority: Major
>
> {{PullupCorrelatedPredicates}} should handle DELETE/UPDATE/MERGE commands. 
> Right now, the rule works with filters and unary nodes only. As a result, 
> correlated predicates in DELETE/UPDATE/MERGE are not rewritten.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-33608) Handle DELETE/UPDATE/MERGE in PullupCorrelatedPredicates rule

2020-11-30 Thread Anton Okolnychyi (Jira)
Anton Okolnychyi created SPARK-33608:


 Summary: Handle DELETE/UPDATE/MERGE in PullupCorrelatedPredicates 
rule
 Key: SPARK-33608
 URL: https://issues.apache.org/jira/browse/SPARK-33608
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 3.1.0
Reporter: Anton Okolnychyi


{{PullupCorrelatedPredicates}} should handle DELETE/UPDATE/MERGE commands. 
Right now, the rule works with filters and unary nodes only. As a result, 
correlated predicates in DELETE/UPDATE/MERGE are not rewritten.
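
An example of the kind of statement affected (hypothetical V2 tables {{target}} and {{updates}}, with an active {{SparkSession}} named {{spark}}): the {{EXISTS}} clause is a correlated predicate that the rule currently leaves untouched when it appears in a DELETE:

{code}
spark.sql(
  """DELETE FROM target AS t
    |WHERE EXISTS (SELECT 1 FROM updates AS u WHERE u.id = t.id)
    |""".stripMargin)
{code}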



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-33612) Rename earlyScanPushDownRules to be more generic

2020-11-30 Thread Anton Okolnychyi (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-33612?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17240937#comment-17240937
 ] 

Anton Okolnychyi commented on SPARK-33612:
--

I am going to submit a PR soon.

> Rename earlyScanPushDownRules to be more generic
> 
>
> Key: SPARK-33612
> URL: https://issues.apache.org/jira/browse/SPARK-33612
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.1.0
>Reporter: Anton Okolnychyi
>Priority: Major
>
> Right now, we have a special place in the optimizer where we run rules for 
> constructing v2 scans. As time shows, we need more rewrite rules for v2 
> tables and not all of them are related to reads. That's why it makes sense to 
> rename that batch into something more generic and reuse it for all v2 rewrite 
> rules.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-33612) Rename earlyScanPushDownRules to be more generic

2020-11-30 Thread Anton Okolnychyi (Jira)
Anton Okolnychyi created SPARK-33612:


 Summary: Rename earlyScanPushDownRules to be more generic
 Key: SPARK-33612
 URL: https://issues.apache.org/jira/browse/SPARK-33612
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 3.1.0
Reporter: Anton Okolnychyi


Right now, we have a special place in the optimizer where we run rules for 
constructing v2 scans. As time shows, we need more rewrite rules for v2 tables 
and not all of them are related to reads. That's why it makes sense to rename that 
batch into something more generic and reuse it for all v2 rewrite rules.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-33612) Add v2SourceRewriteRules batch to Optimizer

2020-11-30 Thread Anton Okolnychyi (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-33612?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Anton Okolnychyi updated SPARK-33612:
-
Summary: Add v2SourceRewriteRules batch to Optimizer  (was: Rename 
earlyScanPushDownRules to be more generic)

> Add v2SourceRewriteRules batch to Optimizer
> ---
>
> Key: SPARK-33612
> URL: https://issues.apache.org/jira/browse/SPARK-33612
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.1.0
>Reporter: Anton Okolnychyi
>Priority: Major
>
> Right now, we have a special place in the optimizer where we run rules for 
> constructing v2 scans. As time shows, we need more rewrite rules for v2 
> tables and not all of them are related to reads. That's why it makes sense to 
> rename that batch into something more generic and reuse it for all v2 rewrite 
> rules.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-33612) Add v2SourceRewriteRules batch to Optimizer

2020-11-30 Thread Anton Okolnychyi (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-33612?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Anton Okolnychyi updated SPARK-33612:
-
Description: Right now, we have a special place in the optimizer where we 
run rules for constructing v2 scans. As time shows, we need more rewrite rules 
for v2 tables and not all of them are related to reads. One option is to rename the 
current batch into something more generic but it would require changing quite 
a few places. That's why it makes sense to introduce a new batch and use it for 
all data source v2 rewrites.  (was: Right now, we have a special place in the 
optimizer where we run rules for constructing v2 scans. As time shows, we need 
more rewrite rules for v2 tables and not all of them are related to reads. That's 
why it makes sense to rename that batch into something more generic and reuse 
it for all v2 rewrite rules.)

> Add v2SourceRewriteRules batch to Optimizer
> ---
>
> Key: SPARK-33612
> URL: https://issues.apache.org/jira/browse/SPARK-33612
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.1.0
>Reporter: Anton Okolnychyi
>Priority: Major
>
> Right now, we have a special place in the optimizer where we run rules for 
> constructing v2 scans. As time shows, we need more rewrite rules for v2 
> tables and not all of them are related to reads. One option is to rename the 
> current batch into something more generic but it would require changing quite 
> a few places. That's why it makes sense to introduce a new batch and use it 
> for all data source v2 rewrites.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-33612) Add v2SourceRewriteRules batch to Optimizer

2020-11-30 Thread Anton Okolnychyi (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-33612?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Anton Okolnychyi updated SPARK-33612:
-
Description: Right now, we have a special place in the optimizer where we 
run rules for constructing v2 scans. As time shows, we need more rewrite rules 
for v2 tables and not all of them are related to reads. One option is to rename the 
current batch into something more generic but it would require changing quite 
a few places. Moreover, the batch contains some non-V2 logic as well. That's why 
it seems better to introduce a new batch and use it for all data source v2 
rewrites.  (was: Right now, we have a special place in the optimizer where we 
run rules for constructing v2 scans. As time shows, we need more rewrite rules 
for v2 tables and not all of them are related to reads. One option is to rename the 
current batch into something more generic but it would require changing quite 
a few places. That's why it makes sense to introduce a new batch and use it for 
all data source v2 rewrites.)

> Add v2SourceRewriteRules batch to Optimizer
> ---
>
> Key: SPARK-33612
> URL: https://issues.apache.org/jira/browse/SPARK-33612
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.1.0
>Reporter: Anton Okolnychyi
>Priority: Major
>
> Right now, we have a special place in the optimizer where we run rules for 
> constructing v2 scans. As time shows, we need more rewrite rules for v2 
> tables and not all of them are related to reads. One option is to rename the 
> current batch into something more generic but it would require changing quite 
> a few places. Moreover, the batch contains some non-V2 logic as well. That's 
> why it seems better to introduce a new batch and use it for all data source 
> v2 rewrites.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-33612) Add v2SourceRewriteRules batch to Optimizer

2020-12-01 Thread Anton Okolnychyi (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-33612?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Anton Okolnychyi updated SPARK-33612:
-
Description: Right now, we have a special place in the optimizer where we 
run rules for constructing v2 scans. As time shows, we need more rewrite rules 
for v2 tables and not all of them are related to reads. One option is to rename the 
current batch into something more generic but it would require changing quite 
a few places. Moreover, the batch contains some non-V2 logic as well. That's why 
it seems better to introduce a new batch and use it for all data source v2 
rewrites and beyond.  (was: Right now, we have a special place in the optimizer 
where we run rules for constructing v2 scans. As time shows, we need more 
rewrite rules for v2 tables and not all of them are related to reads. One option is 
to rename the current batch into something more generic but it would require 
changing quite a few places. Moreover, the batch contains some non-V2 logic as 
well. That's why it seems better to introduce a new batch and use it for all 
data source v2 rewrites.)

> Add v2SourceRewriteRules batch to Optimizer
> ---
>
> Key: SPARK-33612
> URL: https://issues.apache.org/jira/browse/SPARK-33612
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.1.0
>Reporter: Anton Okolnychyi
>Priority: Major
>
> Right now, we have a special place in the optimizer where we run rules for 
> constructing v2 scans. As time shows, we need more rewrite rules for v2 
> tables and not all of them are related to reads. One option is to rename the 
> current batch into something more generic but it would require changing quite 
> a few places. Moreover, the batch contains some non-V2 logic as well. That's 
> why it seems better to introduce a new batch and use it for all data source 
> v2 rewrites and beyond.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-33612) Add dataSourceRewriteRules batch to Optimizer

2020-12-01 Thread Anton Okolnychyi (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-33612?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Anton Okolnychyi updated SPARK-33612:
-
Summary: Add dataSourceRewriteRules batch to Optimizer  (was: Add 
v2SourceRewriteRules batch to Optimizer)

> Add dataSourceRewriteRules batch to Optimizer
> -
>
> Key: SPARK-33612
> URL: https://issues.apache.org/jira/browse/SPARK-33612
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.1.0
>Reporter: Anton Okolnychyi
>Priority: Major
>
> Right now, we have a special place in the optimizer where we run rules for 
> constructing v2 scans. As time shows, we need more rewrite rules for v2 
> tables and not all of them are related to reads. One option is to rename the 
> current batch into something more generic but it would require changing quite 
> a few places. Moreover, the batch contains some non-V2 logic as well. That's 
> why it seems better to introduce a new batch and use it for all data source 
> v2 rewrites and beyond.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-33621) Add a way to inject optimization rules after expression optimization

2020-12-01 Thread Anton Okolnychyi (Jira)
Anton Okolnychyi created SPARK-33621:


 Summary: Add a way to inject optimization rules after expression 
optimization
 Key: SPARK-33621
 URL: https://issues.apache.org/jira/browse/SPARK-33621
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 3.1.0
Reporter: Anton Okolnychyi


{{SparkSessionExtensions}} allow us to inject optimization rules but they are 
added to operator optimization batch. There are cases when users need to run 
rules after the operator optimization batch. Currently, this is not possible.
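
For reference, this is roughly how a rule is injected today (rule and extension class names here are made up). The injected rule lands in the operator optimization batch, which is exactly the limitation described above:

{code}
import org.apache.spark.sql.SparkSessionExtensions
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// Placeholder rule: a real one would rewrite the plan.
object MyRewriteRule extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan
}

// Registered via spark.sql.extensions=<fully qualified name of MyExtensions>.
class MyExtensions extends (SparkSessionExtensions => Unit) {
  override def apply(extensions: SparkSessionExtensions): Unit = {
    // The injected rule is added to the operator optimization batch.
    extensions.injectOptimizerRule(_ => MyRewriteRule)
  }
}
{code}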



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-33621) Add a way to inject optimization rules after expression optimization

2020-12-01 Thread Anton Okolnychyi (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-33621?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17241450#comment-17241450
 ] 

Anton Okolnychyi commented on SPARK-33621:
--

I am going to submit a PR soon.

> Add a way to inject optimization rules after expression optimization
> 
>
> Key: SPARK-33621
> URL: https://issues.apache.org/jira/browse/SPARK-33621
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.1.0
>Reporter: Anton Okolnychyi
>Priority: Major
>
> {{SparkSessionExtensions}} allow us to inject optimization rules but they are 
> added to operator optimization batch. There are cases when users need to run 
> rules after the operator optimization batch. Currently, this is not possible.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-33623) Add canDeleteWhere to SupportsDelete

2020-12-01 Thread Anton Okolnychyi (Jira)
Anton Okolnychyi created SPARK-33623:


 Summary: Add canDeleteWhere to SupportsDelete
 Key: SPARK-33623
 URL: https://issues.apache.org/jira/browse/SPARK-33623
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 3.1.0
Reporter: Anton Okolnychyi


The only way to support delete statements right now is to implement 
{{SupportsDelete}}. According to its Javadoc, that interface is meant for 
cases when we can delete data without much effort (e.g. deleting a 
complete partition in a Hive table). It is clear we need a more sophisticated 
API for row-level deletes. That's why it would be beneficial to add a method to 
{{SupportsDelete}} so that Spark can check whether a source can easily delete data 
given just filters or whether it will need a full rewrite later on. This way, we 
have more control in the future.
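
A sketch of the shape such an addition could take (an illustrative trait, not the actual interface): the source reports up front whether the given filters are enough for it to delete data directly, so Spark can plan a full rewrite when they are not:

{code}
import org.apache.spark.sql.sources.Filter

// Illustrative only: pairs a capability check with a filter-based delete call.
trait SupportsDeleteSketch {
  def canDeleteWhere(filters: Array[Filter]): Boolean
  def deleteWhere(filters: Array[Filter]): Unit
}
{code}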



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-33623) Add canDeleteWhere to SupportsDelete

2020-12-01 Thread Anton Okolnychyi (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-33623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17241469#comment-17241469
 ] 

Anton Okolnychyi commented on SPARK-33623:
--

I am going to submit a PR soon.

> Add canDeleteWhere to SupportsDelete
> 
>
> Key: SPARK-33623
> URL: https://issues.apache.org/jira/browse/SPARK-33623
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.1.0
>Reporter: Anton Okolnychyi
>Priority: Major
>
> The only way to support delete statements right now is to implement 
> {{SupportsDelete}}. According to its Javadoc, that interface is meant for 
> cases when we can delete data without much effort (e.g. deleting a 
> complete partition in a Hive table). It is clear we need a more sophisticated 
> API for row-level deletes. That's why it would be beneficial to add a method 
> to {{SupportsDelete}} so that Spark can check whether a source can easily delete 
> data given just filters or whether it will need a full rewrite later on. This 
> way, we have more control in the future.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-33624) Extend SupportsSubquery in Filter, Aggregate and Project

2020-12-01 Thread Anton Okolnychyi (Jira)
Anton Okolnychyi created SPARK-33624:


 Summary: Extend SupportsSubquery in Filter, Aggregate and Project
 Key: SPARK-33624
 URL: https://issues.apache.org/jira/browse/SPARK-33624
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 3.1.0
Reporter: Anton Okolnychyi


We should extend SupportsSubquery in Filter, Aggregate and Project as described 
[here|https://github.com/apache/spark/pull/30555#discussion_r55689].



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-33624) Extend SupportsSubquery in Filter, Aggregate and Project

2020-12-01 Thread Anton Okolnychyi (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-33624?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17241567#comment-17241567
 ] 

Anton Okolnychyi commented on SPARK-33624:
--

I am going to submit a PR soon.

> Extend SupportsSubquery in Filter, Aggregate and Project
> 
>
> Key: SPARK-33624
> URL: https://issues.apache.org/jira/browse/SPARK-33624
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.1.0
>Reporter: Anton Okolnychyi
>Priority: Major
>
> We should extend SupportsSubquery in Filter, Aggregate and Project as 
> described 
> [here|https://github.com/apache/spark/pull/30555#discussion_r55689].



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-33624) Extend SupportsSubquery in Filter, Aggregate and Project

2020-12-01 Thread Anton Okolnychyi (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-33624?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17241614#comment-17241614
 ] 

Anton Okolnychyi commented on SPARK-33624:
--

Actually, we cannot do this as we distinguish `Aggregate` and 
`SupportsSubquery` in `CheckAnalysis`.

> Extend SupportsSubquery in Filter, Aggregate and Project
> 
>
> Key: SPARK-33624
> URL: https://issues.apache.org/jira/browse/SPARK-33624
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.1.0
>Reporter: Anton Okolnychyi
>Priority: Major
>
> We should extend SupportsSubquery in Filter, Aggregate and Project as 
> described 
> [here|https://github.com/apache/spark/pull/30555#discussion_r55689].



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-33621) Add a way to inject data source rewrite rules

2020-12-01 Thread Anton Okolnychyi (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-33621?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Anton Okolnychyi updated SPARK-33621:
-
Summary: Add a way to inject data source rewrite rules  (was: Add a way to 
inject optimization rules after expression optimization)

> Add a way to inject data source rewrite rules
> -
>
> Key: SPARK-33621
> URL: https://issues.apache.org/jira/browse/SPARK-33621
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.1.0
>Reporter: Anton Okolnychyi
>Priority: Major
>
> {{SparkSessionExtensions}} allow us to inject optimization rules but they are 
> added to operator optimization batch. There are cases when users need to run 
> rules after the operator optimization batch. Currently, this is not possible.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-33642) DataSource V2: API for row-level operations

2020-12-03 Thread Anton Okolnychyi (Jira)
Anton Okolnychyi created SPARK-33642:


 Summary: DataSource V2: API for row-level operations
 Key: SPARK-33642
 URL: https://issues.apache.org/jira/browse/SPARK-33642
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 3.2.0
Reporter: Anton Okolnychyi


We need to extend the Data Source V2 API with a way to report changed rows from 
DELETE/UPDATE/MERGE operations.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-33642) DataSource V2: API for row-level operations

2020-12-03 Thread Anton Okolnychyi (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-33642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17243038#comment-17243038
 ] 

Anton Okolnychyi commented on SPARK-33642:
--

The design doc is being prepared and will be attached here.

> DataSource V2: API for row-level operations
> ---
>
> Key: SPARK-33642
> URL: https://issues.apache.org/jira/browse/SPARK-33642
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.2.0
>Reporter: Anton Okolnychyi
>Priority: Major
>
> We need to extend the Data Source V2 API with a way to report changed rows 
> from DELETE/UPDATE/MERGE operations.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-33644) DataSource V2: Support row-level changes in UPDATE

2020-12-03 Thread Anton Okolnychyi (Jira)
Anton Okolnychyi created SPARK-33644:


 Summary: DataSource V2: Support row-level changes in UPDATE
 Key: SPARK-33644
 URL: https://issues.apache.org/jira/browse/SPARK-33644
 Project: Spark
  Issue Type: Sub-task
  Components: SQL
Affects Versions: 3.2.0
Reporter: Anton Okolnychyi


Spark should have a way to execute a plan to find which records must be updated 
and report them to data sources.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-33643) DataSource V2: Support row-level changes in DELETE

2020-12-03 Thread Anton Okolnychyi (Jira)
Anton Okolnychyi created SPARK-33643:


 Summary: DataSource V2: Support row-level changes in DELETE
 Key: SPARK-33643
 URL: https://issues.apache.org/jira/browse/SPARK-33643
 Project: Spark
  Issue Type: Sub-task
  Components: SQL
Affects Versions: 3.2.0
Reporter: Anton Okolnychyi


Spark should have a way to execute a plan to find which records must be deleted 
and report them to data sources.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-33645) DataSource V2: Support row-level changes in MERGE

2020-12-03 Thread Anton Okolnychyi (Jira)
Anton Okolnychyi created SPARK-33645:


 Summary: DataSource V2: Support row-level changes in MERGE
 Key: SPARK-33645
 URL: https://issues.apache.org/jira/browse/SPARK-33645
 Project: Spark
  Issue Type: Sub-task
  Components: SQL
Affects Versions: 3.2.0
Reporter: Anton Okolnychyi


Spark should have a way to execute a plan to find which records must be 
deleted, updated, or added as a result of executing a MERGE command and report 
them to data sources.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-33722) Handle DELETE in ReplaceNullWithFalseInPredicate

2020-12-09 Thread Anton Okolnychyi (Jira)


[ https://issues.apache.org/jira/browse/SPARK-33722?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17246617#comment-17246617 ]

Anton Okolnychyi commented on SPARK-33722:
--

I'll submit a PR soon.

> Handle DELETE in ReplaceNullWithFalseInPredicate
> 
>
> Key: SPARK-33722
> URL: https://issues.apache.org/jira/browse/SPARK-33722
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.2.0
>Reporter: Anton Okolnychyi
>Priority: Major
>
> We should handle delete statements in {{ReplaceNullWithFalseInPredicate}}.






[jira] [Created] (SPARK-33722) Handle DELETE in ReplaceNullWithFalseInPredicate

2020-12-09 Thread Anton Okolnychyi (Jira)
Anton Okolnychyi created SPARK-33722:


 Summary: Handle DELETE in ReplaceNullWithFalseInPredicate
 Key: SPARK-33722
 URL: https://issues.apache.org/jira/browse/SPARK-33722
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 3.2.0
Reporter: Anton Okolnychyi


We should handle delete statements in {{ReplaceNullWithFalseInPredicate}}.
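For context, {{ReplaceNullWithFalseInPredicate}} rewrites null literals that sit in predicate positions into false, which unlocks further simplification and pushdown; the condition of a DELETE (and, per SPARK-33735 and SPARK-33736, of UPDATE and MERGE) is just another predicate position. The snippet below only demonstrates why the rewrite is safe on a concrete filter; it is not the optimizer rule itself.

{code}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, lit, when}

object ReplaceNullWithFalseSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("replace-null-with-false-sketch")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    val df = Seq(1, 2, 3).toDF("id")

    // In a predicate position a null result rejects a row exactly like false does,
    // so the optimizer may replace the null literal with false.
    val withNull  = df.filter(when(col("id") === 2, lit(null)).otherwise(col("id") > 1))
    val withFalse = df.filter(when(col("id") === 2, lit(false)).otherwise(col("id") > 1))

    // Both filters keep the same rows (only id = 3), which is what makes the
    // rewrite safe for WHERE clauses and for DELETE/UPDATE/MERGE conditions alike.
    withNull.show()
    withFalse.show()

    spark.stop()
  }
}
{code}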






[jira] [Created] (SPARK-33735) Handle UPDATE in ReplaceNullWithFalseInPredicate

2020-12-10 Thread Anton Okolnychyi (Jira)
Anton Okolnychyi created SPARK-33735:


 Summary: Handle UPDATE in ReplaceNullWithFalseInPredicate
 Key: SPARK-33735
 URL: https://issues.apache.org/jira/browse/SPARK-33735
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 3.2.0
Reporter: Anton Okolnychyi


We need to handle update statements in {{ReplaceNullWithFalseInPredicate}}.






[jira] [Commented] (SPARK-33735) Handle UPDATE in ReplaceNullWithFalseInPredicate

2020-12-10 Thread Anton Okolnychyi (Jira)


[ https://issues.apache.org/jira/browse/SPARK-33735?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17247155#comment-17247155 ]

Anton Okolnychyi commented on SPARK-33735:
--

I'll submit a PR soon.

> Handle UPDATE in ReplaceNullWithFalseInPredicate
> 
>
> Key: SPARK-33735
> URL: https://issues.apache.org/jira/browse/SPARK-33735
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.2.0
>Reporter: Anton Okolnychyi
>Priority: Major
>
> We need to handle update statements in {{ReplaceNullWithFalseInPredicate}}.






[jira] [Commented] (SPARK-33736) Handle MERGE in ReplaceNullWithFalseInPredicate

2020-12-10 Thread Anton Okolnychyi (Jira)


[ https://issues.apache.org/jira/browse/SPARK-33736?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17247156#comment-17247156 ]

Anton Okolnychyi commented on SPARK-33736:
--

I'll submit a PR soon.

> Handle MERGE in ReplaceNullWithFalseInPredicate
> ---
>
> Key: SPARK-33736
> URL: https://issues.apache.org/jira/browse/SPARK-33736
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.2.0
>Reporter: Anton Okolnychyi
>Priority: Major
>
> We need to handle merge statements in {{ReplaceNullWithFalseInPredicate}}.






[jira] [Created] (SPARK-33736) Handle MERGE in ReplaceNullWithFalseInPredicate

2020-12-10 Thread Anton Okolnychyi (Jira)
Anton Okolnychyi created SPARK-33736:


 Summary: Handle MERGE in ReplaceNullWithFalseInPredicate
 Key: SPARK-33736
 URL: https://issues.apache.org/jira/browse/SPARK-33736
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 3.2.0
Reporter: Anton Okolnychyi


We need to handle merge statements in {{ReplaceNullWithFalseInPredicate}}.






[jira] [Commented] (SPARK-23889) DataSourceV2: Add interfaces to pass required sorting and clustering for writes

2020-12-10 Thread Anton Okolnychyi (Jira)


[ https://issues.apache.org/jira/browse/SPARK-23889?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17247204#comment-17247204 ]

Anton Okolnychyi commented on SPARK-23889:
--

I propose limiting the scope of this JIRA to interfaces only.

> DataSourceV2: Add interfaces to pass required sorting and clustering for 
> writes
> ---
>
> Key: SPARK-23889
> URL: https://issues.apache.org/jira/browse/SPARK-23889
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Ryan Blue
>Priority: Major
>
> From a [discussion on the dev 
> list|https://lists.apache.org/thread.html/d8bb72fc9b4be8acc3f49367bfc99cbf029194a58333eba69df49717@%3Cdev.spark.apache.org%3E],
>  there is consensus around adding interfaces to pass required sorting and 
> clustering to Spark. The proposal is to add:
> {code:java}
> interface RequiresClustering {
>   Set requiredClustering();
> }
> interface RequiresSort {
>   List requiredOrdering();
> }
> {code}
> When only {{RequiresSort}} is present, the sort would produce a global sort. 
> The partitioning introduced by that sort would be overridden by 
> {{RequiresClustering}}, making the sort local to each partition.
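Note that the generic type parameters in the quoted snippet appear to have been stripped by the HTML rendering (the angle-bracketed parts of {{Set}} and {{List}} are missing). Purely as a hedged reconstruction, rendered in Scala and assuming the element types are Catalyst's {{Expression}} and {{SortOrder}} (a guess based on the surrounding discussion, not the original text), the proposal reads roughly as:

{code}
import org.apache.spark.sql.catalyst.expressions.{Expression, SortOrder}

// Reconstruction of the proposed interfaces; the element types are assumptions,
// only the method names come from the quoted snippet.
trait RequiresClustering {
  def requiredClustering: Set[Expression]
}

trait RequiresSort {
  def requiredOrdering: Seq[SortOrder]
}
{code}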






[jira] [Created] (SPARK-33779) DataSource V2: API to request distribution and ordering on write

2020-12-14 Thread Anton Okolnychyi (Jira)
Anton Okolnychyi created SPARK-33779:


 Summary: DataSource V2: API to request distribution and ordering 
on write
 Key: SPARK-33779
 URL: https://issues.apache.org/jira/browse/SPARK-33779
 Project: Spark
  Issue Type: Sub-task
  Components: SQL
Affects Versions: 3.2.0
Reporter: Anton Okolnychyi


We need to have proper APIs for requesting a specific distribution and ordering 
on writes for data sources that implement the V2 interface.
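As a rough illustration of what is being asked for, the sketch below models a write-side hook through which a data source tells Spark how incoming rows must be distributed across tasks and ordered within each task. The trait name and signatures are illustrative (the work eventually shipped as the {{RequiresDistributionAndOrdering}} interface, but treat the details below as assumptions), and Catalyst types are used only to keep the sketch self-contained.

{code}
import org.apache.spark.sql.catalyst.expressions.{Expression, SortOrder}

// Illustrative only: a V2 write implementing this trait would have Spark insert
// a shuffle (to satisfy the distribution) and a local sort (to satisfy the
// ordering) before rows reach the data source writers.
trait RequiresDistributionAndOrderingSketch {
  // "cluster rows by these expressions before handing them to the writers"
  def requiredDistribution: Seq[Expression]
  // "within each task, sort rows by these orderings"
  def requiredOrdering: Seq[SortOrder]
}
{code}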






[jira] [Commented] (SPARK-33779) DataSource V2: API to request distribution and ordering on write

2020-12-14 Thread Anton Okolnychyi (Jira)


[ https://issues.apache.org/jira/browse/SPARK-33779?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17249176#comment-17249176 ]

Anton Okolnychyi commented on SPARK-33779:
--

This is part of the work in SPARK-23889.

> DataSource V2: API to request distribution and ordering on write
> 
>
> Key: SPARK-33779
> URL: https://issues.apache.org/jira/browse/SPARK-33779
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 3.2.0
>Reporter: Anton Okolnychyi
>Priority: Major
>
> We need to have proper APIs for requesting a specific distribution and 
> ordering on writes for data sources that implement the V2 interface.






[jira] [Created] (SPARK-33807) Data Source V2: Remove read specific distributions

2020-12-16 Thread Anton Okolnychyi (Jira)
Anton Okolnychyi created SPARK-33807:


 Summary: Data Source V2: Remove read specific distributions
 Key: SPARK-33807
 URL: https://issues.apache.org/jira/browse/SPARK-33807
 Project: Spark
  Issue Type: Sub-task
  Components: SQL
Affects Versions: 3.2.0
Reporter: Anton Okolnychyi


We should remove the read-specific distributions for DS V2 as discussed 
[here|https://github.com/apache/spark/pull/30706#discussion_r543059827].





