davidradl commented on code in PR #26828:
URL: https://github.com/apache/flink/pull/26828#discussion_r2273861456


##########
flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CheckpointedStreamingProgram.java:
##########
@@ -132,7 +121,207 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception {
         public void notifyCheckpointAborted(long checkpointId) {}
     }
 
-    // --------------------------------------------------------------------------------------------
+    private static class SimpleSplit implements SourceSplit {
+        @Override
+        public String splitId() {
+            return "simple";
+        }
+    }
+
+    private static class SimpleStringGenerator implements Source<String, SimpleSplit, Void> {
+        @Override
+        public Boundedness getBoundedness() {
+            return Boundedness.CONTINUOUS_UNBOUNDED;
+        }
+
+        @Override
+        public SplitEnumerator<SimpleSplit, Void> createEnumerator(
+                SplitEnumeratorContext<SimpleSplit> enumContext) throws Exception {
+            return new SplitEnumerator<>() {
+                private boolean assigned = false;
+
+                @Override
+                public void start() {}
+
+                @Override
+                public void handleSplitRequest(int subtaskId, String hostname) {
+                    if (!assigned) {
+                        enumContext.assignSplit(new SimpleSplit(), subtaskId);
+                        assigned = true;
+                    }
+                }
+
+                @Override
+                public void addSplitsBack(List<SimpleSplit> splits, int subtaskId) {}
+
+                @Override
+                public void addReader(int subtaskId) {
+                    if (!assigned) {
+                        enumContext.assignSplit(new SimpleSplit(), subtaskId);
+                        enumContext.signalNoMoreSplits(subtaskId);
+                        assigned = true;
+                    }
+                }
+
+                @Override
+                public Void snapshotState(long checkpointId) {
+                    return null;
+                }
+
+                @Override
+                public void close() {}
+            };
+        }
+
+        @Override
+        public SplitEnumerator<SimpleSplit, Void> restoreEnumerator(
+                SplitEnumeratorContext<SimpleSplit> enumContext, Void checkpoint) throws Exception {
+            return createEnumerator(enumContext);
+        }
+
+        @Override
+        public SimpleVersionedSerializer<SimpleSplit> getSplitSerializer() {
+            return new SimpleVersionedSerializer<>() {
+                @Override
+                public int getVersion() {
+                    return 1;
+                }
+
+                @Override
+                public byte[] serialize(SimpleSplit split) {
+                    return new byte[0];
+                }
+
+                @Override
+                public SimpleSplit deserialize(int version, byte[] bytes) {
+                    return new SimpleSplit();
+                }
+            };
+        }
+
+        @Override
+        public SimpleVersionedSerializer<Void> getEnumeratorCheckpointSerializer() {
+            return new SimpleVersionedSerializer<>() {
+                @Override
+                public int getVersion() {
+                    return 1;
+                }
+
+                @Override
+                public byte[] serialize(Void obj) {
+                    return new byte[0];
+                }
+
+                @Override
+                public Void deserialize(int version, byte[] serialized) {
+                    return null;
+                }
+            };
+        }
+
+        @Override
+        public SourceReader<String, SimpleSplit> createReader(SourceReaderContext readerContext)
+                throws Exception {
+            return new SourceReader<>() {
+                private volatile boolean running = true;
+                private boolean splitReceived = false; // Add this

Review Comment:
   What does the `// Add this` comment on the `splitReceived` field mean?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to