xtern commented on code in PR #2678: URL: https://github.com/apache/ignite-3/pull/2678#discussion_r1358416964
########## modules/core/src/main/java/org/apache/ignite/internal/util/subscription/IterableToPublisherAdapter.java: ########## @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.util.subscription; + +import java.lang.invoke.MethodHandles; +import java.lang.invoke.MethodHandles.Lookup; +import java.lang.invoke.VarHandle; +import java.util.Iterator; +import java.util.concurrent.Executor; +import java.util.concurrent.Flow.Publisher; +import java.util.concurrent.Flow.Subscriber; +import java.util.concurrent.Flow.Subscription; + +/** + * An adapter that issues a new iterator for every subscription and drains that iterator on a given + * executor, emitting up to given {@code batchSize} entries at time. The drain task will be re-scheduled + * until demand is fulfilled or iterator has no more items. + * + * @param <T> The type of the entry this publisher will emit. + */ +public class IterableToPublisherAdapter<T> implements Publisher<T> { + private final Iterable<T> iterable; + private final Executor executor; + private final int batchSize; + + /** + * Constructor. + * + * @param iterable An iterable to issue iterator for every incoming subscription. + * @param executor This executor will be used to drain iterator and supply entries to the subscription. + * @param batchSize An amount of entries to supply during a single iteration. It's always good idea + * to provide some reasonable value here in order to give am ability to other publishers which share the same + * executor to make progress. + */ + public IterableToPublisherAdapter(Iterable<T> iterable, Executor executor, int batchSize) { + this.iterable = iterable; + this.executor = executor; + this.batchSize = batchSize; + } + + @Override + public void subscribe(Subscriber<? super T> subscriber) { + Iterator<T> it = iterable.iterator(); + + Subscription subscription = new SubscriptionImpl<>(it, subscriber, executor, batchSize); + + subscriber.onSubscribe(subscription); + } + + @SuppressWarnings("FieldMayBeFinal") + private static class SubscriptionImpl<T> implements Subscription { + private static final VarHandle CANCELLED_HANDLE; + private static final VarHandle REQUESTED_HANDLE; + private static final VarHandle WIP_HANDLE; + + static { + try { + Lookup lookup = MethodHandles.lookup(); + + CANCELLED_HANDLE = lookup.findVarHandle(SubscriptionImpl.class, "cancelled", boolean.class); + REQUESTED_HANDLE = lookup.findVarHandle(SubscriptionImpl.class, "requested", long.class); + WIP_HANDLE = lookup.findVarHandle(SubscriptionImpl.class, "wip", boolean.class); + } catch (NoSuchFieldException | IllegalAccessException e) { + throw new ExceptionInInitializerError(e); + } + } + + private final Iterator<T> it; + private final Subscriber<? super T> subscriber; + private final Executor executor; + private final int batchSize; + + private boolean cancelled = false; + private long requested = 0; + private boolean wip = false; + + SubscriptionImpl(Iterator<T> it, Subscriber<? super T> subscriber, Executor executor, int batchSize) { + this.it = it; + this.subscriber = subscriber; + this.executor = executor; + this.batchSize = batchSize; + } + + @Override + public void request(long n) { + if (n <= 0) { + notifyError(new IllegalArgumentException("N should be positive: n")); Review Comment: This message looks strange, perhaps it meant something like: ```suggestion notifyError(new IllegalArgumentException("N should be positive: " + n)); ``` ########## modules/core/src/main/java/org/apache/ignite/internal/util/subscription/IterableToPublisherAdapter.java: ########## @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.util.subscription; + +import java.lang.invoke.MethodHandles; +import java.lang.invoke.MethodHandles.Lookup; +import java.lang.invoke.VarHandle; +import java.util.Iterator; +import java.util.concurrent.Executor; +import java.util.concurrent.Flow.Publisher; +import java.util.concurrent.Flow.Subscriber; +import java.util.concurrent.Flow.Subscription; + +/** + * An adapter that issues a new iterator for every subscription and drains that iterator on a given + * executor, emitting up to given {@code batchSize} entries at time. The drain task will be re-scheduled + * until demand is fulfilled or iterator has no more items. + * + * @param <T> The type of the entry this publisher will emit. + */ +public class IterableToPublisherAdapter<T> implements Publisher<T> { + private final Iterable<T> iterable; + private final Executor executor; + private final int batchSize; + + /** + * Constructor. + * + * @param iterable An iterable to issue iterator for every incoming subscription. + * @param executor This executor will be used to drain iterator and supply entries to the subscription. + * @param batchSize An amount of entries to supply during a single iteration. It's always good idea + * to provide some reasonable value here in order to give am ability to other publishers which share the same + * executor to make progress. + */ + public IterableToPublisherAdapter(Iterable<T> iterable, Executor executor, int batchSize) { + this.iterable = iterable; + this.executor = executor; + this.batchSize = batchSize; + } + + @Override + public void subscribe(Subscriber<? super T> subscriber) { + Iterator<T> it = iterable.iterator(); + + Subscription subscription = new SubscriptionImpl<>(it, subscriber, executor, batchSize); + + subscriber.onSubscribe(subscription); + } + + @SuppressWarnings("FieldMayBeFinal") + private static class SubscriptionImpl<T> implements Subscription { + private static final VarHandle CANCELLED_HANDLE; + private static final VarHandle REQUESTED_HANDLE; + private static final VarHandle WIP_HANDLE; Review Comment: Personally I don't see the point of using varhandles instead of atomics here. From my point of view the code with atomics will look shorter and cleaner. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
