Hi,

I am trying to implement a Parquet writer as a SinkFunction. The pipeline consists of Kafka as the source and a Parquet file as the sink, but it seems like the stream keeps repeating itself in an endless loop and the Parquet file is never written. Can someone please help me with this?

import scala.io.Source
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.{ParquetFileWriter, ParquetWriter}
import org.apache.parquet.hadoop.metadata.CompressionCodecName

object ParquetSinkWriter {
  private val path = new Path("tmp/pfile")
  private val schemaString =
    Source.fromURL(getClass.getResource("/dns_request_schema.avsc")).mkString
  private val avroSchema: Schema = new Schema.Parser().parse(schemaString)
  private val compressionCodecName = CompressionCodecName.SNAPPY
  private val config = ParquetWriterConfig()

  // Single ParquetWriter built once here and shared by the sink below
  val writer: ParquetWriter[GenericRecord] =
    AvroParquetWriter.builder[GenericRecord](path)
      .withSchema(avroSchema)
      .withCompressionCodec(compressionCodecName)
      .withPageSize(config.pageSize)
      .withRowGroupSize(config.blockSize)
      .withDictionaryEncoding(config.enableDictionary)
      .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
      .withValidation(config.validating)
      .build()
}
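
For context, ParquetWriterConfig is just a small helper case class holding the writer settings, roughly along these lines. Only the field names come from the builder calls above; the defaults shown are assumptions (the usual parquet-hadoop defaults):

import org.apache.parquet.hadoop.ParquetWriter

// Sketch of the ParquetWriterConfig helper referenced above -- field names are
// the ones used in the builder; the default values are assumptions.
case class ParquetWriterConfig(
  pageSize: Int = ParquetWriter.DEFAULT_PAGE_SIZE,
  blockSize: Int = ParquetWriter.DEFAULT_BLOCK_SIZE,
  enableDictionary: Boolean = ParquetWriter.DEFAULT_IS_DICTIONARY_ENABLED,
  validating: Boolean = ParquetWriter.DEFAULT_IS_VALIDATING_ENABLED
)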

import org.apache.flink.streaming.api.functions.sink.SinkFunction

class ParquetSinkWriter(path: Path, avroSchema: Schema) extends SinkFunction[GenericRecord] {
  import ParquetSinkWriter._

  override def invoke(value: GenericRecord): Unit = {
    println(s"ADDING TO File : $value") // getting this output
    writer.write(value) // the output is not written to the file
  }
}
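
For comparison, a standalone write with the same builder should end up on disk once the writer is closed. A minimal sketch outside of Flink (the object name, output path, and record values are made up for the example, and I'm assuming qname/rcode/ts are string/int/long in the schema):

import scala.io.Source
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.ParquetFileWriter

object StandaloneWriteCheck extends App {
  val schema: Schema = new Schema.Parser().parse(
    Source.fromURL(getClass.getResource("/dns_request_schema.avsc")).mkString)

  val writer = AvroParquetWriter.builder[GenericRecord](new Path("/tmp/pfile-test"))
    .withSchema(schema)
    .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
    .build()

  val record: GenericRecord = new GenericData.Record(schema)
  record.put("qname", "example.com") // made-up values; field types assumed
  record.put("rcode", 0)
  record.put("ts", System.currentTimeMillis())

  writer.write(record)
  writer.close() // rows are buffered in the current row group and only reach the file on close/flush
}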

// main app
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.time.Time
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.runtime.state.StateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.scala._
import org.apache.avro.generic.{GenericData, GenericRecord}

object StreamingJob extends App {
  implicit val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  env.enableCheckpointing(500)
  env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
  env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
  env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
  env.getCheckpointConfig.setCheckpointTimeout(600)
  env.getCheckpointConfig.setFailOnCheckpointingErrors(false)
  env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
  env.setRestartStrategy(RestartStrategies.failureRateRestart(2, Time.seconds(3), Time.seconds(3)))

  val backend: StateBackend = new RocksDBStateBackend("file:///tmp/rocksdb", true)
  env.setStateBackend(backend)

  val writer = new ParquetSinkWriter(outputPath, schema)

  val stream2: DataStream[DnsRequest] = env.addSource(/* consume from kafka */)
  stream2.map { r =>
    println(s"MAPPING $r") // this output keeps repeating in a loop
    val genericRecord: GenericRecord = new GenericData.Record(schema)
    genericRecord.put("qname", r.qname)
    genericRecord.put("rcode", r.rcode)
    genericRecord.put("ts", r.ts)
    genericRecord
  }.addSink(writer)
}
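
For reference, DnsRequest is just the case class deserialized from Kafka, roughly this shape (the field names are the ones used in the map above; the concrete types are an assumption):

// Sketch of the DnsRequest type consumed from Kafka -- field names match the map
// above; the types (String / Int / Long) are assumptions.
case class DnsRequest(qname: String, rcode: Int, ts: Long)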

Thanks for your help
Avi
