This is an automated email from the ASF dual-hosted git repository.

bowenli86 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 7373dc9e2a3 [FLINK-39226][python] Fix embedded PyIterator class cast 
after recovery (#27849)
7373dc9e2a3 is described below

commit 7373dc9e2a37e382f8479c353cf51d367007b9a8
Author: bowenli86 <[email protected]>
AuthorDate: Mon Apr 20 11:13:55 2026 -0700

    [FLINK-39226][python] Fix embedded PyIterator class cast after recovery 
(#27849)
    
    This pull request fixes FLINK-39226 by avoiding direct casts to 
pemja.core.object.PyIterator in embedded Python operators. After recovery, the 
returned iterator object can be loaded by a different user-code classloader, so 
hard casts can fail with ClassCastException. The change introduces a reflective 
wrapper that can consume iterator-like results without depending on the local 
PemJa class identity.
---
 ...ractOneInputEmbeddedPythonFunctionOperator.java |  16 ++-
 ...ractTwoInputEmbeddedPythonFunctionOperator.java |  31 +++---
 .../python/embedded/EmbeddedPythonIterator.java    |  73 ++++++++++++++
 .../EmbeddedPythonKeyedCoProcessOperator.java      |  17 ++--
 .../EmbeddedPythonKeyedProcessOperator.java        |  17 ++--
 .../embedded/EmbeddedPythonWindowOperator.java     |  17 ++--
 .../table/EmbeddedPythonTableFunctionOperator.java |  35 ++++---
 .../embedded/EmbeddedPythonIteratorTest.java       | 109 +++++++++++++++++++++
 .../embedded/ForeignClassLoaderIterator.java       |  47 +++++++++
 9 files changed, 288 insertions(+), 74 deletions(-)

diff --git 
a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/AbstractOneInputEmbeddedPythonFunctionOperator.java
 
b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/AbstractOneInputEmbeddedPythonFunctionOperator.java
index 6b06d2e5ebf..4db2b4bbaf1 100644
--- 
a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/AbstractOneInputEmbeddedPythonFunctionOperator.java
+++ 
b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/AbstractOneInputEmbeddedPythonFunctionOperator.java
@@ -31,7 +31,6 @@ import 
org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.Preconditions;
 
 import com.google.protobuf.AbstractMessageLite;
-import pemja.core.object.PyIterator;
 
 import java.util.List;
 import java.util.stream.Collectors;
@@ -151,18 +150,17 @@ public abstract class 
AbstractOneInputEmbeddedPythonFunctionOperator<IN, OUT>
         timestamp = element.getTimestamp();
 
         IN value = element.getValue();
-        PyIterator results =
-                (PyIterator)
+        try (EmbeddedPythonIterator results =
+                EmbeddedPythonIterator.from(
                         interpreter.invokeMethod(
                                 "operation",
                                 "process_element",
-                                inputDataConverter.toExternal(value));
-
-        while (results.hasNext()) {
-            OUT result = outputDataConverter.toInternal(results.next());
-            collector.collect(result);
+                                inputDataConverter.toExternal(value)))) {
+            while (results.hasNext()) {
+                OUT result = outputDataConverter.toInternal(results.next());
+                collector.collect(result);
+            }
         }
-        results.close();
     }
 
     TypeInformation<IN> getInputTypeInfo() {
diff --git 
a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/AbstractTwoInputEmbeddedPythonFunctionOperator.java
 
b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/AbstractTwoInputEmbeddedPythonFunctionOperator.java
index 6de7e6b8213..dce1000709d 100644
--- 
a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/AbstractTwoInputEmbeddedPythonFunctionOperator.java
+++ 
b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/AbstractTwoInputEmbeddedPythonFunctionOperator.java
@@ -31,7 +31,6 @@ import 
org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.Preconditions;
 
 import com.google.protobuf.AbstractMessageLite;
-import pemja.core.object.PyIterator;
 
 import java.util.List;
 import java.util.stream.Collectors;
@@ -178,18 +177,17 @@ public abstract class 
AbstractTwoInputEmbeddedPythonFunctionOperator<IN1, IN2, O
         timestamp = element.getTimestamp();
 
         IN1 value = element.getValue();
-        PyIterator results =
-                (PyIterator)
+        try (EmbeddedPythonIterator results =
+                EmbeddedPythonIterator.from(
                         interpreter.invokeMethod(
                                 "operation",
                                 "process_element1",
-                                inputDataConverter1.toExternal(value));
-
-        while (results.hasNext()) {
-            OUT result = outputDataConverter.toInternal(results.next());
-            collector.collect(result);
+                                inputDataConverter1.toExternal(value)))) {
+            while (results.hasNext()) {
+                OUT result = outputDataConverter.toInternal(results.next());
+                collector.collect(result);
+            }
         }
-        results.close();
     }
 
     @Override
@@ -198,18 +196,17 @@ public abstract class 
AbstractTwoInputEmbeddedPythonFunctionOperator<IN1, IN2, O
         timestamp = element.getTimestamp();
 
         IN2 value = element.getValue();
-        PyIterator results =
-                (PyIterator)
+        try (EmbeddedPythonIterator results =
+                EmbeddedPythonIterator.from(
                         interpreter.invokeMethod(
                                 "operation",
                                 "process_element2",
-                                inputDataConverter2.toExternal(value));
-
-        while (results.hasNext()) {
-            OUT result = outputDataConverter.toInternal(results.next());
-            collector.collect(result);
+                                inputDataConverter2.toExternal(value)))) {
+            while (results.hasNext()) {
+                OUT result = outputDataConverter.toInternal(results.next());
+                collector.collect(result);
+            }
         }
-        results.close();
     }
 
     TypeInformation<IN1> getInputTypeInfo1() {
diff --git 
a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/EmbeddedPythonIterator.java
 
b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/EmbeddedPythonIterator.java
new file mode 100644
index 00000000000..061f759dd18
--- /dev/null
+++ 
b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/EmbeddedPythonIterator.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.python.embedded;
+
+import org.apache.flink.annotation.Internal;
+
+import java.lang.reflect.Method;
+import java.util.Objects;
+
+/**
+ * Reflective adapter for embedded Python iterators.
+ *
+ * <p>PEMJA iterator objects may come back from a different user-code 
classloader after recovery, so
+ * callers should not hard-cast them to {@code pemja.core.object.PyIterator}.
+ */
+@Internal
+public final class EmbeddedPythonIterator implements AutoCloseable {
+
+    private final Object iterator;
+    private final Method hasNextMethod;
+    private final Method nextMethod;
+    private final Method closeMethod;
+
+    private EmbeddedPythonIterator(Object iterator) {
+        this.iterator = Objects.requireNonNull(iterator, "iterator must not be 
null");
+
+        try {
+            Class<?> iteratorClass = iterator.getClass();
+            this.hasNextMethod = iteratorClass.getMethod("hasNext");
+            this.nextMethod = iteratorClass.getMethod("next");
+            this.closeMethod = iteratorClass.getMethod("close");
+        } catch (ReflectiveOperationException e) {
+            throw new IllegalStateException(
+                    String.format(
+                            "Failed to adapt embedded Python iterator of type 
%s.",
+                            iterator.getClass().getName()),
+                    e);
+        }
+    }
+
+    public static EmbeddedPythonIterator from(Object iterator) {
+        return new EmbeddedPythonIterator(iterator);
+    }
+
+    public boolean hasNext() throws Exception {
+        return (boolean) hasNextMethod.invoke(iterator);
+    }
+
+    public Object next() throws Exception {
+        return nextMethod.invoke(iterator);
+    }
+
+    @Override
+    public void close() throws Exception {
+        closeMethod.invoke(iterator);
+    }
+}
diff --git 
a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/EmbeddedPythonKeyedCoProcessOperator.java
 
b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/EmbeddedPythonKeyedCoProcessOperator.java
index edeee270208..57517164336 100644
--- 
a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/EmbeddedPythonKeyedCoProcessOperator.java
+++ 
b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/EmbeddedPythonKeyedCoProcessOperator.java
@@ -36,8 +36,6 @@ import org.apache.flink.streaming.api.operators.Triggerable;
 import org.apache.flink.streaming.api.utils.PythonTypeUtils;
 import org.apache.flink.types.Row;
 
-import pemja.core.object.PyIterator;
-
 import java.util.List;
 
 import static org.apache.flink.python.PythonOptions.MAP_STATE_READ_CACHE_SIZE;
@@ -149,15 +147,14 @@ public class EmbeddedPythonKeyedCoProcessOperator<K, IN1, 
IN2, OUT>
         onTimerContext.timeDomain = timeDomain;
         onTimerContext.timer = timer;
 
-        PyIterator results =
-                (PyIterator)
-                        interpreter.invokeMethod("operation", "on_timer", 
timer.getTimestamp());
-
-        while (results.hasNext()) {
-            OUT result = outputDataConverter.toInternal(results.next());
-            collector.collect(result);
+        try (EmbeddedPythonIterator results =
+                EmbeddedPythonIterator.from(
+                        interpreter.invokeMethod("operation", "on_timer", 
timer.getTimestamp()))) {
+            while (results.hasNext()) {
+                OUT result = outputDataConverter.toInternal(results.next());
+                collector.collect(result);
+            }
         }
-        results.close();
 
         onTimerContext.timeDomain = null;
         onTimerContext.timer = null;
diff --git 
a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/EmbeddedPythonKeyedProcessOperator.java
 
b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/EmbeddedPythonKeyedProcessOperator.java
index d0db39d37cc..3174ee1637f 100644
--- 
a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/EmbeddedPythonKeyedProcessOperator.java
+++ 
b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/EmbeddedPythonKeyedProcessOperator.java
@@ -36,8 +36,6 @@ import org.apache.flink.streaming.api.operators.Triggerable;
 import org.apache.flink.streaming.api.utils.PythonTypeUtils;
 import org.apache.flink.types.Row;
 
-import pemja.core.object.PyIterator;
-
 import java.util.List;
 
 import static org.apache.flink.python.PythonOptions.MAP_STATE_READ_CACHE_SIZE;
@@ -143,15 +141,14 @@ public class EmbeddedPythonKeyedProcessOperator<K, IN, 
OUT>
             throws Exception {
         onTimerContext.timeDomain = timeDomain;
         onTimerContext.timer = timer;
-        PyIterator results =
-                (PyIterator)
-                        interpreter.invokeMethod("operation", "on_timer", 
timer.getTimestamp());
-
-        while (results.hasNext()) {
-            OUT result = outputDataConverter.toInternal(results.next());
-            collector.collect(result);
+        try (EmbeddedPythonIterator results =
+                EmbeddedPythonIterator.from(
+                        interpreter.invokeMethod("operation", "on_timer", 
timer.getTimestamp()))) {
+            while (results.hasNext()) {
+                OUT result = outputDataConverter.toInternal(results.next());
+                collector.collect(result);
+            }
         }
-        results.close();
 
         onTimerContext.timeDomain = null;
         onTimerContext.timer = null;
diff --git 
a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/EmbeddedPythonWindowOperator.java
 
b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/EmbeddedPythonWindowOperator.java
index 7c64933cf8b..7aaef476753 100644
--- 
a/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/EmbeddedPythonWindowOperator.java
+++ 
b/flink-python/src/main/java/org/apache/flink/streaming/api/operators/python/embedded/EmbeddedPythonWindowOperator.java
@@ -34,8 +34,6 @@ import org.apache.flink.streaming.api.utils.PythonTypeUtils;
 import org.apache.flink.table.runtime.operators.window.Window;
 import org.apache.flink.types.Row;
 
-import pemja.core.object.PyIterator;
-
 import java.util.List;
 
 import static org.apache.flink.python.PythonOptions.MAP_STATE_READ_CACHE_SIZE;
@@ -143,15 +141,14 @@ public class EmbeddedPythonWindowOperator<K, IN, OUT, W 
extends Window>
     private void invokeUserFunction(InternalTimer<K, W> timer) throws 
Exception {
         windowTimerContext.timer = timer;
 
-        PyIterator results =
-                (PyIterator)
-                        interpreter.invokeMethod("operation", "on_timer", 
timer.getTimestamp());
-
-        while (results.hasNext()) {
-            OUT result = outputDataConverter.toInternal(results.next());
-            collector.collect(result);
+        try (EmbeddedPythonIterator results =
+                EmbeddedPythonIterator.from(
+                        interpreter.invokeMethod("operation", "on_timer", 
timer.getTimestamp()))) {
+            while (results.hasNext()) {
+                OUT result = outputDataConverter.toInternal(results.next());
+                collector.collect(result);
+            }
         }
-        results.close();
 
         windowTimerContext.timer = null;
     }
diff --git 
a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/table/EmbeddedPythonTableFunctionOperator.java
 
b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/table/EmbeddedPythonTableFunctionOperator.java
index 77962daa67c..d23b2e3580a 100644
--- 
a/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/table/EmbeddedPythonTableFunctionOperator.java
+++ 
b/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/table/EmbeddedPythonTableFunctionOperator.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.fnexecution.v1.FlinkFnApi;
 import org.apache.flink.python.util.ProtoUtils;
+import 
org.apache.flink.streaming.api.operators.python.embedded.EmbeddedPythonIterator;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
@@ -33,8 +34,6 @@ import 
org.apache.flink.table.runtime.operators.python.AbstractEmbeddedStateless
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.util.Preconditions;
 
-import pemja.core.object.PyIterator;
-
 import static org.apache.flink.python.PythonOptions.PYTHON_METRIC_ENABLED;
 import static org.apache.flink.python.PythonOptions.PYTHON_PROFILE_ENABLED;
 import static 
org.apache.flink.python.util.ProtoUtils.createFlattenRowTypeCoderInfoDescriptorProto;
@@ -147,25 +146,25 @@ public class EmbeddedPythonTableFunctionOperator extends 
AbstractEmbeddedStatele
                     userDefinedFunctionInputConverters[i].toExternal(value, 
udfInputOffsets[i]);
         }
 
-        PyIterator udtfResults =
-                (PyIterator)
+        try (EmbeddedPythonIterator udtfResults =
+                EmbeddedPythonIterator.from(
                         interpreter.invokeMethod(
                                 "table_operation",
                                 "process_element",
-                                (Object) (userDefinedFunctionInputArgs));
-
-        if (udtfResults.hasNext()) {
-            do {
-                Object[] udtfResult = (Object[]) udtfResults.next();
-                for (int i = 0; i < udtfResult.length; i++) {
-                    reuseResultRowData.setField(
-                            i, 
userDefinedFunctionOutputConverters[i].toInternal(udtfResult[i]));
-                }
-                rowDataWrapper.collect(reuseJoinedRow.replace(value, 
reuseResultRowData));
-            } while (udtfResults.hasNext());
-        } else if (joinType == FlinkJoinType.LEFT) {
-            rowDataWrapper.collect(reuseJoinedRow.replace(value, 
reuseNullResultRowData));
+                                (Object) (userDefinedFunctionInputArgs)))) {
+            if (udtfResults.hasNext()) {
+                do {
+                    Object[] udtfResult = (Object[]) udtfResults.next();
+                    for (int i = 0; i < udtfResult.length; i++) {
+                        reuseResultRowData.setField(
+                                i,
+                                
userDefinedFunctionOutputConverters[i].toInternal(udtfResult[i]));
+                    }
+                    rowDataWrapper.collect(reuseJoinedRow.replace(value, 
reuseResultRowData));
+                } while (udtfResults.hasNext());
+            } else if (joinType == FlinkJoinType.LEFT) {
+                rowDataWrapper.collect(reuseJoinedRow.replace(value, 
reuseNullResultRowData));
+            }
         }
-        udtfResults.close();
     }
 }
diff --git 
a/flink-python/src/test/java/org/apache/flink/streaming/api/operators/python/embedded/EmbeddedPythonIteratorTest.java
 
b/flink-python/src/test/java/org/apache/flink/streaming/api/operators/python/embedded/EmbeddedPythonIteratorTest.java
new file mode 100644
index 00000000000..2f3c0a2a381
--- /dev/null
+++ 
b/flink-python/src/test/java/org/apache/flink/streaming/api/operators/python/embedded/EmbeddedPythonIteratorTest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.python.embedded;
+
+import org.junit.jupiter.api.Test;
+import pemja.core.object.PyIterator;
+
+import java.lang.reflect.Constructor;
+import java.net.URL;
+import java.net.URLClassLoader;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link EmbeddedPythonIterator}. */
+class EmbeddedPythonIteratorTest {
+
+    @Test
+    void testReadsIteratorLoadedByDifferentClassLoader() throws Exception {
+        URL classesUrl =
+                EmbeddedPythonIteratorTest.class
+                        .getProtectionDomain()
+                        .getCodeSource()
+                        .getLocation();
+
+        try (URLClassLoader classLoader = new URLClassLoader(new URL[] 
{classesUrl}, null)) {
+            Class<?> iteratorClass =
+                    Class.forName(
+                            
"org.apache.flink.streaming.api.operators.python.embedded."
+                                    + "ForeignClassLoaderIterator",
+                            true,
+                            classLoader);
+            Object iterator =
+                    iteratorClass
+                            .getConstructor(Object[].class)
+                            .newInstance((Object) new Object[] {"first", 
"second"});
+
+            assertThat(iterator.getClass().getClassLoader())
+                    
.isNotSameAs(EmbeddedPythonIteratorTest.class.getClassLoader());
+
+            try (EmbeddedPythonIterator embeddedPythonIterator =
+                    EmbeddedPythonIterator.from(iterator)) {
+                assertThat(embeddedPythonIterator.hasNext()).isTrue();
+                assertThat(embeddedPythonIterator.next()).isEqualTo("first");
+                assertThat(embeddedPythonIterator.hasNext()).isTrue();
+                assertThat(embeddedPythonIterator.next()).isEqualTo("second");
+                assertThat(embeddedPythonIterator.hasNext()).isFalse();
+            }
+
+            
assertThat(iteratorClass.getMethod("isClosed").invoke(iterator)).isEqualTo(true);
+        }
+    }
+
+    @Test
+    void testReproducesDirectPemjaCastFailureAcrossClassLoaders() throws 
Exception {
+        Object iterator = createForeignPemjaIterator();
+
+        
assertThat(iterator.getClass().getName()).isEqualTo(PyIterator.class.getName());
+        assertThat(iterator.getClass()).isNotEqualTo(PyIterator.class);
+        assertThat(iterator.getClass().getClassLoader())
+                .isNotSameAs(PyIterator.class.getClassLoader());
+
+        assertThatThrownBy(() -> castToLocalPemjaIterator(iterator))
+                .isInstanceOf(ClassCastException.class)
+                .hasMessageContaining("pemja.core.object.PyIterator")
+                .hasMessageContaining("cannot be cast");
+    }
+
+    @Test
+    void 
testWrapsPemjaIteratorLoadedByDifferentClassLoaderWithoutCastFailure() throws 
Exception {
+        Object iterator = createForeignPemjaIterator();
+
+        assertThatCode(() -> 
EmbeddedPythonIterator.from(iterator)).doesNotThrowAnyException();
+    }
+
+    private static Object createForeignPemjaIterator() throws Exception {
+        URL pemjaJarUrl = 
PyIterator.class.getProtectionDomain().getCodeSource().getLocation();
+
+        try (URLClassLoader classLoader = new URLClassLoader(new URL[] 
{pemjaJarUrl}, null)) {
+            Class<?> iteratorClass =
+                    Class.forName("pemja.core.object.PyIterator", true, 
classLoader);
+            Constructor<?> constructor =
+                    iteratorClass.getDeclaredConstructor(long.class, 
long.class);
+            constructor.setAccessible(true);
+            return constructor.newInstance(0L, 0L);
+        }
+    }
+
+    private static PyIterator castToLocalPemjaIterator(Object iterator) {
+        return (PyIterator) iterator;
+    }
+}
diff --git 
a/flink-python/src/test/java/org/apache/flink/streaming/api/operators/python/embedded/ForeignClassLoaderIterator.java
 
b/flink-python/src/test/java/org/apache/flink/streaming/api/operators/python/embedded/ForeignClassLoaderIterator.java
new file mode 100644
index 00000000000..5fadded2702
--- /dev/null
+++ 
b/flink-python/src/test/java/org/apache/flink/streaming/api/operators/python/embedded/ForeignClassLoaderIterator.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.python.embedded;
+
+/** Test-only iterator that can be loaded through an isolated classloader. */
+public class ForeignClassLoaderIterator {
+
+    private final Object[] values;
+    private int index;
+    private boolean closed;
+
+    public ForeignClassLoaderIterator(Object... values) {
+        this.values = values;
+    }
+
+    public boolean hasNext() {
+        return index < values.length;
+    }
+
+    public Object next() {
+        return values[index++];
+    }
+
+    public void close() {
+        closed = true;
+    }
+
+    public boolean isClosed() {
+        return closed;
+    }
+}

Reply via email to