[ 
https://issues.apache.org/jira/browse/IGNITE-1191?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14698833#comment-14698833
 ] 

Alexey Goncharuk edited comment on IGNITE-1191 at 9/2/15 10:40 PM:
-------------------------------------------------------------------

I'd like to suggest a preliminary API for continuous queries that will allow us 
to implement all the changes listed in the subtasks of this ticket.

I think we should extend {{CacheEntryListenerConfiguration}}, which is available 
in JCache, and add a flag that defines whether an initial iteration is required 
(the updates it produces will go to the same listener):
{code}
public class ContinuousQueryConfiguration<K, V> extends MutableCacheEntryListenerConfiguration<K, V> {
    /** Is initial iteration required. */
    private boolean initIterRequired;

    /**
     * Creates new continuous query configuration.
     */
    public ContinuousQueryConfiguration() {
        super(null, null, false, false);
    }

    /**
     * Creates new continuous query configuration.
     *
     * @param listenerFactory Local listener factory.
     * @param filterFactory Remote filter factory.
     * @param isOldValueRequired Is old value required.
     * @param isSynchronous Is synchronous.
     * @param initIterRequired Is initial iteration required.
     */
    public ContinuousQueryConfiguration(
        Factory<? extends CacheEntryListener<? super K, ? super V>> listenerFactory,
        Factory<? extends CacheEntryEventFilter<? super K, ? super V>> filterFactory,
        boolean isOldValueRequired,
        boolean isSynchronous,
        boolean initIterRequired) {
        super(listenerFactory, filterFactory, isOldValueRequired, isSynchronous);

        this.initIterRequired = initIterRequired;
    }

    /**
     * Sets whether initial iteration is required.
     *
     * @param initIterRequired {@code True} if initial iteration required.
     */
    public void setInitialIterationRequired(boolean initIterRequired) {
        this.initIterRequired = initIterRequired;
    }

    /**
     * Tells whether initial iteration is required.
     *
     * @return {@code True} if initial iteration required.
     */
    public boolean isInitialIterationRequired() {
        return initIterRequired;
    }
}
{code}
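To illustrate the intended effect of the flag, here is a minimal single-threaded sketch. {{InitialIterationSketch}} and its methods are hypothetical stand-ins, not Ignite or JCache classes, and a JDK {{BiConsumer}} takes the place of a real {{CacheEntryListener}}. It only shows the ordering: existing entries are replayed to the listener first, then later updates reach the same listener. Concurrency (updates racing with the initial iteration) is deliberately ignored.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

/** Hypothetical in-memory stand-in for a cache; not an Ignite or JCache class. */
final class InitialIterationSketch<K, V> {
    /** Current cache contents, kept in insertion order. */
    private final Map<K, V> store = new LinkedHashMap<>();

    /** Registered listeners (stand-ins for CacheEntryListener). */
    private final List<BiConsumer<K, V>> listeners = new ArrayList<>();

    /** Stores a value and notifies every registered listener of the update. */
    void put(K key, V val) {
        store.put(key, val);

        for (BiConsumer<K, V> lsnr : listeners)
            lsnr.accept(key, val);
    }

    /**
     * Registers a listener. If initIterRequired is set, the entries already
     * in the store are replayed to the listener before it sees new updates.
     */
    void register(BiConsumer<K, V> lsnr, boolean initIterRequired) {
        if (initIterRequired)
            store.forEach(lsnr);

        listeners.add(lsnr);
    }

    /** Returns, in order, the events a single listener observes. */
    static List<String> demo(boolean initIterRequired) {
        InitialIterationSketch<String, Integer> cache = new InitialIterationSketch<>();
        List<String> seen = new ArrayList<>();

        // Two entries exist before the listener is registered.
        cache.put("a", 1);
        cache.put("b", 2);

        cache.register((k, v) -> seen.add(k + "=" + v), initIterRequired);

        // One update arrives after registration.
        cache.put("c", 3);

        return seen;
    }
}
```

With the flag set, {{demo(true)}} observes the two existing entries followed by the later update; with it cleared, {{demo(false)}} observes only the later update.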
To support transformers, we can add one more extension. I created a separate 
class because the generics are more complicated:
{code}
public class TransformContinuousQueryConfiguration<K, V, R> extends ContinuousQueryConfiguration<K, R> {
    /** Transformer factory. */
    private Factory<IgniteClosure<? super V, ? super R>> transformerFactory;

    /**
     * Creates new continuous query configuration.
     */
    public TransformContinuousQueryConfiguration() {
        super();
    }

    /**
     * Creates new continuous query configuration.
     *
     * @param listenerFactory Local listener factory.
     * @param filterFactory Remote filter factory.
     * @param isOldValueRequired Is old value required.
     * @param isSynchronous Is synchronous.
     * @param initIterRequired Is initial iteration required.
     * @param transformerFactory Transformer factory.
     */
    public TransformContinuousQueryConfiguration(
        Factory<? extends CacheEntryListener<? super K, ? super R>> listenerFactory,
        Factory<? extends CacheEntryEventFilter<? super K, ? super V>> filterFactory,
        boolean isOldValueRequired,
        boolean isSynchronous,
        boolean initIterRequired,
        Factory<IgniteClosure<? super V, ? super R>> transformerFactory) {
        super(listenerFactory, filterFactory, isOldValueRequired, isSynchronous, initIterRequired);

        this.transformerFactory = transformerFactory;
    }

    /**
     * Sets transformer factory.
     *
     * @param transformerFactory Transformer factory.
     */
    public void setTransformerFactory(Factory<IgniteClosure<? super V, ? super R>> transformerFactory) {
        this.transformerFactory = transformerFactory;
    }

    /**
     * Gets transformer factory.
     *
     * @return Transformer factory.
     */
    public Factory<IgniteClosure<? super V, ? super R>> getTransformerFactory() {
        return transformerFactory;
    }
}
{code}
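The intended data flow for a transforming query can be sketched as follows, assuming the remote filter sees the original value and the listener sees only the transformed one. {{TransformSketch}} is hypothetical; the JDK types {{Supplier}}, {{Predicate}}, {{Function}} and {{Consumer}} stand in for {{Factory}}, {{CacheEntryEventFilter}}, {{IgniteClosure}} and the listener.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;

/** Hypothetical pipeline: filter the original value, transform it, notify the listener. */
final class TransformSketch<V, R> {
    /** Stand-in for the remote filter; works on original values. */
    private final Predicate<? super V> filter;

    /** Stand-in for the transformer closure. */
    private final Function<? super V, ? extends R> transformer;

    /** Stand-in for the local listener; receives transformed values only. */
    private final Consumer<? super R> listener;

    TransformSketch(
        Supplier<? extends Predicate<? super V>> filterFactory,
        Supplier<? extends Function<? super V, ? extends R>> transformerFactory,
        Consumer<? super R> listener) {
        // Factories are resolved once, as they would be when the query is deployed.
        this.filter = filterFactory.get();
        this.transformer = transformerFactory.get();
        this.listener = listener;
    }

    /** Handles one update: drops it if filtered out, otherwise transforms and delivers it. */
    void onUpdate(V val) {
        if (filter.test(val))
            listener.accept(transformer.apply(val));
    }

    /** Feeds three string updates through a length transformer and returns what the listener saw. */
    static List<Integer> demo() {
        List<Integer> lengths = new ArrayList<>();

        TransformSketch<String, Integer> qry = new TransformSketch<String, Integer>(
            () -> s -> !s.isEmpty(), // Filter: skip empty strings.
            () -> String::length,    // Transformer: String -> Integer.
            lengths::add);           // Listener: collect transformed values.

        qry.onUpdate("ignite");
        qry.onUpdate("");
        qry.onUpdate("cache");

        return lengths;
    }
}
```

The point of the extra type parameter is visible here: the filter is typed on the original value, while the listener is typed on the transformer's result.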
{{IgniteCache}} should have two new methods:
{code}
public AutoCloseable queryContinuous(ContinuousQueryConfiguration<K, V> cfg);
public <R> AutoCloseable queryContinuous(TransformContinuousQueryConfiguration<K, V, R> cfg);
{code}
The returned {{AutoCloseable}} instance allows the caller to deregister the query.
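
A rough sketch of how such a handle might be used, assuming that closing it simply removes the listener. {{DeregistrationSketch}} is a hypothetical stand-in for the cache, and its {{queryContinuous}} takes a plain {{Consumer}} instead of a configuration object.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

/** Hypothetical stand-in for a cache that hands out deregistration handles. */
final class DeregistrationSketch {
    /** Active listeners; copy-on-write so removal during iteration is safe. */
    private final List<Consumer<String>> listeners = new CopyOnWriteArrayList<>();

    /** Registers a listener and returns a handle whose close() removes it again. */
    AutoCloseable queryContinuous(Consumer<String> lsnr) {
        listeners.add(lsnr);

        return () -> listeners.remove(lsnr);
    }

    /** Delivers an event to every listener that is still registered. */
    void publish(String evt) {
        for (Consumer<String> lsnr : listeners)
            lsnr.accept(evt);
    }

    /** Publishes one event inside the query's scope and one after it is closed. */
    static List<String> demo() {
        DeregistrationSketch cache = new DeregistrationSketch();
        List<String> seen = new ArrayList<>();

        // try-with-resources closes the handle, which deregisters the listener.
        try (AutoCloseable qry = cache.queryContinuous(seen::add)) {
            cache.publish("first");
        }
        catch (Exception e) {
            // AutoCloseable.close() is declared to throw Exception.
            throw new IllegalStateException(e);
        }

        // The listener is gone, so this event is not recorded.
        cache.publish("second");

        return seen;
    }
}
```

One consequence of returning the general {{AutoCloseable}} type is that callers have to handle the checked {{Exception}} declared by {{close()}}, as the {{catch}} block above shows.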


was (Author: vkulichenko):
I'd like to suggest a preliminary API for continuous queries that will allow us 
to implement all the changes listed in the subtasks of this ticket.

I think we should extend {{CacheEntryListenerConfiguration}}, which is available 
in JCache, and add a flag that defines whether an initial iteration is required 
(the updates it produces will go to the same listener):
{code}
public class ContinuousQueryConfiguration<K, V> extends MutableCacheEntryListenerConfiguration<K, V> {
    /** Is initial iteration required. */
    private boolean initIterRequired;

    /**
     * Creates new continuous query configuration.
     */
    public ContinuousQueryConfiguration() {
        super(null, null, false, false);
    }

    /**
     * Creates new continuous query configuration.
     *
     * @param listenerFactory Local listener factory.
     * @param filterFactory Remote filter factory.
     * @param isOldValueRequired Is old value required.
     * @param isSynchronous Is synchronous.
     * @param initIterRequired Is initial iteration required.
     */
    public ContinuousQueryConfiguration(
        Factory<? extends CacheEntryListener<? super K, ? super V>> listenerFactory,
        Factory<? extends CacheEntryEventFilter<? super K, ? super V>> filterFactory,
        boolean isOldValueRequired,
        boolean isSynchronous,
        boolean initIterRequired) {
        super(listenerFactory, filterFactory, isOldValueRequired, isSynchronous);

        this.initIterRequired = initIterRequired;
    }

    /**
     * Sets whether initial iteration is required.
     *
     * @param initIterRequired {@code True} if initial iteration required.
     */
    public void setInitialIterationRequired(boolean initIterRequired) {
        this.initIterRequired = initIterRequired;
    }

    /**
     * Tells whether initial iteration is required.
     *
     * @return {@code True} if initial iteration required.
     */
    public boolean isInitialIterationRequired() {
        return initIterRequired;
    }
}
{code}
To support transformers, we can add one more extension. I created a separate 
class because the generics are more complicated:
{code}
public class TransformContinuousQueryConfiguration<K, V, R> extends ContinuousQueryConfiguration<K, R> {
    /** Transformer factory. */
    private Factory<IgniteClosure<? super V, ? super R>> transformerFactory;

    /**
     * Creates new continuous query configuration.
     */
    public TransformContinuousQueryConfiguration() {
        super();
    }

    /**
     * Creates new continuous query configuration.
     *
     * @param listenerFactory Local listener factory.
     * @param filterFactory Remote filter factory.
     * @param isOldValueRequired Is old value required.
     * @param isSynchronous Is synchronous.
     * @param initIterRequired Is initial iteration required.
     * @param transformerFactory Transformer factory.
     */
    public TransformContinuousQueryConfiguration(
        Factory<? extends CacheEntryListener<? super K, ? super R>> listenerFactory,
        Factory<? extends CacheEntryEventFilter<? super K, ? super R>> filterFactory,
        boolean isOldValueRequired,
        boolean isSynchronous,
        boolean initIterRequired,
        Factory<IgniteClosure<? super V, ? super R>> transformerFactory) {
        super(listenerFactory, filterFactory, isOldValueRequired, isSynchronous, initIterRequired);

        this.transformerFactory = transformerFactory;
    }

    /**
     * Sets transformer factory.
     *
     * @param transformerFactory Transformer factory.
     */
    public void setTransformerFactory(Factory<IgniteClosure<? super V, ? super R>> transformerFactory) {
        this.transformerFactory = transformerFactory;
    }

    /**
     * Gets transformer factory.
     *
     * @return Transformer factory.
     */
    public Factory<IgniteClosure<? super V, ? super R>> getTransformerFactory() {
        return transformerFactory;
    }
}
{code}
{{IgniteCache}} should have two new methods:
{code}
public AutoCloseable queryContinuous(ContinuousQueryConfiguration<K, V> cfg);
public <R> AutoCloseable queryContinuous(TransformContinuousQueryConfiguration<K, V, R> cfg);
{code}
The returned {{AutoCloseable}} instance allows the caller to deregister the query.

> Continuous Query Changes
> ------------------------
>
>                 Key: IGNITE-1191
>                 URL: https://issues.apache.org/jira/browse/IGNITE-1191
>             Project: Ignite
>          Issue Type: Improvement
>          Components: cache
>            Reporter: Dmitriy Setrakyan
>




