[GitHub] spark issue #16894: [SPARK-17897] [SQL] [BACKPORT-2.0] Fixed IsNotNull Const...

2017-02-10 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/16894
  
**[Test build #72734 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72734/consoleFull)**
 for PR 16894 at commit 
[`11d2684`](https://github.com/apache/spark/commit/11d2684b1def705ef72b0a64b13c93c8a09d3efc).





[GitHub] spark pull request #16894: [SPARK-17897] [SQL] [BACKPORT-2.0] Fixed IsNotNul...

2017-02-10 Thread gatorsmile
GitHub user gatorsmile opened a pull request:

https://github.com/apache/spark/pull/16894

[SPARK-17897] [SQL] [BACKPORT-2.0] Fixed IsNotNull Constraint Inference Rule

### What changes were proposed in this pull request?

This PR is to backport https://github.com/apache/spark/pull/16067 to Spark 
2.0



The `constraints` of an operator are the expressions that evaluate to `true` for all the rows it produces. That means the result of each constraint expression is neither `false` nor `unknown` (NULL). Thus, we can conclude that `IsNotNull` holds on all the constraints, which are generated by the operator's own predicates or propagated from its children. A constraint can be a complex expression. To make better use of these constraints, we try to push `IsNotNull` down to the lowest-level expressions (i.e., `Attribute`s). `IsNotNull` can be pushed through an expression when that expression is null-intolerant (a null-intolerant expression always evaluates to NULL when its input is NULL). For example, from the constraint `a + b > 3` we can infer `isnotnull(a)` and `isnotnull(b)`, because both `+` and `>` are null-intolerant.

Below is the existing code we have for `IsNotNull` pushdown.
```Scala
  private def scanNullIntolerantExpr(expr: Expression): Seq[Attribute] = expr match {
    case a: Attribute => Seq(a)
    case _: NullIntolerant | IsNotNull(_: NullIntolerant) =>
      expr.children.flatMap(scanNullIntolerantExpr)
    case _ => Seq.empty[Attribute]
  }
```

**`IsNotNull` itself is not null-intolerant.** It converts `null` to `false`. If the expression does not contain any `Not`-like expression, the function above happens to work; otherwise, it can generate a wrong result. This PR fixes the function by removing `IsNotNull` from the inference. After the fix, when a constraint contains an `IsNotNull` expression, we infer new attribute-specific `IsNotNull` constraints if and only if `IsNotNull` appears at the root of the constraint.
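
A minimal sketch of what the corrected helper looks like once the `IsNotNull(_: NullIntolerant)` case is dropped (assuming the caller unwraps a root-level `IsNotNull` before calling it; the name and exact shape of the merged code may differ):
```Scala
  // Only walk through genuinely null-intolerant expressions. IsNotNull is no
  // longer treated as one, so nothing is inferred from expressions nested
  // under Not/IsNotNull unless IsNotNull is the root of the constraint.
  private def scanNullIntolerantAttribute(expr: Expression): Seq[Attribute] = expr match {
    case a: Attribute => Seq(a)
    case _: NullIntolerant => expr.children.flatMap(scanNullIntolerantAttribute)
    case _ => Seq.empty[Attribute]
  }
```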

Without the fix, the following test case returns an empty result.
```Scala
val data = Seq[java.lang.Integer](1, null).toDF("key")
data.filter("not key is not null").show()
```
Before the fix, the optimized plan is like
```
== Optimized Logical Plan ==
Project [value#1 AS key#3]
+- Filter (isnotnull(value#1) && NOT isnotnull(value#1))
   +- LocalRelation [value#1]
```

After the fix, the optimized plan is like
```
== Optimized Logical Plan ==
Project [value#1 AS key#3]
+- Filter NOT isnotnull(value#1)
   +- LocalRelation [value#1]
```

### How was this patch tested?
Added a test

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/gatorsmile/spark isNotNull2.0

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/16894.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #16894


commit 11d2684b1def705ef72b0a64b13c93c8a09d3efc
Author: Xiao Li 
Date:   2017-02-11T07:49:17Z

fix.







[GitHub] spark issue #16893: [SPARK-19555][SQL] Improve the performance of StringUtil...

2017-02-10 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/16893
  
**[Test build #72733 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72733/testReport)**
 for PR 16893 at commit 
[`e68eab0`](https://github.com/apache/spark/commit/e68eab0841bde02c773fb169191361f4eb55d742).





[GitHub] spark pull request #16893: [SPARK-19555][SQL] Improve the performance of Str...

2017-02-10 Thread lins05
GitHub user lins05 opened a pull request:

https://github.com/apache/spark/pull/16893

[SPARK-19555][SQL] Improve the performance of StringUtils.escapeLikeRegex 
method

## What changes were proposed in this pull request?

Copied from [SPARK-19555](https://issues.apache.org/jira/browse/SPARK-19555)

Spark's `StringUtils.escapeLikeRegex()` method is written inefficiently, performing lots of object allocations due to its use of `zip()`, `flatMap()`, and `mkString`. Instead, I think the method should be rewritten in an imperative style using a Java string builder.

This method can become a performance bottleneck when the LIKE pattern is a non-constant-foldable expression (e.g., the pattern comes from the data rather than being a literal in the query), since the escaping is then done per row.
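
A minimal sketch of the imperative style being proposed, assuming the usual LIKE semantics (backslash escapes the next character, `_` matches one character, `%` matches any sequence); the names and edge-case handling here are illustrative, not the exact code in the PR:
```Scala
import java.util.regex.Pattern

// Builds a Java regex from a SQL LIKE pattern without the zip/flatMap/mkString
// allocations: a single pass over the input, appending into one StringBuilder.
def escapeLikeRegex(pattern: String): String = {
  val out = new java.lang.StringBuilder("(?s)") // DOTALL, so '%' can span newlines
  var i = 0
  while (i < pattern.length) {
    pattern.charAt(i) match {
      case '\\' if i + 1 < pattern.length =>
        // An escaped character matches itself literally.
        out.append(Pattern.quote(String.valueOf(pattern.charAt(i + 1))))
        i += 1
      case '_' => out.append('.')
      case '%' => out.append(".*")
      case c   => out.append(Pattern.quote(String.valueOf(c)))
    }
    i += 1
  }
  out.toString
}
```
Compared with the zip-based version, this allocates only the builder and the quoted fragments.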

## How was this patch tested?

Existing tests.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/lins05/spark 
spark-19555-improve-escape-like-regex

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/16893.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #16893


commit bbcdeda98c14705b6de3efab70f2c58bc4539bb9
Author: Shuai Lin 
Date:   2017-02-11T07:46:34Z

[SPARK-19555][SQL] Improve the performance of StringUtils.escapeLikeRegex







[GitHub] spark issue #16891: [SPARK-19318][SQL] Fix to treat JDBC connection properti...

2017-02-10 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/16891
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/72731/
Test PASSed.





[GitHub] spark issue #16891: [SPARK-19318][SQL] Fix to treat JDBC connection properti...

2017-02-10 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/16891
  
Merged build finished. Test PASSed.





[GitHub] spark issue #16891: [SPARK-19318][SQL] Fix to treat JDBC connection properti...

2017-02-10 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/16891
  
**[Test build #72731 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72731/testReport)**
 for PR 16891 at commit 
[`41d3362`](https://github.com/apache/spark/commit/41d336251cb70d7be26171c6a1f484e742ba83bd).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark issue #16067: [SPARK-17897] [SQL] Fixed IsNotNull Constraint Inference...

2017-02-10 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/16067
  
Sure, let me backport it now.





[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

2017-02-10 Thread kayousterhout
Github user kayousterhout commented on the issue:

https://github.com/apache/spark/pull/16620
  
Also, if you implement the new change I proposed, I think it's relatively 
straightforward to write a new test in DAGSchedulerSuite for the new behavior 
(which will be pretty similar to the test I modified in #16892).





[GitHub] spark issue #16892: [SPARK-19560] Improve DAGScheduler tests.

2017-02-10 Thread kayousterhout
Github user kayousterhout commented on the issue:

https://github.com/apache/spark/pull/16892
  
cc @mateiz, whose test I deleted / rolled into the existing one





[GitHub] spark pull request #16876: [SPARK-19537] Move pendingPartitions to ShuffleMa...

2017-02-10 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/16876





[GitHub] spark issue #16876: [SPARK-19537] Move pendingPartitions to ShuffleMapStage.

2017-02-10 Thread kayousterhout
Github user kayousterhout commented on the issue:

https://github.com/apache/spark/pull/16876
  
Thanks all for the review; merged this into master





[GitHub] spark issue #16876: [SPARK-19537] Move pendingPartitions to ShuffleMapStage.

2017-02-10 Thread kayousterhout
Github user kayousterhout commented on the issue:

https://github.com/apache/spark/pull/16876
  
@jinxing64 I wrote a long comment on your other PR to attempt to explain 
(my current understanding of) these issues!





[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

2017-02-10 Thread kayousterhout
Github user kayousterhout commented on the issue:

https://github.com/apache/spark/pull/16620
  
tl;dr I don’t think Mark’s change is quite correct, which is why the 
tests were failing.  Instead, I think we need to replace the failedEpoch 
if/else statement and the pendingPartitions update in 
DAGScheduler.handleTaskCompletion with:

```Scala
if (stageIdToStage(task.stageId).latestInfo.attemptId == task.stageAttemptId) {
  // This task was for the currently running attempt of the stage. Since the task
  // completed successfully from the perspective of the TaskSetManager, mark it as
  // no longer pending (the TaskSetManager may consider the task complete even
  // when the output needs to be ignored because the task's epoch is too small below).
  shuffleStage.pendingPartitions -= task.partitionId
}

if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) {
  logInfo(s"Ignoring possibly bogus $smt completion from executor $execId")
} else {
  // The epoch of the task is acceptable (i.e., the task was launched after the most
  // recent failure we're aware of for the executor), so mark the task's output as
  // available.
  shuffleStage.addOutputLoc(smt.partitionId, status)
  // Remove the task's partition from pending partitions. This may have already been
  // done above, but will not have been done yet in cases where the task attempt was
  // from an earlier attempt of the stage (i.e., not the attempt that's currently
  // running). This allows the DAGScheduler to mark the stage as complete when one
  // copy of each task has finished successfully, even if the currently active stage
  // still has tasks running.
  shuffleStage.pendingPartitions -= task.partitionId
}
```

I submitted #16892 to attempt to clarify the test case where Mark’s 
change originally failed (this PR shouldn't block on that -- that's just to 
clarify things for ourselves in the future), and also wrote a very long write 
up of what’s going on below.

—————

There are three relevant pieces of state to consider here:

(1) The tasks that the TaskSetManager (TSM) considers currently pending.  
The TSM encodes these pending tasks in its “successful” array.  When a task 
set is launched, all of its tasks are considered pending, and all of the 
entries in the successful array are False.  Tasks are no longer considered 
pending (and are marked as True in the “successful” array) if either (a) a 
copy of the task finishes successfully or (b) a copy of the task fails with a 
fetch failed (in which case the TSM assumes that the task will never complete 
successfully, because the previous stage needs to be re-run).  Additionally, a 
task that previously completed successfully can be re-marked as pending if the 
stage is a shuffle map stage, and the executor where the task ran died (this is 
because the map output needs to be re-generated, and the TSM will re-schedule 
the task).

The TSM notifies the DAGScheduler that the stage has completed if either 
(a) the stage fails (e.g., there’s a fetch failure) or (b) all of the entries 
in “successful” are true (i.e., there are no more pending tasks).
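
A toy model of that bookkeeping, just to make the state transitions above concrete (this is not Spark's actual `TaskSetManager`; names and structure are illustrative):
```Scala
// Toy model of the TSM's "successful" bookkeeping described above.
class ToyTaskSetManager(numTasks: Int, isShuffleMapStage: Boolean) {
  private val successful = Array.fill(numTasks)(false)
  private val executorOfTask = Array.fill[Option[String]](numTasks)(None)

  // (a) A copy of the task finished successfully: no longer pending.
  def taskSucceeded(task: Int, execId: String): Unit = {
    successful(task) = true
    executorOfTask(task) = Some(execId)
  }

  // (b) A copy failed with FetchFailed: the TSM assumes the task will never
  // complete in this task set, so it also stops being pending.
  def taskFetchFailed(task: Int): Unit = successful(task) = true

  // A shuffle-map task whose executor died must be re-run, so it becomes
  // pending again (its map output needs to be regenerated).
  def executorLost(execId: String): Unit = {
    if (isShuffleMapStage) {
      for (t <- 0 until numTasks if executorOfTask(t).contains(execId)) {
        successful(t) = false
        executorOfTask(t) = None
      }
    }
  }

  // The TSM tells the DAGScheduler the stage is done once nothing is pending.
  def noPendingTasks: Boolean = successful.forall(identity)
}
```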

(2)  ShuffleMapStage.pendingPartitions.  This variable is used by the 
DAGScheduler to track the pending tasks for a stage, and mostly is consistent 
with the TSM’s pending tasks (described above).  When a stage begins, the 
DAGScheduler marks all of the partitions that need to be computed as pending, 
and then removes them from pendingPartitions as the TSM notifies the 
DAGScheduler that tasks have successfully completed.  When a TSM determines 
that a task needs to be re-run (because it’s a shuffle map task that ran on a 
now-dead executor), the TSM sends a Resubmitted task completion event to the 
DAGScheduler, which causes the DAGScheduler to re-add the task to 
pendingPartitions (in doing so, the DAGScheduler is keeping pendingPartitions 
consistent with the TSM’s pending tasks).

I believe there are two scenarios (currently) where 
ShuffleMapStage.pendingPartitions and the TSM’s pending tasks become 
inconsistent: 
-Scenario A (performance optimization, as discussed here already): This 
happens if a ShuffleMapStage gets re-run (e.g., because the first time it ran, 
it encountered a fetch failure, so the previous stage needed to be re-run to 
generate the missing output).  Call the original attempt #0 and the currently 
running attempt #1.  If there’s a task from attempt #0 that’s still 
running, and it is running on an executor that *was not* marked as failed (this 
is the condition captured by the failedEpoch if-statement), and it completes 
successfully, this event will be handled by the TSM for attempt #0.  When the 
DAGScheduler hears that the task completed 

[GitHub] spark issue #16892: [SPARK-19560] Improve DAGScheduler tests.

2017-02-10 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/16892
  
**[Test build #72732 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72732/testReport)**
 for PR 16892 at commit 
[`06b26d3`](https://github.com/apache/spark/commit/06b26d3738403da30bbf2c7ef2bfdb86039baa02).





[GitHub] spark pull request #16892: [SPARK-19560] Improve DAGScheduler tests.

2017-02-10 Thread kayousterhout
GitHub user kayousterhout opened a pull request:

https://github.com/apache/spark/pull/16892

[SPARK-19560] Improve DAGScheduler tests.

This commit improves the tests that check the case when a
ShuffleMapTask completes successfully on an executor that has
failed.  This commit improves the commenting around the existing
test for this, and adds some additional checks to make it more
clear what went wrong if the tests fail (the fact that these
tests are hard to understand came up in the context of @markhamstra's
proposed fix for #16620).

This commit also removes a test that I realized tested exactly
the same functionality.

@markhamstra, I verified that the new version of the test still fails (and
in a more helpful way) for your proposed change for #16620.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/kayousterhout/spark-1 SPARK-19560

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/16892.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #16892


commit 06b26d3738403da30bbf2c7ef2bfdb86039baa02
Author: Kay Ousterhout 
Date:   2017-02-11T06:04:11Z

[SPARK-19560] Improve DAGScheduler tests.

This commit improves the tests that check the case when a
ShuffleMapTask completes successfully on an executor that has
failed.  This commit improves the commenting around the existing
test for this, and adds some additional checks to make it more
clear what went wrong if the tests fail (the fact that these
tests are hard to understand came up in the context of #16620).

This commit also removes a test that I realized tested exactly
the same functionality.







[GitHub] spark issue #16785: [SPARK-19443][SQL] The function to generate constraints ...

2017-02-10 Thread viirya
Github user viirya commented on the issue:

https://github.com/apache/spark/pull/16785
  
@cloud-fan yeah, I agree with you and @hvanhovell.

For the overly slow constraint propagation, besides attacking `getAliasedConstraints` as this change does, maybe we can find other ways to improve the constraint-propagation process itself.

If we can't, then for such long lineages I think we should use checkpointing to fix it, as in #16775.






[GitHub] spark issue #16724: [SPARK-19352][SQL] Keep sort order of rows after externa...

2017-02-10 Thread viirya
Github user viirya commented on the issue:

https://github.com/apache/spark/pull/16724
  
@cloud-fan Is the current change not suitable? We can change it to preserve data order only when partitioning is specified and there is no bucketing for the output.

This change only adds a new constructor to `UnsafeKVExternalSorter`; I don't think there is any other API change. Since the output data goes through this external sorter, the data order definitely changes without this fix. I don't think we can preserve the data order with a workaround that doesn't touch `UnsafeKVExternalSorter`.





[GitHub] spark pull request #16395: [SPARK-17075][SQL] implemented filter estimation

2017-02-10 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/16395#discussion_r100660551
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala
 ---
@@ -0,0 +1,623 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical.statsEstimation
+
+import java.sql.{Date, Timestamp}
+
+import scala.collection.immutable.{HashSet, Map}
+import scala.collection.mutable
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.CatalystConf
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
+
+/**
+ * @param plan a LogicalPlan node that must be an instance of Filter
+ * @param catalystConf a configuration showing if CBO is enabled
+ */
+case class FilterEstimation(plan: Filter, catalystConf: CatalystConf) 
extends Logging {
+
+  /**
+   * We use a mutable colStats because we need to update the corresponding 
ColumnStat
+   * for a column after we apply a predicate condition.  For example, 
column c has
+   * [min, max] value as [0, 100].  In a range condition such as (c > 40 
AND c <= 50),
+   * we need to set the column's [min, max] value to [40, 100] after we 
evaluate the
+   * first condition c > 40.  We need to set the column's [min, max] value 
to [40, 50]
+   * after we evaluate the second condition c <= 50.
+   */
+  private var mutableColStats: mutable.Map[ExprId, ColumnStat] = 
mutable.Map.empty
+
+  /**
+   * Returns an option of Statistics for a Filter logical plan node.
+   * For a given compound expression condition, this method computes 
filter selectivity
+   * (or the percentage of rows meeting the filter condition), which
+   * is used to compute row count, size in bytes, and the updated 
statistics after a given
+   * predicated is applied.
+   *
+   * @return Option[Statistics] When there is no statistics collected, it 
returns None.
+   */
+  def estimate: Option[Statistics] = {
+val stats: Statistics = plan.child.stats(catalystConf)
+if (stats.rowCount.isEmpty) return None
+
+// save a mutable copy of colStats so that we can later change it 
recursively
+mutableColStats = mutable.Map(stats.attributeStats.map(kv => 
(kv._1.exprId, kv._2)).toSeq: _*)
+
+// estimate selectivity of this filter predicate
+val filterSelectivity: Double = calculateConditions(plan.condition)
+
+// attributeStats has mapping Attribute-to-ColumnStat.
+// mutableColStats has mapping ExprId-to-ColumnStat.
+// We use an ExprId-to-Attribute map to facilitate the mapping 
Attribute-to-ColumnStat
+val expridToAttrMap: Map[ExprId, Attribute] =
+  stats.attributeStats.map(kv => (kv._1.exprId, kv._1))
+// copy mutableColStats contents to an immutable AttributeMap.
+val mutableAttributeStats: mutable.Map[Attribute, ColumnStat] =
+  mutableColStats.map(kv => expridToAttrMap(kv._1) -> kv._2)
+val newColStats = AttributeMap(mutableAttributeStats.toSeq)
+
+val filteredRowCount: BigInt =
+  EstimationUtils.ceil(BigDecimal(stats.rowCount.get) * 
filterSelectivity)
+val filteredSizeInBytes: BigInt = EstimationUtils.ceil(BigDecimal(
+EstimationUtils.getOutputSize(plan.output, filteredRowCount, 
newColStats)
+))
+
+Some(stats.copy(sizeInBytes = filteredSizeInBytes, rowCount = 
Some(filteredRowCount),
+  attributeStats = newColStats))
+  }
+
+  /**
+   * Returns a percentage of rows meeting a compound condition in Filter 
node.
+   * A compound condition is decomposed into multiple single conditions 
linked with AND, OR, NOT.
+   * For logical AND conditions, we 

[GitHub] spark pull request #16228: [SPARK-17076] [SQL] Cardinality estimation for jo...

2017-02-10 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/16228#discussion_r100660535
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/Range.scala
 ---
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical.statsEstimation
+
+import java.math.{BigDecimal => JDecimal}
+import java.sql.{Date, Timestamp}
+
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.types.{BooleanType, DateType, TimestampType, _}
+
+
+/** Value range of a column. */
+trait Range
+
+/** For simplicity we use decimal to unify operations of numeric ranges. */
+case class NumericRange(min: JDecimal, max: JDecimal) extends Range
+
+/**
+ * This version of Spark does not have min/max for binary/string types, we 
define their default
+ * behaviors by this class.
+ */
+class DefaultRange extends Range
+
+/** This is for columns with only null values. */
+class NullRange extends Range
+
+object Range {
+  def apply(min: Option[Any], max: Option[Any], dataType: DataType): Range 
= dataType match {
+case StringType | BinaryType => new DefaultRange()
+case _ if min.isEmpty || max.isEmpty => new NullRange()
+case _ => toNumericRange(min.get, max.get, dataType)
+  }
+
+  def isIntersected(r1: Range, r2: Range): Boolean = (r1, r2) match {
+case (_, _: DefaultRange) | (_: DefaultRange, _) =>
+  // Skip overlapping check for binary/string types
+  true
+case (_, _: NullRange) | (_: NullRange, _) =>
+  false
+case (n1: NumericRange, n2: NumericRange) =>
+  n1.min.compareTo(n2.max) <= 0 && n1.max.compareTo(n2.min) >= 0
+  }
+
+  /** This is only for two overlapped ranges. */
--- End diff --

ok I'll improve the doc





[GitHub] spark pull request #16228: [SPARK-17076] [SQL] Cardinality estimation for jo...

2017-02-10 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/16228#discussion_r100660524
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala
 ---
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical.statsEstimation
+
+import scala.collection.mutable
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.CatalystConf
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, 
AttributeReference, Expression}
+import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys
+import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, Join, 
Statistics}
+import 
org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils._
+import org.apache.spark.sql.types.DataType
+
+
+object JoinEstimation extends Logging {
+  /**
+   * Estimate statistics after join. Return `None` if the join type is not 
supported, or we don't
+   * have enough statistics for estimation.
+   */
+  def estimate(conf: CatalystConf, join: Join): Option[Statistics] = {
+join.joinType match {
+  case Inner | Cross | LeftOuter | RightOuter | FullOuter =>
+InnerOuterEstimation(conf, join).doEstimate()
+  case LeftSemi | LeftAnti =>
+LeftSemiAntiEstimation(conf, join).doEstimate()
+  case _ =>
+logDebug(s"[CBO] Unsupported join type: ${join.joinType}")
+None
+}
+  }
+}
+
+case class InnerOuterEstimation(conf: CatalystConf, join: Join) extends 
Logging {
+
+  private val leftStats = join.left.stats(conf)
+  private val rightStats = join.right.stats(conf)
+
+  /**
+   * Estimate output size and number of rows after a join operator, and 
update output column stats.
+   */
+  def doEstimate(): Option[Statistics] = join match {
+case _ if !rowCountsExist(conf, join.left, join.right) =>
+  None
+
+case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, 
left, right) =>
+  // 1. Compute join selectivity
+  val joinKeyPairs = extractJoinKeys(leftKeys, rightKeys)
+  val selectivity = joinSelectivity(joinKeyPairs, leftStats, 
rightStats)
+
+  // 2. Estimate the number of output rows
+  val leftRows = leftStats.rowCount.get
+  val rightRows = rightStats.rowCount.get
+  val innerRows = ceil(BigDecimal(leftRows * rightRows) * selectivity)
+
+  // Make sure outputRows won't be too small based on join type.
+  val outputRows = joinType match {
+case LeftOuter =>
+  // All rows from left side should be in the result.
+  leftRows.max(innerRows)
+case RightOuter =>
+  // All rows from right side should be in the result.
+  rightRows.max(innerRows)
+case FullOuter =>
+  // Simulate full outer join as obtaining the number of elements 
in the union of two
+  // finite sets: A \cup B = A + B - A \cap B => A FOJ B = A + B - 
A IJ B.
+  // But the "inner join" part can be much larger than A \cap B, 
making the simulated
+  // result much smaller. To prevent this, we choose the larger 
one between the simulated
+  // part and the inner part.
+  (leftRows + rightRows - innerRows).max(innerRows)
+case _ =>
+  // Don't change for inner or cross join
+  innerRows
+  }
+
+  // 3. Update statistics based on the output of join
+  val intersectedStats = if (selectivity == 0) {
+AttributeMap[ColumnStat](Nil)
+  } else {
+updateIntersectedStats(joinKeyPairs, leftStats, rightStats)
+  }
+  val inputAttrStats = AttributeMap(
+leftStats.attributeStats.toSeq ++ 

[GitHub] spark pull request #16228: [SPARK-17076] [SQL] Cardinality estimation for jo...

2017-02-10 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/16228#discussion_r100660451
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala
 ---
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical.statsEstimation
+
+import scala.collection.mutable
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.CatalystConf
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, 
AttributeReference, Expression}
+import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys
+import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, Join, 
Statistics}
+import 
org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils._
+import org.apache.spark.sql.types.DataType
+
+
+object JoinEstimation extends Logging {
+  /**
+   * Estimate statistics after join. Return `None` if the join type is not 
supported, or we don't
+   * have enough statistics for estimation.
+   */
+  def estimate(conf: CatalystConf, join: Join): Option[Statistics] = {
+join.joinType match {
+  case Inner | Cross | LeftOuter | RightOuter | FullOuter =>
+InnerOuterEstimation(conf, join).doEstimate()
+  case LeftSemi | LeftAnti =>
+LeftSemiAntiEstimation(conf, join).doEstimate()
+  case _ =>
+logDebug(s"[CBO] Unsupported join type: ${join.joinType}")
+None
+}
+  }
+}
+
+case class InnerOuterEstimation(conf: CatalystConf, join: Join) extends 
Logging {
+
+  private val leftStats = join.left.stats(conf)
+  private val rightStats = join.right.stats(conf)
+
+  /**
+   * Estimate output size and number of rows after a join operator, and 
update output column stats.
+   */
+  def doEstimate(): Option[Statistics] = join match {
+case _ if !rowCountsExist(conf, join.left, join.right) =>
+  None
+
+case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, 
left, right) =>
+  // 1. Compute join selectivity
+  val joinKeyPairs = extractJoinKeys(leftKeys, rightKeys)
+  val selectivity = joinSelectivity(joinKeyPairs, leftStats, 
rightStats)
+
+  // 2. Estimate the number of output rows
+  val leftRows = leftStats.rowCount.get
+  val rightRows = rightStats.rowCount.get
+  val innerRows = ceil(BigDecimal(leftRows * rightRows) * selectivity)
+
+  // Make sure outputRows won't be too small based on join type.
+  val outputRows = joinType match {
+case LeftOuter =>
+  // All rows from left side should be in the result.
+  leftRows.max(innerRows)
+case RightOuter =>
+  // All rows from right side should be in the result.
+  rightRows.max(innerRows)
+case FullOuter =>
+  // Simulate full outer join as obtaining the number of elements 
in the union of two
+  // finite sets: A \cup B = A + B - A \cap B => A FOJ B = A + B - 
A IJ B.
+  // But the "inner join" part can be much larger than A \cap B, 
making the simulated
+  // result much smaller. To prevent this, we choose the larger 
one between the simulated
+  // part and the inner part.
+  (leftRows + rightRows - innerRows).max(innerRows)
+case _ =>
+  // Don't change for inner or cross join
+  innerRows
+  }
+
+  // 3. Update statistics based on the output of join
+  val intersectedStats = if (selectivity == 0) {
+AttributeMap[ColumnStat](Nil)
+  } else {
+updateIntersectedStats(joinKeyPairs, leftStats, rightStats)
+  }
+  val inputAttrStats = AttributeMap(
+leftStats.attributeStats.toSeq ++ 

[GitHub] spark pull request #16228: [SPARK-17076] [SQL] Cardinality estimation for jo...

2017-02-10 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/16228#discussion_r100660417
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala
 ---
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical.statsEstimation
+
+import scala.collection.mutable
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.CatalystConf
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, 
AttributeReference, Expression}
+import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys
+import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, Join, 
Statistics}
+import 
org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils._
+import org.apache.spark.sql.types.DataType
+
+
+object JoinEstimation extends Logging {
+  /**
+   * Estimate statistics after join. Return `None` if the join type is not 
supported, or we don't
+   * have enough statistics for estimation.
+   */
+  def estimate(conf: CatalystConf, join: Join): Option[Statistics] = {
+join.joinType match {
+  case Inner | Cross | LeftOuter | RightOuter | FullOuter =>
+InnerOuterEstimation(conf, join).doEstimate()
+  case LeftSemi | LeftAnti =>
+LeftSemiAntiEstimation(conf, join).doEstimate()
+  case _ =>
+logDebug(s"[CBO] Unsupported join type: ${join.joinType}")
+None
+}
+  }
+}
+
+case class InnerOuterEstimation(conf: CatalystConf, join: Join) extends 
Logging {
+
+  private val leftStats = join.left.stats(conf)
+  private val rightStats = join.right.stats(conf)
+
+  /**
+   * Estimate output size and number of rows after a join operator, and 
update output column stats.
+   */
+  def doEstimate(): Option[Statistics] = join match {
+case _ if !rowCountsExist(conf, join.left, join.right) =>
+  None
+
+case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, 
left, right) =>
+  // 1. Compute join selectivity
+  val joinKeyPairs = extractJoinKeys(leftKeys, rightKeys)
+  val selectivity = joinSelectivity(joinKeyPairs, leftStats, 
rightStats)
+
+  // 2. Estimate the number of output rows
+  val leftRows = leftStats.rowCount.get
+  val rightRows = rightStats.rowCount.get
+  val innerRows = ceil(BigDecimal(leftRows * rightRows) * selectivity)
+
+  // Make sure outputRows won't be too small based on join type.
+  val outputRows = joinType match {
+case LeftOuter =>
+  // All rows from left side should be in the result.
+  leftRows.max(innerRows)
+case RightOuter =>
+  // All rows from right side should be in the result.
+  rightRows.max(innerRows)
+case FullOuter =>
+  // Simulate full outer join as obtaining the number of elements 
in the union of two
+  // finite sets: A \cup B = A + B - A \cap B => A FOJ B = A + B - 
A IJ B.
--- End diff --

It's a set operation. Please ignore this, because I'll update the computing equation for FullOuter.
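
For reference, a small worked instance of the inclusion-exclusion estimate quoted in the diff above, with the lower bound applied (the numbers are illustrative only):
```Scala
// |A FOJ B| is estimated as |A| + |B| - |A IJ B|, but never below the inner estimate.
val leftRows  = BigInt(100)
val rightRows = BigInt(80)
val innerRows = BigInt(20)
val fullOuterRows = (leftRows + rightRows - innerRows).max(innerRows) // 160
```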





[GitHub] spark pull request #16228: [SPARK-17076] [SQL] Cardinality estimation for jo...

2017-02-10 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/16228#discussion_r100660406
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
 ---
@@ -340,14 +340,22 @@ case class Join(
 case _ => resolvedExceptNatural
   }
 
-  override def computeStats(conf: CatalystConf): Statistics = joinType 
match {
-case LeftAnti | LeftSemi =>
-  // LeftSemi and LeftAnti won't ever be bigger than left
-  left.stats(conf).copy()
-case _ =>
-  // make sure we don't propagate isBroadcastable in other joins, 
because
-  // they could explode the size.
-  super.computeStats(conf).copy(isBroadcastable = false)
+  override def computeStats(conf: CatalystConf): Statistics = {
+def simpleEstimation: Statistics = joinType match {
+  case LeftAnti | LeftSemi =>
+// LeftSemi and LeftAnti won't ever be bigger than left
+left.stats(conf)
+  case _ =>
+// Make sure we don't propagate isBroadcastable in other joins, 
because
+// they could explode the size.
+super.computeStats(conf).copy(isBroadcastable = false)
--- End diff --

We can't make sure we won't change the behavior of 
super.computeStats(conf), so I think it's safer to do this here.






[GitHub] spark issue #16847: [SPARK-19318][SQL] Fix to send JDBC connection propertie...

2017-02-10 Thread sureshthalamati
Github user sureshthalamati commented on the issue:

https://github.com/apache/spark/pull/16847
  
@cloud-fan @gatorsmile 
I created two PRs for the same issue, as I was not sure which approach is the best way to fix it. I would appreciate your feedback. Thanks






[GitHub] spark issue #16891: [SPARK-19318][SQL] Fix to treat JDBC connection properti...

2017-02-10 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/16891
  
**[Test build #72731 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72731/testReport)**
 for PR 16891 at commit 
[`41d3362`](https://github.com/apache/spark/commit/41d336251cb70d7be26171c6a1f484e742ba83bd).





[GitHub] spark issue #16715: [Spark-18080][ML] Python API & Examples for Locality Sen...

2017-02-10 Thread jkbradley
Github user jkbradley commented on the issue:

https://github.com/apache/spark/pull/16715
  
Could you please add the tag "[PYTHON]" to the PR title?
Also, please remove "Please review http://spark.apache.org/contributing.html before opening a pull request." from the PR description, since that text will become part of the commit message.
Thanks!





[GitHub] spark pull request #16891: [SPARK-19318][SQL] Fix to treat JDBC connection p...

2017-02-10 Thread sureshthalamati
GitHub user sureshthalamati opened a pull request:

https://github.com/apache/spark/pull/16891

[SPARK-19318][SQL] Fix to treat JDBC connection properties specified by the 
user in case-sensitive manner.

## What changes were proposed in this pull request?
The reason for the test failure is that the property “oracle.jdbc.mapDateToTimestamp” set by the test was getting converted to all lower case. The Oracle database expects this property name to keep its exact case.

This test was passing in previous releases because connection properties were sent exactly as the user specified them. A fix that handles all options uniformly in a case-insensitive manner also converted the JDBC connection properties to lower case.

This PR enhances CaseInsensitiveMap to keep track of the original, case-sensitive input keys, and uses those when creating the connection properties that are passed to the JDBC connection.

An alternative approach, PR https://github.com/apache/spark/pull/16847, is to pass the original input keys to the JDBC data source by adding a check in the data source class and handling case-insensitivity in the JDBC source code.
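
A minimal sketch of the first approach, assuming a simplified options wrapper (the class and method names here are illustrative, not the exact ones in Spark):
```Scala
// Options are matched case-insensitively for Spark's own keys, but the keys as the
// user typed them are kept so they can be forwarded verbatim to the JDBC driver
// (e.g. Oracle's "oracle.jdbc.mapDateToTimestamp").
class CaseInsensitiveOptions(original: Map[String, String]) {
  private val byLowerCaseKey: Map[String, String] =
    original.map { case (k, v) => k.toLowerCase -> v }

  // Case-insensitive lookup, used when Spark interprets its own options.
  def get(key: String): Option[String] = byLowerCaseKey.get(key.toLowerCase)

  // Original, case-sensitive pairs, used to build the java.util.Properties
  // handed to the JDBC driver.
  def asConnectionProperties: java.util.Properties = {
    val props = new java.util.Properties()
    original.foreach { case (k, v) => props.setProperty(k, v) }
    props
  }
}

val opts = new CaseInsensitiveOptions(Map(
  "FetchSize" -> "100",
  "oracle.jdbc.mapDateToTimestamp" -> "false"))
assert(opts.get("fetchsize").contains("100"))
assert(opts.asConnectionProperties.getProperty("oracle.jdbc.mapDateToTimestamp") == "false")
```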

## How was this patch tested?
Added new test cases to JdbcSuite , and OracleIntegrationSuite. Ran docker 
integration tests passed on my laptop, all tests passed successfully.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/sureshthalamati/spark 
jdbc_case_senstivity_props_fix-SPARK-19318

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/16891.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #16891


commit b2eba09bff1015d9eccc15a1c2ed7c09b6a9
Author: sureshthalamati 
Date:   2017-02-09T22:36:54Z

[SPARK-19318][SQL] Fix to keep track of JDBC connection properties in the user-specified options in a case-sensitive manner.

commit 41d336251cb70d7be26171c6a1f484e742ba83bd
Author: sureshthalamati 
Date:   2017-02-11T04:28:00Z

removed unnecessary getOrElse as the map is an internal one, and the key should exist







[GitHub] spark issue #16870: [SPARK-19496][SQL]to_date udf to return null when input ...

2017-02-10 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/16870
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/72730/
Test PASSed.





[GitHub] spark issue #16870: [SPARK-19496][SQL]to_date udf to return null when input ...

2017-02-10 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/16870
  
Merged build finished. Test PASSed.





[GitHub] spark issue #16870: [SPARK-19496][SQL]to_date udf to return null when input ...

2017-02-10 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/16870
  
**[Test build #72730 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72730/testReport)**
 for PR 16870 at commit 
[`2dc241e`](https://github.com/apache/spark/commit/2dc241e7c83ca265d0037a1144393478dd2b66aa).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark issue #16847: [SPARK-19318][SQL] Fix to send JDBC connection propertie...

2017-02-10 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/16847
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/72728/
Test PASSed.





[GitHub] spark issue #16847: [SPARK-19318][SQL] Fix to send JDBC connection propertie...

2017-02-10 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/16847
  
Merged build finished. Test PASSed.





[GitHub] spark issue #16847: [SPARK-19318][SQL] Fix to send JDBC connection propertie...

2017-02-10 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/16847
  
**[Test build #72728 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72728/testReport)**
 for PR 16847 at commit 
[`81e9060`](https://github.com/apache/spark/commit/81e906052ea766fe4f228b84db5caea22860dae4).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #16386: [SPARK-18352][SQL] Support parsing multiline json files

2017-02-10 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/16386
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/72729/
Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #16386: [SPARK-18352][SQL] Support parsing multiline json files

2017-02-10 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/16386
  
Merged build finished. Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #16386: [SPARK-18352][SQL] Support parsing multiline json files

2017-02-10 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/16386
  
**[Test build #72729 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72729/testReport)**
 for PR 16386 at commit 
[`b0a5cc8`](https://github.com/apache/spark/commit/b0a5cc8f0c6475b4fa603036f11a9c1248295b40).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #16826: [WIP][SPARK-19540][SQL] Add ability to clone SparkSessio...

2017-02-10 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/16826
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/72727/
Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #16826: [WIP][SPARK-19540][SQL] Add ability to clone SparkSessio...

2017-02-10 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/16826
  
Merged build finished. Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #16826: [WIP][SPARK-19540][SQL] Add ability to clone SparkSessio...

2017-02-10 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/16826
  
**[Test build #72727 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72727/testReport)**
 for PR 16826 at commit 
[`6da6bda`](https://github.com/apache/spark/commit/6da6bdadc526d71ce887b0bcbef57272a713d6b2).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #16871: [SPARK-19550][BUILD][CORE][WIP] Remove Java 7 support

2017-02-10 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/16871
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/72724/
Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #16871: [SPARK-19550][BUILD][CORE][WIP] Remove Java 7 support

2017-02-10 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/16871
  
Merged build finished. Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #16871: [SPARK-19550][BUILD][CORE][WIP] Remove Java 7 support

2017-02-10 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/16871
  
**[Test build #72724 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72724/testReport)**
 for PR 16871 at commit 
[`7f8a2cb`](https://github.com/apache/spark/commit/7f8a2cbc984d61c0bbbc98cc4b4f3355bf8ecb73).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #16889: [SPARK-17668][SQL] Use Expressions for conversions to/fr...

2017-02-10 Thread kayousterhout
Github user kayousterhout commented on the issue:

https://github.com/apache/spark/pull/16889
  
FYI the test that failed here seems to be flaky and probably not related to 
this PR: https://issues.apache.org/jira/browse/SPARK-19559


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #16870: [SPARK-19496][SQL]to_date udf to return null when input ...

2017-02-10 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/16870
  
**[Test build #72730 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72730/testReport)**
 for PR 16870 at commit 
[`2dc241e`](https://github.com/apache/spark/commit/2dc241e7c83ca265d0037a1144393478dd2b66aa).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #15009: [SPARK-17443][SPARK-11035] Stop Spark Application if lau...

2017-02-10 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/15009
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/72723/
Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #15009: [SPARK-17443][SPARK-11035] Stop Spark Application if lau...

2017-02-10 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/15009
  
Merged build finished. Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16870: [SPARK-19496][SQL]to_date udf to return null when...

2017-02-10 Thread windpiger
Github user windpiger commented on a diff in the pull request:

https://github.com/apache/spark/pull/16870#discussion_r100656533
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
 ---
@@ -465,15 +465,15 @@ case class DateFormatClass(left: Expression, right: 
Expression, timeZoneId: Opti
 copy(timeZoneId = Option(timeZoneId))
 
   override protected def nullSafeEval(timestamp: Any, format: Any): Any = {
-val df = DateTimeUtils.newDateFormat(format.toString, timeZone)
+val df = DateTimeUtils.newDateFormat(format.toString, timeZone, 
isLenient = true)
 UTF8String.fromString(df.format(new 
java.util.Date(timestamp.asInstanceOf[Long] / 1000)))
   }
 
   override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
 val dtu = DateTimeUtils.getClass.getName.stripSuffix("$")
 val tz = ctx.addReferenceMinorObj(timeZone)
 defineCodeGen(ctx, ev, (timestamp, format) => {
-  s"""UTF8String.fromString($dtu.newDateFormat($format.toString(), $tz)
+  s"""UTF8String.fromString($dtu.newDateFormat($format.toString(), 
$tz, false)
--- End diff --

oh, sorry let me fix it, thanks!
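
For readers following the thread, here is a minimal, self-contained sketch of why the lenient flag matters for SPARK-19496. It uses plain `java.text.SimpleDateFormat` rather than the Spark `DateTimeUtils.newDateFormat` helper, and the pattern and input are invented for illustration: with lenient parsing an out-of-range date is silently rolled over, so the expression cannot report the input as invalid (i.e., return null).

```Scala
import java.text.SimpleDateFormat
import java.util.TimeZone

def parseDate(s: String, lenient: Boolean): Option[java.util.Date] = {
  val fmt = new SimpleDateFormat("yyyy-MM-dd")
  fmt.setTimeZone(TimeZone.getTimeZone("UTC"))
  fmt.setLenient(lenient)  // the knob the isLenient parameter above presumably controls
  try Some(fmt.parse(s)) catch { case _: java.text.ParseException => None }
}

parseDate("2016-02-30", lenient = true)   // Some(2016-03-01) -- invalid date rolled over
parseDate("2016-02-30", lenient = false)  // None -- strict parsing rejects the input
```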


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #15009: [SPARK-17443][SPARK-11035] Stop Spark Application if lau...

2017-02-10 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/15009
  
**[Test build #72723 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72723/testReport)**
 for PR 15009 at commit 
[`cc2c0be`](https://github.com/apache/spark/commit/cc2c0be9aab069b827fc6d46fb8201a1902c87ba).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #16686: [SPARK-18682][SS] Batch Source for Kafka

2017-02-10 Thread kayousterhout
Github user kayousterhout commented on the issue:

https://github.com/apache/spark/pull/16686
  
I have recently noticed a few flaky failures of the KafkaSourceSuite test "subscribing topic by pattern with topic deletions" (e.g., 
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72725/testReport/org.apache.spark.sql.kafka010/KafkaSourceSuite/subscribing_topic_by_pattern_with_topic_deletions/).
 Is it possible those were caused by this PR?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #16890: when colum is use alias ,the order by result is wrong

2017-02-10 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/16890
  
Can one of the admins verify this patch?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16890: when colum is use alias ,the order by result is w...

2017-02-10 Thread muyannian
GitHub user muyannian opened a pull request:

https://github.com/apache/spark/pull/16890

When a column is referenced by its alias, the ORDER BY result is wrong

I wrote two SQL queries. The ORDER BY result of the first is wrong, but the ORDER BY result of the second is right; that may be a bug?

--- SQL 1: order by the original column `amtlong`
select amtlong as yasname ,usernick,count(*) as cnt,sum(amtdouble) as amt
from ydb_import_txt  group by usernick, amtlong
order by  amt desc,cnt,nick,amtlong limit 230

220@ 9189   奚鸿煊  1   99.97
221@ 7105   奚鸿煊  1   99.97

--- SQL 2: order by the alias `yasname`
select amtlong as yasname ,usernick,count(*) as cnt,sum(amtdouble) as amt
from ydb_import_txt  group by usernick, amtlong
order by  amt desc,cnt,nick,yasname  limit 230

220@ 7105   奚鸿煊  1   99.97
221@ 9189   奚鸿煊  1   99.97
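
For anyone who wants a self-contained reproduction of the comparison above, here is a hedged sketch (run in spark-shell; the table name, data, and session are invented, since the original `ydb_import_txt` table and its `nick` column are not available here). Because `yasname` is only an alias for `amtlong`, both queries are expected to return the rows in the same order.

```Scala
import spark.implicits._

Seq((9189L, "u1", 99.97), (7105L, "u1", 99.97))
  .toDF("amtlong", "usernick", "amtdouble")
  .createOrReplaceTempView("t")

val byColumn = spark.sql(
  """SELECT amtlong AS yasname, usernick, count(*) AS cnt, sum(amtdouble) AS amt
    |FROM t GROUP BY usernick, amtlong
    |ORDER BY amt DESC, cnt, usernick, amtlong""".stripMargin)

val byAlias = spark.sql(
  """SELECT amtlong AS yasname, usernick, count(*) AS cnt, sum(amtdouble) AS amt
    |FROM t GROUP BY usernick, amtlong
    |ORDER BY amt DESC, cnt, usernick, yasname""".stripMargin)

byColumn.show()
byAlias.show()  // expected to list 7105 before 9189, exactly like byColumn
```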


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/apache/spark master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/16890.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #16890


commit 923e594844a7ad406195b91877f0fb374d5a454b
Author: Dongjoon Hyun 
Date:   2017-01-08T02:55:01Z

[SPARK-18941][SQL][DOC] Add a new behavior document on `CREATE/DROP TABLE` 
with `LOCATION`

## What changes were proposed in this pull request?

This PR adds a new behavior-change description for `CREATE TABLE ... LOCATION` to `sql-programming-guide.md`, under `Upgrading From Spark SQL 1.6 to 2.0`. This change was introduced in Apache Spark 2.0.0 as [SPARK-15276](https://issues.apache.org/jira/browse/SPARK-15276).

## How was this patch tested?

```
SKIP_API=1 jekyll build
```

**Newly Added Description**
https://cloud.githubusercontent.com/assets/9700541/21743606/7efe2b12-d4ba-11e6-8a0d-551222718ea2.png

Author: Dongjoon Hyun 

Closes #16400 from dongjoon-hyun/SPARK-18941.

commit 6b6b555a1e667a9f03dfe4a21e56c513a353a58d
Author: Yanbo Liang 
Date:   2017-01-08T09:10:36Z

[SPARK-18862][SPARKR][ML] Split SparkR mllib.R into multiple files

## What changes were proposed in this pull request?
SparkR ```mllib.R``` is getting bigger as we add more ML wrappers, so I'd like to split it into multiple files to make it easier to maintain:
* mllib_classification.R
* mllib_clustering.R
* mllib_recommendation.R
* mllib_regression.R
* mllib_stat.R
* mllib_tree.R
* mllib_utils.R

Note: Only reorg, no actual code change.

## How was this patch tested?
Existing tests.

Author: Yanbo Liang 

Closes #16312 from yanboliang/spark-18862.

commit cd1d00adaff65e8adfebc2342dd422c53f98166b
Author: zuotingbing 
Date:   2017-01-08T09:29:01Z

[SPARK-19026] SPARK_LOCAL_DIRS(multiple directories on different disks) 
cannot be deleted

JIRA Issue: https://issues.apache.org/jira/browse/SPARK-19026

SPARK_LOCAL_DIRS (Standalone) can be a comma-separated list of multiple directories on different disks, e.g. SPARK_LOCAL_DIRS=/dir1,/dir2,/dir3. If there is an IOException when creating the sub directory on dir3, the sub directories that were created successfully on dir1 and dir2 can no longer be deleted when the application finishes.
So we should catch the IOException in Utils.createDirectory; otherwise the variable "appDirectories(appId)", which the function maybeCleanupApplication relies on, will not be set, and dir1 and dir2 will not be cleaned up.

Author: zuotingbing 

Closes #16439 from zuotingbing/master.
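
As a hedged, simplified illustration of the cleanup behavior the commit above describes (the function and variable names below are invented, not the actual Utils or Worker code): each configured root is attempted independently, a per-root IOException is caught and skipped, and only the directories that were actually created are returned so they can be tracked and deleted later.

```Scala
import java.io.{File, IOException}
import java.nio.file.Files

def createAppDirs(roots: Seq[String], appId: String): Seq[File] =
  roots.flatMap { root =>
    try {
      // Succeeds, or throws IOException for this root only
      Some(Files.createDirectories(new File(root, s"app-$appId").toPath).toFile)
    } catch {
      case e: IOException =>
        println(s"Failed to create app dir under $root: ${e.getMessage}")
        None  // skip this root; the others are still tracked for cleanup
    }
  }

// Directories returned here are the ones that must be deleted when the app finishes.
val created = createAppDirs(Seq("/dir1", "/dir2", "/dir3"), appId = "app-0001")
```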

commit 4351e62207957bec663108a571cff2bfaaa9e7d5
Author: Dilip Biswal 
Date:   2017-01-08T22:09:07Z

[SPARK-19093][SQL] Cached tables are not used in SubqueryExpression

## What changes were proposed in this pull request?
Consider the plans inside subquery expressions while looking up the cache manager to make use of cached data. Currently CacheManager.useCachedData does not consider the subquery expressions in the plan.

SQL
```
select * from rows where not exists (select * from rows)
```
Before the fix
```
== Optimized Logical Plan ==
Join LeftAnti
:- InMemoryRelation [_1#3775, _2#3776], true, 1, StorageLevel(disk, 
memory, deserialized, 

[GitHub] spark issue #15604: [SPARK-18066] [CORE] [TESTS] Add Pool usage policies tes...

2017-02-10 Thread erenavsarogullari
Github user erenavsarogullari commented on the issue:

https://github.com/apache/spark/pull/15604
  
The build failure looks unrelated (it is due to a timeout at the KafkaSourceSuite level).

Jenkins, retest this please


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #16868: [SPARK-19115] [SQL] Supporting Create External Table Lik...

2017-02-10 Thread ouyangxiaochen
Github user ouyangxiaochen commented on the issue:

https://github.com/apache/spark/pull/16868
  
I have run the test cases successfully. Please run the test cases again. Thanks a lot! @SparkQA


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #15604: [SPARK-18066] [CORE] [TESTS] Add Pool usage policies tes...

2017-02-10 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/15604
  
Merged build finished. Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #15604: [SPARK-18066] [CORE] [TESTS] Add Pool usage policies tes...

2017-02-10 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/15604
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/72725/
Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #15604: [SPARK-18066] [CORE] [TESTS] Add Pool usage policies tes...

2017-02-10 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/15604
  
**[Test build #72725 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72725/testReport)**
 for PR 15604 at commit 
[`f84abe7`](https://github.com/apache/spark/commit/f84abe74dd4426cf520b38f6380dff693dacc92b).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #16386: [SPARK-18352][SQL] Support parsing multiline json files

2017-02-10 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/16386
  
Merged build finished. Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #16386: [SPARK-18352][SQL] Support parsing multiline json files

2017-02-10 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/16386
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/72726/
Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #16386: [SPARK-18352][SQL] Support parsing multiline json files

2017-02-10 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/16386
  
**[Test build #72726 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72726/testReport)**
 for PR 16386 at commit 
[`0f4686d`](https://github.com/apache/spark/commit/0f4686d39890e84dd8542297eeb688746f80ee55).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #16777: [SPARK-19435][SQL] Type coercion between ArrayTypes

2017-02-10 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/16777
  
Let me check other DBMSs and get back.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #16386: [SPARK-18352][SQL] Support parsing multiline json files

2017-02-10 Thread NathanHowell
Github user NathanHowell commented on the issue:

https://github.com/apache/spark/pull/16386
  
@cloud-fan I just pushed a few more changes to address some of your 
comments. I'll be back later next week to continue work.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #16386: [SPARK-18352][SQL] Support parsing multiline json files

2017-02-10 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/16386
  
**[Test build #72729 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72729/testReport)**
 for PR 16386 at commit 
[`b0a5cc8`](https://github.com/apache/spark/commit/b0a5cc8f0c6475b4fa603036f11a9c1248295b40).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-10 Thread NathanHowell
Github user NathanHowell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r100653879
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
 ---
@@ -1764,4 +1769,123 @@ class JsonSuite extends QueryTest with 
SharedSQLContext with TestJsonData {
 val df2 = spark.read.option("PREfersdecimaL", "true").json(records)
 assert(df2.schema == schema)
   }
+
+  test("SPARK-18352: Parse normal multi-line JSON files (compressed)") {
+withTempPath { dir =>
+  val path = dir.getCanonicalPath
+  primitiveFieldAndType
+.toDF("value")
+.write
+.option("compression", "GzIp")
+.text(path)
+
+  new File(path).listFiles() match {
+case compressedFiles =>
+  assert(compressedFiles.exists(_.getName.endsWith(".gz")))
+  }
+
+  val jsonDF = spark.read.option("wholeFile", true).json(path)
+  val jsonDir = new File(dir, "json").getCanonicalPath
+  jsonDF.coalesce(1).write
+.format("json")
+.option("compression", "gZiP")
+.save(jsonDir)
+
+  new File(jsonDir).listFiles() match {
+case compressedFiles =>
+  assert(compressedFiles.exists(_.getName.endsWith(".json.gz")))
+  }
+
+  val jsonCopy = spark.read
+.format("json")
+.load(jsonDir)
+
+  assert(jsonCopy.count === jsonDF.count)
+  val jsonCopySome = jsonCopy.selectExpr("string", "long", "boolean")
+  val jsonDFSome = jsonDF.selectExpr("string", "long", "boolean")
+  checkAnswer(jsonCopySome, jsonDFSome)
+}
+  }
+
+  test("SPARK-18352: Parse normal multi-line JSON files (uncompressed)") {
+withTempPath { dir =>
+  val path = dir.getCanonicalPath
+  primitiveFieldAndType
+.toDF("value")
+.write
+.text(path)
+
+  val jsonDF = spark.read.option("wholeFile", true).json(path)
+  val jsonDir = new File(dir, "json").getCanonicalPath
+  jsonDF.coalesce(1).write
+.format("json")
+.save(jsonDir)
+
+  val compressedFiles = new File(jsonDir).listFiles()
+  assert(compressedFiles.exists(_.getName.endsWith(".json")))
+
+  val jsonCopy = spark.read
+.format("json")
+.load(jsonDir)
+
+  assert(jsonCopy.count === jsonDF.count)
+  val jsonCopySome = jsonCopy.selectExpr("string", "long", "boolean")
+  val jsonDFSome = jsonDF.selectExpr("string", "long", "boolean")
+  checkAnswer(jsonCopySome, jsonDFSome)
+}
+  }
+
+  test("SPARK-18352: Expect one JSON document per file") {
+// the json parser terminates as soon as it sees a matching END_OBJECT 
or END_ARRAY token.
+// this might not be the optimal behavior but this test verifies that 
only the first value
+// is parsed and the rest are discarded.
+
+// alternatively the parser could continue parsing following objects, 
which may further reduce
+// allocations by skipping the line reader entirely
+
+withTempPath { dir =>
+  val path = dir.getCanonicalPath
+  spark
+.createDataFrame(Seq(Tuple1("{}{invalid}")))
+.coalesce(1)
+.write
+.text(path)
+
+  val jsonDF = spark.read.option("wholeFile", true).json(path)
+  // no corrupt record column should be created
+  assert(jsonDF.schema === StructType(Seq()))
+  // only the first object should be read
+  assert(jsonDF.count() === 1)
+}
+  }
+
+  test("SPARK-18352: Handle corrupt documents") {
+withTempPath { dir =>
+  val path = dir.getCanonicalPath
+  val corruptRecordCount = additionalCorruptRecords.count().toInt
+  assert(corruptRecordCount === 5)
+
+  additionalCorruptRecords
+.toDF("value")
+.repartition(corruptRecordCount * 4)
+.write
+.text(path)
+
+  val jsonDF = spark.read.option("wholeFile", true).json(path)
--- End diff --

Probably, but I'm out of time for today. I'll be out for a few days and can 
pick this back up on Thursday or Friday next week.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-10 Thread NathanHowell
Github user NathanHowell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r100653835
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
 ---
@@ -1764,4 +1769,123 @@ class JsonSuite extends QueryTest with 
SharedSQLContext with TestJsonData {
 val df2 = spark.read.option("PREfersdecimaL", "true").json(records)
 assert(df2.schema == schema)
   }
+
+  test("SPARK-18352: Parse normal multi-line JSON files (compressed)") {
+withTempPath { dir =>
+  val path = dir.getCanonicalPath
+  primitiveFieldAndType
+.toDF("value")
+.write
+.option("compression", "GzIp")
+.text(path)
+
+  new File(path).listFiles() match {
+case compressedFiles =>
+  assert(compressedFiles.exists(_.getName.endsWith(".gz")))
+  }
+
+  val jsonDF = spark.read.option("wholeFile", true).json(path)
+  val jsonDir = new File(dir, "json").getCanonicalPath
+  jsonDF.coalesce(1).write
+.format("json")
+.option("compression", "gZiP")
+.save(jsonDir)
+
+  new File(jsonDir).listFiles() match {
+case compressedFiles =>
+  assert(compressedFiles.exists(_.getName.endsWith(".json.gz")))
+  }
+
+  val jsonCopy = spark.read
+.format("json")
+.load(jsonDir)
+
+  assert(jsonCopy.count === jsonDF.count)
+  val jsonCopySome = jsonCopy.selectExpr("string", "long", "boolean")
+  val jsonDFSome = jsonDF.selectExpr("string", "long", "boolean")
+  checkAnswer(jsonCopySome, jsonDFSome)
+}
+  }
+
+  test("SPARK-18352: Parse normal multi-line JSON files (uncompressed)") {
+withTempPath { dir =>
+  val path = dir.getCanonicalPath
+  primitiveFieldAndType
+.toDF("value")
+.write
+.text(path)
+
+  val jsonDF = spark.read.option("wholeFile", true).json(path)
+  val jsonDir = new File(dir, "json").getCanonicalPath
+  jsonDF.coalesce(1).write
+.format("json")
+.save(jsonDir)
+
+  val compressedFiles = new File(jsonDir).listFiles()
+  assert(compressedFiles.exists(_.getName.endsWith(".json")))
+
+  val jsonCopy = spark.read
+.format("json")
+.load(jsonDir)
+
+  assert(jsonCopy.count === jsonDF.count)
+  val jsonCopySome = jsonCopy.selectExpr("string", "long", "boolean")
+  val jsonDFSome = jsonDF.selectExpr("string", "long", "boolean")
+  checkAnswer(jsonCopySome, jsonDFSome)
+}
+  }
+
+  test("SPARK-18352: Expect one JSON document per file") {
+// the json parser terminates as soon as it sees a matching END_OBJECT 
or END_ARRAY token.
+// this might not be the optimal behavior but this test verifies that 
only the first value
+// is parsed and the rest are discarded.
+
+// alternatively the parser could continue parsing following objects, 
which may further reduce
+// allocations by skipping the line reader entirely
+
+withTempPath { dir =>
+  val path = dir.getCanonicalPath
+  spark
+.createDataFrame(Seq(Tuple1("{}{invalid}")))
+.coalesce(1)
+.write
+.text(path)
+
+  val jsonDF = spark.read.option("wholeFile", true).json(path)
+  // no corrupt record column should be created
+  assert(jsonDF.schema === StructType(Seq()))
+  // only the first object should be read
+  assert(jsonDF.count() === 1)
+}
+  }
+
+  test("SPARK-18352: Handle corrupt documents") {
+withTempPath { dir =>
+  val path = dir.getCanonicalPath
+  val corruptRecordCount = additionalCorruptRecords.count().toInt
+  assert(corruptRecordCount === 5)
+
+  additionalCorruptRecords
+.toDF("value")
+.repartition(corruptRecordCount * 4)
--- End diff --

Probably, but I'm out of time for today. I'll be out for a few days and can 
pick this back up on Thursday or Friday next week.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-10 Thread NathanHowell
Github user NathanHowell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r100653757
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
 ---
@@ -1764,4 +1769,123 @@ class JsonSuite extends QueryTest with 
SharedSQLContext with TestJsonData {
 val df2 = spark.read.option("PREfersdecimaL", "true").json(records)
 assert(df2.schema == schema)
   }
+
+  test("SPARK-18352: Parse normal multi-line JSON files (compressed)") {
+withTempPath { dir =>
+  val path = dir.getCanonicalPath
+  primitiveFieldAndType
+.toDF("value")
+.write
+.option("compression", "GzIp")
+.text(path)
+
+  new File(path).listFiles() match {
+case compressedFiles =>
+  assert(compressedFiles.exists(_.getName.endsWith(".gz")))
+  }
+
+  val jsonDF = spark.read.option("wholeFile", true).json(path)
+  val jsonDir = new File(dir, "json").getCanonicalPath
+  jsonDF.coalesce(1).write
+.format("json")
+.option("compression", "gZiP")
+.save(jsonDir)
+
+  new File(jsonDir).listFiles() match {
+case compressedFiles =>
+  assert(compressedFiles.exists(_.getName.endsWith(".json.gz")))
+  }
+
+  val jsonCopy = spark.read
+.format("json")
+.load(jsonDir)
+
+  assert(jsonCopy.count === jsonDF.count)
+  val jsonCopySome = jsonCopy.selectExpr("string", "long", "boolean")
+  val jsonDFSome = jsonDF.selectExpr("string", "long", "boolean")
+  checkAnswer(jsonCopySome, jsonDFSome)
+}
+  }
+
+  test("SPARK-18352: Parse normal multi-line JSON files (uncompressed)") {
+withTempPath { dir =>
+  val path = dir.getCanonicalPath
+  primitiveFieldAndType
+.toDF("value")
+.write
+.text(path)
+
+  val jsonDF = spark.read.option("wholeFile", true).json(path)
+  val jsonDir = new File(dir, "json").getCanonicalPath
+  jsonDF.coalesce(1).write
+.format("json")
+.save(jsonDir)
+
+  val compressedFiles = new File(jsonDir).listFiles()
+  assert(compressedFiles.exists(_.getName.endsWith(".json")))
+
+  val jsonCopy = spark.read
+.format("json")
+.load(jsonDir)
+
+  assert(jsonCopy.count === jsonDF.count)
+  val jsonCopySome = jsonCopy.selectExpr("string", "long", "boolean")
+  val jsonDFSome = jsonDF.selectExpr("string", "long", "boolean")
+  checkAnswer(jsonCopySome, jsonDFSome)
+}
+  }
+
+  test("SPARK-18352: Expect one JSON document per file") {
+// the json parser terminates as soon as it sees a matching END_OBJECT 
or END_ARRAY token.
+// this might not be the optimal behavior but this test verifies that 
only the first value
+// is parsed and the rest are discarded.
+
+// alternatively the parser could continue parsing following objects, 
which may further reduce
+// allocations by skipping the line reader entirely
+
+withTempPath { dir =>
+  val path = dir.getCanonicalPath
+  spark
+.createDataFrame(Seq(Tuple1("{}{invalid}")))
+.coalesce(1)
+.write
+.text(path)
+
+  val jsonDF = spark.read.option("wholeFile", true).json(path)
+  // no corrupt record column should be created
+  assert(jsonDF.schema === StructType(Seq()))
+  // only the first object should be read
+  assert(jsonDF.count() === 1)
+}
+  }
+
+  test("SPARK-18352: Handle corrupt documents") {
+withTempPath { dir =>
+  val path = dir.getCanonicalPath
+  val corruptRecordCount = additionalCorruptRecords.count().toInt
+  assert(corruptRecordCount === 5)
+
+  additionalCorruptRecords
+.toDF("value")
+.repartition(corruptRecordCount * 4)
+.write
+.text(path)
+
+  val jsonDF = spark.read.option("wholeFile", true).json(path)
+  assert(jsonDF.count() === corruptRecordCount)
+  assert(jsonDF.schema === new StructType()
+.add("_corrupt_record", StringType)
+.add("dummy", StringType))
+  val counts = jsonDF
+.join(
+  additionalCorruptRecords.toDF("value"),
+  F.regexp_replace($"_corrupt_record", "(^\\s+|\\s+$)", "") === 
F.trim($"value"),
--- End diff --

This is the same as `F.trim`, but it works on all whitespace characters, not just 0x20 (ASCII space)... if `trim` removed line endings and not just spaces, it would have worked here instead.
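
A minimal sketch of that distinction (assumes a running SparkSession named `spark`; the sample value is invented): `trim` leaves a trailing newline in place, while the regexp strips all leading and trailing whitespace.

```Scala
import org.apache.spark.sql.{functions => F}
import spark.implicits._

val df = Seq("  {\"dummy\": 1}\n").toDF("value")
df.select(
  F.length(F.trim($"value")).as("after_trim"),                                 // 13: the trailing '\n' survives
  F.length(F.regexp_replace($"value", "(^\\s+|\\s+$)", "")).as("after_regexp") // 12: all outer whitespace removed
).show()
```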



[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-10 Thread NathanHowell
Github user NathanHowell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r100653580
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
 ---
@@ -1764,4 +1769,123 @@ class JsonSuite extends QueryTest with 
SharedSQLContext with TestJsonData {
 val df2 = spark.read.option("PREfersdecimaL", "true").json(records)
 assert(df2.schema == schema)
   }
+
+  test("SPARK-18352: Parse normal multi-line JSON files (compressed)") {
+withTempPath { dir =>
+  val path = dir.getCanonicalPath
+  primitiveFieldAndType
+.toDF("value")
+.write
+.option("compression", "GzIp")
+.text(path)
+
+  new File(path).listFiles() match {
+case compressedFiles =>
+  assert(compressedFiles.exists(_.getName.endsWith(".gz")))
+  }
+
+  val jsonDF = spark.read.option("wholeFile", true).json(path)
+  val jsonDir = new File(dir, "json").getCanonicalPath
+  jsonDF.coalesce(1).write
+.format("json")
+.option("compression", "gZiP")
+.save(jsonDir)
+
+  new File(jsonDir).listFiles() match {
+case compressedFiles =>
+  assert(compressedFiles.exists(_.getName.endsWith(".json.gz")))
+  }
+
+  val jsonCopy = spark.read
+.format("json")
+.load(jsonDir)
+
+  assert(jsonCopy.count === jsonDF.count)
+  val jsonCopySome = jsonCopy.selectExpr("string", "long", "boolean")
+  val jsonDFSome = jsonDF.selectExpr("string", "long", "boolean")
+  checkAnswer(jsonCopySome, jsonDFSome)
+}
+  }
+
+  test("SPARK-18352: Parse normal multi-line JSON files (uncompressed)") {
+withTempPath { dir =>
+  val path = dir.getCanonicalPath
+  primitiveFieldAndType
+.toDF("value")
+.write
+.text(path)
+
+  val jsonDF = spark.read.option("wholeFile", true).json(path)
+  val jsonDir = new File(dir, "json").getCanonicalPath
+  jsonDF.coalesce(1).write
+.format("json")
+.save(jsonDir)
+
+  val compressedFiles = new File(jsonDir).listFiles()
+  assert(compressedFiles.exists(_.getName.endsWith(".json")))
+
+  val jsonCopy = spark.read
+.format("json")
+.load(jsonDir)
+
+  assert(jsonCopy.count === jsonDF.count)
+  val jsonCopySome = jsonCopy.selectExpr("string", "long", "boolean")
+  val jsonDFSome = jsonDF.selectExpr("string", "long", "boolean")
+  checkAnswer(jsonCopySome, jsonDFSome)
+}
+  }
+
+  test("SPARK-18352: Expect one JSON document per file") {
+// the json parser terminates as soon as it sees a matching END_OBJECT 
or END_ARRAY token.
+// this might not be the optimal behavior but this test verifies that 
only the first value
+// is parsed and the rest are discarded.
+
+// alternatively the parser could continue parsing following objects, 
which may further reduce
+// allocations by skipping the line reader entirely
+
+withTempPath { dir =>
+  val path = dir.getCanonicalPath
+  spark
+.createDataFrame(Seq(Tuple1("{}{invalid}")))
+.coalesce(1)
+.write
+.text(path)
+
+  val jsonDF = spark.read.option("wholeFile", true).json(path)
+  // no corrupt record column should be created
+  assert(jsonDF.schema === StructType(Seq()))
+  // only the first object should be read
+  assert(jsonDF.count() === 1)
+}
+  }
+
+  test("SPARK-18352: Handle corrupt documents") {
+withTempPath { dir =>
+  val path = dir.getCanonicalPath
+  val corruptRecordCount = additionalCorruptRecords.count().toInt
+  assert(corruptRecordCount === 5)
+
+  additionalCorruptRecords
+.toDF("value")
+.repartition(corruptRecordCount * 4)
--- End diff --

Yep, I'll fix.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #16880: [SPARK-19542][SS]Delete the temp checkpoint if a query i...

2017-02-10 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/16880
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/72721/
Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #16880: [SPARK-19542][SS]Delete the temp checkpoint if a query i...

2017-02-10 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/16880
  
Merged build finished. Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #16880: [SPARK-19542][SS]Delete the temp checkpoint if a query i...

2017-02-10 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/16880
  
**[Test build #72721 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72721/testReport)**
 for PR 16880 at commit 
[`cae981f`](https://github.com/apache/spark/commit/cae981f1ab9ee6f3d0d3cd4b53ccf6431551a0c0).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-10 Thread NathanHowell
Github user NathanHowell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r100652445
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
 ---
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.json
+
+import java.io.InputStream
+
+import scala.reflect.ClassTag
+
+import com.fasterxml.jackson.core.{JsonFactory, JsonParser}
+import com.google.common.io.ByteStreams
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.io.{LongWritable, Text}
+import org.apache.hadoop.mapreduce.Job
+import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, 
TextInputFormat}
+
+import org.apache.spark.TaskContext
+import org.apache.spark.input.{PortableDataStream, StreamInputFormat}
+import org.apache.spark.rdd.{BinaryFileRDD, RDD}
+import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.json.{CreateJacksonParser, 
JacksonParser, JSONOptions}
+import org.apache.spark.sql.execution.datasources.{CodecStreams, 
HadoopFileLinesReader, PartitionedFile}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.util.Utils
+
+/**
+ * Common functions for parsing JSON files
+ * @tparam T A datatype containing the unparsed JSON, such as [[Text]] or 
[[String]]
+ */
+abstract class JsonDataSource[T] extends Serializable {
+  def isSplitable: Boolean
+
+  /**
+   * Parse a [[PartitionedFile]] into 0 or more [[InternalRow]] instances
+   */
+  def readFile(
+conf: Configuration,
+file: PartitionedFile,
+parser: JacksonParser): Iterator[InternalRow]
+
+  /**
+   * Create an [[RDD]] that handles the preliminary parsing of [[T]] 
records
+   */
+  protected def createBaseRdd(
+sparkSession: SparkSession,
+inputPaths: Seq[FileStatus]): RDD[T]
+
+  /**
+   * A generic wrapper to invoke the correct [[JsonFactory]] method to 
allocate a [[JsonParser]]
+   * for an instance of [[T]]
+   */
+  def createParser(jsonFactory: JsonFactory, value: T): JsonParser
+
+  final def infer(
+  sparkSession: SparkSession,
+  inputPaths: Seq[FileStatus],
+  parsedOptions: JSONOptions): Option[StructType] = {
+if (inputPaths.nonEmpty) {
+  val jsonSchema = JsonInferSchema.infer(
+createBaseRdd(sparkSession, inputPaths),
+parsedOptions,
+createParser)
+  checkConstraints(jsonSchema)
+  Some(jsonSchema)
+} else {
+  None
+}
+  }
+
+  /** Constraints to be imposed on schema to be stored. */
+  private def checkConstraints(schema: StructType): Unit = {
--- End diff --

IIRC this was a check added because some of the backends (maybe Parquet?) were writing corrupt files... if this is checked globally now, it should be fine to remove it.
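
For context, one plausible constraint such a private check might enforce is rejecting duplicate top-level field names before writing; the sketch below is purely illustrative and is not the actual `checkConstraints` body (which is not shown in the diff).

```Scala
import org.apache.spark.sql.types.StructType

// Illustrative only: fail fast if the schema contains duplicate (case-insensitive) field names.
def checkNoDuplicateFieldNames(schema: StructType): Unit = {
  val names = schema.fieldNames.map(_.toLowerCase)
  val duplicates = names.diff(names.distinct).distinct
  require(duplicates.isEmpty,
    s"Duplicate column(s) ${duplicates.mkString(", ")} found; cannot save to JSON format")
}
```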


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #16847: [SPARK-19318][SQL] Fix to send JDBC connection propertie...

2017-02-10 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/16847
  
**[Test build #72728 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72728/testReport)**
 for PR 16847 at commit 
[`81e9060`](https://github.com/apache/spark/commit/81e906052ea766fe4f228b84db5caea22860dae4).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #16847: [SPARK-19318][SQL] Fix to send JDBC connection propertie...

2017-02-10 Thread dilipbiswal
Github user dilipbiswal commented on the issue:

https://github.com/apache/spark/pull/16847
  
retest this please


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-10 Thread NathanHowell
Github user NathanHowell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r100652259
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
 ---
@@ -1764,4 +1769,123 @@ class JsonSuite extends QueryTest with 
SharedSQLContext with TestJsonData {
 val df2 = spark.read.option("PREfersdecimaL", "true").json(records)
 assert(df2.schema == schema)
   }
+
+  test("SPARK-18352: Parse normal multi-line JSON files (compressed)") {
+withTempPath { dir =>
+  val path = dir.getCanonicalPath
+  primitiveFieldAndType
--- End diff --

Right, it's a single value that spans multiple lines. The Python test is 
reusing some Python specific test data.
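
For readers who have not looked at the test data, here is an illustrative sketch (field values invented, temp path created on the fly, `spark` assumed to be a running SparkSession) of what "a single JSON document spanning multiple lines" means and how the new `wholeFile` option reads it as one record rather than one record per line.

```Scala
import java.nio.file.{Files, Paths}

val doc =
  """{
    |  "string": "this is a simple string.",
    |  "long": 21474836470,
    |  "boolean": true
    |}""".stripMargin

val dir = Files.createTempDirectory("wholefile-example")
Files.write(Paths.get(dir.toString, "doc.json"), doc.getBytes("UTF-8"))

val df = spark.read.option("wholeFile", true).json(dir.toString)
df.show()  // a single row with the columns boolean, long and string
```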


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-10 Thread NathanHowell
Github user NathanHowell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r100652192
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
 ---
@@ -1764,4 +1769,123 @@ class JsonSuite extends QueryTest with 
SharedSQLContext with TestJsonData {
 val df2 = spark.read.option("PREfersdecimaL", "true").json(records)
 assert(df2.schema == schema)
   }
+
+  test("SPARK-18352: Parse normal multi-line JSON files (compressed)") {
+withTempPath { dir =>
+  val path = dir.getCanonicalPath
+  primitiveFieldAndType
+.toDF("value")
+.write
+.option("compression", "GzIp")
+.text(path)
+
+  new File(path).listFiles() match {
+case compressedFiles =>
+  assert(compressedFiles.exists(_.getName.endsWith(".gz")))
+  }
+
+  val jsonDF = spark.read.option("wholeFile", true).json(path)
+  val jsonDir = new File(dir, "json").getCanonicalPath
+  jsonDF.coalesce(1).write
+.format("json")
+.option("compression", "gZiP")
+.save(jsonDir)
+
+  new File(jsonDir).listFiles() match {
+case compressedFiles =>
+  assert(compressedFiles.exists(_.getName.endsWith(".json.gz")))
+  }
+
+  val jsonCopy = spark.read
--- End diff --

I'm not sure what you mean?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-10 Thread NathanHowell
Github user NathanHowell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r100651990
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
 ---
@@ -1764,4 +1769,123 @@ class JsonSuite extends QueryTest with 
SharedSQLContext with TestJsonData {
 val df2 = spark.read.option("PREfersdecimaL", "true").json(records)
 assert(df2.schema == schema)
   }
+
+  test("SPARK-18352: Parse normal multi-line JSON files (compressed)") {
+withTempPath { dir =>
+  val path = dir.getCanonicalPath
+  primitiveFieldAndType
+.toDF("value")
+.write
+.option("compression", "GzIp")
+.text(path)
+
+  new File(path).listFiles() match {
+case compressedFiles =>
+  assert(compressedFiles.exists(_.getName.endsWith(".gz")))
+  }
+
+  val jsonDF = spark.read.option("wholeFile", true).json(path)
+  val jsonDir = new File(dir, "json").getCanonicalPath
+  jsonDF.coalesce(1).write
+.format("json")
+.option("compression", "gZiP")
--- End diff --

Right


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-10 Thread NathanHowell
Github user NathanHowell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r100651910
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
 ---
@@ -1764,4 +1769,123 @@ class JsonSuite extends QueryTest with 
SharedSQLContext with TestJsonData {
 val df2 = spark.read.option("PREfersdecimaL", "true").json(records)
 assert(df2.schema == schema)
   }
+
+  test("SPARK-18352: Parse normal multi-line JSON files (compressed)") {
+withTempPath { dir =>
+  val path = dir.getCanonicalPath
+  primitiveFieldAndType
+.toDF("value")
+.write
+.option("compression", "GzIp")
+.text(path)
+
+  new File(path).listFiles() match {
+case compressedFiles =>
--- End diff --

For a reason that is no longer relevant, I'll switch this


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-10 Thread NathanHowell
Github user NathanHowell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r100651706
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonInferSchema.scala
 ---
@@ -79,7 +80,7 @@ private[sql] object JsonInferSchema {
 
   private[this] val structFieldComparator = new Comparator[StructField] {
 override def compare(o1: StructField, o2: StructField): Int = {
-  o1.name.compare(o2.name)
+  o1.name.compareTo(o2.name)
--- End diff --

`.compare` is a very expensive way of comparing two strings.

`compare`:
```
  public int compare(org.apache.spark.sql.types.StructField, 
org.apache.spark.sql.types.StructField);
Code:
   0: new   #14 // class 
scala/collection/immutable/StringOps
   3: dup
   4: getstatic #20 // Field 
scala/Predef$.MODULE$:Lscala/Predef$;
   7: aload_1
   8: invokevirtual #26 // Method 
org/apache/spark/sql/types/StructField.name:()Ljava/lang/String;
  11: invokevirtual #30 // Method 
scala/Predef$.augmentString:(Ljava/lang/String;)Ljava/lang/String;
  14: invokespecial #34 // Method 
scala/collection/immutable/StringOps."<init>":(Ljava/lang/String;)V
  17: aload_2
  18: invokevirtual #26 // Method 
org/apache/spark/sql/types/StructField.name:()Ljava/lang/String;
  21: invokevirtual #37 // Method 
scala/collection/immutable/StringOps.compare:(Ljava/lang/String;)I
  24: ireturn
```

`compareTo`:
```
  public int compare(org.apache.spark.sql.types.StructField, 
org.apache.spark.sql.types.StructField);
Code:
   0: aload_1
   1: invokevirtual #18 // Method 
org/apache/spark/sql/types/StructField.name:()Ljava/lang/String;
   4: aload_2
   5: invokevirtual #18 // Method 
org/apache/spark/sql/types/StructField.name:()Ljava/lang/String;
   8: invokevirtual #24 // Method 
java/lang/String.compareTo:(Ljava/lang/String;)I
  11: ireturn
```
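
For context, here is a minimal standalone sketch of the two comparator bodies being contrasted above; `StructField` is stubbed as a plain case class for illustration and is not Spark's real type:

```scala
import java.util.Comparator

// Hypothetical stand-in for org.apache.spark.sql.types.StructField,
// reduced to the only field the comparison touches.
case class StructField(name: String)

// Original form: String has no `compare` method, so Scala wraps each name
// in a StringOps via Predef.augmentString, allocating a wrapper per call.
val viaStringOps = new Comparator[StructField] {
  override def compare(o1: StructField, o2: StructField): Int =
    o1.name.compare(o2.name)
}

// Proposed form: java.lang.String.compareTo is invoked directly, with no
// wrapper allocation on the comparison path.
val viaCompareTo = new Comparator[StructField] {
  override def compare(o1: StructField, o2: StructField): Int =
    o1.name.compareTo(o2.name)
}
```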


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-10 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r100651385
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
 ---
@@ -1764,4 +1769,123 @@ class JsonSuite extends QueryTest with 
SharedSQLContext with TestJsonData {
 val df2 = spark.read.option("PREfersdecimaL", "true").json(records)
 assert(df2.schema == schema)
   }
+
+  test("SPARK-18352: Parse normal multi-line JSON files (compressed)") {
+withTempPath { dir =>
+  val path = dir.getCanonicalPath
+  primitiveFieldAndType
+.toDF("value")
+.write
+.option("compression", "GzIp")
+.text(path)
+
+  new File(path).listFiles() match {
+case compressedFiles =>
+  assert(compressedFiles.exists(_.getName.endsWith(".gz")))
+  }
+
+  val jsonDF = spark.read.option("wholeFile", true).json(path)
+  val jsonDir = new File(dir, "json").getCanonicalPath
+  jsonDF.coalesce(1).write
+.format("json")
+.option("compression", "gZiP")
+.save(jsonDir)
+
+  new File(jsonDir).listFiles() match {
+case compressedFiles =>
+  assert(compressedFiles.exists(_.getName.endsWith(".json.gz")))
+  }
+
+  val jsonCopy = spark.read
+.format("json")
+.load(jsonDir)
+
+  assert(jsonCopy.count === jsonDF.count)
+  val jsonCopySome = jsonCopy.selectExpr("string", "long", "boolean")
+  val jsonDFSome = jsonDF.selectExpr("string", "long", "boolean")
+  checkAnswer(jsonCopySome, jsonDFSome)
+}
+  }
+
+  test("SPARK-18352: Parse normal multi-line JSON files (uncompressed)") {
+withTempPath { dir =>
+  val path = dir.getCanonicalPath
+  primitiveFieldAndType
+.toDF("value")
+.write
+.text(path)
+
+  val jsonDF = spark.read.option("wholeFile", true).json(path)
+  val jsonDir = new File(dir, "json").getCanonicalPath
+  jsonDF.coalesce(1).write
+.format("json")
+.save(jsonDir)
+
+  val compressedFiles = new File(jsonDir).listFiles()
+  assert(compressedFiles.exists(_.getName.endsWith(".json")))
+
+  val jsonCopy = spark.read
+.format("json")
+.load(jsonDir)
+
+  assert(jsonCopy.count === jsonDF.count)
+  val jsonCopySome = jsonCopy.selectExpr("string", "long", "boolean")
+  val jsonDFSome = jsonDF.selectExpr("string", "long", "boolean")
+  checkAnswer(jsonCopySome, jsonDFSome)
+}
+  }
+
+  test("SPARK-18352: Expect one JSON document per file") {
+// the json parser terminates as soon as it sees a matching END_OBJECT 
or END_ARRAY token.
+// this might not be the optimal behavior but this test verifies that 
only the first value
+// is parsed and the rest are discarded.
+
+// alternatively the parser could continue parsing following objects, 
which may further reduce
+// allocations by skipping the line reader entirely
+
+withTempPath { dir =>
+  val path = dir.getCanonicalPath
+  spark
+.createDataFrame(Seq(Tuple1("{}{invalid}")))
+.coalesce(1)
+.write
+.text(path)
+
+  val jsonDF = spark.read.option("wholeFile", true).json(path)
+  // no corrupt record column should be created
+  assert(jsonDF.schema === StructType(Seq()))
+  // only the first object should be read
+  assert(jsonDF.count() === 1)
+}
+  }
+
+  test("SPARK-18352: Handle corrupt documents") {
+withTempPath { dir =>
+  val path = dir.getCanonicalPath
+  val corruptRecordCount = additionalCorruptRecords.count().toInt
+  assert(corruptRecordCount === 5)
+
+  additionalCorruptRecords
+.toDF("value")
+.repartition(corruptRecordCount * 4)
+.write
+.text(path)
+
+  val jsonDF = spark.read.option("wholeFile", true).json(path)
+  assert(jsonDF.count() === corruptRecordCount)
+  assert(jsonDF.schema === new StructType()
+.add("_corrupt_record", StringType)
+.add("dummy", StringType))
+  val counts = jsonDF
+.join(
+  additionalCorruptRecords.toDF("value"),
+  F.regexp_replace($"_corrupt_record", "(^\\s+|\\s+$)", "") === 
F.trim($"value"),
--- End diff --

why do we need to remove the white spaces in `_corrupt_record`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org


[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-10 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r100651178
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
 ---
@@ -1764,4 +1769,123 @@ class JsonSuite extends QueryTest with 
SharedSQLContext with TestJsonData {
 val df2 = spark.read.option("PREfersdecimaL", "true").json(records)
 assert(df2.schema == schema)
   }
+
+  test("SPARK-18352: Parse normal multi-line JSON files (compressed)") {
+withTempPath { dir =>
+  val path = dir.getCanonicalPath
+  primitiveFieldAndType
+.toDF("value")
+.write
+.option("compression", "GzIp")
+.text(path)
+
+  new File(path).listFiles() match {
+case compressedFiles =>
+  assert(compressedFiles.exists(_.getName.endsWith(".gz")))
+  }
+
+  val jsonDF = spark.read.option("wholeFile", true).json(path)
+  val jsonDir = new File(dir, "json").getCanonicalPath
+  jsonDF.coalesce(1).write
+.format("json")
+.option("compression", "gZiP")
+.save(jsonDir)
+
+  new File(jsonDir).listFiles() match {
+case compressedFiles =>
+  assert(compressedFiles.exists(_.getName.endsWith(".json.gz")))
+  }
+
+  val jsonCopy = spark.read
+.format("json")
+.load(jsonDir)
+
+  assert(jsonCopy.count === jsonDF.count)
+  val jsonCopySome = jsonCopy.selectExpr("string", "long", "boolean")
+  val jsonDFSome = jsonDF.selectExpr("string", "long", "boolean")
+  checkAnswer(jsonCopySome, jsonDFSome)
+}
+  }
+
+  test("SPARK-18352: Parse normal multi-line JSON files (uncompressed)") {
+withTempPath { dir =>
+  val path = dir.getCanonicalPath
+  primitiveFieldAndType
+.toDF("value")
+.write
+.text(path)
+
+  val jsonDF = spark.read.option("wholeFile", true).json(path)
+  val jsonDir = new File(dir, "json").getCanonicalPath
+  jsonDF.coalesce(1).write
+.format("json")
+.save(jsonDir)
+
+  val compressedFiles = new File(jsonDir).listFiles()
+  assert(compressedFiles.exists(_.getName.endsWith(".json")))
+
+  val jsonCopy = spark.read
+.format("json")
+.load(jsonDir)
+
+  assert(jsonCopy.count === jsonDF.count)
+  val jsonCopySome = jsonCopy.selectExpr("string", "long", "boolean")
+  val jsonDFSome = jsonDF.selectExpr("string", "long", "boolean")
+  checkAnswer(jsonCopySome, jsonDFSome)
+}
+  }
+
+  test("SPARK-18352: Expect one JSON document per file") {
+// the json parser terminates as soon as it sees a matching END_OBJECT 
or END_ARRAY token.
+// this might not be the optimal behavior but this test verifies that 
only the first value
+// is parsed and the rest are discarded.
+
+// alternatively the parser could continue parsing following objects, 
which may further reduce
+// allocations by skipping the line reader entirely
+
+withTempPath { dir =>
+  val path = dir.getCanonicalPath
+  spark
+.createDataFrame(Seq(Tuple1("{}{invalid}")))
+.coalesce(1)
+.write
+.text(path)
+
+  val jsonDF = spark.read.option("wholeFile", true).json(path)
+  // no corrupt record column should be created
+  assert(jsonDF.schema === StructType(Seq()))
+  // only the first object should be read
+  assert(jsonDF.count() === 1)
+}
+  }
+
+  test("SPARK-18352: Handle corrupt documents") {
+withTempPath { dir =>
+  val path = dir.getCanonicalPath
+  val corruptRecordCount = additionalCorruptRecords.count().toInt
+  assert(corruptRecordCount === 5)
+
+  additionalCorruptRecords
+.toDF("value")
+.repartition(corruptRecordCount * 4)
--- End diff --

do you want the 5 rows to be distributed to 5 files? how about 
`repartitionBy("value")`?
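
For concreteness, the DataFrame API spells column-based repartitioning as `repartition(numPartitions, cols*)`, so (assuming that is what is meant here) a sketch against the surrounding test's fixtures might look like this; `additionalCorruptRecords`, `corruptRecordCount`, and `path` come from the quoted test, and `spark.implicits._` is assumed to be in scope as in the suite:

```scala
additionalCorruptRecords
  .toDF("value")
  .repartition(corruptRecordCount, $"value")  // hash-partition the rows by value
  .write
  .text(path)
```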


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-10 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r100651103
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
 ---
@@ -1764,4 +1769,123 @@ class JsonSuite extends QueryTest with 
SharedSQLContext with TestJsonData {
 val df2 = spark.read.option("PREfersdecimaL", "true").json(records)
 assert(df2.schema == schema)
   }
+
+  test("SPARK-18352: Parse normal multi-line JSON files (compressed)") {
+withTempPath { dir =>
+  val path = dir.getCanonicalPath
+  primitiveFieldAndType
+.toDF("value")
+.write
+.option("compression", "GzIp")
+.text(path)
+
+  new File(path).listFiles() match {
+case compressedFiles =>
+  assert(compressedFiles.exists(_.getName.endsWith(".gz")))
+  }
+
+  val jsonDF = spark.read.option("wholeFile", true).json(path)
+  val jsonDir = new File(dir, "json").getCanonicalPath
+  jsonDF.coalesce(1).write
+.format("json")
+.option("compression", "gZiP")
+.save(jsonDir)
+
+  new File(jsonDir).listFiles() match {
+case compressedFiles =>
+  assert(compressedFiles.exists(_.getName.endsWith(".json.gz")))
+  }
+
+  val jsonCopy = spark.read
+.format("json")
+.load(jsonDir)
+
+  assert(jsonCopy.count === jsonDF.count)
+  val jsonCopySome = jsonCopy.selectExpr("string", "long", "boolean")
+  val jsonDFSome = jsonDF.selectExpr("string", "long", "boolean")
+  checkAnswer(jsonCopySome, jsonDFSome)
+}
+  }
+
+  test("SPARK-18352: Parse normal multi-line JSON files (uncompressed)") {
+withTempPath { dir =>
+  val path = dir.getCanonicalPath
+  primitiveFieldAndType
+.toDF("value")
+.write
+.text(path)
+
+  val jsonDF = spark.read.option("wholeFile", true).json(path)
+  val jsonDir = new File(dir, "json").getCanonicalPath
+  jsonDF.coalesce(1).write
+.format("json")
+.save(jsonDir)
+
+  val compressedFiles = new File(jsonDir).listFiles()
+  assert(compressedFiles.exists(_.getName.endsWith(".json")))
+
+  val jsonCopy = spark.read
+.format("json")
+.load(jsonDir)
+
+  assert(jsonCopy.count === jsonDF.count)
+  val jsonCopySome = jsonCopy.selectExpr("string", "long", "boolean")
+  val jsonDFSome = jsonDF.selectExpr("string", "long", "boolean")
+  checkAnswer(jsonCopySome, jsonDFSome)
+}
+  }
+
+  test("SPARK-18352: Expect one JSON document per file") {
+// the json parser terminates as soon as it sees a matching END_OBJECT 
or END_ARRAY token.
+// this might not be the optimal behavior but this test verifies that 
only the first value
+// is parsed and the rest are discarded.
+
+// alternatively the parser could continue parsing following objects, 
which may further reduce
+// allocations by skipping the line reader entirely
+
+withTempPath { dir =>
+  val path = dir.getCanonicalPath
+  spark
+.createDataFrame(Seq(Tuple1("{}{invalid}")))
+.coalesce(1)
+.write
+.text(path)
+
+  val jsonDF = spark.read.option("wholeFile", true).json(path)
+  // no corrupt record column should be created
+  assert(jsonDF.schema === StructType(Seq()))
+  // only the first object should be read
+  assert(jsonDF.count() === 1)
+}
+  }
+
+  test("SPARK-18352: Handle corrupt documents") {
+withTempPath { dir =>
+  val path = dir.getCanonicalPath
+  val corruptRecordCount = additionalCorruptRecords.count().toInt
+  assert(corruptRecordCount === 5)
+
+  additionalCorruptRecords
+.toDF("value")
+.repartition(corruptRecordCount * 4)
+.write
+.text(path)
+
+  val jsonDF = spark.read.option("wholeFile", true).json(path)
--- End diff --

shall we test all the parse modes?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-10 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r100651055
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
 ---
@@ -1764,4 +1769,123 @@ class JsonSuite extends QueryTest with 
SharedSQLContext with TestJsonData {
 val df2 = spark.read.option("PREfersdecimaL", "true").json(records)
 assert(df2.schema == schema)
   }
+
+  test("SPARK-18352: Parse normal multi-line JSON files (compressed)") {
+withTempPath { dir =>
+  val path = dir.getCanonicalPath
+  primitiveFieldAndType
+.toDF("value")
+.write
+.option("compression", "GzIp")
+.text(path)
+
+  new File(path).listFiles() match {
+case compressedFiles =>
+  assert(compressedFiles.exists(_.getName.endsWith(".gz")))
+  }
+
+  val jsonDF = spark.read.option("wholeFile", true).json(path)
+  val jsonDir = new File(dir, "json").getCanonicalPath
+  jsonDF.coalesce(1).write
+.format("json")
+.option("compression", "gZiP")
+.save(jsonDir)
+
+  new File(jsonDir).listFiles() match {
+case compressedFiles =>
+  assert(compressedFiles.exists(_.getName.endsWith(".json.gz")))
+  }
+
+  val jsonCopy = spark.read
+.format("json")
+.load(jsonDir)
+
+  assert(jsonCopy.count === jsonDF.count)
+  val jsonCopySome = jsonCopy.selectExpr("string", "long", "boolean")
+  val jsonDFSome = jsonDF.selectExpr("string", "long", "boolean")
+  checkAnswer(jsonCopySome, jsonDFSome)
+}
+  }
+
+  test("SPARK-18352: Parse normal multi-line JSON files (uncompressed)") {
+withTempPath { dir =>
+  val path = dir.getCanonicalPath
+  primitiveFieldAndType
+.toDF("value")
+.write
+.text(path)
+
+  val jsonDF = spark.read.option("wholeFile", true).json(path)
+  val jsonDir = new File(dir, "json").getCanonicalPath
+  jsonDF.coalesce(1).write
+.format("json")
+.save(jsonDir)
+
+  val compressedFiles = new File(jsonDir).listFiles()
+  assert(compressedFiles.exists(_.getName.endsWith(".json")))
+
+  val jsonCopy = spark.read
+.format("json")
+.load(jsonDir)
+
+  assert(jsonCopy.count === jsonDF.count)
+  val jsonCopySome = jsonCopy.selectExpr("string", "long", "boolean")
+  val jsonDFSome = jsonDF.selectExpr("string", "long", "boolean")
+  checkAnswer(jsonCopySome, jsonDFSome)
+}
+  }
+
+  test("SPARK-18352: Expect one JSON document per file") {
+// the json parser terminates as soon as it sees a matching END_OBJECT 
or END_ARRAY token.
+// this might not be the optimal behavior but this test verifies that 
only the first value
+// is parsed and the rest are discarded.
+
+// alternatively the parser could continue parsing following objects, 
which may further reduce
+// allocations by skipping the line reader entirely
+
+withTempPath { dir =>
+  val path = dir.getCanonicalPath
+  spark
+.createDataFrame(Seq(Tuple1("{}{invalid}")))
+.coalesce(1)
+.write
+.text(path)
+
+  val jsonDF = spark.read.option("wholeFile", true).json(path)
--- End diff --

shall we test all the parse modes?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-10 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r100650756
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
 ---
@@ -1764,4 +1769,123 @@ class JsonSuite extends QueryTest with 
SharedSQLContext with TestJsonData {
 val df2 = spark.read.option("PREfersdecimaL", "true").json(records)
 assert(df2.schema == schema)
   }
+
+  test("SPARK-18352: Parse normal multi-line JSON files (compressed)") {
+withTempPath { dir =>
+  val path = dir.getCanonicalPath
+  primitiveFieldAndType
+.toDF("value")
+.write
+.option("compression", "GzIp")
+.text(path)
+
+  new File(path).listFiles() match {
+case compressedFiles =>
+  assert(compressedFiles.exists(_.getName.endsWith(".gz")))
+  }
+
+  val jsonDF = spark.read.option("wholeFile", true).json(path)
+  val jsonDir = new File(dir, "json").getCanonicalPath
+  jsonDF.coalesce(1).write
+.format("json")
+.option("compression", "gZiP")
+.save(jsonDir)
+
+  new File(jsonDir).listFiles() match {
+case compressedFiles =>
+  assert(compressedFiles.exists(_.getName.endsWith(".json.gz")))
+  }
+
+  val jsonCopy = spark.read
--- End diff --

I'd like to just write the whole-file json, read it back and check the answer, 
instead of writing and reading it a second time, which is too complicated. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-10 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r100650577
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
 ---
@@ -1764,4 +1769,123 @@ class JsonSuite extends QueryTest with 
SharedSQLContext with TestJsonData {
 val df2 = spark.read.option("PREfersdecimaL", "true").json(records)
 assert(df2.schema == schema)
   }
+
+  test("SPARK-18352: Parse normal multi-line JSON files (compressed)") {
+withTempPath { dir =>
+  val path = dir.getCanonicalPath
+  primitiveFieldAndType
+.toDF("value")
+.write
+.option("compression", "GzIp")
+.text(path)
+
+  new File(path).listFiles() match {
+case compressedFiles =>
+  assert(compressedFiles.exists(_.getName.endsWith(".gz")))
+  }
+
+  val jsonDF = spark.read.option("wholeFile", true).json(path)
+  val jsonDir = new File(dir, "json").getCanonicalPath
+  jsonDF.coalesce(1).write
+.format("json")
+.option("compression", "gZiP")
--- End diff --

we don't support `wholeFile` on the write side, right?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-10 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r100650500
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
 ---
@@ -1764,4 +1769,123 @@ class JsonSuite extends QueryTest with 
SharedSQLContext with TestJsonData {
 val df2 = spark.read.option("PREfersdecimaL", "true").json(records)
 assert(df2.schema == schema)
   }
+
+  test("SPARK-18352: Parse normal multi-line JSON files (compressed)") {
+withTempPath { dir =>
+  val path = dir.getCanonicalPath
+  primitiveFieldAndType
--- End diff --

I checked `primitiveFieldAndType`: it's not a json array but a single object 
wrapped with `{ }`. This is different from the [python 
test](https://github.com/apache/spark/pull/16386/files#diff-e8e190e27ba3aee32a59b787696b34c6R1)
 . So if the whole file is not a json array, this file will only produce a 
single row, right?
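
A hedged illustration of the distinction (file contents and paths are hypothetical; the option name follows this patch's `wholeFile` reader option, and a `spark` session is assumed to be in scope):

```scala
// object.json  -- one top-level object spanning several lines -> one row
//   {
//     "a": 1,
//     "b": "x"
//   }
//
// array.json   -- one top-level array -> one row per element
//   [
//     {"a": 1, "b": "x"},
//     {"a": 2, "b": "y"}
//   ]
val oneRow  = spark.read.option("wholeFile", true).json("/tmp/object.json")  // count == 1
val twoRows = spark.read.option("wholeFile", true).json("/tmp/array.json")   // count == 2
```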


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-10 Thread NathanHowell
Github user NathanHowell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r100650450
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
 ---
@@ -394,36 +447,32 @@ class JacksonParser(
   }
 
   /**
-   * Parse the string JSON input to the set of [[InternalRow]]s.
+   * Parse the JSON input to the set of [[InternalRow]]s.
+   *
+   * @param recordLiteral an optional function that will be used to 
generate
+   *   the corrupt record text instead of record.toString
*/
-  def parse(input: String): Seq[InternalRow] = {
-if (input.trim.isEmpty) {
-  Nil
-} else {
-  try {
-Utils.tryWithResource(factory.createParser(input)) { parser =>
-  parser.nextToken()
-  rootConverter.apply(parser) match {
-case null => failedRecord(input)
-case row: InternalRow => row :: Nil
-case array: ArrayData =>
-  // Here, as we support reading top level JSON arrays and 
take every element
-  // in such an array as a row, this case is possible.
-  if (array.numElements() == 0) {
-Nil
-  } else {
-array.toArray[InternalRow](schema)
-  }
-case _ =>
-  failedRecord(input)
+  def parse[T](
+  record: T,
+  createParser: (JsonFactory, T) => JsonParser,
+  recordLiteral: PartialFunction[T, UTF8String] = 
PartialFunction.empty): Seq[InternalRow] = {
--- End diff --

Passing a function in instead of a closure saves an allocation that will be 
held for the duration of parsing, and is likely to be promoted to a later GC 
generation.

If we went the closure route the function signature should be this:

```scala
def parse(
  createParser: JsonFactory => JsonParser,
  recordLiteral: => UTF8String): Seq[InternalRow]
```
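
A rough sketch of the trade-off being described, with deliberately simplified signatures (this is not the actual patch code):

```scala
import com.fasterxml.jackson.core.{JsonFactory, JsonParser}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.unsafe.types.UTF8String

// Function-argument form (roughly what the patch does): the record and a
// static `createParser` function are passed separately, so no per-record
// closure has to be allocated and kept alive for the whole parse.
def parseFn[T](
    record: T,
    createParser: (JsonFactory, T) => JsonParser): Seq[InternalRow] = ???

// Closure form (the alternative sketched above): every call site allocates
// a closure capturing the record, which stays live while parsing runs.
def parseClosure(
    createParser: JsonFactory => JsonParser,
    recordLiteral: => UTF8String): Seq[InternalRow] = ???
```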


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #16889: [SPARK-17668][SQL] Use Expressions for conversions to/fr...

2017-02-10 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/16889
  
Merged build finished. Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #16889: [SPARK-17668][SQL] Use Expressions for conversions to/fr...

2017-02-10 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/16889
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/72720/
Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #16889: [SPARK-17668][SQL] Use Expressions for conversions to/fr...

2017-02-10 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/16889
  
**[Test build #72720 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72720/testReport)**
 for PR 16889 at commit 
[`ac09ad5`](https://github.com/apache/spark/commit/ac09ad519437fe8efb071f354cc4387a4a95c206).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-10 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r100650024
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
 ---
@@ -1764,4 +1769,123 @@ class JsonSuite extends QueryTest with 
SharedSQLContext with TestJsonData {
 val df2 = spark.read.option("PREfersdecimaL", "true").json(records)
 assert(df2.schema == schema)
   }
+
+  test("SPARK-18352: Parse normal multi-line JSON files (compressed)") {
+withTempPath { dir =>
+  val path = dir.getCanonicalPath
+  primitiveFieldAndType
+.toDF("value")
+.write
+.option("compression", "GzIp")
+.text(path)
+
+  new File(path).listFiles() match {
+case compressedFiles =>
--- End diff --

why add this pattern match? is it same with `assert(new 
File(path).listFiles().exists(_.getName.endsWith(".gz")))`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-10 Thread NathanHowell
Github user NathanHowell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r100649836
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
 ---
@@ -48,69 +47,110 @@ class JacksonParser(
 
   // A `ValueConverter` is responsible for converting a value from 
`JsonParser`
   // to a value in a field for `InternalRow`.
-  private type ValueConverter = (JsonParser) => Any
+  private type ValueConverter = JsonParser => AnyRef
 
   // `ValueConverter`s for the root schema for all fields in the schema
-  private val rootConverter: ValueConverter = makeRootConverter(schema)
+  private val rootConverter = makeRootConverter(schema)
 
   private val factory = new JsonFactory()
   options.setJacksonOptions(factory)
 
   private val emptyRow: Seq[InternalRow] = Seq(new 
GenericInternalRow(schema.length))
 
+  private val corruptFieldIndex = 
schema.getFieldIndex(options.columnNameOfCorruptRecord)
+
+  @transient
+  private[this] var isWarningPrinted: Boolean = false
+
   @transient
-  private[this] var isWarningPrintedForMalformedRecord: Boolean = false
+  private def printWarningForMalformedRecord(record: () => UTF8String): 
Unit = {
+def sampleRecord: String = {
+  if (options.wholeFile) {
+""
+  } else {
+s"Sample record: ${record()}\n"
+  }
+}
+
+def footer: String = {
+  s"""Code example to print all malformed records (scala):
+ |===
+ |// The corrupted record exists in column 
${options.columnNameOfCorruptRecord}.
+ |val parsedJson = spark.read.json("/path/to/json/file/test.json")
+ |
+   """.stripMargin
+}
+
+if (options.permissive) {
+  logWarning(
+s"""Found at least one malformed record. The JSON reader will 
replace
+   |all malformed records with placeholder null in current 
$PERMISSIVE_MODE parser mode.
+   |To find out which corrupted records have been replaced with 
null, please use the
+   |default inferred schema instead of providing a custom schema.
+   |
+   |${sampleRecord ++ footer}
+   |
+ """.stripMargin)
+} else if (options.dropMalformed) {
+  logWarning(
+s"""Found at least one malformed record. The JSON reader will drop
+   |all malformed records in current $DROP_MALFORMED_MODE parser 
mode. To find out which
+   |corrupted records have been dropped, please switch the parser 
mode to $PERMISSIVE_MODE
+   |mode and use the default inferred schema.
+   |
+   |${sampleRecord ++ footer}
+   |
+ """.stripMargin)
+}
+  }
+
+  @transient
+  private def printWarningIfWholeFile(): Unit = {
+if (options.wholeFile && corruptFieldIndex.isDefined) {
+  logWarning(
+s"""Enabling wholeFile mode and defining columnNameOfCorruptRecord 
may result
+   |in very large allocations or OutOfMemoryExceptions being 
raised.
+   |
+ """.stripMargin)
+}
+  }
 
   /**
* This function deals with the cases it fails to parse. This function 
will be called
* when exceptions are caught during converting. This functions also 
deals with `mode` option.
*/
-  private def failedRecord(record: String): Seq[InternalRow] = {
-// create a row even if no corrupt record column is present
-if (options.failFast) {
-  throw new SparkSQLJsonProcessingException(s"Malformed line in 
FAILFAST mode: $record")
-}
-if (options.dropMalformed) {
-  if (!isWarningPrintedForMalformedRecord) {
-logWarning(
-  s"""Found at least one malformed records (sample: $record). The 
JSON reader will drop
- |all malformed records in current $DROP_MALFORMED_MODE parser 
mode. To find out which
- |corrupted records have been dropped, please switch the 
parser mode to $PERMISSIVE_MODE
- |mode and use the default inferred schema.
- |
- |Code example to print all malformed records (scala):
- |===
- |// The corrupted record exists in column 
${columnNameOfCorruptRecord}
- |val parsedJson = 
spark.read.json("/path/to/json/file/test.json")
- |
-   """.stripMargin)
-isWarningPrintedForMalformedRecord = true
-  }
-  Nil
-} else if (schema.getFieldIndex(columnNameOfCorruptRecord).isEmpty) {
-  if 

[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-10 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r100649847
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonInferSchema.scala
 ---
@@ -79,7 +80,7 @@ private[sql] object JsonInferSchema {
 
   private[this] val structFieldComparator = new Comparator[StructField] {
 override def compare(o1: StructField, o2: StructField): Int = {
-  o1.name.compare(o2.name)
+  o1.name.compareTo(o2.name)
--- End diff --

why this change?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-10 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r100649748
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
 ---
@@ -97,46 +91,13 @@ class JsonFileFormat extends TextBasedFileFormat with 
DataSourceRegister {
 val broadcastedHadoopConf =
   sparkSession.sparkContext.broadcast(new 
SerializableConfiguration(hadoopConf))
 
-val parsedOptions: JSONOptions = new JSONOptions(options)
-val columnNameOfCorruptRecord = parsedOptions.columnNameOfCorruptRecord
-  .getOrElse(sparkSession.sessionState.conf.columnNameOfCorruptRecord)
+val parsedOptions = new JSONOptions(options,
+  sparkSession.sessionState.conf.columnNameOfCorruptRecord)
+val readFile = JsonDataSource(parsedOptions).readFile _
--- End diff --

this closure holds a reference to the outer object, so we will still 
broadcast the `JsonDataSource`. How about:
```
val columnNameOfCorruptRecord = sparkSession.sessionState.conf.columnNameOfCorruptRecord

(file: PartitionedFile) => {
  val parsedOptions = ...
  val parser = new JacksonParser(requiredSchema, parsedOptions)
  JsonDataSource(parsedOptions).readFile...
}
```
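
A small generic sketch of the capture issue being raised; `Heavy` is purely illustrative and stands in for a `JsonDataSource`-like object:

```scala
class Heavy(val payload: Array[Byte]) extends Serializable {
  def readFile(path: String): Iterator[String] = Iterator(path)
}

val heavy = new Heavy(new Array[Byte](1 << 20))

// A method value keeps a reference to the instance it was taken from, so
// serializing `f` drags the whole Heavy instance (and its payload) along.
val f: String => Iterator[String] = heavy.readFile _

// Constructing the object inside the closure (as suggested above) captures
// only the small values needed to build it.
val g: String => Iterator[String] = { path =>
  val local = new Heavy(Array.emptyByteArray)
  local.readFile(path)
}
```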


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-10 Thread NathanHowell
Github user NathanHowell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r100649635
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
 ---
@@ -31,10 +31,17 @@ import 
org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CompressionCodecs
  * Most of these map directly to Jackson's internal options, specified in 
[[JsonParser.Feature]].
  */
 private[sql] class JSONOptions(
-@transient private val parameters: CaseInsensitiveMap)
+@transient private val parameters: CaseInsensitiveMap,
+defaultColumnNameOfCorruptRecord: String)
   extends Logging with Serializable  {
 
-  def this(parameters: Map[String, String]) = this(new 
CaseInsensitiveMap(parameters))
+  def this(
+  parameters: Map[String, String],
+  defaultColumnNameOfCorruptRecord: String = "") = {
--- End diff --

Yes, it's really not a good solution, but it doesn't make sense to have a 
corrupt column name in all use cases. Picking another sentinel could 
inadvertently conflict with a real column. It should be `Option[String] = None` 
but this winds up being a large change that deserves a separate pull request. 
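
A hypothetical sketch (explicitly not part of this PR) of what the `Option[String]` shape mentioned above might look like, reduced to the one option that matters here:

```scala
// Carrying the default as an Option removes the empty-string sentinel and
// cannot collide with a real column name.
class JsonOptionsSketch(
    parameters: Map[String, String],
    defaultColumnNameOfCorruptRecord: Option[String] = None) extends Serializable {

  val columnNameOfCorruptRecord: Option[String] =
    parameters.get("columnNameOfCorruptRecord").orElse(defaultColumnNameOfCorruptRecord)
}
```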


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #11887: [SPARK-13041][Mesos]add driver sandbox uri to the dispat...

2017-02-10 Thread skonto
Github user skonto commented on the issue:

https://github.com/apache/spark/pull/11887
  
@HyukjinKwon probably. Let me ask Michael. @mgummelt what do you think? 
what options do we have?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #16161: [SPARK-18717][SQL] Make code generation for Scala Map wo...

2017-02-10 Thread liancheng
Github user liancheng commented on the issue:

https://github.com/apache/spark/pull/16161
  
Thanks. Backported to branch-2.1.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-10 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r100649228
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
 ---
@@ -394,36 +447,32 @@ class JacksonParser(
   }
 
   /**
-   * Parse the string JSON input to the set of [[InternalRow]]s.
+   * Parse the JSON input to the set of [[InternalRow]]s.
+   *
+   * @param recordLiteral an optional function that will be used to 
generate
+   *   the corrupt record text instead of record.toString
*/
-  def parse(input: String): Seq[InternalRow] = {
-if (input.trim.isEmpty) {
-  Nil
-} else {
-  try {
-Utils.tryWithResource(factory.createParser(input)) { parser =>
-  parser.nextToken()
-  rootConverter.apply(parser) match {
-case null => failedRecord(input)
-case row: InternalRow => row :: Nil
-case array: ArrayData =>
-  // Here, as we support reading top level JSON arrays and 
take every element
-  // in such an array as a row, this case is possible.
-  if (array.numElements() == 0) {
-Nil
-  } else {
-array.toArray[InternalRow](schema)
-  }
-case _ =>
-  failedRecord(input)
+  def parse[T](
+  record: T,
+  createParser: (JsonFactory, T) => JsonParser,
+  recordLiteral: PartialFunction[T, UTF8String] = 
PartialFunction.empty): Seq[InternalRow] = {
--- End diff --

seems the only caller is 
https://github.com/apache/spark/pull/16386/files#diff-5ac20b8d75a20117deaa9ba4af814090R211
 , which doesn't take a parameter, so `PartialFunction` is not a good choice.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-10 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r100649101
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
 ---
@@ -394,36 +447,32 @@ class JacksonParser(
   }
 
   /**
-   * Parse the string JSON input to the set of [[InternalRow]]s.
+   * Parse the JSON input to the set of [[InternalRow]]s.
+   *
+   * @param recordLiteral an optional function that will be used to 
generate
+   *   the corrupt record text instead of record.toString
*/
-  def parse(input: String): Seq[InternalRow] = {
-if (input.trim.isEmpty) {
-  Nil
-} else {
-  try {
-Utils.tryWithResource(factory.createParser(input)) { parser =>
-  parser.nextToken()
-  rootConverter.apply(parser) match {
-case null => failedRecord(input)
-case row: InternalRow => row :: Nil
-case array: ArrayData =>
-  // Here, as we support reading top level JSON arrays and 
take every element
-  // in such an array as a row, this case is possible.
-  if (array.numElements() == 0) {
-Nil
-  } else {
-array.toArray[InternalRow](schema)
-  }
-case _ =>
-  failedRecord(input)
+  def parse[T](
+  record: T,
+  createParser: (JsonFactory, T) => JsonParser,
+  recordLiteral: PartialFunction[T, UTF8String] = 
PartialFunction.empty): Seq[InternalRow] = {
--- End diff --

how about `wholeTextRecord: => Option[UTF8String] = None`?
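
For concreteness, the signature with that suggestion applied might look like this (a sketch only, not the final code):

```scala
import com.fasterxml.jackson.core.{JsonFactory, JsonParser}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.unsafe.types.UTF8String

def parse[T](
    record: T,
    createParser: (JsonFactory, T) => JsonParser,
    wholeTextRecord: => Option[UTF8String] = None): Seq[InternalRow] = {
  // On a parse failure the corrupt-record text could be taken from the
  // by-name argument, falling back to record.toString otherwise, e.g.:
  //   wholeTextRecord.getOrElse(UTF8String.fromString(record.toString))
  ???
}
```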


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-10 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r100648863
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
 ---
@@ -394,36 +447,32 @@ class JacksonParser(
   }
 
   /**
-   * Parse the string JSON input to the set of [[InternalRow]]s.
+   * Parse the JSON input to the set of [[InternalRow]]s.
+   *
+   * @param recordLiteral an optional function that will be used to 
generate
+   *   the corrupt record text instead of record.toString
*/
-  def parse(input: String): Seq[InternalRow] = {
-if (input.trim.isEmpty) {
-  Nil
-} else {
-  try {
-Utils.tryWithResource(factory.createParser(input)) { parser =>
-  parser.nextToken()
-  rootConverter.apply(parser) match {
-case null => failedRecord(input)
-case row: InternalRow => row :: Nil
-case array: ArrayData =>
-  // Here, as we support reading top level JSON arrays and 
take every element
-  // in such an array as a row, this case is possible.
-  if (array.numElements() == 0) {
-Nil
-  } else {
-array.toArray[InternalRow](schema)
-  }
-case _ =>
-  failedRecord(input)
+  def parse[T](
+  record: T,
+  createParser: (JsonFactory, T) => JsonParser,
+  recordLiteral: PartialFunction[T, UTF8String] = 
PartialFunction.empty): Seq[InternalRow] = {
+try {
+  Utils.tryWithResource(createParser(factory, record)) { parser =>
+// a null first token is equivalent to testing for 
input.trim.isEmpty
+// but it works on any token stream and not just strings
+parser.nextToken() match {
+  case null => Nil
+  case _ => rootConverter.apply(parser) match {
+case null => throw new SparkSQLJsonProcessingException("Root 
converter returned null")
+case rows => rows
   }
 }
-  } catch {
-case _: JsonProcessingException =>
-  failedRecord(input)
-case _: SparkSQLJsonProcessingException =>
-  failedRecord(input)
   }
+} catch {
+  case (_: JsonProcessingException) | (_: 
SparkSQLJsonProcessingException) =>
--- End diff --

nit: I think the brackets are not needed
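
A minimal, self-contained illustration of the parenthesis-free form the nit refers to; `SparkSQLJsonProcessingException` is stubbed here because the real one is internal to Spark:

```scala
import com.fasterxml.jackson.core.JsonProcessingException

// Stand-in for Spark's internal SparkSQLJsonProcessingException.
class SparkSQLJsonProcessingException(msg: String) extends RuntimeException(msg)

def riskyParse(): Seq[Int] = throw new SparkSQLJsonProcessingException("boom")

// Typed pattern alternatives do not need the surrounding parentheses:
val rows = try riskyParse() catch {
  case _: JsonProcessingException | _: SparkSQLJsonProcessingException => Nil
}
```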


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16386: [SPARK-18352][SQL] Support parsing multiline json...

2017-02-10 Thread NathanHowell
Github user NathanHowell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16386#discussion_r100648524
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
 ---
@@ -394,36 +447,32 @@ class JacksonParser(
   }
 
   /**
-   * Parse the string JSON input to the set of [[InternalRow]]s.
+   * Parse the JSON input to the set of [[InternalRow]]s.
+   *
+   * @param recordLiteral an optional function that will be used to 
generate
+   *   the corrupt record text instead of record.toString
*/
-  def parse(input: String): Seq[InternalRow] = {
-if (input.trim.isEmpty) {
-  Nil
-} else {
-  try {
-Utils.tryWithResource(factory.createParser(input)) { parser =>
-  parser.nextToken()
-  rootConverter.apply(parser) match {
-case null => failedRecord(input)
-case row: InternalRow => row :: Nil
-case array: ArrayData =>
-  // Here, as we support reading top level JSON arrays and 
take every element
-  // in such an array as a row, this case is possible.
-  if (array.numElements() == 0) {
-Nil
-  } else {
-array.toArray[InternalRow](schema)
-  }
-case _ =>
-  failedRecord(input)
+  def parse[T](
+  record: T,
+  createParser: (JsonFactory, T) => JsonParser,
+  recordLiteral: PartialFunction[T, UTF8String] = 
PartialFunction.empty): Seq[InternalRow] = {
+try {
+  Utils.tryWithResource(createParser(factory, record)) { parser =>
+// a null first token is equivalent to testing for 
input.trim.isEmpty
+// but it works on any token stream and not just strings
+parser.nextToken() match {
+  case null => Nil
+  case _ => rootConverter.apply(parser) match {
+case null => throw new SparkSQLJsonProcessingException("Root 
converter returned null")
+case rows => rows
   }
 }
-  } catch {
-case _: JsonProcessingException =>
-  failedRecord(input)
-case _: SparkSQLJsonProcessingException =>
-  failedRecord(input)
   }
+} catch {
+  case (_: JsonProcessingException) | (_: 
SparkSQLJsonProcessingException) =>
+failedRecord(() => recordLiteral.applyOrElse[T, UTF8String](
--- End diff --

When I do that I usually get review comments asking to make call-by-name 
parameters explicit...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org


