[
https://issues.apache.org/jira/browse/FLINK-5280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15747806#comment-15747806
]
Ivan Mushketyk commented on FLINK-5280:
---------------------------------------
Hi Fabian, Jark,
Thank you for all your comments and for your patience.
Let me try to propose a solution and see if this will work.
I performed a simple test using TableSource, and it seems that we can access
nested fields. Here is my test *TableSource* that returns POJOs:
https://gist.github.com/mushketyk/acffb701a1f71a6e9bd661c781d7b18c
And here is the test code that uses it:
{code:java}
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
tableEnv.registerTableSource("MyTable", new TestBatchTableSource());
// Select top-level fields and one field nested inside the POJO.
Table result = tableEnv.sql(
    "SELECT MyTable.amount * MyTable.id, MyTable.name, " +
    "MyTable.childPojo.child.str FROM MyTable WHERE amount < 4");
DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
List<Row> results = resultSet.collect();
{code}
And the result of the test looks plausible:
{code}
[0,pojo16,mostChildPojo16, 0,pojo32,mostChildPojo32, 1,pojo1,mostChildPojo1,
17,pojo17,mostChildPojo17, 33,pojo33,mostChildPojo33,
36,pojo18,mostChildPojo18, 4,pojo2,mostChildPojo2, 57,pojo19,mostChildPojo19,
9,pojo3,mostChildPojo3]
{code}
Since we can access nested fields, it looks like we only need to convert the
first level of fields into a *Row*. The resulting *Row* may contain nested
POJOs, but this does not seem to be an issue. I don't see why we would need
to go beyond one level of unpacking when we create a *Row*, so I will assume
that one level is all we need.
To do this, we need to specify how each field of the result *Row* should be
extracted from the *TableSource*'s type T. We can add a new method called
*getFieldMapping* that returns an array of strings. The string at position
*i* is the name of the field that should be accessed to get the value of the
i-th *Row* field. For the example in this comment, it can be implemented simply
like this:
{code:java}
@Override
public String[] getFieldMapping() {
    return new String[]{"amount", "childPojo", "id", "name"};
}
{code}
This means that to get the value of field 0 in the result *Row* we need to
access the field *amount*, to get field 1 we need to access the field
*childPojo*, and so on.
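To make the mapping concrete, here is a minimal sketch of how such an array could drive the one-level extraction. It is only an illustration: the classes *SketchPojo*, *SketchChildPojo* and *FieldMappingSketch* are hypothetical, an Object[] stands in for a *Row*, and plain reflection on public fields stands in for whatever the *CodeGenerator* would actually emit.

```java
import java.lang.reflect.Field;

// Hypothetical stand-ins for the POJOs returned by the test TableSource.
class SketchChildPojo {
    public String str;

    SketchChildPojo(String str) {
        this.str = str;
    }
}

class SketchPojo {
    public int amount;
    public SketchChildPojo childPojo;
    public int id;
    public String name;

    SketchPojo(int amount, SketchChildPojo childPojo, int id, String name) {
        this.amount = amount;
        this.childPojo = childPojo;
        this.id = id;
        this.name = name;
    }
}

class FieldMappingSketch {
    // Reads the public fields named in fieldMapping from the record, one level deep.
    // Position i of the result holds the value of the field fieldMapping[i].
    // The Object[] stands in for a Row; nested POJOs are kept as they are.
    static Object[] extractFirstLevel(Object record, String[] fieldMapping) {
        Object[] values = new Object[fieldMapping.length];
        for (int i = 0; i < fieldMapping.length; i++) {
            try {
                Field field = record.getClass().getField(fieldMapping[i]);
                values[i] = field.get(record);
            } catch (ReflectiveOperationException e) {
                throw new IllegalArgumentException(
                    "Cannot read field " + fieldMapping[i], e);
            }
        }
        return values;
    }
}
```

Calling extractFirstLevel(pojo, new String[]{"amount", "childPojo", "id", "name"}) would put the *amount* value at position 0 and the untouched nested POJO at position 1, which matches the one-level unpacking assumed above.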
If we need to convert an indexable type such as a tuple or an array, we do not
need this mapping. In that case, the method can return *null* or an empty array.
*Optional* would be a better option, but I think that Flink should work with
both Java 7 and Java 8.
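As a rough sketch of that convention, a caller could treat a *null* or empty mapping as "use positional access" and fall back to name-based access otherwise. The class and method names below are hypothetical, and only plain Java arrays are handled in the positional branch; tuples would need their own accessor.

```java
import java.lang.reflect.Array;
import java.lang.reflect.Field;

// Hypothetical POJO used only to show the name-based branch.
class NamedRecordSketch {
    public String name;

    NamedRecordSketch(String name) {
        this.name = name;
    }
}

class OptionalMappingSketch {
    // A null or empty mapping means "no mapping": the type is indexable.
    static boolean hasFieldMapping(String[] fieldMapping) {
        return fieldMapping != null && fieldMapping.length > 0;
    }

    // Returns the value for result position i.
    static Object fieldAt(Object record, String[] fieldMapping, int i) {
        if (!hasFieldMapping(fieldMapping)) {
            // Positional access; this sketch only supports Java arrays here.
            return Array.get(record, i);
        }
        try {
            // Name-based access through a public field.
            Field field = record.getClass().getField(fieldMapping[i]);
            return field.get(record);
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException(
                "Cannot read field " + fieldMapping[i], e);
        }
    }
}
```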
The only problem with this approach is that the *FlinkTable* class accepts an
array of field indexes that is used to convert values from the original type
into a *Row*:
{code:scala}
abstract class FlinkTable[T](
    val typeInfo: TypeInformation[T],
    val fieldIndexes: Array[Int],
    val fieldNames: Array[String])
  extends AbstractTable {
  ...
}
{code}
So to work around this I propose to change this to:
{code:scala}
abstract class FlinkTable[T](
    val typeInfo: TypeInformation[T],
    val fieldIndexes: Array[Int],
    val fieldMappings: Optional[Array[String]], // <--- New argument
    val fieldNames: Array[String])
  extends AbstractTable {
  ...
}
{code}
We can then use *fieldMappings* in *CodeGenerator* to generate a proper
mapper.
This will technically make it possible to convert a *GenericRecord* into a
*Row*. But since *GenericRecord* is one of Avro's interfaces, do we need to add
a dependency on Avro in flink-table to access these fields? Or should we use
reflection to call its methods? Or should we ignore the *GenericRecord* case
altogether and simply return a *Row* from *KafkaTableSource*?
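To illustrate the reflection option, here is a minimal sketch that looks up a public get(String) method at runtime, so the calling code has no compile-time dependency on the record's class. Avro's *GenericRecord* does declare get(String), but *FakeRecordSketch* below is only a hypothetical stand-in so that the example runs without Avro, and *ReflectiveGetSketch* is not part of any existing API.

```java
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for an Avro record. It is NOT Avro's GenericRecord;
// it only exposes a get(String) method with the same shape.
class FakeRecordSketch {
    private final Map<String, Object> values = new HashMap<>();

    FakeRecordSketch put(String key, Object value) {
        values.put(key, value);
        return this;
    }

    public Object get(String key) {
        return values.get(key);
    }
}

class ReflectiveGetSketch {
    // Calls record.get(fieldName) reflectively, without referencing the
    // record's class at compile time.
    static Object getByName(Object record, String fieldName) {
        try {
            Method getter = record.getClass().getMethod("get", String.class);
            return getter.invoke(record, fieldName);
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException(
                "Record has no usable get(String) method", e);
        }
    }
}
```

The trade-off is the usual one for reflection: no extra dependency in flink-table, but no compile-time checking, and a method lookup that a real implementation would probably want to cache.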
I also wonder why we need this method in the TableSource interface:
{code:scala}
/** Returns the number of fields of the table. */
def getNumberOfFields: Int
{code}
and I wonder if we can drop it.
What do you think about it? Am I missing something?
> Extend TableSource to support nested data
> -----------------------------------------
>
> Key: FLINK-5280
> URL: https://issues.apache.org/jira/browse/FLINK-5280
> Project: Flink
> Issue Type: Improvement
> Components: Table API & SQL
> Affects Versions: 1.2.0
> Reporter: Fabian Hueske
> Assignee: Ivan Mushketyk
>
> The {{TableSource}} interface does currently only support the definition of
> flat rows.
> However, there are several storage formats for nested data that should be
> supported such as Avro, Json, Parquet, and Orc. The Table API and SQL can
> also natively handle nested rows.
> The {{TableSource}} interface and the code to register table sources in
> Calcite's schema need to be extended to support nested data.