Hi, I am trying to implement a Parquet writer as a Flink SinkFunction. The pipeline consumes from Kafka as the source and is supposed to write a Parquet file as the sink, but the stream seems to repeat itself in an endless loop and the Parquet file is never written. Can someone please help me with this? Here is my code:
object ParquetSinkWriter {
  private val path = new Path("tmp/pfile")
  private val schemaString = Source.fromURL(getClass.getResource("/dns_request_schema.avsc")).mkString
  private val avroSchema: Schema = new Schema.Parser().parse(schemaString)
  private val compressionCodecName = CompressionCodecName.SNAPPY
  private val config = ParquetWriterConfig()

  val writer: ParquetWriter[GenericRecord] = AvroParquetWriter
    .builder[GenericRecord](path)
    .withSchema(avroSchema)
    .withCompressionCodec(compressionCodecName)
    .withPageSize(config.pageSize)
    .withRowGroupSize(config.blockSize)
    .withDictionaryEncoding(config.enableDictionary)
    .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
    .withValidation(config.validating)
    .build()
}

class ParquetSinkWriter(path: Path, avroSchema: Schema) extends SinkFunction[GenericRecord] {
  import ParquetSinkWriter._

  override def invoke(value: GenericRecord): Unit = {
    println(s"ADDING TO File : $value") // I am getting this output
    writer.write(value)                 // but the output is not written to the file
  }
}

// main app
object StreamingJob extends App {
  implicit val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  env.enableCheckpointing(500)
  env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
  env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
  env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
  env.getCheckpointConfig.setCheckpointTimeout(600)
  env.getCheckpointConfig.setFailOnCheckpointingErrors(false)
  env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
  env.setRestartStrategy(RestartStrategies.failureRateRestart(2, Time.seconds(3), Time.seconds(3)))

  val backend: StateBackend = new RocksDBStateBackend("file:///tmp/rocksdb", true)
  env.setStateBackend(backend)

  val writer = new ParquetSinkWriter(outputPath, schema)

  val stream2: DataStream[DnsRequest] = env.addSource(/* consume from kafka */)

  stream2.map { r =>
    println(s"MAPPING $r") // this output keeps repeating in a loop
    val genericRecord: GenericRecord = new GenericData.Record(schema)
    genericRecord.put("qname", r.qname)
    genericRecord.put("rcode", r.rcode)
    genericRecord.put("ts", r.ts)
    genericRecord
  }.addSink(writer)
}

Thanks for your help,
Avi
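Should the sink instead be a RichSinkFunction that builds the writer in open() and closes it in close()? My understanding is that ParquetWriter buffers whole row groups in memory and only flushes them to the file when close() is called, so a sink that never closes the writer would never produce output. Below is an untested sketch of what I mean (same Avro schema and path idea as above, passing the schema as a string so the function stays serializable; this is not what I currently run):

import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.parquet.hadoop.{ParquetFileWriter, ParquetWriter}
import org.apache.parquet.hadoop.metadata.CompressionCodecName

// Sketch: build the writer per task in open() and close it in close(),
// so buffered row groups are actually flushed to disk when the job stops.
class ParquetRichSink(path: String, schemaString: String) extends RichSinkFunction[GenericRecord] {

  @transient private var writer: ParquetWriter[GenericRecord] = _

  override def open(parameters: Configuration): Unit = {
    val schema = new Schema.Parser().parse(schemaString)
    writer = AvroParquetWriter
      .builder[GenericRecord](new Path(path))
      .withSchema(schema)
      .withCompressionCodec(CompressionCodecName.SNAPPY)
      .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
      .build()
  }

  override def invoke(value: GenericRecord): Unit = {
    writer.write(value) // records are buffered until the writer is closed
  }

  override def close(): Unit = {
    // closing flushes the buffered row groups and writes the Parquet footer
    if (writer != null) writer.close()
  }
}

If the more standard approach is to use StreamingFileSink with a Parquet bulk writer from the flink-parquet module instead of a hand-rolled SinkFunction, pointers in that direction would also be appreciated.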