[ https://issues.apache.org/jira/browse/IGNITE-1191?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14698833#comment-14698833 ]
Alexey Goncharuk edited comment on IGNITE-1191 at 9/2/15 10:40 PM:
-------------------------------------------------------------------

I'd like to suggest a preliminary API for continuous queries that will allow us to implement all the changes listed in the subtasks of this ticket.

I think we should extend {{CacheEntryListenerConfiguration}}, available in JCache, and add a flag that defines whether an initial iteration is required (updates for it will be delivered to the same listener):

{code}
public class ContinuousQueryConfiguration<K, V> extends MutableCacheEntryListenerConfiguration<K, V> {
    /** Is initial iteration required. */
    private boolean initIterRequired;

    /**
     * Creates new continuous query configuration.
     */
    public ContinuousQueryConfiguration() {
        super(null, null, false, false);
    }

    /**
     * Creates new continuous query configuration.
     *
     * @param listenerFactory Local listener factory.
     * @param filterFactory Remote filter factory.
     * @param isOldValueRequired Is old value required.
     * @param isSynchronous Is synchronous.
     * @param initIterRequired Is initial iteration required.
     */
    public ContinuousQueryConfiguration(
        Factory<? extends CacheEntryListener<? super K, ? super V>> listenerFactory,
        Factory<? extends CacheEntryEventFilter<? super K, ? super V>> filterFactory,
        boolean isOldValueRequired,
        boolean isSynchronous,
        boolean initIterRequired) {
        super(listenerFactory, filterFactory, isOldValueRequired, isSynchronous);

        this.initIterRequired = initIterRequired;
    }

    /**
     * Sets whether initial iteration is required.
     *
     * @param initIterRequired {@code True} if initial iteration required.
     */
    public void setInitialIterationRequired(boolean initIterRequired) {
        this.initIterRequired = initIterRequired;
    }

    /**
     * Tells whether initial iteration is required.
     *
     * @return {@code True} if initial iteration required.
     */
    public boolean isInitialIterationRequired() {
        return initIterRequired;
    }
}
{code}

To support transformers we can add one more extension.
I created a separate class because the generics are more complicated:

{code}
public class TransformContinuousQueryConfiguration<K, V, R> extends ContinuousQueryConfiguration<K, R> {
    /** Transformer factory. */
    private Factory<IgniteClosure<? super V, ? super R>> transformerFactory;

    /**
     * Creates new continuous query configuration.
     */
    public TransformContinuousQueryConfiguration() {
        super();
    }

    /**
     * Creates new continuous query configuration.
     *
     * @param listenerFactory Local listener factory.
     * @param filterFactory Remote filter factory.
     * @param isOldValueRequired Is old value required.
     * @param isSynchronous Is synchronous.
     * @param initIterRequired Is initial iteration required.
     * @param transformerFactory Transformer factory.
     */
    public TransformContinuousQueryConfiguration(
        Factory<? extends CacheEntryListener<? super K, ? super R>> listenerFactory,
        Factory<? extends CacheEntryEventFilter<? super K, ? super V>> filterFactory,
        boolean isOldValueRequired,
        boolean isSynchronous,
        boolean initIterRequired,
        Factory<IgniteClosure<? super V, ? super R>> transformerFactory) {
        super(listenerFactory, filterFactory, isOldValueRequired, isSynchronous, initIterRequired);

        this.transformerFactory = transformerFactory;
    }

    /**
     * Sets transformer factory.
     *
     * @param transformerFactory Transformer factory.
     */
    public void setTransformerFactory(Factory<IgniteClosure<? super V, ? super R>> transformerFactory) {
        this.transformerFactory = transformerFactory;
    }

    /**
     * Gets transformer factory.
     *
     * @return Transformer factory.
     */
    public Factory<IgniteClosure<? super V, ? super R>> getTransformerFactory() {
        return transformerFactory;
    }
}
{code}

{{IgniteCache}} should have two new methods:

{code}
public AutoCloseable queryContinuous(ContinuousQueryConfiguration<K, V> cfg);

public <R> AutoCloseable queryContinuous(TransformContinuousQueryConfiguration<K, V, R> cfg);
{code}

The returned {{AutoCloseable}} instance allows the query to be deregistered.
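To make the intended semantics concrete, here is a rough single-threaded sketch of how the pieces could fit together. It does not use Ignite or JCache at all: {{SketchCache}} and {{QueryConfig}} are hypothetical stand-ins for {{IgniteCache}} and {{TransformContinuousQueryConfiguration}}, plain lambdas replace the listener and transformer factories, and {{java.util.function.Function}} stands in for {{IgniteClosure}}. It only assumes that the initial iteration delivers existing entries to the same listener, that the transformer is applied before the listener is notified, and that closing the returned handle deregisters the query.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.BiConsumer;
import java.util.function.Function;

/** Stand-in model of the proposed API; none of these types are Ignite or JCache classes. */
class ContinuousQuerySketch {
    /** Hypothetical, simplified counterpart of TransformContinuousQueryConfiguration. */
    static final class QueryConfig<K, V, R> {
        final BiConsumer<K, R> listener;   // Local listener (no factory in this sketch).
        final Function<V, R> transformer;  // Applied before the listener is notified.
        final boolean initIterRequired;    // Deliver already-cached entries first?

        QueryConfig(BiConsumer<K, R> listener, Function<V, R> transformer, boolean initIterRequired) {
            this.listener = listener;
            this.transformer = transformer;
            this.initIterRequired = initIterRequired;
        }
    }

    /** Hypothetical single-node, single-threaded cache. */
    static final class SketchCache<K, V> {
        private final Map<K, V> data = new LinkedHashMap<>();
        private final List<QueryConfig<K, V, ?>> queries = new CopyOnWriteArrayList<>();

        void put(K key, V val) {
            data.put(key, val);

            for (QueryConfig<K, V, ?> qry : queries)
                notifyListener(qry, key, val);
        }

        /** Registers the query; closing the returned handle deregisters it. */
        <R> AutoCloseable queryContinuous(QueryConfig<K, V, R> cfg) {
            if (cfg.initIterRequired)
                data.forEach((k, v) -> notifyListener(cfg, k, v));

            queries.add(cfg);

            return () -> queries.remove(cfg);
        }

        private <R> void notifyListener(QueryConfig<K, V, R> qry, K key, V val) {
            qry.listener.accept(key, qry.transformer.apply(val));
        }
    }

    /** Runs a small scenario and returns what the listener observed. */
    static List<String> demo() {
        List<String> seen = new ArrayList<>();
        SketchCache<String, Integer> cache = new SketchCache<>();

        cache.put("a", 1); // Present before the query is registered.

        QueryConfig<String, Integer, String> cfg =
            new QueryConfig<>((k, r) -> seen.add(k + "=" + r), v -> "v" + v, true);

        try (AutoCloseable qry = cache.queryContinuous(cfg)) {
            cache.put("b", 2); // Delivered as a regular update.
        }
        catch (Exception e) {
            throw new IllegalStateException(e);
        }

        cache.put("c", 3); // Query is closed, so this update is not delivered.

        return seen;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // Prints [a=v1, b=v2]
    }
}
```

The sketch ignores everything that makes the real thing hard (remote filters, ordering between the initial iteration and concurrent updates, node failures); it is only meant to show the shape of the calls from the user's side.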
was (Author: vkulichenko):
I'd like to suggest a preliminary API for continuous queries that will allow us to implement all the changes listed in the subtasks of this ticket.

I think we should extend {{CacheEntryListenerConfiguration}}, available in JCache, and add a flag that defines whether an initial iteration is required (updates for it will be delivered to the same listener):

{code}
public class ContinuousQueryConfiguration<K, V> extends MutableCacheEntryListenerConfiguration<K, V> {
    /** Is initial iteration required. */
    private boolean initIterRequired;

    /**
     * Creates new continuous query configuration.
     */
    public ContinuousQueryConfiguration() {
        super(null, null, false, false);
    }

    /**
     * Creates new continuous query configuration.
     *
     * @param listenerFactory Local listener factory.
     * @param filterFactory Remote filter factory.
     * @param isOldValueRequired Is old value required.
     * @param isSynchronous Is synchronous.
     * @param initIterRequired Is initial iteration required.
     */
    public ContinuousQueryConfiguration(
        Factory<? extends CacheEntryListener<? super K, ? super V>> listenerFactory,
        Factory<? extends CacheEntryEventFilter<? super K, ? super V>> filterFactory,
        boolean isOldValueRequired,
        boolean isSynchronous,
        boolean initIterRequired) {
        super(listenerFactory, filterFactory, isOldValueRequired, isSynchronous);

        this.initIterRequired = initIterRequired;
    }

    /**
     * Sets whether initial iteration is required.
     *
     * @param initIterRequired {@code True} if initial iteration required.
     */
    public void setInitialIterationRequired(boolean initIterRequired) {
        this.initIterRequired = initIterRequired;
    }

    /**
     * Tells whether initial iteration is required.
     *
     * @return {@code True} if initial iteration required.
     */
    public boolean isInitialIterationRequired() {
        return initIterRequired;
    }
}
{code}

To support transformers we can add one more extension.
I created a separate class because the generics are more complicated:

{code}
public class TransformContinuousQueryConfiguration<K, V, R> extends ContinuousQueryConfiguration<K, R> {
    /** Transformer factory. */
    private Factory<IgniteClosure<? super V, ? super R>> transformerFactory;

    /**
     * Creates new continuous query configuration.
     */
    public TransformContinuousQueryConfiguration() {
        super();
    }

    /**
     * Creates new continuous query configuration.
     *
     * @param listenerFactory Local listener factory.
     * @param filterFactory Remote filter factory.
     * @param isOldValueRequired Is old value required.
     * @param isSynchronous Is synchronous.
     * @param initIterRequired Is initial iteration required.
     * @param transformerFactory Transformer factory.
     */
    public TransformContinuousQueryConfiguration(
        Factory<? extends CacheEntryListener<? super K, ? super R>> listenerFactory,
        Factory<? extends CacheEntryEventFilter<? super K, ? super R>> filterFactory,
        boolean isOldValueRequired,
        boolean isSynchronous,
        boolean initIterRequired,
        Factory<IgniteClosure<? super V, ? super R>> transformerFactory) {
        super(listenerFactory, filterFactory, isOldValueRequired, isSynchronous, initIterRequired);

        this.transformerFactory = transformerFactory;
    }

    /**
     * Sets transformer factory.
     *
     * @param transformerFactory Transformer factory.
     */
    public void setTransformerFactory(Factory<IgniteClosure<? super V, ? super R>> transformerFactory) {
        this.transformerFactory = transformerFactory;
    }

    /**
     * Gets transformer factory.
     *
     * @return Transformer factory.
     */
    public Factory<IgniteClosure<? super V, ? super R>> getTransformerFactory() {
        return transformerFactory;
    }
}
{code}

{{IgniteCache}} should have two new methods:

{code}
public AutoCloseable queryContinuous(ContinuousQueryConfiguration<K, V> cfg);

public <R> AutoCloseable queryContinuous(TransformContinuousQueryConfiguration<K, V, R> cfg);
{code}

The returned {{AutoCloseable}} instance allows the query to be deregistered.
> Continuous Query Changes
> ------------------------
>
>                 Key: IGNITE-1191
>                 URL: https://issues.apache.org/jira/browse/IGNITE-1191
>             Project: Ignite
>          Issue Type: Improvement
>          Components: cache
>            Reporter: Dmitriy Setrakyan
>

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)