xtern commented on code in PR #2678:
URL: https://github.com/apache/ignite-3/pull/2678#discussion_r1358416964


##########
modules/core/src/main/java/org/apache/ignite/internal/util/subscription/IterableToPublisherAdapter.java:
##########
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util.subscription;
+
+import java.lang.invoke.MethodHandles;
+import java.lang.invoke.MethodHandles.Lookup;
+import java.lang.invoke.VarHandle;
+import java.util.Iterator;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Flow.Publisher;
+import java.util.concurrent.Flow.Subscriber;
+import java.util.concurrent.Flow.Subscription;
+
+/**
+ * An adapter that issues a new iterator for every subscription and drains 
that iterator on a given
+ * executor, emitting up to given {@code batchSize} entries at time. The drain 
task will be re-scheduled
+ * until demand is fulfilled or iterator has no more items.
+ *
+ * @param <T> The type of the entry this publisher will emit.
+ */
+public class IterableToPublisherAdapter<T> implements Publisher<T> {
+    private final Iterable<T> iterable;
+    private final Executor executor;
+    private final int batchSize;
+
+    /**
+     * Constructor.
+     *
+     * @param iterable An iterable to issue iterator for every incoming 
subscription.
+     * @param executor This executor will be used to drain iterator and supply 
entries to the subscription.
+     * @param batchSize An amount of entries to supply during a single 
iteration. It's always good idea
+     *      to provide some reasonable value here in order to give am ability 
to other publishers which share the same
+     *      executor to make progress.
+     */
+    public IterableToPublisherAdapter(Iterable<T> iterable, Executor executor, 
int batchSize) {
+        this.iterable = iterable;
+        this.executor = executor;
+        this.batchSize = batchSize;
+    }
+
+    @Override
+    public void subscribe(Subscriber<? super T> subscriber) {
+        Iterator<T> it = iterable.iterator();
+
+        Subscription subscription = new SubscriptionImpl<>(it, subscriber, 
executor, batchSize);
+
+        subscriber.onSubscribe(subscription);
+    }
+
+    @SuppressWarnings("FieldMayBeFinal")
+    private static class SubscriptionImpl<T> implements Subscription {
+        private static final VarHandle CANCELLED_HANDLE;
+        private static final VarHandle REQUESTED_HANDLE;
+        private static final VarHandle WIP_HANDLE;
+
+        static {
+            try {
+                Lookup lookup = MethodHandles.lookup();
+
+                CANCELLED_HANDLE = 
lookup.findVarHandle(SubscriptionImpl.class, "cancelled", boolean.class);
+                REQUESTED_HANDLE = 
lookup.findVarHandle(SubscriptionImpl.class, "requested", long.class);
+                WIP_HANDLE = lookup.findVarHandle(SubscriptionImpl.class, 
"wip", boolean.class);
+            } catch (NoSuchFieldException | IllegalAccessException e) {
+                throw new ExceptionInInitializerError(e);
+            }
+        }
+
+        private final Iterator<T> it;
+        private final Subscriber<? super T> subscriber;
+        private final Executor executor;
+        private final int batchSize;
+
+        private boolean cancelled = false;
+        private long requested = 0;
+        private boolean wip = false;
+
+        SubscriptionImpl(Iterator<T> it, Subscriber<? super T> subscriber, 
Executor executor, int batchSize) {
+            this.it = it;
+            this.subscriber = subscriber;
+            this.executor = executor;
+            this.batchSize = batchSize;
+        }
+
+        @Override
+        public void request(long n) {
+            if (n <= 0) {
+                notifyError(new IllegalArgumentException("N should be 
positive: n"));

Review Comment:
   This message looks strange, perhaps it meant something like:
   
   ```suggestion
                   notifyError(new IllegalArgumentException("N should be 
positive: " + n));
   ```



##########
modules/core/src/main/java/org/apache/ignite/internal/util/subscription/IterableToPublisherAdapter.java:
##########
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util.subscription;
+
+import java.lang.invoke.MethodHandles;
+import java.lang.invoke.MethodHandles.Lookup;
+import java.lang.invoke.VarHandle;
+import java.util.Iterator;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Flow.Publisher;
+import java.util.concurrent.Flow.Subscriber;
+import java.util.concurrent.Flow.Subscription;
+
+/**
+ * An adapter that issues a new iterator for every subscription and drains 
that iterator on a given
+ * executor, emitting up to given {@code batchSize} entries at time. The drain 
task will be re-scheduled
+ * until demand is fulfilled or iterator has no more items.
+ *
+ * @param <T> The type of the entry this publisher will emit.
+ */
+public class IterableToPublisherAdapter<T> implements Publisher<T> {
+    private final Iterable<T> iterable;
+    private final Executor executor;
+    private final int batchSize;
+
+    /**
+     * Constructor.
+     *
+     * @param iterable An iterable to issue iterator for every incoming 
subscription.
+     * @param executor This executor will be used to drain iterator and supply 
entries to the subscription.
+     * @param batchSize An amount of entries to supply during a single 
iteration. It's always good idea
+     *      to provide some reasonable value here in order to give am ability 
to other publishers which share the same
+     *      executor to make progress.
+     */
+    public IterableToPublisherAdapter(Iterable<T> iterable, Executor executor, 
int batchSize) {
+        this.iterable = iterable;
+        this.executor = executor;
+        this.batchSize = batchSize;
+    }
+
+    @Override
+    public void subscribe(Subscriber<? super T> subscriber) {
+        Iterator<T> it = iterable.iterator();
+
+        Subscription subscription = new SubscriptionImpl<>(it, subscriber, 
executor, batchSize);
+
+        subscriber.onSubscribe(subscription);
+    }
+
+    @SuppressWarnings("FieldMayBeFinal")
+    private static class SubscriptionImpl<T> implements Subscription {
+        private static final VarHandle CANCELLED_HANDLE;
+        private static final VarHandle REQUESTED_HANDLE;
+        private static final VarHandle WIP_HANDLE;

Review Comment:
   Personally I don't see the point of using varhandles instead of atomics here.
   From my point of view the code with atomics will look shorter and cleaner.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to