This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 74a3243984d [FLINK-39242][python] Fix error when getting columns from
ResolvedSchema
74a3243984d is described below
commit 74a3243984d23a19d1e6ac77c86d1a6c477da584
Author: Mika Naylor <[email protected]>
AuthorDate: Wed Mar 11 13:52:09 2026 +0100
[FLINK-39242][python] Fix error when getting columns from ResolvedSchema
---
flink-python/pyflink/table/catalog.py | 6 +++---
.../pyflink/table/tests/test_schema_operation.py | 22 +++++++++++++++++++---
2 files changed, 22 insertions(+), 6 deletions(-)
diff --git a/flink-python/pyflink/table/catalog.py
b/flink-python/pyflink/table/catalog.py
index 40bb06c8eb4..d72eca4293c 100644
--- a/flink-python/pyflink/table/catalog.py
+++ b/flink-python/pyflink/table/catalog.py
@@ -1559,11 +1559,11 @@ class Column(metaclass=ABCMeta):
raise TypeError("The input %s is not an instance of Column." %
j_column)
if
get_java_class(JPhysicalColumn).isAssignableFrom(j_column.getClass()):
- return PhysicalColumn(j_physical_column=j_column.getClass())
+ return PhysicalColumn(j_physical_column=j_column)
elif
get_java_class(JComputedColumn).isAssignableFrom(j_column.getClass()):
- return ComputedColumn(j_computed_column=j_column.getClass())
+ return ComputedColumn(j_computed_column=j_column)
elif
get_java_class(JMetadataColumn).isAssignableFrom(j_column.getClass()):
- return MetadataColumn(j_metadata_column=j_column.getClass())
+ return MetadataColumn(j_metadata_column=j_column)
else:
return None
diff --git a/flink-python/pyflink/table/tests/test_schema_operation.py
b/flink-python/pyflink/table/tests/test_schema_operation.py
index c293e783d48..d8ce568aa2e 100644
--- a/flink-python/pyflink/table/tests/test_schema_operation.py
+++ b/flink-python/pyflink/table/tests/test_schema_operation.py
@@ -15,7 +15,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
-from pyflink.table.catalog import ResolvedSchema
+from pyflink.table.catalog import ResolvedSchema, PhysicalColumn
from pyflink.table.table_schema import TableSchema
from pyflink.table.types import DataTypes
from pyflink.testing.test_case_utils import PyFlinkStreamTableTestCase
@@ -33,7 +33,7 @@ class StreamTableSchemaTests(PyFlinkStreamTableTestCase):
result = t.group_by(t.c).select(t.a.sum.alias('a'), t.c.alias('b'))
schema = result.get_schema()
- assert schema == TableSchema(["a", "b"], [DataTypes.BIGINT(),
DataTypes.STRING()])
+ self.assertEqual(schema, TableSchema(["a", "b"], [DataTypes.BIGINT(),
DataTypes.STRING()]))
def test_get_resolved_schema(self):
t = self.t_env.from_elements([(1, 'Hi', 'Hello')], ['a', 'b', 'c'])
@@ -42,7 +42,23 @@ class StreamTableSchemaTests(PyFlinkStreamTableTestCase):
['a', 'b', 'c'],
[DataTypes.BIGINT(), DataTypes.STRING(), DataTypes.STRING()],
)
- assert resolved_schema == expected_schema
+ self.assertEqual(resolved_schema, expected_schema)
+
+ def test_resolved_schema_get_columns(self):
+ physical_schema = ResolvedSchema.physical(
+ ['a', 'b', 'c'],
+ [DataTypes.BIGINT(), DataTypes.STRING(), DataTypes.STRING()],
+ )
+
+ columns = physical_schema.get_columns()
+ self.assertEqual(len(columns), 3)
+ for column in columns:
+ self.assertEqual(type(column), PhysicalColumn)
+
+ for idx in range(3):
+ column = physical_schema.get_column(idx)
+ self.assertEqual(type(column), PhysicalColumn)
+
if __name__ == '__main__':
import unittest