Anton Solovev created FLINK-5592: ------------------------------------ Summary: Wrong number of RowSerializers with nested Rows in Collection mode Key: FLINK-5592 URL: https://issues.apache.org/jira/browse/FLINK-5592 Project: Flink Issue Type: Bug Reporter: Anton Solovev Assignee: Anton Solovev
{code} @Test def testNestedRowTypes(): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env, config) tEnv.registerTableSource("rows", new MockSource) val table: Table = tEnv.scan("rows") val nestedTable: Table = tEnv.scan("rows").select('person) table.printSchema() nestedTable.printSchema() val collect: Seq[Row] = nestedTable.collect() print(collect) } class MockSource extends BatchTableSource[Row] { import org.apache.flink.api.java.ExecutionEnvironment import org.apache.flink.api.java.DataSet override def getDataSet(execEnv: ExecutionEnvironment): DataSet[Row] = { val data = List( Row.of(Row.of("data_1", "dob"), Row.of("info_4", "dub")), Row.of(Row.of("data_1", "dob"), Row.of("info_4", "dub")), Row.of(Row.of("data_1", "dob"), Row.of("info_4", "dub"))) execEnv.fromCollection(data.asJava, getReturnType) } override def getReturnType: TypeInformation[Row] = { new RowTypeInfo( Array[TypeInformation[_]]( new RowTypeInfo( Array[TypeInformation[_]](BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO), Array("name", "age"))), Array("person") ) } } {code} This test throws {{java.lang.RuntimeException: Row arity of from does not match serializers}} with the following stack trace: {code} at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:82) at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:36) at org.apache.flink.api.common.operators.GenericDataSourceBase.executeOnCollections(GenericDataSourceBase.java:234) at org.apache.flink.api.common.operators.CollectionExecutor.executeDataSource(CollectionExecutor.java:218) at org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:154) at org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:130) at org.apache.flink.api.common.operators.CollectionExecutor.executeDataSink(CollectionExecutor.java:181) at 
org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:157) at org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:130) at org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:114) at org.apache.flink.api.java.CollectionEnvironment.execute(CollectionEnvironment.java:35) at org.apache.flink.test.util.CollectionTestEnvironment.execute(CollectionTestEnvironment.java:47) at org.apache.flink.test.util.CollectionTestEnvironment.execute(CollectionTestEnvironment.java:42) at org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:672) at org.apache.flink.api.scala.DataSet.collect(DataSet.scala:547) {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)