This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 74a3243984d [FLINK-39242][python] Fix error when getting columns from 
ResolvedSchema
74a3243984d is described below

commit 74a3243984d23a19d1e6ac77c86d1a6c477da584
Author: Mika Naylor <[email protected]>
AuthorDate: Wed Mar 11 13:52:09 2026 +0100

    [FLINK-39242][python] Fix error when getting columns from ResolvedSchema
---
 flink-python/pyflink/table/catalog.py              |  6 +++---
 .../pyflink/table/tests/test_schema_operation.py   | 22 +++++++++++++++++++---
 2 files changed, 22 insertions(+), 6 deletions(-)

diff --git a/flink-python/pyflink/table/catalog.py 
b/flink-python/pyflink/table/catalog.py
index 40bb06c8eb4..d72eca4293c 100644
--- a/flink-python/pyflink/table/catalog.py
+++ b/flink-python/pyflink/table/catalog.py
@@ -1559,11 +1559,11 @@ class Column(metaclass=ABCMeta):
             raise TypeError("The input %s is not an instance of Column." % 
j_column)
 
         if 
get_java_class(JPhysicalColumn).isAssignableFrom(j_column.getClass()):
-            return PhysicalColumn(j_physical_column=j_column.getClass())
+            return PhysicalColumn(j_physical_column=j_column)
         elif 
get_java_class(JComputedColumn).isAssignableFrom(j_column.getClass()):
-            return ComputedColumn(j_computed_column=j_column.getClass())
+            return ComputedColumn(j_computed_column=j_column)
         elif 
get_java_class(JMetadataColumn).isAssignableFrom(j_column.getClass()):
-            return MetadataColumn(j_metadata_column=j_column.getClass())
+            return MetadataColumn(j_metadata_column=j_column)
         else:
             return None
 
diff --git a/flink-python/pyflink/table/tests/test_schema_operation.py 
b/flink-python/pyflink/table/tests/test_schema_operation.py
index c293e783d48..d8ce568aa2e 100644
--- a/flink-python/pyflink/table/tests/test_schema_operation.py
+++ b/flink-python/pyflink/table/tests/test_schema_operation.py
@@ -15,7 +15,7 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 
################################################################################
-from pyflink.table.catalog import ResolvedSchema
+from pyflink.table.catalog import ResolvedSchema, PhysicalColumn
 from pyflink.table.table_schema import TableSchema
 from pyflink.table.types import DataTypes
 from pyflink.testing.test_case_utils import PyFlinkStreamTableTestCase
@@ -33,7 +33,7 @@ class StreamTableSchemaTests(PyFlinkStreamTableTestCase):
         result = t.group_by(t.c).select(t.a.sum.alias('a'), t.c.alias('b'))
         schema = result.get_schema()
 
-        assert schema == TableSchema(["a", "b"], [DataTypes.BIGINT(), 
DataTypes.STRING()])
+        self.assertEqual(schema, TableSchema(["a", "b"], [DataTypes.BIGINT(), 
DataTypes.STRING()]))
 
     def test_get_resolved_schema(self):
         t = self.t_env.from_elements([(1, 'Hi', 'Hello')], ['a', 'b', 'c'])
@@ -42,7 +42,23 @@ class StreamTableSchemaTests(PyFlinkStreamTableTestCase):
             ['a', 'b', 'c'],
             [DataTypes.BIGINT(), DataTypes.STRING(), DataTypes.STRING()],
         )
-        assert resolved_schema == expected_schema
+        self.assertEqual(resolved_schema, expected_schema)
+
+    def test_resolved_schema_get_columns(self):
+        physical_schema = ResolvedSchema.physical(
+            ['a', 'b', 'c'],
+            [DataTypes.BIGINT(), DataTypes.STRING(), DataTypes.STRING()],
+        )
+
+        columns = physical_schema.get_columns()
+        self.assertEqual(len(columns), 3)
+        for column in columns:
+            self.assertEqual(type(column), PhysicalColumn)
+
+        for idx in range(3):
+            column = physical_schema.get_column(idx)
+            self.assertEqual(type(column), PhysicalColumn)
+
 
 if __name__ == '__main__':
     import unittest

Reply via email to