Re: DataStream.keyBy() with keys determined at run time
You can use String as the key type and serialize all of the configured key fields into a single string.

Hemanga Borah wrote on Mon, Jul 11, 2022 at 09:49:
> Here is the documentation of the Tuple class:
> https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/api/java/tuple/Tuple.html
>
> If you need a concrete class, you can go from Tuple0 to Tuple25.
[snip]
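A minimal sketch of that string-key workaround in plain Java (the record shape, field names, and delimiter below are assumptions for illustration; in the actual job, buildKey() would be called from a KeySelector<T, String>.getKey()):

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class CompositeKeyBuilder {

    // Join the configured key fields into one String key. The delimiter
    // should be a character that cannot occur in the field values.
    static String buildKey(Map<String, Object> record, List<String> keyFields) {
        return keyFields.stream()
                .map(field -> String.valueOf(record.get(field)))
                .collect(Collectors.joining("\u0001"));
    }

    public static void main(String[] args) {
        Map<String, Object> record = new HashMap<>();
        record.put("userId", 42);
        record.put("region", "us-east");
        record.put("amount", 9.99);

        // The key fields come from configuration and may vary in number and type.
        List<String> keyFields = Arrays.asList("userId", "region");
        System.out.println(buildKey(record, keyFields).replace('\u0001', '|'));
        // prints 42|us-east
    }
}
```

Because every key is a plain String, keyBy() no longer needs the abstract Tuple type; the trade-off is that the individual fields' types are flattened away in the key.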
Re: DataStream.keyBy() with keys determined at run time
Here is the documentation of the Tuple class:
https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/api/java/tuple/Tuple.html

If you need a concrete class, you can go from Tuple0 to Tuple25.

On Sun, Jul 10, 2022 at 5:43 PM Thomas Wang wrote:
> I didn't copy the exact error message, but the gist of it is that I cannot
> use the abstract class Tuple and should instead use Tuple1, Tuple2, etc.
>
> Thomas
[snip]
Re: How can I convert a DataSet into a Table?
I'm afraid we have no way to do such a conversion in Flink 1.15. But for your case, which is reading from a CSV file, you can use the Table API directly:

    tEnv.createTemporaryTable("csvInput",
        TableDescriptor.forConnector("filesystem")
            .schema(schema)
            .option("path", "/path/to/file")
            .format(FormatDescriptor.forFormat("csv")
                .option("field-delimiter", "|")
                .build())
            .build());

    Table table1 = tEnv.from("csvInput").xxx

See more in the Flink doc [1].

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/common/#table-api

Best regards,
Yuxia

From: "podunk"
To: "User"
Sent: Wednesday, July 6, 2022, 5:09:54 AM
Subject: How can I convert a DataSet into a Table?

My code is:

    package flinkTest2;

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

    public class flinkTest2 {

        public static void main(String[] args) throws Exception {

            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // read a CSV file with five fields, taking only two of them
            DataSet<Tuple2<String, Double>> csvInput = env.readCsvFile("c:/CSV/file")
                .includeFields("10010")  // take the first and the fourth field
                .types(String.class, Double.class);

            // register and create table
            EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                //.inStreamingMode()
                .inBatchMode()
                .build();
            TableEnvironment tEnv = TableEnvironment.create(settings);

            // Insert CSV content into table, define column names and read some rows from it
        }
    }

What should I do to create a table, insert the DataSet csvInput into it, and read some rows from it (into a text file)? Thanks for help.

Mike
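As an aside, the "10010" argument to includeFields() in the question's DataSet code is a positional mask: each '1' keeps the column at that index and each '0' drops it, which is why "10010" yields the first and the fourth field. A small plain-Java sketch of that semantics (independent of Flink; the helper name is made up):

```java
import java.util.ArrayList;
import java.util.List;

public class IncludeFieldsMask {

    // Return the zero-based column indices selected by a '1'/'0' mask,
    // mirroring how includeFields("10010") picks columns 0 and 3.
    static List<Integer> selectedColumns(String mask) {
        List<Integer> cols = new ArrayList<>();
        for (int i = 0; i < mask.length(); i++) {
            if (mask.charAt(i) == '1') {
                cols.add(i);
            }
        }
        return cols;
    }

    public static void main(String[] args) {
        // "10010" -> the first and the fourth field
        System.out.println(selectedColumns("10010"));
        // prints [0, 3]
    }
}
```

In the Table API route suggested above, the equivalent of the mask is simply selecting the wanted columns from the registered table.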
Re: Does Table API connector, csv, has some option to ignore some columns
Hi.

In Flink SQL, you can select the columns that you want in the query. For example:

```
SELECT col_a, col_b FROM some_table;
```

Best,
Shengkai

On Sat, Jul 9, 2022 at 01:48, the original poster wrote:
> Does the Table API connector, CSV, have some option to ignore some columns
> in the source file? For instance, read only the first, second, ninth... but
> not the others?
>
> Or any other trick?
>
> CREATE TABLE some_table (
>   some_id BIGINT,
>   ...
> ) WITH (
>   'format' = 'csv',
>   ...
> )
Re: DataStream.keyBy() with keys determined at run time
I didn't copy the exact error message, but the gist of it is that I cannot use the abstract class Tuple and should instead use Tuple1, Tuple2, etc.

Thomas

On Sun, Jul 10, 2022 at 12:47 PM Hemanga Borah wrote:
> What error do you see?
[snip]
Re: DataStream.keyBy() with keys determined at run time
What error do you see?

On Sun, Jul 10, 2022 at 6:30 AM Thomas Wang wrote:
> Hi,
>
> I have a use case where I need to call DataStream.keyBy() with keys loaded
> from a configuration. The number of keys and their data types vary and are
> determined by the configuration. Once the configuration is loaded, they
> won't change. I'm trying to use the following key selector, but it looks
> like I cannot use Tuple as the key type here. Is there any way I can work
> around this, as the rest of the logic of my application is the same? Thank
> you!
>
> public class SimpleRecordKeySelector implements KeySelector
>
> Thomas
Re: [ANNOUNCE] Apache Flink 1.15.1 released
I run Flink on Windows, and in version 1.15.1 the TaskManagers fail to start. It works without problems in 1.14.5.

Sent: Friday, July 8, 2022 at 12:18 AM
From: "David Anderson"
To: "dev", "user", "user-zh", annou...@apache.org
Subject: [ANNOUNCE] Apache Flink 1.15.1 released

The Apache Flink community is very happy to announce the release of Apache Flink 1.15.1, which is the first bugfix release for the Apache Flink 1.15 series.

Apache Flink® is an open-source stream processing framework for distributed, high-performing, always-available, and accurate data streaming applications.

The release is available for download at:
https://flink.apache.org/downloads.html

Please check out the release blog post for an overview of the improvements for this bugfix release:
https://flink.apache.org/news/2022/07/06/release-1.15.1.html

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12351546

We would like to thank all contributors of the Apache Flink community who made this release possible!

Regards,
David Anderson
DataStream.keyBy() with keys determined at run time
Hi,

I have a use case where I need to call DataStream.keyBy() with keys loaded from a configuration. The number of keys and their data types vary and are determined by the configuration. Once the configuration is loaded, they won't change. I'm trying to use the following key selector, but it looks like I cannot use Tuple as the key type here. Is there any way I can work around this? The rest of the logic of my application stays the same. Thank you!

public class SimpleRecordKeySelector implements KeySelector

Thomas
Issues with watermark alignment in Flink 1.15
Hi All,

I am trying watermark alignment in Flink 1.15 with:

    watermarkStrategy = WatermarkStrategy.<~>forBoundedOutOfOrderness(
            Duration.ofMillis(outOfOrderness))
        .withWatermarkAlignment("wm-group", Duration.ofSeconds(10), Duration.ofSeconds(1))
        .withTimestampAssigner((element, timestamp) -> element.getTimestamp())
        .withIdleness(Duration.ofSeconds(1));

and I got the following in the DEBUG logs:

2022-07-10 06:53:35,713 INFO  org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Distributing maxAllowedWatermark=-9223372036854765808 to subTaskIds=[]
2022-07-10 06:53:36,606 DEBUG org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New reported watermark=Watermark @ 1657436016036 (2022-07-10 06:53:36.036) from subTaskId=2
2022-07-10 06:53:36,619 DEBUG org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New reported watermark=Watermark @ 1657435956048 (2022-07-10 06:52:36.048) from subTaskId=1
2022-07-10 06:53:36,639 DEBUG org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New reported watermark=Watermark @ 1657436016034 (2022-07-10 06:53:36.034) from subTaskId=3
2022-07-10 06:53:36,702 DEBUG org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New reported watermark=Watermark @ 1657436016053 (2022-07-10 06:53:36.053) from subTaskId=0
2022-07-10 06:53:36,713 INFO  org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Distributing maxAllowedWatermark=1657435966048 to subTaskIds=[0, 1, 2, 3]
2022-07-10 06:53:37,229 DEBUG shaded.io.kubernetes.client.extended.leaderelection.LeaderElector [] - Update lock acquire time to keep lease
2022-07-10 06:53:37,237 DEBUG shaded.io.kubernetes.client.extended.leaderelection.LeaderElector [] - TryAcquireOrRenew return success
2022-07-10 06:53:37,237 DEBUG shaded.io.kubernetes.client.extended.leaderelection.LeaderElector [] - Successfully renewed lease
2022-07-10 06:53:37,603 DEBUG org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New reported watermark=Watermark @ 9223372036854775807 (292278994-08-17 07:12:55.807) from subTaskId=2
2022-07-10 06:53:37,605 DEBUG org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New reported watermark=Watermark @ 9223372036854775807 (292278994-08-17 07:12:55.807) from subTaskId=3
2022-07-10 06:53:37,616 DEBUG org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New reported watermark=Watermark @ 9223372036854775807 (292278994-08-17 07:12:55.807) from subTaskId=1
2022-07-10 06:53:37,630 DEBUG org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New reported watermark=Watermark @ 9223372036854775807 (292278994-08-17 07:12:55.807) from subTaskId=0
2022-07-10 06:53:37,713 INFO  org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Distributing maxAllowedWatermark=-9223372036854765809 to subTaskIds=[0, 1, 2, 3]
2022-07-10 06:53:38,603 DEBUG org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New reported watermark=Watermark @ 9223372036854775807 (292278994-08-17 07:12:55.807) from subTaskId=2
2022-07-10 06:53:38,604 DEBUG org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New reported watermark=Watermark @ 9223372036854775807 (292278994-08-17 07:12:55.807) from subTaskId=3
2022-07-10 06:53:38,616 DEBUG org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New reported watermark=Watermark @ 9223372036854775807 (292278994-08-17 07:12:55.807) from subTaskId=1
2022-07-10 06:53:38,630 DEBUG org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - New reported watermark=Watermark @ 9223372036854775807 (292278994-08-17 07:12:55.807) from subTaskId=0
2022-07-10 06:53:38,713 INFO  org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Distributing maxAllowedWatermark=-9223372036854765809 to subTaskIds=[0, 1, 2, 3]

Then it stays at maxAllowedWatermark=-9223372036854765809 all the time.

The watermark looks correct at the beginning, but then it changes to something related to Long.MAX_VALUE. It feels like an overflow issue. As soon as I remove the call to .withWatermarkAlignment(), everything works fine. Any ideas?

Thanks,
Jun
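The numbers in the log are consistent with the overflow hunch: subtasks that have gone idle (or finished) report a watermark of Long.MAX_VALUE, and when the coordinator adds the configured drift (Duration.ofSeconds(10) = 10,000 ms) to that aggregated watermark, the signed 64-bit addition wraps around to a large negative value. A plain-Java sketch of the arithmetic (the variable names are illustrative, not Flink's actual code):

```java
public class WatermarkOverflow {
    public static void main(String[] args) {
        long maxAllowedDrift = 10_000L; // Duration.ofSeconds(10) in milliseconds

        // Before any subtask reports, the aggregated watermark is Long.MIN_VALUE:
        long initial = Long.MIN_VALUE + maxAllowedDrift;
        System.out.println(initial);    // -9223372036854765808, the first log value

        // Once the subtasks report Long.MAX_VALUE (idle/finished sources),
        // adding the drift silently overflows:
        long overflowed = Long.MAX_VALUE + maxAllowedDrift;
        System.out.println(overflowed); // -9223372036854765809, the stuck value
    }
}
```

Both printed values match the maxAllowedWatermark figures in the log exactly, which suggests the stuck alignment watermark is an unguarded long addition in 1.15's alignment path rather than a configuration problem on your side.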