Re: Job gets stuck when using kafka transactions and eventually crashes
Can you elaborate a bit more? Idleness is not what we're seeing now, but it could become an issue later on. What is it about a partition going idle that results in state buildup?

Thanks,
Vishal

On 25 Jan 2023 at 9:14 PM +0530, Martijn Visser wrote:
> Hi Vishal,
>
> Could idleness be an issue? I could see that if idleness occurs and the Kafka
> Source does not go into an idle state, more internal state (to commit Kafka
> transactions) can build up over time, ultimately causing an out-of-memory
> problem. See
> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#idleness
> for more details on this.
>
> Best regards,
>
> Martijn
Re: Job gets stuck when using kafka transactions and eventually crashes
Could it be that Flink is unable to commit offsets to Kafka? I know that Flink's checkpoint mechanism isn't tied to its ability to commit offsets, but at the same time we've seen that the job can take hours to commit offsets while checkpoints go through successfully during that period. With Kafka transactions enabled, however, the offset commit is now required to happen.

Thanks,
Vishal
Job gets stuck when using kafka transactions and eventually crashes
My job runs fine without Kafka transactions. Both the source and the sink are Kafka, and the job has a couple of RocksDB-based stateful operators taking 100GB each.

When I enable Kafka transactions, things go well initially and we see high throughput as well. However, after a few hours the job seems to get stuck because it is unable to commit the transaction. As a result it cannot consume any more messages, since we've enabled exactly-once processing with unaligned checkpoints. The number of hours it takes varies, but it always happens, and eventually the job crashes with this exception:

ERROR org.apache.kafka.common.utils.KafkaThread : - Uncaught exception in thread 'kafka-producer-network-thread | producer-TRANSACTION_ID_PREFIX-1-17060':
java.lang.OutOfMemoryError: Direct buffer memory
    at java.nio.Bits.reserveMemory(Bits.java:175)
    at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:118)
    at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:317)
    at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:242)
    at sun.nio.ch.IOUtil.write(IOUtil.java:164)
    at sun.nio.ch.IOUtil.write(IOUtil.java:130)
    at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:493)
    at java.nio.channels.SocketChannel.write(SocketChannel.java:507)
    at org.apache.kafka.common.network.PlaintextTransportLayer.write(PlaintextTransportLayer.java:152)
    at org.apache.kafka.common.network.ByteBufferSend.writeTo(ByteBufferSend.java:58)
    at org.apache.kafka.common.network.NetworkSend.writeTo(NetworkSend.java:41)
    at org.apache.kafka.common.network.KafkaChannel.write(KafkaChannel.java:430)
    at org.apache.kafka.common.network.Selector.write(Selector.java:644)
    at org.apache.kafka.common.network.Selector.attemptWrite(Selector.java:637)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:593)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:481)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:561)
    at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:327)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:242)
    at java.lang.Thread.run(Thread.java:829)

What could suddenly be causing this? Any suggestions on how to fix it?

--
Regards,
Vishal
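The "Direct buffer memory" error in the trace above is raised when the JVM's direct (off-heap NIO) buffer limit is exhausted. One way to watch that pool while the job runs is the standard `BufferPoolMXBean`. The following is only a minimal sketch using the JDK's management API; the class name `DirectBufferUsage` is made up for illustration, and how the reading would be exported from a Flink task (logging, a custom metric) is left open.

```java
import java.lang.management.BufferPoolMXBean;
import java.lang.management.ManagementFactory;
import java.nio.ByteBuffer;
import java.util.List;

public class DirectBufferUsage {

    /** Returns the bytes currently held by the JVM's "direct" buffer pool, or -1 if it is not reported. */
    static long directMemoryUsed() {
        List<BufferPoolMXBean> pools =
                ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class);
        for (BufferPoolMXBean pool : pools) {
            if ("direct".equals(pool.getName())) {
                return pool.getMemoryUsed();
            }
        }
        return -1L;
    }

    public static void main(String[] args) {
        long before = directMemoryUsed();
        // Allocate 1 MiB off-heap, the same kind of buffer the Kafka network thread requests.
        ByteBuffer buffer = ByteBuffer.allocateDirect(1 << 20);
        long after = directMemoryUsed();
        System.out.println("direct memory before: " + before + " bytes");
        System.out.println("direct memory after:  " + after + " bytes");
        System.out.println("buffer capacity:      " + buffer.capacity() + " bytes");
    }
}
```

If the reported value climbs steadily in the hours before the crash, that would point to buffers accumulating rather than a single oversized allocation.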
Re: Kafka transactions & flink checkpoints
Yes, I do use RocksDB with incremental checkpointing. During each checkpoint 15-20GB of state gets created (new state added, some expired). I make use of FIFO compaction. I'm a bit surprised you were able to run with 10+ TB of state without unaligned checkpoints, because the performance of my application degrades quite a lot without them. Can you share your checkpoint configuration?

Thanks,
Vishal

On 15 Nov 2022, 10:07 PM +0530, Yaroslav Tkachenko wrote:
> Hi Vishal,
>
> Just wanted to comment on this bit:
>
> > My job has very large amount of state (>100GB) and I have no option but to
> > use unaligned checkpoints.
>
> I successfully ran Flink jobs with 10+ TB of state and no unaligned
> checkpoints enabled. Usually, you consider enabling them when there is some
> kind of skew in the topology, but IMO it's unrelated to the state size.
>
> > Reducing the checkpoint interval is not really an option given the size of
> > the checkpoint
>
> Do you use RocksDB state backend with incremental checkpointing?
Kafka transactions & flink checkpoints
I want to achieve exactly-once semantics in my job and want to make sure I understand the current behaviour correctly:

1. Only one Kafka transaction at a time (no concurrent checkpoints)
2. Only one transaction per checkpoint

My job has a very large amount of state (>100GB) and I have no option but to use unaligned checkpoints. With the above limitation, it seems to me that if the checkpoint interval is 1 minute and a checkpoint takes about 10 seconds to complete, then only one Kafka transaction can happen in 70 seconds. None of the output records will be visible until the transaction completes. This way a steady stream of inputs results in a buffered output stream where data only becomes visible after more than a minute, which defeats any real-time streaming use case. Reducing the checkpoint interval is not really an option given the size of the checkpoint. The only way out would be to allow multiple transactions per checkpoint.

Thanks,
Vishal
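The 70-second figure above follows from simple arithmetic under the model described in the message: one transaction per checkpoint, committed only when the checkpoint completes. The sketch below just restates that model; the class and method names are illustrative, and it deliberately ignores other settings that could shift the timing.

```java
import java.time.Duration;

public class TransactionVisibility {

    /**
     * Worst-case delay before a record becomes visible to read-committed consumers,
     * assuming the record is written right after a checkpoint starts and its
     * transaction is committed only when the next checkpoint completes.
     */
    static Duration worstCaseVisibilityDelay(Duration checkpointInterval,
                                             Duration checkpointDuration) {
        return checkpointInterval.plus(checkpointDuration);
    }

    public static void main(String[] args) {
        Duration delay = worstCaseVisibilityDelay(
                Duration.ofMinutes(1),   // checkpoint interval from the message
                Duration.ofSeconds(10)); // checkpoint duration from the message
        System.out.println(delay.getSeconds() + " seconds"); // prints "70 seconds"
    }
}
```

Under this model the delay can only be reduced by shortening the interval or the checkpoint duration, which is why the message asks about multiple transactions per checkpoint.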
When should we use flink-json instead of Jackson directly?
I've been using Jackson to deserialize JSON messages into Scala classes and Java POJOs. The object mapper is heavily customized for our use cases. It seems that flink-json internally uses Jackson as well and allows for injecting our own mappers. Would there be any benefit of using flink-json instead? -- Thanks, Vishal
Re: ContinuousFileMonitoringFunction retrieved invalid state.
Wow! This is bad! I am using reactive mode and this is indeed the issue. This should have been patched urgently, as jobs running on the upgraded Flink version are in a very precarious position. With all the other upgrades (RocksDB, etc.) going into 1.15.0, there's no easy rollback.

On Fri, Jul 1, 2022 at 8:14 AM Lijie Wang wrote:
> Hi,
> Are you using the reactive mode? There is a known issue like that:
> https://issues.apache.org/jira/browse/FLINK-28274
>
> Best,
> Lijie
>
> On Fri, Jul 1, 2022 at 09:49, yuxia wrote:
>
>> I'm not sure why it happened. But from the Flink source code, it seems
>> to be trying to restore from an invalid state. The state apparently
>> contains more than one value, while Flink expects it to contain zero or
>> one value.
>>
>> Best regards,
>> Yuxia
>>
>> --
>> *From: *"Vishal Surana"
>> *To: *"User"
>> *Sent: *Friday, July 1, 2022 5:28:07 AM
>> *Subject: *ContinuousFileMonitoringFunction retrieved invalid state.

--
Regards,
Vishal
Can FIFO compaction with RocksDB result in data loss?
In my load tests, I've found FIFO compaction to offer the best performance, as my job needs state only for a limited time. However, this particular statement in the RocksDB documentation concerns me:

"Since we never rewrite the key-value pair, we also don't ever apply the compaction filter on the keys."

This is from https://github.com/facebook/rocksdb/wiki/FIFO-compaction-style

I've observed that SST files are getting compacted into larger SST files until a configured threshold is reached, so I'm no longer sure what's going on. My questions at this stage are:

1. If I read a value from RocksDB and then write an updated value back, will that work with FIFO compaction?
2. SST files get created each time a checkpoint is triggered. At this point, does the data for a given key get merged when the initial data was read from an SST file while the update happened in memory?
3. If the answer to the above is yes, then I suppose I can use FIFO compaction. In that case, my question is whether the RocksDB documentation is wrong or whether Flink is doing something in addition to what RocksDB does.

Thanks!
ContinuousFileMonitoringFunction retrieved invalid state.
My job is unable to restore state after a savepoint due to the following exception. It seems to be a rare exception, as I haven't found any forum discussing it. Please advise.

java.lang.IllegalArgumentException: ContinuousFileMonitoringFunction retrieved invalid state.
    at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138) ~[job-0.14-SNAPSHOT.jar:0.15-SNAPSHOT]
    at org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.initializeState(ContinuousFileMonitoringFunction.java:167) ~[job-0.14-SNAPSHOT.jar:0.15-SNAPSHOT]
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:189) ~[job-0.14-SNAPSHOT.jar:0.15-SNAPSHOT]
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:171) ~[job-0.14-SNAPSHOT.jar:0.15-SNAPSHOT]
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:94) ~[job-0.14-SNAPSHOT.jar:0.15-SNAPSHOT]
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:122) ~[job-0.14-SNAPSHOT.jar:0.15-SNAPSHOT]
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:286) ~[job-0.14-SNAPSHOT.jar:0.15-SNAPSHOT]
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106) ~[job-0.14-SNAPSHOT.jar:0.15-SNAPSHOT]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:700) ~[job-0.14-SNAPSHOT.jar:0.15-SNAPSHOT]
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100) ~[job-0.14-SNAPSHOT.jar:0.15-SNAPSHOT]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:676) ~[job-0.14-SNAPSHOT.jar:0.15-SNAPSHOT]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:643) ~[job-0.14-SNAPSHOT.jar:0.15-SNAPSHOT]
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948) ~[job-0.14-SNAPSHOT.jar:0.15-SNAPSHOT]
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917) ~[job-0.14-SNAPSHOT.jar:0.15-SNAPSHOT]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) ~[job-0.14-SNAPSHOT.jar:0.15-SNAPSHOT]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) ~[job-0.14-SNAPSHOT.jar:0.15-SNAPSHOT]
    at java.lang.Thread.run(Thread.java:829) ~[?:?]

--
Regards,
Vishal
Re: Optimizing parallelism in reactive mode with adaptive scaling
I've attached a screenshot of the job which highlights the "missing slots".

[image: Screenshot 2022-06-29 at 9.38.54 PM.png]

Coming to slot sharing, it seems that slot sharing isn't being honored. It doesn't matter whether I put 2 or all 3 of the heavyweight operators in the same group: Flink simply ignores it and assigns each of them an identical number of slots, without placing them in the slot sharing group. I say this because the number in the picture above (145 slots) is arrived at by counting all the slots. Please tell me what I'm doing wrong. I've asked for many more slots (~350), but the job manager isn't able to provide them, presumably because there aren't any available slots.

Thanks for your reply!
-Vishal

On Wed, Jun 29, 2022 at 7:30 PM Weihua Hu wrote:
> Hi, Vishal
>
> The reactive mode will adjust the parallelism of tasks according to the
> slots of the cluster. It will not allocate new workers automatically. [1]
> 1. Max parallelism only serves to scale up the parallelism of tasks. It
> will not affect the scheduling of tasks.
> 2. Flink enables slot sharing by default. Using two slot sharing groups
> to split tasks will require more slots in total.
> 3. Are you sure there are 20 task managers running? Could you give a pic
> of your job UI?
> 4. That depends on whether you have some hot keys in the data. If the
> hot-key data is handled by the same operator, it will cause the overload.
>
> Maybe just giving this job more task managers and slots will solve the
> problem with the heavy operators.
>
> [1] https://flink.apache.org/2021/05/06/reactive-mode.html
>
> Best,
> Weihua

--
Regards,
Vishal
Optimizing parallelism in reactive mode with adaptive scaling
I have a job with about 10 operators, 3 of which are heavyweight. I understand that the current implementation of autoscaling offers more or less no configurability besides max parallelism. That is practically useless, as my operators will inevitably choke if one of the 3 ends up with insufficient slots. I have explored the following:

1. I set a very high max parallelism for the heaviest operator in the hope that Flink would use this signal to allocate subtasks. But this doesn't work.
2. I used slot sharing to group 2 of the 3 operators and created a slot sharing group for just the other one, in the hope that it would free up more slots. Both of these are stateful operators with RocksDB as the state backend. However, despite setting the same slot sharing group name, they're scheduled independently, and each of the three (successive) operators ends up with the exact same parallelism no matter how many task managers are running. I say slot sharing doesn't work because if it did, there would have been more available slots. It is curious that Flink ends up allocating an identical number of slots to each.
3. When slot sharing is enabled, my other jobs are able to work with very few slots. In this job, I see the opposite. For instance, if I spin up 20 task managers with 16 slots each, there are 320 available slots. However, once the job starts, the job itself says ~275 slots are used and the number of available slots in the GUI is 0. I have verified that 275 is the correct number by examining the number of subtasks of each operator. How can that be? Where are the remaining slots?
4. While the data is partitioned by a hash function that ought to distribute data more or less randomly across operators, I can see that some operators are overloaded while others aren't. Does Flink try to avoid distributing load uniformly for any reason, possibly to reduce network traffic? Is there a way to disable such a feature?

I'm running Flink version 1.13.5, but I didn't see any related change in more recent versions of Flink.

Thanks a lot!

--
Vishal
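For the slot accounting questioned above, it may help to write down the usual rule of thumb: with slot sharing, a job needs as many slots as the sum, over all slot sharing groups, of the highest operator parallelism within each group. The sketch below only encodes that rule. The group names and parallelism values are hypothetical, and it does not model how reactive mode actually picks parallelism.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class SlotEstimate {

    /**
     * Estimates required slots as the sum over slot sharing groups of the
     * maximum operator parallelism inside each group.
     */
    static int requiredSlots(Map<String, List<Integer>> parallelismByGroup) {
        int total = 0;
        for (List<Integer> parallelisms : parallelismByGroup.values()) {
            int groupMax = 0;
            for (int p : parallelisms) {
                groupMax = Math.max(groupMax, p);
            }
            total += groupMax;
        }
        return total;
    }

    public static void main(String[] args) {
        // Hypothetical job: every operator in the default group at parallelism 32.
        Map<String, List<Integer>> oneGroup = new LinkedHashMap<>();
        oneGroup.put("default", Arrays.asList(32, 32, 32, 32));
        System.out.println(requiredSlots(oneGroup)); // prints 32

        // Same operators, but one heavy operator moved to its own group.
        Map<String, List<Integer>> twoGroups = new LinkedHashMap<>();
        twoGroups.put("default", Arrays.asList(32, 32, 32));
        twoGroups.put("heavy", Arrays.asList(32));
        System.out.println(requiredSlots(twoGroups)); // prints 64
    }
}
```

Under this rule, splitting operators into more groups increases the slot demand rather than freeing slots, which matches the reply in this thread saying that two slot sharing groups need more slots in total.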
Re: Broadcast State + Stateful Operator + Async IO
Thanks a lot for your quick response! Your suggestion however would never work for our use case. Ours is a streaming system that must process 100 thousand messages per second and produce immediate results and it's simply impossible to rerun the job. Our job is a streaming job broken down into various operators with very strict latency requirements (less than 10 seconds at all times). There could be multiple messages for a given entity in quick succession and ordered processing is another strict requirement. Question is how can we best leverage flink's features of stateful stream processing as well as async IO.
Re: Broadcast State + Stateful Operator + Async IO
Yes. You have explained my requirements exactly as they are. My operator will talk to multiple databases and a couple of web services to enrich the incoming input streams. I cannot think of a way to use the async IO operator, so I thought I might convert these 7-10 calls into async calls and chain the Futures together. I believe I have to block once at the end of the KeyedBroadcastProcessFunction, but if there's a way to avoid that while still ensuring ordered processing of events, do let me know.

On Fri, Apr 29, 2022 at 7:35 AM Guowei Ma wrote:
> Hi Vishal
>
> I want to understand your needs first. Your requirements are: After a
> stateful operator receives a notification, it needs to traverse all the
> data stored in the operator state, communicate with an external system
> during the traversal process (maybe similar to join?). In order to improve
> the efficiency of this behavior, you want to take an asynchronous
> approach. That is, if you modify the state of different keys, do not block
> each other due to external communication.
> If I understand correctly, according to the existing function of
> KeyedBroadcastProcessFunction, it is really impossible.
> As for whether there are other solutions, it may depend on specific
> scenarios, such as what kind of external system. So could you describe in
> detail what scenario has this requirement, and what are the external
> systems it depends on?
>
> Best,
> Guowei

--
Regards,
Vishal
Broadcast State + Stateful Operator + Async IO
Hello,

My application has a stateful operator which leverages RocksDB to store a large amount of state. It, along with other operators, receives configuration as a broadcast stream (KeyedBroadcastProcessFunction). The operator depends upon another input stream that triggers some communication with external services, whose results are then combined to yield the state that gets stored in RocksDB.

In order to make the application more efficient, I am going to switch to asynchronous IO, but as the result is ultimately going to be a (Scala) Future, I will have to block once to get the result. I was hoping to leverage the Async IO operator, but that apparently doesn't support RocksDB-based state storage. Am I correct in saying that KeyedBroadcastProcessFunction is the only option I have?

If so, I want to understand how registering a future's callbacks (via onComplete) works with a synchronous operator such as KeyedBroadcastProcessFunction. Will the thread executing the function simply relinquish control to some other subtask while the results of the external services are being awaited? Will the callback eventually be triggered automatically, or will I have to explicitly block on the result future like so: Await.result(f, timeout)?

--
Regards,
Vishal
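The "chain the futures, then block once" idea from the message can be sketched with the JDK's `CompletableFuture`, used here as a stand-in for a Scala `Future`. The two lookup methods are hypothetical placeholders for the external services, and nothing below touches Flink APIs. One caveat, stated as an assumption: callbacks registered on a future run on the executor's threads rather than the operator's thread, so collecting the combined result on the calling thread before touching keyed state is the conservative choice.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ChainedEnrichment {

    // Hypothetical stand-in for a database lookup.
    static CompletableFuture<String> lookupProfile(String key, ExecutorService pool) {
        return CompletableFuture.supplyAsync(() -> "profile(" + key + ")", pool);
    }

    // Hypothetical stand-in for a web service call.
    static CompletableFuture<String> lookupScore(String key, ExecutorService pool) {
        return CompletableFuture.supplyAsync(() -> "score(" + key + ")", pool);
    }

    /** Starts both lookups concurrently, combines them, and blocks exactly once for the result. */
    static String enrich(String key, ExecutorService pool, long timeoutMillis) throws Exception {
        CompletableFuture<String> combined = lookupProfile(key, pool)
                .thenCombine(lookupScore(key, pool),
                        (profile, score) -> key + ":" + profile + "," + score);
        // Single blocking point, comparable to Await.result(f, timeout) in Scala.
        return combined.get(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            System.out.println(enrich("user-1", pool, 1000));
            // prints "user-1:profile(user-1),score(user-1)"
        } finally {
            pool.shutdown();
        }
    }
}
```

Because the caller waits for the combined future before returning, records for the same key are still handled one after another, at the cost of the calling thread being blocked for the duration of the slowest lookup.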
Disabling autogenerated uid/hash doesn't work when using file source
I set names and uids for all my Flink operators and have explicitly disabled auto-generation of uids to enforce the same practice among the developers on my team. However, when using a file source, there's no option to provide a uid, so the job fails to start unless we enable auto-generation. Am I doing something wrong?
Protobuf + Confluent Schema Registry support
Using the vanilla Kafka producer, I can write protobuf messages to Kafka while leveraging schema registry support as well. A Flink Kafka producer requires us to explicitly provide a serializer which converts the message to a ProducerRecord containing the serialized bytes of the message. We can't make use of the KafkaProtobufSerializer[T] provided by Confluent. Thus the only way I could think of would be to create an instance of KafkaProtobufSerializer inside a FlinkSerializationSchema class and use it to serialize my messages. The problem with that is that I would have to implement registration of the schema and the other tasks done by KafkaProtobufSerializer. Is there any other way to solve this problem? Is there a plan to support protobuf serialization along with schema registry support? I noticed you've recently added Avro + Schema Registry support to your codebase but haven't documented it. Is it ready for use?