Re: DataStream to Table API

2021-08-23 Thread Matthias Broecheler
Perfect, that worked.

Thanks a lot, JING!


Re: DataStream to Table API

2021-08-22 Thread JING ZHANG
Hi Matthias,
Until the bug is fixed, you can specify the return type explicitly as
the second parameter of the map function.

DataStream<Row> rows = integers.map(i -> Row.of("Name"+i, i));   ->

DataStream<Row> rows = integers.map(i -> Row.of("Name"+i, i),
        new RowTypeInfo(Types.STRING, Types.INT));

Best,
JING ZHANG
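
Put together, a minimal end-to-end sketch of the workaround looks as
follows. The class name is arbitrary and the imports assume a Flink
1.13-era setup; the commented-out returns(...) call is an equivalent way
to declare the result type:

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class ExplicitRowTypeExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment flinkEnv =
                StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Integer> integers = flinkEnv.fromElements(12, 5);

        // Passing the type information as the second argument of map()
        // sidesteps the reflective extraction that cannot handle Row.
        DataStream<Row> rows = integers.map(
                i -> Row.of("Name" + i, i),
                new RowTypeInfo(Types.STRING, Types.INT));

        // Equivalent alternative:
        // DataStream<Row> rows = integers
        //         .map(i -> Row.of("Name" + i, i))
        //         .returns(Types.ROW(Types.STRING, Types.INT));

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(flinkEnv);
        Table table = tableEnv.fromDataStream(rows);
        table.printSchema();  // now shows STRING and INT columns instead of RAW

        rows.print();
        flinkEnv.execute();
    }
}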



Re: DataStream to Table API

2021-08-20 Thread Matthias Broecheler
Thank you, Caizhi, for looking into this and identifying the source of the
bug. Is there a way to work around this at the API level until this bug is
resolved? Can I somehow "inject" the type?

Thanks a lot for your help,
Matthias


Re: DataStream to Table API

2021-08-19 Thread Caizhi Weng
Hi!

I've created a JIRA ticket[1] for this issue. Please check it out and track
the progress there.

[1] https://issues.apache.org/jira/browse/FLINK-23885



Re: DataStream to Table API

2021-08-19 Thread Caizhi Weng
Hi!

This is because TypeExtractor#getMapReturnTypes does not handle row
types (see that method and also TypeExtractor#privateGetForClass). You
might want to open a JIRA ticket for this.
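
A quick way to observe this fallback at the DataStream level, before the
Table API gets involved, is to print the extracted type information. A
sketch; the printed strings are indicative and may differ between Flink
versions:

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.types.Row;

public class TypeExtractionProbe {

    public static void main(String[] args) {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Integer> integers = env.fromElements(12, 5);

        // Reflection cannot see the field types of a Row, so extraction
        // falls back to a black-box generic type.
        DataStream<Row> raw = integers.map(i -> Row.of("Name" + i, i));
        System.out.println(raw.getType());
        // prints something like: GenericType<org.apache.flink.types.Row>

        // Declaring the type explicitly preserves the per-field information
        // that the Table API needs to derive proper columns.
        DataStream<Row> typed = integers
                .map(i -> Row.of("Name" + i, i))
                .returns(Types.ROW(Types.STRING, Types.INT));
        System.out.println(typed.getType());
        // prints something like: Row(f0: String, f1: Integer)
    }
}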


DataStream to Table API

2021-08-19 Thread Matthias Broecheler
Hey Flinkers,

I am trying to follow the docs to convert a DataStream to a Table.
Specifically, I have a DataStream of Row and want the columns of the row
to become the columns of the resulting table.

That works but only if I construct the Rows statically. If I construct them
dynamically (in a map) then Flink turns the entire Row into one column of
type "RAW('org.apache.flink.types.Row', '...')".

Does anybody know why this is the case or how to fix it? Take a look at the
simple Flink program below where I construct the DataStream "rows" in two
different ways. I would expect those to be identical (and the sink does
print identical information) but the inferred table schema is different.

Thanks a ton,
Matthias

--

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

StreamExecutionEnvironment flinkEnv =
        StreamExecutionEnvironment.getExecutionEnvironment();
flinkEnv.setRuntimeMode(RuntimeExecutionMode.STREAMING);

DataStream<Integer> integers = flinkEnv.fromElements(12, 5);

DataStream<Row> rows = integers.map(i -> Row.of("Name"+i, i));

// This alternative way of constructing the data stream produces the
// expected table schema:
// DataStream<Row> rows = flinkEnv.fromElements(Row.of("Name12", 12),
//         Row.of("Name5", 5));

StreamTableEnvironment tableEnv = StreamTableEnvironment.create(flinkEnv);
Table table = tableEnv.fromDataStream(rows);
table.printSchema();

rows.addSink(new PrintSinkFunction<>());

flinkEnv.execute();
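
For reference, the two constructions print schemas along these lines (the
exact rendering varies across Flink versions). The map()-based stream,
whose type extraction falls back to a generic type:

(
  `f0` RAW('org.apache.flink.types.Row', '...')
)

The fromElements()-based stream, whose per-field types are preserved:

(
  `f0` STRING,
  `f1` INT
)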


Re: DataStream to Table Api idioms

2017-11-06 Thread Seth Wiesman
Not a problem, thanks for the quick feedback.

https://issues.apache.org/jira/browse/FLINK-7999

Seth Wiesman



Re: DataStream to Table Api idioms

2017-11-06 Thread Fabian Hueske
Hi Seth,

I think the Table API is not there yet to address your use case.

1. Allowed lateness cannot be configured but it is on the list of features
that we plan to add in the future.
2. Custom triggers are not supported. We are planning to add an option to
support your use case (early firing and updates).
3. The window joins that will be released with 1.4 require constant
boundaries (left.time > right.time - X and left.time < right.time + Y).
Variable join window boundaries have not been considered yet and would be
quite tricky to implement. Would you mind opening a JIRA issue for this
feature?

Best, Fabian
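
For reference, the constant-boundary join in point 3 looks roughly like
this in SQL. A sketch only: the impressions/clicks tables and their
columns are made-up names, assumed to be registered with event-time
attributes, and the exact syntax varies by Flink version:

Table joined = tableEnv.sqlQuery(
        "SELECT i.campaignId, i.impressionTime, c.clickTime " +
        "FROM impressions i JOIN clicks c " +
        "ON i.campaignId = c.campaignId " +
        "AND c.clickTime BETWEEN i.impressionTime " +
        "AND i.impressionTime + INTERVAL '1' HOUR");

The restriction is that the boundaries (here 0 and +1 hour) must be
literals; they cannot be read from row attributes such as a campaign's
start and end dates, which is exactly the feature discussed above.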



DataStream to Table Api idioms

2017-11-06 Thread Seth Wiesman
Hi,

I am experimenting with rewriting some of my DataStream projects with the
Table API and I had some questions about how to express certain idioms. I
am using 1.4-SNAPSHOT.


1) Can I express allowed lateness?

2) Can I use a custom trigger? More specifically, I have a 24hr window but
would like to receive partial results, say every hour.

3) Do window join time intervals have to be constant or can they depend on
row attributes? I am running campaigns that have start and end dates and so
I would like my join window to be that interval.

Thank you,

Seth Wiesman
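
For question 2, the DataStream API does offer this idiom directly: a
24-hour event-time window combined with a trigger that also fires
periodically. A self-contained sketch; the input data and key field are
made up, and the watermark API shown is from Flink releases newer than
the 1.4-SNAPSHOT discussed here:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger;

public class EarlyFiringSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Hypothetical input: (campaignId, count) pairs.
        DataStream<Tuple2<String, Long>> events = env
                .fromElements(Tuple2.of("campaignA", 1L), Tuple2.of("campaignA", 2L))
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<Tuple2<String, Long>>forMonotonousTimestamps()
                                .withTimestampAssigner((e, ts) -> System.currentTimeMillis()));

        events
                .keyBy(t -> t.f0)
                .window(TumblingEventTimeWindows.of(Time.hours(24)))
                // Emit an updated partial result every hour of event time;
                // the final result still fires when the watermark passes the
                // end of the 24-hour window.
                .trigger(ContinuousEventTimeTrigger.of(Time.hours(1)))
                .sum(1)
                .print();

        env.execute();
    }
}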