mschroederi commented on a change in pull request #113: URL: https://github.com/apache/bahir-flink/pull/113#discussion_r594310198
########## File path: flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/committer/PinotSinkGlobalCommitter.java ########## @@ -0,0 +1,426 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.pinot.committer; + +import org.apache.flink.api.connector.sink.GlobalCommitter; +import org.apache.flink.streaming.connectors.pinot.PinotControllerApi; +import org.apache.flink.streaming.connectors.pinot.filesystem.FileSystemAdapter; +import org.apache.pinot.common.segment.ReadMode; +import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig; +import org.apache.pinot.core.indexsegment.immutable.ImmutableSegment; +import org.apache.pinot.core.indexsegment.immutable.ImmutableSegmentLoader; +import org.apache.pinot.core.segment.creator.SegmentIndexCreationDriver; +import org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl; +import org.apache.pinot.core.segment.name.SegmentNameGenerator; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.data.readers.FileFormat; +import org.apache.pinot.tools.admin.command.UploadSegmentCommand; +import 
org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.*; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Global committer takes committables from {@link org.apache.flink.streaming.connectors.pinot.writer.PinotSinkWriter}, + * generates segments and pushed them to the Pinot controller. + * Note: We use a custom multithreading approach to parallelize the segment creation and upload to + * overcome the performance limitations resulting from using a {@link GlobalCommitter} always + * running at a parallelism of 1. + */ +public class PinotSinkGlobalCommitter implements GlobalCommitter<PinotSinkCommittable, PinotSinkGlobalCommittable> { + + private static final Logger LOG = LoggerFactory.getLogger(PinotSinkGlobalCommitter.class); + + private final String pinotControllerHost; + private final String pinotControllerPort; + private final String tableName; + private final SegmentNameGenerator segmentNameGenerator; + private final String tempDirPrefix; + private final FileSystemAdapter fsAdapter; + private final String timeColumnName; + private final TimeUnit segmentTimeUnit; + + /** + * @param pinotControllerHost Host of the Pinot controller + * @param pinotControllerPort Port of the Pinot controller + * @param tableName Target table's name + * @param segmentNameGenerator Pinot segment name generator + * @param fsAdapter Adapter for interacting with the shared file system + * @param timeColumnName Name of the column containing the timestamp + * @param segmentTimeUnit Unit of the time column + */ + public PinotSinkGlobalCommitter(String pinotControllerHost, String pinotControllerPort, String tableName, SegmentNameGenerator segmentNameGenerator, String tempDirPrefix, FileSystemAdapter fsAdapter, String timeColumnName, TimeUnit 
segmentTimeUnit) { + this.pinotControllerHost = checkNotNull(pinotControllerHost); + this.pinotControllerPort = checkNotNull(pinotControllerPort); + this.tableName = checkNotNull(tableName); + this.segmentNameGenerator = checkNotNull(segmentNameGenerator); + this.tempDirPrefix = checkNotNull(tempDirPrefix); + this.fsAdapter = checkNotNull(fsAdapter); + this.timeColumnName = checkNotNull(timeColumnName); + this.segmentTimeUnit = checkNotNull(segmentTimeUnit); + } + + /** + * Identifies global committables that need to be re-committed from a list of recovered committables. + * + * @param globalCommittables List of global committables that are checked for required re-commit + * @return List of global committable that need to be re-committed + * @throws IOException + */ + @Override + public List<PinotSinkGlobalCommittable> filterRecoveredCommittables(List<PinotSinkGlobalCommittable> globalCommittables) throws IOException { + PinotControllerApi controllerApi = new PinotControllerApi(this.pinotControllerHost, this.pinotControllerPort); + List<PinotSinkGlobalCommittable> committablesToRetry = new ArrayList<>(); + + for (PinotSinkGlobalCommittable globalCommittable : globalCommittables) { + CommitStatus commitStatus = this.getCommitStatus(globalCommittable); + + if (commitStatus.getMissingSegmentNames().isEmpty()) { + // All segments were already committed. Thus, we do not need to retry the commit. + continue; + } + + for (String existingSegment : commitStatus.getExistingSegmentNames()) { + // Some but not all segments were already committed. As we cannot assure the data + // files containing the same data as originally when recovering from failure, + // we delete the already committed segments in order to recommit them later on. 
+ controllerApi.deleteSegment(tableName, existingSegment); + } + committablesToRetry.add(globalCommittable); + } + + return committablesToRetry; + } + + /** + * Combines multiple {@link PinotSinkCommittable}s into one {@link PinotSinkGlobalCommittable} + * by finding the minimum and maximum timestamps from the provided {@link PinotSinkCommittable}s. + * + * @param committables Committables created by {@link org.apache.flink.streaming.connectors.pinot.writer.PinotSinkWriter} + * @return Global committer committable + */ + @Override + public PinotSinkGlobalCommittable combine(List<PinotSinkCommittable> committables) { + List<String> dataFilePaths = new ArrayList<>(); + long minTimestamp = Long.MAX_VALUE; + long maxTimestamp = Long.MIN_VALUE; + + // Extract all data file paths and the overall minimum and maximum timestamps + // from all committables + for (PinotSinkCommittable committable : committables) { + dataFilePaths.add(committable.getDataFilePath()); + minTimestamp = Long.min(minTimestamp, committable.getMinTimestamp()); + maxTimestamp = Long.max(maxTimestamp, committable.getMaxTimestamp()); + } + + LOG.info("Combined {} committables into one global committable", committables.size()); + return new PinotSinkGlobalCommittable(dataFilePaths, minTimestamp, maxTimestamp); + } + + /** + * Copies data files from shared filesystem to the local filesystem, generates segments with names + * according to the segment naming schema and finally pushes the segments to the Pinot cluster. + * Before pushing a segment it is checked whether there already exists a segment with that name + * in the Pinot cluster by calling the Pinot controller. In case there is one, it gets deleted. 
+ * + * @param globalCommittables List of global committables + * @return Global committables whose commit failed + * @throws IOException + */ + @Override + public List<PinotSinkGlobalCommittable> commit(List<PinotSinkGlobalCommittable> globalCommittables) throws IOException { + // Retrieve the Pinot table schema and the Pinot table config from the Pinot controller + PinotControllerApi controllerApi = new PinotControllerApi(this.pinotControllerHost, this.pinotControllerPort); + Schema tableSchema = controllerApi.getSchema(this.tableName); + TableConfig tableConfig = controllerApi.getTableConfig(this.tableName); + + // List of failed global committables that can be retried later on + List<PinotSinkGlobalCommittable> failedCommits = new ArrayList<>(); + + for (PinotSinkGlobalCommittable globalCommittable : globalCommittables) { + // Make sure to remove all previously committed segments in globalCommittable + // when recovering from failure + CommitStatus commitStatus = this.getCommitStatus(globalCommittable); + for (String existingSegment : commitStatus.getExistingSegmentNames()) { + // Some but not all segments were already committed. As we cannot assure the data + // files containing the same data as originally when recovering from failure, + // we delete the already committed segments in order to recommit them later on. + controllerApi.deleteSegment(tableName, existingSegment); + } + + // We use a thread pool in order to parallelize the segment creation and segment upload + ExecutorService pool = Executors.newCachedThreadPool(); + Set<Future<Boolean>> resultFutures = new HashSet<>(); + + // Commit all segments in globalCommittable + int sequenceId = 0; + for (String dataFilePath : globalCommittable.getDataFilePaths()) { Review comment: good hint. Thanks 👍 ---------------------------------------------------------------- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org