gaoyunhaii commented on a change in pull request #18680:
URL: https://github.com/apache/flink/pull/18680#discussion_r803265948



##########
File path: flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/OutputStreamBasedCompactingFileWriter.java
##########
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink.filesystem;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * The compactors use the {@link OutputStreamBasedCompactingFileWriter} to directly write a
+ * compacting file as an {@link OutputStream}.
+ */
+public interface OutputStreamBasedCompactingFileWriter extends CompactingFileWriter {
+    /**
+     * Get the output stream underlying the writer. The close method of the returned stream should

Review comment:
       `Get` -> `Gets`

##########
File path: flink-connectors/flink-file-sink-common/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketWriter.java
##########
@@ -39,6 +39,32 @@
     InProgressFileWriter<IN, BucketID> openNewInProgressFile(
             final BucketID bucketID, final Path path, final long creationTime) throws IOException;
 
+    /**
+     * Used to create a new {@link CompactingFileWriter} of the requesting type. A {@link
+     * InProgressFileWriter} will be created by default, which supports only the RECORD_WISE type.
+     * Requesting a writer of an unsupported type will result in an UnsupportedOperationException.

Review comment:
       `an UnsupportedOperationException` -> `UnsupportedOperationException`? 
   
   Also, I think we do not create the `InProgressFileWriter` by default in all cases; we might change the sentence to `A {@link InProgressFileWriter} will be created by default for the RECORD_WISE type`.
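   
   For example, the whole paragraph could then read (just a sketch of the suggested wording):
   
   ```
   /**
    * Creates a new {@link CompactingFileWriter} of the requested type. A {@link
    * InProgressFileWriter} will be created by default for the RECORD_WISE type. Requesting a
    * writer of an unsupported type will result in UnsupportedOperationException.
    */
   ```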

##########
File path: flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactCoordinator.java
##########
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.sink.compactor.operator;
+
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import 
org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
+import org.apache.flink.connector.file.sink.FileSink;
+import org.apache.flink.connector.file.sink.FileSinkCommittable;
+import org.apache.flink.connector.file.sink.compactor.FileCompactStrategy;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
+import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter.PendingFileRecoverable;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Coordinator that coordinates file compaction for the {@link FileSink}.
+ *
+ * <p>All committable emitted from the writers are collected and packed into {@link
+ * CompactorRequest}s. The {@link FileCompactStrategy} defines when the requests can be fired. When
+ * a firing condition is met, the requests will be sent to the {@link CompactorOperator}.
+ *
+ * <p>The {@link CompactCoordinator} stores the non-fired committable as its state, and may emit a
+ * request at any time. A {@link CompactorOperator} must ensure that the ownership of the
+ * committable in a compact request is successfully handed from the coordinator, before it can
+ * actually perform the compaction.
+ */
+public class CompactCoordinator extends AbstractStreamOperator<CompactorRequest>
+        implements OneInputStreamOperator<
+                        CommittableMessage<FileSinkCommittable>, CompactorRequest>,
+                BoundedOneInput {
+
+    private static final ListStateDescriptor<byte[]> 
REMAINING_COMMITTABLE_RAW_STATES_DESC =
+            new ListStateDescriptor<>(
+                    "remaining_compact_commt_raw_state", 
BytePrimitiveArraySerializer.INSTANCE);
+
+    private final FileCompactStrategy strategy;
+    private final SimpleVersionedSerializer<FileSinkCommittable> 
committableSerializer;
+
+    private final Map<String, CompactorRequest> packing = new HashMap<>();
+    private final Map<String, CompactTrigger> triggers = new HashMap<>();
+
+    private ListState<FileSinkCommittable> remainingCommittableState;
+
+    public CompactCoordinator(
+            FileCompactStrategy strategy,
+            SimpleVersionedSerializer<FileSinkCommittable> 
committableSerializer) {
+        this.strategy = strategy;
+        this.committableSerializer = checkNotNull(committableSerializer);
+    }
+
+    @Override
+    public void 
processElement(StreamRecord<CommittableMessage<FileSinkCommittable>> element)
+            throws Exception {
+        CommittableMessage<FileSinkCommittable> message = element.getValue();
+        if (message instanceof CommittableWithLineage) {
+            FileSinkCommittable committable =
+                    ((CommittableWithLineage<FileSinkCommittable>) 
element.getValue())
+                            .getCommittable();
+            String bucketId = committable.getBucketId();
+            if (packAndTrigger(bucketId, committable)) {
+                fireAndPurge(bucketId);
+            }
+        }
+        // or message instanceof CommittableSummary
+        // info in CommittableSummary is not necessary for compacting at 
present, ignore it
+    }
+
+    private boolean packAndTrigger(String bucketId, FileSinkCommittable committable) {
+        CompactorRequest bucketRequest = packing.computeIfAbsent(bucketId, CompactorRequest::new);
+        if (committable.hasInProgressFileToCleanup() || committable.hasCompactedFileToCleanup()) {

Review comment:
       Perhaps first assert here that such committables must not have pending files: `checkState(!committable.hasPendingFile())`.
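   
   E.g., a sketch of the assertion I have in mind (assuming a static import of `Preconditions.checkState`, matching the existing `checkNotNull` import in this class):
   
   ```
   if (committable.hasInProgressFileToCleanup() || committable.hasCompactedFileToCleanup()) {
       // a cleanup-only committable should never also carry a pending file
       checkState(!committable.hasPendingFile());
       // cleanup request, pass through directly
       bucketRequest.addToPassthrough(committable);
       return false;
   }
   ```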

##########
File path: flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactCoordinator.java
##########
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.sink.compactor.operator;
+
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import 
org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
+import org.apache.flink.connector.file.sink.FileSink;
+import org.apache.flink.connector.file.sink.FileSinkCommittable;
+import org.apache.flink.connector.file.sink.compactor.FileCompactStrategy;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
+import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter.PendingFileRecoverable;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Coordinator that coordinates file compaction for the {@link FileSink}.
+ *
+ * <p>All committable emitted from the writers are collected and packed into {@link
+ * CompactorRequest}s. The {@link FileCompactStrategy} defines when the requests can be fired. When
+ * a firing condition is met, the requests will be sent to the {@link CompactorOperator}.
+ *
+ * <p>The {@link CompactCoordinator} stores the non-fired committable as its state, and may emit a
+ * request at any time. A {@link CompactorOperator} must ensure that the ownership of the
+ * committable in a compact request is successfully handed from the coordinator, before it can
+ * actually perform the compaction.
+ */
+public class CompactCoordinator extends AbstractStreamOperator<CompactorRequest>
+        implements OneInputStreamOperator<
+                        CommittableMessage<FileSinkCommittable>, CompactorRequest>,
+                BoundedOneInput {
+
+    private static final ListStateDescriptor<byte[]> 
REMAINING_COMMITTABLE_RAW_STATES_DESC =
+            new ListStateDescriptor<>(
+                    "remaining_compact_commt_raw_state", 
BytePrimitiveArraySerializer.INSTANCE);
+
+    private final FileCompactStrategy strategy;
+    private final SimpleVersionedSerializer<FileSinkCommittable> 
committableSerializer;
+
+    private final Map<String, CompactorRequest> packing = new HashMap<>();
+    private final Map<String, CompactTrigger> triggers = new HashMap<>();
+
+    private ListState<FileSinkCommittable> remainingCommittableState;
+
+    public CompactCoordinator(
+            FileCompactStrategy strategy,
+            SimpleVersionedSerializer<FileSinkCommittable> 
committableSerializer) {
+        this.strategy = strategy;
+        this.committableSerializer = checkNotNull(committableSerializer);
+    }
+
+    @Override
+    public void 
processElement(StreamRecord<CommittableMessage<FileSinkCommittable>> element)
+            throws Exception {
+        CommittableMessage<FileSinkCommittable> message = element.getValue();
+        if (message instanceof CommittableWithLineage) {
+            FileSinkCommittable committable =
+                    ((CommittableWithLineage<FileSinkCommittable>) 
element.getValue())
+                            .getCommittable();
+            String bucketId = committable.getBucketId();
+            if (packAndTrigger(bucketId, committable)) {
+                fireAndPurge(bucketId);
+            }
+        }
+        // or message instanceof CommittableSummary
+        // info in CommittableSummary is not necessary for compacting at 
present, ignore it
+    }
+
+    private boolean packAndTrigger(String bucketId, FileSinkCommittable 
committable) {
+        CompactorRequest bucketRequest = packing.computeIfAbsent(bucketId, 
CompactorRequest::new);
+        if (committable.hasInProgressFileToCleanup() || 
committable.hasCompactedFileToCleanup()) {
+            // cleanup request, pass through directly
+            bucketRequest.addToPassthrough(committable);
+            return false;
+        }
+
+        if (!committable.hasPendingFile()) {
+            throw new RuntimeException("Committable to compact has no 
content.");
+        }
+
+        CompactTrigger trigger =
+                triggers.computeIfAbsent(bucketId, id -> new 
CompactTrigger(strategy));
+        CompactTriggerResult triggerResult = trigger.onElement(committable);
+        switch (triggerResult) {
+            case PASS_THROUGH:
+                bucketRequest.addToPassthrough(committable);
+                return false;
+            case CONTINUE:
+                bucketRequest.addToCompact(committable);
+                return false;
+            case FIRE_AND_PURGE:
+                bucketRequest.addToCompact(committable);
+                return true;
+            default:
+                throw new RuntimeException("Unexpected trigger result:" + 
triggerResult);
+        }
+    }
+
+    private void fireAndPurge(String bucketId) {
+        triggers.remove(bucketId);
+        CompactorRequest request = packing.remove(bucketId);
+        if (request != null) {
+            output.collect(new StreamRecord<>(request));
+        }
+    }
+
+    @Override
+    public void endInput() throws Exception {
+        // emit all requests remained
+        for (CompactorRequest request : packing.values()) {
+            output.collect(new StreamRecord<>(request));
+        }
+        packing.clear();
+        triggers.clear();
+    }
+
+    @Override
+    public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
+        super.prepareSnapshotPreBarrier(checkpointId);
+
+        // trigger on checkpoint
+        List<String> bucketsToFire = new ArrayList<>(triggers.size());
+        for (Map.Entry<String, CompactTrigger> e : triggers.entrySet()) {
+            String bucketId = e.getKey();
+            CompactTrigger trigger = e.getValue();
+            if (trigger.onCheckpoint(checkpointId) == 
CompactTriggerResult.FIRE_AND_PURGE) {
+                bucketsToFire.add(bucketId);
+            }
+        }
+        bucketsToFire.forEach(this::fireAndPurge);
+    }
+
+    @Override
+    public void snapshotState(StateSnapshotContext context) throws Exception {
+        super.snapshotState(context);
+
+        List<FileSinkCommittable> remainingCommittable =
+                packing.values().stream()
+                        .flatMap(r -> r.getCommittableToCompact().stream())
+                        .collect(Collectors.toList());
+        packing.values().stream()
+                .flatMap(r -> r.getCommittableToPassthrough().stream())
+                .forEach(remainingCommittable::add);
+        remainingCommittableState.update(remainingCommittable);
+
+        // triggers will be recomputed when restoring so it's not necessary to 
store
+    }
+
+    @Override
+    public void initializeState(StateInitializationContext context) throws 
Exception {
+        super.initializeState(context);
+
+        remainingCommittableState =
+                new SimpleVersionedListState<>(
+                        context.getOperatorStateStore()
+                                
.getListState(REMAINING_COMMITTABLE_RAW_STATES_DESC),
+                        committableSerializer);
+
+        Iterable<FileSinkCommittable> stateRemaining = 
remainingCommittableState.get();
+        if (stateRemaining != null) {
+            for (FileSinkCommittable committable : stateRemaining) {
+                // restore and redistribute
+                String bucketId = committable.getBucketId();
+                if (packAndTrigger(bucketId, committable)) {
+                    fireAndPurge(bucketId);
+                }
+            }
+        }
+    }
+
+    enum CompactTriggerResult {
+        CONTINUE,
+        FIRE_AND_PURGE,
+        PASS_THROUGH
+    }
+
+    private static class CompactTrigger {
+        private final long threshold;
+        private final boolean compactOnCheckpoint;
+
+        private long size;
+
+        CompactTrigger(FileCompactStrategy strategy) {
+            this.threshold = strategy.getSizeThreshold();
+            this.compactOnCheckpoint = strategy.isCompactOnCheckpoint();
+        }
+
+        public CompactTriggerResult onElement(FileSinkCommittable committable) {

Review comment:
       Have we considered the case that, when starting from an existing file sink, some committable might not target a temporary file, i.e. a path that does not start with '.'?
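   
   For illustration, the check I am thinking of is something like the following (a hypothetical helper; it assumes the target `Path` of the pending file can be obtained here):
   
   ```
   /** Whether the committable targets a temporary (hidden) file, i.e. its name starts with '.'. */
   private static boolean targetsTemporaryFile(Path path) {
       return path.getName().startsWith(".");
   }
   ```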

##########
File path: flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorOperator.java
##########
@@ -0,0 +1,404 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.sink.compactor.operator;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import 
org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.connector.file.sink.FileSink;
+import org.apache.flink.connector.file.sink.FileSinkCommittable;
+import org.apache.flink.connector.file.sink.compactor.FileCompactor;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.io.SimpleVersionedSerialization;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.runtime.util.Hardware;
+import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
+import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
+import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
+import org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.CompactingFileWriter;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter.PendingFileRecoverable;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.stream.Collectors;
+
+/**
+ * An operator that perform compaction for the {@link FileSink}.
+ *
+ * <p>Requests received from the {@link CompactCoordinator} will firstly be held in memory, and
+ * snapshot into the state of a checkpoint. When the checkpoint is successfully completed, all
+ * requests received before can be submitted. The results can be emitted at the next {@link
+ * #prepareSnapshotPreBarrier} invoking after the compaction is finished, to ensure that committers
+ * can receive only one CommittableSummary and the corresponding number of Committable for a single
+ * checkpoint.
+ */
+public class CompactorOperator
+        extends AbstractStreamOperator<CommittableMessage<FileSinkCommittable>>
+        implements OneInputStreamOperator<
+                        CompactorRequest, CommittableMessage<FileSinkCommittable>>,
+                BoundedOneInput,
+                CheckpointListener {
+
+    private static final String COMPACTED_PREFIX = "compacted-";
+    private static final long SUBMITTED_ID = -1L;
+
+    private static final ListStateDescriptor<byte[]> 
REMAINING_REQUESTS_RAW_STATES_DESC =
+            new ListStateDescriptor<>(
+                    "remaining_requests_raw_state", 
BytePrimitiveArraySerializer.INSTANCE);
+
+    private final int compactThreads;
+    private final SimpleVersionedSerializer<FileSinkCommittable> 
committableSerializer;
+
+    private final FileCompactor fileCompactor;
+    private final BucketWriter<?, String> bucketWriter;
+
+    private transient ExecutorService compactService;
+
+    private List<CompactorRequest> collectingRequests = new ArrayList<>();
+    private final TreeMap<Long, List<CompactorRequest>> snapshotRequests = new 
TreeMap<>();
+    private final List<Tuple2<CompactorRequest, 
CompletableFuture<Iterable<FileSinkCommittable>>>>
+            compactingRequests = new LinkedList<>();
+
+    private ListState<Map<Long, List<CompactorRequest>>> 
remainingRequestsState;
+
+    public CompactorOperator(
+            int compactThreads,
+            SimpleVersionedSerializer<FileSinkCommittable> 
committableSerializer,
+            FileCompactor fileCompactor,
+            BucketWriter<?, String> bucketWriter) {
+        this.compactThreads = compactThreads;
+        this.committableSerializer = committableSerializer;
+        this.fileCompactor = fileCompactor;
+        this.bucketWriter = bucketWriter;
+    }
+
+    @Override
+    public void open() throws Exception {
+        super.open();
+        this.compactService =
+                Executors.newFixedThreadPool(
+                        Math.max(1, Math.min(compactThreads, 
Hardware.getNumberCPUCores())),
+                        new ExecutorThreadFactory("compact-executor"));
+    }
+
+    @Override
+    public void processElement(StreamRecord<CompactorRequest> element) throws 
Exception {
+        collectingRequests.add(element.getValue());
+    }
+
+    @Override
+    public void endInput() throws Exception {
+        // add collecting requests into the final snapshot
+        snapshotRequests.put(Long.MAX_VALUE, collectingRequests);
+        collectingRequests = new ArrayList<>();
+
+        // submit all requests and wait until they are done
+        submitUntil(Long.MAX_VALUE);
+        assert snapshotRequests.isEmpty();
+
+        getAllTasksFuture().join();
+        emitCompacted(null);
+        assert compactingRequests.isEmpty();
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) throws Exception {
+        super.notifyCheckpointComplete(checkpointId);
+        submitUntil(checkpointId);
+    }
+
+    @Override
+    public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
+        emitCompacted(checkpointId);
+    }
+
+    @Override
+    public void snapshotState(StateSnapshotContext context) throws Exception {
+        super.snapshotState(context);
+
+        // add collecting requests during the checkpoint into the snapshot
+        snapshotRequests.put(context.getCheckpointId(), collectingRequests);
+        collectingRequests = new ArrayList<>();
+
+        // snapshot all compacting requests as well, including the requests 
that are not finished
+        // when invoking prepareSnapshotPreBarrier but finished now, since 
they are not emitted yet
+        Map<Long, List<CompactorRequest>> requests = new 
HashMap<>(snapshotRequests);
+        requests.computeIfAbsent(SUBMITTED_ID, id -> new ArrayList<>())
+                .addAll(compactingRequests.stream().map(r -> 
r.f0).collect(Collectors.toList()));
+        remainingRequestsState.update(Collections.singletonList(requests));
+    }
+
+    @Override
+    public void initializeState(StateInitializationContext context) throws 
Exception {
+        super.initializeState(context);
+
+        remainingRequestsState =
+                new SimpleVersionedListState<>(
+                        context.getOperatorStateStore()
+                                
.getListState(REMAINING_REQUESTS_RAW_STATES_DESC),
+                        new RemainingRequestsSerializer(
+                                new 
CompactorRequestSerializer(committableSerializer)));
+
+        Iterable<Map<Long, List<CompactorRequest>>> stateRemaining = 
remainingRequestsState.get();
+        if (stateRemaining != null) {
+            for (Map<Long, List<CompactorRequest>> requests : stateRemaining) {
+                // elements can be more than one when redistributed after 
parallelism changing
+                for (Map.Entry<Long, List<CompactorRequest>> e : 
requests.entrySet()) {
+                    List<CompactorRequest> list =
+                            snapshotRequests.computeIfAbsent(e.getKey(), id -> 
new ArrayList<>());
+                    list.addAll(e.getValue());
+                }
+            }
+        }
+        // submit all requests that is already submitted before this checkpoint
+        submitUntil(SUBMITTED_ID);
+    }
+
+    private void submitUntil(long checkpointId) {
+        NavigableMap<Long, List<CompactorRequest>> canSubmit =
+                snapshotRequests.subMap(Long.MIN_VALUE, true, checkpointId, true);
+        Iterator<Entry<Long, List<CompactorRequest>>> iter = canSubmit.entrySet().iterator();

Review comment:
       Also, in this case we might not need to use the iterator directly:
   
   ```
   for (Map.Entry<Long, List<CompactorRequest>> entry : canSubmit.entrySet()) {
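       // submit the requests in entry.getValue() here; no explicit iter.remove() needed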
     
   }
   
   canSubmit.clear();
   ```

##########
File path: flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/FileSink.java
##########
@@ -177,6 +189,45 @@ private FileSink(BucketsBuilder<IN, ? extends BucketsBuilder<IN, ?>> bucketsBuil
                 basePath, bulkWriterFactory, new DateTimeBucketAssigner<>());
     }
 
+    public BucketWriter<IN, String> createBucketWriter() throws IOException {
+        return bucketsBuilder.createBucketWriter();
+    }
+
+    public FileCompactor getFileCompactor() {
+        return bucketsBuilder.getFileCompactor();
+    }
+
+    @Override
+    public DataStream<CommittableMessage<FileSinkCommittable>> addPreCommitTopology(
+            DataStream<CommittableMessage<FileSinkCommittable>> committableStream) {
+        FileCompactStrategy strategy = bucketsBuilder.getCompactStrategy();
+        if (strategy == null) {
+            // not enabled
+            return committableStream;
+        }
+
+        SingleOutputStreamOperator<CompactorRequest> coordinatorOp =
+                committableStream
+                        .rescale()
+                        .transform(

Review comment:
       We would need to set the uid in some way for these operators since they have state.
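   
   E.g., roughly (the uid string is only a placeholder and would need to be unique per sink if a job contains several file sinks):
   
   ```
   SingleOutputStreamOperator<CompactorRequest> coordinatorOp =
           committableStream
                   .rescale()
                   .transform(
                           "CompactorCoordinator",
                           new CompactorRequestTypeInfo(bucketsBuilder::getCommittableSerializer),
                           new CompactCoordinatorFactory(this, strategy))
                   .uid("FileSink-compact-coordinator")
                   .setParallelism(1)
                   .setMaxParallelism(1);
   ```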

##########
File path: flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/FileSink.java
##########
@@ -177,6 +189,45 @@ private FileSink(BucketsBuilder<IN, ? extends BucketsBuilder<IN, ?>> bucketsBuil
                 basePath, bulkWriterFactory, new DateTimeBucketAssigner<>());
     }
 
+    public BucketWriter<IN, String> createBucketWriter() throws IOException {
+        return bucketsBuilder.createBucketWriter();
+    }
+
+    public FileCompactor getFileCompactor() {
+        return bucketsBuilder.getFileCompactor();
+    }
+
+    @Override
+    public DataStream<CommittableMessage<FileSinkCommittable>> addPreCommitTopology(
+            DataStream<CommittableMessage<FileSinkCommittable>> committableStream) {
+        FileCompactStrategy strategy = bucketsBuilder.getCompactStrategy();
+        if (strategy == null) {
+            // not enabled
+            return committableStream;
+        }
+
+        SingleOutputStreamOperator<CompactorRequest> coordinatorOp =
+                committableStream
+                        .rescale()

Review comment:
       The default `rebalance` would be the same as `rescale` here?

##########
File path: flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/FileSink.java
##########
@@ -275,6 +339,18 @@ public T withOutputFileConfig(final OutputFileConfig outputFileConfig) {
             return self();
         }
 
+        public T enableCompact(final FileCompactStrategy strategy, final FileCompactor compactor) {
+            this.compactStrategy = strategy;

Review comment:
       Add `checkNotNull` here, and the same for the following `fileCompactor`.
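   
   I.e., something like (assuming the `fileCompactor` assignment that follows later in this method):
   
   ```
   public T enableCompact(final FileCompactStrategy strategy, final FileCompactor compactor) {
       this.compactStrategy = checkNotNull(strategy);
       this.fileCompactor = checkNotNull(compactor);
       return self();
   }
   ```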

##########
File path: flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorOperator.java
##########
@@ -0,0 +1,404 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.sink.compactor.operator;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import 
org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.connector.file.sink.FileSink;
+import org.apache.flink.connector.file.sink.FileSinkCommittable;
+import org.apache.flink.connector.file.sink.compactor.FileCompactor;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.io.SimpleVersionedSerialization;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.runtime.util.Hardware;
+import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
+import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
+import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
+import org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.CompactingFileWriter;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter.PendingFileRecoverable;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.stream.Collectors;
+
+/**
+ * An operator that perform compaction for the {@link FileSink}.
+ *
+ * <p>Requests received from the {@link CompactCoordinator} will firstly be held in memory, and
+ * snapshot into the state of a checkpoint. When the checkpoint is successfully completed, all
+ * requests received before can be submitted. The results can be emitted at the next {@link
+ * #prepareSnapshotPreBarrier} invoking after the compaction is finished, to ensure that committers
+ * can receive only one CommittableSummary and the corresponding number of Committable for a single
+ * checkpoint.
+ */
+public class CompactorOperator
+        extends AbstractStreamOperator<CommittableMessage<FileSinkCommittable>>
+        implements OneInputStreamOperator<
+                        CompactorRequest, CommittableMessage<FileSinkCommittable>>,
+                BoundedOneInput,
+                CheckpointListener {
+
+    private static final String COMPACTED_PREFIX = "compacted-";
+    private static final long SUBMITTED_ID = -1L;
+
+    private static final ListStateDescriptor<byte[]> 
REMAINING_REQUESTS_RAW_STATES_DESC =
+            new ListStateDescriptor<>(
+                    "remaining_requests_raw_state", 
BytePrimitiveArraySerializer.INSTANCE);
+
+    private final int compactThreads;
+    private final SimpleVersionedSerializer<FileSinkCommittable> 
committableSerializer;
+
+    private final FileCompactor fileCompactor;
+    private final BucketWriter<?, String> bucketWriter;
+
+    private transient ExecutorService compactService;
+
+    private List<CompactorRequest> collectingRequests = new ArrayList<>();
+    private final TreeMap<Long, List<CompactorRequest>> snapshotRequests = new 
TreeMap<>();
+    private final List<Tuple2<CompactorRequest, 
CompletableFuture<Iterable<FileSinkCommittable>>>>
+            compactingRequests = new LinkedList<>();
+
+    private ListState<Map<Long, List<CompactorRequest>>> 
remainingRequestsState;
+
+    public CompactorOperator(
+            int compactThreads,
+            SimpleVersionedSerializer<FileSinkCommittable> 
committableSerializer,
+            FileCompactor fileCompactor,
+            BucketWriter<?, String> bucketWriter) {
+        this.compactThreads = compactThreads;
+        this.committableSerializer = committableSerializer;
+        this.fileCompactor = fileCompactor;
+        this.bucketWriter = bucketWriter;
+    }
+
+    @Override
+    public void open() throws Exception {
+        super.open();
+        this.compactService =

Review comment:
       This service is not shut down?
   
   We might need to try to close the service on `close()` and `finalize()`. See `SystemProcessingTimeService`.
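   
   A minimal sketch of what I mean:
   
   ```
   @Override
   public void close() throws Exception {
       if (compactService != null) {
           // stop the compaction threads; unfinished requests are restored from state on restart
           compactService.shutdownNow();
       }
       super.close();
   }
   ```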

##########
File path: flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/FileSink.java
##########
@@ -177,6 +189,45 @@ private FileSink(BucketsBuilder<IN, ? extends BucketsBuilder<IN, ?>> bucketsBuil
                 basePath, bulkWriterFactory, new DateTimeBucketAssigner<>());
     }
 
+    public BucketWriter<IN, String> createBucketWriter() throws IOException {
+        return bucketsBuilder.createBucketWriter();
+    }
+
+    public FileCompactor getFileCompactor() {
+        return bucketsBuilder.getFileCompactor();
+    }
+
+    @Override
+    public DataStream<CommittableMessage<FileSinkCommittable>> addPreCommitTopology(
+            DataStream<CommittableMessage<FileSinkCommittable>> committableStream) {
+        FileCompactStrategy strategy = bucketsBuilder.getCompactStrategy();
+        if (strategy == null) {
+            // not enabled
+            return committableStream;
+        }
+
+        SingleOutputStreamOperator<CompactorRequest> coordinatorOp =
+                committableStream
+                        .rescale()
+                        .transform(
+                                "CompactorCoordinator",
+                                new CompactorRequestTypeInfo(
+                                        bucketsBuilder::getCommittableSerializer),
+                                new CompactCoordinatorFactory(this, strategy))
+                        .setParallelism(1)
+                        .setMaxParallelism(1);
+
+        TypeInformation<CommittableMessage<FileSinkCommittable>> committableType =
+                committableStream.getType();
+        return coordinatorOp
+                .shuffle()

Review comment:
       Do we have a special reason to use shuffle? `shuffle` might incur creating random numbers, which might cause additional CPU usage.
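   
   If there is no particular reason, round-robin distribution might be the cheaper alternative, e.g.:
   
   ```
   // rebalance distributes records round-robin and avoids per-record random number generation
   DataStream<CompactorRequest> distributed = coordinatorOp.rebalance();
   ```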

##########
File path: flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactCoordinator.java
##########
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.sink.compactor.operator;
+
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import 
org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
+import org.apache.flink.connector.file.sink.FileSink;
+import org.apache.flink.connector.file.sink.FileSinkCommittable;
+import org.apache.flink.connector.file.sink.compactor.FileCompactStrategy;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
+import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter.PendingFileRecoverable;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Coordinator that coordinates file compaction for the {@link FileSink}.
+ *
+ * <p>All committable emitted from the writers are collected and packed into {@link
+ * CompactorRequest}s. The {@link FileCompactStrategy} defines when the requests can be fired. When
+ * a firing condition is met, the requests will be sent to the {@link CompactorOperator}.
+ *
+ * <p>The {@link CompactCoordinator} stores the non-fired committable as its state, and may emit a
+ * request at any time. A {@link CompactorOperator} must ensure that the ownership of the
+ * committable in a compact request is successfully handed from the coordinator, before it can
+ * actually perform the compaction.
+ */
+public class CompactCoordinator extends AbstractStreamOperator<CompactorRequest>
+        implements OneInputStreamOperator<
+                        CommittableMessage<FileSinkCommittable>, CompactorRequest>,
+                BoundedOneInput {
+
+    private static final ListStateDescriptor<byte[]> 
REMAINING_COMMITTABLE_RAW_STATES_DESC =
+            new ListStateDescriptor<>(
+                    "remaining_compact_commt_raw_state", 
BytePrimitiveArraySerializer.INSTANCE);
+
+    private final FileCompactStrategy strategy;
+    private final SimpleVersionedSerializer<FileSinkCommittable> 
committableSerializer;
+
+    private final Map<String, CompactorRequest> packing = new HashMap<>();

Review comment:
       `packing` sounds like a verb? Might rename it to `compactRequests`?

##########
File path: flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/FileSink.java
##########
@@ -275,6 +339,18 @@ public T withOutputFileConfig(final OutputFileConfig outputFileConfig) {
             return self();
         }
 
+        public T enableCompact(final FileCompactStrategy strategy, final FileCompactor compactor) {
+            this.compactStrategy = strategy;
+            // we always commit before compacting, so hide the file written by writer
+            this.outputFileConfig =

Review comment:
       We might not want to directly modify the `outputFileConfig` here, since users could still override the config after `enableCompact`. We might move the prefix change to the point where we create the writers.
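   
   E.g., a sketch of deriving the hidden prefix only at the point where the writer is created, keeping the user-facing config untouched:
   
   ```
   OutputFileConfig writerFileConfig =
           compactStrategy == null
                   ? outputFileConfig
                   : OutputFileConfig.builder()
                           .withPartPrefix("." + outputFileConfig.getPartPrefix())
                           .withPartSuffix(outputFileConfig.getPartSuffix())
                           .build();
   ```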

##########
File path: flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/FileSink.java
##########
@@ -177,6 +189,45 @@ private FileSink(BucketsBuilder<IN, ? extends BucketsBuilder<IN, ?>> bucketsBuil
                 basePath, bulkWriterFactory, new DateTimeBucketAssigner<>());
     }
 
+    public BucketWriter<IN, String> createBucketWriter() throws IOException {

Review comment:
       I tend to not expose new methods in `FileSink`.
   
   Is it possible to pass `SerializableSupplier<FileCompactor>` and so on for the fields required to create the `CompactorOperatorFactory`? Or to directly pass a `SerializableSupplier<CompactorOperator>`?
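   
   Roughly (a hypothetical factory constructor, just to show the direction):
   
   ```
   new CompactorOperatorFactory(
           strategy,
           (SerializableSupplier<FileCompactor>) bucketsBuilder::getFileCompactor,
           (SerializableSupplier<SimpleVersionedSerializer<FileSinkCommittable>>)
                   bucketsBuilder::getCommittableSerializer);
   ```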

##########
File path: flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactCoordinator.java
##########
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.sink.compactor.operator;
+
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import 
org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
+import org.apache.flink.connector.file.sink.FileSink;
+import org.apache.flink.connector.file.sink.FileSinkCommittable;
+import org.apache.flink.connector.file.sink.compactor.FileCompactStrategy;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
+import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter.PendingFileRecoverable;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Coordinator that coordinates file compaction for the {@link FileSink}.
+ *
+ * <p>All committable emitted from the writers are collected and packed into {@link
+ * CompactorRequest}s. The {@link FileCompactStrategy} defines when the requests can be fired. When
+ * a firing condition is met, the requests will be sent to the {@link CompactorOperator}.
+ *
+ * <p>The {@link CompactCoordinator} stores the non-fired committable as its state, and may emit a
+ * request at any time. A {@link CompactorOperator} must ensure that the ownership of the
+ * committable in a compact request is successfully handed from the coordinator, before it can
+ * actually perform the compaction.
+ */
+public class CompactCoordinator extends AbstractStreamOperator<CompactorRequest>
+        implements OneInputStreamOperator<
+                        CommittableMessage<FileSinkCommittable>, CompactorRequest>,
+                BoundedOneInput {
+
+    private static final ListStateDescriptor<byte[]> 
REMAINING_COMMITTABLE_RAW_STATES_DESC =
+            new ListStateDescriptor<>(
+                    "remaining_compact_commt_raw_state", 
BytePrimitiveArraySerializer.INSTANCE);
+
+    private final FileCompactStrategy strategy;
+    private final SimpleVersionedSerializer<FileSinkCommittable> 
committableSerializer;
+
+    private final Map<String, CompactorRequest> packing = new HashMap<>();
+    private final Map<String, CompactTrigger> triggers = new HashMap<>();
+
+    private ListState<FileSinkCommittable> remainingCommittableState;
+
+    public CompactCoordinator(
+            FileCompactStrategy strategy,
+            SimpleVersionedSerializer<FileSinkCommittable> 
committableSerializer) {
+        this.strategy = strategy;
+        this.committableSerializer = checkNotNull(committableSerializer);
+    }
+
+    @Override
+    public void 
processElement(StreamRecord<CommittableMessage<FileSinkCommittable>> element)
+            throws Exception {
+        CommittableMessage<FileSinkCommittable> message = element.getValue();
+        if (message instanceof CommittableWithLineage) {
+            FileSinkCommittable committable =
+                    ((CommittableWithLineage<FileSinkCommittable>) 
element.getValue())
+                            .getCommittable();
+            String bucketId = committable.getBucketId();
+            if (packAndTrigger(bucketId, committable)) {
+                fireAndPurge(bucketId);
+            }
+        }
+        // or message instanceof CommittableSummary
+        // info in CommittableSummary is not necessary for compacting at 
present, ignore it
+    }
+
+    private boolean packAndTrigger(String bucketId, FileSinkCommittable committable) {

Review comment:
       nit: I think we might not need to pass `bucketId` separately; it is part of `committable`.
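   
   I.e.:
   
   ```
   private boolean packAndTrigger(FileSinkCommittable committable) {
       String bucketId = committable.getBucketId();
       // ... rest of the method unchanged
   }
   ```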

##########
File path: flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorOperator.java
##########
@@ -0,0 +1,404 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.sink.compactor.operator;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import 
org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.connector.file.sink.FileSink;
+import org.apache.flink.connector.file.sink.FileSinkCommittable;
+import org.apache.flink.connector.file.sink.compactor.FileCompactor;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.io.SimpleVersionedSerialization;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.runtime.util.Hardware;
+import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
+import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
+import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
+import org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.CompactingFileWriter;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter.PendingFileRecoverable;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.stream.Collectors;
+
+/**
+ * An operator that perform compaction for the {@link FileSink}.
+ *
+ * <p>Requests received from the {@link CompactCoordinator} will firstly be held in memory, and
+ * snapshot into the state of a checkpoint. When the checkpoint is successfully completed, all
+ * requests received before can be submitted. The results can be emitted at the next {@link
+ * #prepareSnapshotPreBarrier} invoking after the compaction is finished, to ensure that committers
+ * can receive only one CommittableSummary and the corresponding number of Committable for a single
+ * checkpoint.
+ */
+public class CompactorOperator
+        extends AbstractStreamOperator<CommittableMessage<FileSinkCommittable>>
+        implements OneInputStreamOperator<
+                        CompactorRequest, CommittableMessage<FileSinkCommittable>>,
+                BoundedOneInput,
+                CheckpointListener {
+
+    private static final String COMPACTED_PREFIX = "compacted-";
+    private static final long SUBMITTED_ID = -1L;
+
+    private static final ListStateDescriptor<byte[]> 
REMAINING_REQUESTS_RAW_STATES_DESC =
+            new ListStateDescriptor<>(
+                    "remaining_requests_raw_state", 
BytePrimitiveArraySerializer.INSTANCE);
+
+    private final int compactThreads;
+    private final SimpleVersionedSerializer<FileSinkCommittable> 
committableSerializer;
+
+    private final FileCompactor fileCompactor;
+    private final BucketWriter<?, String> bucketWriter;
+
+    private transient ExecutorService compactService;
+
+    private List<CompactorRequest> collectingRequests = new ArrayList<>();
+    private final TreeMap<Long, List<CompactorRequest>> snapshotRequests = new 
TreeMap<>();
+    private final List<Tuple2<CompactorRequest, 
CompletableFuture<Iterable<FileSinkCommittable>>>>
+            compactingRequests = new LinkedList<>();
+
+    private ListState<Map<Long, List<CompactorRequest>>> 
remainingRequestsState;
+
+    public CompactorOperator(
+            int compactThreads,
+            SimpleVersionedSerializer<FileSinkCommittable> 
committableSerializer,
+            FileCompactor fileCompactor,
+            BucketWriter<?, String> bucketWriter) {
+        this.compactThreads = compactThreads;
+        this.committableSerializer = committableSerializer;
+        this.fileCompactor = fileCompactor;
+        this.bucketWriter = bucketWriter;
+    }
+
+    @Override
+    public void open() throws Exception {
+        super.open();
+        this.compactService =
+                Executors.newFixedThreadPool(
+                        Math.max(1, Math.min(compactThreads, 
Hardware.getNumberCPUCores())),
+                        new ExecutorThreadFactory("compact-executor"));
+    }
+
+    @Override
+    public void processElement(StreamRecord<CompactorRequest> element) throws 
Exception {
+        collectingRequests.add(element.getValue());
+    }
+
+    @Override
+    public void endInput() throws Exception {
+        // add collecting requests into the final snapshot
+        snapshotRequests.put(Long.MAX_VALUE, collectingRequests);
+        collectingRequests = new ArrayList<>();
+
+        // submit all requests and wait until they are done
+        submitUntil(Long.MAX_VALUE);
+        assert snapshotRequests.isEmpty();
+
+        getAllTasksFuture().join();
+        emitCompacted(null);
+        assert compactingRequests.isEmpty();
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) throws Exception {
+        super.notifyCheckpointComplete(checkpointId);
+        submitUntil(checkpointId);
+    }
+
+    @Override
+    public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
+        emitCompacted(checkpointId);
+    }
+
+    @Override
+    public void snapshotState(StateSnapshotContext context) throws Exception {
+        super.snapshotState(context);
+
+        // add collecting requests during the checkpoint into the snapshot
+        snapshotRequests.put(context.getCheckpointId(), collectingRequests);
+        collectingRequests = new ArrayList<>();
+
+        // snapshot all compacting requests as well, including those that were not
+        // finished when prepareSnapshotPreBarrier was invoked but have finished since,
+        // because their results have not been emitted yet
+        Map<Long, List<CompactorRequest>> requests = new 
HashMap<>(snapshotRequests);
+        requests.computeIfAbsent(SUBMITTED_ID, id -> new ArrayList<>())
+                .addAll(compactingRequests.stream().map(r -> 
r.f0).collect(Collectors.toList()));
+        remainingRequestsState.update(Collections.singletonList(requests));
+    }
+
+    @Override
+    public void initializeState(StateInitializationContext context) throws 
Exception {
+        super.initializeState(context);
+
+        remainingRequestsState =
+                new SimpleVersionedListState<>(
+                        context.getOperatorStateStore()
+                                
.getListState(REMAINING_REQUESTS_RAW_STATES_DESC),
+                        new RemainingRequestsSerializer(
+                                new 
CompactorRequestSerializer(committableSerializer)));
+
+        Iterable<Map<Long, List<CompactorRequest>>> stateRemaining = 
remainingRequestsState.get();
+        if (stateRemaining != null) {
+            for (Map<Long, List<CompactorRequest>> requests : stateRemaining) {
+                // there can be more than one element when state is
+                // redistributed after a parallelism change
+                for (Map.Entry<Long, List<CompactorRequest>> e : 
requests.entrySet()) {
+                    List<CompactorRequest> list =
+                            snapshotRequests.computeIfAbsent(e.getKey(), id -> 
new ArrayList<>());
+                    list.addAll(e.getValue());
+                }
+            }
+        }
+        // submit all requests that were already submitted before this checkpoint
+        submitUntil(SUBMITTED_ID);
+    }
+
+    private void submitUntil(long checkpointId) {
+        NavigableMap<Long, List<CompactorRequest>> canSubmit =
+                snapshotRequests.subMap(Long.MIN_VALUE, true, checkpointId, 
true);
+        Iterator<Entry<Long, List<CompactorRequest>>> iter = 
canSubmit.entrySet().iterator();

Review comment:
       nit: might avoid using a word fragment (`iter`) as the variable name; `iterator` or `requestIterator` would read better.
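
       For instance, just a sketch of the rename inside `submitUntil`:

       ```java
       Iterator<Entry<Long, List<CompactorRequest>>> requestIterator =
               canSubmit.entrySet().iterator();
       while (requestIterator.hasNext()) {
           Entry<Long, List<CompactorRequest>> requestEntry = requestIterator.next();
           // ... submit the requests of this entry as before ...
           requestIterator.remove();
       }
       ```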

##########
File path: 
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorOperator.java
##########
@@ -0,0 +1,404 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.sink.compactor.operator;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import 
org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.connector.file.sink.FileSink;
+import org.apache.flink.connector.file.sink.FileSinkCommittable;
+import org.apache.flink.connector.file.sink.compactor.FileCompactor;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.io.SimpleVersionedSerialization;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.runtime.util.Hardware;
+import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
+import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
+import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
+import org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.CompactingFileWriter;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter.PendingFileRecoverable;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.stream.Collectors;
+
+/**
+ * An operator that performs compaction for the {@link FileSink}.
+ *
+ * <p>Requests received from the {@link CompactCoordinator} will first be held in memory,
+ * and snapshotted into the state of a checkpoint. Once the checkpoint completes
+ * successfully, all requests received before it can be submitted. The results can be
+ * emitted at the next {@link #prepareSnapshotPreBarrier} invocation after the compaction
+ * is finished, to ensure that committers receive only one CommittableSummary and the
+ * corresponding number of Committable messages for a single checkpoint.
+ */
+public class CompactorOperator
+        extends AbstractStreamOperator<CommittableMessage<FileSinkCommittable>>
+        implements OneInputStreamOperator<
+                        CompactorRequest, 
CommittableMessage<FileSinkCommittable>>,
+                BoundedOneInput,
+                CheckpointListener {
+
+    private static final String COMPACTED_PREFIX = "compacted-";
+    private static final long SUBMITTED_ID = -1L;
+
+    private static final ListStateDescriptor<byte[]> 
REMAINING_REQUESTS_RAW_STATES_DESC =
+            new ListStateDescriptor<>(
+                    "remaining_requests_raw_state", 
BytePrimitiveArraySerializer.INSTANCE);
+
+    private final int compactThreads;
+    private final SimpleVersionedSerializer<FileSinkCommittable> 
committableSerializer;
+
+    private final FileCompactor fileCompactor;
+    private final BucketWriter<?, String> bucketWriter;
+
+    private transient ExecutorService compactService;
+
+    private List<CompactorRequest> collectingRequests = new ArrayList<>();

Review comment:
       Might we add some comments to these collections, explaining what each one holds and at which stage of the checkpoint lifecycle?
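
       For example, just a sketch of the intent (exact wording up to you):

       ```java
       /** Requests received since the last checkpoint, not yet snapshotted. */
       private List<CompactorRequest> collectingRequests = new ArrayList<>();

       /** Snapshotted requests per checkpoint id, awaiting completion of that checkpoint. */
       private final TreeMap<Long, List<CompactorRequest>> snapshotRequests = new TreeMap<>();

       /** Submitted requests and the futures of their compaction results, not yet emitted. */
       private final List<Tuple2<CompactorRequest, CompletableFuture<Iterable<FileSinkCommittable>>>>
               compactingRequests = new LinkedList<>();
       ```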

##########
File path: 
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorOperator.java
##########
@@ -0,0 +1,404 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.sink.compactor.operator;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import 
org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.connector.file.sink.FileSink;
+import org.apache.flink.connector.file.sink.FileSinkCommittable;
+import org.apache.flink.connector.file.sink.compactor.FileCompactor;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.io.SimpleVersionedSerialization;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.runtime.util.Hardware;
+import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
+import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
+import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
+import org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.CompactingFileWriter;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter.PendingFileRecoverable;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.stream.Collectors;
+
+/**
+ * An operator that performs compaction for the {@link FileSink}.
+ *
+ * <p>Requests received from the {@link CompactCoordinator} will first be held in memory,
+ * and snapshotted into the state of a checkpoint. Once the checkpoint completes
+ * successfully, all requests received before it can be submitted. The results can be
+ * emitted at the next {@link #prepareSnapshotPreBarrier} invocation after the compaction
+ * is finished, to ensure that committers receive only one CommittableSummary and the
+ * corresponding number of Committable messages for a single checkpoint.
+ */
+public class CompactorOperator
+        extends AbstractStreamOperator<CommittableMessage<FileSinkCommittable>>
+        implements OneInputStreamOperator<
+                        CompactorRequest, 
CommittableMessage<FileSinkCommittable>>,
+                BoundedOneInput,
+                CheckpointListener {
+
+    private static final String COMPACTED_PREFIX = "compacted-";
+    private static final long SUBMITTED_ID = -1L;
+
+    private static final ListStateDescriptor<byte[]> 
REMAINING_REQUESTS_RAW_STATES_DESC =
+            new ListStateDescriptor<>(
+                    "remaining_requests_raw_state", 
BytePrimitiveArraySerializer.INSTANCE);
+
+    private final int compactThreads;
+    private final SimpleVersionedSerializer<FileSinkCommittable> 
committableSerializer;
+
+    private final FileCompactor fileCompactor;
+    private final BucketWriter<?, String> bucketWriter;
+
+    private transient ExecutorService compactService;
+
+    private List<CompactorRequest> collectingRequests = new ArrayList<>();
+    private final TreeMap<Long, List<CompactorRequest>> snapshotRequests = new 
TreeMap<>();
+    private final List<Tuple2<CompactorRequest, 
CompletableFuture<Iterable<FileSinkCommittable>>>>
+            compactingRequests = new LinkedList<>();
+
+    private ListState<Map<Long, List<CompactorRequest>>> 
remainingRequestsState;
+
+    public CompactorOperator(
+            int compactThreads,
+            SimpleVersionedSerializer<FileSinkCommittable> 
committableSerializer,
+            FileCompactor fileCompactor,
+            BucketWriter<?, String> bucketWriter) {
+        this.compactThreads = compactThreads;
+        this.committableSerializer = committableSerializer;
+        this.fileCompactor = fileCompactor;
+        this.bucketWriter = bucketWriter;
+    }
+
+    @Override
+    public void open() throws Exception {
+        super.open();
+        this.compactService =
+                Executors.newFixedThreadPool(
+                        Math.max(1, Math.min(compactThreads, 
Hardware.getNumberCPUCores())),
+                        new ExecutorThreadFactory("compact-executor"));
+    }
+
+    @Override
+    public void processElement(StreamRecord<CompactorRequest> element) throws 
Exception {
+        collectingRequests.add(element.getValue());
+    }
+
+    @Override
+    public void endInput() throws Exception {
+        // add collecting requests into the final snapshot
+        snapshotRequests.put(Long.MAX_VALUE, collectingRequests);
+        collectingRequests = new ArrayList<>();
+
+        // submit all requests and wait until they are done
+        submitUntil(Long.MAX_VALUE);
+        assert snapshotRequests.isEmpty();
+
+        getAllTasksFuture().join();
+        emitCompacted(null);
+        assert compactingRequests.isEmpty();
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) throws Exception {
+        super.notifyCheckpointComplete(checkpointId);
+        submitUntil(checkpointId);
+    }
+
+    @Override
+    public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
+        emitCompacted(checkpointId);
+    }
+
+    @Override
+    public void snapshotState(StateSnapshotContext context) throws Exception {
+        super.snapshotState(context);
+
+        // add collecting requests during the checkpoint into the snapshot
+        snapshotRequests.put(context.getCheckpointId(), collectingRequests);
+        collectingRequests = new ArrayList<>();
+
+        // snapshot all compacting requests as well, including those that were not
+        // finished when prepareSnapshotPreBarrier was invoked but have finished since,
+        // because their results have not been emitted yet
+        Map<Long, List<CompactorRequest>> requests = new 
HashMap<>(snapshotRequests);
+        requests.computeIfAbsent(SUBMITTED_ID, id -> new ArrayList<>())
+                .addAll(compactingRequests.stream().map(r -> 
r.f0).collect(Collectors.toList()));
+        remainingRequestsState.update(Collections.singletonList(requests));
+    }
+
+    @Override
+    public void initializeState(StateInitializationContext context) throws 
Exception {
+        super.initializeState(context);
+
+        remainingRequestsState =
+                new SimpleVersionedListState<>(
+                        context.getOperatorStateStore()
+                                
.getListState(REMAINING_REQUESTS_RAW_STATES_DESC),
+                        new RemainingRequestsSerializer(
+                                new 
CompactorRequestSerializer(committableSerializer)));
+
+        Iterable<Map<Long, List<CompactorRequest>>> stateRemaining = 
remainingRequestsState.get();
+        if (stateRemaining != null) {
+            for (Map<Long, List<CompactorRequest>> requests : stateRemaining) {
+                // there can be more than one element when state is
+                // redistributed after a parallelism change
+                for (Map.Entry<Long, List<CompactorRequest>> e : 
requests.entrySet()) {
+                    List<CompactorRequest> list =
+                            snapshotRequests.computeIfAbsent(e.getKey(), id -> 
new ArrayList<>());
+                    list.addAll(e.getValue());
+                }
+            }
+        }
+        // submit all requests that were already submitted before this checkpoint
+        submitUntil(SUBMITTED_ID);
+    }
+
+    private void submitUntil(long checkpointId) {
+        NavigableMap<Long, List<CompactorRequest>> canSubmit =
+                snapshotRequests.subMap(Long.MIN_VALUE, true, checkpointId, 
true);
+        Iterator<Entry<Long, List<CompactorRequest>>> iter = 
canSubmit.entrySet().iterator();
+        while (iter.hasNext()) {
+            Entry<Long, List<CompactorRequest>> requestEntry = iter.next();
+            for (CompactorRequest req : requestEntry.getValue()) {
+                CompletableFuture<Iterable<FileSinkCommittable>> resultFuture =
+                        new CompletableFuture<>();
+                compactingRequests.add(new Tuple2<>(req, resultFuture));
+                compactService.submit(
+                        () -> {
+                            try {
+                                Iterable<FileSinkCommittable> result = 
compact(req);
+                                resultFuture.complete(result);
+                            } catch (Exception e) {
+                                resultFuture.completeExceptionally(e);
+                            }
+                        });
+            }
+            iter.remove();
+        }
+    }
+
+    private Iterable<FileSinkCommittable> compact(CompactorRequest request) 
throws Exception {
+        List<FileSinkCommittable> results = new 
ArrayList<>(request.getCommittableToPassthrough());
+
+        List<Path> compactingFiles = getCompactingPath(request, results);
+        if (compactingFiles.isEmpty()) {
+            return results;
+        }
+
+        Path targetPath = assembleCompactedFilePath(compactingFiles.get(0));
+        CompactingFileWriter compactingFileWriter =
+                bucketWriter.openNewCompactingFile(
+                        fileCompactor.getWriterType(),
+                        request.getBucketId(),
+                        targetPath,
+                        System.currentTimeMillis());
+        fileCompactor.compact(compactingFiles, compactingFileWriter);
+        PendingFileRecoverable compactedPendingFile = 
compactingFileWriter.closeForCommit();
+
+        FileSinkCommittable compacted =
+                new FileSinkCommittable(request.getBucketId(), 
compactedPendingFile);
+        results.add(0, compacted);
+        for (Path f : compactingFiles) {
+            // add a cleanup committable for each original file that was compacted
+            results.add(new FileSinkCommittable(request.getBucketId(), f));
+        }
+
+        return results;
+    }
+
+    // results: output parameter collecting the pass-through committables
+    private List<Path> getCompactingPath(
+            CompactorRequest request, List<FileSinkCommittable> results) 
throws IOException {
+        List<FileSinkCommittable> compactingCommittable = 
request.getCommittableToCompact();
+        List<Path> compactingFiles = new ArrayList<>();
+
+        for (FileSinkCommittable committable : compactingCommittable) {
+            PendingFileRecoverable pendingFile = committable.getPendingFile();
+            if (pendingFile == null
+                    || pendingFile.getPath() == null
+                    || !pendingFile.getPath().getName().startsWith(".")) {

Review comment:
       Why don't we pass these pending files through in the coordinator instead? Counting them toward a request here might leave the `CompactStrategy` unfulfilled, since they cannot actually be compacted.
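
       Roughly like this in the coordinator (a sketch only; `isCompactible` is a
       hypothetical helper mirroring the check below, and I haven't verified which
       hook the coordinator has for passing committables through):

       ```java
       // Hypothetical helper: the same condition as in getCompactingPath below.
       private static boolean isCompactible(FileSinkCommittable committable) {
           PendingFileRecoverable pendingFile = committable.getPendingFile();
           return pendingFile != null
                   && pendingFile.getPath() != null
                   && pendingFile.getPath().getName().startsWith(".");
       }
       ```

       Non-compactible committables could then be passed through directly in the
       coordinator and would never count toward the thresholds of the strategy.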

##########
File path: 
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/compactor/operator/CompactorOperator.java
##########
@@ -0,0 +1,404 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.sink.compactor.operator;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import 
org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.connector.file.sink.FileSink;
+import org.apache.flink.connector.file.sink.FileSinkCommittable;
+import org.apache.flink.connector.file.sink.compactor.FileCompactor;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.io.SimpleVersionedSerialization;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.runtime.util.Hardware;
+import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
+import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
+import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
+import org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.CompactingFileWriter;
+import 
org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter.PendingFileRecoverable;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.stream.Collectors;
+
+/**
+ * An operator that performs compaction for the {@link FileSink}.
+ *
+ * <p>Requests received from the {@link CompactCoordinator} will first be held in memory,
+ * and snapshotted into the state of a checkpoint. Once the checkpoint completes
+ * successfully, all requests received before it can be submitted. The results can be
+ * emitted at the next {@link #prepareSnapshotPreBarrier} invocation after the compaction
+ * is finished, to ensure that committers receive only one CommittableSummary and the
+ * corresponding number of Committable messages for a single checkpoint.
+ */
+public class CompactorOperator
+        extends AbstractStreamOperator<CommittableMessage<FileSinkCommittable>>
+        implements OneInputStreamOperator<
+                        CompactorRequest, 
CommittableMessage<FileSinkCommittable>>,
+                BoundedOneInput,
+                CheckpointListener {
+
+    private static final String COMPACTED_PREFIX = "compacted-";
+    private static final long SUBMITTED_ID = -1L;
+
+    private static final ListStateDescriptor<byte[]> 
REMAINING_REQUESTS_RAW_STATES_DESC =
+            new ListStateDescriptor<>(
+                    "remaining_requests_raw_state", 
BytePrimitiveArraySerializer.INSTANCE);
+
+    private final int compactThreads;
+    private final SimpleVersionedSerializer<FileSinkCommittable> 
committableSerializer;
+
+    private final FileCompactor fileCompactor;
+    private final BucketWriter<?, String> bucketWriter;
+
+    private transient ExecutorService compactService;
+
+    private List<CompactorRequest> collectingRequests = new ArrayList<>();
+    private final TreeMap<Long, List<CompactorRequest>> snapshotRequests = new 
TreeMap<>();
+    private final List<Tuple2<CompactorRequest, 
CompletableFuture<Iterable<FileSinkCommittable>>>>
+            compactingRequests = new LinkedList<>();
+
+    private ListState<Map<Long, List<CompactorRequest>>> 
remainingRequestsState;
+
+    public CompactorOperator(
+            int compactThreads,
+            SimpleVersionedSerializer<FileSinkCommittable> 
committableSerializer,
+            FileCompactor fileCompactor,
+            BucketWriter<?, String> bucketWriter) {
+        this.compactThreads = compactThreads;
+        this.committableSerializer = committableSerializer;
+        this.fileCompactor = fileCompactor;
+        this.bucketWriter = bucketWriter;
+    }
+
+    @Override
+    public void open() throws Exception {
+        super.open();
+        this.compactService =
+                Executors.newFixedThreadPool(
+                        Math.max(1, Math.min(compactThreads, 
Hardware.getNumberCPUCores())),
+                        new ExecutorThreadFactory("compact-executor"));
+    }
+
+    @Override
+    public void processElement(StreamRecord<CompactorRequest> element) throws 
Exception {
+        collectingRequests.add(element.getValue());
+    }
+
+    @Override
+    public void endInput() throws Exception {
+        // add collecting requests into the final snapshot
+        snapshotRequests.put(Long.MAX_VALUE, collectingRequests);
+        collectingRequests = new ArrayList<>();
+
+        // submit all requests and wait until they are done
+        submitUntil(Long.MAX_VALUE);
+        assert snapshotRequests.isEmpty();
+
+        getAllTasksFuture().join();
+        emitCompacted(null);
+        assert compactingRequests.isEmpty();
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) throws Exception {
+        super.notifyCheckpointComplete(checkpointId);
+        submitUntil(checkpointId);
+    }
+
+    @Override
+    public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
+        emitCompacted(checkpointId);
+    }
+
+    @Override
+    public void snapshotState(StateSnapshotContext context) throws Exception {
+        super.snapshotState(context);
+
+        // add collecting requests during the checkpoint into the snapshot
+        snapshotRequests.put(context.getCheckpointId(), collectingRequests);
+        collectingRequests = new ArrayList<>();
+
+        // snapshot all compacting requests as well, including those that were not
+        // finished when prepareSnapshotPreBarrier was invoked but have finished since,
+        // because their results have not been emitted yet
+        Map<Long, List<CompactorRequest>> requests = new 
HashMap<>(snapshotRequests);
+        requests.computeIfAbsent(SUBMITTED_ID, id -> new ArrayList<>())
+                .addAll(compactingRequests.stream().map(r -> 
r.f0).collect(Collectors.toList()));
+        remainingRequestsState.update(Collections.singletonList(requests));
+    }
+
+    @Override
+    public void initializeState(StateInitializationContext context) throws 
Exception {
+        super.initializeState(context);
+
+        remainingRequestsState =
+                new SimpleVersionedListState<>(
+                        context.getOperatorStateStore()
+                                
.getListState(REMAINING_REQUESTS_RAW_STATES_DESC),
+                        new RemainingRequestsSerializer(
+                                new 
CompactorRequestSerializer(committableSerializer)));
+
+        Iterable<Map<Long, List<CompactorRequest>>> stateRemaining = 
remainingRequestsState.get();
+        if (stateRemaining != null) {
+            for (Map<Long, List<CompactorRequest>> requests : stateRemaining) {
+                // there can be more than one element when state is
+                // redistributed after a parallelism change
+                for (Map.Entry<Long, List<CompactorRequest>> e : 
requests.entrySet()) {
+                    List<CompactorRequest> list =
+                            snapshotRequests.computeIfAbsent(e.getKey(), id -> 
new ArrayList<>());
+                    list.addAll(e.getValue());
+                }
+            }
+        }
+        // submit all requests that were already submitted before this checkpoint
+        submitUntil(SUBMITTED_ID);
+    }
+
+    private void submitUntil(long checkpointId) {
+        NavigableMap<Long, List<CompactorRequest>> canSubmit =
+                snapshotRequests.subMap(Long.MIN_VALUE, true, checkpointId, 
true);
+        Iterator<Entry<Long, List<CompactorRequest>>> iter = 
canSubmit.entrySet().iterator();
+        while (iter.hasNext()) {
+            Entry<Long, List<CompactorRequest>> requestEntry = iter.next();
+            for (CompactorRequest req : requestEntry.getValue()) {
+                CompletableFuture<Iterable<FileSinkCommittable>> resultFuture =
+                        new CompletableFuture<>();
+                compactingRequests.add(new Tuple2<>(req, resultFuture));
+                compactService.submit(
+                        () -> {
+                            try {
+                                Iterable<FileSinkCommittable> result = 
compact(req);
+                                resultFuture.complete(result);
+                            } catch (Exception e) {
+                                resultFuture.completeExceptionally(e);
+                            }
+                        });
+            }
+            iter.remove();
+        }
+    }
+
+    private Iterable<FileSinkCommittable> compact(CompactorRequest request) 
throws Exception {
+        List<FileSinkCommittable> results = new 
ArrayList<>(request.getCommittableToPassthrough());
+
+        List<Path> compactingFiles = getCompactingPath(request, results);
+        if (compactingFiles.isEmpty()) {
+            return results;
+        }
+
+        Path targetPath = assembleCompactedFilePath(compactingFiles.get(0));
+        CompactingFileWriter compactingFileWriter =
+                bucketWriter.openNewCompactingFile(
+                        fileCompactor.getWriterType(),
+                        request.getBucketId(),
+                        targetPath,
+                        System.currentTimeMillis());
+        fileCompactor.compact(compactingFiles, compactingFileWriter);
+        PendingFileRecoverable compactedPendingFile = 
compactingFileWriter.closeForCommit();
+
+        FileSinkCommittable compacted =
+                new FileSinkCommittable(request.getBucketId(), 
compactedPendingFile);
+        results.add(0, compacted);
+        for (Path f : compactingFiles) {
+            // add a cleanup committable for each original file that was compacted
+            results.add(new FileSinkCommittable(request.getBucketId(), f));
+        }
+
+        return results;
+    }
+
+    // results: output parameter collecting the pass-through committables
+    private List<Path> getCompactingPath(
+            CompactorRequest request, List<FileSinkCommittable> results) 
throws IOException {
+        List<FileSinkCommittable> compactingCommittable = 
request.getCommittableToCompact();
+        List<Path> compactingFiles = new ArrayList<>();
+
+        for (FileSinkCommittable committable : compactingCommittable) {
+            PendingFileRecoverable pendingFile = committable.getPendingFile();
+            if (pendingFile == null
+                    || pendingFile.getPath() == null
+                    || !pendingFile.getPath().getName().startsWith(".")) {
+                // the file may have been written by a writer of an older version, or
+                // the file will be visible once committed, so it cannot be compacted.
+                // pass it through: add it to the results, not to the compacting files
+                results.add(committable);
+            } else {
+                // commit the pending file and compact the committed file
+                
bucketWriter.recoverPendingFile(committable.getPendingFile()).commitAfterRecovery();
+                compactingFiles.add(committable.getPendingFile().getPath());
+            }
+        }
+        return compactingFiles;
+    }
+
+    private static Path assembleCompactedFilePath(Path uncompactedPath) {
+        String uncompactedName = uncompactedPath.getName();
+        if (uncompactedName.startsWith(".")) {
+            uncompactedName = uncompactedName.substring(1);
+        }
+        return new Path(uncompactedPath.getParent(), COMPACTED_PREFIX + 
uncompactedName);

Review comment:
       Do we need to add the prefix? Perhaps we could keep the user's configured `outputFileNameConfig` naming instead?
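
       For example, a sketch only (whether stripping just the leading "." is enough
       to avoid a name clash with the committed source file needs verification):

       ```java
       private static Path assembleCompactedFilePath(Path uncompactedPath) {
           String name = uncompactedPath.getName();
           // keep the user's configured file name; only strip the leading "."
           // that marked the file as hidden while it was pending
           return new Path(
                   uncompactedPath.getParent(),
                   name.startsWith(".") ? name.substring(1) : name);
       }
       ```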




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

