mschroederi commented on a change in pull request #113:
URL: https://github.com/apache/bahir-flink/pull/113#discussion_r592800143



##########
File path: flink-connector-pinot/src/main/java/org/apache/flink/streaming/connectors/pinot/writer/PinotWriterSegment.java
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.pinot.writer;
+
+import org.apache.flink.streaming.connectors.pinot.committer.PinotSinkCommittable;
+import org.apache.flink.streaming.connectors.pinot.external.JsonSerializer;
+import org.apache.flink.streaming.connectors.pinot.filesystem.FileSystemAdapter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.charset.Charset;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link PinotWriterSegment} represents exactly one segment that can be found in the Pinot
+ * cluster once the commit has been completed.
+ *
+ * @param <IN> Type of incoming elements
+ */
+public class PinotWriterSegment<IN> implements Serializable {
+
+    private static final Logger LOG = LoggerFactory.getLogger("PinotWriterSegment");
+
+    private final int maxRowsPerSegment;
+    private final String tempDirPrefix;
+    private final JsonSerializer<IN> jsonSerializer;
+    private final FileSystemAdapter fsAdapter;
+
+    private boolean acceptsElements = true;
+
+    private final List<IN> elements;
+    private File dataFile;
+    private long minTimestamp = Long.MAX_VALUE;
+    private long maxTimestamp = Long.MIN_VALUE;
+
+    /**
+     * @param maxRowsPerSegment Maximum number of rows to be stored within a Pinot segment
+     * @param tempDirPrefix     Prefix for temp directories used
+     * @param jsonSerializer    Serializer used to convert elements to JSON
+     * @param fsAdapter         Filesystem adapter used to save files for sharing files across nodes
+     */
+    protected PinotWriterSegment(int maxRowsPerSegment, String tempDirPrefix, JsonSerializer<IN> jsonSerializer, FileSystemAdapter fsAdapter) {
+        checkArgument(maxRowsPerSegment > 0L);
+        this.maxRowsPerSegment = maxRowsPerSegment;
+        this.tempDirPrefix = checkNotNull(tempDirPrefix);
+        this.jsonSerializer = checkNotNull(jsonSerializer);
+        this.fsAdapter = checkNotNull(fsAdapter);
+        this.elements = new ArrayList<>();
+    }
+
+    /**
+     * Takes elements and stores them in memory until either {@link #maxRowsPerSegment} is reached
+     * or {@link #prepareCommit} is called.
+     *
+     * @param element   Object from upstream task
+     * @param timestamp Timestamp assigned to element
+     * @throws IOException
+     */
+    public void write(IN element, long timestamp) throws IOException {
+        if (!this.acceptsElements()) {
+            throw new IllegalStateException("This PinotSegmentWriter does not accept any elements anymore.");
+        }
+        this.elements.add(element);
+        this.minTimestamp = Long.min(this.minTimestamp, timestamp);
+        this.maxTimestamp = Long.max(this.maxTimestamp, timestamp);
+
+        // Writes elements to local filesystem once the maximum number of items is reached
+        if (this.elements.size() == this.maxRowsPerSegment) {

Review comment:
   The current version of the `FileSystemAdapter` only supports copying 
local files to the shared filesystem and vice versa, which should keep it as 
flexible as possible when it comes to extending it.
   I nonetheless see your point that we can avoid unnecessary I/O here. 
I'd therefore suggest changing the `FileSystemAdapter`'s API to accept a list 
of serialized elements to write to the shared filesystem. Adapters that need 
a local file to upload could first write to local disk and then upload it; 
all others could write directly to the respective filesystem.
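
   A rough sketch of what such an API change might look like. All names 
here (`ElementFileSystemAdapter`, `writeToSharedFileSystem`, `StagingAdapter`, 
`DirectAdapter`, `upload`) are hypothetical and not taken from the PR, and the 
"shared filesystem" is assumed to be reachable as an ordinary directory purely 
for illustration:

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

// Hypothetical adapter API: takes serialized elements instead of a local file.
abstract class ElementFileSystemAdapter {
    /** Writes the serialized elements to the shared filesystem and returns the resulting path. */
    abstract String writeToSharedFileSystem(List<String> serializedElements) throws IOException;
}

// For adapters that can only upload local files: stage on local disk first, then upload.
abstract class StagingAdapter extends ElementFileSystemAdapter {
    @Override
    String writeToSharedFileSystem(List<String> serializedElements) throws IOException {
        Path localFile = Files.createTempFile("pinot-segment-", ".json");
        try {
            // One serialized element per line
            Files.write(localFile, serializedElements, StandardCharsets.UTF_8);
            return upload(localFile);
        } finally {
            // The local copy is only a staging artifact
            Files.deleteIfExists(localFile);
        }
    }

    /** Hypothetical hook: uploads a local file and returns its path on the shared filesystem. */
    abstract String upload(Path localFile) throws IOException;
}

// For adapters that can write to the shared filesystem directly: no local staging needed.
class DirectAdapter extends ElementFileSystemAdapter {
    private final Path sharedDir; // assumption: shared filesystem visible as a directory

    DirectAdapter(Path sharedDir) {
        this.sharedDir = sharedDir;
    }

    @Override
    String writeToSharedFileSystem(List<String> serializedElements) throws IOException {
        Path target = Files.createTempFile(sharedDir, "segment-", ".json");
        Files.write(target, serializedElements, StandardCharsets.UTF_8);
        return target.toString();
    }
}
```

   With something along these lines, `PinotWriterSegment` would hand over its 
serialized elements and leave it to the adapter whether a local file is needed 
at all.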




