Re: custom table source, how to support json?

2022-06-13 Thread Dian Fu
Hi Ivan,

Is your question how to parse the JSON string in PyFlink? If so, maybe you
could take a look at this [1].
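For the PyFlink side, here is a minimal sketch of the parsing step in plain Python, using only the standard `json` module. It assumes the payload shape from Ivan's message, and `json_field` is a hypothetical helper name. Wrapping it as a scalar UDF (for instance with `udf(result_type=DataTypes.STRING())` from `pyflink.table.udf`) is an assumption you should check against the PyFlink version in use. The pure function is shown on its own so it can be tested without a cluster.

```python
import json
from typing import Optional


def json_field(log_line: str, key: str) -> Optional[str]:
    """Return the top-level field `key` of a JSON log line as a string.

    Returns None when the line is not valid JSON, is not a JSON object,
    lacks `key`, or maps `key` to null.
    """
    try:
        doc = json.loads(log_line)
    except (TypeError, ValueError):  # JSONDecodeError subclasses ValueError
        return None
    if not isinstance(doc, dict) or doc.get(key) is None:
        return None
    value = doc[key]
    if isinstance(value, str):
        return value
    # Numbers, booleans, objects and arrays come back as JSON text.
    return json.dumps(value)


line = '{"value" : "field0_counter_7", "ts":"1654861200000"}'
print(json_field(line, "value"))  # field0_counter_7
print(json_field(line, "ts"))     # 1654861200000
```

Returning `None` for malformed lines instead of raising keeps one bad record from failing the whole job. Whether that is the right trade-off depends on how bad input should be surfaced.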

Regards,
Dian

[1]
https://stackoverflow.com/questions/71820015/how-to-reference-nested-json-within-pyflink-sql-when-json-schema-varies

On Fri, Jun 10, 2022 at 7:40 PM ivan.ros...@agilent.com <ivan.ros...@agilent.com> wrote:

> Hello,
>
> I have a Flink table source working, defined as:
>
> """
> create table source (
>   ts TIMESTAMP(3),
>   log_line STRING,
>   WATERMARK FOR ts AS ts - INTERVAL '1' SECOND
> ) with (
>   'connector'='lokitail', 'query'='blah', 'url'='blah'
> )
> """
>
> It uses a simple custom table source, which collects rows like this:
>
> ctx.collect(GenericRowData.of(
>     TimestampData.fromEpochMillis(Instant.now().toEpochMilli()),
>     StringData.fromString("field0_counter_" + count++)));
>
> I would like, instead, to just send a single JSON string, like:
>
> ctx.collect(GenericRowData.of(
>     StringData.fromString("{\"value\" : \"field0_counter_" + count++
>         + "\", \"ts\":\"" + Instant.now().toEpochMilli() + "\"}")));
>
> And handle the parsing in PyFlink. Can this be done simply at the point
> of collecting the row data?
>
> Thank you,
>
> Ivan


custom table source, how to support json?

2022-06-10 Thread ivan.ros...@agilent.com
Hello,

I have a Flink table source working, defined as:

"""
create table source (
  ts TIMESTAMP(3),
  log_line STRING,
  WATERMARK FOR ts AS ts - INTERVAL '1' SECOND
) with (
  'connector'='lokitail', 'query'='blah', 'url'='blah'
)
"""

It uses a simple custom table source, which collects rows like this:

ctx.collect(GenericRowData.of(
    TimestampData.fromEpochMillis(Instant.now().toEpochMilli()),
    StringData.fromString("field0_counter_" + count++)));

I would like, instead, to just send a single JSON string, like:

ctx.collect(GenericRowData.of(
    StringData.fromString("{\"value\" : \"field0_counter_" + count++
        + "\", \"ts\":\"" + Instant.now().toEpochMilli() + "\"}")));

And handle the parsing in PyFlink. Can this be done simply at the point of
collecting the row data?
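If the source emits only the JSON string (a single `log_line STRING` column), the two fields have to be split out downstream. Here is a minimal sketch of that split in plain Python. It assumes the payload shape from the snippet above and that `ts` is epoch milliseconds in UTC. `parse_log_line` is a hypothetical name. In PyFlink, a generator like this could be wrapped as a table function, e.g. with `udtf(result_types=[DataTypes.STRING(), DataTypes.TIMESTAMP(3)])`, but that wiring is an assumption you should verify against the PyFlink version in use.

```python
import json
from datetime import datetime, timedelta
from typing import Iterator, Tuple

EPOCH = datetime(1970, 1, 1)  # naive UTC epoch


def parse_log_line(log_line: str) -> Iterator[Tuple[str, datetime]]:
    """Split one JSON log line into a (value, ts) row.

    Assumes the shape {"value": "...", "ts": "<epoch millis>"}; "ts" is
    sent as a string, so it is converted with int(). Malformed lines
    yield no row instead of raising.
    """
    try:
        doc = json.loads(log_line)
        value = doc["value"]
        millis = int(doc["ts"])
    except (TypeError, ValueError, KeyError):
        return
    # Naive UTC datetime, matching a TIMESTAMP(3) column without time zone.
    yield value, EPOCH + timedelta(milliseconds=millis)


for row in parse_log_line('{"value" : "field0_counter_7", "ts":"1654861200123"}'):
    print(row)
```

One caveat: the `WATERMARK FOR ts` clause in the DDL refers to a column of the source table itself. If the timestamp only exists inside the JSON, it probably has to be derived in the DDL (for example through a computed column; recent Flink SQL versions provide `JSON_VALUE`) rather than by a Python function applied later.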

Thank you,

Ivan