Re: Dynamic file name prefix - StreamingFileSink

2020-10-13 Thread Ravi Bhushan Ratnakar
Hi Vijayendra, OutputFileConfig provides a builder to create an immutable object with the given 'prefix' and 'suffix'. The parameter you pass to 'withPartPrefix' is evaluated only once, at the time 'withPartPrefix' is called. So if you want to achieve a dynamic…
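A minimal sketch of that static wiring, assuming Flink 1.10+ (the class name PrefixedSinkExample, the output path, and the prefix/suffix values are illustrative, not from the thread):

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

public class PrefixedSinkExample {
    public static StreamingFileSink<String> build(String outputPath) {
        // The prefix/suffix are captured once when build() runs;
        // they are not re-evaluated for every part file.
        OutputFileConfig fileConfig = OutputFileConfig.builder()
                .withPartPrefix("events-" + System.currentTimeMillis()) // fixed at build time
                .withPartSuffix(".json")
                .build();

        return StreamingFileSink
                .forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8"))
                .withOutputFileConfig(fileConfig)
                .build();
    }
}

Because the prefix is fixed at build time, per-bucket or per-record naming is usually steered through the bucket path (e.g. a custom BucketAssigner) rather than the part prefix.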

Re: [Compression] Flink DataStream[GenericRecord] Avrowriters

2020-07-30 Thread Ravi Bhushan Ratnakar
…for deflateCodec, snappy - for snappyCodec, bzip2 - for bzip2Codec, xz - for xzCodec. Regards, Ravi. On Thu, Jul 30, 2020 at 8:21 AM Ravi Bhushan Ratnakar <ravibhushanratna...@gmail.com> wrote: > If it is possible, please share the sample output file. > Regards, > Ravi > On T…

Re: [Compression] Flink DataStream[GenericRecord] Avrowriters

2020-07-29 Thread Ravi Bhushan Ratnakar
…dec.SNAPPY) stream:DataStream.writeUsingOutputFormat(aof) Regards, Ravi. On Wed, Jul 29, 2020 at 9:12 PM Ravi Bhushan Ratnakar <ravibhushanratna...@gmail.com> wrote: > Hi Vijayendra, > Currently AvroWriters doesn't support compression. If you want to use compression th…
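The fragment above appears to be the AvroOutputFormat route; a minimal sketch of that approach, assuming a DataStream<GenericRecord>, an Avro Schema, and the flink-avro dependency (class, method, and path names of the sketch itself are illustrative):

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.avro.AvroOutputFormat;
import org.apache.flink.streaming.api.datastream.DataStream;

public class AvroOutputFormatExample {
    public static void attachSink(DataStream<GenericRecord> stream, Schema schema, String outputPath) {
        AvroOutputFormat<GenericRecord> aof =
                new AvroOutputFormat<>(new Path(outputPath), GenericRecord.class);
        aof.setSchema(schema);                       // required when writing GenericRecord
        aof.setCodec(AvroOutputFormat.Codec.SNAPPY); // Avro container-level compression
        // Note: writeUsingOutputFormat does not participate in checkpoint-based commits.
        stream.writeUsingOutputFormat(aof);
    }
}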

Re: [Compression] Flink DataStream[GenericRecord] Avrowriters

2020-07-29 Thread Ravi Bhushan Ratnakar
Hi Vijayendra, Currently AvroWriters doesn't support compression. If you want to use compression then you need a custom implementation of AvroWriter where you can add compression support. Please find a sample customization of AvroWriters where you could use compression. You can use…
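A minimal sketch of such a customization, assuming the flink-core and avro dependencies: a BulkWriter.Factory that wraps Avro's DataFileWriter and applies a container codec ("snappy", "deflate", "bzip2", "xz"). Class and field names are illustrative:

import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.serialization.BulkWriter;
import org.apache.flink.core.fs.FSDataOutputStream;

public class CompressedAvroWriterFactory implements BulkWriter.Factory<GenericRecord> {

    // The schema is kept as a String so the factory stays serializable.
    private final String schemaString;
    private final String codecName; // e.g. "snappy", "deflate", "bzip2", "xz"

    public CompressedAvroWriterFactory(String schemaString, String codecName) {
        this.schemaString = schemaString;
        this.codecName = codecName;
    }

    @Override
    public BulkWriter<GenericRecord> create(FSDataOutputStream out) throws IOException {
        Schema schema = new Schema.Parser().parse(schemaString);
        DataFileWriter<GenericRecord> dataFileWriter =
                new DataFileWriter<>(new GenericDatumWriter<GenericRecord>(schema));
        dataFileWriter.setCodec(CodecFactory.fromString(codecName)); // must be set before create()
        dataFileWriter.create(schema, out);

        return new BulkWriter<GenericRecord>() {
            @Override
            public void addElement(GenericRecord element) throws IOException {
                dataFileWriter.append(element);
            }

            @Override
            public void flush() throws IOException {
                dataFileWriter.flush();
            }

            @Override
            public void finish() throws IOException {
                // Flush Avro's buffers; the sink owns and closes the underlying stream.
                dataFileWriter.flush();
            }
        };
    }
}

Such a factory can then be handed to StreamingFileSink.forBulkFormat(...).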

Re: Compression Streamingfilesink ROW-encoded format

2020-07-29 Thread Ravi Bhushan Ratnakar
…Tue, Jul 28, 2020 at 1:20 PM Ravi Bhushan Ratnakar <ravibhushanratna...@gmail.com> wrote: >> Hi Vijayendra, >> As far as rowFormat is concerned, it doesn't support compression. >> Regards, >> Ravi >> On Tue 28 Jul, 20…

Re: Compression Streamingfilesink ROW-encoded format

2020-07-28 Thread Ravi Bhushan Ratnakar
…Regards, > Vijay > On Tue, Jul 28, 2020 at 11:28 AM Ravi Bhushan Ratnakar <ravibhushanratna...@gmail.com> wrote: >> Hi Vijayendra, >> You could achieve row-encoded output with compression like this as well: >> codecName = "org.apache.hadoop.i…

Re: Compression Streamingfilesink ROW-encoded format

2020-07-28 Thread Ravi Bhushan Ratnakar
Hi Vijayendra, You could achieve row-encoded output with compression like this as well: codecName = "org.apache.hadoop.io.compress.GzipCodec" val streamingFileSink:StreamingFileSink[String] = StreamingFileSink.forBulkFormat(new Path(outputPath), CompressWriters.forExtractor(new…
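A Java sketch of how that snippet might be completed, assuming the flink-compress module and the Hadoop codec classes are on the classpath (the class and method names of the sketch itself are illustrative):

import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.compress.CompressWriters;
import org.apache.flink.formats.compress.extractor.DefaultExtractor;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

public class CompressedRowSinkExample {
    public static StreamingFileSink<String> build(String outputPath) {
        // Each String record is turned into bytes by DefaultExtractor and run through the Gzip codec.
        String codecName = "org.apache.hadoop.io.compress.GzipCodec";
        return StreamingFileSink
                .forBulkFormat(
                        new Path(outputPath),
                        CompressWriters.forExtractor(new DefaultExtractor<String>())
                                .withHadoopCompression(codecName))
                .build();
    }
}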

Re: Kinesis Connector and Savepoint/Checkpoint restore.

2019-11-21 Thread Ravi Bhushan Ratnakar
…checkpointed state. I guess this is okay since in production we won't have idle shards, but it might be better to send through an empty record that doesn't get emitted but does trigger a state update. >> -Steve >> On Wed, Oct 1…

Re: Issue with BulkWriter

2019-10-22 Thread Ravi Bhushan Ratnakar
…ream.write('\n'); } public void finish() throws IOException { compressedStream.flush(); compressedStream.finish(); } public void flush() throws IOException { compressedStream.flush(); } } On Mon, Oct 21, 2019 at 11:17…

Re: Issue with BulkWriter

2019-10-22 Thread Ravi Bhushan Ratnakar
Hi, It seems that you want to use "com.hadoop.compression.lzo.LzoCodec" instead of "com.hadoop.compression.lzo.LzopCodec" in the line below: compressedStream = factory.getCodecByClassName("com.hadoop.compression.lzo.LzopCodec").createOutputStream(stream); Regarding "lzop: unexpected end of…
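For context, a minimal sketch of a BulkWriter built around a Hadoop compression codec, along the lines of the code being discussed (the class name, the newline delimiter, and the constructor shape are assumptions):

import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import org.apache.flink.api.common.serialization.BulkWriter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.CompressionOutputStream;

public class HadoopCodecBulkWriter implements BulkWriter<String> {

    private final CompressionOutputStream compressedStream;

    public HadoopCodecBulkWriter(OutputStream stream, String codecClassName) throws IOException {
        CompressionCodecFactory factory = new CompressionCodecFactory(new Configuration());
        CompressionCodec codec = factory.getCodecByClassName(codecClassName);
        this.compressedStream = codec.createOutputStream(stream);
    }

    @Override
    public void addElement(String element) throws IOException {
        compressedStream.write(element.getBytes(StandardCharsets.UTF_8));
        compressedStream.write('\n');
    }

    @Override
    public void flush() throws IOException {
        compressedStream.flush();
    }

    @Override
    public void finish() throws IOException {
        // finish() writes the codec trailer without closing the sink-owned stream.
        compressedStream.flush();
        compressedStream.finish();
    }
}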

Re: Customize Part file naming (Flink 1.9.0)

2019-10-19 Thread Ravi Bhushan Ratnakar
Hi, As an alternative, you may use BucketingSink, which gives you the provision to customize the part-file suffix/prefix. https://ci.apache.org/projects/flink/flink-docs-release-1.9/api/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.html Regards, Ravi. On Sat, Oct 19, 2019 at 3:54 AM…
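A minimal sketch, assuming the flink-connector-filesystem dependency and the 1.9 BucketingSink API (the writer, prefix, and suffix values are illustrative):

import org.apache.flink.streaming.connectors.fs.StringWriter;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;

public class PrefixedBucketingSinkExample {
    public static BucketingSink<String> build(String basePath) {
        BucketingSink<String> sink = new BucketingSink<>(basePath);
        sink.setWriter(new StringWriter<String>());
        sink.setPartPrefix("my-part"); // custom part-file prefix
        sink.setPartSuffix(".txt");    // custom part-file suffix
        return sink;
    }
}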

Re: Flink S3 sink unable to compress data

2019-10-18 Thread Ravi Bhushan Ratnakar
Hi, As per my understanding, the Encoder's encode method is called for each and every message, so it is not sensible to create a compressor around the given output stream on every call; that will lead to unpredictable, erroneous output. The Encoder's responsibility is to encode the given object, not to compress it. It…
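For reference, a minimal Encoder sketch illustrating that per-record contract (the class name and newline delimiter are assumptions):

import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import org.apache.flink.api.common.serialization.Encoder;

// encode() runs once per record against a stream the sink manages, so wrapping
// 'stream' in a new compressor on every call would corrupt the output.
public class NewlineDelimitedEncoder implements Encoder<String> {
    @Override
    public void encode(String element, OutputStream stream) throws IOException {
        stream.write(element.getBytes(StandardCharsets.UTF_8));
        stream.write('\n');
    }
}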

Re: Kinesis Connector and Savepoint/Checkpoint restore.

2019-10-16 Thread Ravi Bhushan Ratnakar
…to not exist in a savepoint? This seems like a big problem. > -Steve > On Wed, Oct 16, 2019 at 12:08 AM Ravi Bhushan Ratnakar <ravibhushanratna...@gmail.com> wrote: >> Hi, >> I am also facing the same problem. I am using Flink 1.9.0 and consuming…

Re: Jar Uploads in High Availability (Flink 1.7.2)

2019-10-15 Thread Ravi Bhushan Ratnakar
Hi, I was also experiencing similar behavior. I adopted the following approach: - used a distributed file system (in my case AWS EFS) and set the attribute "web.upload.dir"; this way both job managers have the same location. - on the load balancer side (AWS ELB), I used "readiness…
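For reference, a sketch of the relevant flink-conf.yaml entry under that setup (the mount path is an assumption):

# Shared directory visible to both JobManagers, e.g. an EFS mount (path is illustrative)
web.upload.dir: /mnt/efs/flink/web-uploads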

Re: Kinesis Connector and Savepoint/Checkpoint restore.

2019-10-15 Thread Ravi Bhushan Ratnakar
Hi, I am also facing the same problem. I am using Flink 1.9.0 and consuming from a Kinesis source with a retention of 1 day. I am observing that when the job is submitted with the "latest" initial stream position, the job starts well and keeps on processing data from all the shards for a very long period of…
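A minimal sketch of how that initial position is configured on the consumer, assuming the flink-connector-kinesis dependency (the region, class name, and String schema are illustrative; the thread's actual payload is Avro):

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;

public class KinesisSourceExample {
    public static FlinkKinesisConsumer<String> build(String streamName) {
        Properties props = new Properties();
        props.setProperty(ConsumerConfigConstants.AWS_REGION, "eu-west-1"); // region is illustrative
        // "LATEST" starts from the shard tips on a fresh submit; on restore, shard
        // offsets are expected to come from the checkpoint/savepoint state instead.
        props.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");
        return new FlinkKinesisConsumer<>(streamName, new SimpleStringSchema(), props);
    }
}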

Re: Checkpointing is not performing well

2019-09-11 Thread Ravi Bhushan Ratnakar
…otherwise I am sure it's always the problem, whatever kind of streaming engine you use. Tune your configuration to get the optimal rate so that the Flink checkpoint state is healthier. > Regards > Bhaskar > On Tue, Sep 10, 2019 at 11:16 PM Ravi Bhushan Ratnakar <r…

Re: Checkpointing is not performing well

2019-09-10 Thread Ravi Bhushan Ratnakar
>> Ravi, have you looked at the I/O operation (IOPS) rate of the disk? You can >> monitor the IOPS performance and tune it according to your workload. This helped us in our project when we hit the wall tuning pretty much all the parameters. >>…

Re: Checkpointing is not performing well

2019-09-07 Thread Ravi Bhushan Ratnakar
…cs-release-1.9/ops/state/state_backends.html#the-rocksdbstatebackend > Thanks, > Rafi > On Sat, Sep 7, 2019, 17:47 Ravi Bhushan Ratnakar <ravibhushanratna...@gmail.com> wrote: >> Hi All, >> I am writing a streaming application using Flink 1.9. This…

Checkpointing is not performing well

2019-09-07 Thread Ravi Bhushan Ratnakar
Hi All, I am writing a streaming application using Flink 1.9. This application consumes data from a Kinesis stream, which is basically an Avro payload. The application uses a KeyedProcessFunction to execute business logic on the basis of a correlation id, using event-time characteristics with the below…
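A skeleton of that kind of function, with the payload simplified to String and a one-minute event-time timer as placeholders (the thread's real business logic and Avro types are not shown):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Keyed by correlation id (String); assumes event-time timestamps/watermarks are assigned upstream.
public class CorrelationFunction extends KeyedProcessFunction<String, String, String> {

    private transient ValueState<String> lastSeen;

    @Override
    public void open(Configuration parameters) {
        lastSeen = getRuntimeContext().getState(
                new ValueStateDescriptor<>("lastSeen", String.class));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        lastSeen.update(value);
        // Fire an event-time timer one minute after this record's timestamp.
        ctx.timerService().registerEventTimeTimer(ctx.timestamp() + 60_000L);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        String buffered = lastSeen.value();
        if (buffered != null) {
            out.collect(buffered);
            lastSeen.clear();
        }
    }
}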

StreamingFileSink not committing file to S3

2019-08-05 Thread Ravi Bhushan Ratnakar
…ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kinesis.html#event-time-for-consumed-records > Thanks, > Rafi > On Sat, Aug 3, 2019 at 8:23 PM Ravi Bhushan Ratnakar <ravibhushanratna...@gmail.com> wrote: >> Hi All, >>…

Re: Flink Kafka to BucketingSink to S3 - S3Exception

2018-11-05 Thread Ravi Bhushan Ratnakar
…could you please share your dependency versioning? >> 2. Does this use a Kafka source with high Flink parallelism (~400) for all Kafka partitions, and does it run continuously for several days? >> 3. Could you please share your checkpoint interval configuration, batc…

Re: Flink Kafka to BucketingSink to S3 - S3Exception

2018-11-03 Thread Ravi Bhushan Ratnakar
I have made small changes to BucketingSink and implemented a new CustomBucketingSink to use in my project, which works fine with the s3 and s3a protocols. This implementation doesn't require an xml file configuration; instead it uses the configuration provided via the Flink configuration object by calling…

Need help regarding Flink Batch Application

2018-08-08 Thread Ravi Bhushan Ratnakar
Hi Everybody, Currently I am working on a project where I need to write a Flink batch application which has to process around 400GB of compressed sequence files per hour. After processing, it has to write the output in compressed Parquet format to S3. I have managed to write the application in Flink and…
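A minimal batch sketch of the reading side, assuming the flink-hadoop-compatibility dependency and Text/BytesWritable key-value types (paths, types, and the placeholder transformation are assumptions):

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.HadoopInputs;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;

public class SequenceFileBatchJob {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Compressed SequenceFiles are decompressed transparently by the Hadoop input format.
        DataSet<Tuple2<Text, BytesWritable>> input = env.createInput(
                HadoopInputs.readSequenceFile(Text.class, BytesWritable.class, "s3://my-bucket/input/"));

        // Placeholder transformation; the real job would convert the records and
        // write them as compressed Parquet (e.g. via a Hadoop Parquet output format) to S3.
        input.map(t -> t.f0.toString()).writeAsText("s3://my-bucket/output/");

        env.execute("sequence-file-batch");
    }
}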