Hi all,

I am using Flink to process external data. The source format is JSON, and the 
underlying data types are defined in an external library.
I generated the table schema with `TableSchema.fromTypeInfo` and 
`TypeInformation.of[_]`. From what I have read, this method is deprecated, 
but I could not find an alternative. Manually tweaking the table schema is not 
viable, as there are simply too many types.

One of the fields in the source type is a `java.util.Date`. I tried to convert the 
obtained table to a DataStream with `Table.toAppendStream`.
When I ran 
`tEnv.from("rawEvent").select('_isComplete).toAppendStream[(Boolean)].print()`, 
the following exception occurred:

Exception in thread "main" org.apache.flink.table.api.TableException: Type is not supported: Date
at org.apache.flink.table.calcite.FlinkTypeFactory$.org$apache$flink$table$calcite$FlinkTypeFactory$$typeInfoToSqlTypeName(FlinkTypeFactory.scala:350)
at org.apache.flink.table.calcite.FlinkTypeFactory.createTypeFromTypeInfo(FlinkTypeFactory.scala:63)
at org.apache.flink.table.calcite.FlinkTypeFactory.$anonfun$buildLogicalRowType$1(FlinkTypeFactory.scala:201)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.flink.table.calcite.FlinkTypeFactory.buildLogicalRowType(FlinkTypeFactory.scala:198)
at org.apache.flink.table.plan.schema.TableSourceTable.getRowType(TableSourceTable.scala:96)
at org.apache.calcite.prepare.CalciteCatalogReader.getTable(CalciteCatalogReader.java:131)
at org.apache.calcite.prepare.CalciteCatalogReader.getTableForMember(CalciteCatalogReader.java:228)
at org.apache.calcite.prepare.CalciteCatalogReader.getTableForMember(CalciteCatalogReader.java:84)
at org.apache.calcite.tools.RelBuilder.scan(RelBuilder.java:1068)
at org.apache.calcite.tools.RelBuilder.scan(RelBuilder.java:1094)
at org.apache.flink.table.plan.QueryOperationConverter$SingleRelVisitor.visit(QueryOperationConverter.java:268)
at org.apache.flink.table.plan.QueryOperationConverter$SingleRelVisitor.visit(QueryOperationConverter.java:134)
at org.apache.flink.table.operations.CatalogQueryOperation.accept(CatalogQueryOperation.java:69)
at org.apache.flink.table.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:131)
at org.apache.flink.table.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:111)
at org.apache.flink.table.operations.utils.QueryOperationDefaultVisitor.visit(QueryOperationDefaultVisitor.java:91)
at org.apache.flink.table.operations.CatalogQueryOperation.accept(CatalogQueryOperation.java:69)
at org.apache.flink.table.plan.QueryOperationConverter.lambda$defaultMethod$0(QueryOperationConverter.java:130)
at java.util.Collections$SingletonList.forEach(Collections.java:4824)
at org.apache.flink.table.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:130)
at org.apache.flink.table.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:111)
at org.apache.flink.table.operations.utils.QueryOperationDefaultVisitor.visit(QueryOperationDefaultVisitor.java:46)
at org.apache.flink.table.operations.ProjectQueryOperation.accept(ProjectQueryOperation.java:75)
at org.apache.flink.table.calcite.FlinkRelBuilder.tableOperation(FlinkRelBuilder.scala:106)
at org.apache.flink.table.planner.StreamPlanner.translateToType(StreamPlanner.scala:390)
at org.apache.flink.table.planner.StreamPlanner.translate(StreamPlanner.scala:185)
at org.apache.flink.table.planner.StreamPlanner.$anonfun$translate$1(StreamPlanner.scala:117)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:273)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
at scala.collection.TraversableLike.map(TraversableLike.scala:273)
at scala.collection.TraversableLike.map$(TraversableLike.scala:266)
at scala.collection.AbstractTraversable.map(Traversable.scala:108)
at org.apache.flink.table.planner.StreamPlanner.translate(StreamPlanner.scala:117)
at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.scala:210)
at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.scala:107)
at org.apache.flink.table.api.scala.TableConversions.toAppendStream(TableConversions.scala:101)
at io.redacted.test.package$.testJoin(package.scala:31)
at io.redacted.test.package$.process(package.scala:26)
at io.redacted.DataAggregator$.main(DataAggregator.scala:15)
at io.redacted.DataAggregator.main(DataAggregator.scala)

This exception is thrown even though I did not select the RAW data field 
`_startTime`, which is of type `java.util.Date`. I believe this behavior is 
undesirable.
Is there any way to obtain RAW data from Flink tables? If not, 
how can I work around my current issue? Do I need to manually update every 
table schema?
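
To make concrete what "manually updating" would mean, here is a minimal sketch 
of the per-type workaround I am hoping to avoid. It assumes (I have not 
verified this against the planner) that replacing `java.util.Date` with 
`java.sql.Timestamp` before the table is registered would be enough to get past 
the exception. `RawEvent`, `TableEvent` and `DateWorkaround` are hypothetical 
names; the real library types have many more fields.

```scala
import java.sql.Timestamp
import java.util.Date

// Hypothetical stand-in for one of the external library types.
final case class RawEvent(_isComplete: Boolean, _startTime: Date)

// Hand-written mirror type with java.util.Date replaced by java.sql.Timestamp.
final case class TableEvent(_isComplete: Boolean, _startTime: Timestamp)

object DateWorkaround {
  // Keeps the same epoch milliseconds; a null Date stays null.
  def toTimestamp(d: Date): Timestamp =
    if (d == null) null else new Timestamp(d.getTime)

  // Would be applied in a map step before the stream is registered as a table.
  def toTableEvent(e: RawEvent): TableEvent =
    TableEvent(e._isComplete, toTimestamp(e._startTime))
}
```

Writing such a mirror type and conversion for every type in the library is 
exactly the part that does not scale for me.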

There is a related thread at 
http://mail-archives.apache.org/mod_mbox/flink-user/201907.mbox/%3CCA+3UsY2-L1OKTjNBwX2ajG3o6v5M6QS=jbwyybemzlvdm5x...@mail.gmail.com%3E,
but unfortunately I did not find a satisfactory solution there.

Cheers,
Yi
