[GitHub] spark pull request: [SPARK-3586][streaming]Support nested director...
Github user wangxiaojing commented on the pull request: https://github.com/apache/spark/pull/2765#issuecomment-114396317 @andrewor14 ok. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3586][streaming]Support nested director...
Github user wangxiaojing closed the pull request at: https://github.com/apache/spark/pull/2765
[GitHub] spark pull request: [SPARK-3586][streaming]Support nested director...
Github user wangxiaojing commented on the pull request: https://github.com/apache/spark/pull/2765#issuecomment-77104507 @srowen There are two possible solutions, neither of which changes the API. 1. Add a switch controlling whether files in subdirectories are monitored. It defaults to false, which monitors only the directory `dataDirectory` itself; if set to true, files in subdirectories are monitored as well. 2. Add a configuration `streaming.monitor.directory.depth` to control the search depth of directories. It defaults to 1, which monitors only the directory `dataDirectory`; if set to a value greater than 1, files in subdirectories are monitored up to that depth. Can you give some advice?
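Option 2 above can be sketched as a depth-limited directory traversal in plain Scala. This is only an illustration of the proposed semantics (the PR itself works against the Hadoop `FileSystem` API, and the name `listWithDepth` is hypothetical, not part of the PR):

```scala
import java.io.File

// Depth-limited file listing: depth = 1 returns only files directly in `dir`,
// depth = 2 also includes files in immediate subdirectories, and so on.
def listWithDepth(dir: File, depth: Int): Seq[File] = {
  require(depth >= 1)
  val entries = Option(dir.listFiles()).map(_.toSeq).getOrElse(Seq.empty)
  val files = entries.filter(_.isFile)
  val nested =
    if (depth > 1) entries.filter(_.isDirectory).flatMap(listWithDepth(_, depth - 1))
    else Seq.empty
  files ++ nested
}
```

With `depth = 1` this matches the current behavior (monitor only `dataDirectory`); larger values pick up files created in nested directories.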
[GitHub] spark pull request: [SPARK-3586][streaming]Support nested director...
Github user wangxiaojing commented on the pull request: https://github.com/apache/spark/pull/2765#issuecomment-76891500 @srowen Can we test this again, please?
[GitHub] spark pull request: [SPARK-3586][streaming]Support nested director...
Github user wangxiaojing commented on the pull request: https://github.com/apache/spark/pull/2765#issuecomment-76871502 @srowen This PR does not affect `org.apache.spark.JavaAPISuite.aggregateByKey`. Please test this again, thanks.
[GitHub] spark pull request: [Minor] Fix the value represented by spark.exe...
Github user wangxiaojing commented on the pull request: https://github.com/apache/spark/pull/3812#issuecomment-71803946 @srowen @andrewor14 This change can affect users monitoring the driver's metrics. The property is used in the metrics value. eg: ``, parser error : Unescaped '<' not allowed in attributes values.
[GitHub] spark pull request: [SPARK-3586][streaming]Support nested director...
Github user wangxiaojing commented on the pull request: https://github.com/apache/spark/pull/2765#issuecomment-70437483 @tdas
[GitHub] spark pull request: [SPARK-3586][streaming]Support nested director...
Github user wangxiaojing commented on the pull request: https://github.com/apache/spark/pull/2765#issuecomment-69854635 @tdas
[GitHub] spark pull request: [SPARK-3586][streaming]Support nested director...
Github user wangxiaojing commented on the pull request: https://github.com/apache/spark/pull/2765#issuecomment-69288062 @tdas Rebased on the latest master and updated.
[GitHub] spark pull request: [SPARK-3586][streaming]Support nested director...
Github user wangxiaojing commented on the pull request: https://github.com/apache/spark/pull/2765#issuecomment-69169225 @tdas Thanks a lot. I fixed them.
[GitHub] spark pull request: [SPARK-3586][streaming]Support nested director...
GitHub user wangxiaojing reopened a pull request: https://github.com/apache/spark/pull/2765 [SPARK-3586][streaming]Support nested directories in Spark Streaming For text files there is the method `streamingContext.textFileStream(dataDirectory)`. This improvement adds subdirectory support to streaming: Spark Streaming can monitor the subdirectories of `dataDirectory` and process any files created in them. eg: `streamingContext.textFileStream("/test")`. Given the directory contents /test/file1, /test/file2, and /test/dr/file1: if the directory "/test/dr/" gets a new file "file2", Spark Streaming can process that file. You can merge this pull request into a Git repository by running: $ git pull https://github.com/wangxiaojing/spark spark-3586 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/2765.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2765 commit 843f905cdf4ecb45aaa2edb9b34dc213e59e65c0 Author: wangxiaojing Date: 2014-10-11T08:22:31Z Support nested directories in Spark Streaming commit 6d30f6373fe06176043364c6bf4f7da81a37cf01 Author: wangxiaojing Date: 2014-10-12T05:27:22Z change Nit commit f00f2822dfbf21501458dd6f59e24eb4e7aac9c9 Author: wangxiaojing Date: 2014-10-17T03:46:01Z support depth commit 703754517d7077b891346b83e821c081215621db Author: wangxiaojing Date: 2014-10-17T06:22:12Z Change space before brace commit 3d9bb2a7ef7ebea3f12866381af4201f4ddc7d60 Author: wangxiaojing Date: 2014-10-17T07:24:38Z change process any files created in nested directories commit 27dd88425b91471fcf9202364ddd9abb970e8223 Author: wangxiaojing Date: 2014-10-24T07:12:17Z reformat code commit 70d1b1fba5bb09636f4f3655771a98287c73b9ee Author: wangxiaojing Date: 2014-10-24T07:54:09Z add a require(depth >= 0) commit 03489f28f4d8cae05564b41c98a839cf88bfba2f Author: wangxiaojing Date: 2014-10-24T08:54:03Z reformat code commit 
113c6d4e61e8c7e3fe4dfa4ed65cfe228575508f Author: wangxiaojing Date: 2014-10-28T02:52:01Z change performance commit 2cc32fa187bc371f033da5bb2b67bbc7694964ed Author: wangxiaojing Date: 2014-10-28T08:48:37Z change filter name commit 0ea8eda9831fcb3796720b0bfe95e51c8f1c3ab0 Author: wangxiaojing Date: 2014-11-03T09:55:46Z change line exceeds 100 columns commit 997ae5151d13a56ce0f97078ba4dacf454d43edd Author: wangxiaojing Date: 2014-11-03T10:09:15Z no braces for case clauses commit 2bb9e9a148747c68d780ccbeaa8206e3b55cecfa Author: wangxiaojing Date: 2014-11-10T03:46:00Z Performance optimization: directory records have judgment commit 8bc22af117ca73f10b7f642ccc566493ba6c4a1a Author: wangxiaojing Date: 2014-11-10T05:47:09Z line over 100 commit 15c389371d645a7fa6f13f3909034fb57ea7360c Author: wangxiaojing Date: 2014-12-04T09:27:12Z remove line commit 21f0d82153f2f7d8967256251fe1ef02c84ffa71 Author: wangxiaojing Date: 2014-12-04T10:21:19Z style commit e488919eb3ffe3b4d6509995720f4e33c48c0762 Author: wangxiaojing Date: 2014-12-17T04:49:36Z change get depth commit ce86bcc5be8a790245787f75dfd2cba51ab50f55 Author: wangxiaojing Date: 2014-12-24T06:07:43Z Use 'isDir' to modify the compatibility
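The behavior described in the PR would be exercised through the ordinary `textFileStream` API. A minimal sketch, assuming the PR's nested-directory semantics (everything here except that assumption is standard Spark Streaming boilerplate; paths are illustrative):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object NestedDirStreamSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("NestedDirStreamSketch")
    val ssc = new StreamingContext(conf, Seconds(10))
    // With this PR, a file created under /test/dr/ would also be picked up,
    // not only files created directly under /test.
    val lines = ssc.textFileStream("/test")
    lines.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
```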
[GitHub] spark pull request: [SPARK-4982][DOC]The `spark.ui.retainedJobs` m...
GitHub user wangxiaojing opened a pull request: https://github.com/apache/spark/pull/3818 [SPARK-4982][DOC]The `spark.ui.retainedJobs` meaning is wrong in `Spark UI` configuration You can merge this pull request into a Git repository by running: $ git pull https://github.com/wangxiaojing/spark SPARK-4982 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/3818.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3818 commit fe2ad5f18617486ff090ae4498117324b7d4be75 Author: wangxiaojing Date: 2014-12-29T03:44:14Z change stages to jobs
[GitHub] spark pull request: [SPARK-3586][streaming]Support nested director...
Github user wangxiaojing closed the pull request at: https://github.com/apache/spark/pull/2765
[GitHub] spark pull request: [SPARK-4527][SQl]Add BroadcastNestedLoopJoin o...
Github user wangxiaojing commented on the pull request: https://github.com/apache/spark/pull/3395#issuecomment-66860102 @liancheng Sorry for the delay in fixing my styling issues, and thanks for your help.
[GitHub] spark pull request: [SPARK-3586][streaming]Support nested director...
Github user wangxiaojing commented on the pull request: https://github.com/apache/spark/pull/2765#issuecomment-65637093 @liancheng
[GitHub] spark pull request: [SPARK-3586][streaming]Support nested director...
Github user wangxiaojing commented on the pull request: https://github.com/apache/spark/pull/2765#issuecomment-65637021 @liancheng
[GitHub] spark pull request: [SPARK-3586][streaming]Support nested director...
Github user wangxiaojing commented on the pull request: https://github.com/apache/spark/pull/2765#issuecomment-65376390 @liancheng Rebase done.
[GitHub] spark pull request: [SPARK-3586][streaming]Support nested director...
Github user wangxiaojing commented on the pull request: https://github.com/apache/spark/pull/2765#issuecomment-65362730 @liancheng
[GitHub] spark pull request: [SPARK-4570][SQL]add BroadcastLeftSemiJoinHash
Github user wangxiaojing commented on a diff in the pull request: https://github.com/apache/spark/pull/3442#discussion_r21136073 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala --- @@ -377,4 +378,39 @@ class JoinSuite extends QueryTest with BeforeAndAfterEach { """.stripMargin), (null, 10) :: Nil) } + test("broadcasted left semi join operator selection") { +clearCache() +sql("CACHE TABLE testData") +val tmp = autoBroadcastJoinThreshold + +sql( s"""SET ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD}=10""") --- End diff -- Because the testData2 size is more than `SQLConf.AUTO_BROADCASTJOIN_THRESHOLD` 1.
[GitHub] spark pull request: [SPARK-4570][SQL]add BroadcastLeftSemiJoinHash
Github user wangxiaojing commented on the pull request: https://github.com/apache/spark/pull/3442#issuecomment-65030850 @liancheng
[GitHub] spark pull request: [SPARK-4570][SQL]add BroadcastLeftSemiJoinHash
Github user wangxiaojing commented on the pull request: https://github.com/apache/spark/pull/3442#issuecomment-65011633 @liancheng
[GitHub] spark pull request: [SPARK-4570][SQL]add BroadcastLeftSemiJoinHash
GitHub user wangxiaojing opened a pull request: https://github.com/apache/spark/pull/3442 [SPARK-4570][SQL]add BroadcastLeftSemiJoinHash JIRA issue: [SPARK-4570](https://issues.apache.org/jira/browse/SPARK-4570) We are planning to create a `BroadcastLeftSemiJoinHash` to implement the broadcast join for `left semi join`. In a left semi join, if the size of the data from the right side is smaller than the user-settable threshold `AUTO_BROADCASTJOIN_THRESHOLD`, the planner marks it as the `broadcast` relation and marks the other relation as the stream side. The broadcast table is broadcast to all of the executors involved in the join as an `org.apache.spark.broadcast.Broadcast` object, and `joins.BroadcastLeftSemiJoinHash` is used; otherwise `joins.LeftSemiJoinHash` is used. The benchmark suggests these changes made the optimized version about 4x faster for `left semi join`: Original: left semi join : 9288 ms Optimized: left semi join : 1963 ms The micro benchmark loads `data1/kv3.txt` into a normal Hive table.
Benchmark code:

    def benchmark(f: => Unit) = {
      val begin = System.currentTimeMillis()
      f
      val end = System.currentTimeMillis()
      end - begin
    }

    val sc = new SparkContext(
      new SparkConf()
        .setMaster("local")
        .setAppName(getClass.getSimpleName.stripSuffix("$")))
    val hiveContext = new HiveContext(sc)
    import hiveContext._

    sql("drop table if exists left_table")
    sql("drop table if exists right_table")
    sql(
      """create table left_table (key int, value string)
      """.stripMargin)
    sql(
      s"""load data local inpath "/data1/kv3.txt" into table left_table""")
    sql(
      """create table right_table (key int, value string)
      """.stripMargin)
    sql(
      """
        |from left_table
        |insert overwrite table right_table
        |select left_table.key, left_table.value
      """.stripMargin)
    val leftSimeJoin = sql(
      """select a.key from left_table a
        |left semi join right_table b on a.key = b.key""".stripMargin)
    val leftSemiJoinDuration = benchmark(leftSimeJoin.count())
    println(s"left semi join : $leftSemiJoinDuration ms ")

You can merge this pull request into a Git repository by running: $ git pull https://github.com/wangxiaojing/spark SPARK-4570 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/3442.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3442 commit 5d58772aa0bd7fd55a9b9495efbff5cc0b36aeae Author: wangxiaojing Date: 2014-11-25T04:04:05Z add BroadcastLeftSemiJoinHash
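The selection rule described in the PR body can be condensed into a tiny plain-Scala predicate. The function name and string results below are illustrative stand-ins for the planner's actual strategy objects, not real Spark API:

```scala
// If the estimated size of the right side is at or below the broadcast
// threshold, pick the broadcast variant; otherwise fall back to the
// shuffled hash variant.
def chooseLeftSemiJoinPlan(rightSizeBytes: Long, autoBroadcastJoinThreshold: Long): String =
  if (rightSizeBytes <= autoBroadcastJoinThreshold) "joins.BroadcastLeftSemiJoinHash"
  else "joins.LeftSemiJoinHash"
```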
[GitHub] spark pull request: [SPARK-4527][SQl]Add BroadcastNestedLoopJoin o...
GitHub user wangxiaojing opened a pull request: https://github.com/apache/spark/pull/3395 [SPARK-4527][SQl]Add BroadcastNestedLoopJoin operator selection testsuite In `JoinSuite`, add a BroadcastNestedLoopJoin operator selection test suite. You can merge this pull request into a Git repository by running: $ git pull https://github.com/wangxiaojing/spark SPARK-4527 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/3395.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3395 commit 53c39524703cec6e89886dd3b4d202fbb2141039 Author: wangxiaojing Date: 2014-11-21T03:03:38Z Add BroadcastNestedLoopJoin operator selection testsuite
[GitHub] spark pull request: [SPARK-4485][SQL]Add BroadcastHashOuterJoin
Github user wangxiaojing commented on the pull request: https://github.com/apache/spark/pull/3362#issuecomment-63914461 @liancheng Added a test suite.
[GitHub] spark pull request: [SPARK-4485][SQL]Add BroadcastHashOuterJoin
GitHub user wangxiaojing opened a pull request: https://github.com/apache/spark/pull/3362 [SPARK-4485][SQL]Add BroadcastHashOuterJoin We are planning to create a `BroadcastHashOuterJoin` to implement the broadcast join for `left outer join` and `right outer join`. Taking `left outer join` as an example: if the size of the data from the right side is smaller than the user-settable threshold `AUTO_BROADCASTJOIN_THRESHOLD`, we treat the right-side data as a broadcast relation and implement the join operation with reference to `HashOuterJoin` and `BroadcastHashJoin`; otherwise we still use `HashOuterJoin`. The test suite is being written. You can merge this pull request into a Git repository by running: $ git pull https://github.com/wangxiaojing/spark SPARK-4485 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/3362.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3362 commit 5591fd284fdc532e52821a6bce477a620abd69bd Author: wangxiaojing Date: 2014-11-19T10:15:09Z Add BroadcastHashOuterJoin
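The core idea — hash the small (broadcast) side and stream the large side, emitting nulls for unmatched rows — can be sketched with plain Scala collections. This is not the actual Spark operator, just an illustration of a left outer hash join against a broadcast-style map:

```scala
// Build a hash map from the (small) right side as if it were broadcast,
// then stream the left side; unmatched left rows get None on the right.
def leftOuterHashJoin[K, L, R](left: Seq[(K, L)], right: Seq[(K, R)]): Seq[(K, L, Option[R])] = {
  val broadcastSide: Map[K, Seq[R]] =
    right.groupBy(_._1).map { case (k, rows) => k -> rows.map(_._2) }
  left.flatMap { case (k, l) =>
    broadcastSide.get(k) match {
      case Some(rs) => rs.map(r => (k, l, Some(r)))
      case None     => Seq((k, l, None))
    }
  }
}
```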
[GitHub] spark pull request: [SPARK-3907][sql] add truncate table support
Github user wangxiaojing commented on the pull request: https://github.com/apache/spark/pull/2770#issuecomment-61747497 @OopsOutOfMemory Thanks a lot! This bug also exists in Hive, eg: `TRUNCATE TABLE default.test;` fails with `FAILED: ParseException line 1:22 missing EOF at '.' near 'default'`.
[GitHub] spark pull request: [SPARK-3586][streaming]Support nested director...
Github user wangxiaojing commented on the pull request: https://github.com/apache/spark/pull/2765#issuecomment-61441563 Rebased with master.
[GitHub] spark pull request: [SPARK-1442] [SQL] window function implement
Github user wangxiaojing commented on a diff in the pull request: https://github.com/apache/spark/pull/2953#discussion_r19527873 --- Diff: sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveWindowFunctionSuite.scala --- @@ -0,0 +1,314 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive.execution + +import org.apache.spark.sql.hive._ +import org.apache.spark.sql.hive.test.TestHive +import org.apache.spark.sql.hive.test.TestHive._ +import org.apache.spark.sql.{Row, SchemaRDD} + +class HiveWindowFunctionSuite extends HiveComparisonTest { + + override def beforeAll() { +sql("DROP TABLE IF EXISTS part").collect() + +sql(""" +|CREATE TABLE part( +|p_partkey INT, +|p_name STRING, +|p_mfgr STRING, +|p_brand STRING, +|p_type STRING, +|p_size INT, +|p_container STRING, +|p_retailprice DOUBLE, +|p_comment STRING +|) + """.stripMargin).collect() + +//remove duplicate data in part_tiny.txt for hive bug --- End diff -- Space after //
[GitHub] spark pull request: [SPARK-1442] [SQL] window function implement
Github user wangxiaojing commented on a diff in the pull request: https://github.com/apache/spark/pull/2953#discussion_r19527805 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/WindowFunction.scala --- @@ -0,0 +1,353 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution + +import java.util.HashMap + +import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans.physical.AllTuples +import org.apache.spark.sql.catalyst.plans.physical.ClusteredDistribution +import org.apache.spark.sql.catalyst.errors._ +import scala.collection.mutable.ArrayBuffer +import org.apache.spark.util.collection.CompactBuffer +import org.apache.spark.sql.catalyst.plans.physical.ClusteredDistribution +import org.apache.spark.sql.catalyst.expressions.AttributeReference +import org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection +import org.apache.spark.sql.catalyst.expressions.Alias +import org.apache.spark.sql.catalyst.types._ +import org.apache.spark.sql.catalyst.dsl.plans._ +import org.apache.spark.sql.catalyst.dsl.expressions._ +import org.apache.spark.sql.catalyst.plans.logical.SortPartitions + + +/** + * :: DeveloperApi :: + * Groups input data by `partitionExpressions` and computes the `computeExpressions` for each + * group. + * @param partitionExpressions expressions that are evaluated to determine partition. + * @param functionExpressions expressions that are computed for each partition. + * @param child the input data source. + */ +@DeveloperApi +case class WindowFunction( + partitionExpressions: Seq[Expression], + functionExpressions: Seq[NamedExpression], + child: SparkPlan) + extends UnaryNode { + + override def requiredChildDistribution = +if (partitionExpressions == Nil) { + AllTuples :: Nil +} else { + ClusteredDistribution(partitionExpressions) :: Nil +} + + // HACK: Generators don't correctly preserve their output through serializations so we grab + // out child's output attributes statically here. + private[this] val childOutput = child.output + + override def output = functionExpressions.map(_.toAttribute) + + /** A list of functions that need to be computed for each partition. 
*/ + private[this] val computeExpressions = new ArrayBuffer[AggregateExpression] + + private[this] val otherExpressions = new ArrayBuffer[NamedExpression] + + functionExpressions.foreach { sel => +sel.collect { + case func: AggregateExpression => computeExpressions += func + case other: NamedExpression if (!other.isInstanceOf[Alias]) => otherExpressions += other +} + } + + private[this] val functionAttributes = computeExpressions.map { func => +func -> AttributeReference(s"funcResult:$func", func.dataType, func.nullable)()} + + /** The schema of the result of all evaluations */ + private[this] val resultAttributes = +otherExpressions.map(_.toAttribute) ++ functionAttributes.map(_._2) + + private[this] val resultMap = +(otherExpressions.map { other => other -> other.toAttribute } ++ functionAttributes +).toMap + + + private[this] val resultExpressions = functionExpressions.map { sel => +sel.transform { + case e: Expression if resultMap.contains(e) => resultMap(e) +} + } + + private[this] val sortExpressions = +if (child.isInstanceOf[SortPartitions]) { + child.asInstanceOf[SortPartitions].sortExpressions +} +else if (child.isInstanceOf[Sort]) { + child.asInstanceOf[Sort].sortOrder +} +else null + + /** Creates a new function buffer for a partition. */ + private[this] def newFunctionBuffer(): Array[Aggre
[GitHub] spark pull request: [SPARK-1442] [SQL] window function implement
Github user wangxiaojing commented on a diff in the pull request: https://github.com/apache/spark/pull/2953#discussion_r19527811 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/WindowFunction.scala --- @@ -0,0 +1,353 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution + +import java.util.HashMap + +import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans.physical.AllTuples +import org.apache.spark.sql.catalyst.plans.physical.ClusteredDistribution +import org.apache.spark.sql.catalyst.errors._ +import scala.collection.mutable.ArrayBuffer +import org.apache.spark.util.collection.CompactBuffer +import org.apache.spark.sql.catalyst.plans.physical.ClusteredDistribution +import org.apache.spark.sql.catalyst.expressions.AttributeReference +import org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection +import org.apache.spark.sql.catalyst.expressions.Alias +import org.apache.spark.sql.catalyst.types._ +import org.apache.spark.sql.catalyst.dsl.plans._ +import org.apache.spark.sql.catalyst.dsl.expressions._ +import org.apache.spark.sql.catalyst.plans.logical.SortPartitions + + +/** + * :: DeveloperApi :: + * Groups input data by `partitionExpressions` and computes the `computeExpressions` for each + * group. + * @param partitionExpressions expressions that are evaluated to determine partition. + * @param functionExpressions expressions that are computed for each partition. + * @param child the input data source. + */ +@DeveloperApi +case class WindowFunction( + partitionExpressions: Seq[Expression], + functionExpressions: Seq[NamedExpression], + child: SparkPlan) + extends UnaryNode { + + override def requiredChildDistribution = +if (partitionExpressions == Nil) { + AllTuples :: Nil +} else { + ClusteredDistribution(partitionExpressions) :: Nil +} + + // HACK: Generators don't correctly preserve their output through serializations so we grab + // out child's output attributes statically here. + private[this] val childOutput = child.output + + override def output = functionExpressions.map(_.toAttribute) + + /** A list of functions that need to be computed for each partition. 
*/ + private[this] val computeExpressions = new ArrayBuffer[AggregateExpression] + + private[this] val otherExpressions = new ArrayBuffer[NamedExpression] + + functionExpressions.foreach { sel => +sel.collect { + case func: AggregateExpression => computeExpressions += func + case other: NamedExpression if (!other.isInstanceOf[Alias]) => otherExpressions += other +} + } + + private[this] val functionAttributes = computeExpressions.map { func => +func -> AttributeReference(s"funcResult:$func", func.dataType, func.nullable)()} + + /** The schema of the result of all evaluations */ + private[this] val resultAttributes = +otherExpressions.map(_.toAttribute) ++ functionAttributes.map(_._2) + + private[this] val resultMap = +(otherExpressions.map { other => other -> other.toAttribute } ++ functionAttributes +).toMap + + + private[this] val resultExpressions = functionExpressions.map { sel => +sel.transform { + case e: Expression if resultMap.contains(e) => resultMap(e) +} + } + + private[this] val sortExpressions = +if (child.isInstanceOf[SortPartitions]) { + child.asInstanceOf[SortPartitions].sortExpressions +} +else if (child.isInstanceOf[Sort]) { + child.asInstanceOf[Sort].sortOrder +} +else null + + /** Creates a new function buffer for a partition. */ + private[this] def newFunctionBuffer(): Array[Aggre
[GitHub] spark pull request: [SPARK-1442] [SQL] window function implement
Github user wangxiaojing commented on a diff in the pull request: https://github.com/apache/spark/pull/2953#discussion_r19527196 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala --- @@ -845,6 +858,198 @@ private[hive] object HiveQl { throw new NotImplementedError(s"No parse rules for:\n ${dumpTree(a).toString} ") } + // store the window def of current sql + //use thread id as key to avoid mistake when muti sqls parse at the same time + protected val windowDefMap = new ConcurrentHashMap[Long,Map[String, Seq[ASTNode]]]() + + // store the window spec of current sql + //use thread id as key to avoid mistake when muti sqls parse at the same time + protected val windowPartitionsMap = new ConcurrentHashMap[Long, ArrayBuffer[Node]]() + + protected def initWindow() = { +windowDefMap.put(Thread.currentThread().getId, Map[String, Seq[ASTNode]]()) +windowPartitionsMap.put(Thread.currentThread().getId, new ArrayBuffer[Node]()) + } + protected def checkWindowDef(windowClause: Option[Node]) = { + +var winDefs = windowDefMap.get(Thread.currentThread().getId) + +windowClause match { + case Some(window) => window.getChildren.foreach { +case Token("TOK_WINDOWDEF", Token(alias, Nil) :: Token("TOK_WINDOWSPEC", ws) :: Nil) => { + winDefs += alias -> ws +} + } + case None => //do nothing +} + +windowDefMap.put(Thread.currentThread().getId, winDefs) + } + + protected def translateWindowSpec(windowSpec: Seq[ASTNode]): Seq[ASTNode]= { + +windowSpec match { + case Token(alias, Nil) :: Nil => translateWindowSpec(getWindowSpec(alias)) + case Token(alias, Nil) :: range => { +val (partitionClause :: rowsRange :: valueRange :: Nil) = getClauses( + Seq( +"TOK_PARTITIONINGSPEC", +"TOK_WINDOWRANGE", +"TOK_WINDOWVALUES"), + translateWindowSpec(getWindowSpec(alias))) +partitionClause match { + case Some(partition) => partition.asInstanceOf[ASTNode] :: range + case None => range +} + } + case e => e +} + } + + protected def getWindowSpec(alias: String): Seq[ASTNode]= { 
+windowDefMap.get(Thread.currentThread().getId).getOrElse( + alias, sys.error("no window def for " + alias)) + } + + protected def addWindowPartitions(partition: Node) = { + +var winPartitions = windowPartitionsMap.get(Thread.currentThread().getId) +winPartitions += partition +windowPartitionsMap.put(Thread.currentThread().getId, winPartitions) + } + + protected def getWindowPartitions(): Seq[Node]= { +windowPartitionsMap.get(Thread.currentThread().getId).toSeq + } + + protected def checkWindowPartitions(): Option[Seq[ASTNode]] = { + +val partitionUnits = new ArrayBuffer[Seq[ASTNode]]() + +getWindowPartitions.map { + case Token("TOK_PARTITIONINGSPEC", partition) => Some(partition) + case _ => None +}.foreach { + case Some(partition) => { +if (partitionUnits.isEmpty) partitionUnits += partition +else { + //only add different window partitions + try { +partition zip partitionUnits.head foreach { + case (l,r) => l checkEquals r +} + } catch { +case re: RuntimeException => partitionUnits += partition + } +} + } + case None => //do nothing +} + +//check whether all window partitions are same, we just support same window partition now --- End diff -- Space after // --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-1442] [SQL] window function implement
Github user wangxiaojing commented on a diff in the pull request: https://github.com/apache/spark/pull/2953#discussion_r19527181 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala --- @@ -845,6 +858,198 @@ private[hive] object HiveQl { throw new NotImplementedError(s"No parse rules for:\n ${dumpTree(a).toString} ") } + // store the window def of current sql + //use thread id as key to avoid mistake when muti sqls parse at the same time + protected val windowDefMap = new ConcurrentHashMap[Long,Map[String, Seq[ASTNode]]]() + + // store the window spec of current sql + //use thread id as key to avoid mistake when muti sqls parse at the same time + protected val windowPartitionsMap = new ConcurrentHashMap[Long, ArrayBuffer[Node]]() + + protected def initWindow() = { +windowDefMap.put(Thread.currentThread().getId, Map[String, Seq[ASTNode]]()) +windowPartitionsMap.put(Thread.currentThread().getId, new ArrayBuffer[Node]()) + } + protected def checkWindowDef(windowClause: Option[Node]) = { + +var winDefs = windowDefMap.get(Thread.currentThread().getId) + +windowClause match { + case Some(window) => window.getChildren.foreach { +case Token("TOK_WINDOWDEF", Token(alias, Nil) :: Token("TOK_WINDOWSPEC", ws) :: Nil) => { + winDefs += alias -> ws +} + } + case None => //do nothing +} + +windowDefMap.put(Thread.currentThread().getId, winDefs) + } + + protected def translateWindowSpec(windowSpec: Seq[ASTNode]): Seq[ASTNode]= { + +windowSpec match { + case Token(alias, Nil) :: Nil => translateWindowSpec(getWindowSpec(alias)) + case Token(alias, Nil) :: range => { +val (partitionClause :: rowsRange :: valueRange :: Nil) = getClauses( + Seq( +"TOK_PARTITIONINGSPEC", +"TOK_WINDOWRANGE", +"TOK_WINDOWVALUES"), + translateWindowSpec(getWindowSpec(alias))) +partitionClause match { + case Some(partition) => partition.asInstanceOf[ASTNode] :: range + case None => range +} + } + case e => e +} + } + + protected def getWindowSpec(alias: String): Seq[ASTNode]= { 
+windowDefMap.get(Thread.currentThread().getId).getOrElse( + alias, sys.error("no window def for " + alias)) + } + + protected def addWindowPartitions(partition: Node) = { + +var winPartitions = windowPartitionsMap.get(Thread.currentThread().getId) +winPartitions += partition +windowPartitionsMap.put(Thread.currentThread().getId, winPartitions) + } + + protected def getWindowPartitions(): Seq[Node]= { +windowPartitionsMap.get(Thread.currentThread().getId).toSeq + } + + protected def checkWindowPartitions(): Option[Seq[ASTNode]] = { + +val partitionUnits = new ArrayBuffer[Seq[ASTNode]]() + +getWindowPartitions.map { + case Token("TOK_PARTITIONINGSPEC", partition) => Some(partition) + case _ => None +}.foreach { + case Some(partition) => { +if (partitionUnits.isEmpty) partitionUnits += partition +else { + //only add different window partitions --- End diff -- Space after //
[GitHub] spark pull request: [SPARK-1442] [SQL] window function implement
Github user wangxiaojing commented on a diff in the pull request: https://github.com/apache/spark/pull/2953#discussion_r19527163 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala --- @@ -845,6 +858,198 @@ private[hive] object HiveQl { throw new NotImplementedError(s"No parse rules for:\n ${dumpTree(a).toString} ") } + // store the window def of current sql + //use thread id as key to avoid mistake when muti sqls parse at the same time + protected val windowDefMap = new ConcurrentHashMap[Long,Map[String, Seq[ASTNode]]]() + + // store the window spec of current sql + //use thread id as key to avoid mistake when muti sqls parse at the same time + protected val windowPartitionsMap = new ConcurrentHashMap[Long, ArrayBuffer[Node]]() + + protected def initWindow() = { +windowDefMap.put(Thread.currentThread().getId, Map[String, Seq[ASTNode]]()) +windowPartitionsMap.put(Thread.currentThread().getId, new ArrayBuffer[Node]()) + } + protected def checkWindowDef(windowClause: Option[Node]) = { + +var winDefs = windowDefMap.get(Thread.currentThread().getId) + +windowClause match { + case Some(window) => window.getChildren.foreach { +case Token("TOK_WINDOWDEF", Token(alias, Nil) :: Token("TOK_WINDOWSPEC", ws) :: Nil) => { + winDefs += alias -> ws +} + } + case None => //do nothing --- End diff -- Space after //
[GitHub] spark pull request: [SPARK-1442] [SQL] window function implement
Github user wangxiaojing commented on a diff in the pull request: https://github.com/apache/spark/pull/2953#discussion_r19527190 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala --- @@ -845,6 +858,198 @@ private[hive] object HiveQl { throw new NotImplementedError(s"No parse rules for:\n ${dumpTree(a).toString} ") } + // store the window def of current sql + //use thread id as key to avoid mistake when muti sqls parse at the same time + protected val windowDefMap = new ConcurrentHashMap[Long,Map[String, Seq[ASTNode]]]() + + // store the window spec of current sql + //use thread id as key to avoid mistake when muti sqls parse at the same time + protected val windowPartitionsMap = new ConcurrentHashMap[Long, ArrayBuffer[Node]]() + + protected def initWindow() = { +windowDefMap.put(Thread.currentThread().getId, Map[String, Seq[ASTNode]]()) +windowPartitionsMap.put(Thread.currentThread().getId, new ArrayBuffer[Node]()) + } + protected def checkWindowDef(windowClause: Option[Node]) = { + +var winDefs = windowDefMap.get(Thread.currentThread().getId) + +windowClause match { + case Some(window) => window.getChildren.foreach { +case Token("TOK_WINDOWDEF", Token(alias, Nil) :: Token("TOK_WINDOWSPEC", ws) :: Nil) => { + winDefs += alias -> ws +} + } + case None => //do nothing +} + +windowDefMap.put(Thread.currentThread().getId, winDefs) + } + + protected def translateWindowSpec(windowSpec: Seq[ASTNode]): Seq[ASTNode]= { + +windowSpec match { + case Token(alias, Nil) :: Nil => translateWindowSpec(getWindowSpec(alias)) + case Token(alias, Nil) :: range => { +val (partitionClause :: rowsRange :: valueRange :: Nil) = getClauses( + Seq( +"TOK_PARTITIONINGSPEC", +"TOK_WINDOWRANGE", +"TOK_WINDOWVALUES"), + translateWindowSpec(getWindowSpec(alias))) +partitionClause match { + case Some(partition) => partition.asInstanceOf[ASTNode] :: range + case None => range +} + } + case e => e +} + } + + protected def getWindowSpec(alias: String): Seq[ASTNode]= { 
+windowDefMap.get(Thread.currentThread().getId).getOrElse( + alias, sys.error("no window def for " + alias)) + } + + protected def addWindowPartitions(partition: Node) = { + +var winPartitions = windowPartitionsMap.get(Thread.currentThread().getId) +winPartitions += partition +windowPartitionsMap.put(Thread.currentThread().getId, winPartitions) + } + + protected def getWindowPartitions(): Seq[Node]= { +windowPartitionsMap.get(Thread.currentThread().getId).toSeq + } + + protected def checkWindowPartitions(): Option[Seq[ASTNode]] = { + +val partitionUnits = new ArrayBuffer[Seq[ASTNode]]() + +getWindowPartitions.map { + case Token("TOK_PARTITIONINGSPEC", partition) => Some(partition) + case _ => None +}.foreach { + case Some(partition) => { +if (partitionUnits.isEmpty) partitionUnits += partition +else { + //only add different window partitions + try { +partition zip partitionUnits.head foreach { + case (l,r) => l checkEquals r +} + } catch { +case re: RuntimeException => partitionUnits += partition + } +} + } + case None => //do nothing --- End diff -- Space after //
[GitHub] spark pull request: [SPARK-1442] [SQL] window function implement
Github user wangxiaojing commented on a diff in the pull request: https://github.com/apache/spark/pull/2953#discussion_r19527153 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala --- @@ -845,6 +858,198 @@ private[hive] object HiveQl { throw new NotImplementedError(s"No parse rules for:\n ${dumpTree(a).toString} ") } + // store the window def of current sql + //use thread id as key to avoid mistake when muti sqls parse at the same time --- End diff -- Space after //
[GitHub] spark pull request: [SPARK-1442] [SQL] window function implement
Github user wangxiaojing commented on a diff in the pull request: https://github.com/apache/spark/pull/2953#discussion_r19527141 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala --- @@ -845,6 +858,198 @@ private[hive] object HiveQl { throw new NotImplementedError(s"No parse rules for:\n ${dumpTree(a).toString} ") } + // store the window def of current sql + //use thread id as key to avoid mistake when muti sqls parse at the same time + protected val windowDefMap = new ConcurrentHashMap[Long,Map[String, Seq[ASTNode]]]() + + // store the window spec of current sql + //use thread id as key to avoid mistake when muti sqls parse at the same time --- End diff -- Space after //
[GitHub] spark pull request: [WIP][SPARK-4131][SQL] Writing data into the f...
GitHub user wangxiaojing opened a pull request: https://github.com/apache/spark/pull/2997 [WIP][SPARK-4131][SQL] Writing data into the filesystem from queries You can merge this pull request into a Git repository by running: $ git pull https://github.com/wangxiaojing/spark SPARK-4131 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/2997.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2997 commit f406f49749d590f230c953194a8c36e760fc9460 Author: wangxiaojing Date: 2014-10-29T08:58:19Z add Token
[GitHub] spark pull request: [spark-3586][streaming]Support nested director...
Github user wangxiaojing commented on a diff in the pull request: https://github.com/apache/spark/pull/2765#discussion_r19327514 --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala --- @@ -230,16 +272,37 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas if (minNewFileModTime < 0 || modTime < minNewFileModTime) { minNewFileModTime = modTime } +if (path.getName().startsWith("_")) { --- End diff -- If you use saveAsTextFile("tmp"), files with a prefixed name are created first; the final output files only appear once the job has finished.
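To illustrate the idea in this comment (a hedged sketch, not the PR's code; the object name is made up): Hadoop-style marker and in-progress files can be recognized by their leading underscore or dot.

```scala
// Hypothetical helper: recognize Hadoop marker/in-progress output
// (e.g. _temporary, _SUCCESS, .part-00000.crc) that a directory
// monitor should skip until the writing job has finished.
object OutputFileFilter {
  def isHiddenOutput(fileName: String): Boolean =
    fileName.startsWith("_") || fileName.startsWith(".")
}
```

A monitor would apply this to `path.getName` before treating a path as a newly arrived file.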
[GitHub] spark pull request: [spark-3586][streaming]Support nested director...
Github user wangxiaojing commented on the pull request: https://github.com/apache/spark/pull/2765#issuecomment-60341312 @liancheng
[GitHub] spark pull request: [spark-3586][streaming]Support nested director...
Github user wangxiaojing commented on the pull request: https://github.com/apache/spark/pull/2765#issuecomment-59485316 @jerryshao @tdas First, check all directories according to the configured depth, then filter the directories, keeping those whose modification time is newer than the ignore time. Is this method optimal? Thanks.
[GitHub] spark pull request: [spark-3907][sql] add truncate table support
Github user wangxiaojing commented on a diff in the pull request: https://github.com/apache/spark/pull/2770#discussion_r19007078 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala --- @@ -121,7 +121,8 @@ private[hive] object HiveQl { // Commands that we do not need to explain. protected val noExplainCommands = Seq( "TOK_CREATETABLE", -"TOK_DESCTABLE" +"TOK_DESCTABLE", +"TOK_TRUNCATETABLE" --- End diff -- If we add truncate table to nativeCommands, the SQL "truncate table test columns(i)" will run a Hive MapReduce job. At present, the COLUMNS variant only supports RCFile in Hive.
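For context, the check behind the diff above is just membership in that sequence (a simplified sketch under that assumption; the real HiveQl dispatch around it is more involved):

```scala
// Simplified sketch: parse-tree tokens whose commands Spark does not
// need to wrap in its own EXPLAIN handling.
val noExplainCommands = Seq("TOK_CREATETABLE", "TOK_DESCTABLE", "TOK_TRUNCATETABLE")

// A command token outside this list would go through normal explain rewriting.
def needsExplainRewrite(token: String): Boolean = !noExplainCommands.contains(token)
```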
[GitHub] spark pull request: [spark-3586][streaming]Support nested director...
Github user wangxiaojing commented on a diff in the pull request: https://github.com/apache/spark/pull/2765#discussion_r19004975 --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala --- @@ -207,6 +220,9 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas def accept(path: Path): Boolean = { try { +if (fs.getFileStatus(path).isDirectory()){ + return false +} if (!filter(path)) { // Reject file if it does not satisfy filter logDebug("Rejected by filter " + path) return false --- End diff -- Because the logic is not strict: if we don't return false here, execution continues and eventually returns true.
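The early-return structure being defended can be sketched as follows (a minimal stand-in with hypothetical parameter names, not the real `FileInputDStream` internals): every failed check must reject explicitly with `false`, and only a path that survives all checks falls through to `true`.

```scala
// Hypothetical sketch of the filter's control flow: any failed check
// returns false immediately; otherwise execution continues and the
// path is eventually accepted.
def accept(isDirectory: Boolean, passesFilter: Boolean,
           modTime: Long, ignoreTime: Long): Boolean = {
  if (isDirectory) return false          // directories are never new files
  if (!passesFilter) return false        // rejected by the user-supplied filter
  if (modTime < ignoreTime) return false // too old to be considered new
  true
}
```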
[GitHub] spark pull request: [spark-3907][sql] add truncate table support
Github user wangxiaojing commented on the pull request: https://github.com/apache/spark/pull/2770#issuecomment-59466262 @rxin
[GitHub] spark pull request: [spark-3586][streaming]Support nested director...
Github user wangxiaojing commented on the pull request: https://github.com/apache/spark/pull/2765#issuecomment-59462998 Hi @jerryshao, I am changing the code to use this parameter to control the search depth, but if the depth is greater than 1 the ignore time is not reasonable: when a new file appears in a second-level subdirectory, the modification time of the first-level subdirectory does not change. For example, streaming monitors the directory /tmp/. The directory structure is:
2014-10-16 19:17 /tmp/spark1
2014-10-16 19:17 /tmp/spark1/spark2
After a file is created in /tmp/spark1/spark2:
2014-10-16 19:17 /tmp/spark1
2014-10-16 19:18 /tmp/spark1/spark2
2014-10-16 19:18 /tmp/spark1/spark2/file
If we use the ignore time for filtering, the first-level subdirectory is always ignored. Can you give me some advice?
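The problem described above — a new file two levels down does not bump the first-level directory's mtime — is why the mod-time filter cannot be applied to directories themselves. A hedged sketch of the alternative (hypothetical helper using plain java.io for brevity; the real code would go through Hadoop's FileSystem API): always descend into subdirectories up to the depth limit, and apply the ignore time only to files.

```scala
import java.io.File

// Hypothetical sketch: collect files at most `depth` directory levels
// below `root`. Directories are always descended into regardless of
// their own mtime; only files are filtered by the ignore time.
def listRecentFiles(root: File, depth: Int, ignoreTime: Long): Seq[File] = {
  if (depth < 0 || !root.isDirectory) Seq.empty
  else root.listFiles().toSeq.flatMap { entry =>
    if (entry.isDirectory) listRecentFiles(entry, depth - 1, ignoreTime)
    else if (entry.lastModified() >= ignoreTime) Seq(entry)
    else Seq.empty
  }
}
```

With the /tmp/spark1/spark2/file layout from the comment, a depth of 2 finds the file even though /tmp/spark1's own mtime never changed.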
[GitHub] spark pull request: [spark-3940][sql]sql Print the error code thre...
GitHub user wangxiaojing opened a pull request: https://github.com/apache/spark/pull/2790 [spark-3940][sql]sql Print the error code three times If the SQL is wrong, the console should print the error only once. E.g.:
spark-sql> show tabless;
show tabless;
14/10/13 21:03:48 INFO ParseDriver: Parsing command: show tabless
NoViableAltException(26@[598:1: ddlStatement : ( createDatabaseStatement | switchDatabaseStatement | dropDatabaseStatement | createTableStatement | dropTableStatement | truncateTableStatement | alterStatement | descStatement | showStatement | metastoreCheck | createViewStatement | dropViewStatement | createFunctionStatement | createMacroStatement | createIndexStatement | dropIndexStatement | dropFunctionStatement | dropMacroStatement | analyzeStatement | lockStatement | unlockStatement | createRoleStatement | dropRoleStatement | grantPrivileges | revokePrivileges | showGrants | showRoleGrants | grantRole | revokeRole );])
at org.antlr.runtime.DFA.noViableAlt(DFA.java:158)
at org.antlr.runtime.DFA.predict(DFA.java:144)
at org.apache.hadoop.hive.ql.parse.HiveParser.ddlStatement(HiveParser.java:1962)
at org.apache.hadoop.hive.ql.parse.HiveParser.execStatement(HiveParser.java:1298)
at org.apache.hadoop.hive.ql.parse.HiveParser.statement(HiveParser.java:938)
at org.apache.hadoop.hive.ql.parse.ParseDriver.parse(ParseDriver.java:190)
at org.apache.hadoop.hive.ql.parse.ParseDriver.parse(ParseDriver.java:161)
at org.apache.spark.sql.hive.HiveQl$.getAst(HiveQl.scala:218)
at org.apache.spark.sql.hive.HiveQl$.createPlan(HiveQl.scala:226)
at org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:50)
at org.apache.spark.sql.hive.ExtendedHiveQlParser$$anonfun$hiveQl$1.apply(ExtendedHiveQlParser.scala:49)
at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136)
at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
at scala.util.parsing.combinator.Parsers$Failure.append(Parsers.scala:202)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
at scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
at scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at scala.util.parsing.combinator.Parsers$$anon$2.apply(Parsers.scala:890)
at scala.util.parsing.combinator.PackratParsers$$anon$1.apply(PackratParsers.scala:110)
at org.apache.spark.sql.catalyst.AbstractSparkSQLParser.apply(SparkSQLParser.scala:31)
at org.apache.spark.sql.hive.HiveQl$$anonfun$3.apply(HiveQl.scala:130)
at org.apache.spark.sql.hive.HiveQl$$anonfun$3.apply(HiveQl.scala:130)
at org.apache.spark.sql.catalyst.SparkSQLParser$$anonfun$org$apache$spark$sql$catalyst$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:184)
at org.apache.spark.sql.catalyst.SparkSQLParser$$anonfun$org$apache$spark$sql$catalyst$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:183)
at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136)
at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
at scala.util.parsing.combinator.Parsers$Failure.append(Parsers.scala:202)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:
[GitHub] spark pull request: [spark-3907][sql] add truncate table support
GitHub user wangxiaojing opened a pull request: https://github.com/apache/spark/pull/2770 [spark-3907][sql] add truncate table support

JIRA issue: [SPARK-3907] https://issues.apache.org/jira/browse/SPARK-3907

This adds TRUNCATE TABLE support:

TRUNCATE TABLE table_name [PARTITION partition_spec];
partition_spec: : (partition_col = partition_col_value, partition_col = partition_col_value, ...)

Removes all rows from a table or partition(s). Currently the target table must be a native/managed table, or an exception will be thrown. The user can specify a partial partition_spec to truncate multiple partitions at once; omitting partition_spec truncates all partitions in the table.

You can merge this pull request into a Git repository by running: $ git pull https://github.com/wangxiaojing/spark spark-3907 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/2770.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2770 commit 77b1f2022d1a5b287fe43e9823f6d8b7934a969e Author: wangxiaojing Date: 2014-10-11T16:08:26Z add truncate table support --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
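A minimal sketch of how the TRUNCATE TABLE syntax described above would be exercised from Spark SQL (assumes a running SparkContext `sc` and a Hive-enabled build; the table name `logs` and partition column `dt` are hypothetical):

```scala
// Sketch only: requires a live Spark cluster with Hive support,
// so it is illustrative rather than directly runnable here.
import org.apache.spark.sql.hive.HiveContext

val hiveContext = new HiveContext(sc)

// Remove all rows from a managed table (all partitions, if partitioned).
hiveContext.sql("TRUNCATE TABLE logs")

// Remove rows from the matching partition(s) only; a partial
// partition_spec truncates every partition that matches it.
hiveContext.sql("TRUNCATE TABLE logs PARTITION (dt = '2014-10-11')")
```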
[GitHub] spark pull request: [spark-3586][streaming]Support nested director...
Github user wangxiaojing commented on a diff in the pull request: https://github.com/apache/spark/pull/2765#discussion_r18740849 --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala --- @@ -207,6 +220,9 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas def accept(path: Path): Boolean = { try { +if (fs.getFileStatus(path).isDirectory()){ + return false +} if (!filter(path)) { // Reject file if it does not satisfy filter logDebug("Rejected by filter " + path) return false --- End diff -- Why? If the path is a directory, it should not be considered as a file.
[GitHub] spark pull request: [spark-3586][streaming]Support nested director...
Github user wangxiaojing commented on a diff in the pull request: https://github.com/apache/spark/pull/2765#discussion_r18740834 --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala --- @@ -118,6 +119,18 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas (newFiles, filter.minNewFileModTime) } + def getPathList( path:Path, fs:FileSystem):List[Path]={ +val filter = new SubPathFilter() +var pathList = List[Path]() +fs.listStatus(path,filter).map(x=>{ + if(x.isDirectory()){ --- End diff -- Yes, because this only supports one level of subdirectories; recursing into all nested directories would make the processing time too long.
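The one-level listing strategy discussed above can be sketched in plain Scala. This mirrors the `getPathList` helper from the diff but swaps the Hadoop FileSystem API for java.nio so the sketch stands alone; the helper name is kept from the diff, everything else is an assumption:

```scala
import java.nio.file.{Files, Path}
import scala.collection.JavaConverters._

// Return the root directory plus its immediate subdirectories (exactly
// one level deep). Stopping at one level bounds the per-batch listing
// cost, which is why the PR does not recurse into nested directories.
def getPathList(root: Path): List[Path] = {
  val subDirs = Files.list(root).iterator().asScala
    .filter(p => Files.isDirectory(p))
    .toList
  root :: subDirs
}
```

Each path returned by the helper would then be scanned for new files in the usual `FileInputDStream` way; directories found deeper than one level are deliberately ignored.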
[GitHub] spark pull request: [spark-3586][streaming]Support nested director...
GitHub user wangxiaojing opened a pull request: https://github.com/apache/spark/pull/2765 [spark-3586][streaming]Support nested directories in Spark Streaming

For text files, streams are created with streamingContext.textFileStream(dataDirectory). This improvement makes Spark Streaming also monitor subdirectories of dataDirectory and process any files created in them. e.g.: streamingContext.textFileStream(/test). Given the directory contents: /test/file1 /test/file2 /test/dr/file1 if a new file "file2" appears in the directory "/test/dr/", Spark Streaming can process that file.

You can merge this pull request into a Git repository by running: $ git pull https://github.com/wangxiaojing/spark spark-3586 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/2765.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2765 commit 98ead547f90520819b421b0f4436bfe7d8a3d4f4 Author: wangxiaojing Date: 2014-10-11T08:22:31Z Support nested directories in Spark Streaming