Hi Danny,

I looked into it in more detail; the bottleneck seems to be the transform function, which is sitting at 100% busy and causing backpressure. I'm looking into that. Thanks for your help, much appreciated!
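A common first step when a single transform sits at 100% busy is to scale that operator independently of the rest of the graph. Below is a minimal sketch of that idea, assuming the hot operator is the chunking flatMap described later in this thread; ChunkingFlatMap (sketched further down) and all other names here are hypothetical, not the actual job code:

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class ScaleHotOperator {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Stand-in for the Kafka source described further down the thread.
            DataStream<String> records = env.fromElements("record-1", "record-2");

            records
                .rebalance()                     // spread records evenly across subtasks
                .flatMap(new ChunkingFlatMap())  // hypothetical hot transform, sketched below
                .name("chunking-transform")
                .setParallelism(4)               // scale only the 100%-busy operator
                .print();                        // .print() sink, as used for testing in the thread

            env.execute("scale-hot-operator-sketch");
        }
    }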
On Fri, May 20, 2022 at 1:24 AM Ber, Jeremy <jd...@amazon.com> wrote:

Hi Zain,

Are you seeing any data loss within the Flink Dashboard subtasks of each task? At the bottom of your dashboard you should see data going from each blue box to the next. Is this a comprehensive set of data? That is, do you see 80M records from the source -> first operator -> second operator -> sink?

Secondly, it may be easier to troubleshoot this by removing a few variables. Would you be able to remove the operator that segregates your data into 100-length records and simply forward that data to the next operator? Simultaneously, could you leave the Kinesis Producer configuration settings (apart from the queue limit) at their defaults? This will give a good baseline from which to improve.

Jeremy

From: Zain Haider Nemati <zain.hai...@retailo.co>
Date: Wednesday, May 18, 2022 at 6:15 AM
To: Danny Cranmer <dannycran...@apache.org>
Cc: Alexander Preuß <alexanderpre...@ververica.com>, user <user@flink.apache.org>
Subject: RE: [EXTERNAL] Kinesis Sink - Data being received with intermittent breaks

Hey Danny,

Thanks for getting back to me.

- You are seeing bursty throughput, but the job is keeping up? There is no backpressure? --> Correct, I'm not seeing any backpressure in any of the metrics.
- What is the throughput at the sink? --> The number of records out is about 1,100 per 10 seconds.
- On the graph screenshot, what is the period and stat (sum/average/etc)? --> It is incoming data (MB/s) per second.

To explain this in totality: the source holds about 80 million records, but the Kinesis data stream shows only about 20 million records after it has consumed the data from the source. So I'm seeing a lot of data loss, and I think it is related to the intermittent dips in the records arriving at the data stream.

What configuration do you generally suggest for data in this range when sinking to a Kinesis data stream?
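Jeremy's dashboard check can also be done programmatically: reading numRecordsIn and numRecordsOut per operator shows where the 80M-in/20M-out discrepancy first appears. Below is a sketch against the Flink 1.13 REST API; the host, job ID, and vertex ID are placeholders you would look up via GET /jobs and GET /jobs/<job-id> first:

    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import java.net.HttpURLConnection;
    import java.net.URL;
    import java.nio.charset.StandardCharsets;

    public class OperatorRecordCounts {
        public static void main(String[] args) throws Exception {
            String jobId = "<job-id>";       // placeholder
            String vertexId = "<vertex-id>"; // placeholder, one per operator in the graph
            // Aggregated subtask metrics endpoint; the response carries the
            // aggregates of each requested metric across all subtasks.
            URL url = new URL("http://localhost:8081/jobs/" + jobId
                    + "/vertices/" + vertexId
                    + "/subtasks/metrics?get=numRecordsIn,numRecordsOut");
            HttpURLConnection conn = (HttpURLConnection) url.openConnection();
            try (BufferedReader reader = new BufferedReader(
                    new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    System.out.println(line); // JSON array, one entry per metric
                }
            }
        }
    }

Comparing the counts across the source, transform, and sink vertices should show whether records are lost inside the job or never leave the sink.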
On Wed, May 18, 2022 at 2:00 AM Danny Cranmer <dannycran...@apache.org> wrote:

Hello Zain,

Thanks for providing the additional information. Going back to the original issue:

- You are seeing bursty throughput, but the job is keeping up? There is no backpressure?
- What is the throughput at the sink?
- On the graph screenshot, what is the period and stat (sum/average/etc)?

Let me shed some light on the log messages. Let's take this example:

    LogInputStreamReader ... Stage 1 Triggers ... { stream: 'flink-kafka-tracer', manual: 0, count: 0, size: 0, matches: 0, timed: 3, UserRecords: 6, KinesisRecords: 3 }

Flush trigger reason:
- manual: the flush was manually triggered
- count: the flush was triggered by the number of records in the container
- size: the flush was triggered by the number of bytes in the container
- matches: the predicate was matched
- timed: the flush was triggered by the elapsed timer

Input/output:
- UserRecords: the number of input records the KPL flushed (this can be higher than KinesisRecords when aggregation is enabled)
- KinesisRecords: the number of records shipped to Kinesis Data Streams

Stage 2 triggers tell us the number of API invocations via the PutRecords field.

I can see from your logs that the majority of flushes are due to the timer, and it does not look overly bursty. It seems to sit at around 3 records per 15 seconds, or 1 record every 5 seconds. That seems very low; is it expected?

Thanks,
Danny Cranmer

On Mon, May 16, 2022 at 10:57 PM Zain Haider Nemati <zain.hai...@retailo.co> wrote:

Hey Danny,

Thanks for having a look at the issue.

I am using a custom Flink operator to segregate the data into a consistent format of length 100, which is no more than 1 MB. The configuration I shared was from after I had been tweaking some settings to see if they improved throughput.

Regarding your queries:
- Which Flink version is this? --> Version 1.13
- Can you see any errors in the Flink logs? --> No. I'm attaching Flink logs from after I set all the configurations back to their defaults.
- Do you see any errors/throttles in the Kinesis Data Stream metrics? --> I was seeing them before segregating into smaller chunks; not anymore.
- How many shards does your stream have? --> It has 4 shards.
- What is your sink operator parallelism? --> 1
- What is the general health of your job graph? --> This is the only job running at the moment; it isn't unhealthy.
- Are the operators upstream of the sink backpressured? --> No.
- Are you sure the sink is actually the issue here? --> I have used .print() as a sink and I see all the records in real time; it chokes when paired with the Kinesis sink.
- Are there any other potential bottlenecks? --> Data is coming in from the source correctly. I have a flatMap transformation that reads each record and segments it into chunks of <= 1 MB, which is also tested using the .print() sink (a sketch of an operator of this shape appears below).
- When you say you are trying to achieve "1MB chunks", I assume this is per Kinesis record, not per PutRecords batch? --> Correct.

Attaching a small chunk of the log file from when the job is started. [It goes down to 0 records for some periods of time as well; in the log file it shows mostly between 3-6 records.]

I really appreciate your response on this, since I have not been able to gather much help from other resources online. It would be great if you could let me know what the issue could be; let me know if you need anything else as well!

Cheers

On Tue, May 17, 2022 at 12:34 AM Danny Cranmer <dannycran...@apache.org> wrote:

Hello Zain,

When you say "converting them to chunks of <= 1MB", does this mean you are creating these chunks in a custom Flink operator, or are you relying on the connector to do so? If you are generating your own chunks, you can potentially disable aggregation at the sink.
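For reference, a chunking operator of the shape Zain describes might look like the following. This is a hypothetical sketch, not the actual job code: it splits on raw byte offsets, which can cut a multi-byte UTF-8 character in half, so a real implementation would split on record boundaries instead:

    import java.nio.charset.StandardCharsets;
    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.util.Collector;

    // Splits each incoming record into pieces that stay under the
    // 1 MB-per-record limit of Kinesis Data Streams.
    public class ChunkingFlatMap implements FlatMapFunction<String, String> {
        private static final int MAX_CHUNK_BYTES = 1_000_000;

        @Override
        public void flatMap(String value, Collector<String> out) {
            byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
            for (int start = 0; start < bytes.length; start += MAX_CHUNK_BYTES) {
                int end = Math.min(start + MAX_CHUNK_BYTES, bytes.length);
                out.collect(new String(bytes, start, end - start, StandardCharsets.UTF_8));
            }
        }
    }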
Your throughput is incredibly bursty. I have a few questions:

- Which Flink version is this?
- Can you see any errors in the Flink logs?
- Do you see any errors/throttles in the Kinesis Data Stream metrics?
- How many shards does your stream have?
- What is your sink operator parallelism?
- What is the general health of your job graph?
- Are the operators upstream of the sink backpressured?
- Are you sure the sink is actually the issue here?
- Are there any other potential bottlenecks?
- When you say you are trying to achieve "1MB chunks", I assume this is per Kinesis record, not per PutRecords batch?

Some comments on your configuration:

As previously mentioned, if you are generating the chunks yourself you can remove the aggregation configuration and disable aggregation:

    - producerConfig.put("AggregationMaxCount", "3");
    - producerConfig.put("AggregationMaxSize", "256");
    + producerConfig.put("AggregationEnabled", "false");

These values are very low and could conflict with your chunk size. The collection settings apply to the PutRecords request, which has a quota of 500 records and 5 MiB. You are setting the max size to 100 kB, which is less than your largest chunk. I would recommend removing these configurations:

    - producerConfig.put("CollectionMaxCount", "3");
    - producerConfig.put("CollectionMaxSize", "100000");

This is the default threading model, so it can be removed:

    - producerConfig.put("ThreadingModel", "POOLED");

This setting should not have much impact. The default is 100 ms; you are increasing it to 1 s, which could increase your end-to-end latency under low-throughput scenarios:

    - producerConfig.put("RecordMaxBufferedTime", "1000");

This setting controls the sink backpressure and can also impact throughput. Do you see any logs like "Waiting for the queue length to drop below the limit takes unusually long, still not done after <x> attempts"?

    kinesis.setQueueLimit(1000);

Thanks,
Danny

On Mon, May 16, 2022 at 5:27 PM Alexander Preuß <alexanderpre...@ververica.com> wrote:

Hi Zain,

I'm looping in Danny here; he is probably the most knowledgeable when it comes to the Kinesis connector.

Best,
Alexander

On Mon, May 16, 2022 at 12:13 AM Zain Haider Nemati <zain.hai...@retailo.co> wrote:

Hi,

I'm fetching data from Kafka topics, converting the records to chunks of <= 1 MB, and sinking them to a Kinesis data stream. The streaming job is functional, but I see bursts of data in the Kinesis stream with intermittent dips where the data received drops to 0. I'm attaching the configuration parameters for the Kinesis sink. What could be the cause of this issue?

The DataStream is fed by a Kafka topic, which in turn is fed from MongoDB; it has about 60 million records, which are loaded fully. I am trying to configure the parameters so that the 1 MB-per-record payload limit of Kinesis is not breached. I would appreciate help on this!
    producerConfig.put(AWSConfigConstants.AWS_REGION, "xxx");
    producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "xxx");
    producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "xxx");
    producerConfig.put("AggregationMaxCount", "3");
    producerConfig.put("AggregationMaxSize", "256");
    producerConfig.put("CollectionMaxCount", "3");
    producerConfig.put("CollectionMaxSize", "100000");
    producerConfig.put("AggregationEnabled", "true");
    producerConfig.put("RateLimit", "50");
    producerConfig.put("RecordMaxBufferedTime", "1000");
    producerConfig.put("ThreadingModel", "POOLED");

    FlinkKinesisProducer<String> kinesis =
            new FlinkKinesisProducer<>(new SimpleStringSchema(), producerConfig);
    kinesis.setFailOnError(false);
    kinesis.setDefaultStream("xxx");
    kinesis.setDefaultPartition("0");
    kinesis.setQueueLimit(1000);

Data in Kinesis: [graph screenshot]
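Pulling Danny's and Jeremy's suggestions together, a trimmed-down version of the configuration above might look like the following. This is a sketch rather than a verified fix: the region and stream name are placeholders, aggregation is disabled because the chunks are built upstream, and everything apart from the queue limit is left at its default:

    import java.util.Properties;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
    import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;

    public class KinesisSinkConfigSketch {
        public static FlinkKinesisProducer<String> buildSink() {
            Properties producerConfig = new Properties();
            producerConfig.put(AWSConfigConstants.AWS_REGION, "xxx"); // placeholder

            // Chunks are built by the upstream flatMap, so KPL aggregation adds nothing.
            producerConfig.put("AggregationEnabled", "false");
            // Collection*, RateLimit, RecordMaxBufferedTime, ThreadingModel: defaults.

            FlinkKinesisProducer<String> kinesis =
                    new FlinkKinesisProducer<>(new SimpleStringSchema(), producerConfig);
            kinesis.setFailOnError(false);
            kinesis.setDefaultStream("xxx"); // placeholder
            // A fixed default partition key hashes every record to the same shard;
            // with 4 shards that caps writes at one shard's capacity, so this
            // setting is worth revisiting if throughput stays low.
            kinesis.setDefaultPartition("0");
            kinesis.setQueueLimit(1000); // kept, per Jeremy's note
            return kinesis;
        }
    }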