Re: KafkaStreamer, how to manage (stop consumming, resume) on client disconnection
flush() guarantees completion of all futures returned by addData(Object, Object):
https://ignite.apache.org/releases/latest/javadoc/org/apache/ignite/IgniteDataStreamer.html#flush--

flush() will send the batch, but it is still possible for the server to crash before the message reaches it. If you need to verify whether a particular put actually made it to the server, you can use cache events:
https://ignite.apache.org/docs/latest/events/events#cache-events

You can put in retry logic if the client has disconnected. One possibility is to catch the CacheException that flush() throws as part of its contract. You will get something like this:

javax.cache.CacheException: class org.apache.ignite.IgniteClientDisconnectedException: Data streamer has been closed, client node disconnected.
    at org.apache.ignite.internal.processors.cache.GridCacheUtils.convertToCacheException(GridCacheUtils.java:1275)
    at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl.doFlush(DataStreamerImpl.java:1204)

--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/
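To make the retry idea concrete, here is a minimal sketch of a retry loop around a flush call. It deliberately uses no Ignite classes so it can be run on its own: FlushRetry, flushWithRetry and all three callbacks are hypothetical names for illustration, not part of the Ignite API.

```java
import java.util.function.Predicate;

/**
 * Sketch of retry-around-flush. Nothing here is Ignite API: the flush action,
 * the disconnect check and the wait-for-reconnect step are supplied by the caller.
 */
final class FlushRetry {
    private FlushRetry() {}

    /**
     * Runs flush up to maxAttempts times. A failure accepted by isDisconnect
     * triggers awaitReconnect and another attempt; any other failure, or a
     * disconnect on the last attempt, is rethrown.
     *
     * @return the number of attempts that were needed
     */
    static int flushWithRetry(Runnable flush,
                              Predicate<RuntimeException> isDisconnect,
                              Runnable awaitReconnect,
                              int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        for (int attempt = 1; ; attempt++) {
            try {
                flush.run();
                return attempt;
            } catch (RuntimeException e) {
                if (!isDisconnect.test(e) || attempt == maxAttempts) {
                    throw e;
                }
                // Block (or back off) until the client is usable again.
                awaitReconnect.run();
            }
        }
    }
}
```

With Ignite, the disconnect check could be something like e -> e.getCause() instanceof IgniteClientDisconnectedException, and awaitReconnect could block on that exception's reconnectFuture(). Since the message in the stack trace says the data streamer has been closed, the flush action would probably also have to obtain a new streamer and re-add the data before flushing again; that part is an assumption and is not shown here.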
Re: KafkaStreamer, how to manage (stop consumming, resume) on client disconnection
Thanks for the answers. I resolved the reconnection problem using events. It worked very well.

What I found is the following. The KafkaStreamer consumes records and sends them to the IgniteDataStreamer. It doesn't handle the IgniteFuture that is returned. If the connection with the server is interrupted (a server restart, for example), the KafkaStreamer is stopped and the Kafka consumers are stopped, but the records that were already sent to the streamer and (I believe) are still in its buffer are still trying to be saved in the cache. There is no way to recover them as far as I know. Am I right?

Should I implement a custom KafkaStreamer that, in that situation, handles the IgniteFuture and, say, retries the insertion into the cache?

Another question: I'm using a grid service to start the streamer. What is the benefit of this over a simple Spring service if I'm using Kubernetes for deployment?

On Fri, Nov 20, 2020 at 5:01 PM akorensh wrote:
> Hi,
> I think listening to events would be a good solution for you.
>
> There are two discovery events that are triggered on the client node when it
> is disconnected from or reconnected to the cluster:
>
> EVT_CLIENT_NODE_DISCONNECTED
> EVT_CLIENT_NODE_RECONNECTED
>
> see:
> https://ignite.apache.org/docs/latest/clustering/connect-client-nodes#client-disconnectedreconnected-events
>
> As for StreamReceiver: Keep in mind that the logic implemented in a stream
> receiver is executed on the node where data is to be stored. If the server
> where the data resides crashes, your code might not execute.
> https://ignite.apache.org/docs/latest/data-streaming#stream-visitor
>
> Thanks, Alex

--
Facundo Maldonado
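For the question about handling the returned future and retrying, one possible shape is to remember each record until its future completes successfully, and to re-send whatever is still unconfirmed after a reconnect. The sketch below is plain Java with no Ignite dependency: TrackedSender is a hypothetical helper, and CompletableFuture stands in for the IgniteFuture returned by addData.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiFunction;

/**
 * Keeps every record until the sink confirms it. The sink is a stand-in for
 * "add to the data streamer and return its future"; it is not Ignite API.
 * Keys and values must be non-null because ConcurrentHashMap rejects nulls.
 */
final class TrackedSender<K, V> {
    private final BiFunction<K, V, CompletableFuture<?>> sink;
    private final Map<K, V> unconfirmed = new ConcurrentHashMap<>();

    TrackedSender(BiFunction<K, V, CompletableFuture<?>> sink) {
        this.sink = sink;
    }

    /** Sends one record and remembers it until its future completes successfully. */
    void send(K key, V value) {
        unconfirmed.put(key, value);
        CompletableFuture<?> future;
        try {
            future = sink.apply(key, value);
        } catch (RuntimeException e) {
            return; // The sink rejected the record; it stays unconfirmed for a later retry.
        }
        future.whenComplete((result, error) -> {
            if (error == null) {
                // Remove only if the stored value is still the one that was confirmed.
                unconfirmed.remove(key, value);
            }
        });
    }

    /** Re-sends every record that is not yet confirmed and returns how many were retried. */
    int retryUnconfirmed() {
        Map<K, V> snapshot = new HashMap<>(unconfirmed);
        snapshot.forEach(this::send);
        return snapshot.size();
    }

    int unconfirmedCount() {
        return unconfirmed.size();
    }
}
```

In a custom streamer, retryUnconfirmed() would be called once the client has reconnected and a new data streamer is available. With the real API the completion callback would be attached through IgniteFuture.listen(...) rather than whenComplete. This keeps only the latest value per key and holds unconfirmed records in memory, so it is a starting point under those assumptions rather than a complete recovery mechanism.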
Re: KafkaStreamer, how to manage (stop consumming, resume) on client disconnection
Hi,

I think listening to events would be a good solution for you.

There are two discovery events that are triggered on the client node when it is disconnected from or reconnected to the cluster:

EVT_CLIENT_NODE_DISCONNECTED
EVT_CLIENT_NODE_RECONNECTED

see:
https://ignite.apache.org/docs/latest/clustering/connect-client-nodes#client-disconnectedreconnected-events

As for StreamReceiver: Keep in mind that the logic implemented in a stream receiver is executed on the node where data is to be stored. If the server where the data resides crashes, your code might not execute.
https://ignite.apache.org/docs/latest/data-streaming#stream-visitor

Thanks, Alex
Re: KafkaStreamer, how to manage (stop consumming, resume) on client disconnection
I forgot to mention, I'm starting the KafkaStreamer in a cluster service, pretty similar to all the examples that are around.

I saw the exception in the documentation. My concern is where I should catch it, given that I initialize and set up the streamer in the init() method and start it in execute().

Should I create a custom implementation of a StreamReceiver (holding a reference to the KafkaStreamer) that actually calls the cache.put() method, catches the exception and stops the streamer?

I didn't take the event stuff into account; the solution may be on that path. I think it is valid to add a listener in the service init() method, right?

Thanks Alex for your answer.
Re: KafkaStreamer, how to manage (stop consumming, resume) on client disconnection
Hi,

You can use the disconnect events or the exception, and then call KafkaStreamer.stop.

see:
https://ignite.apache.org/docs/latest/clustering/connect-client-nodes#client-disconnectedreconnected-events
https://ignite.apache.org/docs/latest/clustering/connect-client-nodes

Here look for: "While a client is in a disconnected state and an attempt to reconnect is in progress, the Ignite API throws an IgniteClientDisconnectedException."

KafkaStreamer stop method:
https://ignite.apache.org/releases/latest/javadoc/org/apache/ignite/stream/kafka/KafkaStreamer.html#stop--

Thanks, Alex
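As an illustration of the "stop on disconnect, resume on reconnect" approach, here is a small sketch of the switching logic only. StreamerSwitch and its start/stop callbacks are hypothetical; the class has no Ignite dependency, and it guards against duplicate events so the callbacks never run twice in a row.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Turns connect/disconnect notifications into at most one start or stop call
 * per state change. The start and stop actions are supplied by the caller.
 */
final class StreamerSwitch {
    private final Runnable start;
    private final Runnable stop;
    private final AtomicBoolean running = new AtomicBoolean(false);

    StreamerSwitch(Runnable start, Runnable stop) {
        this.start = start;
        this.stop = stop;
    }

    /** Call once at startup and from the handler of the reconnected event. */
    boolean onConnected() {
        if (running.compareAndSet(false, true)) {
            start.run();
            return true;
        }
        return false; // Already running, nothing to do.
    }

    /** Call from the handler of the disconnected event. */
    boolean onDisconnected() {
        if (running.compareAndSet(true, false)) {
            stop.run();
            return true;
        }
        return false; // Already stopped, nothing to do.
    }

    boolean isRunning() {
        return running.get();
    }
}
```

In the service's init() method, a local listener registered with ignite.events().localListen(...) for EVT_CLIENT_NODE_DISCONNECTED and EVT_CLIENT_NODE_RECONNECTED could call onDisconnected() and onConnected() respectively, with the stop action calling KafkaStreamer.stop(). Whether a stopped KafkaStreamer can simply be started again is an assumption to verify; the start action may need to build a new KafkaStreamer and a new data streamer each time.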