Hi Danny,
I looked into it in a bit more detail; the bottleneck seems to be the
transform function, which is at 100% busy and causing backpressure. I'm
looking into that.
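For reference, the first thing I plan to try is raising the parallelism of that
transform (rough sketch only; ChunkingFlatMap and the variable names are
placeholders for my actual operator):

    // fragment of the existing job graph - only the explicit parallelism call is new
    DataStream<String> chunked = sourceStream
            .flatMap(new ChunkingFlatMap())   // the transform that is at 100% busy
            .setParallelism(4);               // trying a higher parallelism here
    chunked.addSink(kinesis);                 // Kinesis sink left unchanged
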
Thanks for your help, much appreciated!

On Fri, May 20, 2022 at 1:24 AM Ber, Jeremy <jd...@amazon.com> wrote:

> Hi Zain—
>
>
>
> Are you seeing any data loss within the Flink Dashboard subtasks of each
> task? At the bottom of your dashboard you should see data going from each
> blue box to the next. Is this a comprehensive set of data? Meaning, do you
> see 80M from the source -> first operator -> second operator -> sink?
>
>
>
> Secondly, it may be easier to troubleshoot this by removing a few variables.
> Would you be able to remove the operator which segregates your data into
> records of length 100 and simply forward that data to the next operator?
> Simultaneously, could you leave the Kinesis Producer configuration settings
> (apart from the queue limit) at their defaults? This will give us a good
> baseline to improve from.
>
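> Something along these lines is what I have in mind (rough sketch only; the
> source variable name is a placeholder for yours, credentials omitted):
>
> Properties producerConfig = new Properties();
> producerConfig.put(AWSConfigConstants.AWS_REGION, "xxx");
> // aggregation, collection and threading settings left at the KPL defaults
>
> FlinkKinesisProducer<String> kinesis =
>         new FlinkKinesisProducer<>(new SimpleStringSchema(), producerConfig);
> kinesis.setDefaultStream("xxx");
> kinesis.setDefaultPartition("0");
> kinesis.setQueueLimit(1000);              // the one non-default setting you keep
>
> // bypass the chunking operator for now: feed the source straight into the sink
> kafkaSourceStream.addSink(kinesis);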
>
>
> Jeremy
>
>
>
> *From: *Zain Haider Nemati <zain.hai...@retailo.co>
> *Date: *Wednesday, May 18, 2022 at 6:15 AM
> *To: *Danny Cranmer <dannycran...@apache.org>
> *Cc: *Alexander Preuß <alexanderpre...@ververica.com>, user <
> user@flink.apache.org>
> *Subject: *RE: [EXTERNAL]Kinesis Sink - Data being received with
> intermittent breaks
>
>
>
>
>
>
> Hey Danny,
>
> Thanks for getting back to me.
>
> - You are seeing bursty throughput, but the job is keeping up? There is no
> backpressure? --> Correct, I'm not seeing any backpressure in any of the
> metrics.
>
> - What is the throughput at the sink? --> Number of records out is about
> 1,100 per 10 seconds.
>
> - On the graph screenshot, what is the period and stat (sum/average/etc)?
> --> It is incoming data (MB/s), each second.
>
>
>
> So let me explain this in totality: the source holds about 80 million
> records, but the number of records I see in the Kinesis data stream after it
> has consumed the data from the source is about 20 million. So I'm seeing a
> lot of data loss, and I think it is potentially related to the intermittent
> dips I'm seeing in the records coming into the data stream.
>
>
>
> What configurations do you generally suggest for data in this range when
> sinking to a Kinesis data stream?
>
>
>
> On Wed, May 18, 2022 at 2:00 AM Danny Cranmer <dannycran...@apache.org>
> wrote:
>
> Hello Zain,
>
>
>
> Thanks for providing the additional information. Going back to the
> original issue:
>
> - You are seeing bursty throughput, but the job is keeping up? There is no
> backpressure?
>
> - What is the throughput at the sink?
>
> - On the graph screenshot, what is the period and stat (sum/average/etc)?
>
>
>
> Let me shed some light on the log messages; let's take this example:
>
>
>
> LogInputStreamReader ... Stage 1 Triggers ...  { stream:
> 'flink-kafka-tracer', manual: 0, count: 0, size: 0, matches: 0, timed: 3,
> UserRecords: 6, KinesisRecords: 3 }
>
>
>
> Flush trigger reason:
>
> - manual: the flush was manually triggered
>
> - count: the flush was triggered by the number of records in the container
>
> - size: the flush was triggered by the number of bytes in the container
>
> - matches: the flush was triggered by a predicate match
>
> - timed: the flush was triggered by the elapsed timer
>
>
>
> Input/Output:
>
> - UserRecords: Number of input records KPL flushed (this can be higher
> than KinesisRecords when aggregation is enabled)
>
> - KinesisRecords: Number of records shipped to Kinesis Data Streams
>
>
>
> Stage 2 triggers tell us the number of API invocations via the PutRecords
> field.
>
>
>
> I can see from your logs that the majority of flushes are due to the
> timer, and it does not look overly bursty. Seems to sit at around 3 records
> per 15 seconds, or 1 record every 5 seconds. This seems very low, is it
> expected?
>
>
>
> Thanks,
>
> Danny Cranmer
>
>
>
> On Mon, May 16, 2022 at 10:57 PM Zain Haider Nemati <
> zain.hai...@retailo.co> wrote:
>
> Hey Danny,
>
> Thanks for having a look at the issue.
>
> I am using a custom Flink operator to segregate the data into a consistent
> format of records of length 100, which is no more than 1 MB. The
> configurations I shared were from when I was experimenting with tweaking some
> of them to see if that improves the throughput.
>
>
>
> Regarding your queries:
>
> - Which Flink version is this? --> *Version 1.13*
>
> - Can you see any errors in the Flink logs? --> *No. I'm attaching the Flink
> logs from after I set all the configurations back to their defaults*
>
> - Do you see any errors/throttles in the Kinesis Data Stream metrics? --> *I
> did before segregating into smaller chunks, but not anymore*
>
> - How many shards does your stream have? --> *It has 4 shards*
>
> - What is your sink operator parallelism? --> *1*
>
> - What is the general health of your job graph? --> *This is the only job
> running at the moment; it isn't unhealthy*
>
>   - Are the operators upstream of the sink backpressured? --> *No*
>
>   - Are you sure the sink is actually the issue here? --> *I have used
> .print() as a sink and I see all the records in real time; it chokes when
> paired with the Kinesis sink*
>
>   - Are there any other potential bottlenecks? --> *Data is coming in from
> the source correctly. I have a flatMap transformation which reads it and
> segments it into chunks of <=1MB, and this is also tested using the .print()
> sink (a simplified sketch of this transform follows after this list)*
>
> - When you say you are trying to achieve "1MB chunks", I assume this is
> per Kinesis record, not per PutRecords batch? --> *Correct*
>
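> As referenced above, the transform is essentially along these lines
> (simplified sketch, not the exact code; it just splits an oversized record
> into pieces of at most 1 MB and, for brevity, ignores multi-byte character
> boundaries):
>
> import org.apache.flink.api.common.functions.FlatMapFunction;
> import org.apache.flink.util.Collector;
> import java.nio.charset.StandardCharsets;
>
> public class ChunkingFlatMap implements FlatMapFunction<String, String> {
>     private static final int MAX_CHUNK_BYTES = 1024 * 1024;  // 1 MB Kinesis record limit
>
>     @Override
>     public void flatMap(String record, Collector<String> out) {
>         byte[] bytes = record.getBytes(StandardCharsets.UTF_8);
>         // emit the record as one or more chunks of at most MAX_CHUNK_BYTES
>         for (int start = 0; start < bytes.length; start += MAX_CHUNK_BYTES) {
>             int end = Math.min(start + MAX_CHUNK_BYTES, bytes.length);
>             out.collect(new String(bytes, start, end - start, StandardCharsets.UTF_8));
>         }
>     }
> }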
>
>
> Attaching a small chunk of the log file from when the job was started. [It
> goes down to 0 records for some periods of time as well; in the log file it
> shows mostly between 3-6 records.]
>
>
>
> Really appreciate your response on this, since I have not been able to
> gather much help from other resources online. It would be great if you could
> let me know what the issue here could be, and let me know if you need
> anything else as well!
>
>
>
> Cheers
>
>
>
>
>
> On Tue, May 17, 2022 at 12:34 AM Danny Cranmer <dannycran...@apache.org>
> wrote:
>
> Hello Zain,
>
>
>
> When you say "converting them to chunks of <= 1MB", does this mean you are
> creating these chunks in a custom Flink operator, or are you relying on
> the connector to do so? If you are generating your own chunks you can
> potentially disable aggregation at the sink.
>
>
> Your throughput is incredibly bursty, I have a few questions:
>
> - Which Flink version is this?
>
> - Can you see any errors in the Flink logs?
>
> - Do you see any errors/throttles in the Kinesis Data Stream metrics?
>
> - How many shards does your stream have?
>
> - What is your sink operator parallelism?
>
> - What is the general health of your job graph?
>
>   - Are the operators upstream of the sink backpressured?
>
>   - Are you sure the sink is actually the issue here?
>
>   - Are there any other potential bottlenecks?
>
> - When you say you are trying to achieve "1MB chunks", I assume this is
> per Kinesis record, not per PutRecords batch?
>
>
>
> Some comments on your configuration:
>
>
>
> As previously mentioned, if you are generating the chunks you can
> potentially remove the aggregation config and disable it.
>
> - producerConfig.put("AggregationMaxCount", "3");
> - producerConfig.put("AggregationMaxSize", "256");
>
> + producerConfig.put("AggregationEnabled", "false");
>
>
>
> This is very low, and could conflict with your chunk size. These
> configurations are regarding the PutRecords request, which has a quota of
> 500 records and 5MiB. You are setting the max size to 100kB, which is less
> than your largest chunk. I would recommend removing these configurations.
>
> - producerConfig.put("CollectionMaxCount", "3");
> - producerConfig.put("CollectionMaxSize", "100000");
>
>
>
> This is the default threading model, so can be removed.
>
> - producerConfig.put("ThreadingModel", "POOLED");
>
>
>
> This config should not have too much impact. The default is 100ms and you
> are increasing it to 1s. This could increase your end-to-end latency under
> low-throughput scenarios.
>
> - producerConfig.put("RecordMaxBufferedTime", "1000");
>
>
>
> This config controls the sink backpressure and can also impact throughput.
> Do you see any logs like "Waiting for the queue length to drop below the
> limit takes unusually long, still not done after <x> attempts"?
>
> kinesis.setQueueLimit(1000);
>
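> Putting those comments together, a trimmed-down configuration along these
> lines (sketch only, reusing the placeholder values from your snippet) would
> be my starting point:
>
> Properties producerConfig = new Properties();
> producerConfig.put(AWSConfigConstants.AWS_REGION, "xxx");
> // disable aggregation if you already build your own <=1MB chunks
> producerConfig.put("AggregationEnabled", "false");
> // CollectionMaxCount/CollectionMaxSize, ThreadingModel and
> // RecordMaxBufferedTime removed so the KPL defaults apply
>
> FlinkKinesisProducer<String> kinesis =
>         new FlinkKinesisProducer<>(new SimpleStringSchema(), producerConfig);
> kinesis.setFailOnError(false);
> kinesis.setDefaultStream("xxx");
> kinesis.setDefaultPartition("0");
> kinesis.setQueueLimit(1000);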
>
>
> Thanks,
>
> Danny
>
>
>
> On Mon, May 16, 2022 at 5:27 PM Alexander Preuß <
> alexanderpre...@ververica.com> wrote:
>
> Hi Zain,
>
>
>
> I'm looping in Danny here, he is probably the most knowledgeable when it
> comes to the Kinesis connector.
>
>
>
> Best,
>
> Alexander
>
>
>
> On Mon, May 16, 2022 at 12:13 AM Zain Haider Nemati <
> zain.hai...@retailo.co> wrote:
>
> Hi,
>
> I'm fetching data from Kafka topics, converting it to chunks of <= 1MB, and
> sinking it to a Kinesis data stream.
>
> The streaming job is functional, however I see bursts of data in the Kinesis
> stream with intermittent dips where the data received is 0. I'm attaching the
> configuration parameters for the Kinesis sink. What could be the cause of
> this issue?
>
> The data is being fed into the datastream by a Kafka topic, which is in turn
> fed by a MongoDB with about 60 million records, all of which are loaded.
>
> I am trying to configure the parameters in such a way that the 1MB per-record
> payload limit of Kinesis is not breached. Would appreciate help on this!
>
>
>
> producerConfig.put(AWSConfigConstants.AWS_REGION, "xxx");
> producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "xxx");
> producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "xxx");
> producerConfig.put("AggregationMaxCount", "3");
> producerConfig.put("AggregationMaxSize", "256");
> producerConfig.put("CollectionMaxCount", "3");
> producerConfig.put("CollectionMaxSize", "100000");
> producerConfig.put("AggregationEnabled", true);
> producerConfig.put("RateLimit", "50");
> producerConfig.put("RecordMaxBufferedTime", "1000");
> producerConfig.put("ThreadingModel", "POOLED");
>
> FlinkKinesisProducer<String> kinesis =
>         new FlinkKinesisProducer<>(new SimpleStringSchema(), producerConfig);
> kinesis.setFailOnError(false);
> kinesis.setDefaultStream("xxx");
> kinesis.setDefaultPartition("0");
> kinesis.setQueueLimit(1000);
>
>
>
> *Data in Kinesis:*
>
>
>
>
> --
>
> *Alexander Preuß* | Engineer - Data Intensive Systems
>
> alexanderpre...@ververica.com
>
> <https://www.ververica.com/>
>
>
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
>
> Ververica GmbH
>
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>
> Managing Directors: Alexander Walden, Karl Anton Wehner, Yip Park Tung
> Jason, Jinwei (Kevin) Zhang
>
>
>
>
