GitHub user Susmit07 edited a discussion: Reading a parquet file and writing to s3 using pekko connectors.

Hello everyone,

I’m working on a project that uses Pekko Connectors to read Parquet files from HDFS, process them, and upload them to S3. I’ve implemented the code below, and I’d like to confirm whether the approach is sound, specifically the use of `ByteString(outputStream.toByteArray)` to turn the serialized Parquet data into something that can be uploaded to S3.

```scala
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.hadoop.conf.Configuration
import org.apache.parquet.avro.{AvroParquetWriter, AvroReadSupport}
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import org.apache.parquet.io.{DelegatingPositionOutputStream, OutputFile, PositionOutputStream}

import org.apache.pekko.{Done, NotUsed}
import org.apache.pekko.stream.connectors.s3.MultipartUploadResult
import org.apache.pekko.stream.connectors.s3.scaladsl.S3
import org.apache.pekko.stream.scaladsl.{Flow, Sink, Source}
import org.apache.pekko.util.ByteString

import java.io.ByteArrayOutputStream
import java.nio.file.Paths
import scala.concurrent.Future

// Process a Parquet file and upload it to S3.
// Assumes an implicit ActorSystem (for materialization) and ExecutionContext are in scope,
// and that FileHandler and logger come from the surrounding class.
private def processParquetFile(filePath: String, fileHandler: FileHandler, bucket: String): Future[Done] = {
  logger.info(s"Starting to process file: $filePath")

  // Read the Parquet file as a Source[GenericRecord, NotUsed]
  // (see the sketch after this block for how readParquetFileSource might look)
  val parquetSource: Source[GenericRecord, NotUsed] = fileHandler.readParquetFileSource(filePath)

  // Hadoop configuration for the Avro-backed Parquet writer
  val conf = new Configuration()
  conf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, true)

  // Serialize each GenericRecord to Parquet in memory and emit the bytes as a ByteString
  val serializeFlow: Flow[GenericRecord, ByteString, NotUsed] =
    Flow[GenericRecord].map { record =>
      val outputStream = new ByteArrayOutputStream()

      // Wrap the ByteArrayOutputStream in a Parquet OutputFile so the writer
      // targets memory rather than the HDFS path we are reading from
      val inMemoryFile: OutputFile = new OutputFile {
        override def create(blockSizeHint: Long): PositionOutputStream = createOrOverwrite(blockSizeHint)
        override def createOrOverwrite(blockSizeHint: Long): PositionOutputStream =
          new DelegatingPositionOutputStream(outputStream) {
            override def getPos: Long = outputStream.size().toLong
          }
        override def supportsBlockSize(): Boolean = false
        override def defaultBlockSize(): Long = 0L
      }

      val writer = AvroParquetWriter
        .builder[GenericRecord](inMemoryFile)
        .withSchema(record.getSchema) // the builder needs the Avro schema
        .withConf(conf)
        .withCompressionCodec(CompressionCodecName.SNAPPY)
        .withDataModel(GenericData.get()) // ensure the data model for GenericRecord is set
        .build()

      // Write the record to Parquet format (note: this produces a complete
      // Parquet file, header and footer included, per record)
      writer.write(record)
      writer.close()

      // Convert to ByteString for the S3 upload
      ByteString(outputStream.toByteArray)
    }

  // S3 multipart upload sink; materializes the upload result
  val s3Sink: Sink[ByteString, Future[MultipartUploadResult]] =
    S3.multipartUpload(bucket, s"s3_key_prefix/${Paths.get(filePath).getFileName}")

  // Run the stream: read Parquet -> serialize -> upload to S3
  parquetSource
    .via(serializeFlow)
    .runWith(s3Sink)
    .map { result =>
      logger.info(s"Uploaded to S3 successfully. Result: $result")
      Done
    }
    .recover {
      case ex: Exception =>
        logger.error(s"Error processing file $filePath: ${ex.getMessage}", ex)
        Done
    }
}
```
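
For context on the reading side: `readParquetFileSource` is my own helper in `FileHandler`. Roughly, it wraps a Parquet reader in the Avro Parquet connector source, something like the sketch below (assuming the `pekko-connectors-avroparquet` module is on the classpath; the helper name and the config are mine):

```scala
import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.{AvroParquetReader, AvroReadSupport}
import org.apache.parquet.hadoop.util.HadoopInputFile
import org.apache.pekko.NotUsed
import org.apache.pekko.stream.connectors.avroparquet.scaladsl.AvroParquetSource
import org.apache.pekko.stream.scaladsl.Source

// Sketch of what readParquetFileSource does: open the HDFS file with a Parquet
// reader and wrap it in the Avro Parquet connector source.
def readParquetFileSource(filePath: String): Source[GenericRecord, NotUsed] = {
  val conf = new Configuration()
  conf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, true)

  val reader = AvroParquetReader
    .builder[GenericRecord](HadoopInputFile.fromPath(new Path(filePath), conf))
    .withConf(conf)
    .build()

  AvroParquetSource(reader)
}
```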

My concern:
I’m using `ByteString(outputStream.toByteArray)` to convert the serialized Parquet data into a form that can be streamed to S3. I’m worried that this approach could lead to OutOfMemoryError (OOM) issues, especially when processing large files, since `ByteArrayOutputStream` keeps the entire serialized file in memory.

- Is this approach safe for production when dealing with large files?
- Should I consider a more memory-efficient way to handle the conversion? (The kind of alternative I have in mind is sketched right below.)
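
For context, the alternative I keep coming back to is staging the Parquet output to a local temporary file and then streaming that file to S3 in bounded chunks, rather than collecting everything in a `ByteArrayOutputStream`. Here is a minimal sketch of the upload side, assuming the staged file already exists and that an implicit `ActorSystem` and `ExecutionContext` are in scope (`uploadStagedParquet` and `stagedParquet` are names I made up for this example):

```scala
import java.nio.file.{Files, Path}

import org.apache.pekko.Done
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.connectors.s3.scaladsl.S3
import org.apache.pekko.stream.scaladsl.FileIO

import scala.concurrent.{ExecutionContext, Future}

// Sketch: stream an already-staged local Parquet file to S3 without buffering it in memory.
// `stagedParquet` is assumed to have been written by AvroParquetWriter targeting a local path.
def uploadStagedParquet(stagedParquet: Path, bucket: String, key: String)(
    implicit system: ActorSystem, ec: ExecutionContext): Future[Done] = {
  // FileIO.fromPath emits ByteString chunks (8 KiB by default), and S3.multipartUpload
  // buffers roughly one part (5 MiB minimum) at a time, so memory stays bounded
  // regardless of how large the staged file is.
  FileIO
    .fromPath(stagedParquet)
    .runWith(S3.multipartUpload(bucket, key))
    .map { _ =>
      Files.deleteIfExists(stagedParquet) // clean up the temporary staging file
      Done
    }
}
```

The trade-off is local disk usage and an extra copy, which is partly why I’m asking whether there is a more idiomatic way.
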
Additionally, I’m using `S3.multipartUpload` since it’s typically recommended for files larger than 500 MB. However, I’m curious:

- Can I use `S3.putObject` instead for smaller files? (A rough sketch of how I picture that choice is below.)
- Is `multipartUpload` still the preferred approach for all file sizes, given that it handles uploads in parts, or should I switch to `putObject` for smaller files?
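
To make that concrete, this is roughly the split I have in mind. It’s only a sketch: `smallFileThreshold` is a number I made up, the exact `S3.putObject` parameter list may differ between connector versions, and it assumes an implicit `ActorSystem` is in scope for materialization.

```scala
import org.apache.pekko.NotUsed
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.http.scaladsl.model.ContentTypes
import org.apache.pekko.stream.connectors.s3.{MultipartUploadResult, ObjectMetadata, S3Headers}
import org.apache.pekko.stream.connectors.s3.scaladsl.S3
import org.apache.pekko.stream.scaladsl.{Sink, Source}
import org.apache.pekko.util.ByteString

import scala.concurrent.Future

val smallFileThreshold: Long = 100L * 1024 * 1024 // made-up cut-off, ~100 MB

// Small payload that already fits in memory: a single PUT, no multipart bookkeeping.
// (putObject's parameters are written out as I understand them; they may vary by version.)
def putSmallObject(bucket: String, key: String, bytes: ByteString)(
    implicit system: ActorSystem): Future[ObjectMetadata] =
  S3.putObject(
      bucket,
      key,
      Source.single(bytes),
      bytes.length,
      ContentTypes.`application/octet-stream`,
      S3Headers()
    )
    .runWith(Sink.head)

// Larger payloads: stream ByteString chunks through the multipart upload sink.
def putLargeObject(bucket: String, key: String, data: Source[ByteString, NotUsed])(
    implicit system: ActorSystem): Future[MultipartUploadResult] =
  data.runWith(S3.multipartUpload(bucket, key))
```
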
Looking forward to your thoughts and suggestions!

Thank you.




GitHub link: https://github.com/apache/pekko-connectors/discussions/857
