Re: Convert flink table with field of type RAW to datastream

2020-06-18 Thread Jark Wu
Flink SQL/Table requires the field data types to be known explicitly. Maybe you
can apply a MapFunction before `toTable` to convert/normalize the data and
types.
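
For example, something like this (a minimal sketch; `RawEvent` and
`NormalizedEvent` are made-up stand-ins for your upstream classes):

    import java.sql.Timestamp

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.table.api.scala._

    object NormalizeSketch {
      // Hypothetical stand-ins for the upstream types.
      case class RawEvent(id: String, ts: java.util.Date, _isComplete: Boolean)
      case class NormalizedEvent(id: String, ts: Timestamp, _isComplete: Boolean)

      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        val tEnv = StreamTableEnvironment.create(env)

        val raw: DataStream[RawEvent] =
          env.fromElements(RawEvent("a", new java.util.Date(), true))

        // Swap java.util.Date for java.sql.Timestamp before handing the
        // stream to the Table API, so the field maps to SQL TIMESTAMP
        // instead of a RAW type.
        val normalized = raw.map(e =>
          NormalizedEvent(e.id, new Timestamp(e.ts.getTime), e._isComplete))

        normalized.toTable(tEnv).printSchema()
      }
    }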

Best,
Jark


Re: Convert flink table with field of type RAW to datastream

2020-06-18 Thread YI
Hi Jark,

Thank you for your suggestion. My current problem is that there are quite a few
data types. All these data types are defined upstream, and I have no control
over them. I don't think I can easily change the type information of a specific
field. Can I? Things become nasty when there are so many `java.util.Date`
fields I need to change.

The reason I want to use the Flink Table API is that it allows me to easily
join several tables. As an alternative, I think I can use the stream join
operator. My only complaint is that it becomes tedious when I want to join more
than once, since I need to define all the intermediate data types.
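
To illustrate (a rough sketch with hypothetical record types; I picked
windowed joins arbitrarily just to show the shape):

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
    import org.apache.flink.streaming.api.windowing.time.Time

    object JoinSketch {
      case class A(id: String, a: Int)
      case class B(id: String, b: Int)
      case class C(id: String, c: Int)

      // Every additional join needs its own explicit intermediate type.
      case class AB(id: String, a: Int, b: Int)
      case class ABC(id: String, a: Int, b: Int, c: Int)

      def joinAll(as: DataStream[A], bs: DataStream[B], cs: DataStream[C]): DataStream[ABC] = {
        val ab = as.join(bs)
          .where(_.id).equalTo(_.id)
          .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
          .apply((a, b) => AB(a.id, a.a, b.b))

        ab.join(cs)
          .where(_.id).equalTo(_.id)
          .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
          .apply((x, c) => ABC(x.id, x.a, x.b, c.c))
      }
    }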

Best,
Yi


Re: Convert flink table with field of type RAW to datastream

2020-06-17 Thread Jark Wu
Hi YI,

Flink doesn't have a TypeInformation for `java.util.Date`, but
only `SqlTimeTypeInfo.DATE` for `java.sql.Date`.
That's why `TypeInformation.of(java.util.Date)` is recognized as a
RAW type.

To resolve your problem, I think in `TypeInformation.of(..)` you should use
a concrete replacement for `java.util.Date`, e.g. `java.sql.Timestamp`,
`java.sql.Date`, or `java.sql.Time`.
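
You can see the difference quickly (a sketch; the exact printed names depend
on the Flink version):

    import org.apache.flink.api.common.typeinfo.TypeInformation

    object TypeInfoCheck extends App {
      // java.util.Date has no SQL-compatible TypeInformation, so the Table
      // API ends up treating such a field as RAW / unsupported.
      println(TypeInformation.of(classOf[java.util.Date]))

      // The java.sql.* time classes resolve to type information the planner
      // can map to SQL DATE / TIME / TIMESTAMP.
      println(TypeInformation.of(classOf[java.sql.Timestamp]))
      println(TypeInformation.of(classOf[java.sql.Date]))
      println(TypeInformation.of(classOf[java.sql.Time]))
    }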

Best,
Jark


Convert flink table with field of type RAW to datastream

2020-06-17 Thread YI
Hi all,

I am using Flink to process external data. The source format is JSON, and the
underlying data types are defined in an external library.
I generated the table schema with `TableSchema.fromTypeInfo` and
`TypeInformation.of[_]`. From what I read, this method is deprecated,
but I didn't find any alternatives. Manually tweaking the table schema is not
viable as there are simply too many types.
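
Roughly what I'm doing now (a sketch; `RawEvent` stands in for one of the real
types from the external library):

    import org.apache.flink.api.scala._
    import org.apache.flink.table.api.TableSchema

    object SchemaSketch extends App {
      // Stand-in for an upstream type from the external library.
      case class RawEvent(_isComplete: Boolean, createdAt: java.util.Date)

      // The deprecated route described above: derive the schema from
      // TypeInformation.
      val schema: TableSchema = TableSchema.fromTypeInfo(createTypeInformation[RawEvent])
      println(schema) // the java.util.Date field does not map to a SQL time type
    }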

One of the fields in the source type is `java.util.Date`. I tried to convert
the obtained table to a datastream with `Table.toAppendStream`.
When I ran
`tEnv.from("rawEvent").select('_isComplete).toAppendStream[(Boolean)].print()`,
the following exception occurred.

Exception in thread "main" org.apache.flink.table.api.TableException: Type is not supported: Date
    at org.apache.flink.table.calcite.FlinkTypeFactory$.org$apache$flink$table$calcite$FlinkTypeFactory$$typeInfoToSqlTypeName(FlinkTypeFactory.scala:350)
    at org.apache.flink.table.calcite.FlinkTypeFactory.createTypeFromTypeInfo(FlinkTypeFactory.scala:63)
    at org.apache.flink.table.calcite.FlinkTypeFactory.$anonfun$buildLogicalRowType$1(FlinkTypeFactory.scala:201)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.flink.table.calcite.FlinkTypeFactory.buildLogicalRowType(FlinkTypeFactory.scala:198)
    at org.apache.flink.table.plan.schema.TableSourceTable.getRowType(TableSourceTable.scala:96)
    at org.apache.calcite.prepare.CalciteCatalogReader.getTable(CalciteCatalogReader.java:131)
    at org.apache.calcite.prepare.CalciteCatalogReader.getTableForMember(CalciteCatalogReader.java:228)
    at org.apache.calcite.prepare.CalciteCatalogReader.getTableForMember(CalciteCatalogReader.java:84)
    at org.apache.calcite.tools.RelBuilder.scan(RelBuilder.java:1068)
    at org.apache.calcite.tools.RelBuilder.scan(RelBuilder.java:1094)
    at org.apache.flink.table.plan.QueryOperationConverter$SingleRelVisitor.visit(QueryOperationConverter.java:268)
    at org.apache.flink.table.plan.QueryOperationConverter$SingleRelVisitor.visit(QueryOperationConverter.java:134)
    at org.apache.flink.table.operations.CatalogQueryOperation.accept(CatalogQueryOperation.java:69)
    at org.apache.flink.table.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:131)
    at org.apache.flink.table.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:111)
    at org.apache.flink.table.operations.utils.QueryOperationDefaultVisitor.visit(QueryOperationDefaultVisitor.java:91)
    at org.apache.flink.table.operations.CatalogQueryOperation.accept(CatalogQueryOperation.java:69)
    at org.apache.flink.table.plan.QueryOperationConverter.lambda$defaultMethod$0(QueryOperationConverter.java:130)
    at java.util.Collections$SingletonList.forEach(Collections.java:4824)
    at org.apache.flink.table.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:130)
    at org.apache.flink.table.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:111)
    at org.apache.flink.table.operations.utils.QueryOperationDefaultVisitor.visit(QueryOperationDefaultVisitor.java:46)
    at org.apache.flink.table.operations.ProjectQueryOperation.accept(ProjectQueryOperation.java:75)
    at org.apache.flink.table.calcite.FlinkRelBuilder.tableOperation(FlinkRelBuilder.scala:106)
    at org.apache.flink.table.planner.StreamPlanner.translateToType(StreamPlanner.scala:390)
    at org.apache.flink.table.planner.StreamPlanner.translate(StreamPlanner.scala:185)
    at org.apache.flink.table.planner.StreamPlanner.$anonfun$translate$1(StreamPlanner.scala:117)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:273)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
    at scala.collection.IterableLike.foreach(IterableLike.scala:74)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
    at scala.collection.TraversableLike.map(TraversableLike.scala:273)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:266)
    at scala.collection.AbstractTraversable.map(Traversable.scala:108)
    at org.apache.flink.table.planner.StreamPlanner.translate(StreamPlanner.scala:117)
    at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.scala:210)
    at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.scala:107)
    at org.apache.flink.table.api.scala.TableConversions.toAppendStream(TableConversions.scala:101)
    at io.redacted.test.package$.testJoin(package.scala:31)
    at io.redacted.test.package$.process(package.scala:26)
    at io.redacted.DataAggregator$.main(DataAggregator.scala:15)
    at io.redacted.DataAggregator.main(DataAggregator.scala)