Re: Job gets stuck when using kafka transactions and eventually crashes

2023-01-25 Thread Vishal Surana
Can you elaborate a bit more? While idleness is not what we’re seeing now, it 
could perhaps be an issue later on. Why would a certain partition going idle 
result in state buildup?

Thanks,
Vishal
On 25 Jan 2023 at 9:14 PM +0530, Martijn Visser , 
wrote:
> Hi Vishal,
>
> Could idleness be an issue? I could see that if idleness occurs and the Kafka 
> Source does not go into an idle state, more internal state (needed to commit 
> Kafka transactions) could build up over time and ultimately cause an out-of-memory 
> problem. See 
> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#idleness
> for more details on this.
>
> Best regards,
>
> Martijn
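
For reference, marking the Kafka source idle-aware as described in the linked
docs looks roughly like this in the DataStream API (a minimal sketch; broker,
topic, and timeout values are placeholders, not the job's actual settings):

import java.time.Duration
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment

val source = KafkaSource.builder[String]()
  .setBootstrapServers("broker:9092")                 // placeholder
  .setTopics("input-topic")                           // placeholder
  .setValueOnlyDeserializer(new SimpleStringSchema())
  .build()

val stream = env.fromSource(
  source,
  WatermarkStrategy
    .forBoundedOutOfOrderness[String](Duration.ofSeconds(5))
    .withIdleness(Duration.ofMinutes(1)),             // mark partitions idle after 1 minute without data
  "kafka-source")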
>
> > Op ma 23 jan. 2023 om 10:53 schreef Vishal Surana :
> > > Could it be that Flink is unable to commit offsets to Kafka? I know that 
> > > Flink’s checkpoint mechanism isn’t tied to its ability to commit offsets, 
> > > but at the same time we’ve seen that the job can take hours to commit 
> > > offsets while checkpoints go through successfully during that period. But 
> > > with Kafka transactions enabled, the offset commit is now required to 
> > > happen.
> > >
> > > Thanks,
> > > Vishal
> > > On 23 Jan 2023 at 12:18 PM +0530, Vishal Surana , 
> > > wrote:
> > > > My job runs fine when running without kafka transactions. The source 
> > > > and sink are kafka in my job with a couple of RocksDB based stateful 
> > > > operators taking 100GB each.
> > > >
> > > > When I enable kafka transactions, things go well initially and we can 
> > > > see high throughput as well. However, after a few hours, the job seems 
> > > > to get stuck as it's unable to commit the transaction, due to which 
> > > > it's unable to consume any more messages as we've enabled exactly once 
> > > > processing with unaligned checkpoints. The number of hours it takes 
> > > > might vary but it always happens and eventually the job crashes with 
> > > > this exception:
> > > >
> > > > ERROR org.apache.kafka.common.utils.KafkaThread : - Uncaught 
> > > > exception in thread 'kafka-producer-network-thread | 
> > > > producer-TRANSACTION_ID_PREFIX-1-17060':
> > > > java.lang.OutOfMemoryError: Direct buffer memory
> > > > at java.nio.Bits.reserveMemory(Bits.java:175)
> > > > at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:118)
> > > > at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:317)
> > > > at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:242)
> > > > at sun.nio.ch.IOUtil.write(IOUtil.java:164)
> > > > at sun.nio.ch.IOUtil.write(IOUtil.java:130)
> > > > at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:493)
> > > > at java.nio.channels.SocketChannel.write(SocketChannel.java:507)
> > > > at org.apache.kafka.common.network.PlaintextTransportLayer.write(PlaintextTransportLayer.java:152)
> > > > at org.apache.kafka.common.network.ByteBufferSend.writeTo(ByteBufferSend.java:58)
> > > > at org.apache.kafka.common.network.NetworkSend.writeTo(NetworkSend.java:41)
> > > > at org.apache.kafka.common.network.KafkaChannel.write(KafkaChannel.java:430)
> > > > at org.apache.kafka.common.network.Selector.write(Selector.java:644)
> > > > at org.apache.kafka.common.network.Selector.attemptWrite(Selector.java:637)
> > > > at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:593)
> > > > at org.apache.kafka.common.network.Selector.poll(Selector.java:481)
> > > > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:561)
> > > > at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:327)
> > > > at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:242)
> > > > at java.lang.Thread.run(Thread.java:829)
> > > >
> > > > What seems to be happening all of a sudden? Any suggestions on how to 
> > > > fix it?
> > > >
> > > > --
> > > > Regards,
> > > > Vishal


Re: Job gets stuck when using kafka transactions and eventually crashes

2023-01-23 Thread Vishal Surana
Could it be that Flink is unable to commit offsets to Kafka? I know that Flink’s 
checkpoint mechanism isn’t tied to its ability to commit offsets, but at the same 
time we’ve seen that the job can take hours to commit offsets while 
checkpoints go through successfully during that period. But with Kafka 
transactions enabled, the offset commit is now required to happen.

Thanks,
Vishal
On 23 Jan 2023 at 12:18 PM +0530, Vishal Surana , wrote:
> My job runs fine when running without kafka transactions. The source and sink 
> are kafka in my job with a couple of RocksDB based stateful operators taking 
> 100GB each.
>
> When I enable kafka transactions, things go well initially and we can see 
> high throughput as well. However, after a few hours, the job seems to get 
> stuck as it's unable to commit the transaction, due to which it's unable to 
> consume any more messages as we've enabled exactly once processing with 
> unaligned checkpoints. The number of hours it takes might vary but it always 
> happens and eventually the job crashes with this exception:
>
> ERROR org.apache.kafka.common.utils.KafkaThread : - Uncaught exception in 
> thread 'kafka-producer-network-thread | 
> producer-TRANSACTION_ID_PREFIX-1-17060':
> java.lang.OutOfMemoryError: Direct buffer memory
> at java.nio.Bits.reserveMemory(Bits.java:175)
> at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:118)
> at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:317)
> at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:242)
> at sun.nio.ch.IOUtil.write(IOUtil.java:164)
> at sun.nio.ch.IOUtil.write(IOUtil.java:130)
> at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:493)
> at java.nio.channels.SocketChannel.write(SocketChannel.java:507)
> at org.apache.kafka.common.network.PlaintextTransportLayer.write(PlaintextTransportLayer.java:152)
> at org.apache.kafka.common.network.ByteBufferSend.writeTo(ByteBufferSend.java:58)
> at org.apache.kafka.common.network.NetworkSend.writeTo(NetworkSend.java:41)
> at org.apache.kafka.common.network.KafkaChannel.write(KafkaChannel.java:430)
> at org.apache.kafka.common.network.Selector.write(Selector.java:644)
> at org.apache.kafka.common.network.Selector.attemptWrite(Selector.java:637)
> at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:593)
> at org.apache.kafka.common.network.Selector.poll(Selector.java:481)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:561)
> at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:327)
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:242)
> at java.lang.Thread.run(Thread.java:829)
>
> What seems to be happening all of a sudden? Any suggestions on how to fix it?
>
> --
> Regards,
> Vishal


Job gets stuck when using kafka transactions and eventually crashes

2023-01-22 Thread Vishal Surana
My job runs fine when running without kafka transactions. The source and
sink are kafka in my job with a couple of RocksDB based stateful operators
taking 100GB each.

When I enable kafka transactions, things go well initially and we can see
high throughput as well. However, after a few hours, the job seems to get
stuck as it's unable to commit the transaction, due to which it's unable to
consume any more messages as we've enabled exactly once processing with
unaligned checkpoints. The number of hours it takes might vary but it
always happens and eventually the job crashes with this exception:

ERROR org.apache.kafka.common.utils.KafkaThread : - Uncaught exception
in thread 'kafka-producer-network-thread |
producer-TRANSACTION_ID_PREFIX-1-17060':
java.lang.OutOfMemoryError: Direct buffer memory
at java.nio.Bits.reserveMemory(Bits.java:175)
at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:118)
at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:317)
at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:242)
at sun.nio.ch.IOUtil.write(IOUtil.java:164)
at sun.nio.ch.IOUtil.write(IOUtil.java:130)
at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:493)
at java.nio.channels.SocketChannel.write(SocketChannel.java:507)
at org.apache.kafka.common.network.PlaintextTransportLayer.write(PlaintextTransportLayer.java:152)
at org.apache.kafka.common.network.ByteBufferSend.writeTo(ByteBufferSend.java:58)
at org.apache.kafka.common.network.NetworkSend.writeTo(NetworkSend.java:41)
at org.apache.kafka.common.network.KafkaChannel.write(KafkaChannel.java:430)
at org.apache.kafka.common.network.Selector.write(Selector.java:644)
at org.apache.kafka.common.network.Selector.attemptWrite(Selector.java:637)
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:593)
at org.apache.kafka.common.network.Selector.poll(Selector.java:481)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:561)
at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:327)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:242)
at java.lang.Thread.run(Thread.java:829)

What seems to be happening all of a sudden? Any suggestions on how to fix
it?
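
For context, the transactional sink in this setup is configured along the
following lines (a minimal sketch, not the exact job code; broker, topic,
transactional-id prefix, and timeout values are placeholders):

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.connector.base.DeliveryGuarantee
import org.apache.flink.connector.kafka.sink.{KafkaRecordSerializationSchema, KafkaSink}

val sink = KafkaSink.builder[String]()
  .setBootstrapServers("broker:9092")                      // placeholder
  .setRecordSerializer(
    KafkaRecordSerializationSchema.builder[String]()
      .setTopic("output-topic")                            // placeholder
      .setValueSerializationSchema(new SimpleStringSchema())
      .build())
  .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)    // transactions committed on checkpoint
  .setTransactionalIdPrefix("TRANSACTION_ID_PREFIX")       // placeholder
  .setProperty("transaction.timeout.ms", "900000")         // must not exceed the broker's transaction.max.timeout.ms
  .build()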

-- 
Regards,
Vishal


Re: Kafka transactions & flink checkpoints

2022-11-16 Thread Vishal Surana
Yes. I do use RocksDB for (incremental) checkpointing. During each checkpoint 
15-20GB of state gets created (new state added, some expired). I make use of 
FIFO compaction.

I’m a bit surprised you were able to run with 10+TB state without unaligned 
checkpoints because the performance in my application degrades quite a lot. Can 
you share your checkpoint configurations?


Thanks,
Vishal
On 15 Nov 2022, 10:07 PM +0530, Yaroslav Tkachenko , 
wrote:
> Hi Vishal,
>
> Just wanted to comment on this bit:
>
> > My job has very large amount of state (>100GB) and I have no option but to 
> > use unaligned checkpoints.
>
> I successfully ran Flink jobs with 10+ TB of state and no unaligned 
> checkpoints enabled. Usually, you consider enabling them when there is some 
> kind of skew in the topology, but IMO it's unrelated to the state size.
>
> > Reducing the checkpoint interval is not really an option given the size of 
> > the checkpoint
>
> Do you use RocksDB state backend with incremental checkpointing?
>
> > On Tue, Nov 15, 2022 at 12:07 AM Vishal Surana  wrote:
> > > I wanted to achieve exactly once semantics in my job and wanted to make 
> > > sure I understood the current behaviour correctly:
> > >
> > > 1. Only one Kafka transaction at a time (no concurrent checkpoints)
> > > 2. Only one transaction per checkpoint
> > >
> > >
> > > My job has a very large amount of state (>100GB) and I have no option but 
> > > to use unaligned checkpoints. With the above limitation, it seems to me 
> > > that if the checkpoint interval is 1 minute and a checkpoint takes about 10 
> > > seconds to complete, then only one Kafka transaction can happen every 70 
> > > seconds. None of the output records will be visible until the 
> > > transaction completes. This way a steady stream of inputs will result in 
> > > a buffered output stream where data is only visible after a minute, 
> > > thereby destroying any sort of real-time streaming use case. Reducing 
> > > the checkpoint interval is not really an option given the size of the 
> > > checkpoint. The only way out would be to allow for multiple transactions per 
> > > checkpoint.
> > >
> > > Thanks,
> > > Vishal


Kafka transactions & flink checkpoints

2022-11-15 Thread Vishal Surana
I wanted to achieve exactly once semantics in my job and wanted to make sure I 
understood the current behaviour correctly:

1. Only one Kafka transaction at a time (no concurrent checkpoints)
2. Only one transaction per checkpoint


My job has a very large amount of state (>100GB) and I have no option but to use 
unaligned checkpoints. With the above limitation, it seems to me that if the 
checkpoint interval is 1 minute and a checkpoint takes about 10 seconds to 
complete, then only one Kafka transaction can happen every 70 seconds. None of the 
output records will be visible until the transaction completes. This way a 
steady stream of inputs will result in a buffered output stream where data is 
only visible after a minute, thereby destroying any sort of real-time streaming 
use case. Reducing the checkpoint interval is not really an option given the 
size of the checkpoint. The only way out would be to allow for multiple 
transactions per checkpoint.
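
For reference, the checkpointing setup described above corresponds roughly to
the following (a minimal sketch; the 1-minute interval is the example value
used above):

import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
// 1-minute checkpoint interval with exactly-once mode
env.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE)
// unaligned checkpoints, as required by the state size described above
env.getCheckpointConfig.enableUnalignedCheckpoints()
// no concurrent checkpoints => at most one open Kafka transaction at a time
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)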

Thanks,
Vishal


When should we use flink-json instead of Jackson directly?

2022-10-28 Thread Vishal Surana
I've been using Jackson to deserialize JSON messages into Scala classes and
Java POJOs. The object mapper is heavily customized for our use cases. It
seems that flink-json internally uses Jackson as well and allows for
injecting our own mappers. Would there be any benefit of using flink-json
instead?
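
Roughly, the Jackson-based approach looks like this today (a minimal sketch
with a placeholder case class; the real mapper carries many more
customizations):

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.flink.api.common.serialization.AbstractDeserializationSchema

// Placeholder target type; the real jobs deserialize into various Scala classes and POJOs.
final case class SensorReading(id: String, value: Double)

class JacksonDeserializationSchema extends AbstractDeserializationSchema[SensorReading] {
  // Built lazily on the task manager rather than serialized with the job graph.
  @transient private lazy val mapper: ObjectMapper =
    new ObjectMapper().registerModule(DefaultScalaModule) // customizations go here

  override def deserialize(message: Array[Byte]): SensorReading =
    mapper.readValue(message, classOf[SensorReading])
}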

-- 
Thanks,
Vishal


Re: ContinuousFileMonitoringFunction retrieved invalid state.

2022-07-04 Thread Vishal Surana
Wow! This is bad! I am using reactive mode and this is indeed the issue.
This should have been urgently patched, as jobs with an upgraded Flink version
are in a very precarious position. With all the other upgrades (RocksDB,
etc.) going into 1.15.0, there's no easy rollback.

On Fri, Jul 1, 2022 at 8:14 AM Lijie Wang  wrote:

> Hi,
> Are you using the reactive mode? There is a known issue like that:
> https://issues.apache.org/jira/browse/FLINK-28274
>
> Best,
> Lijie
>
> yuxia wrote on Friday, July 1, 2022 at 09:49:
>
>> I'm not sure why it happened, but from the Flink source code it seems to be
>> trying to restore from an invalid state. It seems the state actually contains
>> more than one value, while Flink expects it to contain one or zero values.
>>
>> Best regards,
>> Yuxia
>>
>> --
>> *From: *"Vishal Surana" 
>> *To: *"User" 
>> *Sent: *Friday, July 1, 2022, 5:28:07 AM
>> *Subject: *ContinuousFileMonitoringFunction retrieved invalid state.
>>
>> My job is unable to restore state after savepoint due to the following
>> exception. Seems to be a rare exception as I haven't found any forum
>> discussing it. Please advise.
>>
>> java.lang.IllegalArgumentException: ContinuousFileMonitoringFunction
>> retrieved invalid state.
>> at
>> org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138)
>> ~[job-0.14-SNAPSHOT.jar:0.15-SNAPSHOT]
>> at
>> org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.initializeState(ContinuousFileMonitoringFunction.java:167)
>> ~[job-0.14-SNAPSHOT.jar:0.15-SNAPSHOT]
>> at
>> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:189)
>> ~[job-0.14-SNAPSHOT.jar:0.15-SNAPSHOT]
>> at
>> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:171)
>> ~[job-0.14-SNAPSHOT.jar:0.15-SNAPSHOT]
>> at
>> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:94)
>> ~[job-0.14-SNAPSHOT.jar:0.15-SNAPSHOT]
>> at
>> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:122)
>> ~[job-0.14-SNAPSHOT.jar:0.15-SNAPSHOT]
>> at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:286)
>> ~[job-0.14-SNAPSHOT.jar:0.15-SNAPSHOT]
>> at
>> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
>> ~[job-0.14-SNAPSHOT.jar:0.15-SNAPSHOT]
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:700)
>> ~[job-0.14-SNAPSHOT.jar:0.15-SNAPSHOT]
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100)
>> ~[job-0.14-SNAPSHOT.jar:0.15-SNAPSHOT]
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:676)
>> ~[job-0.14-SNAPSHOT.jar:0.15-SNAPSHOT]
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:643)
>> ~[job-0.14-SNAPSHOT.jar:0.15-SNAPSHOT]
>> at
>> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
>> ~[job-0.14-SNAPSHOT.jar:0.15-SNAPSHOT]
>> at
>> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917)
>> ~[job-0.14-SNAPSHOT.jar:0.15-SNAPSHOT]
>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
>> ~[job-0.14-SNAPSHOT.jar:0.15-SNAPSHOT]
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
>> ~[job-0.14-SNAPSHOT.jar:0.15-SNAPSHOT]
>> at java.lang.Thread.run(Thread.java:829) ~[?:?]
>>
>> --
>> Regards,
>> Vishal
>>
>>

-- 
Regards,
Vishal


Can FIFO compaction with RocksDB result in data loss?

2022-07-04 Thread Vishal Surana
In my load tests, I've found FIFO compaction to offer the best performance
as my job needs state only for so long. However, this particular statement
in RocksDB documentation concerns me:

"Since we never rewrite the key-value pair, we also don't ever apply the
compaction filter on the keys."

This is from -
https://github.com/facebook/rocksdb/wiki/FIFO-compaction-style

I've observed that SST files are getting compacted into larger SST files
until a configured threshold is reached. Thus I'm not sure what's going on
anymore.
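
For reference, FIFO compaction here is wired in through a RocksDB options
factory along these lines (a hedged sketch; the 10 GB threshold and the
incremental-checkpoint wiring are illustrative, not the job's actual values):

import java.util.{Collection => JCollection}
import org.apache.flink.contrib.streaming.state.{EmbeddedRocksDBStateBackend, RocksDBOptionsFactory}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.rocksdb.{ColumnFamilyOptions, CompactionOptionsFIFO, CompactionStyle, DBOptions}

class FifoCompactionOptionsFactory extends RocksDBOptionsFactory {

  override def createDBOptions(current: DBOptions,
                               handlesToClose: JCollection[AutoCloseable]): DBOptions = current

  override def createColumnOptions(current: ColumnFamilyOptions,
                                   handlesToClose: JCollection[AutoCloseable]): ColumnFamilyOptions = {
    // Oldest SST files are dropped once the total size exceeds this threshold.
    val fifo = new CompactionOptionsFIFO().setMaxTableFilesSize(10L * 1024 * 1024 * 1024)
    handlesToClose.add(fifo) // let Flink dispose of the native handle
    current
      .setCompactionStyle(CompactionStyle.FIFO)
      .setCompactionOptionsFIFO(fifo)
  }
}

val env = StreamExecutionEnvironment.getExecutionEnvironment
val backend = new EmbeddedRocksDBStateBackend(true) // incremental checkpoints
backend.setRocksDBOptions(new FifoCompactionOptionsFactory)
env.setStateBackend(backend)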

My questions at this stage are:

   1. If I read a value from RocksDB and decide to write an updated value
   back, will that work with FIFO compaction?
   2. SST files get created each time a checkpoint is triggered. At
   this point, does the data for a given key get merged in case the initial
   data was read from an SST file while the update must have happened in
   memory?
   3. If the answer to above is yes, then I suppose I can use FIFO
   compaction. However, then my question is whether the RocksDB documentation
   is wrong or whether Flink is doing something in addition to what RocksDB
   does.

Thanks!


ContinuousFileMonitoringFunction retrieved invalid state.

2022-06-30 Thread Vishal Surana
My job is unable to restore state after a savepoint due to the following
exception. It seems to be a rare exception, as I haven't found any forum
discussing it. Please advise.

java.lang.IllegalArgumentException: ContinuousFileMonitoringFunction
retrieved invalid state.
at
org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138)
~[job-0.14-SNAPSHOT.jar:0.15-SNAPSHOT]
at
org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.initializeState(ContinuousFileMonitoringFunction.java:167)
~[job-0.14-SNAPSHOT.jar:0.15-SNAPSHOT]
at
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:189)
~[job-0.14-SNAPSHOT.jar:0.15-SNAPSHOT]
at
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:171)
~[job-0.14-SNAPSHOT.jar:0.15-SNAPSHOT]
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:94)
~[job-0.14-SNAPSHOT.jar:0.15-SNAPSHOT]
at
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:122)
~[job-0.14-SNAPSHOT.jar:0.15-SNAPSHOT]
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:286)
~[job-0.14-SNAPSHOT.jar:0.15-SNAPSHOT]
at
org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
~[job-0.14-SNAPSHOT.jar:0.15-SNAPSHOT]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:700)
~[job-0.14-SNAPSHOT.jar:0.15-SNAPSHOT]
at
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100)
~[job-0.14-SNAPSHOT.jar:0.15-SNAPSHOT]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:676)
~[job-0.14-SNAPSHOT.jar:0.15-SNAPSHOT]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:643)
~[job-0.14-SNAPSHOT.jar:0.15-SNAPSHOT]
at
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
~[job-0.14-SNAPSHOT.jar:0.15-SNAPSHOT]
at
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917)
~[job-0.14-SNAPSHOT.jar:0.15-SNAPSHOT]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
~[job-0.14-SNAPSHOT.jar:0.15-SNAPSHOT]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
~[job-0.14-SNAPSHOT.jar:0.15-SNAPSHOT]
at java.lang.Thread.run(Thread.java:829) ~[?:?]

--
Regards,
Vishal


Re: Optimizing parallelism in reactive mode with adaptive scaling

2022-06-29 Thread Vishal Surana
I've attached a screenshot of the job which highlights the "missing slots".
[image: Screenshot 2022-06-29 at 9.38.54 PM.png]

Coming to slot sharing, it seems that slot sharing isn't being honored. It
doesn't matter if I put 2 or 3 of the 3 heavyweight operators in the group -
Flink simply ignores it and assigns them an identical number of slots without
actually placing them in the same slot group. I say this because the number
in the picture above (145 slots) is arrived at by counting all the slots.

Please tell me what I'm doing wrong. I've asked for many more slots (~350),
but the job manager isn't able to allocate them, presumably because there
aren't any available.

Thanks for your reply!

-Vishal

On Wed, Jun 29, 2022 at 7:30 PM Weihua Hu  wrote:

> Hi, Vishal
>
> The reactive mode will adjust the parallelism of tasks based on the slots of
> the cluster; it will not allocate new workers automatically. [1]
> 1. Max parallelism only works to scale up the parallelism of tasks; it
> will not affect the scheduling of tasks.
> 2. Flink enables slot sharing by default; using two slot sharing groups
> to split tasks will require more slots in total.
> 3. Are you sure there are 20 task managers running? Could you share a
> screenshot of your job UI?
> 4. That depends on whether there is a hot key in the data; if the hot-key
> data is handled by the same operator, it will cause the overload.
> 
> Maybe just giving this job more task managers and slots will resolve the
> issue with the heavy operators.
>
>
> [1]https://flink.apache.org/2021/05/06/reactive-mode.html
>
> Best,
> Weihua
>
>
> On Wed, Jun 29, 2022 at 8:56 PM Vishal Surana  wrote:
>
>> I have a job which has about 10 operators, 3 of which are heavy weight. I
>> understand that the current implementation of autoscaling gives more or
>> less no configurability besides max parallelism. That is practically
>> useless as the operators I have will inevitably choke if one of the 3 ends
>> up with insufficient slots. I have explored the following:
>>
>>1. Set very high max parallelism for the most heavy weight operator
>>with the hope that flink can use this signal to allocate subtasks. But 
>> this
>>doesn't work
>>2. I used slot sharing to group 2 of the 3 operators and created a
>>slot sharing group for just the other one with the hope that it will
>>free up more slots. Both of these are stateful operators with RocksDB 
>> being
>>the state backend. However despite setting the same slot sharing group
>>name, they're scheduled independently and each of the three (successive)
>>operators end up with the exact same parallelism no matter how many task
>>managers are running. I say slot sharing doesn't work because if it did,
>>there would have been more available slots. It is curious that flink ends
>>up allocating an identical number of slots to each.
>>3. When slot sharing is enabled, my other jobs are able to work with
>>very few slots. In this job, I see the opposite. For instance, if I spin 
>> up
>>20 task managers each with 16 slots, then there are 320 available slots.
>>However once the job starts, the job itself says ~275 slots are used and
>>the number of available slots in the GUI is 0. I have verified that 275 is
>>the correct number by examining the number of subtasks of each operator.
>>How can that be? Where are the remaining slots?
>>4. While the data is partitioned by a hash function that ought to
>>more or less distribute data randomly across operators, I can see that 
>> some
>>operators are overloaded while others aren't. Does flink try to avoid
>>uniformly distributing load for any reason, possibly to reduce network? Is
>>there a way to disable such a feature?
>>
>> I'm running flink version 1.13.5 but I didn't see any related change in
>> recent versions of flink.
>>
>> Thanks a lot!
>>
>> --
>> Vishal
>>
>

-- 
Regards,
Vishal


Optimizing parallelism in reactive mode with adaptive scaling

2022-06-29 Thread Vishal Surana
I have a job which has about 10 operators, 3 of which are heavyweight. I
understand that the current implementation of autoscaling gives more or
less no configurability besides max parallelism. That is practically
useless, as the operators I have will inevitably choke if one of the 3 ends
up with insufficient slots. I have explored the following:

   1. Set a very high max parallelism for the heaviest operator in the hope
   that Flink can use this signal to allocate subtasks. But this doesn't work.
   2. I used slot sharing to group 2 of the 3 operators and created a slot
   sharing group for just the other one in the hope that it would free up
   more slots (see the sketch after this list). Both of these are stateful
   operators with RocksDB being the state backend. However, despite setting
   the same slot sharing group name, they're scheduled independently, and each
   of the three (successive) operators ends up with the exact same parallelism
   no matter how many task managers are running. I say slot sharing doesn't
   work because if it did, there would have been more available slots. It is
   curious that Flink ends up allocating an identical number of slots to each.
   3. When slot sharing is enabled, my other jobs are able to work with
   very few slots. In this job, I see the opposite. For instance, if I spin up
   20 task managers, each with 16 slots, then there are 320 available slots.
   However, once the job starts, the job itself says ~275 slots are used and
   the number of available slots in the GUI is 0. I have verified that 275 is
   the correct number by examining the number of subtasks of each operator.
   How can that be? Where are the remaining slots?
   4. While the data is partitioned by a hash function that ought to more
   or less distribute data randomly across operators, I can see that some
   operators are overloaded while others aren't. Does Flink try to avoid
   uniformly distributing load for any reason, possibly to reduce network
   traffic? Is there a way to disable such a feature?
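
Concretely, attempts 1 and 2 above look roughly like this (a minimal sketch
with trivial stand-in operators; group names and the max-parallelism value
are placeholders):

import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
val input: DataStream[String] = env.fromElements("a", "b", "c")

val heavy1 = input
  .keyBy(s => s)
  .map(_.toUpperCase)
  .name("heavy-1").uid("heavy-1")
  .setMaxParallelism(720)             // attempt 1: a very high max parallelism as a scaling hint
  .slotSharingGroup("heavy-group")    // attempt 2: co-locate two of the heavy operators

val heavy2 = heavy1
  .keyBy(s => s)
  .map(_.reverse)
  .name("heavy-2").uid("heavy-2")
  .slotSharingGroup("heavy-group")

heavy2
  .keyBy(s => s)
  .map(_.length)
  .name("heavy-3").uid("heavy-3")
  .slotSharingGroup("isolated-group") // attempt 2: a separate group for the third heavy operator
  .print()

env.execute("slot-sharing-sketch")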

I'm running flink version 1.13.5 but I didn't see any related change in
recent versions of flink.

Thanks a lot!

--
Vishal


Re: Broadcast State + Stateful Operator + Async IO

2022-04-29 Thread Vishal Surana
Thanks a lot for your quick response! Your suggestion, however, would never
work for our use case. Ours is a streaming system that must process 100
thousand messages per second and produce immediate results, and it's simply
impossible to rerun the job.

Our job is a streaming job broken down into various operators with very
strict latency requirements (less than 10 seconds at all times). There
could be multiple messages for a given entity in quick succession, and
ordered processing is another strict requirement. The question is how we can
best leverage Flink's features for stateful stream processing as well as
async I/O.


Re: Broadcast State + Stateful Operator + Async IO

2022-04-29 Thread Vishal Surana
Yes. You have explained my requirements exactly as they are. My operator
will talk to multiple databases and a couple of web services to enrich
incoming input streams. I cannot think of a way to use the async I/O
operator. So I thought I would convert these 7-10 calls into async calls and
chain the Futures together. I believe I have to block once at the end of
the KeyedBroadcastProcessFunction, but if there's a way to avoid that
while also ensuring ordered processing of events, do let me know.
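
Roughly what I have in mind for the enrichment step (a hedged sketch; the
client functions, types and timeout are placeholders for the real database
and web service calls):

import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

final case class Event(key: String)
final case class Enriched(event: Event, a: String, b: String, c: String)

class Enricher(lookupA: String => Future[String],
               lookupB: String => Future[String],
               fetchC: (String, String) => Future[String])
              (implicit ec: ExecutionContext) {

  // Chain the async calls and block exactly once at the end, inside processElement.
  def enrich(event: Event): Enriched = {
    val combined: Future[Enriched] = for {
      a <- lookupA(event.key)
      b <- lookupB(event.key)
      c <- fetchC(a, b)
    } yield Enriched(event, a, b, c)

    // Single blocking point: preserves per-key ordering, but holds the task thread
    // for the duration of the external calls.
    Await.result(combined, 10.seconds)
  }
}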

On Fri, Apr 29, 2022 at 7:35 AM Guowei Ma  wrote:

> Hi Vishal
>
> I want to understand your needs first. Your requirement is: after a
> stateful operator receives a notification, it needs to traverse all the
> data stored in the operator state and communicate with an external system
> during the traversal (maybe similar to a join?). To improve the
> efficiency of this behavior, you want to take an asynchronous approach,
> so that modifications to the state of different keys do not block each
> other while waiting on external communication.
> If I understand correctly, this is not possible with the existing
> KeyedBroadcastProcessFunction.
> As for whether there are other solutions, it may depend on the specific
> scenario, such as what kind of external system is involved. Could you
> describe in detail which scenario has this requirement, and which external
> systems it depends on?
>
> Best,
> Guowei
>
>
> On Fri, Apr 29, 2022 at 12:42 AM Vishal Surana 
> wrote:
>
>> Hello,
>> My application has a stateful operator which leverages RocksDB to store a
>> large amount of state. It, along with other operators receive configuration
>> as a broadcast stream (KeyedBroadcastProcessFunction). The operator depends
>> upon another input stream that triggers some communication with external
>> services whose results are then combined to yield the state that gets
>> stored in RocksDB.
>>
>> In order to make the application more efficient, I am going to switch to
>> asynchronous IO but as the result is ultimately going to be a (Scala)
>> Future, I will have to block once to get the result. I was hoping to
>> leverage the Async IO operator but that apparently doesn't support RocksDB
>> based state storage. Am I correct in saying
>> that KeyedBroadcastProcessFunction is the only option I have? If so, then I
>> want to understand how registering a future's callbacks (via onComplete)
>> works with a synchronous operator such as KeyedBroadcastProcessFunction.
>> Will the thread executing the function simply relinquish control to some
>> other subtask while the results of the external services are being awaited?
>> Will the callback eventually be triggered automatically or will I have to
>> explicitly block on the result future like so: Await.result(f, timeout).
>>
>> --
>> Regards,
>> Vishal
>>
>

-- 
Regards,
Vishal


Broadcast State + Stateful Operator + Async IO

2022-04-28 Thread Vishal Surana
Hello,
My application has a stateful operator which leverages RocksDB to store a
large amount of state. It, along with other operators, receives configuration
as a broadcast stream (KeyedBroadcastProcessFunction). The operator depends
upon another input stream that triggers some communication with external
services, whose results are then combined to yield the state that gets
stored in RocksDB.

In order to make the application more efficient, I am going to switch to
asynchronous IO but as the result is ultimately going to be a (Scala)
Future, I will have to block once to get the result. I was hoping to
leverage the Async IO operator but that apparently doesn't support RocksDB
based state storage. Am I correct in saying
that KeyedBroadcastProcessFunction is the only option I have? If so, then I
want to understand how registering a future's callbacks (via onComplete)
works with a synchronous operator such as KeyedBroadcastProcessFunction.
Will the thread executing the function simply relinquish control to some
other subtask while the results of the external services are being awaited?
Will the callback eventually be triggered automatically, or will I have to
explicitly block on the result future like so: Await.result(f, timeout)?

-- 
Regards,
Vishal


Disabling autogenerated uid/hash doesn't work when using file source

2021-08-25 Thread Vishal Surana
I set names and uids for all my Flink operators and have explicitly disabled 
auto-generation of uids to force developers in my team to follow the same practice. 
However, when using a file source, there's no option to provide one, due to 
which the job fails to start unless we enable auto-generation. Am I doing 
something wrong?
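
For illustration, this is the practice in question (a minimal sketch with
trivial placeholder operators):

import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
// Fail fast if any operator is missing an explicit uid.
env.getConfig.disableAutoGeneratedUIDs()

env
  .fromElements("a", "b", "c")
  .uid("source-elements").name("source-elements")
  .map(_.toUpperCase)
  .uid("to-upper").name("to-upper")
  .print()
  .uid("print-sink").name("print-sink")

env.execute("explicit-uids")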


Protobuf + Confluent Schema Registry support

2021-06-30 Thread Vishal Surana
Using the vanilla Kafka producer, I can write protobuf messages to Kafka while 
leveraging schema registry support as well. A Flink Kafka producer requires us 
to explicitly provide a serializer which converts the message to a 
ProducerRecord containing the serialized bytes of the message. We can't make 
use of the KafkaProtobufSerializer[T] provided by Confluent. Thus the only way 
I could think of would be to create an instance of KafkaProtobufSerializer 
inside a Flink SerializationSchema implementation and use it to serialize my 
messages. The problem with that would be that I would have to implement 
registration of the schema and other tasks done by KafkaProtobufSerializer. 
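
A rough sketch of that workaround (hedged; the registry URL handling is a
placeholder, and the configure() options would need to match our actual
setup):

import java.util.Collections
import com.google.protobuf.Message
import io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema
import org.apache.kafka.clients.producer.ProducerRecord

// Wraps Confluent's serializer so schema registration happens the same way it
// would with the vanilla Kafka producer.
class ProtobufRegistrySerializationSchema[T <: Message](topic: String, registryUrl: String)
    extends KafkaSerializationSchema[T] {

  @transient private lazy val inner: KafkaProtobufSerializer[T] = {
    val serializer = new KafkaProtobufSerializer[T]()
    serializer.configure(Collections.singletonMap("schema.registry.url", registryUrl), false)
    serializer
  }

  override def serialize(element: T, timestamp: java.lang.Long): ProducerRecord[Array[Byte], Array[Byte]] =
    new ProducerRecord[Array[Byte], Array[Byte]](topic, inner.serialize(topic, element))
}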

Is there any other way to solve this problem? 
Is there a plan to support protobuf serialization along with schema registry 
support?
I noticed you've recently added Avro + Schema Registry support to your codebase 
but haven't documented it. Is it ready for use?