mschroederi commented on a change in pull request #113:
URL: https://github.com/apache/bahir-flink/pull/113#discussion_r594310198



##########
File path: flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/committer/PinotSinkGlobalCommitter.java
##########
@@ -0,0 +1,426 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pinot.committer;
+
+import org.apache.flink.api.connector.sink.GlobalCommitter;
+import org.apache.flink.streaming.connectors.pinot.PinotControllerApi;
+import 
org.apache.flink.streaming.connectors.pinot.filesystem.FileSystemAdapter;
+import org.apache.pinot.common.segment.ReadMode;
+import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
+import org.apache.pinot.core.indexsegment.immutable.ImmutableSegment;
+import org.apache.pinot.core.indexsegment.immutable.ImmutableSegmentLoader;
+import org.apache.pinot.core.segment.creator.SegmentIndexCreationDriver;
+import 
org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.core.segment.name.SegmentNameGenerator;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.FileFormat;
+import org.apache.pinot.tools.admin.command.UploadSegmentCommand;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.*;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Global committer takes committables from {@link 
org.apache.flink.streaming.connectors.pinot.writer.PinotSinkWriter},
+ * generates segments and pushed them to the Pinot controller.
+ * Note: We use a custom multithreading approach to parallelize the segment 
creation and upload to
+ * overcome the performance limitations resulting from using a {@link 
GlobalCommitter} always
+ * running at a parallelism of 1.
+ */
+public class PinotSinkGlobalCommitter implements 
GlobalCommitter<PinotSinkCommittable, PinotSinkGlobalCommittable> {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(PinotSinkGlobalCommitter.class);
+
+    private final String pinotControllerHost;
+    private final String pinotControllerPort;
+    private final String tableName;
+    private final SegmentNameGenerator segmentNameGenerator;
+    private final String tempDirPrefix;
+    private final FileSystemAdapter fsAdapter;
+    private final String timeColumnName;
+    private final TimeUnit segmentTimeUnit;
+
+    /**
+     * @param pinotControllerHost  Host of the Pinot controller
+     * @param pinotControllerPort  Port of the Pinot controller
+     * @param tableName            Target table's name
+     * @param segmentNameGenerator Pinot segment name generator
+     * @param fsAdapter            Adapter for interacting with the shared 
file system
+     * @param timeColumnName       Name of the column containing the timestamp
+     * @param segmentTimeUnit      Unit of the time column
+     */
+    public PinotSinkGlobalCommitter(String pinotControllerHost, String 
pinotControllerPort, String tableName, SegmentNameGenerator 
segmentNameGenerator, String tempDirPrefix, FileSystemAdapter fsAdapter, String 
timeColumnName, TimeUnit segmentTimeUnit) {
+        this.pinotControllerHost = checkNotNull(pinotControllerHost);
+        this.pinotControllerPort = checkNotNull(pinotControllerPort);
+        this.tableName = checkNotNull(tableName);
+        this.segmentNameGenerator = checkNotNull(segmentNameGenerator);
+        this.tempDirPrefix = checkNotNull(tempDirPrefix);
+        this.fsAdapter = checkNotNull(fsAdapter);
+        this.timeColumnName = checkNotNull(timeColumnName);
+        this.segmentTimeUnit = checkNotNull(segmentTimeUnit);
+    }
+
+    /**
+     * Identifies global committables that need to be re-committed from a list 
of recovered committables.
+     *
+     * @param globalCommittables List of global committables that are checked 
for required re-commit
+     * @return List of global committable that need to be re-committed
+     * @throws IOException
+     */
+    @Override
+    public List<PinotSinkGlobalCommittable> 
filterRecoveredCommittables(List<PinotSinkGlobalCommittable> 
globalCommittables) throws IOException {
+        PinotControllerApi controllerApi = new 
PinotControllerApi(this.pinotControllerHost, this.pinotControllerPort);
+        List<PinotSinkGlobalCommittable> committablesToRetry = new 
ArrayList<>();
+
+        for (PinotSinkGlobalCommittable globalCommittable : 
globalCommittables) {
+            CommitStatus commitStatus = 
this.getCommitStatus(globalCommittable);
+
+            if (commitStatus.getMissingSegmentNames().isEmpty()) {
+                // All segments were already committed. Thus, we do not need 
to retry the commit.
+                continue;
+            }
+
+            for (String existingSegment : 
commitStatus.getExistingSegmentNames()) {
+                // Some but not all segments were already committed. As we 
cannot assure the data
+                // files containing the same data as originally when 
recovering from failure,
+                // we delete the already committed segments in order to 
recommit them later on.
+                controllerApi.deleteSegment(tableName, existingSegment);
+            }
+            committablesToRetry.add(globalCommittable);
+        }
+
+        return committablesToRetry;
+    }
+
+    /**
+     * Combines multiple {@link PinotSinkCommittable}s into one {@link 
PinotSinkGlobalCommittable}
+     * by finding the minimum and maximum timestamps from the provided {@link 
PinotSinkCommittable}s.
+     *
+     * @param committables Committables created by {@link 
org.apache.flink.streaming.connectors.pinot.writer.PinotSinkWriter}
+     * @return Global committer committable
+     */
+    @Override
+    public PinotSinkGlobalCommittable combine(List<PinotSinkCommittable> 
committables) {
+        List<String> dataFilePaths = new ArrayList<>();
+        long minTimestamp = Long.MAX_VALUE;
+        long maxTimestamp = Long.MIN_VALUE;
+
+        // Extract all data file paths and the overall minimum and maximum 
timestamps
+        // from all committables
+        for (PinotSinkCommittable committable : committables) {
+            dataFilePaths.add(committable.getDataFilePath());
+            minTimestamp = Long.min(minTimestamp, 
committable.getMinTimestamp());
+            maxTimestamp = Long.max(maxTimestamp, 
committable.getMaxTimestamp());
+        }
+
+        LOG.info("Combined {} committables into one global committable", 
committables.size());
+        return new PinotSinkGlobalCommittable(dataFilePaths, minTimestamp, 
maxTimestamp);
+    }
+
+    /**
+     * Copies data files from shared filesystem to the local filesystem, 
generates segments with names
+     * according to the segment naming schema and finally pushes the segments 
to the Pinot cluster.
+     * Before pushing a segment it is checked whether there already exists a 
segment with that name
+     * in the Pinot cluster by calling the Pinot controller. In case there is 
one, it gets deleted.
+     *
+     * @param globalCommittables List of global committables
+     * @return Global committables whose commit failed
+     * @throws IOException
+     */
+    @Override
+    public List<PinotSinkGlobalCommittable> 
commit(List<PinotSinkGlobalCommittable> globalCommittables) throws IOException {
+        // Retrieve the Pinot table schema and the Pinot table config from the 
Pinot controller
+        PinotControllerApi controllerApi = new 
PinotControllerApi(this.pinotControllerHost, this.pinotControllerPort);
+        Schema tableSchema = controllerApi.getSchema(this.tableName);
+        TableConfig tableConfig = controllerApi.getTableConfig(this.tableName);
+
+        // List of failed global committables that can be retried later on
+        List<PinotSinkGlobalCommittable> failedCommits = new ArrayList<>();
+
+        for (PinotSinkGlobalCommittable globalCommittable : 
globalCommittables) {
+            // Make sure to remove all previously committed segments in 
globalCommittable
+            // when recovering from failure
+            CommitStatus commitStatus = 
this.getCommitStatus(globalCommittable);
+            for (String existingSegment : 
commitStatus.getExistingSegmentNames()) {
+                // Some but not all segments were already committed. As we 
cannot assure the data
+                // files containing the same data as originally when 
recovering from failure,
+                // we delete the already committed segments in order to 
recommit them later on.
+                controllerApi.deleteSegment(tableName, existingSegment);
+            }
+
+            // We use a thread pool in order to parallelize the segment 
creation and segment upload
+            ExecutorService pool = Executors.newCachedThreadPool();
+            Set<Future<Boolean>> resultFutures = new HashSet<>();
+
+            // Commit all segments in globalCommittable
+            int sequenceId = 0;
+            for (String dataFilePath : globalCommittable.getDataFilePaths()) {

Review comment:
       good hint. Thanks 👍




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to