lirui-apache commented on a change in pull request #12168:
URL: https://github.com/apache/flink/pull/12168#discussion_r426141927



##########
File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveRowDataPartitionComputer.java
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive;
+
+import org.apache.flink.table.catalog.hive.client.HiveShim;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.util.DataFormatConverters;
+import org.apache.flink.table.filesystem.RowDataPartitionComputer;
+import org.apache.flink.table.functions.hive.conversion.HiveInspectors;
+import org.apache.flink.table.functions.hive.conversion.HiveObjectConversion;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.utils.TypeConversions;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+
+/**
+ * A {@link RowDataPartitionComputer} that converts Flink objects to Hive objects before computing the partition value strings.
+ */
+public class HiveRowDataPartitionComputer extends RowDataPartitionComputer {
+
+       private final DataFormatConverters.DataFormatConverter[] partitionConverters;
+       private final HiveObjectConversion[] partColConversions;

Review comment:
       We already have `partitionConverters`. I think we can just name this one as `hiveObjectConversions`.
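
       For instance, just a naming sketch reusing the two declarations from this diff (nothing else would change):

```java
// Hypothetical rename per the comment above; both arrays are indexed by partition column.
private final DataFormatConverters.DataFormatConverter[] partitionConverters;
private final HiveObjectConversion[] hiveObjectConversions;
```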

##########
File path: flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/write/HadoopPathBasedPartFileWriterTest.java
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive.write;
+
+import org.apache.flink.api.common.serialization.PathBasedBulkWriter;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.filesystem.HadoopPathBasedBulkFormatBuilder;
+import org.apache.flink.streaming.api.functions.sink.filesystem.TestStreamingFileSinkFactory;
+import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
+import org.apache.flink.streaming.util.FiniteTestSource;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Base class for testing writing data to the hadoop file system with different configurations.
+ */
+public class HadoopPathBasedPartFileWriterTest extends AbstractTestBase {
+       @Rule
+       public final Timeout timeoutPerTest = Timeout.seconds(2000);
+
+       @Test
+       public void testWriteFile() throws Exception {
+               File file = TEMPORARY_FOLDER.newFolder();
+               Path basePath = new Path(file.toURI());
+
+               List<String> data = Arrays.asList(
+                       "first line",
+                       "second line",
+                       "third line");
+
+               StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+               env.setParallelism(1);
+               env.enableCheckpointing(100);
+
+               DataStream<String> stream = env.addSource(
+                       new FiniteTestSource<>(data), TypeInformation.of(String.class));
+               Configuration configuration = new Configuration();
+
+               HadoopPathBasedBulkFormatBuilder<String, String, ?> builder =
+                       new HadoopPathBasedBulkFormatBuilder<>(
+                               basePath,
+                               new TestHadoopPathBasedBulkWriterFactory(),
+                               configuration,
+                               new DateTimeBucketAssigner<>());
+               TestStreamingFileSinkFactory<String> streamingFileSinkFactory = new TestStreamingFileSinkFactory<>();
+               stream.addSink(streamingFileSinkFactory.createSink(builder, 1000));
+
+               env.execute();
+               validateResult(data, configuration, basePath);
+       }
+
+       // ------------------------------------------------------------------------
+
+       private void validateResult(List<String> expected, Configuration config, Path basePath) throws IOException {
+               FileSystem fileSystem = FileSystem.get(basePath.toUri(), config);
+               FileStatus[] buckets = fileSystem.listStatus(basePath);
+               assertNotNull(buckets);
+               assertEquals(1, buckets.length);
+
+               FileStatus[] partFiles = fileSystem.listStatus(buckets[0].getPath());
+               assertNotNull(partFiles);
+               assertEquals(2, partFiles.length);

Review comment:
       Why will there be two files?

##########
File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/write/HiveRecordWriterFactory.java
##########
@@ -167,56 +151,65 @@ public HiveOutputFormat createOutputFormat(Path outPath) {
                                }
                        }
 
-                       RecordWriter recordWriter = hiveShim.getHiveRecordWriter(
+                       return hiveShim.getHiveRecordWriter(
                                        conf,
                                        hiveOutputFormatClz,
                                        recordSerDe.getSerializedClass(),
                                        isCompressed,
                                        tableProperties,
-                                       HadoopFileSystem.toHadoopPath(outPath));
-                       return new HiveOutputFormat(recordWriter);
+                                       path);
                } catch (Exception e) {
                        throw new FlinkHiveException(e);
                }
        }
 
-       private class HiveOutputFormat implements org.apache.flink.api.common.io.OutputFormat<Row> {
-
-               private final RecordWriter recordWriter;
+       public JobConf getJobConf() {
+               return confWrapper.conf();
+       }
 
-               private HiveOutputFormat(RecordWriter recordWriter) {
-                       this.recordWriter = recordWriter;
-               }
+       private void initialize() throws Exception {
+               JobConf jobConf = confWrapper.conf();
+               Object serdeLib = Class.forName(serDeInfo.getSerializationLib()).newInstance();
+               Preconditions.checkArgument(serdeLib instanceof Serializer && serdeLib instanceof Deserializer,
+                               "Expect a SerDe lib implementing both Serializer and Deserializer, but actually got "
+                                               + serdeLib.getClass().getName());
+               this.recordSerDe = (Serializer) serdeLib;
+               ReflectionUtils.setConf(recordSerDe, jobConf);
 
-               // converts a Row to a list of Hive objects so that Hive can serialize it
-               private Object getConvertedRow(Row record) {
-                       List<Object> res = new ArrayList<>(numNonPartitionColumns);
-                       for (int i = 0; i < numNonPartitionColumns; i++) {
-                               res.add(hiveConversions[i].toHiveObject(record.getField(i)));
-                       }
-                       return res;
-               }
+               // TODO: support partition properties, for now assume they're same as table properties
+               SerDeUtils.initializeSerDe((Deserializer) recordSerDe, jobConf, tableProperties, null);
 
-               @Override
-               public void configure(Configuration parameters) {
+               this.formatFields = allColumns.length - partitionColumns.length;
+               this.hiveConversions = new HiveObjectConversion[formatFields];
+               this.converters = new DataFormatConverter[formatFields];
+               List<ObjectInspector> objectInspectors = new ArrayList<>(hiveConversions.length);
+               for (int i = 0; i < formatFields; i++) {
+                       DataType type = allTypes[i];
+                       ObjectInspector objectInspector = HiveInspectors.getObjectInspector(type);
+                       objectInspectors.add(objectInspector);
+                       hiveConversions[i] = HiveInspectors.getConversion(
+                                       objectInspector, type.getLogicalType(), hiveShim);
+                       converters[i] = DataFormatConverters.getConverterForDataType(type);
                }
 
-               @Override
-               public void open(int taskNumber, int numTasks) throws IOException {
-               }
+               this.formatInspector = ObjectInspectorFactory.getStandardStructObjectInspector(
+                               Arrays.asList(allColumns).subList(0, formatFields),
+                               objectInspectors);
+       }
 
-               @Override
-               public void writeRecord(Row record) throws IOException {
-                       try {
-                               recordWriter.write(recordSerDe.serialize(getConvertedRow(record), rowObjectInspector));
-                       } catch (SerDeException e) {
-                               throw new IOException(e);
-                       }
+       Writable toHiveWritable(Row row) throws SerDeException {

Review comment:
       It seems strange that a "record writer factory" is also responsible for converting data between Flink and Hive. Can we move these methods somewhere else? Or perhaps this class shouldn't be a factory in the first place.
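
       One possible shape, only a sketch: pull the Flink-to-Hive conversion into its own helper so the factory just builds record writers. The class and constructor below are made up for illustration; the conversion logic simply mirrors what `initialize()` / `toHiveWritable` already do in this diff.

```java
import org.apache.flink.table.functions.hive.conversion.HiveObjectConversion;
import org.apache.flink.types.Row;

import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.Serializer;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.io.Writable;

import java.util.ArrayList;
import java.util.List;

// Hypothetical helper that owns the Flink -> Hive conversion.
// The fields mirror the ones set up in initialize().
class HiveRowConverter {

	private final HiveObjectConversion[] hiveConversions;
	private final ObjectInspector formatInspector;
	private final Serializer recordSerDe;

	HiveRowConverter(HiveObjectConversion[] hiveConversions, ObjectInspector formatInspector, Serializer recordSerDe) {
		this.hiveConversions = hiveConversions;
		this.formatInspector = formatInspector;
		this.recordSerDe = recordSerDe;
	}

	// Converts a Flink Row into the Writable that Hive's RecordWriter expects.
	Writable toHiveWritable(Row row) throws SerDeException {
		List<Object> fields = new ArrayList<>(hiveConversions.length);
		for (int i = 0; i < hiveConversions.length; i++) {
			fields.add(hiveConversions[i].toHiveObject(row.getField(i)));
		}
		return recordSerDe.serialize(fields, formatInspector);
	}
}
```

       The factory would then only create `RecordWriter`s and hand one of these converters to the writer.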

##########
File path: flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/write/HadoopPathBasedPartFileWriterTest.java
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.hive.write;
+
+import org.apache.flink.api.common.serialization.PathBasedBulkWriter;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.filesystem.HadoopPathBasedBulkFormatBuilder;
+import org.apache.flink.streaming.api.functions.sink.filesystem.TestStreamingFileSinkFactory;
+import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
+import org.apache.flink.streaming.util.FiniteTestSource;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Base class for testing writing data to the hadoop file system with different configurations.

Review comment:
       Doesn't seem to be a **base** class
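
       e.g. the javadoc could simply read (suggestion only):

```java
/**
 * Tests writing data to the Hadoop file system with different configurations.
 */
public class HadoopPathBasedPartFileWriterTest extends AbstractTestBase {
```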

##########
File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/HadoopPathBasedBulkFormatBuilder.java
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink.filesystem;
+
+import org.apache.flink.connectors.hive.write.DefaultHadoopFileCommitterFactory;
+import org.apache.flink.connectors.hive.write.HadoopFileCommitterFactory;
+import org.apache.flink.connectors.hive.write.HadoopPathBasedBulkWriterFactory;
+import org.apache.flink.connectors.hive.write.HadoopPathBasedPartFileWriter;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.CheckpointRollingPolicy;
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.IOException;
+
+/**
+ * Buckets builder to create buckets that use {@link HadoopPathBasedPartFileWriter}.
+ */
+public class HadoopPathBasedBulkFormatBuilder<IN, BucketID, T extends HadoopPathBasedBulkFormatBuilder<IN, BucketID, T>>

Review comment:
       Why is this a "format builder" instead of a "buckets builder"?
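
       i.e. a name along these lines would match what it builds (renaming sketch only; the body and any extends clause stay whatever they are in the PR):

```java
public class HadoopPathBasedBucketsBuilder<IN, BucketID, T extends HadoopPathBasedBucketsBuilder<IN, BucketID, T>> {
	// ...
}
```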

##########
File path: flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkTest.java
##########
@@ -213,6 +228,95 @@ public void testWriteNullValues() throws Exception {
                }
        }
 
+       @Test(timeout = 120000)
+       public void testPartStreamingWrite() throws Exception {
+               testStreamingWrite(true, (path) -> {
+                       File basePath = new File(path, "d=2020-05-03");
+                       Assert.assertEquals(5, basePath.list().length);
+                       Assert.assertTrue(new File(new File(basePath, "e=7"), "_MY_SUCCESS").exists());
+                       Assert.assertTrue(new File(new File(basePath, "e=8"), "_MY_SUCCESS").exists());
+                       Assert.assertTrue(new File(new File(basePath, "e=9"), "_MY_SUCCESS").exists());
+                       Assert.assertTrue(new File(new File(basePath, "e=10"), "_MY_SUCCESS").exists());
+                       Assert.assertTrue(new File(new File(basePath, "e=11"), "_MY_SUCCESS").exists());
+               });
+       }
+
+       @Test(timeout = 120000)
+       public void testNonPartStreamingWrite() throws Exception {
+               testStreamingWrite(false, (p) -> {});

Review comment:
       Does this mean the success file is not written for non-partitioned tables?
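
       If it is expected for non-partitioned tables as well, the callback could assert it, e.g. (hypothetical check, assuming the path handed to the consumer is the table directory):

```java
testStreamingWrite(false, (p) -> Assert.assertTrue(new File(p, "_MY_SUCCESS").exists()));
```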

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/PartFileWriter.java
##########
@@ -20,122 +20,172 @@
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
-import org.apache.flink.core.fs.RecoverableWriter;
-import org.apache.flink.util.IOUtils;
-import org.apache.flink.util.Preconditions;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
 
 import java.io.IOException;
 
 /**
- * An abstract writer for the currently open part file in a specific {@link Bucket}.
- *
- * <p>Currently, there are two subclasses, of this class:
- * <ol>
- *     <li>One for row-wise formats: the {@link RowWisePartWriter}.</li>
- *     <li>One for bulk encoding formats: the {@link BulkPartWriter}.</li>
- * </ol>
- *
- * <p>This also implements the {@link PartFileInfo}.
+ * The {@link Bucket} uses the {@link PartFileWriter} to write elements to a part file.
  */
 @Internal
-abstract class PartFileWriter<IN, BucketID> implements PartFileInfo<BucketID> {
+interface PartFileWriter<IN, BucketID> extends PartFileInfo<BucketID> {
 
-       private final BucketID bucketId;
+       /**
+        * Write an element to the part file.
+        * @param element the element to be written.
+        * @param currentTime the writing time.
+        * @throws IOException Thrown if writing the element fails.
+        */
+       void write(final IN element, final long currentTime) throws IOException;
 
-       private final long creationTime;
+       /**
+        * @return The state of the current part file.
+        * @throws IOException Thrown if persisting the part file fails.
+        */
+       InProgressFileRecoverable persist() throws IOException;
 
-       protected final RecoverableFsDataOutputStream currentPartStream;
 
-       private long lastUpdateTime;
+       /**
+        * @return The state of the pending part file. {@link Bucket} uses this to commit the pending file.
+        * @throws IOException Thrown if an I/O error occurs.
+        */
+       PendingFileRecoverable closeForCommit() throws IOException;
 
-       protected PartFileWriter(
-                       final BucketID bucketId,
-                       final RecoverableFsDataOutputStream currentPartStream,
-                       final long creationTime) {
+       /**
+        * Dispose the part file.
+        */
+       void dispose();
 
-               Preconditions.checkArgument(creationTime >= 0L);
-               this.bucketId = Preconditions.checkNotNull(bucketId);
-               this.currentPartStream = Preconditions.checkNotNull(currentPartStream);
-               this.creationTime = creationTime;
-               this.lastUpdateTime = creationTime;
-       }
+       // ------------------------------------------------------------------------
 
-       abstract void write(IN element, long currentTime) throws IOException;
+       /**
+        * An interface for factories that create the different {@link PartFileWriter writers}.
+        */
+       interface PartFileFactory<IN, BucketID> {
 
-       RecoverableWriter.ResumeRecoverable persist() throws IOException {
-               return currentPartStream.persist();
-       }
+               /**
+                * Used to create a new {@link PartFileWriter}.
+                * @param bucketID the id of the bucket this writer is writing to.
+                * @param path the path this writer will write to.
+                * @param creationTime the creation time of the file.
+                * @return the new {@link PartFileWriter}
+                * @throws IOException Thrown if creating a writer fails.
+                */
+               PartFileWriter<IN, BucketID> openNew(
+                       final BucketID bucketID,
+                       final Path path,
+                       final long creationTime) throws IOException;
 
-       RecoverableWriter.CommitRecoverable closeForCommit() throws IOException {
-               return currentPartStream.closeForCommit().getRecoverable();
-       }
+               /**
+                * Used to resume a {@link PartFileWriter} from a {@link InProgressFileRecoverable}.
+                * @param bucketID the id of the bucket this writer is writing to.
+                * @param inProgressFileSnapshot the state of the part file.
+                * @param creationTime the creation time of the file.
+                * @return the resumed {@link PartFileWriter}
+                * @throws IOException Thrown if resuming a writer fails.
+                */
+               PartFileWriter<IN, BucketID> resumeFrom(
+                       final BucketID bucketID,
+                       final InProgressFileRecoverable inProgressFileSnapshot,
+                       final long creationTime) throws IOException;
 
-       void dispose() {
-               // we can suppress exceptions here, because we do not rely on close() to
-               // flush or persist any data
-               IOUtils.closeQuietly(currentPartStream);
-       }
+               /**
+                * Recovers a pending file for finalizing and committing.
+                * @param pendingFileRecoverable The handle with the recovery information.
+                * @return A pending file
+                * @throws IOException Thrown if recovering a pending file fails.
+                */
+               PendingFile recoverPendingFile(final PendingFileRecoverable pendingFileRecoverable) throws IOException;
 
-       void markWrite(long now) {
-               this.lastUpdateTime = now;
-       }
+               /**
+                * Marks whether any additional cleanup/freeing of resources occupied as part of
+                * an {@link InProgressFileRecoverable} is required.
+                *
+                * <p>In case cleanup is required, then {@link #cleanupInProgressFileRecoverable(InProgressFileRecoverable)} should
+                * be called.
+                *
+                * @return {@code true} if cleanup is required, {@code false} otherwise.
+                */
+               boolean requiresCleanupOfInProgressFileRecoverableState();
 
-       @Override
-       public BucketID getBucketId() {
-               return bucketId;
-       }
+               /**
+                * Frees up any resources that were previously occupied in order to be able to
+                * recover from a (potential) failure.
+                *
+                * <p><b>NOTE:</b> This operation should not throw an exception if the {@link InProgressFileRecoverable} has already
+                * been cleaned up and the resources have been freed. But the contract is that it will throw
+                * an {@link UnsupportedOperationException} if it is called for a {@link PartFileFactory}
+                * whose {@link #requiresCleanupOfInProgressFileRecoverableState()} returns {@code false}.
+                *
+                * @param inProgressFileRecoverable the {@link InProgressFileRecoverable} whose state we want to clean-up.
+                * @return {@code true} if the resources were successfully freed, {@code false} otherwise
+                * (e.g. the file to be deleted was not there for any reason - already deleted or never created).
+                * @throws IOException if an I/O error occurs
+                */
+               boolean cleanupInProgressFileRecoverable(final InProgressFileRecoverable inProgressFileRecoverable) throws IOException;
 
-       @Override
-       public long getCreationTime() {
-               return creationTime;
-       }
 
-       @Override
-       public long getSize() throws IOException {
-               return currentPartStream.getPos();
-       }
+               /**
+                * @return the serializer for the {@link PendingFileRecoverable}.
+                */
+               SimpleVersionedSerializer<? extends PendingFileRecoverable> getPendingFileRecoverableSerializer();
 
-       @Override
-       public long getLastUpdateTime() {
-               return lastUpdateTime;
+               /**
+                * @return the serializer for the {@link InProgressFileRecoverable}.
+                */
+               SimpleVersionedSerializer<? extends InProgressFileRecoverable> getInProgressFileRecoverableSerializer();
+
+               /**
+                * Checks whether the {@link PartFileWriter} supports resuming (appending to) files after
+                * recovery (via the {@link #resumeFrom(Object, InProgressFileRecoverable, long)} method).
+                *
+                * <p>If true, then this writer supports the {@link #resumeFrom(Object, InProgressFileRecoverable, long)} method.
+                * If false, then that method may not be supported and file can only be recovered via
+                * {@link #recoverPendingFile(PendingFileRecoverable)}.
+                */
+               boolean supportsResume();
        }
 
-       // ------------------------------------------------------------------------
+       /**
+        * A handle that can be used to recover an in-progress file.
+        */
+       interface InProgressFileRecoverable extends PendingFileRecoverable {}

Review comment:
       Can we have some comments to explain the difference between `pending file` and `in progress file`?
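
       For example, something along these lines (wording is only a suggestion, following the usual in-progress -> pending -> finished life cycle of part files):

```java
/**
 * A handle to an in-progress part file, i.e. a file the writer is still appending to.
 * It can be used to resume the in-progress file after a failover, whereas a
 * {@link PendingFileRecoverable} refers to a file that has already been closed for
 * writing and is only waiting to be committed once the checkpoint completes.
 */
interface InProgressFileRecoverable extends PendingFileRecoverable {}
```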




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

