Repository: spark
Updated Branches:
  refs/heads/master 32286ba68 -> 4e27578fa


[SPARK-18922][SQL][CORE][STREAMING][TESTS] Fix all identified tests failed due 
to path and resource-not-closed problems on Windows

## What changes were proposed in this pull request?

This PR proposes to fix all the test failures identified by running the tests 
on Windows via AppVeyor. The failures stem from path handling and from 
resources that are not closed.

**Scala - aborted tests**

```
WindowQuerySuite:
  Exception encountered when attempting to run a suite with class name: 
org.apache.spark.sql.hive.execution.WindowQuerySuite *** ABORTED *** (156 
milliseconds)
   org.apache.spark.sql.AnalysisException: LOAD DATA input path does not exist: 
C:projectssparksqlhive   argetscala-2.11   est-classesdatafilespart_tiny.txt;

OrcSourceSuite:
 Exception encountered when attempting to run a suite with class name: 
org.apache.spark.sql.hive.orc.OrcSourceSuite *** ABORTED *** (62 milliseconds)
   org.apache.spark.sql.AnalysisException: 
org.apache.hadoop.hive.ql.metadata.HiveException: 
MetaException(message:java.lang.IllegalArgumentException: Can not create a Path 
from an empty string);

ParquetMetastoreSuite:
 Exception encountered when attempting to run a suite with class name: 
org.apache.spark.sql.hive.ParquetMetastoreSuite *** ABORTED *** (4 seconds, 703 
milliseconds)
   org.apache.spark.sql.AnalysisException: 
org.apache.hadoop.hive.ql.metadata.HiveException: 
MetaException(message:java.lang.IllegalArgumentException: Can not create a Path 
from an empty string);

ParquetSourceSuite:
 Exception encountered when attempting to run a suite with class name: 
org.apache.spark.sql.hive.ParquetSourceSuite *** ABORTED *** (3 seconds, 907 
milliseconds)
   org.apache.spark.sql.AnalysisException: Path does not exist: 
file:/C:projectsspark  arget mpspark-581a6575-454f-4f21-a516-a07f95266143;

KafkaRDDSuite:
 Exception encountered when attempting to run a suite with class name: 
org.apache.spark.streaming.kafka.KafkaRDDSuite *** ABORTED *** (5 seconds, 212 
milliseconds)
   java.io.IOException: Failed to delete: 
C:\projects\spark\target\tmp\spark-4722304d-213e-4296-b556-951df1a46807

DirectKafkaStreamSuite:
 Exception encountered when attempting to run a suite with class name: 
org.apache.spark.streaming.kafka.DirectKafkaStreamSuite *** ABORTED *** (7 
seconds, 127 milliseconds)
   java.io.IOException: Failed to delete: 
C:\projects\spark\target\tmp\spark-d0d3eba7-4215-4e10-b40e-bb797e89338e
   at org.apache.spark.util.Utils$.deleteRecursively(Utils.scala:1010)

ReliableKafkaStreamSuite:
 Exception encountered when attempting to run a suite with class name: 
org.apache.spark.streaming.kafka.ReliableKafkaStreamSuite *** ABORTED *** (5 
seconds, 498 milliseconds)
   java.io.IOException: Failed to delete: 
C:\projects\spark\target\tmp\spark-d33e45a0-287e-4bed-acae-ca809a89d888

KafkaStreamSuite:
 Exception encountered when attempting to run a suite with class name: 
org.apache.spark.streaming.kafka.KafkaStreamSuite *** ABORTED *** (2 seconds, 
892 milliseconds)
   java.io.IOException: Failed to delete: 
C:\projects\spark\target\tmp\spark-59c9d169-5a56-4519-9ef0-cefdbd3f2e6c

KafkaClusterSuite:
 Exception encountered when attempting to run a suite with class name: 
org.apache.spark.streaming.kafka.KafkaClusterSuite *** ABORTED *** (1 second, 
690 milliseconds)
   java.io.IOException: Failed to delete: 
C:\projects\spark\target\tmp\spark-3ef402b0-8689-4a60-85ae-e41e274f179d

DirectKafkaStreamSuite:
 Exception encountered when attempting to run a suite with class name: 
org.apache.spark.streaming.kafka010.DirectKafkaStreamSuite *** ABORTED *** (59 
seconds, 626 milliseconds)
   java.io.IOException: Failed to delete: 
C:\projects\spark\target\tmp\spark-426107da-68cf-4d94-b0d6-1f428f1c53f6

KafkaRDDSuite:
Exception encountered when attempting to run a suite with class name: 
org.apache.spark.streaming.kafka010.KafkaRDDSuite *** ABORTED *** (2 minutes, 6 
seconds)
   java.io.IOException: Failed to delete: 
C:\projects\spark\target\tmp\spark-b9ce7929-5dae-46ab-a0c4-9ef6f58fbc2
```

**Java - failed tests**

```
Test org.apache.spark.streaming.kafka.JavaKafkaRDDSuite.testKafkaRDD failed: 
java.io.IOException: Failed to delete: 
C:\projects\spark\target\tmp\spark-1cee32f4-4390-4321-82c9-e8616b3f0fb0, took 
9.61 sec

Test org.apache.spark.streaming.kafka.JavaKafkaStreamSuite.testKafkaStream 
failed: java.io.IOException: Failed to delete: 
C:\projects\spark\target\tmp\spark-f42695dd-242e-4b07-847c-f299b8e4676e, took 
11.797 sec

Test 
org.apache.spark.streaming.kafka.JavaDirectKafkaStreamSuite.testKafkaStream 
failed: java.io.IOException: Failed to delete: 
C:\projects\spark\target\tmp\spark-85c0d062-78cf-459c-a2dd-7973572101ce, took 
1.581 sec

Test org.apache.spark.streaming.kafka010.JavaKafkaRDDSuite.testKafkaRDD failed: 
java.io.IOException: Failed to delete: 
C:\projects\spark\target\tmp\spark-49eb6b5c-8366-47a6-83f2-80c443c48280, took 
17.895 sec

Test 
org.apache.spark.streaming.kafka010.JavaDirectKafkaStreamSuite.testKafkaStream 
failed: java.io.IOException: Failed to delete: 
C:\projects\spark\target\tmp\spark-898cf826-d636-4b1c-a61a-c12a364c02e7, took 
8.858 sec
```
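
The Kafka suites above all fail while cleaning up their temporary directories. A minimal sketch of a cleanup that reports, rather than propagates, a deletion failure on Windows follows. The names `TolerantCleanupSketch`, `deleteRecursively` and `cleanup` are illustrative assumptions, not Spark's `Utils` API, and the OS check via `os.name` is also an assumption.

```scala
import java.io.{File, IOException}
import java.util.Locale

object TolerantCleanupSketch {
  // Assumption: the platform is detected from the "os.name" system property.
  val isWindows: Boolean =
    System.getProperty("os.name", "").toLowerCase(Locale.ROOT).startsWith("windows")

  // Recursively delete `file`; throw IOException if anything cannot be removed.
  def deleteRecursively(file: File): Unit = {
    if (file.isDirectory) {
      Option(file.listFiles()).getOrElse(Array.empty[File]).foreach(deleteRecursively)
    }
    if (file.exists() && !file.delete()) {
      throw new IOException(s"Failed to delete: ${file.getAbsolutePath}")
    }
  }

  // Returns true when the directory was removed. When `tolerateFailure` is set
  // (by default only on Windows), a failure is printed and false is returned
  // instead of letting the exception abort the caller.
  def cleanup(dir: File, tolerateFailure: Boolean = isWindows): Boolean = {
    try {
      deleteRecursively(dir)
      true
    } catch {
      case e: IOException if tolerateFailure =>
        Console.err.println(s"WARN: ${e.getMessage}")
        false
    }
  }
}
```

Tolerating the failure only hides the symptom; closing whatever still holds the directory open remains the preferable fix where that is possible.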

**Scala - failed tests**

```
PartitionProviderCompatibilitySuite:
 - insert overwrite partition of new datasource table overwrites just partition 
*** FAILED *** (828 milliseconds)
   java.io.IOException: Failed to delete: 
C:\projects\spark\target\tmp\spark-bb6337b9-4f99-45ab-ad2c-a787ab965c09

 - SPARK-18635 special chars in partition values - partition management true 
*** FAILED *** (5 seconds, 360 milliseconds)
   org.apache.spark.sql.AnalysisException: 
org.apache.hadoop.hive.ql.metadata.HiveException: 
MetaException(message:java.lang.IllegalArgumentException: Can not create a Path 
from an empty string);

 - SPARK-18635 special chars in partition values - partition management false 
*** FAILED *** (141 milliseconds)
   org.apache.spark.sql.AnalysisException: 
org.apache.hadoop.hive.ql.metadata.HiveException: 
MetaException(message:java.lang.IllegalArgumentException: Can not create a Path 
from an empty string);
```

```
UtilsSuite:
 - reading offset bytes of a file (compressed) *** FAILED *** (0 milliseconds)
   java.io.IOException: Failed to delete: 
C:\projects\spark\target\tmp\spark-ecb2b7d5-db8b-43a7-b268-1bf242b5a491

 - reading offset bytes across multiple files (compressed) *** FAILED *** (0 
milliseconds)
   java.io.IOException: Failed to delete: 
C:\projects\spark\target\tmp\spark-25cc47a8-1faa-4da5-8862-cf174df63ce0
```
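
On Windows a file typically cannot be deleted while a stream on it is still open, which is one way a test's temporary-directory cleanup can end in `Failed to delete`. A minimal sketch of measuring the uncompressed length of a `.gz` file while always closing the streams follows. `GzipLengthSketch` and `uncompressedLength` are illustrative names, not Spark's API.

```scala
import java.io.{File, FileInputStream}
import java.util.zip.GZIPInputStream

object GzipLengthSketch {
  // Read the whole .gz file and return the number of uncompressed bytes.
  // Both streams are closed in `finally`, so the file handle is released
  // whether or not reading succeeds.
  def uncompressedLength(file: File): Long = {
    val fileIn = new FileInputStream(file)
    try {
      val gzIn = new GZIPInputStream(fileIn)
      try {
        val buf = new Array[Byte](1024)
        var total = 0L
        var n = gzIn.read(buf)
        while (n != -1) {
          total += n
          n = gzIn.read(buf)
        }
        total
      } finally {
        gzIn.close()
      }
    } finally {
      // Closing again is harmless; this covers the case where the
      // GZIPInputStream constructor itself throws.
      fileIn.close()
    }
  }
}
```

Once the streams are closed, a subsequent `file.delete()` has no open handle standing in its way.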

```
StatisticsSuite:
 - MetastoreRelations fallback to HDFS for size estimation *** FAILED *** (110 
milliseconds)
   org.apache.spark.sql.catalyst.analysis.NoSuchTableException: Table or view 
'csv_table' not found in database 'default';
```

```
SQLQuerySuite:
 - permanent UDTF *** FAILED *** (125 milliseconds)
   org.apache.spark.sql.AnalysisException: Undefined function: 
'udtf_count_temp'. This function is neither a registered temporary function nor 
a permanent function registered in the database 'default'.; line 1 pos 24

 - describe functions - user defined functions *** FAILED *** (125 milliseconds)
   org.apache.spark.sql.AnalysisException: Undefined function: 'udtf_count'. 
This function is neither a registered temporary function nor a permanent 
function registered in the database 'default'.; line 1 pos 7

 - CTAS without serde with location *** FAILED *** (16 milliseconds)
   java.lang.IllegalArgumentException: java.net.URISyntaxException: Relative 
path in absolute URI: 
file:C:projectsspark%09arget%09mpspark-ed673d73-edfc-404e-829e-2e2b9725d94e/c1

 - derived from Hive query file: drop_database_removes_partition_dirs.q *** 
FAILED *** (47 milliseconds)
   java.lang.IllegalArgumentException: java.net.URISyntaxException: Relative 
path in absolute URI: 
file:C:projectsspark%09arget%09mpspark-d2ddf08e-699e-45be-9ebd-3dfe619680fe/drop_database_removes_partition_dirs_table

 - derived from Hive query file: drop_table_removes_partition_dirs.q *** FAILED 
*** (0 milliseconds)
   java.lang.IllegalArgumentException: java.net.URISyntaxException: Relative 
path in absolute URI: 
file:C:projectsspark%09arget%09mpspark-d2ddf08e-699e-45be-9ebd-3dfe619680fe/drop_table_removes_partition_dirs_table2

 - SPARK-17796 Support wildcard character in filename for LOAD DATA LOCAL 
INPATH *** FAILED *** (109 milliseconds)
   java.nio.file.InvalidPathException: Illegal char <:> at index 2: 
/C:/projects/spark/sql/hive/projectsspark   arget   
mpspark-1a122f8c-dfb3-46c4-bab1-f30764baee0e/*part-r*
```
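
The malformed paths in these messages (for example `C:projectsspark%09arget%09mp...`, where `%09` is a tab) are consistent with a Windows path being embedded in a SQL string literal and its backslashes being treated as escape characters. A minimal sketch of that effect follows, assuming a simplified unescaping rule. `unescape` is a hypothetical stand-in written for this illustration, not Spark's parser.

```scala
object PathEscapeSketch {
  // Hypothetical, simplified string-literal unescaping: "\t" becomes a tab,
  // "\n" a newline, and any other "\x" collapses to "x".
  def unescape(literal: String): String = {
    val sb = new StringBuilder
    var i = 0
    while (i < literal.length) {
      val c = literal.charAt(i)
      if (c == '\\' && i + 1 < literal.length) {
        literal.charAt(i + 1) match {
          case 't' => sb.append('\t')
          case 'n' => sb.append('\n')
          case other => sb.append(other)
        }
        i += 2
      } else {
        sb.append(c)
        i += 1
      }
    }
    sb.toString
  }

  def main(args: Array[String]): Unit = {
    // Raw Windows path: "\t" in "\target" and "\tmp" turns into a tab and the
    // remaining backslashes disappear.
    println(unescape("""C:\projects\spark\target\tmp"""))
    // URI-style path: there are no backslashes, so it is left unchanged.
    println(unescape("file:/C:/projects/spark/target/tmp"))
  }
}
```

A URI-style string with forward slashes, such as the one `java.io.File.toURI` produces, has no backslashes to misinterpret, so it survives this kind of processing intact.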

```
HiveDDLSuite:
 - drop external tables in default database *** FAILED *** (16 milliseconds)
   org.apache.spark.sql.AnalysisException: 
org.apache.hadoop.hive.ql.metadata.HiveException: 
MetaException(message:java.lang.IllegalArgumentException: Can not create a Path 
from an empty string);

 - add/drop partitions - external table *** FAILED *** (16 milliseconds)
   org.apache.spark.sql.AnalysisException: 
org.apache.hadoop.hive.ql.metadata.HiveException: 
MetaException(message:java.lang.IllegalArgumentException: Can not create a Path 
from an empty string);

 - create/drop database - location without pre-created directory *** FAILED *** 
(16 milliseconds)
   org.apache.spark.sql.AnalysisException: 
org.apache.hadoop.hive.ql.metadata.HiveException: 
MetaException(message:java.lang.IllegalArgumentException: Can not create a Path 
from an empty string);

 - create/drop database - location with pre-created directory *** FAILED *** 
(32 milliseconds)
   org.apache.spark.sql.AnalysisException: 
org.apache.hadoop.hive.ql.metadata.HiveException: 
MetaException(message:java.lang.IllegalArgumentException: Can not create a Path 
from an empty string);

 - drop database containing tables - CASCADE *** FAILED *** (94 milliseconds)
   
CatalogDatabase(db1,,file:/C:/projects/spark/target/tmp/warehouse-d0665ee0-1e39-4805-b471-0b764f7838be/db1.db,Map())
 did not equal 
CatalogDatabase(db1,,file:C:/projects/spark/target/tmp/warehouse-d0665ee0-1e39-4805-b471-0b764f7838be\db1.db,Map())
 (HiveDDLSuite.scala:675)

 - drop an empty database - CASCADE *** FAILED *** (63 milliseconds)
   
CatalogDatabase(db1,,file:/C:/projects/spark/target/tmp/warehouse-d0665ee0-1e39-4805-b471-0b764f7838be/db1.db,Map())
 did not equal 
CatalogDatabase(db1,,file:C:/projects/spark/target/tmp/warehouse-d0665ee0-1e39-4805-b471-0b764f7838be\db1.db,Map())
 (HiveDDLSuite.scala:675)

 - drop database containing tables - RESTRICT *** FAILED *** (47 milliseconds)
   
CatalogDatabase(db1,,file:/C:/projects/spark/target/tmp/warehouse-d0665ee0-1e39-4805-b471-0b764f7838be/db1.db,Map())
 did not equal 
CatalogDatabase(db1,,file:C:/projects/spark/target/tmp/warehouse-d0665ee0-1e39-4805-b471-0b764f7838be\db1.db,Map())
 (HiveDDLSuite.scala:675)

 - drop an empty database - RESTRICT *** FAILED *** (47 milliseconds)
   
CatalogDatabase(db1,,file:/C:/projects/spark/target/tmp/warehouse-d0665ee0-1e39-4805-b471-0b764f7838be/db1.db,Map())
 did not equal 
CatalogDatabase(db1,,file:C:/projects/spark/target/tmp/warehouse-d0665ee0-1e39-4805-b471-0b764f7838be\db1.db,Map())
 (HiveDDLSuite.scala:675)

 - CREATE TABLE LIKE an external data source table *** FAILED *** (140 
milliseconds)
   org.apache.spark.sql.AnalysisException: Path does not exist: 
file:/C:projectsspark   arget   mpspark-c5eba16d-07ae-4186-95bb-21c5811cf888;

 - CREATE TABLE LIKE an external Hive serde table *** FAILED *** (16 
milliseconds)
   org.apache.spark.sql.AnalysisException: 
org.apache.hadoop.hive.ql.metadata.HiveException: 
MetaException(message:java.lang.IllegalArgumentException: Can not create a Path 
from an empty string);

 - desc table for data source table - no user-defined schema *** FAILED *** 
(125 milliseconds)
   org.apache.spark.sql.AnalysisException: Path does not exist: 
file:/C:projectsspark   arget   mpspark-e8bf5bf5-721a-4cbe-9d6d-5543a8301c1d;
   at scala.collection.immutable.List.foreach(List.scala:381)
```

```
MetastoreDataSourcesSuite:
 - CTAS: persisted bucketed data source table *** FAILED *** (16 milliseconds)
   java.lang.IllegalArgumentException: Can not create a Path from an empty 
string
```

```
ShowCreateTableSuite:
 - simple external hive table *** FAILED *** (0 milliseconds)
   org.apache.spark.sql.AnalysisException: 
org.apache.hadoop.hive.ql.metadata.HiveException: 
MetaException(message:java.lang.IllegalArgumentException: Can not create a Path 
from an empty string);
```

```
PartitionedTablePerfStatsSuite:
 - hive table: partitioned pruned table reports only selected files *** FAILED 
*** (313 milliseconds)
   org.apache.spark.sql.AnalysisException: 
org.apache.hadoop.hive.ql.metadata.HiveException: 
MetaException(message:java.lang.IllegalArgumentException: Can not create a Path 
from an empty string);

 - datasource table: partitioned pruned table reports only selected files *** 
FAILED *** (219 milliseconds)
   org.apache.spark.sql.AnalysisException: Path does not exist: 
file:/C:projectsspark   arget   mpspark-311f45f8-d064-4023-a4bb-e28235bff64d;

 - hive table: lazy partition pruning reads only necessary partition data *** 
FAILED *** (203 milliseconds)
   org.apache.spark.sql.AnalysisException: 
org.apache.hadoop.hive.ql.metadata.HiveException: 
MetaException(message:java.lang.IllegalArgumentException: Can not create a Path 
from an empty string);

 - datasource table: lazy partition pruning reads only necessary partition data 
*** FAILED *** (187 milliseconds)
   org.apache.spark.sql.AnalysisException: Path does not exist: 
file:/C:projectsspark   arget   mpspark-fde874ca-66bd-4d0b-a40f-a043b65bf957;

 - hive table: lazy partition pruning with file status caching enabled *** 
FAILED *** (188 milliseconds)
   org.apache.spark.sql.AnalysisException: 
org.apache.hadoop.hive.ql.metadata.HiveException: 
MetaException(message:java.lang.IllegalArgumentException: Can not create a Path 
from an empty string);

 - datasource table: lazy partition pruning with file status caching enabled 
*** FAILED *** (187 milliseconds)
   org.apache.spark.sql.AnalysisException: Path does not exist: 
file:/C:projectsspark   arget   mpspark-e6d20183-dd68-4145-acbe-4a509849accd;

 - hive table: file status caching respects refresh table and refreshByPath *** 
FAILED *** (172 milliseconds)
   org.apache.spark.sql.AnalysisException: 
org.apache.hadoop.hive.ql.metadata.HiveException: 
MetaException(message:java.lang.IllegalArgumentException: Can not create a Path 
from an empty string);

 - datasource table: file status caching respects refresh table and 
refreshByPath *** FAILED *** (203 milliseconds)
   org.apache.spark.sql.AnalysisException: Path does not exist: 
file:/C:projectsspark   arget   mpspark-8b2c9651-2adf-4d58-874f-659007e21463;

 - hive table: file status cache respects size limit *** FAILED *** (219 
milliseconds)
   org.apache.spark.sql.AnalysisException: 
org.apache.hadoop.hive.ql.metadata.HiveException: 
MetaException(message:java.lang.IllegalArgumentException: Can not create a Path 
from an empty string);

 - datasource table: file status cache respects size limit *** FAILED *** (171 
milliseconds)
   org.apache.spark.sql.AnalysisException: Path does not exist: 
file:/C:projectsspark   arget   mpspark-7835ab57-cb48-4d2c-bb1d-b46d5a4c47e4;

 - datasource table: table setup does not scan filesystem *** FAILED *** (266 
milliseconds)
   org.apache.spark.sql.AnalysisException: Path does not exist: 
file:/C:projectsspark   arget   mpspark-20598d76-c004-42a7-8061-6c56f0eda5e2;

 - hive table: table setup does not scan filesystem *** FAILED *** (266 
milliseconds)
   org.apache.spark.sql.AnalysisException: 
org.apache.hadoop.hive.ql.metadata.HiveException: 
MetaException(message:java.lang.IllegalArgumentException: Can not create a Path 
from an empty string);

 - hive table: num hive client calls does not scale with partition count *** 
FAILED *** (2 seconds, 281 milliseconds)
   org.apache.spark.sql.AnalysisException: 
org.apache.hadoop.hive.ql.metadata.HiveException: 
MetaException(message:java.lang.IllegalArgumentException: Can not create a Path 
from an empty string);

 - datasource table: num hive client calls does not scale with partition count 
*** FAILED *** (2 seconds, 422 milliseconds)
   org.apache.spark.sql.AnalysisException: Path does not exist: 
file:/C:projectsspark   arget   mpspark-4cfed321-4d1d-4b48-8d34-5c169afff383;

 - hive table: files read and cached when filesource partition management is 
off *** FAILED *** (234 milliseconds)
   org.apache.spark.sql.AnalysisException: 
org.apache.hadoop.hive.ql.metadata.HiveException: 
MetaException(message:java.lang.IllegalArgumentException: Can not create a Path 
from an empty string);

 - datasource table: all partition data cached in memory when partition 
management is off *** FAILED *** (203 milliseconds)
   org.apache.spark.sql.AnalysisException: Path does not exist: 
file:/C:projectsspark   arget   mpspark-4bcc0398-15c9-4f6a-811e-12d40f3eec12;

 - SPARK-18700: table loaded only once even when resolved concurrently *** 
FAILED *** (1 second, 266 milliseconds)
   org.apache.spark.sql.AnalysisException: 
org.apache.hadoop.hive.ql.metadata.HiveException: 
MetaException(message:java.lang.IllegalArgumentException: Can not create a Path 
from an empty string);
```

```
HiveSparkSubmitSuite:
 - temporary Hive UDF: define a UDF and use it *** FAILED *** (2 seconds, 94 
milliseconds)
   java.io.IOException: Cannot run program "./bin/spark-submit" (in directory 
"C:\projects\spark"): CreateProcess error=2, The system cannot find the file 
specified

 - permanent Hive UDF: define a UDF and use it *** FAILED *** (281 milliseconds)
   java.io.IOException: Cannot run program "./bin/spark-submit" (in directory 
"C:\projects\spark"): CreateProcess error=2, The system cannot find the file 
specified

 - permanent Hive UDF: use a already defined permanent function *** FAILED *** 
(718 milliseconds)
   java.io.IOException: Cannot run program "./bin/spark-submit" (in directory 
"C:\projects\spark"): CreateProcess error=2, The system cannot find the file 
specified

 - SPARK-8368: includes jars passed in through --jars *** FAILED *** (3 
seconds, 521 milliseconds)
   java.io.IOException: Cannot run program "./bin/spark-submit" (in directory 
"C:\projects\spark"): CreateProcess error=2, The system cannot find the file 
specified

 - SPARK-8020: set sql conf in spark conf *** FAILED *** (0 milliseconds)
   java.io.IOException: Cannot run program "./bin/spark-submit" (in directory 
"C:\projects\spark"): CreateProcess error=2, The system cannot find the file 
specified

 - SPARK-8489: MissingRequirementError during reflection *** FAILED *** (94 
milliseconds)
   java.io.IOException: Cannot run program "./bin/spark-submit" (in directory 
"C:\projects\spark"): CreateProcess error=2, The system cannot find the file 
specified

 - SPARK-9757 Persist Parquet relation with decimal column *** FAILED *** (16 
milliseconds)
   java.io.IOException: Cannot run program "./bin/spark-submit" (in directory 
"C:\projects\spark"): CreateProcess error=2, The system cannot find the file 
specified

 - SPARK-11009 fix wrong result of Window function in cluster mode *** FAILED 
*** (16 milliseconds)
   java.io.IOException: Cannot run program "./bin/spark-submit" (in directory 
"C:\projects\spark"): CreateProcess error=2, The system cannot find the file 
specified

 - SPARK-14244 fix window partition size attribute binding failure *** FAILED 
*** (78 milliseconds)
   java.io.IOException: Cannot run program "./bin/spark-submit" (in directory 
"C:\projects\spark"): CreateProcess error=2, The system cannot find the file 
specified

 - set spark.sql.warehouse.dir *** FAILED *** (16 milliseconds)
   java.io.IOException: Cannot run program "./bin/spark-submit" (in directory 
"C:\projects\spark"): CreateProcess error=2, The system cannot find the file 
specified

 - set hive.metastore.warehouse.dir *** FAILED *** (15 milliseconds)
   java.io.IOException: Cannot run program "./bin/spark-submit" (in directory 
"C:\projects\spark"): CreateProcess error=2, The system cannot find the file 
specified

 - SPARK-16901: set javax.jdo.option.ConnectionURL *** FAILED *** (16 
milliseconds)
   java.io.IOException: Cannot run program "./bin/spark-submit" (in directory 
"C:\projects\spark"): CreateProcess error=2, The system cannot find the file 
specified

 - SPARK-18360: default table path of tables in default database should depend 
on the location of default database *** FAILED *** (15 milliseconds)
   java.io.IOException: Cannot run program "./bin/spark-submit" (in directory 
"C:\projects\spark"): CreateProcess error=2, The system cannot find the file 
specified
```
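
`CreateProcess error=2` means Windows could not find an executable at `./bin/spark-submit`, which is a shell script. Spark also ships a `bin\spark-submit.cmd` launcher for Windows. A minimal sketch of choosing the launcher by platform follows. The helper names and the relative paths are assumptions made for illustration, not the code used by the suite.

```scala
import java.util.Locale

object SubmitScriptSketch {
  // Assumption: the platform is recognised from an "os.name"-style string.
  def isWindows(osName: String): Boolean =
    osName.toLowerCase(Locale.ROOT).startsWith("windows")

  // Hypothetical helper: relative launcher path for the given OS name.
  def sparkSubmitScript(osName: String): String =
    if (isWindows(osName)) "bin\\spark-submit.cmd" else "./bin/spark-submit"

  def main(args: Array[String]): Unit =
    println(sparkSubmitScript(System.getProperty("os.name", "")))
}
```

Taking the OS name as a parameter keeps the choice testable on any platform.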

```
UtilsSuite:
 - resolveURIs with multiple paths *** FAILED *** (0 milliseconds)
   ".../jar3,file:/C:/pi.py[%23]py.pi,file:/C:/path%..." did not equal 
".../jar3,file:/C:/pi.py[#]py.pi,file:/C:/path%..." (UtilsSuite.scala:468)
```
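
The `%23` in the actual value is the percent-encoded form of `#`. When a local path is converted with `java.io.File.toURI`, a `#` in the file name is encoded as part of the path rather than being treated as a fragment separator. A minimal sketch of that behaviour follows; `FileUriSketch` and `fileUri` are illustrative names, and that this conversion is what produces the value in the failing test is an assumption.

```scala
import java.io.File
import java.net.URI

object FileUriSketch {
  // Convert a local file name to a file: URI via java.io.File.
  def fileUri(name: String): URI = new File(name).toURI

  def main(args: Array[String]): Unit = {
    val uri = fileUri("pi.py#py.pi")
    println(uri.getRawPath)   // encoded path; the '#' appears as %23
    println(uri.getPath)      // decoded path; the '#' is restored
    println(uri.getFragment)  // no fragment is parsed out of the name
  }
}
```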

```
CheckpointSuite:
 - recovery with file input stream *** FAILED *** (10 seconds, 205 milliseconds)
   The code passed to eventually never returned normally. Attempted 660 times 
over 10.014272499999999 seconds. Last failure message: Unexpected internal 
error near index 1
   \
    ^. (CheckpointSuite.scala:680)
```
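
The `Unexpected internal error near index 1` message with a lone `\` looks like a `java.util.regex.PatternSyntaxException`, which `String.split` raises when given a single backslash as its pattern. This would happen if the Windows file separator were passed to `split` unquoted; that diagnosis is an inference from the message, not something stated above. A minimal sketch of the failure and of quoting the separator follows, with illustrative names throughout.

```scala
import java.util.regex.Pattern
import scala.util.Try

object SeparatorSplitSketch {
  // The Windows file separator, hard-coded so the sketch behaves the same on any OS.
  val windowsSeparator = "\\"

  // Passing the bare separator to split: a lone backslash is an invalid regex,
  // so this fails with a PatternSyntaxException (captured here in a Try).
  def lastSegmentUnsafe(path: String): Try[String] =
    Try(path.split(windowsSeparator).last)

  // Quoting the separator makes split treat it as a literal string.
  def lastSegment(path: String): String =
    path.split(Pattern.quote(windowsSeparator)).last

  def main(args: Array[String]): Unit = {
    val path = "dir\\sub\\42"
    println(lastSegmentUnsafe(path).isFailure)
    println(lastSegment(path))
  }
}
```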

## How was this patch tested?

Tested manually via AppVeyor, with the results below:

**Scala - aborted tests**

```
WindowQuerySuite - all passed
OrcSourceSuite:
- SPARK-18220: read Hive orc table with varchar column *** FAILED *** (4 
seconds, 417 milliseconds)
  org.apache.spark.sql.execution.QueryExecutionException: FAILED: Execution 
Error, return code -101 from org.apache.hadoop.hive.ql.exec.mr.MapRedTask. 
org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z
  at 
org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$runHive$1.apply(HiveClientImpl.scala:625)
ParquetMetastoreSuite - all passed
ParquetSourceSuite - all passed
KafkaRDDSuite - all passed
DirectKafkaStreamSuite - all passed
ReliableKafkaStreamSuite - all passed
KafkaStreamSuite - all passed
KafkaClusterSuite - all passed
DirectKafkaStreamSuite - all passed
KafkaRDDSuite - all passed
```

**Java - failed tests**

```
org.apache.spark.streaming.kafka.JavaKafkaRDDSuite - all passed
org.apache.spark.streaming.kafka.JavaDirectKafkaStreamSuite - all passed
org.apache.spark.streaming.kafka.JavaKafkaStreamSuite - all passed
org.apache.spark.streaming.kafka010.JavaDirectKafkaStreamSuite - all passed
org.apache.spark.streaming.kafka010.JavaKafkaRDDSuite - all passed
```

**Scala - failed tests**

```
PartitionProviderCompatibilitySuite:
- insert overwrite partition of new datasource table overwrites just partition 
(1 second, 953 milliseconds)
- SPARK-18635 special chars in partition values - partition management true (6 
seconds, 31 milliseconds)
- SPARK-18635 special chars in partition values - partition management false (4 
seconds, 578 milliseconds)
```

```
UtilsSuite:
- reading offset bytes of a file (compressed) (203 milliseconds)
- reading offset bytes across multiple files (compressed) (0 milliseconds)
```

```
StatisticsSuite:
- MetastoreRelations fallback to HDFS for size estimation (94 milliseconds)
```

```
SQLQuerySuite:
 - permanent UDTF (407 milliseconds)
 - describe functions - user defined functions (441 milliseconds)
 - CTAS without serde with location (2 seconds, 831 milliseconds)
 - derived from Hive query file: drop_database_removes_partition_dirs.q (734 
milliseconds)
 - derived from Hive query file: drop_table_removes_partition_dirs.q (563 
milliseconds)
 - SPARK-17796 Support wildcard character in filename for LOAD DATA LOCAL 
INPATH (453 milliseconds)
```

```
HiveDDLSuite:
 - drop external tables in default database (3 seconds, 5 milliseconds)
 - add/drop partitions - external table (2 seconds, 750 milliseconds)
 - create/drop database - location without pre-created directory (500 
milliseconds)
 - create/drop database - location with pre-created directory (407 milliseconds)
 - drop database containing tables - CASCADE (453 milliseconds)
 - drop an empty database - CASCADE (375 milliseconds)
 - drop database containing tables - RESTRICT (328 milliseconds)
 - drop an empty database - RESTRICT (391 milliseconds)
 - CREATE TABLE LIKE an external data source table (953 milliseconds)
 - CREATE TABLE LIKE an external Hive serde table (3 seconds, 782 milliseconds)
 - desc table for data source table - no user-defined schema (1 second, 150 
milliseconds)
```

```
MetastoreDataSourcesSuite:
 - CTAS: persisted bucketed data source table (875 milliseconds)
```

```
ShowCreateTableSuite:
 - simple external hive table (78 milliseconds)
```

```
PartitionedTablePerfStatsSuite:
 - hive table: partitioned pruned table reports only selected files (1 second, 
109 milliseconds)
- datasource table: partitioned pruned table reports only selected files (860 
milliseconds)
 - hive table: lazy partition pruning reads only necessary partition data (859 
milliseconds)
 - datasource table: lazy partition pruning reads only necessary partition data 
(1 second, 219 milliseconds)
 - hive table: lazy partition pruning with file status caching enabled (875 
milliseconds)
 - datasource table: lazy partition pruning with file status caching enabled 
(890 milliseconds)
 - hive table: file status caching respects refresh table and refreshByPath 
(922 milliseconds)
 - datasource table: file status caching respects refresh table and 
refreshByPath (640 milliseconds)
 - hive table: file status cache respects size limit (469 milliseconds)
 - datasource table: file status cache respects size limit (453 milliseconds)
 - datasource table: table setup does not scan filesystem (328 milliseconds)
 - hive table: table setup does not scan filesystem (313 milliseconds)
 - hive table: num hive client calls does not scale with partition count (5 
seconds, 431 milliseconds)
 - datasource table: num hive client calls does not scale with partition count 
(4 seconds, 79 milliseconds)
 - hive table: files read and cached when filesource partition management is 
off (656 milliseconds)
 - datasource table: all partition data cached in memory when partition 
management is off (484 milliseconds)
 - SPARK-18700: table loaded only once even when resolved concurrently (2 
seconds, 578 milliseconds)
```

```
HiveSparkSubmitSuite:
 - temporary Hive UDF: define a UDF and use it (1 second, 745 milliseconds)
 - permanent Hive UDF: define a UDF and use it (406 milliseconds)
 - permanent Hive UDF: use a already defined permanent function (375 
milliseconds)
 - SPARK-8368: includes jars passed in through --jars (391 milliseconds)
 - SPARK-8020: set sql conf in spark conf (156 milliseconds)
 - SPARK-8489: MissingRequirementError during reflection (187 milliseconds)
 - SPARK-9757 Persist Parquet relation with decimal column (157 milliseconds)
 - SPARK-11009 fix wrong result of Window function in cluster mode (156 
milliseconds)
 - SPARK-14244 fix window partition size attribute binding failure (156 
milliseconds)
 - set spark.sql.warehouse.dir (172 milliseconds)
 - set hive.metastore.warehouse.dir (156 milliseconds)
 - SPARK-16901: set javax.jdo.option.ConnectionURL (157 milliseconds)
 - SPARK-18360: default table path of tables in default database should depend 
on the location of default database (172 milliseconds)
```

```
UtilsSuite:
 - resolveURIs with multiple paths (0 milliseconds)
```

```
CheckpointSuite:
 - recovery with file input stream (4 seconds, 452 milliseconds)
```

Note: once the aborted tests were resolved, one remaining test failure was 
identified, as shown below:

```
OrcSourceSuite:
- SPARK-18220: read Hive orc table with varchar column *** FAILED *** (4 
seconds, 417 milliseconds)
  org.apache.spark.sql.execution.QueryExecutionException: FAILED: Execution 
Error, return code -101 from org.apache.hadoop.hive.ql.exec.mr.MapRedTask. 
org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z
  at 
org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$runHive$1.apply(HiveClientImpl.scala:625)
```

This failure does not appear to be caused by the path or resource-not-closed 
problems addressed here, so this PR does not fix it.

Author: hyukjinkwon <gurwls...@gmail.com>

Closes #16451 from HyukjinKwon/all-path-resource-fixes.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4e27578f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4e27578f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4e27578f

Branch: refs/heads/master
Commit: 4e27578faa67c7a71a9b938aafbaf79bdbf36831
Parents: 32286ba
Author: hyukjinkwon <gurwls...@gmail.com>
Authored: Tue Jan 10 13:19:21 2017 +0000
Committer: Sean Owen <so...@cloudera.com>
Committed: Tue Jan 10 13:19:21 2017 +0000

----------------------------------------------------------------------
 .../scala/org/apache/spark/util/Utils.scala     |  7 ++++-
 .../org/apache/spark/util/UtilsSuite.scala      |  2 +-
 .../spark/sql/kafka010/KafkaTestUtils.scala     | 32 +++++++++++++++++---
 .../streaming/kafka010/KafkaTestUtils.scala     | 32 +++++++++++++++++---
 .../kafka010/DirectKafkaStreamSuite.scala       | 11 ++-----
 .../spark/streaming/kafka/KafkaTestUtils.scala  | 32 +++++++++++++++++---
 .../kafka/DirectKafkaStreamSuite.scala          | 11 ++-----
 .../kafka/ReliableKafkaStreamSuite.scala        |  2 +-
 .../spark/sql/execution/command/tables.scala    | 22 ++++++++++----
 .../apache/spark/sql/hive/test/TestHive.scala   |  2 +-
 .../spark/sql/hive/HiveSparkSubmitSuite.scala   |  8 ++++-
 .../sql/hive/MetastoreDataSourcesSuite.scala    |  8 ++---
 .../PartitionProviderCompatibilitySuite.scala   |  2 +-
 .../hive/PartitionedTablePerfStatsSuite.scala   |  4 +--
 .../spark/sql/hive/ShowCreateTableSuite.scala   |  2 +-
 .../apache/spark/sql/hive/StatisticsSuite.scala |  2 +-
 .../spark/sql/hive/execution/HiveDDLSuite.scala | 26 ++++++----------
 .../sql/hive/execution/SQLQuerySuite.scala      | 16 +++++-----
 .../sql/hive/execution/WindowQuerySuite.scala   |  2 +-
 .../spark/sql/hive/orc/OrcSourceSuite.scala     |  6 ++--
 .../apache/spark/sql/hive/parquetSuites.scala   | 28 ++++++++---------
 .../receiver/ReceiverSupervisorImpl.scala       |  6 ++++
 .../spark/streaming/CheckpointSuite.scala       | 14 +++++++--
 23 files changed, 181 insertions(+), 96 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/4e27578f/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 0dcf030..2c1d331 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1488,10 +1488,11 @@ private[spark] object Utils extends Logging {
 
   /** Return uncompressed file length of a compressed file. */
   private def getCompressedFileLength(file: File): Long = {
+    var gzInputStream: GZIPInputStream = null
     try {
       // Uncompress .gz file to determine file size.
       var fileSize = 0L
-      val gzInputStream = new GZIPInputStream(new FileInputStream(file))
+      gzInputStream = new GZIPInputStream(new FileInputStream(file))
       val bufSize = 1024
       val buf = new Array[Byte](bufSize)
       var numBytes = ByteStreams.read(gzInputStream, buf, 0, bufSize)
@@ -1504,6 +1505,10 @@ private[spark] object Utils extends Logging {
       case e: Throwable =>
         logError(s"Cannot get file length of ${file}", e)
         throw e
+    } finally {
+      if (gzInputStream != null) {
+        gzInputStream.close()
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/4e27578f/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index 442a603..6027310 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -505,7 +505,7 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
       s"hdfs:/jar1,file:/jar2,file:$cwd/jar3,file:$cwd/jar4#jar5,file:$cwd/path%20to/jar6")
     if (Utils.isWindows) {
       assertResolves("""hdfs:/jar1,file:/jar2,jar3,C:\pi.py#py.pi,C:\path to\jar4""",
-        s"hdfs:/jar1,file:/jar2,file:$cwd/jar3,file:/C:/pi.py#py.pi,file:/C:/path%20to/jar4")
+        s"hdfs:/jar1,file:/jar2,file:$cwd/jar3,file:/C:/pi.py%23py.pi,file:/C:/path%20to/jar4")
     }
   }
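
The updated expectation (`%23` in place of `#`) matches how `java.io.File.toURI` percent-encodes characters that are not legal in a URI path. The sketch below only illustrates that JDK behaviour; the file names are hypothetical and it does not reproduce Spark's `Utils.resolveURIs`.

```scala
import java.io.File

// Hypothetical file names, chosen only to mirror the characters in the test above.
// File.toURI escapes '#' and ' ' because neither is legal in a URI path.
val withHash = new File("/tmp/pi.py#py.pi").toURI.toString
val withSpace = new File("/tmp/path to/jar4").toURI.toString
```
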
 

http://git-wip-us.apache.org/repos/asf/spark/blob/4e27578f/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
index fd1689a..7e60410 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.kafka010
 
-import java.io.File
+import java.io.{File, IOException}
 import java.lang.{Integer => JInt}
 import java.net.InetSocketAddress
 import java.util.{Map => JMap, Properties}
@@ -138,10 +138,21 @@ class KafkaTestUtils extends Logging {
 
     if (server != null) {
       server.shutdown()
+      server.awaitShutdown()
       server = null
     }
 
-    brokerConf.logDirs.foreach { f => Utils.deleteRecursively(new File(f)) }
+    // On Windows, `logDirs` is left open even after Kafka server above is completely shut down
+    // in some cases. It leads to test failures on Windows if the directory deletion failure
+    // throws an exception.
+    brokerConf.logDirs.foreach { f =>
+      try {
+        Utils.deleteRecursively(new File(f))
+      } catch {
+        case e: IOException if Utils.isWindows =>
+          logWarning(e.getMessage)
+      }
+    }
 
     if (zkUtils != null) {
       zkUtils.close()
@@ -374,8 +385,21 @@ class KafkaTestUtils extends Logging {
 
     def shutdown() {
       factory.shutdown()
-      Utils.deleteRecursively(snapshotDir)
-      Utils.deleteRecursively(logDir)
+      // The directories are not closed even if the ZooKeeper server is shut down.
+      // Please see ZOOKEEPER-1844, which is fixed in 3.4.6+. It leads to test failures
+      // on Windows if the directory deletion failure throws an exception.
+      try {
+        Utils.deleteRecursively(snapshotDir)
+      } catch {
+        case e: IOException if Utils.isWindows =>
+          logWarning(e.getMessage)
+      }
+      try {
+        Utils.deleteRecursively(logDir)
+      } catch {
+        case e: IOException if Utils.isWindows =>
+          logWarning(e.getMessage)
+      }
     }
   }
 }
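
The same "warn instead of fail on Windows" guard is repeated for each directory in this file. A minimal sketch of how it could be factored into one helper is shown below. Everything in it is an assumption for illustration: `deleteTolerantly` is a hypothetical name, `deleteRecursively` is a simplified stand-in for Spark's `Utils.deleteRecursively`, and the OS check reads `os.name` rather than calling `Utils.isWindows`.

```scala
import java.io.{File, IOException}

// Assumption: a simple OS check used in place of Spark's `Utils.isWindows`.
val runningOnWindows: Boolean =
  System.getProperty("os.name").toLowerCase.contains("windows")

// Simplified stand-in for `Utils.deleteRecursively`: throws IOException on failure.
def deleteRecursively(f: File): Unit = {
  if (f.isDirectory) {
    val children = f.listFiles()
    if (children != null) children.foreach(deleteRecursively)
  }
  if (f.exists() && !f.delete()) {
    throw new IOException(s"Failed to delete: ${f.getAbsolutePath}")
  }
}

// Hypothetical helper: returns true if the path was deleted, false if a failure
// was tolerated on Windows. On other platforms the IOException still propagates.
def deleteTolerantly(f: File, warn: String => Unit): Boolean = {
  try {
    deleteRecursively(f)
    true
  } catch {
    case e: IOException if runningOnWindows =>
      warn(e.getMessage)
      false
  }
}

// Usage: remove a temporary directory that contains one file.
val logDirectory = java.nio.file.Files.createTempDirectory("logs").toFile
val created = new File(logDirectory, "segment.log").createNewFile()
val removed = deleteTolerantly(logDirectory, msg => println(s"WARN: $msg"))
```

The guard `if runningOnWindows` keeps the stricter behaviour elsewhere, so a deletion failure on Linux or macOS would still fail the test.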

http://git-wip-us.apache.org/repos/asf/spark/blob/4e27578f/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
index e73823e..8273c2b 100644
--- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.streaming.kafka010
 
-import java.io.File
+import java.io.{File, IOException}
 import java.lang.{Integer => JInt}
 import java.net.InetSocketAddress
 import java.util.{Map => JMap, Properties}
@@ -134,10 +134,21 @@ private[kafka010] class KafkaTestUtils extends Logging {
 
     if (server != null) {
       server.shutdown()
+      server.awaitShutdown()
       server = null
     }
 
-    brokerConf.logDirs.foreach { f => Utils.deleteRecursively(new File(f)) }
+    // On Windows, `logDirs` is left open even after Kafka server above is completely shut down
+    // in some cases. It leads to test failures on Windows if the directory deletion failure
+    // throws an exception.
+    brokerConf.logDirs.foreach { f =>
+      try {
+        Utils.deleteRecursively(new File(f))
+      } catch {
+        case e: IOException if Utils.isWindows =>
+          logWarning(e.getMessage)
+      }
+    }
 
     if (zkUtils != null) {
       zkUtils.close()
@@ -273,8 +284,21 @@ private[kafka010] class KafkaTestUtils extends Logging {
 
     def shutdown() {
       factory.shutdown()
-      Utils.deleteRecursively(snapshotDir)
-      Utils.deleteRecursively(logDir)
+      // The directories are not closed even if the ZooKeeper server is shut down.
+      // Please see ZOOKEEPER-1844, which is fixed in 3.4.6+. It leads to test failures
+      // on Windows if the directory deletion failure throws an exception.
+      try {
+        Utils.deleteRecursively(snapshotDir)
+      } catch {
+        case e: IOException if Utils.isWindows =>
+          logWarning(e.getMessage)
+      }
+      try {
+        Utils.deleteRecursively(logDir)
+      } catch {
+        case e: IOException if Utils.isWindows =>
+          logWarning(e.getMessage)
+      }
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/4e27578f/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
index fde3714..88a312a 100644
--- a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
+++ b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
@@ -53,7 +53,6 @@ class DirectKafkaStreamSuite
     .setMaster("local[4]")
     .setAppName(this.getClass.getSimpleName)
 
-  private var sc: SparkContext = _
   private var ssc: StreamingContext = _
   private var testDir: File = _
 
@@ -73,11 +72,7 @@ class DirectKafkaStreamSuite
 
   after {
     if (ssc != null) {
-      ssc.stop()
-      sc = null
-    }
-    if (sc != null) {
-      sc.stop()
+      ssc.stop(stopSparkContext = true)
     }
     if (testDir != null) {
       Utils.deleteRecursively(testDir)
@@ -372,7 +367,7 @@ class DirectKafkaStreamSuite
       sendData(i)
     }
 
-    eventually(timeout(10 seconds), interval(50 milliseconds)) {
+    eventually(timeout(20 seconds), interval(50 milliseconds)) {
       assert(DirectKafkaStreamSuite.total.get === (1 to 10).sum)
     }
 
@@ -411,7 +406,7 @@ class DirectKafkaStreamSuite
       sendData(i)
     }
 
-    eventually(timeout(10 seconds), interval(50 milliseconds)) {
+    eventually(timeout(20 seconds), interval(50 milliseconds)) {
       assert(DirectKafkaStreamSuite.total.get === (1 to 20).sum)
     }
     ssc.stop()

http://git-wip-us.apache.org/repos/asf/spark/blob/4e27578f/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala
index 03c9ca7..ef19685 100644
--- a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala
+++ b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.streaming.kafka
 
-import java.io.File
+import java.io.{File, IOException}
 import java.lang.{Integer => JInt}
 import java.net.InetSocketAddress
 import java.util.{Map => JMap, Properties}
@@ -137,10 +137,21 @@ private[kafka] class KafkaTestUtils extends Logging {
 
     if (server != null) {
       server.shutdown()
+      server.awaitShutdown()
       server = null
     }
 
-    brokerConf.logDirs.foreach { f => Utils.deleteRecursively(new File(f)) }
+    // On Windows, `logDirs` is left open even after Kafka server above is completely shut down
+    // in some cases. It leads to test failures on Windows if the directory deletion failure
+    // throws an exception.
+    brokerConf.logDirs.foreach { f =>
+      try {
+        Utils.deleteRecursively(new File(f))
+      } catch {
+        case e: IOException if Utils.isWindows =>
+          logWarning(e.getMessage)
+      }
+    }
 
     if (zkClient != null) {
       zkClient.close()
@@ -268,8 +279,21 @@ private[kafka] class KafkaTestUtils extends Logging {
 
     def shutdown() {
       factory.shutdown()
-      Utils.deleteRecursively(snapshotDir)
-      Utils.deleteRecursively(logDir)
+      // The directories are not closed even if the ZooKeeper server is shut down.
+      // Please see ZOOKEEPER-1844, which is fixed in 3.4.6+. It leads to test failures
+      // on Windows if the directory deletion failure throws an exception.
+      try {
+        Utils.deleteRecursively(snapshotDir)
+      } catch {
+        case e: IOException if Utils.isWindows =>
+          logWarning(e.getMessage)
+      }
+      try {
+        Utils.deleteRecursively(logDir)
+      } catch {
+        case e: IOException if Utils.isWindows =>
+          logWarning(e.getMessage)
+      }
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/4e27578f/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala b/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
index 8a747a5..f8b3407 100644
--- a/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
+++ b/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
@@ -52,7 +52,6 @@ class DirectKafkaStreamSuite
     .setMaster("local[4]")
     .setAppName(this.getClass.getSimpleName)
 
-  private var sc: SparkContext = _
   private var ssc: StreamingContext = _
   private var testDir: File = _
 
@@ -72,11 +71,7 @@ class DirectKafkaStreamSuite
 
   after {
     if (ssc != null) {
-      ssc.stop()
-      sc = null
-    }
-    if (sc != null) {
-      sc.stop()
+      ssc.stop(stopSparkContext = true)
     }
     if (testDir != null) {
       Utils.deleteRecursively(testDir)
@@ -276,7 +271,7 @@ class DirectKafkaStreamSuite
       sendData(i)
     }
 
-    eventually(timeout(10 seconds), interval(50 milliseconds)) {
+    eventually(timeout(20 seconds), interval(50 milliseconds)) {
       assert(DirectKafkaStreamSuite.total.get === (1 to 10).sum)
     }
 
@@ -319,7 +314,7 @@ class DirectKafkaStreamSuite
       sendData(i)
     }
 
-    eventually(timeout(10 seconds), interval(50 milliseconds)) {
+    eventually(timeout(20 seconds), interval(50 milliseconds)) {
       assert(DirectKafkaStreamSuite.total.get === (1 to 20).sum)
     }
     ssc.stop()

http://git-wip-us.apache.org/repos/asf/spark/blob/4e27578f/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala b/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala
index 7b9aee3..57f89cc 100644
--- a/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala
+++ b/external/kafka-0-8/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala
@@ -80,7 +80,7 @@ class ReliableKafkaStreamSuite extends SparkFunSuite
 
   after {
     if (ssc != null) {
-      ssc.stop()
+      ssc.stop(stopSparkContext = true)
       ssc = null
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/4e27578f/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 012b6ea..ac6c3a8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -26,6 +26,7 @@ import scala.collection.mutable.ArrayBuffer
 import scala.util.control.NonFatal
 import scala.util.Try
 
+import org.apache.commons.lang3.StringEscapeUtils
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
@@ -222,25 +223,34 @@ case class LoadDataCommand(
     val loadPath =
       if (isLocal) {
         val uri = Utils.resolveURI(path)
-        val filePath = uri.getPath()
-        val exists = if (filePath.contains("*")) {
+        val file = new File(uri.getPath)
+        val exists = if (file.getAbsolutePath.contains("*")) {
           val fileSystem = FileSystems.getDefault
-          val pathPattern = fileSystem.getPath(filePath)
-          val dir = pathPattern.getParent.toString
+          val dir = file.getParentFile.getAbsolutePath
           if (dir.contains("*")) {
             throw new AnalysisException(
               s"LOAD DATA input path allows only filename wildcard: $path")
           }
 
+          // Note that special characters such as "*" on Windows are not allowed as a path.
+          // Calling `WindowsFileSystem.getPath` throws an exception if any are in the path.
+          val dirPath = fileSystem.getPath(dir)
+          val pathPattern = new File(dirPath.toAbsolutePath.toString, file.getName).toURI.getPath
+          val safePathPattern = if (Utils.isWindows) {
+            // On Windows, the pattern should not start with slashes for absolute file paths.
+            pathPattern.stripPrefix("/")
+          } else {
+            pathPattern
+          }
           val files = new File(dir).listFiles()
           if (files == null) {
             false
           } else {
-            val matcher = fileSystem.getPathMatcher("glob:" + pathPattern.toAbsolutePath)
+            val matcher = fileSystem.getPathMatcher("glob:" + safePathPattern)
             files.exists(f => matcher.matches(fileSystem.getPath(f.getAbsolutePath)))
           }
         } else {
-          new File(filePath).exists()
+          new File(file.getAbsolutePath).exists()
         }
         if (!exists) {
           throw new AnalysisException(s"LOAD DATA input path does not exist: $path")
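
The hunk above builds the glob from the directory path and the file name separately, so that the wildcard never passes through `FileSystem.getPath`. A minimal standalone sketch of that matching step follows. `globMatches` is a hypothetical helper, the OS check via `os.name` is an assumption standing in for `Utils.isWindows`, and the usage has only been reasoned through for a Unix-like file system.

```scala
import java.io.File
import java.nio.file.{FileSystems, Files}

// Hypothetical helper: returns the sorted names of files in `dir` whose absolute
// path matches the glob formed from the directory and `namePattern`.
def globMatches(dir: File, namePattern: String): Seq[String] = {
  val fs = FileSystems.getDefault
  // Build the pattern through File/URI so that "*" never reaches fs.getPath.
  val pattern = new File(dir.getAbsolutePath, namePattern).toURI.getPath
  // Assumption: os.name check in place of Spark's `Utils.isWindows`.
  val onWindows = System.getProperty("os.name").toLowerCase.contains("windows")
  val safePattern = if (onWindows) pattern.stripPrefix("/") else pattern
  val matcher = fs.getPathMatcher("glob:" + safePattern)
  val files = Option(dir.listFiles()).getOrElse(Array.empty[File])
  files.filter(f => matcher.matches(fs.getPath(f.getAbsolutePath)))
    .map(_.getName).sorted.toSeq
}

// Usage: two "part-r" files and one "part-s" file in a temporary directory.
val globDir = Files.createTempDirectory("load").toFile
val names = Seq("part-r-00001", "part-r-00002", "part-s-00005")
val createdAll = names.forall(n => new File(globDir, n).createNewFile())
val matched = globMatches(globDir, "*part-r*")
```
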

http://git-wip-us.apache.org/repos/asf/spark/blob/4e27578f/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index a8dd510..dcb8e49 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -165,7 +165,7 @@ private[hive] class TestHiveSparkSession(
   System.clearProperty("spark.hostPort")
 
   // For some hive test case which contain ${system:test.tmp.dir}
-  System.setProperty("test.tmp.dir", Utils.createTempDir().getCanonicalPath)
+  System.setProperty("test.tmp.dir", Utils.createTempDir().toURI.getPath)
 
   /** The location of the compiled hive distribution */
   lazy val hiveHome = envVarToFile("HIVE_HOME")
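
Most of the test changes in this commit replace `getCanonicalPath` or `getAbsolutePath` with `toURI`. On Windows the former contain backslashes, which the failure messages quoted in the commit description suggest are consumed as escape characters inside SQL string literals. A small sketch of the forms `java.io.File.toURI` yields is given below; the temporary directory name is arbitrary.

```scala
import java.io.File
import java.nio.file.Files

// An existing directory, so that toURI appends a trailing slash.
val tmpDir: File = Files.createTempDirectory("spark-test").toFile

// Full URI with scheme, for example file:/.../spark-test123/
val uriString = tmpDir.toURI.toString
// Path component only; it uses forward slashes on every platform.
val uriPath = tmpDir.toURI.getPath
// Several tests in this commit strip the trailing slash before comparing locations.
val trimmed = uriString.stripSuffix("/")
```
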

http://git-wip-us.apache.org/repos/asf/spark/blob/4e27578f/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
index 9aa9ebf..8f0d5d8 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
@@ -339,7 +339,13 @@ class HiveSparkSubmitSuite
   private def runSparkSubmit(args: Seq[String]): Unit = {
     val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!"))
     val history = ArrayBuffer.empty[String]
-    val commands = Seq("./bin/spark-submit") ++ args
+    val sparkSubmit = if (Utils.isWindows) {
+      // On Windows, `ProcessBuilder.directory` does not change the current working directory.
+      new File("..\\..\\bin\\spark-submit.cmd").getAbsolutePath
+    } else {
+      "./bin/spark-submit"
+    }
+    val commands = Seq(sparkSubmit) ++ args
     val commandLine = commands.mkString("'", "' '", "'")
 
     val builder = new ProcessBuilder(commands: _*).directory(new File(sparkHome))
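
Per the comment in the hunk above, a relative launcher path cannot rely on `ProcessBuilder.directory` on Windows, so the test resolves an absolute path to `spark-submit.cmd` up front. The sketch below isolates that choice. `sparkSubmitCommand` is a hypothetical helper, and the `..\..\bin` prefix is copied from the diff; it presumably assumes the suite runs from the `sql/hive` module directory.

```scala
import java.io.File

// Hypothetical helper isolating the launcher choice made in the test.
def sparkSubmitCommand(isWindows: Boolean): String = {
  if (isWindows) {
    // Resolved against the JVM's current working directory, not the process directory.
    new File("..\\..\\bin\\spark-submit.cmd").getAbsolutePath
  } else {
    "./bin/spark-submit"
  }
}

val unixCommand = sparkSubmitCommand(isWindows = false)
val windowsCommand = sparkSubmitCommand(isWindows = true)
```
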

http://git-wip-us.apache.org/repos/asf/spark/blob/4e27578f/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 13ef79e..081f6f6 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -1071,11 +1071,9 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
   test("CTAS: persisted bucketed data source table") {
     withTempPath { dir =>
       withTable("t") {
-        val path = dir.getCanonicalPath
-
         sql(
           s"""CREATE TABLE t USING PARQUET
-             |OPTIONS (PATH '$path')
+             |OPTIONS (PATH '${dir.toURI}')
              |CLUSTERED BY (a) SORTED BY (b) INTO 2 BUCKETS
              |AS SELECT 1 AS a, 2 AS b
            """.stripMargin
@@ -1093,11 +1091,9 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
 
     withTempPath { dir =>
       withTable("t") {
-        val path = dir.getCanonicalPath
-
         sql(
           s"""CREATE TABLE t USING PARQUET
-             |OPTIONS (PATH '$path')
+             |OPTIONS (PATH '${dir.toURI}')
              |CLUSTERED BY (a) INTO 2 BUCKETS
              |AS SELECT 1 AS a, 2 AS b
            """.stripMargin

http://git-wip-us.apache.org/repos/asf/spark/blob/4e27578f/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala
index 44233cf..dca207a 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala
@@ -172,7 +172,7 @@ class PartitionProviderCompatibilitySuite
           withTempDir { dir2 =>
             sql(
               s"""alter table test partition (partCol=1)
-                |set location '${dir2.getAbsolutePath}'""".stripMargin)
+                |set location '${dir2.toURI}'""".stripMargin)
             assert(sql("select * from test").count() == 4)
             sql(
               """insert overwrite table test

http://git-wip-us.apache.org/repos/asf/spark/blob/4e27578f/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala
index 55b72c6..70750c4 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala
@@ -77,7 +77,7 @@ class PartitionedTablePerfStatsSuite
       |create external table $tableName (fieldOne long)
       |partitioned by (partCol1 int, partCol2 int)
       |stored as parquet
-      |location "${dir.getAbsolutePath}"""".stripMargin)
+      |location "${dir.toURI}"""".stripMargin)
     if (repair) {
       spark.sql(s"msck repair table $tableName")
     }
@@ -102,7 +102,7 @@ class PartitionedTablePerfStatsSuite
     spark.sql(s"""
       |create table $tableName (fieldOne long, partCol1 int, partCol2 int)
       |using parquet
-      |options (path "${dir.getAbsolutePath}")
+      |options (path "${dir.toURI}")
       |partitioned by (partCol1, partCol2)""".stripMargin)
     if (repair) {
       spark.sql(s"msck repair table $tableName")

http://git-wip-us.apache.org/repos/asf/spark/blob/4e27578f/sql/hive/src/test/scala/org/apache/spark/sql/hive/ShowCreateTableSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ShowCreateTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ShowCreateTableSuite.scala
index 68df809..cc26b32 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ShowCreateTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ShowCreateTableSuite.scala
@@ -146,7 +146,7 @@ class ShowCreateTableSuite extends QueryTest with SQLTestUtils with TestHiveSing
              |  c1 INT COMMENT 'bla',
              |  c2 STRING
              |)
-             |LOCATION '$dir'
+             |LOCATION '${dir.toURI}'
              |TBLPROPERTIES (
              |  'prop1' = 'value1',
              |  'prop2' = 'value2'

http://git-wip-us.apache.org/repos/asf/spark/blob/4e27578f/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index 8803ea3..b040f26 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -57,7 +57,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
               \"separatorChar\" = \",\",
               \"quoteChar\"     = \"\\\"\",
               \"escapeChar\"    = \"\\\\\")
-            LOCATION '$tempDir'
+            LOCATION '${tempDir.toURI}'
           """)
 
         spark.conf.set(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key, true)

http://git-wip-us.apache.org/repos/asf/spark/blob/4e27578f/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 7728528..0af331e 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -87,7 +87,7 @@ class HiveDDLSuite
           s"""
              |create table $tabName
              |stored as parquet
-             |location '$tmpDir'
+             |location '${tmpDir.toURI}'
              |as select 1, '3'
           """.stripMargin)
 
@@ -269,7 +269,7 @@ class HiveDDLSuite
           s"""
              |CREATE EXTERNAL TABLE $externalTab (key INT, value STRING)
              |PARTITIONED BY (ds STRING, hr STRING)
-             |LOCATION '$basePath'
+             |LOCATION '${tmpDir.toURI}'
           """.stripMargin)
 
         // Before data insertion, all the directory are empty
@@ -678,14 +678,10 @@ class HiveDDLSuite
       } else {
         assert(!fs.exists(new Path(tmpDir.toString)))
       }
-      sql(s"CREATE DATABASE $dbName Location '$tmpDir'")
+      sql(s"CREATE DATABASE $dbName Location '${tmpDir.toURI.getPath.stripSuffix("/")}'")
       val db1 = catalog.getDatabaseMetadata(dbName)
-      val dbPath = "file:" + tmpDir
-      assert(db1 == CatalogDatabase(
-        dbName,
-        "",
-        if (dbPath.endsWith(File.separator)) dbPath.dropRight(1) else dbPath,
-        Map.empty))
+      val dbPath = tmpDir.toURI.toString.stripSuffix("/")
+      assert(db1 == CatalogDatabase(dbName, "", dbPath, Map.empty))
       sql("USE db1")
 
       sql(s"CREATE TABLE $tabName as SELECT 1")
@@ -713,10 +709,6 @@ class HiveDDLSuite
     }
   }
 
-  private def appendTrailingSlash(path: String): String = {
-    if (!path.endsWith(File.separator)) path + File.separator else path
-  }
-
   private def dropDatabase(cascade: Boolean, tableExists: Boolean): Unit = {
     val dbName = "db1"
     val dbPath = new Path(spark.sessionState.conf.warehousePath)
@@ -724,7 +716,7 @@ class HiveDDLSuite
 
     sql(s"CREATE DATABASE $dbName")
     val catalog = spark.sessionState.catalog
-    val expectedDBLocation = "file:" + appendTrailingSlash(dbPath.toString) + s"$dbName.db"
+    val expectedDBLocation = s"file:${dbPath.toUri.getPath.stripSuffix("/")}/$dbName.db"
     val db1 = catalog.getDatabaseMetadata(dbName)
     assert(db1 == CatalogDatabase(
       dbName,
@@ -857,7 +849,7 @@ class HiveDDLSuite
         val path = dir.getCanonicalPath
         spark.range(10).select('id as 'a, 'id as 'b, 'id as 'c, 'id as 'd)
           .write.format("parquet").save(path)
-        sql(s"CREATE TABLE $sourceTabName USING parquet OPTIONS (PATH '$path')")
+        sql(s"CREATE TABLE $sourceTabName USING parquet OPTIONS (PATH '${dir.toURI}')")
         sql(s"CREATE TABLE $targetTabName LIKE $sourceTabName")
 
         // The source table should be an external data source table
@@ -894,7 +886,7 @@ class HiveDDLSuite
   test("CREATE TABLE LIKE an external Hive serde table") {
     val catalog = spark.sessionState.catalog
     withTempDir { tmpDir =>
-      val basePath = tmpDir.getCanonicalPath
+      val basePath = tmpDir.toURI
       val sourceTabName = "tab1"
       val targetTabName = "tab2"
       withTable(sourceTabName, targetTabName) {
@@ -1112,7 +1104,7 @@ class HiveDDLSuite
     Seq("parquet", "json", "orc").foreach { fileFormat =>
       withTable("t1") {
         withTempPath { dir =>
-          val path = dir.getCanonicalPath
+          val path = dir.toURI.toString
           spark.range(1).write.format(fileFormat).save(path)
           sql(s"CREATE TABLE t1 USING $fileFormat OPTIONS (PATH '$path')")
 

http://git-wip-us.apache.org/repos/asf/spark/blob/4e27578f/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index f65b5f4..f47cf4a 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -126,7 +126,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
         s"""
           |CREATE FUNCTION udtf_count_temp
           |AS 'org.apache.spark.sql.hive.execution.GenericUDTFCount2'
-          |USING JAR '${hiveContext.getHiveFile("TestUDTF.jar").getCanonicalPath()}'
+          |USING JAR '${hiveContext.getHiveFile("TestUDTF.jar").toURI}'
         """.stripMargin)
 
       checkAnswer(
@@ -321,7 +321,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
         s"""
            |CREATE FUNCTION udtf_count
            |AS 'org.apache.spark.sql.hive.execution.GenericUDTFCount2'
-           |USING JAR '${hiveContext.getHiveFile("TestUDTF.jar").getCanonicalPath()}'
+           |USING JAR '${hiveContext.getHiveFile("TestUDTF.jar").toURI}'
         """.stripMargin)
 
       checkKeywordsExist(sql("describe function udtf_count"),
@@ -644,7 +644,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
       withTempDir { dir =>
         val defaultDataSource = sessionState.conf.defaultDataSourceName
 
-        val tempLocation = dir.getCanonicalPath
+        val tempLocation = dir.toURI.getPath.stripSuffix("/")
         sql(s"CREATE TABLE ctas1 LOCATION 'file:$tempLocation/c1'" +
           " AS SELECT key k, value FROM src ORDER BY k, value")
         checkRelation("ctas1", true, defaultDataSource, Some(s"file:$tempLocation/c1"))
@@ -1953,16 +1953,18 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
 
   test("SPARK-17796 Support wildcard character in filename for LOAD DATA LOCAL INPATH") {
     withTempDir { dir =>
+      val path = dir.toURI.toString.stripSuffix("/")
+      val dirPath = dir.getAbsoluteFile
       for (i <- 1 to 3) {
-        Files.write(s"$i", new File(s"$dir/part-r-0000$i"), StandardCharsets.UTF_8)
+        Files.write(s"$i", new File(dirPath, s"part-r-0000$i"), StandardCharsets.UTF_8)
       }
       for (i <- 5 to 7) {
-        Files.write(s"$i", new File(s"$dir/part-s-0000$i"), StandardCharsets.UTF_8)
+        Files.write(s"$i", new File(dirPath, s"part-s-0000$i"), StandardCharsets.UTF_8)
       }
 
       withTable("load_t") {
         sql("CREATE TABLE load_t (a STRING)")
-        sql(s"LOAD DATA LOCAL INPATH '$dir/*part-r*' INTO TABLE load_t")
+        sql(s"LOAD DATA LOCAL INPATH '$path/*part-r*' INTO TABLE load_t")
         checkAnswer(sql("SELECT * FROM load_t"), Seq(Row("1"), Row("2"), Row("3")))
 
         val m = intercept[AnalysisException] {
@@ -1971,7 +1973,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
         assert(m.contains("LOAD DATA input path does not exist"))
 
         val m2 = intercept[AnalysisException] {
-          sql(s"LOAD DATA LOCAL INPATH '$dir*/*part*' INTO TABLE load_t")
+          sql(s"LOAD DATA LOCAL INPATH '$path*/*part*' INTO TABLE load_t")
         }.getMessage
         assert(m2.contains("LOAD DATA input path allows only filename wildcard"))
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/4e27578f/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/WindowQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/WindowQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/WindowQuerySuite.scala
index 0ff3511..a20c758 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/WindowQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/WindowQuerySuite.scala
@@ -43,7 +43,7 @@ class WindowQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleto
         |  p_retailprice DOUBLE,
         |  p_comment STRING)
       """.stripMargin)
-    val testData1 = TestHive.getHiveFile("data/files/part_tiny.txt").getCanonicalPath
+    val testData1 = TestHive.getHiveFile("data/files/part_tiny.txt").toURI
     sql(
       s"""
          |LOAD DATA LOCAL INPATH '$testData1' overwrite into table part

http://git-wip-us.apache.org/repos/asf/spark/blob/4e27578f/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala
index 2b40469..fe1e17d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala
@@ -57,7 +57,7 @@ abstract class OrcSuite extends QueryTest with TestHiveSingleton with BeforeAndA
          |  stringField STRING
          |)
          |STORED AS ORC
-         |LOCATION '${orcTableAsDir.getCanonicalPath}'
+         |LOCATION '${orcTableAsDir.toURI}'
        """.stripMargin)
 
     sql(
@@ -172,7 +172,7 @@ class OrcSourceSuite extends OrcSuite {
       s"""CREATE TEMPORARY VIEW normal_orc_source
          |USING org.apache.spark.sql.hive.orc
          |OPTIONS (
-         |  PATH '${new File(orcTableAsDir.getAbsolutePath).getCanonicalPath}'
+         |  PATH '${new File(orcTableAsDir.getAbsolutePath).toURI}'
          |)
        """.stripMargin)
 
@@ -180,7 +180,7 @@ class OrcSourceSuite extends OrcSuite {
       s"""CREATE TEMPORARY VIEW normal_orc_as_source
          |USING org.apache.spark.sql.hive.orc
          |OPTIONS (
-         |  PATH '${new File(orcTableAsDir.getAbsolutePath).getCanonicalPath}'
+         |  PATH '${new File(orcTableAsDir.getAbsolutePath).toURI}'
          |)
        """.stripMargin)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/4e27578f/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index 2ce60fe..d3e04d4 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -81,7 +81,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
        STORED AS
        INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
        OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
-      location '${partitionedTableDir.getCanonicalPath}'
+      location '${partitionedTableDir.toURI}'
     """)
 
     sql(s"""
@@ -95,7 +95,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
        STORED AS
        INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
        OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
-      location '${partitionedTableDirWithKey.getCanonicalPath}'
+      location '${partitionedTableDirWithKey.toURI}'
     """)
 
     sql(s"""
@@ -108,7 +108,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
        STORED AS
        INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
        OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
-      location '${new File(normalTableDir, "normal").getCanonicalPath}'
+      location '${new File(normalTableDir, "normal").toURI}'
     """)
 
     sql(s"""
@@ -124,7 +124,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
        STORED AS
        INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
        OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
-      LOCATION '${partitionedTableDirWithComplexTypes.getCanonicalPath}'
+      LOCATION '${partitionedTableDirWithComplexTypes.toURI}'
     """)
 
     sql(s"""
@@ -140,7 +140,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
        STORED AS
        INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
        OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
-      LOCATION '${partitionedTableDirWithKeyAndComplexTypes.getCanonicalPath}'
+      LOCATION '${partitionedTableDirWithKeyAndComplexTypes.toURI}'
     """)
 
     sql(
@@ -561,7 +561,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
   test("SPARK-15248: explicitly added partitions should be readable") {
     withTable("test_added_partitions", "test_temp") {
       withTempDir { src =>
-        val partitionDir = new File(src, "partition").getCanonicalPath
+        val partitionDir = new File(src, "partition").toURI
         sql(
           """
             |CREATE TABLE test_added_partitions (a STRING)
@@ -636,7 +636,7 @@ class ParquetSourceSuite extends ParquetPartitioningTest {
       CREATE TEMPORARY VIEW partitioned_parquet
       USING org.apache.spark.sql.parquet
       OPTIONS (
-        path '${partitionedTableDir.getCanonicalPath}'
+        path '${partitionedTableDir.toURI}'
       )
     """)
 
@@ -644,7 +644,7 @@ class ParquetSourceSuite extends ParquetPartitioningTest {
       CREATE TEMPORARY VIEW partitioned_parquet_with_key
       USING org.apache.spark.sql.parquet
       OPTIONS (
-        path '${partitionedTableDirWithKey.getCanonicalPath}'
+        path '${partitionedTableDirWithKey.toURI}'
       )
     """)
 
@@ -652,7 +652,7 @@ class ParquetSourceSuite extends ParquetPartitioningTest {
       CREATE TEMPORARY VIEW normal_parquet
       USING org.apache.spark.sql.parquet
       OPTIONS (
-        path '${new File(partitionedTableDir, "p=1").getCanonicalPath}'
+        path '${new File(partitionedTableDir, "p=1").toURI}'
       )
     """)
 
@@ -660,7 +660,7 @@ class ParquetSourceSuite extends ParquetPartitioningTest {
       CREATE TEMPORARY VIEW partitioned_parquet_with_key_and_complextypes
       USING org.apache.spark.sql.parquet
       OPTIONS (
-        path '${partitionedTableDirWithKeyAndComplexTypes.getCanonicalPath}'
+        path '${partitionedTableDirWithKeyAndComplexTypes.toURI}'
       )
     """)
 
@@ -668,7 +668,7 @@ class ParquetSourceSuite extends ParquetPartitioningTest {
       CREATE TEMPORARY VIEW partitioned_parquet_with_complextypes
       USING org.apache.spark.sql.parquet
       OPTIONS (
-        path '${partitionedTableDirWithComplexTypes.getCanonicalPath}'
+        path '${partitionedTableDirWithComplexTypes.toURI}'
       )
     """)
   }
@@ -701,8 +701,6 @@ class ParquetSourceSuite extends ParquetPartitioningTest {
 
   test("SPARK-8811: compatibility with array of struct in Hive") {
     withTempPath { dir =>
-      val path = dir.getCanonicalPath
-
       withTable("array_of_struct") {
         val conf = Seq(
           HiveUtils.CONVERT_METASTORE_PARQUET.key -> "false",
@@ -712,7 +710,7 @@ class ParquetSourceSuite extends ParquetPartitioningTest {
         withSQLConf(conf: _*) {
           sql(
             s"""CREATE TABLE array_of_struct
-               |STORED AS PARQUET LOCATION '$path'
+               |STORED AS PARQUET LOCATION '${dir.toURI}'
                |AS SELECT
                |  '1st' AS a,
                |  '2nd' AS b,
@@ -720,7 +718,7 @@ class ParquetSourceSuite extends ParquetPartitioningTest {
              """.stripMargin)
 
           checkAnswer(
-            spark.read.parquet(path),
+            spark.read.parquet(dir.getCanonicalPath),
             Row("1st", "2nd", Seq(Row("val_a", "val_b"))))
         }
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/4e27578f/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
index 5ba09a5..eca7c79 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
@@ -175,6 +175,12 @@ private[streaming] class ReceiverSupervisorImpl(
   }
 
   override protected def onStop(message: String, error: Option[Throwable]) {
+    receivedBlockHandler match {
+      case handler: WriteAheadLogBasedBlockHandler =>
+        // Write ahead log should be closed.
+        handler.stop()
+      case _ =>
+    }
     registeredBlockGenerators.asScala.foreach { _.stop() }
     env.rpcEnv.stop(endpoint)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/4e27578f/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index a1e9d1e..7fcf45e 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.streaming
 
-import java.io.{ByteArrayInputStream, ByteArrayOutputStream, File, ObjectOutputStream}
+import java.io._
 import java.nio.charset.StandardCharsets
 import java.util.concurrent.ConcurrentLinkedQueue
 
@@ -629,7 +629,7 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester
         ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
       val filenames = fileInputDStream.batchTimeToSelectedFiles.synchronized
          { fileInputDStream.batchTimeToSelectedFiles.values.flatten }
-      filenames.map(_.split(File.separator).last.toInt).toSeq.sorted
+      filenames.map(_.split("/").last.toInt).toSeq.sorted
     }
 
     try {
@@ -755,7 +755,15 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester
         assert(outputBuffer.asScala.flatten.toSet === expectedOutput.toSet)
       }
     } finally {
-      Utils.deleteRecursively(testDir)
+      try {
+        // As the driver shuts down in the middle of processing and the thread above sleeps
+        // for a while, `testDir` can be not closed correctly at this point which causes the
+        // test failure on Windows.
+        Utils.deleteRecursively(testDir)
+      } catch {
+        case e: IOException if Utils.isWindows =>
+          logWarning(e.getMessage)
+      }
     }
   }
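
Note on the recurring `getCanonicalPath` -> `toURI` change in the hunks above: the garbled paths in the failure logs (for example `C:projectsspark  arget mp...`) are consistent with backslashes in a Windows path being consumed as escape characters once the path is embedded in a SQL string literal. The sketch below only illustrates that effect. `unescape` is a hypothetical, simplified stand-in and is not Spark's actual parser, and both path strings are assumed example values rather than output from a real run.

```scala
object BackslashPathSketch {
  // Hypothetical, simplified unescaper (NOT Spark's parser): a backslash
  // followed by 't' becomes a tab; before any other character the
  // backslash is simply dropped.
  def unescape(s: String): String = {
    val sb = new StringBuilder
    var i = 0
    while (i < s.length) {
      val c = s.charAt(i)
      if (c == '\\' && i + 1 < s.length) {
        val next = s.charAt(i + 1)
        sb.append(if (next == 't') '\t' else next)
        i += 2
      } else {
        sb.append(c)
        i += 1
      }
    }
    sb.toString
  }

  def main(args: Array[String]): Unit = {
    // Assumed shape of a canonical path on Windows.
    val windowsPath = "C:\\projects\\spark\\target\\tmp"
    // Assumed shape of the same location rendered as a URI (forward slashes only).
    val uriForm = "file:/C:/projects/spark/target/tmp"

    println(unescape(windowsPath)) // separators lost, "\t" turned into tabs
    println(unescape(uriForm))     // unchanged: nothing to unescape
  }
}
```

A URI string contains no backslashes, so it passes through this kind of unescaping untouched, which is presumably why the tests switch to `toURI` wherever a path is interpolated into SQL text.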
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org