[GitHub] spark pull request: [SPARK-3586][streaming]Support nested director...

2015-06-23 Thread wangxiaojing
Github user wangxiaojing commented on the pull request:

https://github.com/apache/spark/pull/2765#issuecomment-114396317
  
@andrewor14  ok.





[GitHub] spark pull request: [SPARK-3586][streaming]Support nested director...

2015-06-23 Thread wangxiaojing
Github user wangxiaojing closed the pull request at:

https://github.com/apache/spark/pull/2765





[GitHub] spark pull request: [SPARK-3586][streaming]Support nested director...

2015-03-03 Thread wangxiaojing
Github user wangxiaojing commented on the pull request:

https://github.com/apache/spark/pull/2765#issuecomment-77104507
  
@srowen There is two possible solution,no API change.
 1.Adding a switch Whether to  monitor files in subdirectories,Defaults 
to false will monitor the directory `dataDirectory` , If set to true,monitor 
files in subdirectories. 
   2.Adding a  configuration `streaming.monitor.directory.depth` to  
control the searching depth of directories,Defaults to 1 will monitor the 
directory `dataDirectory` , If set to  greater than 1, monitor files in 
subdirectories.
Can you give some advice?
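Not part of the PR itself; a minimal sketch of option 2, assuming the proposed `streaming.monitor.directory.depth` key and plain Hadoop FileSystem calls:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}

object DepthLimitedListing {
  // List files under `root`, descending at most `depth` directory levels.
  // depth = 1 matches today's behaviour: only files directly under `root`.
  def listFiles(fs: FileSystem, root: Path, depth: Int): Seq[FileStatus] = {
    require(depth >= 1, "depth must be at least 1")
    val (dirs, files) = fs.listStatus(root).toSeq.partition(_.isDirectory)
    if (depth == 1) files
    else files ++ dirs.flatMap(d => listFiles(fs, d.getPath, depth - 1))
  }

  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    val fs = FileSystem.get(conf)
    // Hypothetical configuration key from option 2 above.
    val depth = conf.getInt("streaming.monitor.directory.depth", 1)
    listFiles(fs, new Path("/test"), depth).foreach(status => println(status.getPath))
  }
}

With depth 1 this reduces to the current single-directory behaviour, which keeps the change backward compatible.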





[GitHub] spark pull request: [SPARK-3586][streaming]Support nested director...

2015-03-02 Thread wangxiaojing
Github user wangxiaojing commented on the pull request:

https://github.com/apache/spark/pull/2765#issuecomment-76891500
  
@srowen Can we test this again please ?





[GitHub] spark pull request: [SPARK-3586][streaming]Support nested director...

2015-03-02 Thread wangxiaojing
Github user wangxiaojing commented on the pull request:

https://github.com/apache/spark/pull/2765#issuecomment-76871502
  
@srowen This PR does not affect `org.apache.spark.JavaAPISuite.aggregateByKey`. Please test this again, thanks.





[GitHub] spark pull request: [Minor] Fix the value represented by spark.exe...

2015-01-28 Thread wangxiaojing
Github user wangxiaojing commented on the pull request:

https://github.com/apache/spark/pull/3812#issuecomment-71803946
  
@srowen @andrewor14 This change affects users who monitor the driver's metrics, because the property is used as a metrics value, e.g. ``, which gives a parser error: Unescaped '<' not allowed in attributes values.
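For illustration only (the property name is made up, and this assumes scala-xml on the classpath): a small sketch of why a raw '<' in an XML attribute value trips the parser.

import scala.util.control.NonFatal
import scala.xml.XML

object UnescapedAttributeDemo {
  def main(args: Array[String]): Unit = {
    // Hypothetical metrics snippet; "<1g" stands in for any value containing a raw '<'.
    val bad  = """<property name="some.spark.property" value="<1g"/>"""
    val good = """<property name="some.spark.property" value="&lt;1g"/>"""

    try XML.loadString(bad)
    catch {
      // The underlying SAX parser rejects the raw '<' inside an attribute value.
      case NonFatal(e) => println(s"parse error: ${e.getMessage}")
    }
    println(XML.loadString(good)) // parses once '<' is escaped as &lt;
  }
}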






[GitHub] spark pull request: [SPARK-3586][streaming]Support nested director...

2015-01-18 Thread wangxiaojing
Github user wangxiaojing commented on the pull request:

https://github.com/apache/spark/pull/2765#issuecomment-70437483
  
@tdas 





[GitHub] spark pull request: [SPARK-3586][streaming]Support nested director...

2015-01-13 Thread wangxiaojing
Github user wangxiaojing commented on the pull request:

https://github.com/apache/spark/pull/2765#issuecomment-69854635
  
@tdas 





[GitHub] spark pull request: [SPARK-3586][streaming]Support nested director...

2015-01-08 Thread wangxiaojing
Github user wangxiaojing commented on the pull request:

https://github.com/apache/spark/pull/2765#issuecomment-69288062
  
@tdas Rebased onto the latest master and updated.





[GitHub] spark pull request: [SPARK-3586][streaming]Support nested director...

2015-01-08 Thread wangxiaojing
Github user wangxiaojing commented on the pull request:

https://github.com/apache/spark/pull/2765#issuecomment-69169225
  
@tdas Thanks a lot. I fixed them.





[GitHub] spark pull request: [SPARK-3586][streaming]Support nested director...

2015-01-07 Thread wangxiaojing
GitHub user wangxiaojing reopened a pull request:

https://github.com/apache/spark/pull/2765

[SPARK-3586][streaming]Support nested directories in Spark Streaming

For text files, streaming input is read with the method streamingContext.textFileStream(dataDirectory). This improvement lets Spark Streaming also monitor subdirectories of dataDirectory and process any files created in them.
eg:
streamingContext.textFileStream("/test")
with the directory contents:
/test/file1
/test/file2
/test/dr/file1
If a new file "file2" appears in the directory "/test/dr/", Spark Streaming can process that file.
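For context, a minimal, self-contained usage sketch of the API this PR extends (paths are illustrative; picking up files under /test/dr only works with this patch applied):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object NestedDirStreamExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("NestedDirStream")
    val ssc = new StreamingContext(conf, Seconds(10))

    // Monitors /test; with this PR, files created under /test/dr/ are picked up too.
    val lines = ssc.textFileStream("/test")
    lines.count().print()

    ssc.start()
    ssc.awaitTermination()
  }
}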



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/wangxiaojing/spark spark-3586

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/2765.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2765


commit 843f905cdf4ecb45aaa2edb9b34dc213e59e65c0
Author: wangxiaojing 
Date:   2014-10-11T08:22:31Z

Support nested directories in Spark Streaming

commit 6d30f6373fe06176043364c6bf4f7da81a37cf01
Author: wangxiaojing 
Date:   2014-10-12T05:27:22Z

change Nit

commit f00f2822dfbf21501458dd6f59e24eb4e7aac9c9
Author: wangxiaojing 
Date:   2014-10-17T03:46:01Z

support depth

commit 703754517d7077b891346b83e821c081215621db
Author: wangxiaojing 
Date:   2014-10-17T06:22:12Z

Change space before brace

commit 3d9bb2a7ef7ebea3f12866381af4201f4ddc7d60
Author: wangxiaojing 
Date:   2014-10-17T07:24:38Z

change process any files created in nested directories

commit 27dd88425b91471fcf9202364ddd9abb970e8223
Author: wangxiaojing 
Date:   2014-10-24T07:12:17Z

reformat code

commit 70d1b1fba5bb09636f4f3655771a98287c73b9ee
Author: wangxiaojing 
Date:   2014-10-24T07:54:09Z

add a require(depth >= 0)

commit 03489f28f4d8cae05564b41c98a839cf88bfba2f
Author: wangxiaojing 
Date:   2014-10-24T08:54:03Z

reformat code

commit 113c6d4e61e8c7e3fe4dfa4ed65cfe228575508f
Author: wangxiaojing 
Date:   2014-10-28T02:52:01Z

change performance

commit 2cc32fa187bc371f033da5bb2b67bbc7694964ed
Author: wangxiaojing 
Date:   2014-10-28T08:48:37Z

change filter name

commit 0ea8eda9831fcb3796720b0bfe95e51c8f1c3ab0
Author: wangxiaojing 
Date:   2014-11-03T09:55:46Z

change line exceeds 100 columns

commit 997ae5151d13a56ce0f97078ba4dacf454d43edd
Author: wangxiaojing 
Date:   2014-11-03T10:09:15Z

no braces for case clauses

commit 2bb9e9a148747c68d780ccbeaa8206e3b55cecfa
Author: wangxiaojing 
Date:   2014-11-10T03:46:00Z

Performance optimization:directory records have judgment

commit 8bc22af117ca73f10b7f642ccc566493ba6c4a1a
Author: wangxiaojing 
Date:   2014-11-10T05:47:09Z

line over 100

commit 15c389371d645a7fa6f13f3909034fb57ea7360c
Author: wangxiaojing 
Date:   2014-12-04T09:27:12Z

remove line

commit 21f0d82153f2f7d8967256251fe1ef02c84ffa71
Author: wangxiaojing 
Date:   2014-12-04T10:21:19Z

style

commit e488919eb3ffe3b4d6509995720f4e33c48c0762
Author: wangxiaojing 
Date:   2014-12-17T04:49:36Z

change get depth

commit ce86bcc5be8a790245787f75dfd2cba51ab50f55
Author: wangxiaojing 
Date:   2014-12-24T06:07:43Z

Use 'isDir' to  modify the compatibility







[GitHub] spark pull request: [SPARK-4982][DOC]The `spark.ui.retainedJobs` m...

2014-12-28 Thread wangxiaojing
GitHub user wangxiaojing opened a pull request:

https://github.com/apache/spark/pull/3818

[SPARK-4982][DOC]The `spark.ui.retainedJobs` meaning is wrong in `Spark UI` 
configuration



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/wangxiaojing/spark SPARK-4982

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/3818.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3818


commit fe2ad5f18617486ff090ae4498117324b7d4be75
Author: wangxiaojing 
Date:   2014-12-29T03:44:14Z

change stages to jobs







[GitHub] spark pull request: [SPARK-3586][streaming]Support nested director...

2014-12-24 Thread wangxiaojing
Github user wangxiaojing closed the pull request at:

https://github.com/apache/spark/pull/2765





[GitHub] spark pull request: [SPARK-4527][SQl]Add BroadcastNestedLoopJoin o...

2014-12-12 Thread wangxiaojing
Github user wangxiaojing commented on the pull request:

https://github.com/apache/spark/pull/3395#issuecomment-66860102
  
@liancheng Sorry for the delay in addressing the styling issues, and thanks for your help.





[GitHub] spark pull request: [SPARK-3586][streaming]Support nested director...

2014-12-04 Thread wangxiaojing
Github user wangxiaojing commented on the pull request:

https://github.com/apache/spark/pull/2765#issuecomment-65637093
  
@liancheng 





[GitHub] spark pull request: [SPARK-3586][streaming]Support nested director...

2014-12-04 Thread wangxiaojing
Github user wangxiaojing commented on the pull request:

https://github.com/apache/spark/pull/2765#issuecomment-65637021
  
@liancheng 





[GitHub] spark pull request: [SPARK-3586][streaming]Support nested director...

2014-12-03 Thread wangxiaojing
Github user wangxiaojing commented on the pull request:

https://github.com/apache/spark/pull/2765#issuecomment-65376390
  
@liancheng The rebase is done.





[GitHub] spark pull request: [SPARK-3586][streaming]Support nested director...

2014-12-02 Thread wangxiaojing
Github user wangxiaojing commented on the pull request:

https://github.com/apache/spark/pull/2765#issuecomment-65362730
  
@liancheng 





[GitHub] spark pull request: [SPARK-4570][SQL]add BroadcastLeftSemiJoinHash

2014-12-01 Thread wangxiaojing
Github user wangxiaojing commented on a diff in the pull request:

https://github.com/apache/spark/pull/3442#discussion_r21136073
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala ---
@@ -377,4 +378,39 @@ class JoinSuite extends QueryTest with BeforeAndAfterEach {
         """.stripMargin),
       (null, 10) :: Nil)
   }
+  test("broadcasted left semi join operator selection") {
+    clearCache()
+    sql("CACHE TABLE testData")
+    val tmp = autoBroadcastJoinThreshold
+
+    sql(s"""SET ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD}=10""")
--- End diff --

Because the size of testData2 is larger than `SQLConf.AUTO_BROADCASTJOIN_THRESHOLD` (which is set to 10 in this test).





[GitHub] spark pull request: [SPARK-4570][SQL]add BroadcastLeftSemiJoinHash

2014-12-01 Thread wangxiaojing
Github user wangxiaojing commented on the pull request:

https://github.com/apache/spark/pull/3442#issuecomment-65030850
  
@liancheng 





[GitHub] spark pull request: [SPARK-4570][SQL]add BroadcastLeftSemiJoinHash

2014-11-30 Thread wangxiaojing
Github user wangxiaojing commented on the pull request:

https://github.com/apache/spark/pull/3442#issuecomment-65011633
  
@liancheng





[GitHub] spark pull request: [SPARK-4570][SQL]add BroadcastLeftSemiJoinHash

2014-11-24 Thread wangxiaojing
GitHub user wangxiaojing opened a pull request:

https://github.com/apache/spark/pull/3442

[SPARK-4570][SQL]add BroadcastLeftSemiJoinHash

JIRA issue: [SPARK-4570](https://issues.apache.org/jira/browse/SPARK-4570)
We plan to add a `BroadcastLeftSemiJoinHash` operator to implement the broadcast join for `left semi join`.
For a left semi join: if the size of the data from the right side is smaller than the user-settable threshold `AUTO_BROADCASTJOIN_THRESHOLD`, the planner marks it as the `broadcast` relation and marks the other relation as the stream side. The broadcast table is broadcast to all executors involved in the join as an `org.apache.spark.broadcast.Broadcast` object, and the join uses `joins.BroadcastLeftSemiJoinHash`; otherwise it uses `joins.LeftSemiJoinHash`.

The benchmark suggests the optimized version is about 4x faster for a `left semi join`:

Original:
left semi join : 9288 ms
Optimized:
left semi join : 1963 ms

The micro benchmark loads `data1/kv3.txt` into a normal Hive table.
Benchmark code:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

object LeftSemiJoinBenchmark {
  // Times a block and returns the elapsed wall-clock time in milliseconds.
  def benchmark(f: => Unit): Long = {
    val begin = System.currentTimeMillis()
    f
    val end = System.currentTimeMillis()
    end - begin
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf()
        .setMaster("local")
        .setAppName(getClass.getSimpleName.stripSuffix("$")))
    val hiveContext = new HiveContext(sc)
    import hiveContext._

    sql("drop table if exists left_table")
    sql("drop table if exists right_table")
    sql("create table left_table (key int, value string)")
    sql(s"""load data local inpath "/data1/kv3.txt" into table left_table""")
    sql("create table right_table (key int, value string)")
    sql(
      """
        |from left_table
        |insert overwrite table right_table
        |select left_table.key, left_table.value
      """.stripMargin)

    val leftSemiJoin = sql(
      """select a.key from left_table a
        |left semi join right_table b on a.key = b.key""".stripMargin)
    val leftSemiJoinDuration = benchmark(leftSemiJoin.count())
    println(s"left semi join : $leftSemiJoinDuration ms")
  }
}


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/wangxiaojing/spark SPARK-4570

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/3442.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3442


commit 5d58772aa0bd7fd55a9b9495efbff5cc0b36aeae
Author: wangxiaojing 
Date:   2014-11-25T04:04:05Z

add BroadcastLeftSemiJoinHash







[GitHub] spark pull request: [SPARK-4527][SQl]Add BroadcastNestedLoopJoin o...

2014-11-20 Thread wangxiaojing
GitHub user wangxiaojing opened a pull request:

https://github.com/apache/spark/pull/3395

[SPARK-4527][SQl]Add BroadcastNestedLoopJoin operator selection testsuite

Add a BroadcastNestedLoopJoin operator selection test suite in `JoinSuite`.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/wangxiaojing/spark SPARK-4527

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/3395.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3395


commit 53c39524703cec6e89886dd3b4d202fbb2141039
Author: wangxiaojing 
Date:   2014-11-21T03:03:38Z

Add BroadcastNestedLoopJoin operator selection testsuite







[GitHub] spark pull request: [SPARK-4485][SQL]Add BroadcastHashOuterJoin

2014-11-20 Thread wangxiaojing
Github user wangxiaojing commented on the pull request:

https://github.com/apache/spark/pull/3362#issuecomment-63914461
  
@liancheng Added a test suite.





[GitHub] spark pull request: [SPARK-4485][SQL]Add BroadcastHashOuterJoin

2014-11-19 Thread wangxiaojing
GitHub user wangxiaojing opened a pull request:

https://github.com/apache/spark/pull/3362

[SPARK-4485][SQL]Add BroadcastHashOuterJoin

We plan to add a `BroadcastHashOuterJoin` operator to implement the broadcast join for `left outer join` and `right outer join`.

Taking `left outer join` as an example:
  if the size of the data from the right side is smaller than the user-settable threshold `AUTO_BROADCASTJOIN_THRESHOLD`,
  then we broadcast the right-side data and implement the join operation following `HashOuterJoin` and `BroadcastHashJoin`;
  else
  we still use `HashOuterJoin`.

The test suite is still being written.
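Not part of this patch; a rough sketch of how the selection could be exercised, assuming two hypothetical Hive tables left_table and right_table (with the patch applied, a small right side would show the new BroadcastHashOuterJoin operator in the plan, otherwise HashOuterJoin):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

object OuterJoinSelectionSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local").setAppName("OuterJoinSelectionSketch"))
    val hiveContext = new HiveContext(sc)
    import hiveContext._

    val query =
      """select a.key, b.value from left_table a
        |left outer join right_table b on a.key = b.key""".stripMargin

    // Right side small enough to broadcast: the proposed BroadcastHashOuterJoin would be chosen.
    sql("SET spark.sql.autoBroadcastJoinThreshold=10485760")
    sql("explain " + query).collect().foreach(println)

    // Broadcasting disabled: the planner falls back to HashOuterJoin.
    sql("SET spark.sql.autoBroadcastJoinThreshold=-1")
    sql("explain " + query).collect().foreach(println)
  }
}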

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/wangxiaojing/spark SPARK-4485

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/3362.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3362


commit 5591fd284fdc532e52821a6bce477a620abd69bd
Author: wangxiaojing 
Date:   2014-11-19T10:15:09Z

Add BroadcastHashOuterJoin







[GitHub] spark pull request: [SPARK-3907][sql] add truncate table support

2014-11-04 Thread wangxiaojing
Github user wangxiaojing commented on the pull request:

https://github.com/apache/spark/pull/2770#issuecomment-61747497
  
@OopsOutOfMemory Thanks a lot!
This bug also exists in Hive, e.g.:
{code}TRUNCATE TABLE default.test;
FAILED: ParseException line 1:22 missing EOF at '.' near 'default'{code}






[GitHub] spark pull request: [SPARK-3586][streaming]Support nested director...

2014-11-02 Thread wangxiaojing
Github user wangxiaojing commented on the pull request:

https://github.com/apache/spark/pull/2765#issuecomment-61441563
  
Rebased with master.





[GitHub] spark pull request: [SPARK-1442] [SQL] window function implement

2014-10-29 Thread wangxiaojing
Github user wangxiaojing commented on a diff in the pull request:

https://github.com/apache/spark/pull/2953#discussion_r19527873
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveWindowFunctionSuite.scala
 ---
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import org.apache.spark.sql.hive._
+import org.apache.spark.sql.hive.test.TestHive
+import org.apache.spark.sql.hive.test.TestHive._
+import org.apache.spark.sql.{Row, SchemaRDD}
+
+class HiveWindowFunctionSuite extends HiveComparisonTest {
+
+  override def beforeAll() {
+sql("DROP TABLE IF EXISTS part").collect()
+
+sql("""
+|CREATE TABLE part(
+|p_partkey INT,
+|p_name STRING,
+|p_mfgr STRING,
+|p_brand STRING,
+|p_type STRING,
+|p_size INT,
+|p_container STRING,
+|p_retailprice DOUBLE,
+|p_comment STRING
+|)
+  """.stripMargin).collect()
+
+//remove duplicate data in part_tiny.txt for hive bug
--- End diff --

Space after //





[GitHub] spark pull request: [SPARK-1442] [SQL] window function implement

2014-10-29 Thread wangxiaojing
Github user wangxiaojing commented on a diff in the pull request:

https://github.com/apache/spark/pull/2953#discussion_r19527805
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/WindowFunction.scala ---
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import java.util.HashMap
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.physical.AllTuples
+import org.apache.spark.sql.catalyst.plans.physical.ClusteredDistribution
+import org.apache.spark.sql.catalyst.errors._
+import scala.collection.mutable.ArrayBuffer
+import org.apache.spark.util.collection.CompactBuffer
+import org.apache.spark.sql.catalyst.plans.physical.ClusteredDistribution
+import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import 
org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection
+import org.apache.spark.sql.catalyst.expressions.Alias
+import org.apache.spark.sql.catalyst.types._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.plans.logical.SortPartitions
+
+
+/**
+ * :: DeveloperApi ::
+ * Groups input data by `partitionExpressions` and computes the 
`computeExpressions` for each
+ * group.
+ * @param partitionExpressions expressions that are evaluated to determine 
partition.
+ * @param functionExpressions expressions that are computed for each 
partition.
+ * @param child the input data source.
+ */
+@DeveloperApi
+case class WindowFunction(
+  partitionExpressions: Seq[Expression],
+  functionExpressions: Seq[NamedExpression],
+  child: SparkPlan)
+  extends UnaryNode {
+
+  override def requiredChildDistribution =
+if (partitionExpressions == Nil) {
+  AllTuples :: Nil
+} else {
+  ClusteredDistribution(partitionExpressions) :: Nil
+}
+
+  // HACK: Generators don't correctly preserve their output through 
serializations so we grab
+  // out child's output attributes statically here.
+  private[this] val childOutput = child.output
+
+  override def output = functionExpressions.map(_.toAttribute)
+
+  /** A list of functions that need to be computed for each partition. */
+  private[this] val computeExpressions = new 
ArrayBuffer[AggregateExpression]
+
+  private[this] val otherExpressions = new ArrayBuffer[NamedExpression]
+
+  functionExpressions.foreach { sel =>
+sel.collect {
+  case func: AggregateExpression => computeExpressions += func
+  case other: NamedExpression if (!other.isInstanceOf[Alias]) => 
otherExpressions += other
+}
+  }
+
+  private[this] val functionAttributes = computeExpressions.map { func =>
+func -> AttributeReference(s"funcResult:$func", func.dataType, 
func.nullable)()}
+
+  /** The schema of the result of all evaluations */
+  private[this] val resultAttributes =
+otherExpressions.map(_.toAttribute) ++ functionAttributes.map(_._2)
+
+  private[this] val resultMap =
+(otherExpressions.map { other => other -> other.toAttribute } ++ 
functionAttributes
+).toMap
+
+
+  private[this] val resultExpressions = functionExpressions.map { sel =>
+sel.transform {
+  case e: Expression if resultMap.contains(e) => resultMap(e)
+}
+  }
+
+  private[this] val sortExpressions =
+if (child.isInstanceOf[SortPartitions]) {
+  child.asInstanceOf[SortPartitions].sortExpressions
+}
+else if (child.isInstanceOf[Sort]) {
+  child.asInstanceOf[Sort].sortOrder
+}
+else null
+
+  /** Creates a new function buffer for a partition. */
+  private[this] def newFunctionBuffer(): Array[Aggre

[GitHub] spark pull request: [SPARK-1442] [SQL] window function implement

2014-10-29 Thread wangxiaojing
Github user wangxiaojing commented on a diff in the pull request:

https://github.com/apache/spark/pull/2953#discussion_r19527811
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/WindowFunction.scala ---
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import java.util.HashMap
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.physical.AllTuples
+import org.apache.spark.sql.catalyst.plans.physical.ClusteredDistribution
+import org.apache.spark.sql.catalyst.errors._
+import scala.collection.mutable.ArrayBuffer
+import org.apache.spark.util.collection.CompactBuffer
+import org.apache.spark.sql.catalyst.plans.physical.ClusteredDistribution
+import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import 
org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection
+import org.apache.spark.sql.catalyst.expressions.Alias
+import org.apache.spark.sql.catalyst.types._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.plans.logical.SortPartitions
+
+
+/**
+ * :: DeveloperApi ::
+ * Groups input data by `partitionExpressions` and computes the 
`computeExpressions` for each
+ * group.
+ * @param partitionExpressions expressions that are evaluated to determine 
partition.
+ * @param functionExpressions expressions that are computed for each 
partition.
+ * @param child the input data source.
+ */
+@DeveloperApi
+case class WindowFunction(
+  partitionExpressions: Seq[Expression],
+  functionExpressions: Seq[NamedExpression],
+  child: SparkPlan)
+  extends UnaryNode {
+
+  override def requiredChildDistribution =
+if (partitionExpressions == Nil) {
+  AllTuples :: Nil
+} else {
+  ClusteredDistribution(partitionExpressions) :: Nil
+}
+
+  // HACK: Generators don't correctly preserve their output through 
serializations so we grab
+  // out child's output attributes statically here.
+  private[this] val childOutput = child.output
+
+  override def output = functionExpressions.map(_.toAttribute)
+
+  /** A list of functions that need to be computed for each partition. */
+  private[this] val computeExpressions = new 
ArrayBuffer[AggregateExpression]
+
+  private[this] val otherExpressions = new ArrayBuffer[NamedExpression]
+
+  functionExpressions.foreach { sel =>
+sel.collect {
+  case func: AggregateExpression => computeExpressions += func
+  case other: NamedExpression if (!other.isInstanceOf[Alias]) => 
otherExpressions += other
+}
+  }
+
+  private[this] val functionAttributes = computeExpressions.map { func =>
+func -> AttributeReference(s"funcResult:$func", func.dataType, 
func.nullable)()}
+
+  /** The schema of the result of all evaluations */
+  private[this] val resultAttributes =
+otherExpressions.map(_.toAttribute) ++ functionAttributes.map(_._2)
+
+  private[this] val resultMap =
+(otherExpressions.map { other => other -> other.toAttribute } ++ 
functionAttributes
+).toMap
+
+
+  private[this] val resultExpressions = functionExpressions.map { sel =>
+sel.transform {
+  case e: Expression if resultMap.contains(e) => resultMap(e)
+}
+  }
+
+  private[this] val sortExpressions =
+if (child.isInstanceOf[SortPartitions]) {
+  child.asInstanceOf[SortPartitions].sortExpressions
+}
+else if (child.isInstanceOf[Sort]) {
+  child.asInstanceOf[Sort].sortOrder
+}
+else null
+
+  /** Creates a new function buffer for a partition. */
+  private[this] def newFunctionBuffer(): Array[Aggre

[GitHub] spark pull request: [SPARK-1442] [SQL] window function implement

2014-10-29 Thread wangxiaojing
Github user wangxiaojing commented on a diff in the pull request:

https://github.com/apache/spark/pull/2953#discussion_r19527196
  
--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala ---
@@ -845,6 +858,198 @@ private[hive] object HiveQl {
   throw new NotImplementedError(s"No parse rules for:\n 
${dumpTree(a).toString} ")
   }
 
+  // store the window def of current sql
+  //use thread id as key to avoid mistake when muti sqls parse at the same 
time
+  protected val windowDefMap = new ConcurrentHashMap[Long,Map[String, 
Seq[ASTNode]]]()
+
+  // store the window spec of current sql
+  //use thread id as key to avoid mistake when muti sqls parse at the same 
time
+  protected val windowPartitionsMap = new ConcurrentHashMap[Long, 
ArrayBuffer[Node]]()
+
+  protected def initWindow() = {
+windowDefMap.put(Thread.currentThread().getId, Map[String, 
Seq[ASTNode]]())
+windowPartitionsMap.put(Thread.currentThread().getId, new 
ArrayBuffer[Node]())
+  }
+  protected def checkWindowDef(windowClause: Option[Node]) = {
+
+var winDefs = windowDefMap.get(Thread.currentThread().getId)
+
+windowClause match {
+  case Some(window) => window.getChildren.foreach {
+case Token("TOK_WINDOWDEF", Token(alias, Nil) :: 
Token("TOK_WINDOWSPEC", ws) :: Nil) => {
+  winDefs += alias -> ws
+}
+  }
+  case None => //do nothing
+}
+
+windowDefMap.put(Thread.currentThread().getId, winDefs)
+  }
+
+  protected def translateWindowSpec(windowSpec: Seq[ASTNode]): 
Seq[ASTNode]= {
+
+windowSpec match {
+  case Token(alias, Nil) :: Nil => 
translateWindowSpec(getWindowSpec(alias))
+  case Token(alias, Nil) :: range => {
+val (partitionClause :: rowsRange :: valueRange :: Nil) = 
getClauses(
+  Seq(
+"TOK_PARTITIONINGSPEC",
+"TOK_WINDOWRANGE",
+"TOK_WINDOWVALUES"),
+  translateWindowSpec(getWindowSpec(alias)))
+partitionClause match {
+  case Some(partition) => partition.asInstanceOf[ASTNode] :: range
+  case None => range
+}
+  }
+  case e => e
+}
+  }
+
+  protected def getWindowSpec(alias: String): Seq[ASTNode]= {
+windowDefMap.get(Thread.currentThread().getId).getOrElse(
+  alias, sys.error("no window def for " + alias))
+  }
+
+  protected def addWindowPartitions(partition: Node) = {
+
+var winPartitions = 
windowPartitionsMap.get(Thread.currentThread().getId)
+winPartitions += partition
+windowPartitionsMap.put(Thread.currentThread().getId, winPartitions)
+  }
+
+  protected def getWindowPartitions(): Seq[Node]= {
+windowPartitionsMap.get(Thread.currentThread().getId).toSeq
+  }
+
+  protected def checkWindowPartitions(): Option[Seq[ASTNode]] = {
+
+val partitionUnits = new ArrayBuffer[Seq[ASTNode]]()
+
+getWindowPartitions.map {
+  case Token("TOK_PARTITIONINGSPEC", partition)  => Some(partition)
+  case _ => None
+}.foreach {
+  case Some(partition) => {
+if (partitionUnits.isEmpty) partitionUnits += partition
+else {
+  //only add different window partitions
+  try {
+partition zip partitionUnits.head foreach {
+  case (l,r) => l checkEquals r
+}
+  } catch {
+case re: RuntimeException => partitionUnits += partition
+  }
+}
+  }
+  case None => //do nothing
+}
+
+//check whether all window partitions are same, we just support same 
window partition now
--- End diff --

Space after //





[GitHub] spark pull request: [SPARK-1442] [SQL] window function implement

2014-10-29 Thread wangxiaojing
Github user wangxiaojing commented on a diff in the pull request:

https://github.com/apache/spark/pull/2953#discussion_r19527181
  
--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala ---
@@ -845,6 +858,198 @@ private[hive] object HiveQl {
   throw new NotImplementedError(s"No parse rules for:\n 
${dumpTree(a).toString} ")
   }
 
+  // store the window def of current sql
+  //use thread id as key to avoid mistake when muti sqls parse at the same 
time
+  protected val windowDefMap = new ConcurrentHashMap[Long,Map[String, 
Seq[ASTNode]]]()
+
+  // store the window spec of current sql
+  //use thread id as key to avoid mistake when muti sqls parse at the same 
time
+  protected val windowPartitionsMap = new ConcurrentHashMap[Long, 
ArrayBuffer[Node]]()
+
+  protected def initWindow() = {
+windowDefMap.put(Thread.currentThread().getId, Map[String, 
Seq[ASTNode]]())
+windowPartitionsMap.put(Thread.currentThread().getId, new 
ArrayBuffer[Node]())
+  }
+  protected def checkWindowDef(windowClause: Option[Node]) = {
+
+var winDefs = windowDefMap.get(Thread.currentThread().getId)
+
+windowClause match {
+  case Some(window) => window.getChildren.foreach {
+case Token("TOK_WINDOWDEF", Token(alias, Nil) :: 
Token("TOK_WINDOWSPEC", ws) :: Nil) => {
+  winDefs += alias -> ws
+}
+  }
+  case None => //do nothing
+}
+
+windowDefMap.put(Thread.currentThread().getId, winDefs)
+  }
+
+  protected def translateWindowSpec(windowSpec: Seq[ASTNode]): 
Seq[ASTNode]= {
+
+windowSpec match {
+  case Token(alias, Nil) :: Nil => 
translateWindowSpec(getWindowSpec(alias))
+  case Token(alias, Nil) :: range => {
+val (partitionClause :: rowsRange :: valueRange :: Nil) = 
getClauses(
+  Seq(
+"TOK_PARTITIONINGSPEC",
+"TOK_WINDOWRANGE",
+"TOK_WINDOWVALUES"),
+  translateWindowSpec(getWindowSpec(alias)))
+partitionClause match {
+  case Some(partition) => partition.asInstanceOf[ASTNode] :: range
+  case None => range
+}
+  }
+  case e => e
+}
+  }
+
+  protected def getWindowSpec(alias: String): Seq[ASTNode]= {
+windowDefMap.get(Thread.currentThread().getId).getOrElse(
+  alias, sys.error("no window def for " + alias))
+  }
+
+  protected def addWindowPartitions(partition: Node) = {
+
+var winPartitions = 
windowPartitionsMap.get(Thread.currentThread().getId)
+winPartitions += partition
+windowPartitionsMap.put(Thread.currentThread().getId, winPartitions)
+  }
+
+  protected def getWindowPartitions(): Seq[Node]= {
+windowPartitionsMap.get(Thread.currentThread().getId).toSeq
+  }
+
+  protected def checkWindowPartitions(): Option[Seq[ASTNode]] = {
+
+val partitionUnits = new ArrayBuffer[Seq[ASTNode]]()
+
+getWindowPartitions.map {
+  case Token("TOK_PARTITIONINGSPEC", partition)  => Some(partition)
+  case _ => None
+}.foreach {
+  case Some(partition) => {
+if (partitionUnits.isEmpty) partitionUnits += partition
+else {
+  //only add different window partitions
--- End diff --

Space after //





[GitHub] spark pull request: [SPARK-1442] [SQL] window function implement

2014-10-29 Thread wangxiaojing
Github user wangxiaojing commented on a diff in the pull request:

https://github.com/apache/spark/pull/2953#discussion_r19527163
  
--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala ---
@@ -845,6 +858,198 @@ private[hive] object HiveQl {
   throw new NotImplementedError(s"No parse rules for:\n 
${dumpTree(a).toString} ")
   }
 
+  // store the window def of current sql
+  //use thread id as key to avoid mistake when muti sqls parse at the same 
time
+  protected val windowDefMap = new ConcurrentHashMap[Long,Map[String, 
Seq[ASTNode]]]()
+
+  // store the window spec of current sql
+  //use thread id as key to avoid mistake when muti sqls parse at the same 
time
+  protected val windowPartitionsMap = new ConcurrentHashMap[Long, 
ArrayBuffer[Node]]()
+
+  protected def initWindow() = {
+windowDefMap.put(Thread.currentThread().getId, Map[String, 
Seq[ASTNode]]())
+windowPartitionsMap.put(Thread.currentThread().getId, new 
ArrayBuffer[Node]())
+  }
+  protected def checkWindowDef(windowClause: Option[Node]) = {
+
+var winDefs = windowDefMap.get(Thread.currentThread().getId)
+
+windowClause match {
+  case Some(window) => window.getChildren.foreach {
+case Token("TOK_WINDOWDEF", Token(alias, Nil) :: 
Token("TOK_WINDOWSPEC", ws) :: Nil) => {
+  winDefs += alias -> ws
+}
+  }
+  case None => //do nothing
--- End diff --

Space after //





[GitHub] spark pull request: [SPARK-1442] [SQL] window function implement

2014-10-29 Thread wangxiaojing
Github user wangxiaojing commented on a diff in the pull request:

https://github.com/apache/spark/pull/2953#discussion_r19527190
  
--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala ---
@@ -845,6 +858,198 @@ private[hive] object HiveQl {
   throw new NotImplementedError(s"No parse rules for:\n 
${dumpTree(a).toString} ")
   }
 
+  // store the window def of current sql
+  //use thread id as key to avoid mistake when muti sqls parse at the same 
time
+  protected val windowDefMap = new ConcurrentHashMap[Long,Map[String, 
Seq[ASTNode]]]()
+
+  // store the window spec of current sql
+  //use thread id as key to avoid mistake when muti sqls parse at the same 
time
+  protected val windowPartitionsMap = new ConcurrentHashMap[Long, 
ArrayBuffer[Node]]()
+
+  protected def initWindow() = {
+windowDefMap.put(Thread.currentThread().getId, Map[String, 
Seq[ASTNode]]())
+windowPartitionsMap.put(Thread.currentThread().getId, new 
ArrayBuffer[Node]())
+  }
+  protected def checkWindowDef(windowClause: Option[Node]) = {
+
+var winDefs = windowDefMap.get(Thread.currentThread().getId)
+
+windowClause match {
+  case Some(window) => window.getChildren.foreach {
+case Token("TOK_WINDOWDEF", Token(alias, Nil) :: 
Token("TOK_WINDOWSPEC", ws) :: Nil) => {
+  winDefs += alias -> ws
+}
+  }
+  case None => //do nothing
+}
+
+windowDefMap.put(Thread.currentThread().getId, winDefs)
+  }
+
+  protected def translateWindowSpec(windowSpec: Seq[ASTNode]): 
Seq[ASTNode]= {
+
+windowSpec match {
+  case Token(alias, Nil) :: Nil => 
translateWindowSpec(getWindowSpec(alias))
+  case Token(alias, Nil) :: range => {
+val (partitionClause :: rowsRange :: valueRange :: Nil) = 
getClauses(
+  Seq(
+"TOK_PARTITIONINGSPEC",
+"TOK_WINDOWRANGE",
+"TOK_WINDOWVALUES"),
+  translateWindowSpec(getWindowSpec(alias)))
+partitionClause match {
+  case Some(partition) => partition.asInstanceOf[ASTNode] :: range
+  case None => range
+}
+  }
+  case e => e
+}
+  }
+
+  protected def getWindowSpec(alias: String): Seq[ASTNode]= {
+windowDefMap.get(Thread.currentThread().getId).getOrElse(
+  alias, sys.error("no window def for " + alias))
+  }
+
+  protected def addWindowPartitions(partition: Node) = {
+
+var winPartitions = 
windowPartitionsMap.get(Thread.currentThread().getId)
+winPartitions += partition
+windowPartitionsMap.put(Thread.currentThread().getId, winPartitions)
+  }
+
+  protected def getWindowPartitions(): Seq[Node]= {
+windowPartitionsMap.get(Thread.currentThread().getId).toSeq
+  }
+
+  protected def checkWindowPartitions(): Option[Seq[ASTNode]] = {
+
+val partitionUnits = new ArrayBuffer[Seq[ASTNode]]()
+
+getWindowPartitions.map {
+  case Token("TOK_PARTITIONINGSPEC", partition)  => Some(partition)
+  case _ => None
+}.foreach {
+  case Some(partition) => {
+if (partitionUnits.isEmpty) partitionUnits += partition
+else {
+  //only add different window partitions
+  try {
+partition zip partitionUnits.head foreach {
+  case (l,r) => l checkEquals r
+}
+  } catch {
+case re: RuntimeException => partitionUnits += partition
+  }
+}
+  }
+  case None => //do nothing
--- End diff --

Space after //





[GitHub] spark pull request: [SPARK-1442] [SQL] window function implement

2014-10-29 Thread wangxiaojing
Github user wangxiaojing commented on a diff in the pull request:

https://github.com/apache/spark/pull/2953#discussion_r19527153
  
--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala ---
@@ -845,6 +858,198 @@ private[hive] object HiveQl {
   throw new NotImplementedError(s"No parse rules for:\n 
${dumpTree(a).toString} ")
   }
 
+  // store the window def of current sql
+  //use thread id as key to avoid mistake when muti sqls parse at the same 
time
--- End diff --

Space after //





[GitHub] spark pull request: [SPARK-1442] [SQL] window function implement

2014-10-29 Thread wangxiaojing
Github user wangxiaojing commented on a diff in the pull request:

https://github.com/apache/spark/pull/2953#discussion_r19527141
  
--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala ---
@@ -845,6 +858,198 @@ private[hive] object HiveQl {
   throw new NotImplementedError(s"No parse rules for:\n 
${dumpTree(a).toString} ")
   }
 
+  // store the window def of current sql
+  //use thread id as key to avoid mistake when muti sqls parse at the same 
time
+  protected val windowDefMap = new ConcurrentHashMap[Long,Map[String, 
Seq[ASTNode]]]()
+
+  // store the window spec of current sql
+  //use thread id as key to avoid mistake when muti sqls parse at the same 
time
--- End diff --

Space after //





[GitHub] spark pull request: [WIP][SPARK-4131][SQL] Writing data into the f...

2014-10-29 Thread wangxiaojing
GitHub user wangxiaojing opened a pull request:

https://github.com/apache/spark/pull/2997

[WIP][SPARK-4131][SQL] Writing data into the filesystem from queries



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/wangxiaojing/spark SPARK-4131

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/2997.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2997


commit f406f49749d590f230c953194a8c36e760fc9460
Author: wangxiaojing 
Date:   2014-10-29T08:58:19Z

 add Token







[GitHub] spark pull request: [spark-3586][streaming]Support nested director...

2014-10-24 Thread wangxiaojing
Github user wangxiaojing commented on a diff in the pull request:

https://github.com/apache/spark/pull/2765#discussion_r19327514
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
 ---
@@ -230,16 +272,37 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
         if (minNewFileModTime < 0 || modTime < minNewFileModTime) {
           minNewFileModTime = modTime
         }
+        if (path.getName().startsWith("_")) {
--- End diff --

If you use saveAsTextFile("tmp"), prefixed temporary files (e.g. "_"-prefixed) are created first, and the final output files only appear once the job has finished.
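Not the PR's code; for illustration, a minimal Hadoop PathFilter that skips such prefixed paths (the extra "." check is a common Hadoop convention, not necessarily what the PR does):

import org.apache.hadoop.fs.{Path, PathFilter}

// Accept only paths that are not "hidden" by Hadoop convention,
// e.g. _temporary and _SUCCESS markers written by saveAsTextFile.
class VisiblePathFilter extends PathFilter {
  override def accept(path: Path): Boolean = {
    val name = path.getName
    !name.startsWith("_") && !name.startsWith(".")
  }
}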





[GitHub] spark pull request: [spark-3586][streaming]Support nested director...

2014-10-23 Thread wangxiaojing
Github user wangxiaojing commented on the pull request:

https://github.com/apache/spark/pull/2765#issuecomment-60341312
  
@liancheng 





[GitHub] spark pull request: [spark-3586][streaming]Support nested director...

2014-10-17 Thread wangxiaojing
Github user wangxiaojing commented on the pull request:

https://github.com/apache/spark/pull/2765#issuecomment-59485316
  
@jerryshao @tdas First, check all the directories according to the configured
depth, then filter out a directory whose modification time is beyond the ignore
time. Is this method optimal? Thanks.
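
A minimal sketch of a depth-bounded listing of this kind (hypothetical helper,
not the PR's actual code); here the modification-time cutoff is applied to the
files that are returned.

```scala
import org.apache.hadoop.fs.{FileSystem, Path}

// Collect files up to `depth` levels below `root`, keeping only files
// modified at or after `ignoreTime`.
def listNewFiles(fs: FileSystem, root: Path, depth: Int, ignoreTime: Long): Seq[Path] = {
  val (dirs, files) = fs.listStatus(root).toSeq.partition(_.isDirectory)
  val newFiles = files.filter(_.getModificationTime >= ignoreTime).map(_.getPath)
  val nested =
    if (depth > 1) dirs.flatMap(d => listNewFiles(fs, d.getPath, depth - 1, ignoreTime))
    else Seq.empty[Path]
  newFiles ++ nested
}
```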





[GitHub] spark pull request: [spark-3907][sql] add truncate table support

2014-10-17 Thread wangxiaojing
Github user wangxiaojing commented on a diff in the pull request:

https://github.com/apache/spark/pull/2770#discussion_r19007078
  
--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala ---
@@ -121,7 +121,8 @@ private[hive] object HiveQl {
   // Commands that we do not need to explain.
   protected val noExplainCommands = Seq(
 "TOK_CREATETABLE",
-"TOK_DESCTABLE"
+"TOK_DESCTABLE",
+"TOK_TRUNCATETABLE"
--- End diff --

If we add TRUNCATE TABLE to nativeCommands, a statement such as
`truncate table test columns(i)` will run a Hive MapReduce job. At present, the
COLUMNS form only supports RCFile in Hive.






[GitHub] spark pull request: [spark-3586][streaming]Support nested director...

2014-10-17 Thread wangxiaojing
Github user wangxiaojing commented on a diff in the pull request:

https://github.com/apache/spark/pull/2765#discussion_r19004975
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
 ---
@@ -207,6 +220,9 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: 
NewInputFormat[K,V] : Clas
 
 def accept(path: Path): Boolean = {
   try {
+if (fs.getFileStatus(path).isDirectory()){
+  return false
+}
 if (!filter(path)) {  // Reject file if it does not satisfy filter
   logDebug("Rejected by filter " + path)
   return false
--- End diff --

Because the logic is not strict: if we do not return false here, execution
continues and eventually returns true.





[GitHub] spark pull request: [spark-3907][sql] add truncate table support

2014-10-16 Thread wangxiaojing
Github user wangxiaojing commented on the pull request:

https://github.com/apache/spark/pull/2770#issuecomment-59466262
  
@rxin 





[GitHub] spark pull request: [spark-3586][streaming]Support nested director...

2014-10-16 Thread wangxiaojing
Github user wangxiaojing commented on the pull request:

https://github.com/apache/spark/pull/2765#issuecomment-59462998
  
Hi @jerryshao, I am changing the code to use this parameter to control the
searching depth, but if the depth is greater than 1 the ignore time is not
reasonable, because when a new file is created in a second-level subdirectory,
the modification time of the first-level subdirectory does not change. For
example:
The streaming job monitors the directory /tmp/
The directory structure is:
 2014-10-16 19:17 /tmp/spark1
 2014-10-16 19:17 /tmp/spark1/spark2

A file is created in /tmp/spark1/spark2:

 2014-10-16 19:17 /tmp/spark1
 2014-10-16 19:18 /tmp/spark1/spark2
 2014-10-16 19:18 /tmp/spark1/spark2/file

If we use the ignore time to do the filtering, the first-level subdirectory is
always ignored. Can you give me some advice?
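
A small sketch illustrating the point (assumes HDFS-like mtime semantics and the
hypothetical paths from the example above): creating a file two levels down only
updates its immediate parent.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val fs = FileSystem.get(new Configuration())
val top = new Path("/tmp/spark1")

val before = fs.getFileStatus(top).getModificationTime
fs.create(new Path("/tmp/spark1/spark2/file")).close()  // new file two levels down
val after = fs.getFileStatus(top).getModificationTime

// Only /tmp/spark1/spark2's mtime changes; /tmp/spark1 keeps its old value,
// so a modification-time filter applied to directories would skip it forever.
println(s"top-level mtime unchanged: ${before == after}")
```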






[GitHub] spark pull request: [spark-3940][sql]sql Print the error code thre...

2014-10-13 Thread wangxiaojing
GitHub user wangxiaojing opened a pull request:

https://github.com/apache/spark/pull/2790

[spark-3940][sql]sql Print the error code three times

If the SQL is wrong, the console should print the error only once, e.g.:
spark-sql> show tabless;
show tabless;
14/10/13 21:03:48 INFO ParseDriver: Parsing command: show tabless
NoViableAltException(26@[598:1: ddlStatement : ( createDatabaseStatement | 
switchDatabaseStatement | dropDatabaseStatement | createTableStatement | 
dropTableStatement | truncateTableStatement | alterStatement | descStatement | 
showStatement | metastoreCheck | createViewStatement | dropViewStatement | 
createFunctionStatement | createMacroStatement | createIndexStatement | 
dropIndexStatement | dropFunctionStatement | dropMacroStatement | 
analyzeStatement | lockStatement | unlockStatement | createRoleStatement | 
dropRoleStatement | grantPrivileges | revokePrivileges | showGrants | 
showRoleGrants | grantRole | revokeRole );])
at org.antlr.runtime.DFA.noViableAlt(DFA.java:158)
at org.antlr.runtime.DFA.predict(DFA.java:144)
at 
org.apache.hadoop.hive.ql.parse.HiveParser.ddlStatement(HiveParser.java:1962)
at 
org.apache.hadoop.hive.ql.parse.HiveParser.execStatement(HiveParser.java:1298)
at 
org.apache.hadoop.hive.ql.parse.HiveParser.statement(HiveParser.java:938)
at 
org.apache.hadoop.hive.ql.parse.ParseDriver.parse(ParseDriver.java:190)
at 
org.apache.hadoop.hive.ql.parse.ParseDriver.parse(ParseDriver.java:161)
at org.apache.spark.sql.hive.HiveQl$.getAst(HiveQl.scala:218)
at org.apache.spark.sql.hive.HiveQl$.createPlan(HiveQl.scala:226)
at 
org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:50)
at 
org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:49)
at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136)
at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135)
at 
scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
at 
scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
at 
scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
at 
scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
at 
scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
at 
scala.util.parsing.combinator.Parsers$Failure.append(Parsers.scala:202)
at 
scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
at 
scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
at 
scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
at 
scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
at 
scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at 
scala.util.parsing.combinator.Parsers$$anon$2.apply(Parsers.scala:890)
at 
scala.util.parsing.combinator.PackratParsers$$anon$1.apply(PackratParsers.scala:110)
at 
org.apache.spark.sql.catalyst.AbstractSparkSQLParser.apply(SparkSQLParser.scala:31)
at org.apache.spark.sql.hive.HiveQl$$anonfun$3.apply(HiveQl.scala:130)
at org.apache.spark.sql.hive.HiveQl$$anonfun$3.apply(HiveQl.scala:130)
at 
org.apache.spark.sql.catalyst.SparkSQLParser$$anonfun$org$apache$spark$sql$catalyst$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:184)
at 
org.apache.spark.sql.catalyst.SparkSQLParser$$anonfun$org$apache$spark$sql$catalyst$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:183)
at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136)
at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135)
at 
scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
at 
scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
at 
scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
at 
scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
at 
scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
at 
scala.util.parsing.combinator.Parsers$Failure.append(Parsers.scala:202)
at 
scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
at 
scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
at 
scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:

[GitHub] spark pull request: [spark-3907][sql] add truncate table support

2014-10-11 Thread wangxiaojing
GitHub user wangxiaojing opened a pull request:

https://github.com/apache/spark/pull/2770

[spark-3907][sql] add truncate table support

JIRA issue: [SPARK-3907] https://issues.apache.org/jira/browse/SPARK-3907
Add truncate table support:
TRUNCATE TABLE table_name [PARTITION partition_spec];

partition_spec:
  : (partition_col = partition_col_value, partition_col = partition_col_value, ...)

Removes all rows from a table or partition(s). Currently the target table should
be a native/managed table or an exception will be thrown. The user can specify a
partial partition_spec to truncate multiple partitions at once, and omitting
partition_spec will truncate all partitions in the table.
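
A hypothetical usage sketch of the syntax above (table and partition names are
made up; assumes Spark SQL's Hive support):

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

val sc = new SparkContext(new SparkConf().setAppName("TruncateTableExample"))
val hiveContext = new HiveContext(sc)

hiveContext.sql("TRUNCATE TABLE logs")                                // whole table
hiveContext.sql("TRUNCATE TABLE logs PARTITION (dt = '2014-10-11')")  // one partition
```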

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/wangxiaojing/spark spark-3907

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/2770.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2770


commit 77b1f2022d1a5b287fe43e9823f6d8b7934a969e
Author: wangxiaojing 
Date:   2014-10-11T16:08:26Z

 add truncate table support







[GitHub] spark pull request: [spark-3586][streaming]Support nested director...

2014-10-11 Thread wangxiaojing
Github user wangxiaojing commented on a diff in the pull request:

https://github.com/apache/spark/pull/2765#discussion_r18740849
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
 ---
@@ -207,6 +220,9 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: 
NewInputFormat[K,V] : Clas
 
 def accept(path: Path): Boolean = {
   try {
+if (fs.getFileStatus(path).isDirectory()){
+  return false
+}
 if (!filter(path)) {  // Reject file if it does not satisfy filter
   logDebug("Rejected by filter " + path)
   return false
--- End diff --

Why? If the path is a directory, it should not be considered.





[GitHub] spark pull request: [spark-3586][streaming]Support nested director...

2014-10-11 Thread wangxiaojing
Github user wangxiaojing commented on a diff in the pull request:

https://github.com/apache/spark/pull/2765#discussion_r18740834
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
 ---
@@ -118,6 +119,18 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: 
NewInputFormat[K,V] : Clas
 (newFiles, filter.minNewFileModTime)
   }
 
+  def getPathList( path:Path, fs:FileSystem):List[Path]={
+val filter = new SubPathFilter()
+var pathList = List[Path]()
+fs.listStatus(path,filter).map(x=>{
+  if(x.isDirectory()){
--- End diff --

Yes, because this only supports direct subdirectories; nesting through all the
directories would make the processing time too long.





[GitHub] spark pull request: [spark-3586][streaming]Support nested director...

2014-10-11 Thread wangxiaojing
GitHub user wangxiaojing opened a pull request:

https://github.com/apache/spark/pull/2765

[spark-3586][streaming]Support nested directories in Spark Streaming

For text files there is the method streamingContext.textFileStream(dataDirectory).
This improvement makes Spark Streaming support subdirectories: it can monitor the
subdirectories of dataDirectory and process any files created in them.
e.g.:
streamingContext.textFileStream("/test")
Look at the directory contents:
/test/file1
/test/file2
/test/dr/file1
If the directory "/test/dr/" gets a new file "file2", Spark Streaming can process
that file.
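
A minimal usage sketch, assuming this change is in place (batch interval and
paths are arbitrary):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("NestedDirectoryStream")
val ssc = new StreamingContext(conf, Seconds(10))

// With nested-directory support, new files such as /test/dr/file2 are also picked up.
val lines = ssc.textFileStream("/test")
lines.print()

ssc.start()
ssc.awaitTermination()
```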



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/wangxiaojing/spark spark-3586

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/2765.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2765


commit 98ead547f90520819b421b0f4436bfe7d8a3d4f4
Author: wangxiaojing 
Date:   2014-10-11T08:22:31Z

Support nested directories in Spark Streaming



