Thanks Kostas,
OK, got it, so BucketingSink might not be a good choice here. Can you please
advise what the best approach would be? I have a heavy load of data that I
consume from Kafka, which I want to process and write to files (they don't
have to be Parquet). I thought that StreamingFileSink might be a good
choice, but I guess I am doing something wrong there. If there is a good
example for that, it would be great.

BR
Avi

On Mon, Dec 3, 2018 at 4:11 PM Kostas Kloudas <k.klou...@data-artisans.com>
wrote:

> Hi Avi,
>
> For Bulk Formats like Parquet, unfortunately, we do not support setting
> the batch size.
> The part-files roll on every checkpoint. This is a known limitation and
> there are plans to alleviate it in the future.
>
> Setting the batch size (among other things) is supported for RowWise
> formats.
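>
> As a rough illustration of the row-wise case (a sketch, not code from this
> thread): the snippet below assumes Flink 1.7, String records and a
> caller-supplied base directory, and uses DefaultRollingPolicy to roll part
> files by size and by time, mirroring the 400 MB / 20 minute BucketingSink
> settings quoted further down. The names RowFormatSinkSketch and buildSink are
> made up for the example, and newer Flink releases may expose the policy
> builder as DefaultRollingPolicy.builder() instead of create().
>
> ```scala
> import java.util.concurrent.TimeUnit
>
> import org.apache.flink.api.common.serialization.SimpleStringEncoder
> import org.apache.flink.core.fs.Path
> import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
> import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy
>
> object RowFormatSinkSketch {
>   // Builds a row-format sink that writes one text line per String record.
>   // basePath is a directory; part files are created in bucket subdirectories below it.
>   def buildSink(basePath: String): StreamingFileSink[String] =
>     StreamingFileSink
>       .forRowFormat(new Path(basePath), new SimpleStringEncoder[String]("UTF-8"))
>       .withRollingPolicy(
>         DefaultRollingPolicy.create()
>           .withMaxPartSize(400L * 1024 * 1024)                 // roll at roughly 400 MB
>           .withRolloverInterval(TimeUnit.MINUTES.toMillis(20)) // or once a part file is 20 minutes old
>           .build())
>       .build()
> }
> ```
>
> It would be attached with stream.addSink(RowFormatSinkSketch.buildSink("/base/path")).
> Checkpointing still has to be enabled, since part files are only finalised
> when a checkpoint completes.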
>
> Cheers,
> Kostas
>
> On Sun, Dec 2, 2018 at 9:29 PM Avi Levi <avi.l...@bluevoyant.com> wrote:
>
>> Thanks Kostas. I will definitely look into that. But does the
>> StreamingFileSink also support setting the batch size by size and/or by
>> time interval, like the BucketingSink?
>>
>> On Sun, Dec 2, 2018 at 5:09 PM Kostas Kloudas <
>> k.klou...@data-artisans.com> wrote:
>>
>>> Hi Avi,
>>>
>>> The ParquetAvroWriters cannot be used with the BucketingSink.
>>>
>>> In fact, the StreamingFileSink is the "evolution" of the BucketingSink
>>> and it supports all the functionality that the BucketingSink supports.
>>>
>>> Given this, why not use the StreamingFileSink?
>>>
>>> On Sat, Dec 1, 2018 at 7:56 AM Avi Levi <avi.l...@bluevoyant.com> wrote:
>>>
>>>> Thanks, looks good.
>>>> Do you know a way to use ParquetWriter or ParquetAvroWriters with a
>>>> BucketingSink
>>>> <https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/filesystem_sink.html#bucketing-file-sink>?
>>>> Something like:
>>>>
>>>> val bucketingSink = new BucketingSink[String]("/base/path")
>>>> bucketingSink.setBucketer(new DateTimeBucketer[String]("yyyy-MM-dd--HHmm"))
>>>> bucketingSink.setWriter(ParquetAvroWriters.forGenericRecord(schema))
>>>> bucketingSink.setBatchSize(1024 * 1024 * 400) // this is 400 MB,
>>>> bucketingSink.setBatchRolloverInterval(20 * 60 * 1000); // this is 20 mins
>>>>
>>>>
>>>> On Fri, Nov 30, 2018 at 3:59 PM Kostas Kloudas <
>>>> k.klou...@data-artisans.com> wrote:
>>>>
>>>>> And for a Java example which is actually similar to your pipeline,
>>>>> you can check the ParquetStreamingFileSinkITCase.
>>>>>
>>>>>
>>>>>
>>>>> On Fri, Nov 30, 2018 at 2:39 PM Kostas Kloudas <
>>>>> k.klou...@data-artisans.com> wrote:
>>>>>
>>>>>> Hi Avi,
>>>>>>
>>>>>> At first glance I am not seeing anything wrong with your code.
>>>>>> Did you verify that there are elements flowing in your pipeline and
>>>>>> that checkpoints are actually completed?
>>>>>> Also, can you check the JobManager and TaskManager logs for anything
>>>>>> suspicious?
>>>>>>
>>>>>> Unfortunately, we do not allow specifying encoding and other
>>>>>> parameters to your writer, which is an omission
>>>>>> on our part and this should be fixed. Could you open a JIRA for that?
>>>>>>
>>>>>> If you want to know more about Flink's Parquet-Avro writer, feel free
>>>>>> to have a look at the ParquetAvroWriters
>>>>>> class.
>>>>>>
>>>>>> Cheers,
>>>>>> Kostas
>>>>>>
>>>>>>
>>>>>> On Thu, Nov 29, 2018 at 6:58 PM Avi Levi <avi.l...@bluevoyant.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Thanks a lot Kostas, but the file is not created. What am I doing
>>>>>>> wrong?
>>>>>>> BTW, how can you set the encoding etc. in Flink's Avro-Parquet
>>>>>>> writer?
>>>>>>>
>>>>>>> object Tester extends App {
>>>>>>>   val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>>>>>   def now = System.currentTimeMillis()
>>>>>>>   val path = new Path(s"test-$now.parquet")
>>>>>>>   val schema: Schema = new Schema.Parser().parse(schemaString)
>>>>>>>   val streamingSink = StreamingFileSink.forBulkFormat( path,
>>>>>>>   ParquetAvroWriters.forGenericRecord(schema))
>>>>>>>   .build()
>>>>>>>   env.enableCheckpointing(100)
>>>>>>>   val stream = env.addSource(FirstSeenQueueImpl.consumer).map{ r =>
>>>>>>>     val genericReocrd: GenericRecord = new GenericData.Record(schema)
>>>>>>>     genericReocrd.put("name", r.name)
>>>>>>>     genericReocrd.put("code", r.code.asString)
>>>>>>>     genericReocrd.put("ts", r.ts)
>>>>>>>     genericReocrd
>>>>>>>   }
>>>>>>>     stream.addSink { r =>
>>>>>>>         println(s"In Sink $r") //getting this line
>>>>>>>         streamingSink
>>>>>>>     }
>>>>>>>   env.execute()
>>>>>>> }
>>>>>>>
>>>>>>> Cheers
>>>>>>> Avi
>>>>>>>
>>>>>>> On Thu, Nov 29, 2018 at 1:36 PM Kostas Kloudas <
>>>>>>> k.klou...@data-artisans.com> wrote:
>>>>>>>
>>>>>>>>
>>>>>>>> Sorry, previously I got confused and I assumed you were using
>>>>>>>> Flink's StreamingFileSink.
>>>>>>>>
>>>>>>>> Could you try to use Flink's Avro - Parquet writer?
>>>>>>>>
>>>>>>>> StreamingFileSink.forBulkFormat(
>>>>>>>>       Path...(MY_PATH),
>>>>>>>>       ParquetAvroWriters.forGenericRecord(MY_SCHEMA))
>>>>>>>> .build()
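>>>>>>>>
>>>>>>>> A slightly fuller sketch of how such a sink might be built, assuming
>>>>>>>> Flink 1.7 with the flink-parquet and flink-avro modules on the
>>>>>>>> classpath. The names BulkParquetSinkSketch and buildSink and both
>>>>>>>> parameters are placeholders for illustration, not part of any
>>>>>>>> pipeline in this thread.
>>>>>>>>
>>>>>>>> ```scala
>>>>>>>> import org.apache.avro.Schema
>>>>>>>> import org.apache.avro.generic.GenericRecord
>>>>>>>> import org.apache.flink.core.fs.Path
>>>>>>>> import org.apache.flink.formats.parquet.avro.ParquetAvroWriters
>>>>>>>> import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
>>>>>>>>
>>>>>>>> object BulkParquetSinkSketch {
>>>>>>>>   // basePath is treated as a directory: the sink creates bucket
>>>>>>>>   // subdirectories and part files beneath it, not a single file.
>>>>>>>>   def buildSink(schema: Schema, basePath: String): StreamingFileSink[GenericRecord] =
>>>>>>>>     StreamingFileSink
>>>>>>>>       .forBulkFormat(new Path(basePath), ParquetAvroWriters.forGenericRecord(schema))
>>>>>>>>       .build()
>>>>>>>> }
>>>>>>>> ```
>>>>>>>>
>>>>>>>> The sink object itself is then handed to the stream, e.g.
>>>>>>>> stream.addSink(BulkParquetSinkSketch.buildSink(schema, "/base/path")),
>>>>>>>> where stream is assumed to be a DataStream[GenericRecord]. A function
>>>>>>>> literal passed to addSink is treated as its own sink function, so the
>>>>>>>> StreamingFileSink should not be wrapped in one.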
>>>>>>>>
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> Kostas
>>>>>>>>
>>>>>>>> On Thu, Nov 29, 2018 at 12:25 PM Avi Levi <avi.l...@bluevoyant.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Thanks.
>>>>>>>>> Yes, *env.execute* is called and checkpoints are enabled.
>>>>>>>>> I think the problem is where to place the *writer.close* call to
>>>>>>>>> flush the cache.
>>>>>>>>> If I place it in the sink after the write event, e.g.
>>>>>>>>> addSink{
>>>>>>>>> writer.write
>>>>>>>>> writer.close
>>>>>>>>> }
>>>>>>>>> in this case only the first record will be included in the file
>>>>>>>>> but not the rest of the stream.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thu, Nov 29, 2018 at 11:07 AM Kostas Kloudas <
>>>>>>>>> k.klou...@data-artisans.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi again Avi,
>>>>>>>>>>
>>>>>>>>>> In the first example that you posted (the one with the Kafka
>>>>>>>>>> source), do you call env.execute()?
>>>>>>>>>>
>>>>>>>>>> Cheers,
>>>>>>>>>> Kostas
>>>>>>>>>>
>>>>>>>>>> On Thu, Nov 29, 2018 at 10:01 AM Kostas Kloudas <
>>>>>>>>>> k.klou...@data-artisans.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Avi,
>>>>>>>>>>>
>>>>>>>>>>> In the last snippet that you posted, you have not activated
>>>>>>>>>>> checkpoints.
>>>>>>>>>>>
>>>>>>>>>>> Checkpoints are needed for the StreamingFileSink to produce
>>>>>>>>>>> results, especially in the case of BulkWriters (like Parquet) where
>>>>>>>>>>> the part file is rolled upon reception of a checkpoint and the
>>>>>>>>>>> part is finalised (i.e. "committed") when the checkpoint gets
>>>>>>>>>>> completed successfully.
>>>>>>>>>>>
>>>>>>>>>>> Could you please enable checkpointing and make sure that the job
>>>>>>>>>>> runs long enough for at least some checkpoints to be completed?
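>>>>>>>>>>>
>>>>>>>>>>> For reference, a minimal sketch of enabling checkpointing with the
>>>>>>>>>>> Scala API. The interval, pause and timeout values are arbitrary
>>>>>>>>>>> assumptions for illustration, and CheckpointingSketch / configure
>>>>>>>>>>> are made-up names. The only point is that the timeout should leave
>>>>>>>>>>> each checkpoint enough time to complete.
>>>>>>>>>>>
>>>>>>>>>>> ```scala
>>>>>>>>>>> import org.apache.flink.streaming.api.CheckpointingMode
>>>>>>>>>>> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
>>>>>>>>>>>
>>>>>>>>>>> object CheckpointingSketch {
>>>>>>>>>>>   def configure(env: StreamExecutionEnvironment): Unit = {
>>>>>>>>>>>     // Trigger a checkpoint every 60 seconds (assumed value).
>>>>>>>>>>>     env.enableCheckpointing(60 * 1000L)
>>>>>>>>>>>     val conf = env.getCheckpointConfig
>>>>>>>>>>>     conf.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
>>>>>>>>>>>     // Leave at least 30 seconds between the end of one checkpoint and the next.
>>>>>>>>>>>     conf.setMinPauseBetweenCheckpoints(30 * 1000L)
>>>>>>>>>>>     // Abort a checkpoint only if it takes longer than 10 minutes.
>>>>>>>>>>>     conf.setCheckpointTimeout(10 * 60 * 1000L)
>>>>>>>>>>>   }
>>>>>>>>>>> }
>>>>>>>>>>> ```
>>>>>>>>>>>
>>>>>>>>>>> Because bulk-format part files roll on every checkpoint, a longer
>>>>>>>>>>> interval also means fewer, larger files.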
>>>>>>>>>>>
>>>>>>>>>>> Thanks a lot,
>>>>>>>>>>> Kostas
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Nov 29, 2018 at 7:03 AM Avi Levi <
>>>>>>>>>>> avi.l...@bluevoyant.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Check out this little app. You can see that the file is created
>>>>>>>>>>>> but no data is written, even for a single record.
>>>>>>>>>>>>
>>>>>>>>>>>> import io.eels.component.parquet.ParquetWriterConfig
>>>>>>>>>>>> import org.apache.avro.Schema
>>>>>>>>>>>> import org.apache.avro.generic.{ GenericData, GenericRecord }
>>>>>>>>>>>> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
>>>>>>>>>>>> import org.apache.hadoop.fs.Path
>>>>>>>>>>>> import org.apache.parquet.avro.AvroParquetWriter
>>>>>>>>>>>> import org.apache.parquet.hadoop.{ ParquetFileWriter, ParquetWriter }
>>>>>>>>>>>> import org.apache.parquet.hadoop.metadata.CompressionCodecName
>>>>>>>>>>>> import scala.io.Source
>>>>>>>>>>>> import org.apache.flink.streaming.api.scala._
>>>>>>>>>>>>
>>>>>>>>>>>> object Tester extends App {
>>>>>>>>>>>>   val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>>>>>>>>>>   def now = System.currentTimeMillis()
>>>>>>>>>>>>   val path = new Path(s"test-$now.parquet")
>>>>>>>>>>>>   val schemaString = 
>>>>>>>>>>>> Source.fromURL(getClass.getResource("/request_schema.avsc")).mkString
>>>>>>>>>>>>   val schema: Schema = new Schema.Parser().parse(schemaString)
>>>>>>>>>>>>   val compressionCodecName = CompressionCodecName.SNAPPY
>>>>>>>>>>>>   val config = ParquetWriterConfig()
>>>>>>>>>>>>   val genericReocrd: GenericRecord = new GenericData.Record(schema)
>>>>>>>>>>>>   genericReocrd.put("name", "test_b")
>>>>>>>>>>>>   genericReocrd.put("code", "NoError")
>>>>>>>>>>>>   genericReocrd.put("ts", 100L)
>>>>>>>>>>>>   val stream = env.fromElements(genericReocrd)
>>>>>>>>>>>>   val writer: ParquetWriter[GenericRecord] = 
>>>>>>>>>>>> AvroParquetWriter.builder[GenericRecord](path)
>>>>>>>>>>>>     .withSchema(schema)
>>>>>>>>>>>>     .withCompressionCodec(compressionCodecName)
>>>>>>>>>>>>     .withPageSize(config.pageSize)
>>>>>>>>>>>>     .withRowGroupSize(config.blockSize)
>>>>>>>>>>>>     .withDictionaryEncoding(config.enableDictionary)
>>>>>>>>>>>>     .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
>>>>>>>>>>>>     .withValidation(config.validating)
>>>>>>>>>>>>     .build()
>>>>>>>>>>>>
>>>>>>>>>>>>   writer.write(genericReocrd)
>>>>>>>>>>>>   stream.addSink { r =>
>>>>>>>>>>>>     println(s"In Sink $r")
>>>>>>>>>>>>     writer.write(r)
>>>>>>>>>>>>   }
>>>>>>>>>>>>   env.execute()
>>>>>>>>>>>>   //  writer.close()
>>>>>>>>>>>> }
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, Nov 29, 2018 at 6:57 AM vipul singh <neoea...@gmail.com>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Can you try closing the writer?
>>>>>>>>>>>>>
>>>>>>>>>>>>> AvroParquetWriter has an internal buffer. Try calling .close()
>>>>>>>>>>>>> in snapshot() (since you are checkpointing, this method will
>>>>>>>>>>>>> be called).
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Wed, Nov 28, 2018 at 7:33 PM Avi Levi <
>>>>>>>>>>>>> avi.l...@bluevoyant.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks Rafi,
>>>>>>>>>>>>>> I am actually not using assignTimestampsAndWatermarks; I
>>>>>>>>>>>>>> will try to add it as you suggested. However, it seems that the
>>>>>>>>>>>>>> messages are repeating in the stream over and over: even if I
>>>>>>>>>>>>>> push a single message manually to the queue, that message
>>>>>>>>>>>>>> repeats indefinitely.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Cheers
>>>>>>>>>>>>>> Avi
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Wed, Nov 28, 2018 at 10:40 PM Rafi Aroch <
>>>>>>>>>>>>>> rafi.ar...@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi Avi,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I can't see the part where you use
>>>>>>>>>>>>>>> assignTimestampsAndWatermarks.
>>>>>>>>>>>>>>> If this part is not set properly, it's possible that
>>>>>>>>>>>>>>> watermarks are not sent and nothing will be written to your
>>>>>>>>>>>>>>> sink.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> See here for more details:
>>>>>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hope this helps,
>>>>>>>>>>>>>>> Rafi
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Wed, Nov 28, 2018, 21:22 Avi Levi <
>>>>>>>>>>>>>>> avi.l...@bluevoyant.com wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I am trying to implement a Parquet writer as a SinkFunction.
>>>>>>>>>>>>>>>> The pipeline consists of Kafka as the source and a Parquet file
>>>>>>>>>>>>>>>> as the sink. However, it seems like the stream is repeating
>>>>>>>>>>>>>>>> itself in an endless loop and the Parquet file is not written.
>>>>>>>>>>>>>>>> Can someone please help me with this?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> object ParquetSinkWriter{
>>>>>>>>>>>>>>>>   private val path = new Path("tmp/pfile")
>>>>>>>>>>>>>>>>   private val schemaString =
>>>>>>>>>>>>>>>> Source.fromURL(getClass.getResource("/dns_request_schema.avsc")).mkString
>>>>>>>>>>>>>>>>   private val avroSchema: Schema = new
>>>>>>>>>>>>>>>> Schema.Parser().parse(schemaString)
>>>>>>>>>>>>>>>>   private val compressionCodecName =
>>>>>>>>>>>>>>>> CompressionCodecName.SNAPPY
>>>>>>>>>>>>>>>>   private   val config = ParquetWriterConfig()
>>>>>>>>>>>>>>>>   val writer: ParquetWriter[GenericRecord] =
>>>>>>>>>>>>>>>> AvroParquetWriter.builder[GenericRecord](path)
>>>>>>>>>>>>>>>>     .withSchema(avroSchema)
>>>>>>>>>>>>>>>>     .withCompressionCodec(compressionCodecName)
>>>>>>>>>>>>>>>>     .withPageSize(config.pageSize)
>>>>>>>>>>>>>>>>     .withRowGroupSize(config.blockSize)
>>>>>>>>>>>>>>>>     .withDictionaryEncoding(config.enableDictionary)
>>>>>>>>>>>>>>>>     .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
>>>>>>>>>>>>>>>>     .withValidation(config.validating)
>>>>>>>>>>>>>>>>     .build()
>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> class ParquetSinkWriter(path: Path, avroSchema: Schema)
>>>>>>>>>>>>>>>>   extends SinkFunction[GenericRecord] {
>>>>>>>>>>>>>>>>   import ParquetSinkWriter._
>>>>>>>>>>>>>>>>   override def invoke(value: GenericRecord): Unit = {
>>>>>>>>>>>>>>>>     println(s"ADDING TO File : $value") // getting this output
>>>>>>>>>>>>>>>>     writer.write(value) // the output is not written to the file
>>>>>>>>>>>>>>>>   }
>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> //main app
>>>>>>>>>>>>>>>> object StreamingJob extends App {
>>>>>>>>>>>>>>>>   implicit val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
>>>>>>>>>>>>>>>>   env.enableCheckpointing(500)
>>>>>>>>>>>>>>>>   env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
>>>>>>>>>>>>>>>>   env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
>>>>>>>>>>>>>>>>   env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
>>>>>>>>>>>>>>>>   env.getCheckpointConfig.setCheckpointTimeout(600)
>>>>>>>>>>>>>>>>   env.getCheckpointConfig.setFailOnCheckpointingErrors(false)
>>>>>>>>>>>>>>>>   env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
>>>>>>>>>>>>>>>>   env.setRestartStrategy(RestartStrategies.failureRateRestart(2, Time.seconds(3), Time.seconds(3)))
>>>>>>>>>>>>>>>>   val backend: StateBackend = new RocksDBStateBackend("file:///tmp/rocksdb", true)
>>>>>>>>>>>>>>>>   env.setStateBackend(backend)
>>>>>>>>>>>>>>>>   val writer = new ParquetSinkWriter(outputPath, schema)
>>>>>>>>>>>>>>>>   val stream2: DataStream[DnsRequest] = env.addSource(//consume from kafka)
>>>>>>>>>>>>>>>>   stream2.map { r =>
>>>>>>>>>>>>>>>>     println(s"MAPPING $r") // this output keeps repeating in a loop
>>>>>>>>>>>>>>>>     val genericReocrd: GenericRecord = new GenericData.Record(schema)
>>>>>>>>>>>>>>>>     genericReocrd.put("qname", r.qname)
>>>>>>>>>>>>>>>>     genericReocrd.put("rcode", r.rcode)
>>>>>>>>>>>>>>>>     genericReocrd.put("ts", r.ts)
>>>>>>>>>>>>>>>>     genericReocrd
>>>>>>>>>>>>>>>>   }.addSink(writer)
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks for your help
>>>>>>>>>>>>>>>> Avi
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> --
>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>> Vipul
>>>>>>>>>>>>>
>>>>>>>>>>>>
