Hi Dian,

Thanks, much appreciated.

Kind regards,

John

On 27 Jun 2022, at 03:43, Dian Fu <dian0511...@gmail.com> wrote:


Hi John,

This seems like a bug and I have created a ticket 
https://issues.apache.org/jira/browse/FLINK-28253 to track it.

For now, you could try replacing `to_data_stream` with `to_append_stream` to see
if it works.

Regards,
Dian

On Sat, Jun 25, 2022 at 4:07 AM John Tipper <john_tip...@hotmail.com> wrote:
Hi,

I have a source table that uses a Kinesis connector to read events from AWS
EventBridge with PyFlink 1.15.0. An example of the kind of data in this stream
is here:
https://docs.aws.amazon.com/codebuild/latest/userguide/sample-build-notifications.html#sample-build-notifications-ref.
Note that the stream contains many different types of events, and the 'detail'
field is completely different from one event type to another. The PyFlink
DataStream API does not support this connector, so I use the Table API to
construct the source table. The table looks like this:


CREATE TABLE events (
     `id` VARCHAR,
     `source` VARCHAR,
     `account` VARCHAR,
     `region` VARCHAR,
     `detail-type` VARCHAR,
     `detail` VARCHAR,
     `resources` VARCHAR,
     `time` TIMESTAMP(0) WITH LOCAL TIME ZONE,
     WATERMARK FOR `time` as `time` - INTERVAL '30' SECOND,
     PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
...
)


The table was created using:

 table_env.execute_sql(CREATE_STRING_ABOVE)
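The WATERMARK clause in the DDL declares that events may arrive up to 30 seconds out of order. A simplified plain-Python model of that behaviour is sketched below; it is an illustration only, not Flink code, and it ignores the fact that Flink emits watermarks periodically rather than after every record. The idea is that the watermark trails the largest `time` seen so far by 30 seconds and never moves backwards.

```python
from datetime import datetime, timedelta, timezone
from typing import Iterable, List

# Mirrors the DDL's WATERMARK clause: `time` - INTERVAL '30' SECOND.
MAX_OUT_OF_ORDERNESS = timedelta(seconds=30)


def watermarks(event_times: Iterable[datetime]) -> List[datetime]:
    """Return the watermark after each event: the largest event time seen so far minus 30 s."""
    max_seen = None
    result = []
    for t in event_times:
        # An event older than max_seen does not pull the watermark back.
        if max_seen is None or t > max_seen:
            max_seen = t
        result.append(max_seen - MAX_OUT_OF_ORDERNESS)
    return result


if __name__ == "__main__":
    base = datetime(2017, 9, 1, 16, 14, 0, tzinfo=timezone.utc)
    # The third event is out of order: 5 s after base, but it arrives after the 10 s one.
    for wm in watermarks([base, base + timedelta(seconds=10), base + timedelta(seconds=5)]):
        print(wm.isoformat())
```

Run as a script, this prints 2017-09-01T16:13:30+00:00, then 2017-09-01T16:13:40+00:00 twice: the late event at 16:14:05 does not move the watermark.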

I'd like to turn this table into a data stream so I can perform some processing 
that is easier to do in the DataStream API:


events_stream_table = table_env.from_path('events')

events_stream = table_env.to_data_stream(events_stream_table)

# now do some processing - let's filter by the type of event we get

codebuild_stream = events_stream.filter(
    lambda event: event['source'] == 'aws.codebuild'
)

# now do other stuff on a stream containing only events that are identical in shape
...
# maybe convert back into a Table and perform SQL on the data


When I run this, I get an exception:



org.apache.flink.table.api.TableException: Unsupported conversion from data type 'TIMESTAMP(6) WITH TIME ZONE' (conversion class: java.time.OffsetDateTime) to type information. Only data types that originated from type information fully support a reverse conversion.

Somebody reported a similar error here:
(https://stackoverflow.com/questions/58936529/using-jdbctablesource-with-streamtableenvironment-gives-classcastexception).
When I try the suggestion there and replace "TIMESTAMP(0) WITH LOCAL TIME ZONE"
with "TIMESTAMP(3)", I get a different exception:

TypeError: The java type info: LocalDateTime is not supported in PyFlink currently.

Is there a way of converting this Table into a DataStream (and then back
again)? I need to use the data in the "time" field as the source of watermarks
for my events.
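On the DataStream side, PyFlink's `WatermarkStrategy.for_bounded_out_of_orderness(Duration.of_seconds(30))`, combined with a `TimestampAssigner` whose `extract_timestamp` returns epoch milliseconds, plays the same role as the SQL WATERMARK clause. If `time` were carried through the conversion as a string rather than a TIMESTAMP column (an assumption; whether that sidesteps the errors above has not been verified), the event time would have to be computed in Python. The sketch below shows only that conversion for EventBridge's ISO-8601 `time` values such as "2017-09-01T16:14:21Z". The helper name `event_time_millis` is hypothetical, and the Flink wiring is not shown.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def event_time_millis(time_str: str) -> int:
    """Hypothetical helper: convert an EventBridge `time` string to epoch milliseconds."""
    # Before Python 3.11, datetime.fromisoformat rejects a trailing 'Z', so normalise it.
    dt = datetime.fromisoformat(time_str.replace("Z", "+00:00"))
    if dt.tzinfo is None:
        # Assumption: a value without an explicit offset is UTC.
        dt = dt.replace(tzinfo=timezone.utc)
    # Integer timedelta division avoids float rounding from dt.timestamp().
    return (dt - EPOCH) // timedelta(milliseconds=1)


print(event_time_millis("2017-09-01T16:14:21Z"))  # 1504282461000
```

Returning an integer of milliseconds matches what a `TimestampAssigner.extract_timestamp` implementation is expected to return.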

Many thanks,

John
