[GitHub] spark pull request #21488: SPARK-18057 Update structured streaming kafka fro...
Github user eric-maynard commented on a diff in the pull request: https://github.com/apache/spark/pull/21488#discussion_r193549547

--- Diff: external/kafka-0-10-sql/pom.xml ---
@@ -29,7 +29,7 @@
   <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
   <properties>
     <sbt.project.name>sql-kafka-0-10</sbt.project.name>
-    <kafka.version>0.10.0.1</kafka.version>
+    <kafka.version>2.0.0-SNAPSHOT</kafka.version>
   </properties>
   <packaging>jar</packaging>
   <name>Kafka 0.10 Source for Structured Streaming</name>
--- End diff --

We should change this line to reflect the change too -- the `<name>` still reads "Kafka 0.10 Source for Structured Streaming".
[GitHub] spark pull request #21470: [SPARK-24443][SQL] comparison should accept struc...
Github user eric-maynard commented on a diff in the pull request: https://github.com/apache/spark/pull/21470#discussion_r192820148

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala ---
@@ -803,18 +803,60 @@ object TypeCoercion {
           e.copy(left = Cast(e.left, TimestampType))
       }

-      case b @ BinaryOperator(left, right) if left.dataType != right.dataType =>
-        findTightestCommonType(left.dataType, right.dataType).map { commonType =>
-          if (b.inputType.acceptsType(commonType)) {
-            // If the expression accepts the tightest common type, cast to that.
-            val newLeft = if (left.dataType == commonType) left else Cast(left, commonType)
-            val newRight = if (right.dataType == commonType) right else Cast(right, commonType)
-            b.withNewChildren(Seq(newLeft, newRight))
-          } else {
-            // Otherwise, don't do anything with the expression.
-            b
-          }
-        }.getOrElse(b)  // If there is no applicable conversion, leave expression unchanged.
+      case b @ BinaryOperator(left, right)
+          if !BinaryOperator.sameType(left.dataType, right.dataType) =>
+        (left.dataType, right.dataType) match {
+          case (StructType(fields1), StructType(fields2)) =>
+            val commonTypes = scala.collection.mutable.ArrayBuffer.empty[DataType]
+            val len = fields1.length
+            var i = 0
+            var continue = fields1.length == fields2.length
+            while (i < len && continue) {
--- End diff --

This loop could be refactored functionally, e.g.

```
val commonTypes = (fields1 zip fields2).map(f => findTightestCommonType(f._1.dataType, f._2.dataType))
if (commonTypes.forall(_.isDefined)) {
  . . .
```
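[Editor's note] Spelled out, the functional version of that loop might look like the following sketch. This is hypothetical, not part of the PR: it assumes the surrounding `TypeCoercion` scope where `findTightestCommonType` is defined, and it keeps the explicit length check, since a bare `zip` would silently truncate to the shorter array and change the original loop's behavior.

```scala
import org.apache.spark.sql.types.{DataType, StructField}

// Hypothetical rewrite of the while-loop above. Returns the per-field
// common types, or None if the structs differ in arity or any field
// pair has no tightest common type.
def tightestCommonFieldTypes(
    fields1: Array[StructField],
    fields2: Array[StructField]): Option[Seq[DataType]] = {
  if (fields1.length != fields2.length) {
    None
  } else {
    val commonTypes = fields1.zip(fields2).map {
      case (f1, f2) => findTightestCommonType(f1.dataType, f2.dataType)
    }
    if (commonTypes.forall(_.isDefined)) Some(commonTypes.map(_.get).toSeq) else None
  }
}
```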
[GitHub] spark pull request #21470: [SPARK-24443][SQL] comparison should accept struc...
Github user eric-maynard commented on a diff in the pull request: https://github.com/apache/spark/pull/21470#discussion_r192819128

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala ---
@@ -803,18 +803,60 @@ object TypeCoercion {
           e.copy(left = Cast(e.left, TimestampType))
       }

-      case b @ BinaryOperator(left, right) if left.dataType != right.dataType =>
-        findTightestCommonType(left.dataType, right.dataType).map { commonType =>
-          if (b.inputType.acceptsType(commonType)) {
-            // If the expression accepts the tightest common type, cast to that.
-            val newLeft = if (left.dataType == commonType) left else Cast(left, commonType)
-            val newRight = if (right.dataType == commonType) right else Cast(right, commonType)
-            b.withNewChildren(Seq(newLeft, newRight))
-          } else {
-            // Otherwise, don't do anything with the expression.
-            b
-          }
-        }.getOrElse(b)  // If there is no applicable conversion, leave expression unchanged.
+      case b @ BinaryOperator(left, right)
+          if !BinaryOperator.sameType(left.dataType, right.dataType) =>
+        (left.dataType, right.dataType) match {
+          case (StructType(fields1), StructType(fields2)) =>
+            val commonTypes = scala.collection.mutable.ArrayBuffer.empty[DataType]
+            val len = fields1.length
+            var i = 0
+            var continue = fields1.length == fields2.length
+            while (i < len && continue) {
+              val commonType = findTightestCommonType(fields1(i).dataType, fields2(i).dataType)
+              if (commonType.isDefined) {
+                commonTypes += commonType.get
+              } else {
+                continue = false
+              }
+              i += 1
+            }
+
+            if (continue) {
+              val newLeftST = new StructType(fields1.zip(commonTypes).map {
+                case (f, commonType) => f.copy(dataType = commonType)
+              })
+              val newLeft = if (left.dataType == newLeftST) left else Cast(left, newLeftST)
+
+              val newRightST = new StructType(fields2.zip(commonTypes).map {
+                case (f, commonType) => f.copy(dataType = commonType)
+              })
+              val newRight = if (right.dataType == newRightST) right else Cast(right, newRightST)
+
+              if (b.inputType.acceptsType(newLeftST) && b.inputType.acceptsType(newRightST)) {
+                b.withNewChildren(Seq(newLeft, newRight))
+              } else {
+                // type not acceptable, don't do anything with the expression.
+                b
+              }
+            } else {
+              // left struct type and right struct type have different number of fields, or some
+              // fields don't have a common type, don't do anything with the expression.
+              b
+            }
+
+          case _ =>
+            findTightestCommonType(left.dataType, right.dataType).map { commonType =>
+              if (b.inputType.acceptsType(commonType)) {
+                // If the expression accepts the tightest common type, cast to that.
+                val newLeft = if (left.dataType == commonType) left else Cast(left, commonType)
--- End diff --

This conditional cast pattern seems to crop up a few times in this PR. Maybe we can push it out into a method?

```
private def castIfNeeded(e: Expression, possibleType: DataType): Expression = {
  if (e.dataType == possibleType) e else Cast(e, possibleType)
}
```
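[Editor's note] With such a helper in place, each of the repeated cast expressions in the diff above would collapse to a single call. A sketch, reusing the names from the diff:

```scala
// Instead of: if (left.dataType == newLeftST) left else Cast(left, newLeftST)
val newLeft = castIfNeeded(left, newLeftST)
val newRight = castIfNeeded(right, newRightST)
```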
[GitHub] spark pull request #21168: added check to ensure main method is found [SPARK...
Github user eric-maynard commented on a diff in the pull request: https://github.com/apache/spark/pull/21168#discussion_r184442769

--- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ---
@@ -675,9 +675,14 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
     val userThread = new Thread {
       override def run() {
         try {
-          mainMethod.invoke(null, userArgs.toArray)
-          finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
-          logDebug("Done running users class")
+          if(mainMethod == null) {
--- End diff --

Yes, I think you are definitely correct -- it cannot be null. @vanzin is right, and the check should instead ensure the `main` being invoked is static. The PR is updated, but I may close it, as something is wrong with the way I am testing.
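[Editor's note] A minimal sketch of the static-ness check being discussed -- hypothetical, not the actual patch. The relevant fact is that a Scala `object`'s `main` compiles to a static method on the generated class, so a non-static `main` indicates the user pointed `--class` at a plain class:

```scala
import java.lang.reflect.Modifier

object MainMethodCheck {
  // Returns true only if the class exposes a static main(Array[String]),
  // i.e. --class named an object (or a Java class with a static main)
  // rather than a plain Scala class.
  def hasRunnableMain(clazz: Class[_]): Boolean =
    try {
      val m = clazz.getMethod("main", classOf[Array[String]])
      Modifier.isStatic(m.getModifiers)
    } catch {
      case _: NoSuchMethodException => false
    }
}
```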
[GitHub] spark pull request #21168: added check to ensure main method is found [SPARK...
Github user eric-maynard commented on a diff in the pull request: https://github.com/apache/spark/pull/21168#discussion_r184435342

--- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ---
@@ -675,9 +675,14 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments) extends
     val userThread = new Thread {
       override def run() {
         try {
-          mainMethod.invoke(null, userArgs.toArray)
-          finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
-          logDebug("Done running users class")
+          if(mainMethod == null) {
--- End diff --

Good question -- I was unsure of this myself. Ultimately I *was* able to replicate the issue described in the JIRA, and this PR did solve it. Also, the NPE in the JIRA stack trace does indeed point to the invocation of `mainMethod.invoke`. So tentatively I think the answer is "yes".
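[Editor's note] A toy reproduction of that NPE, with hypothetical class names: `Method.invoke(null, ...)` throws a `NullPointerException` whenever the resolved `main` is an instance method, which is exactly what happens when `--class` names a plain class instead of an object.

```scala
// A plain class, so its main is an instance method (no static forwarder).
class NotAnObject {
  def main(args: Array[String]): Unit = ()
}

object Repro {
  def main(args: Array[String]): Unit = {
    val m = classOf[NotAnObject].getMethod("main", classOf[Array[String]])
    // Instance method + null receiver => NullPointerException, matching
    // the stack trace attached to the JIRA.
    m.invoke(null, Array.empty[String])
  }
}
```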
[GitHub] spark pull request #21168: added check to ensure main method is found [SPARK...
GitHub user eric-maynard opened a pull request:

    https://github.com/apache/spark/pull/21168

    added check to ensure main method is found [SPARK-23830]

## What changes were proposed in this pull request?

When a user specifies the wrong class -- or, in fact, a class instead of an object -- Spark throws an NPE which is not useful for debugging. This was reported in [SPARK-23830](https://issues.apache.org/jira/browse/SPARK-23830). This PR adds a check to ensure the main method was found, and logs a useful error in the event that it's null.

## How was this patch tested?

* Unit tests + manual testing
* The scope of the changes is very limited

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/eric-maynard/spark feature/SPARK-23830

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/21168.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #21168

commit 8c68dd7ff0d17e2a5d23583dac22487b292aa00b
Author: eric-maynard <emaynard@...>
Date: 2018-04-26T14:58:21Z

    added check to ensure main method is found [SPARK-23830]
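[Editor's note] The guard described above might look roughly like this sketch -- hypothetical, not the merged patch; per the review discussion earlier in this thread, `getMethod` throws rather than returning null, so the check later shifted to verifying that `main` is static. `mainMethod`, `userArgs`, `args`, `finish`, and `logError` are assumed from the surrounding `ApplicationMaster` code, and `EXIT_EXCEPTION_USER_CLASS` is assumed to be the appropriate exit code.

```scala
// Hypothetical guard around the reflective call in the user-class thread.
if (mainMethod == null) {
  // Log a useful error instead of letting mainMethod.invoke throw an NPE.
  logError(s"Could not find a main method in ${args.userClass}")
  finish(FinalApplicationStatus.FAILED, ApplicationMaster.EXIT_EXCEPTION_USER_CLASS)
} else {
  mainMethod.invoke(null, userArgs.toArray)
  finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
  logDebug("Done running users class")
}
```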