Re: DataStream.keyBy() with keys determined at run time

2022-07-10 Thread yidan zhao
You can use String as the key type and serialize all key fields into a single string.
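
For illustration, a minimal sketch of that idea, assuming records arrive as
Map<String, Object> (the record type and the '|' separator are placeholders,
not from this thread):

import java.util.List;
import java.util.Map;
import org.apache.flink.api.java.functions.KeySelector;

// Builds one String key from a configuration-driven list of field names.
public class StringKeySelector implements KeySelector<Map<String, Object>, String> {

    private final List<String> keyFields; // loaded once from configuration

    public StringKeySelector(List<String> keyFields) {
        this.keyFields = keyFields;
    }

    @Override
    public String getKey(Map<String, Object> record) {
        StringBuilder key = new StringBuilder();
        for (String field : keyFields) {
            // the separator must never occur inside a field value
            key.append(record.get(field)).append('|');
        }
        return key.toString();
    }
}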

On Mon, Jul 11, 2022 at 9:49 AM Hemanga Borah  wrote:
>
> Here is the documentation of the Tuple class: 
> https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/api/java/tuple/Tuple.html
>
> If you need a concrete class, you can go from Tuple0 to Tuple25.
>
> On Sun, Jul 10, 2022 at 5:43 PM Thomas Wang  wrote:
>>
>> I didn't copy the exact error message, but the gist of it is that I cannot 
>> use the abstract class Tuple and should instead use a concrete Tuple1, 
>> Tuple2, etc.
>>
>> Thomas
>>
>> On Sun, Jul 10, 2022 at 12:47 PM Hemanga Borah  
>> wrote:
>>>
>>> What error do you see?
>>>
>>> On Sun, Jul 10, 2022 at 6:30 AM Thomas Wang  wrote:

 Hi,

 I have a use case where I need to call DataStream.keyBy() with keys loaded 
 from a configuration. The number of keys and their data types are 
 variable and are determined by the configuration. Once the configuration 
 is loaded, they won't change. I'm trying to use the following key 
 selector, but it looks like I cannot use Tuple as the key type here. Is 
 there any way I can work around this, as the rest of the logic of my 
 application is the same? Thank you!

 public class SimpleRecordKeySelector implements 
 KeySelector

 Thomas



Re: DataStream.keyBy() with keys determined at run time

2022-07-10 Thread Hemanga Borah
Here is the documentation of the Tuple class:
https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/api/java/tuple/Tuple.html

If you need a concrete class, you can go from Tuple0 to Tuple25.
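
For example, with two key fields of known types, a concrete Tuple2 works as
the key (the event type below is a made-up placeholder):

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;

// Hypothetical event type, just enough to make the sketch compile.
class MyEvent {
    public String userId;
    public long regionId;
}

// Returns a concrete Tuple2 key instead of the abstract Tuple.
public class PairKeySelector implements KeySelector<MyEvent, Tuple2<String, Long>> {
    @Override
    public Tuple2<String, Long> getKey(MyEvent event) {
        return Tuple2.of(event.userId, event.regionId);
    }
}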

On Sun, Jul 10, 2022 at 5:43 PM Thomas Wang  wrote:

> I didn't copy the exact error message, but the gist of it is that I cannot
> use the abstract class Tuple and should instead use a concrete Tuple1,
> Tuple2, etc.
>
> Thomas
>
> On Sun, Jul 10, 2022 at 12:47 PM Hemanga Borah 
> wrote:
>
>> What error do you see?
>>
>> On Sun, Jul 10, 2022 at 6:30 AM Thomas Wang  wrote:
>>
>>> Hi,
>>>
>>> I have a use case where I need to call DataStream.keyBy() with keys
>>> loaded from a configuration. The number of keys and their data types are
>>> variable and are determined by the configuration. Once the configuration is
>>> loaded, they won't change. I'm trying to use the following key selector,
>>> but it looks like I cannot use Tuple as the key type here. Is there any way
>>> I can work around this, as the rest of the logic of my application is the
>>> same? Thank you!
>>>
>>> public class SimpleRecordKeySelector implements
>>> KeySelector
>>>
>>> Thomas
>>>
>>>


Re: How can I convert a DataSet into a Table?

2022-07-10 Thread yuxia
I'm afraid there is no way to do such a conversion in Flink 1.15.

But for your case, which is to read from a CSV file with the Table API, you
can try the following:

tEnv.createTemporaryTable("csvInput",
    TableDescriptor.forConnector("filesystem")
        .schema(schema)
        .option("path", "/path/to/file")
        .format(FormatDescriptor.forFormat("csv")
            .option("field-delimiter", "|")
            .build())
        .build());

Table table1 = tEnv.from("csvInput").xxx
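
To then read a few rows back out, a minimal sketch, assuming the tEnv and
csvInput registered above:

// Table#execute() runs the job; TableResult#collect() returns a
// CloseableIterator over the resulting rows.
import org.apache.flink.table.api.TableResult;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;

TableResult result = tEnv.from("csvInput").execute();
try (CloseableIterator<Row> it = result.collect()) {
    for (int i = 0; i < 10 && it.hasNext(); i++) {
        System.out.println(it.next()); // or write each row to a file instead
    }
}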

See more in the Flink doc [1].

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/common/#table-api


Best regards, 
Yuxia 


From: "podunk" 
To: "User" 
Sent: Wednesday, July 06, 2022 at 5:09:54 AM
Subject: How can I convert a DataSet into a Table?

My code is: 
package flinkTest2;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class flinkTest2 {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // read a CSV file with five fields, taking only two of them
        DataSet<Tuple2<String, Double>> csvInput = env.readCsvFile("c:/CSV/file")
            .includeFields("10010") // take the first and the fourth field
            .types(String.class, Double.class);

        // register and create table
        EnvironmentSettings settings = EnvironmentSettings
            .newInstance()
            //.inStreamingMode()
            .inBatchMode()
            .build();
        TableEnvironment tEnv = TableEnvironment.create(settings);

        // Insert CSV content into table, define column names and read some rows from it
    }
}
What should I do to create the table, insert the DataSet csvInput into it, and
read some rows from it (into a text file)?
Thanks for the help
Mike 



Re: Does Table API connector, csv, has some option to ignore some columns

2022-07-10 Thread Shengkai Fang
Hi.

In Flink SQL, you can select just the columns that you want in the query. For
example, you can use

```
SELECT col_a, col_b FROM some_table;
```
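
A fuller sketch of the same idea in the Table API (assuming a TableEnvironment
tEnv; the table name, columns, and path below are illustrative, not from your
setup): declare every CSV column in the schema, then project only the ones
you need.

```
// Declare the full CSV schema; unused columns are simply not selected later.
tEnv.executeSql(
    "CREATE TABLE some_table (" +
    "  col_a STRING," +
    "  col_b STRING," +
    "  col_c STRING" +
    ") WITH (" +
    "  'connector' = 'filesystem'," +
    "  'path' = '/path/to/file.csv'," +
    "  'format' = 'csv'" +
    ")");

// Only col_a and col_b end up in the result.
Table projected = tEnv.sqlQuery("SELECT col_a, col_b FROM some_table");
```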

Best,
Shengkai


 On Sat, Jul 9, 2022 at 1:48 AM  wrote:

> Does the Table API connector, CSV, have some option to ignore some columns
> in the source file?
> For instance, read only the first, second, ninth... but not the others?
>
> Or any other trick?
>
> CREATE TABLE some_table (
>   some_id BIGINT,
>   ...) WITH (
>  'format' = 'csv',
>  ...)
>
>
>
>
>


Re: DataStream.keyBy() with keys determined at run time

2022-07-10 Thread Thomas Wang
I didn't copy the exact error message, but the gist of it is that I cannot
use the abstract class Tuple and should instead use a concrete Tuple1,
Tuple2, etc.

Thomas

On Sun, Jul 10, 2022 at 12:47 PM Hemanga Borah 
wrote:

> What error do you see?
>
> On Sun, Jul 10, 2022 at 6:30 AM Thomas Wang  wrote:
>
>> Hi,
>>
>> I have a use case where I need to call DataStream.keyBy() with keys
>> loaded from a configuration. The number of keys and their data types are
>> variable and are determined by the configuration. Once the configuration is
>> loaded, they won't change. I'm trying to use the following key selector,
>> but it looks like I cannot use Tuple as the key type here. Is there any way
>> I can work around this, as the rest of the logic of my application is the
>> same? Thank you!
>>
>> public class SimpleRecordKeySelector implements
>> KeySelector
>>
>> Thomas
>>
>>


Re: DataStream.keyBy() with keys determined at run time

2022-07-10 Thread Hemanga Borah
What error do you see?

On Sun, Jul 10, 2022 at 6:30 AM Thomas Wang  wrote:

> Hi,
>
> I have a use case where I need to call DataStream.keyBy() with keys loaded
> from a configuration. The number of keys and their data types are variable
> and are determined by the configuration. Once the configuration is loaded,
> they won't change. I'm trying to use the following key selector, but it
> looks like I cannot use Tuple as the key type here. Is there any way I can
> work around this, as the rest of the logic of my application is the same?
> Thank you!
>
> public class SimpleRecordKeySelector implements
> KeySelector
>
> Thomas
>
>


Re: [ANNOUNCE] Apache Flink 1.15.1 released

2022-07-10 Thread podunk
I run Flink on Windows, and in version 1.15.1 the Task Managers fail to start.

It works without problems in 1.14.5.

Sent: Friday, July 08, 2022 at 12:18 AM
From: "David Anderson" 
To: "dev" , "user" , "user-zh" , annou...@apache.org
Subject: [ANNOUNCE] Apache Flink 1.15.1 released

The Apache Flink community is very happy to announce the release of Apache Flink 1.15.1, which is the first bugfix release for the Apache Flink 1.15 series.

Apache Flink® is an open-source stream processing framework for distributed, high-performing, always-available, and accurate data streaming applications.

The release is available for download at:
https://flink.apache.org/downloads.html

Please check out the release blog post for an overview of the improvements for this bugfix release:

    https://flink.apache.org/news/2022/07/06/release-1.15.1.html

The full release notes are available in Jira:

    https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12351546

We would like to thank all contributors of the Apache Flink community who made this release possible!

Regards,
David Anderson


DataStream.keyBy() with keys determined at run time

2022-07-10 Thread Thomas Wang
Hi,

I have a use case where I need to call DataStream.keyBy() with keys loaded
from a configuration. The number of keys and their data types are variable
and are determined by the configuration. Once the configuration is loaded,
they won't change. I'm trying to use the following key selector, but it
looks like I cannot use Tuple as the key type here. Is there any way I can
work around this, as the rest of the logic of my application is the same?
Thank you!

public class SimpleRecordKeySelector implements
KeySelector

Thomas


Issues with watermark alignment in Flink 1.15

2022-07-10 Thread Jun Qin
Hi All

I am trying watermark alignment in Flink 1.15 with:

watermarkStrategy = WatermarkStrategy.<~>forBoundedOutOfOrderness(
        Duration.ofMillis(outOfOrderness))
    .withWatermarkAlignment("wm-group", Duration.ofSeconds(10), Duration.ofSeconds(1))
    .withTimestampAssigner((element, timestamp) -> element.getTimestamp())
    .withIdleness(Duration.ofSeconds(1));

And got the following in DEBUG logs:
2022-07-10 06:53:35,713 INFO  
org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Distributing 
maxAllowedWatermark=-9223372036854765808 to subTaskIds=[]
2022-07-10 06:53:36,606 DEBUG 
org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New reported 
watermark=Watermark @ 1657436016036 (2022-07-10 06:53:36.036) from subTaskId=2
2022-07-10 06:53:36,619 DEBUG 
org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New reported 
watermark=Watermark @ 1657435956048 (2022-07-10 06:52:36.048) from subTaskId=1
2022-07-10 06:53:36,639 DEBUG 
org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New reported 
watermark=Watermark @ 1657436016034 (2022-07-10 06:53:36.034) from subTaskId=3
2022-07-10 06:53:36,702 DEBUG 
org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New reported 
watermark=Watermark @ 1657436016053 (2022-07-10 06:53:36.053) from subTaskId=0
2022-07-10 06:53:36,713 INFO  
org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Distributing 
maxAllowedWatermark=1657435966048 to subTaskIds=[0, 1, 2, 3]
2022-07-10 06:53:37,229 DEBUG 
shaded.io.kubernetes.client.extended.leaderelection.LeaderElector [] - Update 
lock acquire time to keep lease
2022-07-10 06:53:37,237 DEBUG 
shaded.io.kubernetes.client.extended.leaderelection.LeaderElector [] - 
TryAcquireOrRenew return success
2022-07-10 06:53:37,237 DEBUG 
shaded.io.kubernetes.client.extended.leaderelection.LeaderElector [] - 
Successfully renewed lease
2022-07-10 06:53:37,603 DEBUG 
org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New reported 
watermark=Watermark @ 9223372036854775807 (292278994-08-17 07:12:55.807) from 
subTaskId=2
2022-07-10 06:53:37,605 DEBUG 
org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New reported 
watermark=Watermark @ 9223372036854775807 (292278994-08-17 07:12:55.807) from 
subTaskId=3
2022-07-10 06:53:37,616 DEBUG 
org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New reported 
watermark=Watermark @ 9223372036854775807 (292278994-08-17 07:12:55.807) from 
subTaskId=1
2022-07-10 06:53:37,630 DEBUG 
org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New reported 
watermark=Watermark @ 9223372036854775807 (292278994-08-17 07:12:55.807) from 
subTaskId=0
2022-07-10 06:53:37,713 INFO  
org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Distributing 
maxAllowedWatermark=-9223372036854765809 to subTaskIds=[0, 1, 2, 3]
2022-07-10 06:53:38,603 DEBUG 
org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New reported 
watermark=Watermark @ 9223372036854775807 (292278994-08-17 07:12:55.807) from 
subTaskId=2
2022-07-10 06:53:38,604 DEBUG 
org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New reported 
watermark=Watermark @ 9223372036854775807 (292278994-08-17 07:12:55.807) from 
subTaskId=3
2022-07-10 06:53:38,616 DEBUG 
org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New reported 
watermark=Watermark @ 9223372036854775807 (292278994-08-17 07:12:55.807) from 
subTaskId=1
2022-07-10 06:53:38,630 DEBUG 
org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New reported 
watermark=Watermark @ 9223372036854775807 (292278994-08-17 07:12:55.807) from 
subTaskId=0
2022-07-10 06:53:38,713 INFO  
org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Distributing 
maxAllowedWatermark=-9223372036854765809 to subTaskIds=[0, 1, 2, 3]


Then it stays at maxAllowedWatermark=-9223372036854765809 all the time. The
watermark looks correct at the beginning, but the reported watermarks then
change to something related to Long.MAX_VALUE… Feels like a long overflow issue.
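
Indeed, the arithmetic matches a signed long overflow; a minimal sketch (not
Flink's actual code) that reproduces the value from the logs:

// A Long.MAX_VALUE watermark plus the 10 s (10000 ms) max drift
// overflows a signed long and wraps to the value seen in the logs.
public class OverflowCheck {
    public static void main(String[] args) {
        long reported = Long.MAX_VALUE; // 9223372036854775807, as reported by the subtasks
        long maxDrift = 10_000L;        // Duration.ofSeconds(10) in milliseconds
        System.out.println(reported + maxDrift); // prints -9223372036854765809
    }
}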

As soon as I remove the .withWatermarkAlignment() call, everything works fine.

Any idea? 

Thanks
Jun