Anton Solovev created FLINK-5592:
------------------------------------

             Summary: Wrong number of RowSerializers with nested Rows in 
Collection mode
                 Key: FLINK-5592
                 URL: https://issues.apache.org/jira/browse/FLINK-5592
             Project: Flink
          Issue Type: Bug
            Reporter: Anton Solovev
            Assignee: Anton Solovev


{code}
/**
 * Reproduction case for this issue: registers [[MockSource]] as the table "rows",
 * projects its single declared column `person`, and collects the result.
 *
 * According to the accompanying report, running this test fails with
 * "Row arity of from does not match serializers", and the posted stack trace
 * ends in `DataSet.collect`.
 */
@Test
  def testNestedRowTypes(): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    // `config` is not defined in this snippet; presumably a field of the enclosing test class.
    val tEnv = TableEnvironment.getTableEnvironment(env, config)

    tEnv.registerTableSource("rows", new MockSource)

    // Full table vs. a projection onto the nested `person` column.
    val table: Table = tEnv.scan("rows")
    val nestedTable: Table = tEnv.scan("rows").select('person)

    table.printSchema()
    nestedTable.printSchema()
    // The posted stack trace bottoms out in DataSet.collect, i.e. presumably this call.
    val collect: Seq[Row] = nestedTable.collect()
    print(collect)
  }

  /**
   * Batch table source used by the reproduction: serves three identical rows
   * built from nested `Row` values and declares a nested row type for them.
   *
   * NOTE(review): every data row is built with two nested `Row` arguments, while
   * `getReturnType` declares only one top-level field (`person`). This mismatch
   * looks like the trigger for the reported "Row arity of from does not match
   * serializers" error -- confirm before treating it as a serializer bug.
   */
  class MockSource extends BatchTableSource[Row] {
    // Local imports of the Java-API classes; presumably needed because the
    // enclosing file uses the Scala-API classes of the same names -- verify.
    import org.apache.flink.api.java.ExecutionEnvironment
    import org.apache.flink.api.java.DataSet

    /** Builds the in-memory data set, typed with [[getReturnType]]. */
    override def getDataSet(execEnv: ExecutionEnvironment): DataSet[Row] = {
      // Three identical rows, each constructed from two nested rows of two strings.
      val data = List(
        Row.of(Row.of("data_1", "dob"), Row.of("info_4", "dub")),
        Row.of(Row.of("data_1", "dob"), Row.of("info_4", "dub")),
        Row.of(Row.of("data_1", "dob"), Row.of("info_4", "dub")))
      execEnv.fromCollection(data.asJava, getReturnType)
    }

    /**
     * Declared type: an outer row with the single field `person`, which is itself
     * a row with the two string fields `name` and `age`.
     */
    override def getReturnType: TypeInformation[Row] = {
      new RowTypeInfo(
        Array[TypeInformation[_]](
          new RowTypeInfo(
            Array[TypeInformation[_]](BasicTypeInfo.STRING_TYPE_INFO, 
BasicTypeInfo.STRING_TYPE_INFO),
            Array("name", "age"))),
        Array("person")
      )
    }
  }
{code}

Running this test throws {{java.lang.RuntimeException: Row arity of from does not match 
serializers}}

Stack trace:
{code}
at 
org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:82)
        at 
org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:36)
        at 
org.apache.flink.api.common.operators.GenericDataSourceBase.executeOnCollections(GenericDataSourceBase.java:234)
        at 
org.apache.flink.api.common.operators.CollectionExecutor.executeDataSource(CollectionExecutor.java:218)
        at 
org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:154)
        at 
org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:130)
        at 
org.apache.flink.api.common.operators.CollectionExecutor.executeDataSink(CollectionExecutor.java:181)
        at 
org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:157)
        at 
org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:130)
        at 
org.apache.flink.api.common.operators.CollectionExecutor.execute(CollectionExecutor.java:114)
        at 
org.apache.flink.api.java.CollectionEnvironment.execute(CollectionEnvironment.java:35)
        at 
org.apache.flink.test.util.CollectionTestEnvironment.execute(CollectionTestEnvironment.java:47)
        at 
org.apache.flink.test.util.CollectionTestEnvironment.execute(CollectionTestEnvironment.java:42)
        at 
org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:672)
        at org.apache.flink.api.scala.DataSet.collect(DataSet.scala:547)
{code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to