[GitHub] spark pull request #21488: SPARK-18057 Update structured streaming kafka fro...

2018-06-06 Thread eric-maynard
Github user eric-maynard commented on a diff in the pull request:

https://github.com/apache/spark/pull/21488#discussion_r193549547
  
--- Diff: external/kafka-0-10-sql/pom.xml ---
@@ -29,7 +29,7 @@
   <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
   <properties>
     <sbt.project.name>sql-kafka-0-10</sbt.project.name>
-    <kafka.version>0.10.0.1</kafka.version>
+    <kafka.version>2.0.0-SNAPSHOT</kafka.version>
   </properties>
   <packaging>jar</packaging>
   <name>Kafka 0.10 Source for Structured Streaming</name>
--- End diff --

We should update this line to reflect the version change too.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21470: [SPARK-24443][SQL] comparison should accept struc...

2018-06-04 Thread eric-maynard
Github user eric-maynard commented on a diff in the pull request:

https://github.com/apache/spark/pull/21470#discussion_r192820148
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala ---
@@ -803,18 +803,60 @@ object TypeCoercion {
   e.copy(left = Cast(e.left, TimestampType))
 }
 
-  case b @ BinaryOperator(left, right) if left.dataType != right.dataType =>
-    findTightestCommonType(left.dataType, right.dataType).map { commonType =>
-      if (b.inputType.acceptsType(commonType)) {
-        // If the expression accepts the tightest common type, cast to that.
-        val newLeft = if (left.dataType == commonType) left else Cast(left, commonType)
-        val newRight = if (right.dataType == commonType) right else Cast(right, commonType)
-        b.withNewChildren(Seq(newLeft, newRight))
-      } else {
-        // Otherwise, don't do anything with the expression.
-        b
-      }
-    }.getOrElse(b)  // If there is no applicable conversion, leave expression unchanged.
+  case b @ BinaryOperator(left, right)
+      if !BinaryOperator.sameType(left.dataType, right.dataType) =>
+    (left.dataType, right.dataType) match {
+      case (StructType(fields1), StructType(fields2)) =>
+        val commonTypes = scala.collection.mutable.ArrayBuffer.empty[DataType]
+        val len = fields1.length
+        var i = 0
+        var continue = fields1.length == fields2.length
+        while (i < len && continue) {
--- End diff --

This loop could be refactored functionally, e.g.
```
val commonTypes = (fields1 zip fields2).map { case (f1, f2) =>
  findTightestCommonType(f1.dataType, f2.dataType)
}
if (commonTypes.forall(_.isDefined)) {
  . . .
```
(Note that `zip` truncates to the shorter sequence, so the length check is still needed.)
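To make the suggestion above concrete, here is a self-contained sketch of the zip-based rewrite. It runs outside Catalyst, so `DataType`, `Field`, and `findTightestCommonType` below are toy stand-ins for the real Spark types, not the actual Catalyst API.

```scala
// Toy stand-ins for Catalyst types, just to demonstrate the refactor shape.
sealed trait DataType
case object IntType extends DataType
case object LongType extends DataType
case class Field(name: String, dataType: DataType)

// Simplified stand-in for TypeCoercion.findTightestCommonType.
def findTightestCommonType(a: DataType, b: DataType): Option[DataType] = (a, b) match {
  case _ if a == b => Some(a)
  case (IntType, LongType) | (LongType, IntType) => Some(LongType)
  case _ => None
}

val fields1 = Seq(Field("a", IntType), Field("b", LongType))
val fields2 = Seq(Field("a", LongType), Field("b", LongType))

// zip + map replaces the index-driven while loop; the explicit length check
// stays because zip silently truncates to the shorter sequence.
val commonTypes: Seq[Option[DataType]] =
  if (fields1.length == fields2.length) {
    (fields1 zip fields2).map { case (f1, f2) =>
      findTightestCommonType(f1.dataType, f2.dataType)
    }
  } else Seq(None)

assert(commonTypes.forall(_.isDefined))
```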


---




[GitHub] spark pull request #21470: [SPARK-24443][SQL] comparison should accept struc...

2018-06-04 Thread eric-maynard
Github user eric-maynard commented on a diff in the pull request:

https://github.com/apache/spark/pull/21470#discussion_r192819128
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala ---
@@ -803,18 +803,60 @@ object TypeCoercion {
   e.copy(left = Cast(e.left, TimestampType))
 }
 
-  case b @ BinaryOperator(left, right) if left.dataType != right.dataType =>
-    findTightestCommonType(left.dataType, right.dataType).map { commonType =>
-      if (b.inputType.acceptsType(commonType)) {
-        // If the expression accepts the tightest common type, cast to that.
-        val newLeft = if (left.dataType == commonType) left else Cast(left, commonType)
-        val newRight = if (right.dataType == commonType) right else Cast(right, commonType)
-        b.withNewChildren(Seq(newLeft, newRight))
-      } else {
-        // Otherwise, don't do anything with the expression.
-        b
-      }
-    }.getOrElse(b)  // If there is no applicable conversion, leave expression unchanged.
+  case b @ BinaryOperator(left, right)
+      if !BinaryOperator.sameType(left.dataType, right.dataType) =>
+    (left.dataType, right.dataType) match {
+      case (StructType(fields1), StructType(fields2)) =>
+        val commonTypes = scala.collection.mutable.ArrayBuffer.empty[DataType]
+        val len = fields1.length
+        var i = 0
+        var continue = fields1.length == fields2.length
+        while (i < len && continue) {
+          val commonType = findTightestCommonType(fields1(i).dataType, fields2(i).dataType)
+          if (commonType.isDefined) {
+            commonTypes += commonType.get
+          } else {
+            continue = false
+          }
+          i += 1
+        }
+
+        if (continue) {
+          val newLeftST = new StructType(fields1.zip(commonTypes).map {
+            case (f, commonType) => f.copy(dataType = commonType)
+          })
+          val newLeft = if (left.dataType == newLeftST) left else Cast(left, newLeftST)
+
+          val newRightST = new StructType(fields2.zip(commonTypes).map {
+            case (f, commonType) => f.copy(dataType = commonType)
+          })
+          val newRight = if (right.dataType == newRightST) right else Cast(right, newRightST)
+
+          if (b.inputType.acceptsType(newLeftST) && b.inputType.acceptsType(newRightST)) {
+            b.withNewChildren(Seq(newLeft, newRight))
+          } else {
+            // type not acceptable, don't do anything with the expression.
+            b
+          }
+        } else {
+          // left struct type and right struct type have different number of fields, or some
+          // fields don't have a common type, don't do anything with the expression.
+          b
+        }
+
+      case _ =>
+        findTightestCommonType(left.dataType, right.dataType).map { commonType =>
+          if (b.inputType.acceptsType(commonType)) {
+            // If the expression accepts the tightest common type, cast to that.
+            val newLeft = if (left.dataType == commonType) left else Cast(left, commonType)
--- End diff --

This ternary operation seems to crop up a few times in this PR. Maybe we can push it out into a method?
```
private def castIfNeeded(e: Expression, possibleType: DataType): Expression = {
  if (e.dataType == possibleType) e else Cast(e, possibleType)
}
```
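As a sanity check on the helper's shape, here is a standalone sketch with toy `Expression`/`Cast` stand-ins (not the actual Catalyst classes):

```scala
// Toy stand-ins for Catalyst's Expression/Cast, to show the helper in isolation.
sealed trait DataType
case object IntType extends DataType
case object LongType extends DataType

trait Expression { def dataType: DataType }
case class Literal(dataType: DataType) extends Expression
case class Cast(child: Expression, target: DataType) extends Expression {
  def dataType: DataType = target
}

// The proposed helper: only wrap in Cast when the type actually differs.
def castIfNeeded(e: Expression, possibleType: DataType): Expression =
  if (e.dataType == possibleType) e else Cast(e, possibleType)

val lit = Literal(IntType)
assert(castIfNeeded(lit, IntType) == lit)                 // no-op when types match
assert(castIfNeeded(lit, LongType) == Cast(lit, LongType)) // otherwise insert a cast
```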


---




[GitHub] spark pull request #21168: added check to ensure main method is found [SPARK...

2018-04-26 Thread eric-maynard
Github user eric-maynard commented on a diff in the pull request:

https://github.com/apache/spark/pull/21168#discussion_r184442769
  
--- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ---
@@ -675,9 +675,14 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
     val userThread = new Thread {
       override def run() {
         try {
-          mainMethod.invoke(null, userArgs.toArray)
-          finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
-          logDebug("Done running users class")
+          if(mainMethod == null) {
--- End diff --

Yes, I think you are definitely correct: it cannot be null. @vanzin is right, and the check should instead ensure that the `main` being invoked is static. The PR is updated, but I may close it, as something is wrong with the way I am testing.
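For illustration, the static-ness check discussed above can be sketched with plain JDK reflection. `Echo` below is a made-up toy class, not Spark's ApplicationMaster code:

```scala
import java.lang.reflect.Modifier

// A `main` defined on a class (rather than a companion object) compiles to an
// instance method, so Modifier.isStatic reports false and the AM could fail
// fast with a clear error instead of an NPE.
class Echo { def main(args: Array[String]): Unit = () }

val m = classOf[Echo].getMethod("main", classOf[Array[String]])
assert(!Modifier.isStatic(m.getModifiers))
```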


---




[GitHub] spark pull request #21168: added check to ensure main method is found [SPARK...

2018-04-26 Thread eric-maynard
Github user eric-maynard commented on a diff in the pull request:

https://github.com/apache/spark/pull/21168#discussion_r184435342
  
--- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ---
@@ -675,9 +675,14 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
     val userThread = new Thread {
       override def run() {
         try {
-          mainMethod.invoke(null, userArgs.toArray)
-          finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
-          logDebug("Done running users class")
+          if(mainMethod == null) {
--- End diff --

Good question -- I was unsure of this myself.

Ultimately I *was* able to replicate the issue described in the JIRA, and this PR does solve it. Also, the NPE in the JIRA stacktrace does indeed point to the invocation of `mainMethod.invoke`. So tentatively I think the answer is 'yes'.


---




[GitHub] spark pull request #21168: added check to ensure main method is found [SPARK...

2018-04-26 Thread eric-maynard
GitHub user eric-maynard opened a pull request:

https://github.com/apache/spark/pull/21168

added check to ensure main method is found [SPARK-23830]

## What changes were proposed in this pull request?

When a user specifies the wrong class -- or, in fact, a class instead of an object -- Spark throws an NPE which is not useful for debugging. This was reported in [SPARK-23830](https://issues.apache.org/jira/browse/SPARK-23830). This PR adds a check to ensure the main method was found and logs a useful error in the event that it's null.
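The NPE symptom can be reproduced in miniature: invoking a non-static `main` reflectively with a null receiver throws a bare NullPointerException. `NotAnObject` below is a toy class for illustration, not from the Spark codebase:

```scala
// A Scala `class` (not `object`) compiles `main` to an instance method, so a
// reflective call with a null receiver fails with an unhelpful NPE -- the
// behavior reported in SPARK-23830.
class NotAnObject { def main(args: Array[String]): Unit = () }

val m = classOf[NotAnObject].getMethod("main", classOf[Array[String]])
val threwNpe =
  try { m.invoke(null, Array.empty[String]); false }
  catch { case _: NullPointerException => true }
assert(threwNpe)  // the bare NPE this PR replaces with a clear error message
```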

## How was this patch tested?

* Unit tests + Manual testing
* The scope of the changes is very limited


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/eric-maynard/spark feature/SPARK-23830

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21168.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21168


commit 8c68dd7ff0d17e2a5d23583dac22487b292aa00b
Author: eric-maynard <emaynard@...>
Date:   2018-04-26T14:58:21Z

added check to ensure main method is found [SPARK-23830]




---
