tpalfy commented on code in PR #7893:
URL: https://github.com/apache/nifi/pull/7893#discussion_r1392528848


##########
nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/FetchParquet.java:
##########
@@ -63,9 +79,35 @@ public class FetchParquet extends AbstractFetchHDFSRecord {
 
     @Override
     public HDFSRecordReader createHDFSRecordReader(final ProcessContext 
context, final FlowFile flowFile, final Configuration conf, final Path path) 
throws IOException {
+        final Long offset = 
Optional.ofNullable(flowFile.getAttribute(ParquetAttribute.RECORD_OFFSET))
+                .map(Long::parseLong)
+                .orElse(null);
+
+        final Long count = 
Optional.ofNullable(flowFile.getAttribute(ParquetAttribute.RECORD_COUNT))
+                .map(Long::parseLong)
+                .orElse(null);
+
+        final long fileStartOffset = 
Optional.ofNullable(flowFile.getAttribute(ParquetAttribute.FILE_RANGE_START_OFFSET))
+                .map(Long::parseLong)
+                .orElse(0L);
+        final long fileEndOffset = 
Optional.ofNullable(flowFile.getAttribute(ParquetAttribute.FILE_RANGE_END_OFFSET))
+                .map(Long::parseLong)
+                .orElse(Long.MAX_VALUE);
+
         final InputFile inputFile = HadoopInputFile.fromPath(path, conf);
-        final ParquetReader.Builder<GenericRecord> readerBuilder = 
AvroParquetReader.<GenericRecord>builder(inputFile).withConf(conf);
-        return new AvroParquetHDFSRecordReader(readerBuilder.build());
+        final ParquetReader.Builder<GenericRecord> readerBuilder = 
AvroParquetReader.<GenericRecord>builder(inputFile)
+                .withConf(conf)
+                .withFileRange(fileStartOffset, fileEndOffset);
+
+        if (offset != null) {
+            
readerBuilder.withFilter(FilterCompat.get(OffsetRecordFilter.offset(offset)));
+        }
+
+        if (count == null) {
+            return new AvroParquetHDFSRecordReader(readerBuilder.build());
+        } else {
+            return new AvroParquetHDFSRecordReader(readerBuilder.build(), 
count);
+        }

Review Comment:
   ```suggestion
           return new AvroParquetHDFSRecordReader(readerBuilder.build(), count);
   ```



##########
nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/hadoop/AvroParquetHDFSRecordReader.java:
##########
@@ -35,11 +35,18 @@ public class AvroParquetHDFSRecordReader implements 
HDFSRecordReader {
     private GenericRecord lastRecord;
     private RecordSchema recordSchema;
     private boolean initialized = false;
+    private final Long count;
+    private long recordsRead = 0;
 
     private final ParquetReader<GenericRecord> parquetReader;
 
     public AvroParquetHDFSRecordReader(final ParquetReader<GenericRecord> 
parquetReader) {
+        this(parquetReader, null);
+    }
+
+    public AvroParquetHDFSRecordReader(final ParquetReader<GenericRecord> 
parquetReader, Long count) {

Review Comment:
   ```suggestion
       public AvroParquetHDFSRecordReader(final ParquetReader<GenericRecord> 
parquetReader, final Long count) {
   ```



##########
nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/parquet/ParquetTestUtils.java:
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.parquet;
+
+import static java.util.stream.Collectors.toList;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Collection;
+import java.util.Map;
+import java.util.stream.IntStream;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.avro.AvroParquetWriter;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.util.HadoopOutputFile;
+
+public class ParquetTestUtils {
+
+    private static final String SCHEMA_PATH = 
"src/test/resources/avro/user.avsc";
+
+    public static File createUsersParquetFile(int numUsers) throws IOException 
{
+        return createUsersParquetFile(IntStream
+                .range(0, numUsers)
+                .mapToObj(ParquetTestUtils::createUser)
+                .collect(toList())
+        );
+    }
+
+    public static Map<String, Object> createUser(int i) {
+        return Map.of(
+                "name", "Bob" + i,
+                "favorite_number", i,
+                "favorite_color", "blue" + i
+        );
+    }
+
+    private static File createUsersParquetFile(Collection<Map<String, Object>> 
users) throws IOException {
+        final Schema schema = getSchema();
+        final File parquetFile = new 
File("target/TestParquetReader-testReadUsers-" + System.currentTimeMillis());
+
+        // write some users to the parquet file...
+        try (final ParquetWriter<GenericRecord> writer = 
createParquetWriter(schema, parquetFile)) {
+            users.forEach(user -> {
+                final GenericRecord record = new GenericData.Record(schema);
+                user.forEach(record::put);
+                try {
+                    writer.write(record);
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+            });
+        }
+        return parquetFile;
+    }
+
+    private static Schema getSchema() throws IOException {
+        final File schemaFile = new File(SCHEMA_PATH);
+        final String schemaString = IOUtils.toString(new 
FileInputStream(schemaFile), StandardCharsets.UTF_8);
+        return new Schema.Parser().parse(schemaString);

Review Comment:
   ```suggestion
           try (InputStream schemaInputStream = 
ParquetTestUtils.class.getClassLoader().getResourceAsStream("avro/user.avsc")) {
               final String schemaString = IOUtils.toString(schemaInputStream, 
StandardCharsets.UTF_8);
               return new Schema.Parser().parse(schemaString);
           }
   ```



##########
nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/record/ParquetRecordReader.java:
##########
@@ -16,45 +16,79 @@
  */
 package org.apache.nifi.parquet.record;
 
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Map;
+import java.util.Optional;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.nifi.avro.AvroTypeUtil;
+import org.apache.nifi.parquet.filter.OffsetRecordFilter;
 import org.apache.nifi.parquet.stream.NifiParquetInputFile;
+import org.apache.nifi.parquet.utils.ParquetAttribute;
 import org.apache.nifi.serialization.RecordReader;
 import org.apache.nifi.serialization.record.MapRecord;
 import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.parquet.avro.AvroParquetReader;
+import org.apache.parquet.filter2.compat.FilterCompat;
 import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.ParquetReader.Builder;
 import org.apache.parquet.io.InputFile;
 
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.Map;
-
 public class ParquetRecordReader implements RecordReader {
 
     private GenericRecord lastParquetRecord;
-    private RecordSchema recordSchema;
+    private final RecordSchema recordSchema;
 
     private final InputStream inputStream;
-    private final InputFile inputFile;
     private final ParquetReader<GenericRecord> parquetReader;
-
-    public ParquetRecordReader(final InputStream inputStream, final long 
inputLength, final Configuration configuration) throws IOException {
+    private final Long count;

Review Comment:
   ```suggestion
       private final Long recordsToRead;
   ```



##########
nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/parquet/hadoop/AvroParquetHDFSRecordReader.java:
##########
@@ -35,11 +35,18 @@ public class AvroParquetHDFSRecordReader implements 
HDFSRecordReader {
     private GenericRecord lastRecord;
     private RecordSchema recordSchema;
     private boolean initialized = false;
+    private final Long count;

Review Comment:
   ```suggestion
       private final Long recordsToRead;
   ```



##########
nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/CalculateParquetOffsets.java:
##########
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.parquet;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.documentation.UseCase;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.parquet.stream.NifiParquetInputFile;
+import org.apache.nifi.parquet.utils.ParquetAttribute;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.hadoop.ParquetFileReader;
+
+@Tags({"parquet", "split", "partition", "break apart", "efficient processing", 
"load balance", "cluster"})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@CapabilityDescription(
+        "The processor generates N flow files from the input, and adds 
attributes with the offsets required to read "
+                + "the group of rows in the FlowFile's content. Can be used to 
increase the overall efficiency of "
+                + "processing extremely large Parquet files."
+)
+@UseCase(
+        description = "Multithreaded processing of extremely large Parquet 
files",
+        keywords = {"multi-threaded", "multi-core", "parallel"},
+        configuration = """
+                Instead of having something like
+
+                X -> ConvertRecord (Parquet / JSON) -> ...
+
+                Have something like
+
+                X -> CalculateParquetOffsets -> ConvertRecord (Parquet / JSON) 
-> ...
+
+                This increases the overall efficiency of this operation for 
extremely large
+                Parquet files (hundreds of GBs). With the second approach, you 
can leverage
+                multi-threading, for processing a single file.
+                """
+)
+@UseCase(
+        description = "Distributed processing of extremely large Parquet files 
across the NiFi nodes",
+        keywords = {"distribute", "nodes", "cluster", "transfer", "parallel"},
+        configuration = """
+                Instead of having something like
+
+                X -> ConvertRecord (Parquet / JSON) -> ...
+
+                Have something like
+
+                X -> CalculateParquetOffsets -> FetchParquet (JSON Writer) -> 
...
+
+                And set "Zero Content Output" to "true".
+
+                This way, a load balanced connection could be used between 
CalculateParquetOffsets and FetchParquet,
+                in order to distribute the work across the nodes, without 
transferring a lot of data across
+                the nodes of the cluster.
+                """
+)
+@WritesAttributes({
+        @WritesAttribute(
+                attribute = ParquetAttribute.RECORD_OFFSET,
+                description = "Sets the index of first record of the parquet 
file."
+        ),
+        @WritesAttribute(
+                attribute = ParquetAttribute.RECORD_COUNT,
+                description = "Sets the number of records in the parquet file."
+        )
+})
+@ReadsAttributes({
+        @ReadsAttribute(
+                attribute = ParquetAttribute.RECORD_OFFSET,
+                description = "Gets the index of first record in the input."
+        ),
+        @ReadsAttribute(
+                attribute = ParquetAttribute.RECORD_COUNT,
+                description = "Gets the number of records in the input."
+        ),
+        @ReadsAttribute(
+                attribute = ParquetAttribute.FILE_RANGE_START_OFFSET,
+                description = "Gets the start offset of the selected row group 
in the parquet file."
+        ),
+        @ReadsAttribute(
+                attribute = ParquetAttribute.FILE_RANGE_END_OFFSET,
+                description = "Gets the end offset of the selected row group 
in the parquet file."
+        )
+})
+@SideEffectFree
+public class CalculateParquetOffsets extends AbstractProcessor {
+
+    static final PropertyDescriptor PROP_RECORDS_PER_SPLIT = new 
PropertyDescriptor.Builder()
+            .name("Records Per Split")
+            .description("Specifies how many records should be covered in each 
FlowFile")
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor PROP_ZERO_CONTENT_OUTPUT = new 
PropertyDescriptor.Builder()
+            .name("Zero Content Output")
+            .description("Whether to do, or do not copy the content of input 
FlowFile.")
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .required(true)
+            .build();
+
+    static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("FlowFiles, with special attributes that represent a 
chunk of the input file.")
+            .build();
+
+    static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = List.of(
+            PROP_RECORDS_PER_SPLIT,
+            PROP_ZERO_CONTENT_OUTPUT
+    );
+
+    static final Set<Relationship> RELATIONSHIPS = Set.of(REL_SUCCESS);
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTY_DESCRIPTORS;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
+        final FlowFile original = session.get();
+        if (original == null) {

Review Comment:
   ```suggestion
           if (inputFlowFile == null) {
   ```
   "original" makes sense when used as a relationship because that is a very 
specific context. In the code, "original" can mean too many things. Also, 
"input..." describes more accurately what the variable is. (In contrast, 
calling a relationship "input" would be very confusing.)



##########
nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/CalculateParquetOffsets.java:
##########
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.parquet;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.documentation.UseCase;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.parquet.stream.NifiParquetInputFile;
+import org.apache.nifi.parquet.utils.ParquetAttribute;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.hadoop.ParquetFileReader;
+
+@Tags({"parquet", "split", "partition", "break apart", "efficient processing", 
"load balance", "cluster"})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@CapabilityDescription(
+        "The processor generates N flow files from the input, and adds 
attributes with the offsets required to read "
+                + "the group of rows in the FlowFile's content. Can be used to 
increase the overall efficiency of "
+                + "processing extremely large Parquet files."
+)
+@UseCase(
+        description = "Multithreaded processing of extremely large Parquet 
files",
+        keywords = {"multi-threaded", "multi-core", "parallel"},
+        configuration = """
+                Instead of having something like
+
+                X -> ConvertRecord (Parquet / JSON) -> ...
+
+                Have something like
+
+                X -> CalculateParquetOffsets -> ConvertRecord (Parquet / JSON) 
-> ...
+
+                This increases the overall efficiency of this operation for 
extremely large
+                Parquet files (hundreds of GBs). With the second approach, you 
can leverage
+                multi-threading, for processing a single file.
+                """
+)
+@UseCase(
+        description = "Distributed processing of extremely large Parquet files 
across the NiFi nodes",
+        keywords = {"distribute", "nodes", "cluster", "transfer", "parallel"},
+        configuration = """
+                Instead of having something like
+
+                X -> ConvertRecord (Parquet / JSON) -> ...
+
+                Have something like
+
+                X -> CalculateParquetOffsets -> FetchParquet (JSON Writer) -> 
...
+
+                And set "Zero Content Output" to "true".
+
+                This way, a load balanced connection could be used between 
CalculateParquetOffsets and FetchParquet,
+                in order to distribute the work across the nodes, without 
transferring a lot of data across
+                the nodes of the cluster.
+                """
+)
+@WritesAttributes({
+        @WritesAttribute(
+                attribute = ParquetAttribute.RECORD_OFFSET,
+                description = "Sets the index of first record of the parquet 
file."
+        ),
+        @WritesAttribute(
+                attribute = ParquetAttribute.RECORD_COUNT,
+                description = "Sets the number of records in the parquet file."
+        )
+})
+@ReadsAttributes({
+        @ReadsAttribute(
+                attribute = ParquetAttribute.RECORD_OFFSET,
+                description = "Gets the index of first record in the input."
+        ),
+        @ReadsAttribute(
+                attribute = ParquetAttribute.RECORD_COUNT,
+                description = "Gets the number of records in the input."
+        ),
+        @ReadsAttribute(
+                attribute = ParquetAttribute.FILE_RANGE_START_OFFSET,
+                description = "Gets the start offset of the selected row group 
in the parquet file."
+        ),
+        @ReadsAttribute(
+                attribute = ParquetAttribute.FILE_RANGE_END_OFFSET,
+                description = "Gets the end offset of the selected row group 
in the parquet file."
+        )
+})
+@SideEffectFree
+public class CalculateParquetOffsets extends AbstractProcessor {
+
+    static final PropertyDescriptor PROP_RECORDS_PER_SPLIT = new 
PropertyDescriptor.Builder()
+            .name("Records Per Split")
+            .description("Specifies how many records should be covered in each 
FlowFile")
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor PROP_ZERO_CONTENT_OUTPUT = new 
PropertyDescriptor.Builder()
+            .name("Zero Content Output")
+            .description("Whether to do, or do not copy the content of input 
FlowFile.")
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .required(true)
+            .build();
+
+    static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("FlowFiles, with special attributes that represent a 
chunk of the input file.")
+            .build();
+
+    static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = List.of(
+            PROP_RECORDS_PER_SPLIT,
+            PROP_ZERO_CONTENT_OUTPUT
+    );
+
+    static final Set<Relationship> RELATIONSHIPS = Set.of(REL_SUCCESS);
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTY_DESCRIPTORS;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
+        final FlowFile original = session.get();
+        if (original == null) {
+            return;
+        }
+
+        final long partitionSize = 
context.getProperty(PROP_RECORDS_PER_SPLIT).asLong();
+        final boolean zeroContentOutput = 
context.getProperty(PROP_ZERO_CONTENT_OUTPUT).asBoolean();
+
+        final long recordOffset = 
Optional.ofNullable(original.getAttribute(ParquetAttribute.RECORD_OFFSET))
+                .map(Long::valueOf)
+                .orElse(0L);
+
+        final long recordCount = 
Optional.ofNullable(original.getAttribute(ParquetAttribute.RECORD_COUNT))
+                .map(Long::valueOf)
+                .orElseGet(() -> getRecordCount(session, original) - 
recordOffset);
+
+        List<FlowFile> partitions = getPartitions(
+                session, original, partitionSize, recordCount, recordOffset, 
zeroContentOutput);
+        session.transfer(partitions, REL_SUCCESS);
+        session.adjustCounter("Records Split", recordCount, false);
+        session.adjustCounter("Partitions Created", partitions.size(), false);
+
+        if (zeroContentOutput) {
+            session.remove(original);
+        }
+    }
+
+    private long getRecordCount(ProcessSession session, FlowFile flowFile) {
+        final long fileStartOffset = 
Optional.ofNullable(flowFile.getAttribute(ParquetAttribute.FILE_RANGE_START_OFFSET))
+                .map(Long::parseLong)
+                .orElse(0L);
+        final long fileEndOffset = 
Optional.ofNullable(flowFile.getAttribute(ParquetAttribute.FILE_RANGE_END_OFFSET))
+                .map(Long::parseLong)
+                .orElse(Long.MAX_VALUE);
+
+        final ParquetReadOptions readOptions = ParquetReadOptions.builder()
+                .withRange(fileStartOffset, fileEndOffset)
+                .build();
+        try (
+                InputStream in = session.read(flowFile);
+                ParquetFileReader reader = new ParquetFileReader(
+                        new NifiParquetInputFile(in, flowFile.getSize()),
+                        readOptions
+                )
+        ) {
+            return reader.getRecordCount();
+        } catch (IOException e) {
+            throw new ProcessException(e);
+        }
+    }
+
+    private List<FlowFile> getPartitions(
+            ProcessSession session,
+            FlowFile flowFile,
+            long partitionSize,
+            long recordCount,
+            long recordOffset,
+            boolean zeroContentOutput
+    ) {
+        final long numberOfPartitions = (recordCount / partitionSize) + 
((recordCount % partitionSize) > 0 ? 1 : 0);

Review Comment:
   ```suggestion
           final long numberOfPartitions = Math.ceilDiv(recordCount, 
partitionSize);
   ```



##########
nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/CalculateParquetOffsets.java:
##########
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.parquet;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.documentation.UseCase;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.parquet.stream.NifiParquetInputFile;
+import org.apache.nifi.parquet.utils.ParquetAttribute;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.hadoop.ParquetFileReader;
+
+@Tags({"parquet", "split", "partition", "break apart", "efficient processing", 
"load balance", "cluster"})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@CapabilityDescription(
+        "The processor generates N flow files from the input, and adds 
attributes with the offsets required to read "
+                + "the group of rows in the FlowFile's content. Can be used to 
increase the overall efficiency of "
+                + "processing extremely large Parquet files."
+)
+@UseCase(
+        description = "Multithreaded processing of extremely large Parquet 
files",
+        keywords = {"multi-threaded", "multi-core", "parallel"},
+        configuration = """
+                Instead of having something like
+
+                X -> ConvertRecord (Parquet / JSON) -> ...
+
+                Have something like
+
+                X -> CalculateParquetOffsets -> ConvertRecord (Parquet / JSON) 
-> ...
+
+                This increases the overall efficiency of this operation for 
extremely large
+                Parquet files (hundreds of GBs). With the second approach, you 
can leverage
+                multi-threading, for processing a single file.
+                """
+)
+@UseCase(
+        description = "Distributed processing of extremely large Parquet files 
across the NiFi nodes",
+        keywords = {"distribute", "nodes", "cluster", "transfer", "parallel"},
+        configuration = """
+                Instead of having something like
+
+                X -> ConvertRecord (Parquet / JSON) -> ...
+
+                Have something like
+
+                X -> CalculateParquetOffsets -> FetchParquet (JSON Writer) -> 
...
+
+                And set "Zero Content Output" to "true".
+
+                This way, a load balanced connection could be used between 
CalculateParquetOffsets and FetchParquet,
+                in order to distribute the work across the nodes, without 
transferring a lot of data across
+                the nodes of the cluster.
+                """
+)
+@WritesAttributes({
+        @WritesAttribute(
+                attribute = ParquetAttribute.RECORD_OFFSET,
+                description = "Sets the index of first record of the parquet 
file."
+        ),
+        @WritesAttribute(
+                attribute = ParquetAttribute.RECORD_COUNT,
+                description = "Sets the number of records in the parquet file."
+        )
+})
+@ReadsAttributes({
+        @ReadsAttribute(
+                attribute = ParquetAttribute.RECORD_OFFSET,
+                description = "Gets the index of first record in the input."
+        ),
+        @ReadsAttribute(
+                attribute = ParquetAttribute.RECORD_COUNT,
+                description = "Gets the number of records in the input."
+        ),
+        @ReadsAttribute(
+                attribute = ParquetAttribute.FILE_RANGE_START_OFFSET,
+                description = "Gets the start offset of the selected row group 
in the parquet file."
+        ),
+        @ReadsAttribute(
+                attribute = ParquetAttribute.FILE_RANGE_END_OFFSET,
+                description = "Gets the end offset of the selected row group 
in the parquet file."
+        )
+})
+@SideEffectFree
+public class CalculateParquetOffsets extends AbstractProcessor {
+
+    static final PropertyDescriptor PROP_RECORDS_PER_SPLIT = new 
PropertyDescriptor.Builder()
+            .name("Records Per Split")
+            .description("Specifies how many records should be covered in each 
FlowFile")
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor PROP_ZERO_CONTENT_OUTPUT = new 
PropertyDescriptor.Builder()
+            .name("Zero Content Output")
+            .description("Whether to do, or do not copy the content of input 
FlowFile.")
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .required(true)
+            .build();
+
+    static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("FlowFiles, with special attributes that represent a 
chunk of the input file.")
+            .build();
+
+    static final List<PropertyDescriptor> PROPERTY_DESCRIPTORS = List.of(
+            PROP_RECORDS_PER_SPLIT,
+            PROP_ZERO_CONTENT_OUTPUT
+    );
+
+    static final Set<Relationship> RELATIONSHIPS = Set.of(REL_SUCCESS);
+
+    /**
+     * Returns the property descriptors supported by this processor.
+     *
+     * @return the static {@code PROPERTY_DESCRIPTORS} list, which holds
+     *         {@code PROP_RECORDS_PER_SPLIT} and {@code PROP_ZERO_CONTENT_OUTPUT}
+     */
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTY_DESCRIPTORS;
+    }
+
+    /**
+     * Returns the relationships this processor can route FlowFiles to.
+     *
+     * @return the static {@code RELATIONSHIPS} set, which contains only {@code REL_SUCCESS}
+     */
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    /**
+     * Takes one FlowFile from the session, asks {@code getPartitions} for the partition
+     * FlowFiles covering its records and transfers all of them to {@code REL_SUCCESS}.
+     * <p>
+     * The record offset is read from the {@code ParquetAttribute.RECORD_OFFSET} attribute
+     * (default {@code 0}). The record count is read from {@code ParquetAttribute.RECORD_COUNT};
+     * when that attribute is absent it is computed as the count reported by
+     * {@code getRecordCount} minus the record offset. The counters "Records Split" and
+     * "Partitions Created" are adjusted, and the incoming FlowFile is removed from the
+     * session when "Zero Content Output" is enabled.
+     *
+     * @param context provides the configured property values
+     * @param session session the FlowFile is taken from and the partitions are transferred in
+     * @throws ProcessException if the Parquet content cannot be read while counting records
+     */
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        final FlowFile original = session.get();
+        if (original == null) {
+            return;
+        }
+
+        // "Records Per Split" is declared with FLOWFILE_ATTRIBUTES expression language scope,
+        // so the expression must be evaluated against the incoming FlowFile before the value
+        // is converted to a number.
+        final long partitionSize = context.getProperty(PROP_RECORDS_PER_SPLIT)
+                .evaluateAttributeExpressions(original)
+                .asLong();
+        final boolean zeroContentOutput = context.getProperty(PROP_ZERO_CONTENT_OUTPUT).asBoolean();
+
+        final long recordOffset = Optional.ofNullable(original.getAttribute(ParquetAttribute.RECORD_OFFSET))
+                .map(Long::parseLong)
+                .orElse(0L);
+
+        // The Parquet content is only opened when the record count attribute is absent.
+        final long recordCount = Optional.ofNullable(original.getAttribute(ParquetAttribute.RECORD_COUNT))
+                .map(Long::parseLong)
+                .orElseGet(() -> getRecordCount(session, original) - recordOffset);
+
+        final List<FlowFile> partitions = getPartitions(
+                session, original, partitionSize, recordCount, recordOffset, zeroContentOutput);
+        session.transfer(partitions, REL_SUCCESS);
+        session.adjustCounter("Records Split", recordCount, false);
+        session.adjustCounter("Partitions Created", partitions.size(), false);
+
+        if (zeroContentOutput) {
+            session.remove(original);
+        }
+    }
+
+    /**
+     * Reads the number of records in the Parquet content of the given FlowFile.
+     * <p>
+     * The read is restricted to the file range taken from the
+     * {@code ParquetAttribute.FILE_RANGE_START_OFFSET} and
+     * {@code ParquetAttribute.FILE_RANGE_END_OFFSET} attributes. A missing start
+     * attribute defaults to {@code 0}, a missing end attribute to {@code Long.MAX_VALUE}.
+     *
+     * @param session  session used to open the FlowFile content
+     * @param flowFile FlowFile whose content and range attributes are read
+     * @return the value reported by {@code ParquetFileReader#getRecordCount()}
+     * @throws ProcessException wrapping any {@link IOException} raised while reading
+     */
+    private long getRecordCount(ProcessSession session, FlowFile flowFile) {
+        final String startAttribute = flowFile.getAttribute(ParquetAttribute.FILE_RANGE_START_OFFSET);
+        final long rangeStart = startAttribute == null ? 0L : Long.parseLong(startAttribute);
+
+        final String endAttribute = flowFile.getAttribute(ParquetAttribute.FILE_RANGE_END_OFFSET);
+        final long rangeEnd = endAttribute == null ? Long.MAX_VALUE : Long.parseLong(endAttribute);
+
+        final ParquetReadOptions options = ParquetReadOptions.builder()
+                .withRange(rangeStart, rangeEnd)
+                .build();
+
+        // Both the content stream and the Parquet reader are closed on every exit path.
+        try (InputStream content = session.read(flowFile)) {
+            final NifiParquetInputFile inputFile = new NifiParquetInputFile(content, flowFile.getSize());
+            try (ParquetFileReader parquetReader = new ParquetFileReader(inputFile, options)) {
+                return parquetReader.getRecordCount();
+            }
+        } catch (IOException e) {
+            throw new ProcessException(e);
+        }
+    }
+
+    private List<FlowFile> getPartitions(
+            ProcessSession session,
+            FlowFile flowFile,

Review Comment:
   ```suggestion
               FlowFile inputFlowFile,
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to