Re: S3 file source parallelism reverting to 1
Hi Steve,

When you call env.readFile(…), the environment internally creates:

    ContinuousFileMonitoringFunction<OUT> monitoringFunction =
        new ContinuousFileMonitoringFunction<>(inputFormat, monitoringMode, getParallelism(), interval);

    ContinuousFileReaderOperator<OUT> reader = new ContinuousFileReaderOperator<>(inputFormat);

    SingleOutputStreamOperator<OUT> source = addSource(monitoringFunction, sourceName)
        .transform("Split Reader: " + sourceName, typeInfo, reader);

    return new DataStreamSource<>(source);

ContinuousFileMonitoringFunction is a RichSourceFunction (not a parallel source function), so it always runs with parallelism 1. On each run (triggered at the monitoring interval you pass to readFile), it recursively scans the path and creates file splits for all new files, which it forwards to the ContinuousFileReaderOperator. The ContinuousFileReaderOperator then processes these splits in parallel. The DataStreamSource returned by readFile wraps the reader, so setParallelism(3) applies to the "Split Reader" operator, while the monitoring source shown in the UI stays at parallelism 1.

From: Steve Whelan
Date: Tuesday, 11 August 2020 at 04:48
To: user
Subject: S3 file source parallelism reverting to 1

Hi,

I have an S3 file source being consumed as FileProcessingMode.PROCESS_CONTINUOUSLY with a parallelism of 3. I can confirm the parallelism is set by printing it out. However, in the UI, the file source has a parallelism of 1. I'm not changing it after it is initially set.

    DataStream s = env.readFile(
            new JsonInputFormat(...),
            filePath,
            FileProcessingMode.PROCESS_CONTINUOUSLY,
            5,
            myTypeInformation)
        .setParallelism(3);

    System.out.println(s.getParallelism()); // prints 3

The DataStreamSource is a parallel operator; otherwise `setParallelism(3)` would throw an `IllegalArgumentException` [1]. The only other thing I do with the DataStream is register it with the TableEnvironment:

    tableEnvironment.registerDataStream("my_table", dataStream);

Is Flink resetting it to 1 for some reason? I'm running v1.9.0.

Thanks,
Steve

[1] https://github.com/apache/flink/blob/release-1.9.0/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java#L53
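To make the monitor/reader split more concrete, here is a plain-Java model (not Flink code) of one monitoring pass: a single enumerator cuts every new file into splits, and the splits are spread over the parallel reader instances. The fixed split size and the round-robin distribution are simplifying assumptions (round-robin is similar to how a rebalance from a parallelism-1 operator to a parallelism-3 operator spreads records); `FileSplit`, `enumerate` and `assignRoundRobin` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a file split: a path plus the byte range a reader should process. */
final class FileSplit {
    final String path;
    final long start;
    final long length;

    FileSplit(String path, long start, long length) {
        this.path = path;
        this.start = start;
        this.length = length;
    }

    @Override
    public String toString() {
        return path + "[" + start + ", " + (start + length) + ")";
    }
}

final class SplitAssignmentModel {
    /** The single (parallelism 1) enumerator: cuts each new file into splits of at most splitSize bytes. */
    static List<FileSplit> enumerate(List<String> paths, List<Long> sizes, long splitSize) {
        List<FileSplit> splits = new ArrayList<>();
        for (int i = 0; i < paths.size(); i++) {
            long size = sizes.get(i);
            for (long offset = 0; offset < size; offset += splitSize) {
                splits.add(new FileSplit(paths.get(i), offset, Math.min(splitSize, size - offset)));
            }
        }
        return splits;
    }

    /** Spreads the splits over `readers` parallel reader instances in round-robin order. */
    static List<List<FileSplit>> assignRoundRobin(List<FileSplit> splits, int readers) {
        List<List<FileSplit>> perReader = new ArrayList<>();
        for (int r = 0; r < readers; r++) {
            perReader.add(new ArrayList<>());
        }
        for (int i = 0; i < splits.size(); i++) {
            perReader.get(i % readers).add(splits.get(i));
        }
        return perReader;
    }
}
```

With three readers, a pass that finds a 250-byte and a 100-byte file (split size 100) yields four splits: reader 0 gets two of them, readers 1 and 2 one each. Only the enumeration step is single-threaded; the reading itself is parallel.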
Re: multiple kafka topics
Hi Aissa,

1. To join 3 streams, you can chain 2 CoFlatMap functions: https://stackoverflow.com/questions/54277910/how-do-i-join-two-streams-in-apache-flink
2. If your aggregation function can also be applied partially, you can chain 2 joins: https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/joining.html
3. Create tables from each stream and do a single SQL join: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/tableApi.html#joins

It is important to understand which type of join suits your scenario: whether one of the streams can miss some data, and what the maximum delay between the streams is.

From: Aissa Elaffani
Date: Monday, 10 August 2020 at 05:11
To:
Subject: multiple kafka topics

Hello Guys,

I am working on a Flink application in which I consume data from Apache Kafka. The data is published to three topics of the cluster and I need to read from all of them, so I suppose I can create three FlinkKafkaConsumer instances. The data I am consuming has the same format {Id_sensor:, Id_equipement, Date:, Value{...}, ...}; the problem is that the "Value" field changes from topic to topic. The first topic has the temperature as a value, "Value":{"temperature":26}, the second topic contains oil data, "Value":{"oil_data":26}, and in the third topic the value field is "Value": {"Pitch":, "Roll", "Yaw"}.

So I created three FlinkKafkaConsumers and defined a DeserializationSchema for each topic's data. The problem is that I want to do some aggregations on all of this data together in order to apply a function. So I am wondering whether it is a problem to join the three streams into one stream, then aggregate by a field, apply the function, and finally sink it. And if so, am I going to have a problem sinking the data, because, as I explained, the value field differs from topic to topic?

Can anyone give me an explanation, please? I would be so grateful.

Thank you for your time!!
Aissa
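One way to approach the "different Value field" part of the question, sketched in plain Java rather than the Flink API: map each topic's payload into one common record type, where the differing "Value" object becomes a field-name-to-number map, so that the three inputs can be merged and grouped by equipment id. In Flink, merging streams of the same type is what `DataStream#union` does; the `Reading` and `UnifyTopics` names, and the last-value-wins aggregation, are hypothetical choices for illustration.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Stream;

/** Hypothetical common record: the topic-specific "Value" object is flattened into a name -> number map. */
final class Reading {
    final String idSensor;
    final String idEquipment;
    final String date;
    final Map<String, Double> values;

    Reading(String idSensor, String idEquipment, String date, Map<String, Double> values) {
        this.idSensor = idSensor;
        this.idEquipment = idEquipment;
        this.date = date;
        this.values = values;
    }
}

final class UnifyTopics {
    // One small mapper per topic, each producing the same Reading type.
    static Reading temperature(String sensor, String equipment, String date, double temperature) {
        return new Reading(sensor, equipment, date, Map.of("temperature", temperature));
    }

    static Reading oil(String sensor, String equipment, String date, double oilData) {
        return new Reading(sensor, equipment, date, Map.of("oil_data", oilData));
    }

    static Reading attitude(String sensor, String equipment, String date, double pitch, double roll, double yaw) {
        return new Reading(sensor, equipment, date, Map.of("Pitch", pitch, "Roll", roll, "Yaw", yaw));
    }

    /** Merges the three inputs and keeps, per equipment id, the last value seen for each field. */
    static Map<String, Map<String, Double>> mergeByEquipment(List<Reading> a, List<Reading> b, List<Reading> c) {
        Map<String, Map<String, Double>> byEquipment = new TreeMap<>();
        Stream.of(a, b, c)
            .flatMap(List::stream)
            .forEach(r -> byEquipment.computeIfAbsent(r.idEquipment, k -> new TreeMap<>()).putAll(r.values));
        return byEquipment;
    }
}
```

Because every element has the same type after the mapping step, one downstream aggregation and one sink schema are enough; fields a topic does not carry are simply absent from the map.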
Re: Flink 1.11.0 UDAF fails with "No match found for function signature fun()"
Hi Timo,

Thank you for your time and your help! The described approach works.

On 29/07/2020, 10:22, "Timo Walther" wrote:

Hi Dmytro,

I would not recommend using internal functions from `org.apache.flink.table.planner.functions.aggfunctions`. They are called by a slightly different stack that might cause this exception. Instead you can use the testing functions in `org.apache.flink.table.planner.runtime.utils.JavaUserDefinedAggFunctions`.

Your examples work in my 1.11 branch:

    @Test
    public void testWithCreateFunction() {
        initInput();
        String functionClass = "org.apache.flink.table.planner.runtime.utils.JavaUserDefinedAggFunctions$WeightedAvg";
        String createFunQuery = String.format("CREATE TEMPORARY FUNCTION a AS '%s'", functionClass);
        tableEnv.executeSql(createFunQuery);
        tableEnv.createTemporaryView("B", tableEnv.from("A")
            .groupBy($("symbol"))
            .select($("symbol"), call("a", $("price").cast(DataTypes.INT()), 12))
        );
        Table res = tableEnv.from("B");
        res.execute().print();
    }

    @Test
    public void testWithRegisterFunction() {
        initInput();
        String functionClass = "org.apache.flink.table.planner.runtime.utils.JavaUserDefinedAggFunctions$WeightedAvg";
        String createFunQuery = String.format("CREATE TEMPORARY FUNCTION a AS '%s'", functionClass);
        tableEnv.executeSql(createFunQuery);
        Table res = tableEnv.sqlQuery("select a(CAST(price AS INT), 12) as max_price from A group by symbol");
        res.execute().print();
    }

Regards,
Timo

On 28.07.20 17:20, Dmytro Dragan wrote:
> Hi Timo,
> [...]
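For readers wondering what such a UDAF has to provide: a weighted-average aggregate keeps an accumulator per group and updates it for every row. The following plain-Java model mirrors the method shape of Flink's `AggregateFunction` (`createAccumulator`, `accumulate`, `merge`, `getValue`) without depending on Flink; it is not the `WeightedAvg` test class, and `WeightedAvgAccumulator` / `WeightedAvgModel` are hypothetical names. In a real UDAF the class would extend `AggregateFunction<T, ACC>`, and Flink looks up `accumulate` by name rather than through an abstract method.

```java
/** Hypothetical accumulator: running weighted sum and total weight of one group. */
final class WeightedAvgAccumulator {
    long sum;
    long weight;
}

/** Plain-Java model of a weighted-average aggregate; mirrors the AggregateFunction method shape only. */
final class WeightedAvgModel {
    WeightedAvgAccumulator createAccumulator() {
        return new WeightedAvgAccumulator();
    }

    /** Called once per input row of the group. */
    void accumulate(WeightedAvgAccumulator acc, long value, int weight) {
        acc.sum += value * weight;
        acc.weight += weight;
    }

    /** Combines partial accumulators, e.g. from different partitions. */
    void merge(WeightedAvgAccumulator acc, Iterable<WeightedAvgAccumulator> others) {
        for (WeightedAvgAccumulator other : others) {
            acc.sum += other.sum;
            acc.weight += other.weight;
        }
    }

    /** Integer weighted average, or null for a group without weight. */
    Long getValue(WeightedAvgAccumulator acc) {
        if (acc.weight == 0) {
            return null;
        }
        return acc.sum / acc.weight;
    }
}
```

With the two prices from the example table (1.0 and 2.0, cast to INT) and weight 12, the model computes 36 / 24, i.e. 1 under integer division.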
Re: Flink 1.11.0 UDAF fails with "No match found for function signature fun()"
Hi Timo,

I have switched to 1.11.1.

Creating the function using "create function ..." fails with a mysterious error:

    Caused by: java.lang.IndexOutOfBoundsException: Index: 110, Size: 0
        at java.util.ArrayList.rangeCheck(ArrayList.java:657)
        at java.util.ArrayList.get(ArrayList.java:433)
        at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
        at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
        at org.apache.flink.api.common.typeutils.base.MapSerializer.deserialize(MapSerializer.java:148)
        at org.apache.flink.api.common.typeutils.base.MapSerializer.deserialize(MapSerializer.java:43)
        at org.apache.flink.table.dataview.MapViewSerializer.deserialize(MapViewSerializer.java:96)
        at org.apache.flink.table.dataview.MapViewSerializer.deserialize(MapViewSerializer.java:46)
        at org.apache.flink.util.InstantiationUtil.deserializeFromByteArray(InstantiationUtil.java:536)
        at org.apache.flink.table.data.binary.BinaryRawValueData.toObject(BinaryRawValueData.java:63)
        at org.apache.flink.table.data.util.DataFormatConverters$GenericConverter.toExternalImpl(DataFormatConverters.java:709)
        at org.apache.flink.table.data.util.DataFormatConverters$GenericConverter.toExternalImpl(DataFormatConverters.java:714)
        at org.apache.flink.table.data.util.DataFormatConverters$DataFormatConverter.toExternal(DataFormatConverters.java:401)
        at org.apache.flink.table.data.util.DataFormatConverters$PojoConverter.toExternalImpl(DataFormatConverters.java:1392)
        at org.apache.flink.table.data.util.DataFormatConverters$PojoConverter.toExternalImpl(DataFormatConverters.java:1356)
        at org.apache.flink.table.data.util.DataFormatConverters$DataFormatConverter.toExternal(DataFormatConverters.java:383)
        at SortAggregateWithKeys$79.processElement(Unknown Source)

"No match found for function signature fun()" still exists. The bug you mentioned was about TableFunction, so maybe this is something different but related.

I have created a small GitHub project with both cases:
https://github.com/dmytroDragan/flink-1.11-sql-agg-issue/blob/master/src/test/java/lets/test/flink/AggFunTest.java
I would appreciate it if you could take a look.

On 27/07/2020, 16:49, "Timo Walther" wrote:

Hi Dmytro,

aggregate functions will support the new type system in Flink 1.12. Until then, they cannot be used with the new `call()` syntax as anonymous functions. In order to use the old type system, you need to register the function explicitly using SQL `CREATE FUNCTION a AS 'myFunc'` and then use it via `call("a", ...)`.

The mentioned "No match found for function signature fun()" was a bug that got fixed in 1.11.1: https://issues.apache.org/jira/browse/FLINK-18520

This bug only exists for catalog functions, not temporary system functions.

Regards,
Timo

On 27.07.20 16:35, Dmytro Dragan wrote:
> Hi All,
> [...]
Flink 1.11.0 UDAF fails with "No match found for function signature fun()"
Hi All,

I see strange behavior of UDAF functions.

Let's say we have a simple table:

    EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
    TableEnvironment t = TableEnvironment.create(settings);

    Table table = t.fromValues(DataTypes.ROW(
            DataTypes.FIELD("price", DataTypes.DOUBLE().notNull()),
            DataTypes.FIELD("symbol", DataTypes.STRING().notNull())
        ),
        row(1.0, "S"), row(2.0, "S"));
    t.createTemporaryView("A", table);

As an example, we will use a built-in function under a new name:

    t.createTemporaryFunction("max_value", new MaxWithRetractAggFunction.DoubleMaxWithRetractAggFunction());

Using the Table API we can write:

    t.createTemporaryView("B", table
        .groupBy($("symbol"))
        .select($("symbol"), call("max_value", $("price")))
    );

and get:

    org.apache.flink.table.api.TableException: Aggregate functions are not updated to the new type system yet.

Using the SQL API we can write:

    t.createTemporaryView("B", t.sqlQuery("select max_value(price) from A group by symbol"));

and get:

    org.apache.flink.table.api.ValidationException: SQL validation failed. From line 1, column 8 to line 1, column 23: No match found for function signature max_value()

Calling the built-in max function instead of the alias produces correct results.

In addition, the non-retract implementation of the max function (MaxAggFunction.DoubleMaxAggFunction) produces:

    org.apache.flink.table.api.ValidationException: Could not register temporary catalog function 'default_catalog.default_database.max_value' due to implementation errors. Cause DoubleMaxAggFunction is not serializable.

Am I missing something?
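Why a retractable max needs more state than a plain max: when the current maximum is retracted, the aggregate must know the next-largest value, so it has to remember all values it has seen (here as a count per distinct value). This is a plain-Java model for illustration only; it is not Flink's `MaxWithRetractAggFunction`, and `MaxWithRetractModel` is a hypothetical name.

```java
import java.util.TreeMap;

/** Hypothetical accumulator state for a max that supports retraction: a count per distinct value. */
final class MaxWithRetractModel {
    private final TreeMap<Double, Integer> counts = new TreeMap<>();

    /** A new row contributes one occurrence of its value. */
    void accumulate(double value) {
        counts.merge(value, 1, Integer::sum);
    }

    /** A retracted row removes one occurrence; unmatched retractions are ignored in this model. */
    void retract(double value) {
        Integer count = counts.get(value);
        if (count == null) {
            return;
        }
        if (count == 1) {
            counts.remove(value);
        } else {
            counts.put(value, count - 1);
        }
    }

    /** Largest value still present, or null once everything has been retracted. */
    Double getValue() {
        return counts.isEmpty() ? null : counts.lastKey();
    }
}
```

A plain max only keeps the current maximum, so it has nothing to fall back on after a retraction; the per-value counts are the extra bookkeeping that makes retraction possible.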
Re: Flink 1.11 Simple pipeline (data stream -> table with egg -> data stream) failed
Hi Timo,

Thank you for the response.

Well, it used to work. We have a number of pipelines in production on Flink 1.10 that reuse DataStream and Table API parts, both for stream and batch. The same simple case without aggregation still works in Flink 1.11.

But let's assume there are some incompatible changes and such an approach will not work anymore.

With TableEnvironment there is no way to create a table from a DataStream or to convert a table to a retract stream. I assume it is possible to wrap the stream in a bounded StreamTableSource / StreamTableSink and use the deprecated TableEnvironment methods to register them, but I wonder if there is a better way to do it.

It seems quite strange that, although the Blink planner optimizes pipelines for both stream and batch jobs, we still need to write the same logic on both the DataStream and the DataSet API.

On 24/07/2020, 15:36, "Timo Walther" wrote:

Hi Dmytro,

`StreamTableEnvironment` does not support batch mode currently. Only `TableEnvironment` supports the unified story. I saw that you disabled the check in the `create()` method. This check exists for a reason.

For batch execution, the planner sets specific properties on the stream graph that the StreamExecutionEnvironment cannot handle (e.g. blocking inputs). My guess would be that this is the reason for your exception.

Have you tried to use the unified `TableEnvironment`?

Regards,
Timo

On 23.07.20 15:14, Dmytro Dragan wrote:
> Hi All,
> [...]
Flink 1.11 Simple pipeline (data stream -> table with egg -> data stream) failed
Hi All,

We are working on migrating existing pipelines from Flink 1.10 to Flink 1.11.

We are using the Blink planner and have unified pipelines which can be used in stream and batch mode.

Stream pipelines work as expected, but batch ones fail on Flink 1.11 if they have any table aggregation transformation.

A simple example of a failing pipeline:

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

    TableConfig tableConfig = new TableConfig();
    tableConfig.setIdleStateRetentionTime(
        org.apache.flink.api.common.time.Time.minutes(10),
        org.apache.flink.api.common.time.Time.minutes(30)
    );
    EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();

    // created using a workaround that ignores the settings.isStreamingMode() check
    StreamTableEnvironment tEnv = create(env, settings, tableConfig);

    DataStreamSource<A> streamSource = env.fromCollection(asList(new A("1"), new A("2")));

    Table table = tEnv.fromDataStream(streamSource);
    tEnv.createTemporaryView("A", table);

    String sql = "select s from A group by s";

    tEnv
        .toRetractStream(tEnv.sqlQuery(sql), Row.class)
        .flatMap(new RetractFlatMap())
        .map(Row::toString)
        .addSink(new TestSinkFunction<>());

    env.execute("");

    values.forEach(System.out::println);

Exception:

    Caused by: java.lang.IllegalStateException: Trying to consume an input partition whose producer is not ready (result type: BLOCKING, partition consumable: false, producer state: DEPLOYING, partition id: 9eb6904501e90d90797a264aeb95a7c2#0@9c8833afe58af5854324c882252c267b).
        at org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory.handleConsumedPartitionShuffleDescriptorErrors(TaskDeploymentDescriptorFactory.java:242)
        …

Adding a StreamTableEnvironment execute() call does not help either.

Could you please advise what I'm missing?
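`toRetractStream` produces `Tuple2<Boolean, Row>` elements, where `true` adds a row and `false` retracts a previously emitted one. The `RetractFlatMap` above is not shown, so as an assumption about what it is meant to handle, here is a plain-Java model of folding such messages into the current result. `RetractStreamModel` is a hypothetical name, and a `Map.Entry` stands in for the `Tuple2`.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class RetractStreamModel {
    /**
     * Folds (flag, row) messages into the current result:
     * flag true adds one copy of the row, flag false removes one copy.
     * Returns each row still present with its multiplicity; a retraction for an
     * absent row would leave a negative count, signalling an inconsistent stream.
     */
    static Map<String, Integer> materialize(List<Map.Entry<Boolean, String>> messages) {
        Map<String, Integer> result = new TreeMap<>();
        for (Map.Entry<Boolean, String> message : messages) {
            int delta = message.getKey() ? 1 : -1;
            int count = result.getOrDefault(message.getValue(), 0) + delta;
            if (count == 0) {
                result.remove(message.getValue());
            } else {
                result.put(message.getValue(), count);
            }
        }
        return result;
    }
}
```

A flatMap that simply drops `false` messages is only correct when the query never retracts earlier output; folding as above is the general interpretation of a retract stream.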
Re: Writing to S3 parquet files in Blink batch mode. Flink 1.10
Hi Jingsong,

Thank you for the detailed clarification.

Best regards,
Dmytro Dragan | dd...@softserveinc.com | Lead Big Data Engineer | Big Data & Analytics | SoftServe

From: Jingsong Li
Sent: Thursday, June 18, 2020 4:58:22 AM
To: Dmytro Dragan
Cc: user@flink.apache.org
Subject: Re: Writing to S3 parquet files in Blink batch mode. Flink 1.10

Hi Dmytro,

Yes, batch mode requires checkpointing to be disabled, so StreamingFileSink cannot be used in batch mode (StreamingFileSink requires checkpoints regardless of the format). We are refactoring it to be more generic so that it can be used in batch mode, but this is a future topic.

Currently, in batch mode, the sink must be an `OutputFormat` that implements `FinalizeOnMaster` instead of a `SinkFunction`, and the file committing should be implemented in the `FinalizeOnMaster` method (`finalizeGlobal`). If you have enough time, you can implement a custom `OutputFormat`, but it is complicated.

Now the status quo is:
- For 1.10, Blink batch supports writing to Hive tables: if you can define your table as a Hive table stored as Parquet on S3, this works. [1]
- For 1.11, there is a new connector named `filesystem connector` [2]: you can define a table with Parquet format on S3 and write to it with SQL.
- For 1.11, moreover, both the Hive and filesystem connectors support streaming writes by reusing StreamingFileSink internally.

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/hive/read_write_hive.html#writing-to-hive
[2] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/filesystem.html

Best,
Jingsong

On Tue, Jun 16, 2020 at 10:50 PM Dmytro Dragan <dd...@softserveinc.com> wrote:
> Hi guys,
> [...]
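A plain-Java sketch of the commit pattern Jingsong describes, using local files instead of Flink's `OutputFormat` / `FinalizeOnMaster` interfaces: each parallel task writes into its own staging directory, and one final step that runs after all tasks have finished publishes the staged files, so consumers never see partial output. The directory layout and the names `StagedCommitSketch`, `writePart` and `commitAll` are hypothetical. On S3 a "move" is a copy plus delete rather than an atomic rename, so a real implementation needs more care there.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class StagedCommitSketch {
    /** Runs in each parallel task: writes the task's output under staging/task-<index>/ (hypothetical layout). */
    static Path writePart(Path staging, int taskIndex, List<String> lines) throws IOException {
        Path taskDir = Files.createDirectories(staging.resolve("task-" + taskIndex));
        return Files.write(taskDir.resolve("part-" + taskIndex), lines, StandardCharsets.UTF_8);
    }

    /**
     * Runs once after all tasks have finished (the role finalizeGlobal plays):
     * moves every staged part file into the target directory and returns how many were published.
     */
    static int commitAll(Path staging, Path target, int parallelism) throws IOException {
        Files.createDirectories(target);
        int committed = 0;
        for (int i = 0; i < parallelism; i++) {
            Path taskDir = staging.resolve("task-" + i);
            if (!Files.isDirectory(taskDir)) {
                continue; // this task produced no output
            }
            List<Path> parts;
            try (Stream<Path> listing = Files.list(taskDir)) {
                parts = listing.collect(Collectors.toList()); // list first, then move
            }
            for (Path part : parts) {
                Files.move(part, target.resolve(part.getFileName()), StandardCopyOption.REPLACE_EXISTING);
                committed++;
            }
        }
        return committed;
    }
}
```

The key property is that the target directory only changes in the single global step, which is why this kind of commit needs a hook that runs once on the master rather than inside each task.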
Writing to S3 parquet files in Blink batch mode. Flink 1.10
Hi guys,

In our use case we are considering writing data to AWS S3 in Parquet format using Blink batch mode.

As far as I can see, the valid approach for writing Parquet files is to use StreamingFileSink with the Parquet bulk-encoded format, but according to the documentation and tests it only works with OnCheckpointRollingPolicy, while Blink batch mode requires checkpointing to be disabled.

Has anyone faced a similar issue?
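A simplified, plain-Java model of why checkpoint-based rolling and disabled checkpoints do not mix: with bulk formats the in-progress part file is closed on a checkpoint and only published once that checkpoint completes, so without checkpoints no file ever becomes finished. The state names loosely follow the StreamingFileSink documentation (in-progress, pending, finished); the class and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified model of part-file states under checkpoint-based rolling (bulk formats). */
final class CheckpointRollingModel {
    enum State { IN_PROGRESS, PENDING, FINISHED }

    static final class PartFile {
        final String name;
        State state = State.IN_PROGRESS;
        long checkpointId = -1;

        PartFile(String name) {
            this.name = name;
        }
    }

    private final List<PartFile> parts = new ArrayList<>();
    private PartFile current;

    /** Records go into the current in-progress file, opening a new one if needed. */
    void write(String record) {
        if (current == null) {
            current = new PartFile("part-" + parts.size());
            parts.add(current);
        }
        // a real writer would append the encoded record to `current` here
    }

    /** On a checkpoint, the in-progress file is closed and becomes pending for that checkpoint. */
    void onCheckpoint(long checkpointId) {
        if (current != null) {
            current.state = State.PENDING;
            current.checkpointId = checkpointId;
            current = null;
        }
    }

    /** Only a completed checkpoint publishes the files that became pending up to it. */
    void onCheckpointComplete(long checkpointId) {
        for (PartFile part : parts) {
            if (part.state == State.PENDING && part.checkpointId <= checkpointId) {
                part.state = State.FINISHED;
            }
        }
    }

    long finishedCount() {
        return parts.stream().filter(p -> p.state == State.FINISHED).count();
    }
}
```

In a job without checkpoints, `onCheckpoint` and `onCheckpointComplete` are never called, so every file stays in progress; that is the conflict described above.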
Registering UDAF in blink batch app
Hi All,

Could you please tell me how to register a custom aggregate function in a Blink batch app?

In streaming mode we create:

    EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, bsSettings);

which has:

    <T, ACC> void registerFunction(String name, AggregateFunction<T, ACC> aggregateFunction);

But in batch mode we need to create a TableEnvironment:

    EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
    tEnv = TableEnvironment.create(bsSettings);

which has no method to register an AggregateFunction, only a ScalarFunction one.

Details: Flink 1.10, Java API