[GitHub] spark pull request #21554: [SPARK-24546][SQL] InsertIntoDataSourceCommand ma...

2018-06-15 Thread zheh12
Github user zheh12 closed the pull request at:

https://github.com/apache/spark/pull/21554


---




[GitHub] spark issue #21554: [SPARK-24546][SQL] InsertIntoDataSourceCommand make data...

2018-06-15 Thread zheh12
Github user zheh12 commented on the issue:

https://github.com/apache/spark/pull/21554
  
I know about this SQL standard.

But I wonder: if I use `query.schema`, how will it affect the by-position logic?

I think we should let the data source implementation decide whether to resolve columns by position or by name.

For example, the kudu-spark implementation chooses by-name resolution with this mapping:

```
val indices: Array[(Int, Int)] = schema.fields.zipWithIndex.map({ case (field, sparkIdx) =>
  sparkIdx -> table.getSchema.getColumnIndex(field.name)
})
```

But because we now hand it the wrong schema, the mapping is always something like (0,0), (1,1), ..., so it effectively always resolves by position.

I think this code is meant to be by-name, because a Kudu schema must put the primary key columns first, so its column order usually differs from the source table's schema.

If we create the DataFrame with `query.schema`, by-position resolution still works correctly, and we also give the data source the possibility to choose between by-name and by-position.

But as it stands, the data source is forced to be by-position.

Moreover, as a developer, when I choose to implement `InsertableRelation`:
```
trait InsertableRelation {
  def insert(data: DataFrame, overwrite: Boolean): Unit
}
```

I may receive a DataFrame with the wrong schema, and there is nothing in the DataFrame itself that tells me something is wrong.
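To make this concrete, here is a hypothetical sketch (the class name and `targetColumns` parameter are made up for illustration, not the kudu-spark code) of an `InsertableRelation` that tries to resolve columns by name:

```
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.sources.InsertableRelation

// Hypothetical sketch, not the kudu-spark implementation: a relation that
// maps the incoming DataFrame's columns to the target table's columns by name.
class ByNameInsertableRelation(targetColumns: Array[String]) extends InsertableRelation {
  override def insert(data: DataFrame, overwrite: Boolean): Unit = {
    // sparkIdx -> index of the same-named column in the target table.
    // This mapping is only meaningful if data.schema carries the query's
    // field names; if Spark has relabeled the rows with the relation's own
    // schema, every name maps back to its own position and the write
    // silently becomes by-position.
    val indices: Array[(Int, Int)] = data.schema.fields.zipWithIndex.map {
      case (field, sparkIdx) => sparkIdx -> targetColumns.indexOf(field.name)
    }
    // ... reorder each row according to `indices` before handing it to the sink
  }
}
```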

@cloud-fan Is my understanding correct?




---




[GitHub] spark issue #21554: [SPARK-24546][SQL] InsertIntoDataSourceCommand make data...

2018-06-13 Thread zheh12
Github user zheh12 commented on the issue:

https://github.com/apache/spark/pull/21554
  
cc @cloud-fan @jiangxb1987, please give me some advice.


---




[GitHub] spark pull request #21554: [SPARK-24546] InsertIntoDataSourceCommand make da...

2018-06-13 Thread zheh12
GitHub user zheh12 opened a pull request:

https://github.com/apache/spark/pull/21554

[SPARK-24546] InsertIntoDataSourceCommand make data frame with wrong schema 
when use kudu.


## What changes were proposed in this pull request?

I have an HDFS table:
```
hdfs_table(a int, b int, c int)
```
and a Kudu table:
```
kudu_table(b int primary key, a int, c int)
```

I want to insert into `kudu_table`:
```
insert into kudu_table select * from hdfs_table
```

But the data in Kudu ends up with its columns in the wrong order.

I think the reason is this line of code:

```
val df = sparkSession.internalCreateDataFrame(
  data.queryExecution.toRdd, logicalRelation.schema)
```
I think this code does no check and can break the rule that

> the row data must come with the right schema

When the `logicalRelation` has a schema with a different column order, as Kudu does, we should let the Kudu code handle the conversion, as it already does here:

```
val table: KuduTable = syncClient.openTable(tableName)
val indices: Array[(Int, Int)] = schema.fields.zipWithIndex.map({ case (field, sparkIdx) =>
  sparkIdx -> table.getSchema.getColumnIndex(field.name)
})
```

So I suggest creating the DataFrame with the query schema and writing the conversion code outside Spark SQL.
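A minimal sketch of the proposed change, written against the `InsertIntoDataSourceCommand.run` body that contains the snippet quoted above (the surrounding names `sparkSession`, `query`, `logicalRelation`, `relation`, and `overwrite` belong to that method; only the schema passed to `internalCreateDataFrame` changes):

```
val data = Dataset.ofRows(sparkSession, query)
// before: the relation's schema is stamped onto the query's rows
// val df = sparkSession.internalCreateDataFrame(data.queryExecution.toRdd, logicalRelation.schema)
// proposed: keep the query's own schema, so an InsertableRelation such as
// kudu-spark can still resolve columns by name if it chooses to
val df = sparkSession.internalCreateDataFrame(data.queryExecution.toRdd, data.schema)
relation.insert(df, overwrite)
```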

## How was this patch tested?

I tested with Spark 2.3 and Kudu.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zheh12/spark SPARK-24546

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21554.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21554


commit bc92fcfbd226468960574c487e8be48bc58bb67d
Author: yangz 
Date:   2018-06-13T10:44:46Z

[SPARK-24546] InsertIntoDataSourceCommand make data frame with wrong schema




---




[GitHub] spark pull request #21257: [SPARK-24194] [SQL]HadoopFsRelation cannot overwr...

2018-05-18 Thread zheh12
Github user zheh12 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21257#discussion_r189422931
  
--- Diff: core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala ---
@@ -163,6 +170,15 @@ class HadoopMapReduceCommitProtocol(
   }
 
   override def commitJob(jobContext: JobContext, taskCommits: Seq[TaskCommitMessage]): Unit = {
+// first delete the should delete special file
+for (fs <- pathsToDelete.keys) {
+  for (path <- pathsToDelete(fs)) {
+if (fs.exists(path)) {
+  fs.delete(path, true)
--- End diff --

I think we should not delete the data when the task is aborted. The semantics of `deleteWithJob` should be to delete the data when the `Job` is committed. I have changed the code to handle exceptions.


---




[GitHub] spark pull request #21257: [SPARK-24194] [SQL]HadoopFsRelation cannot overwr...

2018-05-15 Thread zheh12
Github user zheh12 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21257#discussion_r188211717
  
--- Diff: core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala ---
@@ -120,7 +120,8 @@ abstract class FileCommitProtocol {
    * Specifies that a file should be deleted with the commit of this job. The default
    * implementation deletes the file immediately.
    */
-  def deleteWithJob(fs: FileSystem, path: Path, recursive: Boolean): Boolean = {
+  def deleteWithJob(fs: FileSystem, path: Path, recursive: Boolean,
--- End diff --

I will remove the `recursive` parameter.


---




[GitHub] spark issue #21286: [SPARK-24238][SQL] HadoopFsRelation can't append the sam...

2018-05-15 Thread zheh12
Github user zheh12 commented on the issue:

https://github.com/apache/spark/pull/21286
  
I think I may not have described this issue clearly.

First of all, the scenario for this problem is as follows.

Multiple applications append data to the same parquet data source table at the same time.

They run concurrently and share the same output directory.

```
FileOutputFormat.setOutputPath(job, new Path(outputSpec.outputPath))
```

`outputSpec.outputPath` is the table's output directory, `skip_dir/tab1/`.

`skip_dir/tab1/_temporary` will be created as the temporary directory.

But as soon as one job is successfully committed, it runs `cleanupJob`:

```
Path pendingJobAttemptsPath = getPendingJobAttemptsPath();

fs.delete(pendingJobAttemptsPath, true);
```

The pendingJobAttemptsPath is `skip_dir/tab1/_temporary`

```
private Path getPendingJobAttemptsPath() {
  return getPendingJobAttemptsPath(getOutputPath());
}

private static Path getPendingJobAttemptsPath(Path out) {
  return new Path(out, PENDING_DIR_NAME);
}

public static final String PENDING_DIR_NAME = "_temporary";
```

After that job is committed, `skip_dir/tab1/_temporary` is deleted, so when the other jobs attempt to commit, they report an error.

Meanwhile, because all applications share the same app attempt id, they write their temporary data to the same temporary directory, `skip_dir/tab1/_temporary/0`. The data committed by the successful application is therefore also corrupted.
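The issue as described involves separate applications. As a rough in-process approximation (a hypothetical sketch only: the output path is made up and the race may not trigger on every run), two concurrent append jobs against the same directory exercise the shared `_temporary/0` staging directory:

```
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

import org.apache.spark.sql.SparkSession

object ConcurrentAppendSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[4]")
      .appName("concurrent-append-sketch")
      .getOrCreate()
    import spark.implicits._

    val out = "/tmp/skip_dir/tab1" // hypothetical output path

    // Both append jobs stage their task output under <out>/_temporary/0;
    // whichever job commits first deletes that staging dir in cleanupJob,
    // which is the conflict described above.
    val jobs = (1 to 2).map { i =>
      Future {
        Seq((i, i, i)).toDF("a", "b", "c").write.mode("append").parquet(out)
      }
    }
    Await.result(Future.sequence(jobs), Duration.Inf)
    spark.stop()
  }
}
```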


---




[GitHub] spark issue #21286: [SPARK-24238][SQL] HadoopFsRelation can't append the sam...

2018-05-14 Thread zheh12
Github user zheh12 commented on the issue:

https://github.com/apache/spark/pull/21286
  
Thanks @cloud-fan @steveloughran for your replies; I will look into this problem in more detail.


---




[GitHub] spark pull request #21257: [SPARK-24194] [SQL]HadoopFsRelation cannot overwr...

2018-05-14 Thread zheh12
Github user zheh12 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21257#discussion_r187960677
  
--- Diff: core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala ---
@@ -120,7 +120,8 @@ abstract class FileCommitProtocol {
    * Specifies that a file should be deleted with the commit of this job. The default
    * implementation deletes the file immediately.
    */
-  def deleteWithJob(fs: FileSystem, path: Path, recursive: Boolean): Boolean = {
+  def deleteWithJob(fs: FileSystem, path: Path, recursive: Boolean,
--- End diff --

In the current situation we could delete it, but I feel it is better to use a default value of `true`.



---




[GitHub] spark pull request #21257: [SPARK-24194] [SQL]HadoopFsRelation cannot overwr...

2018-05-14 Thread zheh12
Github user zheh12 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21257#discussion_r187959698
  
--- Diff: core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala ---
@@ -163,6 +169,12 @@ class HadoopMapReduceCommitProtocol(
   }
 
   override def commitJob(jobContext: JobContext, taskCommits: Seq[TaskCommitMessage]): Unit = {
+// first delete the should delete special file
+val committerFs = jobContext.getWorkingDirectory.getFileSystem(jobContext.getConfiguration)
--- End diff --

I have changed my code. I now record, per `FileSystem`, the paths to delete in a map structure, and no longer assume that they all use the same `FileSystem`.


---




[GitHub] spark pull request #21257: [SPARK-24194] [SQL]HadoopFsRelation cannot overwr...

2018-05-14 Thread zheh12
Github user zheh12 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21257#discussion_r187930560
  
--- Diff: core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala ---
@@ -163,6 +169,12 @@ class HadoopMapReduceCommitProtocol(
   }
 
   override def commitJob(jobContext: JobContext, taskCommits: Seq[TaskCommitMessage]): Unit = {
+// first delete the should delete special file
+val committerFs = jobContext.getWorkingDirectory.getFileSystem(jobContext.getConfiguration)
--- End diff --

The staging dir is not always a valid Hadoop path, but the `JobContext` working directory always is.


---




[GitHub] spark pull request #21257: [SPARK-24194] [SQL]HadoopFsRelation cannot overwr...

2018-05-14 Thread zheh12
Github user zheh12 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21257#discussion_r187929156
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala ---
@@ -898,4 +898,12 @@ object DDLUtils {
 "Cannot overwrite a path that is also being read from.")
 }
   }
+
+  def verifyReadPath(query: LogicalPlan, outputPath: Path): Boolean = {
--- End diff --

Would `isInReadPath`, `inReadPath`, or `isReadPath` be better?


---




[GitHub] spark issue #21286: [SPARK-24238][SQL] HadoopFsRelation can't append the sam...

2018-05-11 Thread zheh12
Github user zheh12 commented on the issue:

https://github.com/apache/spark/pull/21286
  
I think the Hadoop design does not allow two jobs to share the same output folder.

Hadoop has a related patch that can partially solve this problem: you can configure a parameter so that the `_temporary` directory is not cleaned up. But I don't think this is a good solution.

[MAPREDUCE-6478. Add an option to skip cleanupJob stage or ignore cleanup 
failure during 
commitJob.](https://issues.apache.org/jira/browse/MAPREDUCE-6478?attachmentSortBy=fileName)
 

For this problem, we had better use different temporary output directories for different jobs, and then copy the files into place.

However, the current implementation breaks some unit tests. There are two ways to fix it.

1. Add a check for the presence of the temp dir in `HadoopMapReduceCommitProtocol.commitJob`, but this requires the caller to externally set `FileOutputFormat.setOutputPath(job, s".temp-${committer.getJobId()}")` (see the sketch after this list).

2. Alternatively, enable the per-job temp dir for every `HadoopMapReduceCommitProtocol`. This hides the temp dir setup from callers, but every job then pays for one extra move of its files.
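A hypothetical sketch of option 1 (the helper name and `jobId` parameter are illustrative, not an existing Spark or Hadoop API): give every job its own staging directory inside the output folder so that concurrent jobs never share `_temporary`, and move the committed files into place afterwards.

```
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat

// Hypothetical helper: point the job's output at a per-job staging directory
// under the real output path, so two concurrent jobs never share _temporary.
def setPerJobStagingDir(job: Job, outputPath: Path, jobId: String): Path = {
  val jobStagingDir = new Path(outputPath, s".temp-$jobId")
  FileOutputFormat.setOutputPath(job, jobStagingDir)
  // commitJob would later move the committed files from jobStagingDir into
  // outputPath and delete jobStagingDir.
  jobStagingDir
}
```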

cc @cloud-fan. Which do you think is better? Please give me some advice.


---




[GitHub] spark issue #21286: [SPARK-24238][SQL] HadoopFsRelation can't append the sam...

2018-05-09 Thread zheh12
Github user zheh12 commented on the issue:

https://github.com/apache/spark/pull/21286
  
relates to #21257 


---




[GitHub] spark issue #21286: [SPARK-24238][SQL] HadoopFsRelation can't append the sam...

2018-05-09 Thread zheh12
Github user zheh12 commented on the issue:

https://github.com/apache/spark/pull/21286
  
cc @cloud-fan @jiangxb1987
Are there any drawbacks to this idea? Please give some advice when you have time.


---




[GitHub] spark pull request #21286: [SPARK-24194] HadoopFsRelation cannot overwrite a...

2018-05-09 Thread zheh12
GitHub user zheh12 opened a pull request:

https://github.com/apache/spark/pull/21286

[SPARK-24194] HadoopFsRelation cannot overwrite a path that is also b…

## What changes were proposed in this pull request?

When multiple tasks append to the same `HadoopFsRelation` at the same time, things go wrong in the following two ways:

1. One task will succeed, but the data will be wrong and more data than expected will appear.
2. The other tasks will fail with `java.io.FileNotFoundException: Failed to get file status skip_dir/_temporary/0`.

The main reason for this problem is that multiple jobs use the same `_temporary` directory.

So the core idea of this PR is to create a different temporary directory, named with the jobId, for each job inside the `output` folder, so that conflicts can be avoided.

## How was this patch tested?

I tested manually, but I don't know how to write a unit test for this situation. Please help me.


Please review http://spark.apache.org/contributing.html before opening a 
pull request.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zheh12/spark SPARK-24238

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21286.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21286


commit b676a36af110b0b7d7dfc47ab292d09c441f6a0f
Author: yangz 
Date:   2018-05-10T01:46:49Z

[SPARK-24194] HadoopFsRelation cannot overwrite a path that is also being 
read from




---




[GitHub] spark issue #21257: [SPARK-24194] [SQL]HadoopFsRelation cannot overwrite a p...

2018-05-09 Thread zheh12
Github user zheh12 commented on the issue:

https://github.com/apache/spark/pull/21257
  
cc @cloud-fan, Jenkins had some errors; please help me retest, thanks.


---




[GitHub] spark issue #21257: [SPARK-24194] [SQL]HadoopFsRelation cannot overwrite a p...

2018-05-09 Thread zheh12
Github user zheh12 commented on the issue:

https://github.com/apache/spark/pull/21257
  
Jenkins, retest this please.


---




[GitHub] spark pull request #21257: [SPARK-24194] [SQL]HadoopFsRelation cannot overwr...

2018-05-08 Thread zheh12
Github user zheh12 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21257#discussion_r186936032
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala ---
@@ -207,9 +207,25 @@ case class InsertIntoHadoopFsRelationCommand(
 }
 // first clear the path determined by the static partition keys (e.g. /table/foo=1)
 val staticPrefixPath = qualifiedOutputPath.suffix(staticPartitionPrefix)
-if (fs.exists(staticPrefixPath) && !committer.deleteWithJob(fs, staticPrefixPath, true)) {
-  throw new IOException(s"Unable to clear output " +
-s"directory $staticPrefixPath prior to writing to it")
+
+// check if delete the dir or just sub files
+if (fs.exists(staticPrefixPath)) {
+  // check if is he table root, and record the file to delete
+  if (staticPartitionPrefix.isEmpty) {
+val files = fs.listFiles(staticPrefixPath, false)
+while (files.hasNext) {
+  val file = files.next()
+  if (!committer.deleteWithJob(fs, file.getPath, true)) {
--- End diff --

That's a good idea. I have changed my code.


---




[GitHub] spark pull request #21257: [SPARK-24194] [SQL]HadoopFsRelation cannot overwr...

2018-05-08 Thread zheh12
Github user zheh12 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21257#discussion_r186915569
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala ---
@@ -207,9 +207,25 @@ case class InsertIntoHadoopFsRelationCommand(
 }
 // first clear the path determined by the static partition keys (e.g. /table/foo=1)
 val staticPrefixPath = qualifiedOutputPath.suffix(staticPartitionPrefix)
-if (fs.exists(staticPrefixPath) && !committer.deleteWithJob(fs, staticPrefixPath, true)) {
-  throw new IOException(s"Unable to clear output " +
-s"directory $staticPrefixPath prior to writing to it")
+
+// check if delete the dir or just sub files
+if (fs.exists(staticPrefixPath)) {
+  // check if is he table root, and record the file to delete
+  if (staticPartitionPrefix.isEmpty) {
+val files = fs.listFiles(staticPrefixPath, false)
+while (files.hasNext) {
+  val file = files.next()
+  if (!committer.deleteWithJob(fs, file.getPath, true)) {
--- End diff --

The key point is that data is already in the `output` directory before the job commits, so we can no longer delete the `output` directory.

We override `deleteWithJob` from `FileCommitProtocol` in `HadoopMapReduceCommitProtocol`. It no longer deletes files immediately; it waits until the entire job is committed.

We do delete the files when the job is committed, but the temporary output files are generated when the tasks start, and those temporary files live inside the `output` directory. The data is then moved out into the `output` directory.

After the job starts, there is no safe point at which we could delete the entire `output` directory.
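A minimal sketch of the deferred-delete idea described here (field and method names are illustrative; the real change lives in `HadoopMapReduceCommitProtocol`, as in the diff above): `deleteWithJob` only records the paths, and the actual deletion runs once the job has been committed.

```
import scala.collection.mutable
import org.apache.hadoop.fs.{FileSystem, Path}

// Illustrative sketch, not the actual HadoopMapReduceCommitProtocol code.
trait DeferredDelete {
  // Paths recorded for deletion, grouped by the FileSystem they belong to.
  private val pathsToDelete = mutable.Map[FileSystem, mutable.Set[Path]]()

  // Called where the old code deleted immediately: only record the path.
  def deleteWithJob(fs: FileSystem, path: Path): Boolean = {
    pathsToDelete.getOrElseUpdate(fs, mutable.Set.empty[Path]) += path
    true
  }

  // Called from commitJob, once the job has succeeded.
  def deleteRecordedPaths(): Unit = {
    for ((fs, paths) <- pathsToDelete; path <- paths if fs.exists(path)) {
      fs.delete(path, true) // recursive delete, as in the diff being discussed
    }
  }
}
```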


---




[GitHub] spark pull request #21257: [SPARK-24194] [SQL]HadoopFsRelation cannot overwr...

2018-05-08 Thread zheh12
Github user zheh12 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21257#discussion_r186719888
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala ---
@@ -207,9 +207,25 @@ case class InsertIntoHadoopFsRelationCommand(
 }
 // first clear the path determined by the static partition keys (e.g. /table/foo=1)
 val staticPrefixPath = qualifiedOutputPath.suffix(staticPartitionPrefix)
-if (fs.exists(staticPrefixPath) && !committer.deleteWithJob(fs, staticPrefixPath, true)) {
-  throw new IOException(s"Unable to clear output " +
-s"directory $staticPrefixPath prior to writing to it")
+
+// check if delete the dir or just sub files
+if (fs.exists(staticPrefixPath)) {
+  // check if is he table root, and record the file to delete
+  if (staticPartitionPrefix.isEmpty) {
+val files = fs.listFiles(staticPrefixPath, false)
+while (files.hasNext) {
+  val file = files.next()
+  if (!committer.deleteWithJob(fs, file.getPath, true)) {
--- End diff --

If I do that, then when the job is committed it will delete the entire `output` directory, and no data will remain.


---




[GitHub] spark pull request #21257: [SPARK-24194] [SQL]HadoopFsRelation cannot overwr...

2018-05-07 Thread zheh12
Github user zheh12 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21257#discussion_r186609102
  
--- Diff: core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala ---
@@ -235,4 +247,20 @@ class HadoopMapReduceCommitProtocol(
   tmp.getFileSystem(taskContext.getConfiguration).delete(tmp, false)
 }
   }
+
+  /**
+   * now just record the file to be delete
+   */
+  override def deleteWithJob(fs: FileSystem,
--- End diff --

I have changed this code.


---




[GitHub] spark pull request #21257: [SPARK-24194] [SQL]HadoopFsRelation cannot overwr...

2018-05-07 Thread zheh12
Github user zheh12 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21257#discussion_r186608143
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala ---
@@ -207,9 +207,25 @@ case class InsertIntoHadoopFsRelationCommand(
 }
 // first clear the path determined by the static partition keys (e.g. /table/foo=1)
 val staticPrefixPath = qualifiedOutputPath.suffix(staticPartitionPrefix)
-if (fs.exists(staticPrefixPath) && !committer.deleteWithJob(fs, staticPrefixPath, true)) {
-  throw new IOException(s"Unable to clear output " +
-s"directory $staticPrefixPath prior to writing to it")
+
+// check if delete the dir or just sub files
+if (fs.exists(staticPrefixPath)) {
+  // check if is he table root, and record the file to delete
+  if (staticPartitionPrefix.isEmpty) {
+val files = fs.listFiles(staticPrefixPath, false)
+while (files.hasNext) {
+  val file = files.next()
+  if (!committer.deleteWithJob(fs, file.getPath, true)) {
--- End diff --

We chose to postpone deletion. Whether or not `output` is the same as `input`, the `_temporary` directory is now created inside the `output` directory before any deletion happens, so the root directory can no longer be deleted directly.

The original implementation could delete the root directory directly because the deletion happened before the job was created; the root directory was then rebuilt and the `_temporary` directory created inside it. With that implementation, the failure of any `task` in the `job` results in the loss of the `output` data.

 I can't figure out how to separate the two situations. Do you have any 
good ideas?


---




[GitHub] spark pull request #21257: [SPARK-24194] [SQL]HadoopFsRelation cannot overwr...

2018-05-07 Thread zheh12
Github user zheh12 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21257#discussion_r186603145
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala ---
@@ -207,9 +207,25 @@ case class InsertIntoHadoopFsRelationCommand(
 }
 // first clear the path determined by the static partition keys (e.g. /table/foo=1)
 val staticPrefixPath = qualifiedOutputPath.suffix(staticPartitionPrefix)
-if (fs.exists(staticPrefixPath) && !committer.deleteWithJob(fs, staticPrefixPath, true)) {
-  throw new IOException(s"Unable to clear output " +
-s"directory $staticPrefixPath prior to writing to it")
+
+// check if delete the dir or just sub files
+if (fs.exists(staticPrefixPath)) {
+  // check if is he table root, and record the file to delete
+  if (staticPartitionPrefix.isEmpty) {
+val files = fs.listFiles(staticPrefixPath, false)
+while (files.hasNext) {
+  val file = files.next()
+  if (!committer.deleteWithJob(fs, file.getPath, true)) {
--- End diff --

1. From the code point of view, in the current implementation `deleteMatchingPartitions` happens only if `overwrite` is specified.
2. Using `dynamicPartitionOverwrite` will not solve this problem, because it also generates a `.stage` directory under the table root directory. We still need to record all the files we want to delete, and we still cannot delete the root directory directly.
Dynamic partition overwrite actually records all the partitions that need to be deleted and then deletes them one by one, while a whole-table `overwrite` deletes all the data in the directory and needs to record all the partition directories to delete, so this implementation is in fact similar to `dynamicPartitionOverwrite`.


---




[GitHub] spark pull request #21257: [SPARK-24194] [SQL]HadoopFsRelation cannot overwr...

2018-05-07 Thread zheh12
Github user zheh12 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21257#discussion_r186599760
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala ---
@@ -207,9 +207,25 @@ case class InsertIntoHadoopFsRelationCommand(
 }
 // first clear the path determined by the static partition keys (e.g. /table/foo=1)
 val staticPrefixPath = qualifiedOutputPath.suffix(staticPartitionPrefix)
-if (fs.exists(staticPrefixPath) && !committer.deleteWithJob(fs, staticPrefixPath, true)) {
-  throw new IOException(s"Unable to clear output " +
-s"directory $staticPrefixPath prior to writing to it")
+
+// check if delete the dir or just sub files
+if (fs.exists(staticPrefixPath)) {
+  // check if is he table root, and record the file to delete
+  if (staticPartitionPrefix.isEmpty) {
+val files = fs.listFiles(staticPrefixPath, false)
+while (files.hasNext) {
+  val file = files.next()
+  if (!committer.deleteWithJob(fs, file.getPath, true)) {
--- End diff --

First of all, if it is the root directory of the table, I must record all the files in the directory and wait until the job is committed to delete them. Because the job's `_temporary` directory is also inside that directory, I cannot simply delete the whole directory.

Second, when we record the files that need to be deleted, we just list the files in the root directory non-recursively. Under normal circumstances, the number of entries at the first level of a partitioned table's directory will not be too large.

In the end, this will certainly be slower than deleting the entire directory directly, but with the current implementation we cannot delete the entire table directory directly.


---




[GitHub] spark issue #21257: [SPARK-24194] [SQL]HadoopFsRelation cannot overwrite a p...

2018-05-07 Thread zheh12
Github user zheh12 commented on the issue:

https://github.com/apache/spark/pull/21257
  
cc @rxin @cloud-fan 


---




[GitHub] spark pull request #21257: [SPARK-24194] HadoopFsRelation cannot overwrite a...

2018-05-07 Thread zheh12
GitHub user zheh12 opened a pull request:

https://github.com/apache/spark/pull/21257

[SPARK-24194] HadoopFsRelation cannot overwrite a path that is also b…

## What changes were proposed in this pull request?

When doing an insert overwrite on a parquet table, there is an error check:

```
  if (overwrite) DDLUtils.verifyNotReadPath(actualQuery, outputPath)
```
But we are allowed to do this for a Hive table.

The reason we can't overwrite a **HadoopFsRelation** whose output is the same as its input is that we delete the output path first. I think we can fix this with a later delete: just mark the paths that should be deleted, and delete them after the job commits.

## How was this patch tested?

I changed the test code in **InsertSuite** and **MetastoreDataSourceSuite**; they now test that the input and output table can be the same.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zheh12/spark SPARK-24194

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21257.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21257






---
