This is an automated email from the ASF dual-hosted git repository.

zhouyao2023 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 793bb795c3 [Fix][Iceberg] Fix IllegalMonitorStateException in 
streaming enumerator (#10131)
793bb795c3 is described below

commit 793bb795c37d8e10b395ef0b601c27dd5393ab6a
Author: yzeng1618 <[email protected]>
AuthorDate: Wed Dec 24 10:19:20 2025 +0800

    [Fix][Iceberg] Fix IllegalMonitorStateException in streaming enumerator 
(#10131)
    
    Co-authored-by: zengyi <[email protected]>
---
 .../enumerator/IcebergStreamSplitEnumerator.java   |  16 ++-
 .../IcebergStreamSplitEnumeratorTest.java          | 139 +++++++++++++++++++++
 2 files changed, 150 insertions(+), 5 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/IcebergStreamSplitEnumerator.java
 
b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/IcebergStreamSplitEnumerator.java
index 84ebcb32a5..fffca402bc 100644
--- 
a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/IcebergStreamSplitEnumerator.java
+++ 
b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/IcebergStreamSplitEnumerator.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.connectors.seatunnel.iceberg.source.enumerator;
 
+import 
org.apache.seatunnel.shade.com.google.common.annotations.VisibleForTesting;
 import org.apache.seatunnel.shade.org.apache.commons.lang3.tuple.Pair;
 
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
@@ -46,7 +47,8 @@ import java.util.concurrent.ConcurrentMap;
 public class IcebergStreamSplitEnumerator extends AbstractSplitEnumerator {
 
     private final ConcurrentMap<TablePath, IcebergEnumeratorPosition> 
tableOffsets;
-    private volatile boolean initialized = false;
+
+    @VisibleForTesting volatile boolean initialized = false;
 
     public IcebergStreamSplitEnumerator(
             Context<IcebergFileScanTaskSplit> context,
@@ -79,9 +81,9 @@ public class IcebergStreamSplitEnumerator extends 
AbstractSplitEnumerator {
         Set<Integer> readers = context.registeredReaders();
         while (true) {
             for (TablePath tablePath : pendingTables) {
-                checkThrowInterruptedException();
-
                 synchronized (stateLock) {
+                    checkThrowInterruptedException();
+
                     log.info("Scan table {}.", tablePath);
 
                     Collection<IcebergFileScanTaskSplit> splits = 
loadSplits(tablePath);
@@ -95,7 +97,9 @@ public class IcebergStreamSplitEnumerator extends 
AbstractSplitEnumerator {
                 initialized = true;
             }
 
-            stateLock.wait(sourceConfig.getIncrementScanInterval());
+            synchronized (stateLock) {
+                stateLock.wait(sourceConfig.getIncrementScanInterval());
+            }
         }
     }
 
@@ -112,7 +116,9 @@ public class IcebergStreamSplitEnumerator extends 
AbstractSplitEnumerator {
     @Override
     public void handleSplitRequest(int subtaskId) {
         if (initialized) {
-            stateLock.notifyAll();
+            synchronized (stateLock) {
+                stateLock.notifyAll();
+            }
         }
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-iceberg/src/test/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/IcebergStreamSplitEnumeratorTest.java
 
b/seatunnel-connectors-v2/connector-iceberg/src/test/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/IcebergStreamSplitEnumeratorTest.java
new file mode 100644
index 0000000000..7e8ec18e5f
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-iceberg/src/test/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/IcebergStreamSplitEnumeratorTest.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.iceberg.source.enumerator;
+
+import org.apache.seatunnel.api.common.metrics.AbstractMetricsContext;
+import org.apache.seatunnel.api.common.metrics.MetricsContext;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.event.Event;
+import org.apache.seatunnel.api.event.EventListener;
+import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import 
org.apache.seatunnel.connectors.seatunnel.iceberg.config.IcebergCommonOptions;
+import 
org.apache.seatunnel.connectors.seatunnel.iceberg.config.IcebergSourceConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.iceberg.source.split.IcebergFileScanTaskSplit;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/** Minimal test for {@link IcebergStreamSplitEnumerator} wait / notify fix. */
+class IcebergStreamSplitEnumeratorTest {
+
+    @Test
+    void testHandleSplitRequestDoesNotThrowIllegalMonitorStateException() 
throws Exception {
+        SourceSplitEnumerator.Context<IcebergFileScanTaskSplit> context =
+                new DummyEnumeratorContext();
+
+        IcebergSourceConfig sourceConfig = createSourceConfig();
+
+        // Catalog tables must be non-empty because AbstractSplitEnumerator 
uses the size as the
+        // capacity of an ArrayBlockingQueue.
+        TablePath tablePath = TablePath.of("default", "source");
+        CatalogTable catalogTable =
+                CatalogTable.of(
+                        TableIdentifier.of("seatunnel", "default", "source"),
+                        TableSchema.builder().build(),
+                        Collections.emptyMap(),
+                        Collections.emptyList(),
+                        "test table");
+        Map<TablePath, CatalogTable> catalogTables =
+                Collections.singletonMap(tablePath, catalogTable);
+
+        IcebergStreamSplitEnumerator enumerator =
+                new IcebergStreamSplitEnumerator(
+                        context, sourceConfig, catalogTables, 
Collections.emptyMap());
+
+        // Force initialized = true so handleSplitRequest executes the notify 
logic.
+        enumerator.initialized = true;
+
+        // Before the fix, this would throw IllegalMonitorStateException 
because notifyAll was
+        // called without holding the monitor.
+        Assertions.assertDoesNotThrow(() -> enumerator.handleSplitRequest(0));
+    }
+
+    private IcebergSourceConfig createSourceConfig() {
+        Map<String, Object> configs = new HashMap<>();
+        Map<String, Object> catalogProps = new HashMap<>();
+        catalogProps.put("type", "hadoop");
+        catalogProps.put("warehouse", Paths.get("target", "iceberg", 
"hadoop").toUri().toString());
+
+        configs.put(IcebergCommonOptions.KEY_CATALOG_NAME.key(), "seatunnel");
+        configs.put(IcebergCommonOptions.KEY_NAMESPACE.key(), "default");
+        configs.put(IcebergCommonOptions.KEY_TABLE.key(), "source");
+        configs.put(IcebergCommonOptions.CATALOG_PROPS.key(), catalogProps);
+
+        return new IcebergSourceConfig(ReadonlyConfig.fromMap(configs));
+    }
+
+    private static class DummyEnumeratorContext
+            implements SourceSplitEnumerator.Context<IcebergFileScanTaskSplit> 
{
+
+        private final MetricsContext metricsContext = new 
AbstractMetricsContext() {};
+        private final EventListener eventListener =
+                new EventListener() {
+                    @Override
+                    public void onEvent(Event event) {
+                        // no-op
+                    }
+                };
+
+        @Override
+        public int currentParallelism() {
+            return 1;
+        }
+
+        @Override
+        public java.util.Set<Integer> registeredReaders() {
+            return Collections.singleton(0);
+        }
+
+        @Override
+        public void assignSplit(int subtaskId, 
java.util.List<IcebergFileScanTaskSplit> splits) {
+            // no-op
+        }
+
+        @Override
+        public void signalNoMoreSplits(int subtask) {
+            // no-op
+        }
+
+        @Override
+        public void sendEventToSourceReader(
+                int subtaskId, org.apache.seatunnel.api.source.SourceEvent 
event) {
+            // no-op
+        }
+
+        @Override
+        public MetricsContext getMetricsContext() {
+            return metricsContext;
+        }
+
+        @Override
+        public EventListener getEventListener() {
+            return eventListener;
+        }
+    }
+}

Reply via email to