mschroederi commented on a change in pull request #113: URL: https://github.com/apache/bahir-flink/pull/113#discussion_r592800143
########## File path: flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/writer/PinotWriterSegment.java ########## @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.pinot.writer; + +import org.apache.flink.streaming.connectors.pinot.committer.PinotSinkCommittable; +import org.apache.flink.streaming.connectors.pinot.external.JsonSerializer; +import org.apache.flink.streaming.connectors.pinot.filesystem.FileSystemAdapter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.io.Serializable; +import java.nio.charset.Charset; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A {@link PinotWriterSegment} represents exactly one segment that can be found in the Pinot + * cluster once the commit has been completed. 
+ * + * @param <IN> Type of incoming elements + */ +public class PinotWriterSegment<IN> implements Serializable { + + private static final Logger LOG = LoggerFactory.getLogger("PinotWriterSegment"); + + private final int maxRowsPerSegment; + private final String tempDirPrefix; + private final JsonSerializer<IN> jsonSerializer; + private final FileSystemAdapter fsAdapter; + + private boolean acceptsElements = true; + + private final List<IN> elements; + private File dataFile; + private long minTimestamp = Long.MAX_VALUE; + private long maxTimestamp = Long.MIN_VALUE; + + /** + * @param maxRowsPerSegment Maximum number of rows to be stored within a Pinot segment + * @param tempDirPrefix Prefix for temp directories used + * @param jsonSerializer Serializer used to convert elements to JSON + * @param fsAdapter Filesystem adapter used to save files for sharing files across nodes + */ + protected PinotWriterSegment(int maxRowsPerSegment, String tempDirPrefix, JsonSerializer<IN> jsonSerializer, FileSystemAdapter fsAdapter) { + checkArgument(maxRowsPerSegment > 0L); + this.maxRowsPerSegment = maxRowsPerSegment; + this.tempDirPrefix = checkNotNull(tempDirPrefix); + this.jsonSerializer = checkNotNull(jsonSerializer); + this.fsAdapter = checkNotNull(fsAdapter); + this.elements = new ArrayList<>(); + } + + /** + * Takes elements and stores them in memory until either {@link #maxRowsPerSegment} is reached + * or {@link #prepareCommit} is called. 
+ * + * @param element Object from upstream task + * @param timestamp Timestamp assigned to element + * @throws IOException + */ + public void write(IN element, long timestamp) throws IOException { + if (!this.acceptsElements()) { + throw new IllegalStateException("This PinotSegmentWriter does not accept any elements anymore."); + } + this.elements.add(element); + this.minTimestamp = Long.min(this.minTimestamp, timestamp); + this.maxTimestamp = Long.max(this.maxTimestamp, timestamp); + + // Writes elements to local filesystem once the maximum number of items is reached + if (this.elements.size() == this.maxRowsPerSegment) { Review comment: The current version of the `FileSystemAdapter` only supports copying local files to the shared FS and vice versa, which should allow maximum flexibility when it comes to extensibility. I nonetheless see your point that we can avoid unnecessary I/O here. Therefore, I'd suggest changing the `FileSystemAdapter`'s API to accept a list of serialized elements to write to the shared filesystem. Those adapters that need a local file to upload could first write to local disk before uploading, and all others could directly write to the respective filesystem. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org