Re: Write JSON string to JDBC as JSONB?
There may well be a better way, but I've been able to achieve this by executing the following in the DB:

CREATE CAST (VARCHAR AS JSONB) WITH INOUT AS IMPLICIT;

I think you can probably also do:

CREATE CAST (VARCHAR AS JSONB) WITHOUT FUNCTION AS IMPLICIT;

However, I am working in AWS, and AWS does not give the owner of the database sufficient permissions to execute this.

From: John Tipper
Sent: 24 July 2022 22:01
To: user@flink.apache.org
Subject: Write JSON string to JDBC as JSONB?

Hi all,

I am using PyFlink and SQL and have a JSON string that I wish to write to a JDBC sink (it's a PostgreSQL DB). I'd like to write that string to a column of type JSONB (or even JSON). I'm getting exceptions when Flink tries to write the column:

Batch entry 0 INSERT INTO my_table(my_jsonb_column) VALUES ('{ ... a valid JSON string...}') ...
...
Error: column "my_jsonb_column" is of type jsonb but expression is of type character varying
Hint: You will need to rewrite or cast the expression

My table sink in Flink is defined like this:

CREATE TABLE my_table (
    my_jsonb_column STRING
) WITH (
    'connector' = 'jdbc'
)

and in my actual SQL database the column my_jsonb_column is defined as type JSONB. I can happily execute SQL statements like the following myself, without an error, writing a string to the JSONB column:

INSERT INTO my_table(my_jsonb_column) VALUES ('{ ... a valid JSON string...}')

Is there any way of having Flink write to a JSONB column, or otherwise getting that JSON string into a JSONB column?

Many thanks,

John
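The suggested cast can also be applied from Python; below is a minimal sketch using psycopg2, where the connection details are hypothetical placeholders and your database role must have sufficient privileges to create casts (which, as noted above, managed AWS databases may not grant):

```
# Apply the VARCHAR -> JSONB implicit cast suggested above; the connection
# details below are hypothetical placeholders.
import psycopg2

conn = psycopg2.connect(host="localhost", dbname="mydb", user="postgres", password="secret")
conn.autocommit = True
with conn.cursor() as cur:
    # After this, PostgreSQL implicitly casts VARCHAR to JSONB on insert,
    # so Flink's JDBC sink can write a STRING column into a JSONB column.
    cur.execute("CREATE CAST (VARCHAR AS JSONB) WITH INOUT AS IMPLICIT;")
conn.close()
```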
Write JSON string to JDBC as JSONB?
Hi all,

I am using PyFlink and SQL and have a JSON string that I wish to write to a JDBC sink (it's a PostgreSQL DB). I'd like to write that string to a column of type JSONB (or even JSON). I'm getting exceptions when Flink tries to write the column:

Batch entry 0 INSERT INTO my_table(my_jsonb_column) VALUES ('{ ... a valid JSON string...}') ...
...
Error: column "my_jsonb_column" is of type jsonb but expression is of type character varying
Hint: You will need to rewrite or cast the expression

My table sink in Flink is defined like this:

CREATE TABLE my_table (
    my_jsonb_column STRING
) WITH (
    'connector' = 'jdbc'
)

and in my actual SQL database the column my_jsonb_column is defined as type JSONB. I can happily execute SQL statements like the following myself, without an error, writing a string to the JSONB column:

INSERT INTO my_table(my_jsonb_column) VALUES ('{ ... a valid JSON string...}')

Is there any way of having Flink write to a JSONB column, or otherwise getting that JSON string into a JSONB column?

Many thanks,

John
Re: What do columns for TM memory usage in Flink UI Console mean?
Sorry, pressed send too early. What is the unit of measure for "Count", and does this tell me I have too little Direct Memory? If so, what do I do to specifically increase this number?

Many thanks,

John

From: John Tipper
Sent: 20 July 2022 17:52
To: user@flink.apache.org
Subject: What do columns for TM memory usage in Flink UI Console mean?

Hi all,

I can't find mention of what the columns mean for the "Outside JVM Memory" section for the Task Manager in the Flink console. I have:

        Count    Used     Capacity
Direct  4,203    227 MB   227 MB
Mapped  0        0 B      0 B

What is the unit of measure for "Count"?
What do columns for TM memory usage in Flink UI Console mean?
Hi all,

I can't find mention of what the columns mean for the "Outside JVM Memory" section for the Task Manager in the Flink console. I have:

        Count    Used     Capacity
Direct  4,203    227 MB   227 MB
Mapped  0        0 B      0 B

What is the unit of measure for "Count"?
Re: PyFlink SQL: force maximum use of slots
Hi Dian,

Thanks very much - I suppose the concept I'm struggling with is understanding how parallelism works when using SQL. I understand that in the DataStream world parallelism means each slot will get a subset of events. However, how does that work in the SQL world where you need to do joins between tables? If the events in tables A and B are being processed in parallel, does this not mean that any given slot will have only some of the events for tables A and B? How does Flink ensure consistency of results irrespective of the parallelism used, or does it just copy all events to all slots, in which case I don't understand how parallelism helps?

Many thanks,

John

From: Dian Fu
Sent: 20 July 2022 05:19
To: John Tipper
Cc: user@flink.apache.org
Subject: Re: PyFlink SQL: force maximum use of slots

Hi John,

All the operators in the same slot sharing group will be put in one slot. The slot sharing group is only configurable in the DataStream API [1]. Usually you don't need to set the slot sharing group explicitly [2], and it is good to share resources between the operators running in the same slot. If performance becomes a problem, you could just increase the parallelism or the resources (CPU or memory) of the TM. For example, if the parallelism is set to 2, you will see two running slots, each containing all the operators.

Regards,
Dian

[1] https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/datastream/operators/overview/#set-slot-sharing-group
[2] https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/concepts/flink-architecture/#task-slots-and-resources

On Tue, Jul 19, 2022 at 12:23 AM John Tipper <john_tip...@hotmail.com> wrote:

Hi all,

Is there a way of forcing a pipeline to use as many slots as possible? I have a program in PyFlink using SQL and the Table API, and currently all of my pipeline is using just a single slot. I've tried this:

StreamExecutionEnvironment.get_execution_environment().disable_operator_chaining()

I did have a pipeline which had 17 different tasks running in just a single slot; all this does is give me 79 different operators, but all are still running in a single slot. Is there a way to get Flink to run different jobs in different slots whilst using the Table API and SQL?

Many thanks,

John
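Dian's suggestion of simply raising the parallelism looks like this in PyFlink (a minimal sketch, assuming PyFlink 1.15 and no per-operator settings); with the default parallelism set to 2, each operator is deployed twice and the job occupies two slots:

```
# A minimal sketch, assuming PyFlink 1.15.
from pyflink.table import EnvironmentSettings, TableEnvironment

table_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

# Deploy every operator (and hence each slot sharing group) twice,
# so the job uses two slots instead of one.
table_env.get_config().get_configuration().set_string("parallelism.default", "2")
```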
PyFlink SQL: force maximum use of slots
Hi all,

Is there a way of forcing a pipeline to use as many slots as possible? I have a program in PyFlink using SQL and the Table API, and currently all of my pipeline is using just a single slot. I've tried this:

StreamExecutionEnvironment.get_execution_environment().disable_operator_chaining()

I did have a pipeline which had 17 different tasks running in just a single slot; all this does is give me 79 different operators, but all are still running in a single slot. Is there a way to get Flink to run different jobs in different slots whilst using the Table API and SQL?

Many thanks,

John
Re: PyFlink and parallelism
Thanks very much. Given that I'm using SQL, it doesn't look like I'm able to access the operators to change the parallelism without dropping from the Table API into the DataStream API and then back again. In any case, conversion between a Table and a DataStream is broken if the Table contains a timestamp, which I reported a little while ago and Dian filed as FLINK-28253.

Kind regards,

John

From: Juntao Hu
Sent: 18 July 2022 04:13
To: John Tipper
Cc: user@flink.apache.org
Subject: Re: PyFlink and parallelism

It's not an issue with Python-Java object conversion: after calling `to_data_stream` you get a DataStream rather than a SingleOutputStreamOperator underlying the Python DataStream wrapper, and `setParallelism` is only available on SingleOutputStreamOperator. To work around this, call `set_parallelism` on your processing operator instead, e.g. `filtered_stream.map(...).set_parallelism(...)`.

John Tipper <john_tip...@hotmail.com> wrote on Sat, 16 Jul 2022 at 17:07:

I've tried this and can see there appears to be a bigger problem with PyFlink and a call to set_parallelism():

events_table = table_env.from_path(MY_SOURCE_TABLE)
filtered_table = events_table.filter(
    col("event_type") == "event_of_interest"
)
filtered_stream = table_env.to_data_stream(filtered_table)

# fetch desired parallelism from config, don't hardcode
filtered_stream.set_parallelism(int(table_env.get_config().get('my.custom.parallelism', 1)))

table_env.create_temporary_view(MY_FILTERED_VIEW, filtered_stream)

# now execute SQL on MY_FILTERED_VIEW
table_env.execute_sql(...)

I now get an error:

An error occurred when calling o68.setParallelism(). Trace:
org.apache.flink.api.python.shaded.py4j.Py4JException: Method setParallelism([class java.lang.Integer]) does not exist.

It looks like Python is converting to the Integer object in Java and not the int primitive. I actually see this if I just call set_parallelism(1) without the call to get_config(). Is this a bug, or is there a workaround?

From: John Tipper
Sent: 15 July 2022 16:44
To: user@flink.apache.org
Subject: PyFlink and parallelism

Hi all,

I have a processing topology using PyFlink and SQL where there is data skew: I'm splitting a stream of heterogeneous data into separate streams based on the type of data that's in it, and some of these substreams have very many more events than others, which is causing issues when checkpointing (checkpoints are timing out). I'd like to increase parallelism for these problematic streams; I'm just not sure how I do that and target just those elements. Do I need to use the DataStream API here? What does this look like, please?

I have a table defined and I duplicate a stream from that table, then filter so that my substream has only the events I'm interested in:

events_table = table_env.from_path(MY_SOURCE_TABLE)
filtered_table = events_table.filter(
    col("event_type") == "event_of_interest"
)
table_env.create_temporary_view(MY_FILTERED_VIEW, filtered_table)

# now execute SQL on MY_FILTERED_VIEW
table_env.execute_sql(...)

The default parallelism of the overall table env is 1. Is there a way to increase the parallelism for just this stream?

Many thanks,

John
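Juntao's workaround can be sketched as follows (a minimal example, assuming PyFlink 1.15; the identity map is a hypothetical stand-in for real processing). set_parallelism() is called on the operator returned by map(), which does expose it, rather than on the DataStream returned by to_data_stream():

```
# A minimal sketch, assuming PyFlink 1.15; the identity map below is a
# hypothetical stand-in for real processing.
from pyflink.common import Types

filtered_stream = table_env.to_data_stream(filtered_table)

# set_parallelism() is not available on the converted stream itself, but it
# is available on the result of a transformation such as map().
processed = filtered_stream.map(
    lambda row: row,  # hypothetical processing step
    output_type=Types.PICKLED_BYTE_ARRAY(),
).set_parallelism(4)
```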
Re: PyFlink and parallelism
I've tried this and can see there appears to be a bigger problem with PyFlink and a call to set_parallelism():

events_table = table_env.from_path(MY_SOURCE_TABLE)
filtered_table = events_table.filter(
    col("event_type") == "event_of_interest"
)
filtered_stream = table_env.to_data_stream(filtered_table)

# fetch desired parallelism from config, don't hardcode
filtered_stream.set_parallelism(int(table_env.get_config().get('my.custom.parallelism', 1)))

table_env.create_temporary_view(MY_FILTERED_VIEW, filtered_stream)

# now execute SQL on MY_FILTERED_VIEW
table_env.execute_sql(...)

I now get an error:

An error occurred when calling o68.setParallelism(). Trace:
org.apache.flink.api.python.shaded.py4j.Py4JException: Method setParallelism([class java.lang.Integer]) does not exist.

It looks like Python is converting to the Integer object in Java and not the int primitive. I actually see this if I just call set_parallelism(1) without the call to get_config(). Is this a bug, or is there a workaround?

From: John Tipper
Sent: 15 July 2022 16:44
To: user@flink.apache.org
Subject: PyFlink and parallelism

Hi all,

I have a processing topology using PyFlink and SQL where there is data skew: I'm splitting a stream of heterogeneous data into separate streams based on the type of data that's in it, and some of these substreams have very many more events than others, which is causing issues when checkpointing (checkpoints are timing out). I'd like to increase parallelism for these problematic streams; I'm just not sure how I do that and target just those elements. Do I need to use the DataStream API here? What does this look like, please?

I have a table defined and I duplicate a stream from that table, then filter so that my substream has only the events I'm interested in:

events_table = table_env.from_path(MY_SOURCE_TABLE)
filtered_table = events_table.filter(
    col("event_type") == "event_of_interest"
)
table_env.create_temporary_view(MY_FILTERED_VIEW, filtered_table)

# now execute SQL on MY_FILTERED_VIEW
table_env.execute_sql(...)

The default parallelism of the overall table env is 1. Is there a way to increase the parallelism for just this stream?

Many thanks,

John
PyFlink and parallelism
Hi all,

I have a processing topology using PyFlink and SQL where there is data skew: I'm splitting a stream of heterogeneous data into separate streams based on the type of data that's in it, and some of these substreams have very many more events than others, which is causing issues when checkpointing (checkpoints are timing out). I'd like to increase parallelism for these problematic streams; I'm just not sure how I do that and target just those elements. Do I need to use the DataStream API here? What does this look like, please?

I have a table defined and I duplicate a stream from that table, then filter so that my substream has only the events I'm interested in:

events_table = table_env.from_path(MY_SOURCE_TABLE)
filtered_table = events_table.filter(
    col("event_type") == "event_of_interest"
)
table_env.create_temporary_view(MY_FILTERED_VIEW, filtered_table)

# now execute SQL on MY_FILTERED_VIEW
table_env.execute_sql(...)

The default parallelism of the overall table env is 1. Is there a way to increase the parallelism for just this stream?

Many thanks,

John
Re: PyFlink: restoring from savepoint
Hi Dian,

I'm using versions 1.15.0 and 1.15.1 of PyFlink. I think it's to do with how the arguments are ordered, as when I run the container with:

“standalone-job”, “-s”, “s3://”, “-n”, “-pym”, “foo.main”

then the job starts successfully and loads from the savepoint.

Many thanks,

John

From: Dian Fu
Sent: 08 July 2022 02:27
To: John Tipper
Cc: user@flink.apache.org
Subject: Re: PyFlink: restoring from savepoint

Hi John,

Could you provide more information, e.g. the exact command submitting the job, the log files, the PyFlink version, etc.?

Regards,
Dian

On Thu, Jul 7, 2022 at 7:53 PM John Tipper <john_tip...@hotmail.com> wrote:

Hi all,

I have a PyFlink job running in Kubernetes. Savepoints and checkpoints are being successfully saved to S3. However, I am unable to get the job to start from a savepoint. The container is started with these args:

“standalone-job”, “-pym”, “foo.main”, “-s”, “s3://”, “-n”

In the JM logs I can see “Starting StandaloneApplicationClusterEntrypoint…” where my arguments are listed. However, I don't see any restore occurring in the logs, and my application restarts with no state. How do I start a PyFlink job like this from a given savepoint?

Many thanks,

John

Sent from my iPhone
Re: Restoring a job from a savepoint
Thank you all, that’s very helpful. It looks like there’s something else that’s causing my cluster to not load my savepoints, so I’ve submitted a separate query for that.

Many thanks,

John

Sent from my iPhone

On 6 Jul 2022, at 21:24, Alexander Fedulov wrote:

Hi John,

use

$ bin/flink run -s s3://my_bucket/path/to/savepoints/

(no trailing slash, including schema), where the given directory should contain a valid _metadata file. You should see logs like this:

INFO o.a.f.r.c.CheckpointCoordinator [] - Starting job foobar from savepoint s3://my_bucket/path/to/savepoints/ ()
INFO o.a.f.r.c.CheckpointCoordinator [] - Restoring job foobar from Savepoint 1 @ 0 for foobar located at s3://my_bucket/path/to/savepoints/.

The indication of a correct restore is the absence of exceptions. You might see messages like this one for operators that did not have any state in the savepoint:

INFO o.a.f.r.c.CheckpointCoordinator [] - Skipping empty savepoint state for operator a0f11f7a2c416beb6b7aed14be0d63ca.

Best,
Alexander Fedulov

On Wed, Jul 6, 2022 at 9:50 PM John Tipper <john_tip...@hotmail.com> wrote:

Hi all,

The docs on restoring a job from a savepoint (https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/savepoints/#resuming-from-savepoints) state that the syntax is:

$ bin/flink run -s :savepointPath [:runArgs]

and that "you may give a path to either the savepoint’s directory or the _metadata file." If I am using S3 as my store of state:

state.savepoints.dir: s3://my_bucket/path/to/savepoints

and an example savepoint is at:

s3://my_bucket/path/to/savepoints//_metadata

then what am I supposed to supply to the flink run command? Is it:

1. the full path including filesystem: s3://my_bucket/path/to/savepoints//_metadata or s3://my_bucket/path/to/savepoints/
2. the full path: my_bucket/path/to/savepoints//_metadata or my_bucket/path/to/savepoints/
3. the path relative to the savepoints directory: /_metadata or

If I supply a directory, do I need to specify a trailing slash? Also, is there anything that I will see in the logs that will indicate that the restore from a savepoint has been successful?

Many thanks,

John
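If the restore is being scripted rather than typed by hand, the same command can be driven from Python; a minimal sketch, where the savepoint directory name "savepoint-abc123" and the job file "main.py" are hypothetical placeholders for the values elided in this thread:

```
# A minimal sketch of submitting with a restore via the Flink CLI;
# "savepoint-abc123" and "main.py" are hypothetical placeholders.
import subprocess

subprocess.run(
    [
        "bin/flink", "run",
        "-s", "s3://my_bucket/path/to/savepoints/savepoint-abc123",  # dir containing _metadata
        "-py", "main.py",
    ],
    check=True,  # raise CalledProcessError if the submission fails
)
```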
PyFlink: restoring from savepoint
Hi all,

I have a PyFlink job running in Kubernetes. Savepoints and checkpoints are being successfully saved to S3. However, I am unable to get the job to start from a savepoint. The container is started with these args:

“standalone-job”, “-pym”, “foo.main”, “-s”, “s3://”, “-n”

In the JM logs I can see “Starting StandaloneApplicationClusterEntrypoint…” where my arguments are listed. However, I don’t see any restore occurring in the logs and my application restarts with no state. How do I start a PyFlink job like this from a given savepoint?

Many thanks,

John

Sent from my iPhone
Restoring a job from a savepoint
Hi all,

The docs on restoring a job from a savepoint (https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/savepoints/#resuming-from-savepoints) state that the syntax is:

$ bin/flink run -s :savepointPath [:runArgs]

and that "you may give a path to either the savepoint’s directory or the _metadata file." If I am using S3 as my store of state:

state.savepoints.dir: s3://my_bucket/path/to/savepoints

and an example savepoint is at:

s3://my_bucket/path/to/savepoints//_metadata

then what am I supposed to supply to the flink run command? Is it:

1. the full path including filesystem: s3://my_bucket/path/to/savepoints//_metadata or s3://my_bucket/path/to/savepoints/
2. the full path: my_bucket/path/to/savepoints//_metadata or my_bucket/path/to/savepoints/
3. the path relative to the savepoints directory: /_metadata or

If I supply a directory, do I need to specify a trailing slash? Also, is there anything that I will see in the logs that will indicate that the restore from a savepoint has been successful?

Many thanks,

John
Re: How to convert Table containing TIMESTAMP_LTZ into DataStream in PyFlink 1.15.0?
Hi Dian,

Thanks, much appreciated.

Kind regards,

John

Sent from my iPhone

On 27 Jun 2022, at 03:43, Dian Fu wrote:

Hi John,

This seems like a bug and I have created a ticket https://issues.apache.org/jira/browse/FLINK-28253 to track it. For now, you could try replacing `to_data_stream` with `to_append_stream` to see if it works.

Regards,
Dian

On Sat, Jun 25, 2022 at 4:07 AM John Tipper <john_tip...@hotmail.com> wrote:

Hi,

I have a source table using a Kinesis connector reading events from AWS EventBridge using PyFlink 1.15.0. An example of the sorts of data that are in this stream is here: https://docs.aws.amazon.com/codebuild/latest/userguide/sample-build-notifications.html#sample-build-notifications-ref. Note that the stream of data contains many different types of events, where the 'detail' field is completely different between different event types. There is no support for this connector using the PyFlink DataStream API, so I use the Table API to construct the source table. The table looks like this:

CREATE TABLE events (
    `id` VARCHAR,
    `source` VARCHAR,
    `account` VARCHAR,
    `region` VARCHAR,
    `detail-type` VARCHAR,
    `detail` VARCHAR,
    `resources` VARCHAR,
    `time` TIMESTAMP(0) WITH LOCAL TIME ZONE,
    WATERMARK FOR `time` as `time` - INTERVAL '30' SECOND,
    PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
    ...
)

The table was created using:

table_env.execute_sql(CREATE_STRING_ABOVE)

I'd like to turn this table into a data stream so I can perform some processing that is easier to do in the DataStream API:

events_stream_table = table_env.from_path('events')
events_stream = table_env.to_data_stream(events_stream_table)

# now do some processing - let's filter by the type of event we get
codebuild_stream = events_stream.filter(
    lambda event: event['source'] == 'aws.codebuild'
)

# now do other stuff on a stream containing only events that are identical in shape
...
# maybe convert back into a Table and perform SQL on the data

When I run this, I get an exception:

org.apache.flink.table.api.TableException: Unsupported conversion from data type 'TIMESTAMP(6) WITH TIME ZONE' (conversion class: java.time.OffsetDateTime) to type information. Only data types that originated from type information fully support a reverse conversion.

Somebody reported a similar error here: https://stackoverflow.com/questions/58936529/using-jdbctablesource-with-streamtableenvironment-gives-classcastexception

When I try the suggestion there and replace the "TIMESTAMP(0) WITH LOCAL TIME ZONE" with a "TIMESTAMP(3)", I get a different exception:

TypeError: The java type info: LocalDateTime is not supported in PyFlink currently.

Is there a way of converting this Table into a DataStream (and then back again)? I need to use the data in the "time" field as the source of watermarks for my events.

Many thanks,

John
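Dian's suggested workaround looks roughly like the sketch below (assuming a StreamTableEnvironment and an abbreviated, hypothetical field list): to_append_stream() takes an explicit TypeInformation, which sidesteps the automatic data-type-to-type-information conversion that fails above.

```
# A minimal sketch, assuming a StreamTableEnvironment; the field list here
# is abbreviated and hypothetical.
from pyflink.common import Types

events_stream = table_env.to_append_stream(
    events_stream_table,
    Types.ROW_NAMED(
        ["id", "source", "detail"],
        [Types.STRING(), Types.STRING(), Types.STRING()],
    ),
)
```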
How to convert Table containing TIMESTAMP_LTZ into DataStream in PyFlink 1.15.0?
Hi,

I have a source table using a Kinesis connector reading events from AWS EventBridge using PyFlink 1.15.0. An example of the sorts of data that are in this stream is here: https://docs.aws.amazon.com/codebuild/latest/userguide/sample-build-notifications.html#sample-build-notifications-ref. Note that the stream of data contains many different types of events, where the 'detail' field is completely different between different event types. There is no support for this connector using the PyFlink DataStream API, so I use the Table API to construct the source table. The table looks like this:

CREATE TABLE events (
    `id` VARCHAR,
    `source` VARCHAR,
    `account` VARCHAR,
    `region` VARCHAR,
    `detail-type` VARCHAR,
    `detail` VARCHAR,
    `resources` VARCHAR,
    `time` TIMESTAMP(0) WITH LOCAL TIME ZONE,
    WATERMARK FOR `time` as `time` - INTERVAL '30' SECOND,
    PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
    ...
)

The table was created using:

table_env.execute_sql(CREATE_STRING_ABOVE)

I'd like to turn this table into a data stream so I can perform some processing that is easier to do in the DataStream API:

events_stream_table = table_env.from_path('events')
events_stream = table_env.to_data_stream(events_stream_table)

# now do some processing - let's filter by the type of event we get
codebuild_stream = events_stream.filter(
    lambda event: event['source'] == 'aws.codebuild'
)

# now do other stuff on a stream containing only events that are identical in shape
...
# maybe convert back into a Table and perform SQL on the data

When I run this, I get an exception:

org.apache.flink.table.api.TableException: Unsupported conversion from data type 'TIMESTAMP(6) WITH TIME ZONE' (conversion class: java.time.OffsetDateTime) to type information. Only data types that originated from type information fully support a reverse conversion.

Somebody reported a similar error here: https://stackoverflow.com/questions/58936529/using-jdbctablesource-with-streamtableenvironment-gives-classcastexception

When I try the suggestion there and replace the "TIMESTAMP(0) WITH LOCAL TIME ZONE" with a "TIMESTAMP(3)", I get a different exception:

TypeError: The java type info: LocalDateTime is not supported in PyFlink currently.

Is there a way of converting this Table into a DataStream (and then back again)? I need to use the data in the "time" field as the source of watermarks for my events.

Many thanks,

John
How to use connectors in PyFlink 1.15.0 when not defined in Python API?
Hi all,

There are a number of connectors which do not appear to be in the Python API v1.15.0, e.g. Kinesis. I can see that it's possible to use these connectors by using the Table API:

CREATE TABLE my_table (...) WITH ('connector' = 'kinesis' ...)

I guess if you wanted the stream as a DataStream you'd create the Table and then convert it into a DataStream? Is there a way of directly instantiating these connectors in PyFlink without needing to use SQL like this (and without having to wait until v1.16)? E.g. the Java API looks like this:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> kinesis = env.addSource(new FlinkKinesisConsumer<>(
    "kinesis_stream_name", new SimpleStringSchema(), consumerConfig));

Many thanks,

John
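Until a Python wrapper exists, one way to get such a connector as a DataStream is the conversion route mentioned above; a minimal sketch, assuming PyFlink 1.15, the flink-sql-connector-kinesis jar on the classpath, and hypothetical stream and region names:

```
# A minimal sketch, assuming PyFlink 1.15 and the Kinesis SQL connector jar
# on the classpath; stream name and region below are hypothetical.
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
table_env = StreamTableEnvironment.create(env)

# Define the Kinesis source in SQL, since there is no Python wrapper for it.
table_env.execute_sql("""
    CREATE TABLE kinesis_source (
        `data` STRING
    ) WITH (
        'connector' = 'kinesis',
        'stream' = 'kinesis_stream_name',  -- hypothetical
        'aws.region' = 'eu-west-1',        -- hypothetical
        'format' = 'raw'
    )
""")

# Convert the table into a DataStream for further processing.
stream = table_env.to_data_stream(table_env.from_path("kinesis_source"))
```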
Re: Is it possible to use DataStream API keyBy followed by Table API SQL in PyFlink?
Sorry for the noise, I completely missed this part of the documentation describing exactly how to do this: https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/data_stream_api/

From: John Tipper
Sent: 23 June 2022 21:35
To: user@flink.apache.org
Subject: Is it possible to use DataStream API keyBy followed by Table API SQL in PyFlink?

Hi all,

In PyFlink, is it possible to use the DataStream API to create a DataStream by means of StreamExecutionEnvironment's addSource(...), then perform transformations on this data stream using the DataStream API, and then convert that stream into a form where SQL statements can be executed on it using the Table API? It's not clear to me whether it's possible to mix and match the APIs, or under what circumstances you can (or should/should not) move between the two.

Many thanks,

John
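The pattern from that documentation page, in brief (a minimal sketch, assuming PyFlink 1.14+; the in-memory collection stands in for a real addSource(...) call):

```
# A minimal sketch of DataStream -> Table API mixing; the in-memory
# collection below is a hypothetical stand-in for a real source.
from pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
table_env = StreamTableEnvironment.create(env)

# Build and transform a stream with the DataStream API.
ds = env.from_collection(
    [("a", 1), ("b", 2), ("a", 3)],
    type_info=Types.TUPLE([Types.STRING(), Types.INT()]),
)
summed = ds.key_by(lambda t: t[0]).reduce(lambda a, b: (a[0], a[1] + b[1]))

# Expose the keyed/reduced stream to SQL via the Table API, then query it.
table_env.create_temporary_view("my_view", table_env.from_data_stream(summed))
table_env.execute_sql("SELECT f0, f1 FROM my_view").print()
```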
Is it possible to use DataStream API keyBy followed by Table API SQL in PyFlink?
Hi all,

In PyFlink, is it possible to use the DataStream API to create a DataStream by means of StreamExecutionEnvironment's addSource(...), then perform transformations on this data stream using the DataStream API, and then convert that stream into a form where SQL statements can be executed on it using the Table API? It's not clear to me whether it's possible to mix and match the APIs, or under what circumstances you can (or should/should not) move between the two.

Many thanks,

John
Flink TaskManager memory configuration failed
Hi all,

I'm wanting to run quite a number of PyFlink jobs on Kubernetes, where the amount of state and number of events being processed is small, and therefore I'd like to use as little memory in my clusters as possible so I can bin pack most efficiently. I'm running a Flink cluster per job. I'm currently trying to see how small I can make the memory settings. I set the following in my Flink config:

taskmanager.memory.process.size: 512mb

The container is being started with requested memory of 512Mi. When my TaskManager starts up, I get an error message:

IllegalConfigurationException: Sum of configured Framework Heap Memory (128mb), Framework Off-Heap Memory (128mb), Task Off-Heap Memory (0 bytes), Managed Memory (25.6mb) and Network Memory (64mb) exceed configured Total Flink Memory (64mb).

My understanding of the docs (https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/memory/mem_setup_tm/) is that I should just need to set taskmanager.memory.process.size. Where is the 64mb figure coming from which makes up the Total Flink Memory? How do I change this?

Many thanks,

John
Re: The configured managed memory fraction for Python worker process must be within (0, 1], was: %s
Thanks Dian - I’m restricted to 1.13 for the moment because this is running inside AWS Kinesis. Is there a way to manually bypass that issue?

J

Sent from my iPhone

On 17 Jun 2022, at 04:59, Dian Fu wrote:

>> This error generally occurs in jobs where there are transfers between Table and DataStream.

AFAIK, this issue should have already been fixed; see https://issues.apache.org/jira/browse/FLINK-26920 and https://issues.apache.org/jira/browse/FLINK-23133 for more details.

Regards,
Dian

On Fri, Jun 17, 2022 at 10:17 AM Xingbo Huang <hxbks...@gmail.com> wrote:

Hi John,

Because I can't see your code, I can only provide some possible reasons for this error:

1. This error generally occurs in jobs where there are transfers between Table and DataStream. But given that you said you just used SQL + a Python UDF, this shouldn't be the case.
2. The default value of `taskmanager.memory.managed.consumer-weights` is `OPERATOR:70,STATE_BACKEND:70,PYTHON:30`, so in your case there is actually no need to set it to `PYTHON:30`.
3. In fact, for pure SQL + Python UDF jobs, if you don't set the erroneous value `PYTHON:0` in `taskmanager.memory.managed.consumer-weights`, I really can't think of any situation where this problem will occur.

Best,
Xingbo

John Tipper <john_tip...@hotmail.com> wrote on Thu, 16 Jun 2022 at 19:41:

Hi Xingbo,

Yes, there are a number of temporary views being created, where each is created using SQL (CREATE TEMPORARY VIEW ...) rather than explicit calls to the Table and DataStream APIs. Is this a good pattern, or are there caveats I should be aware of, please?

Many thanks,

John

From: Xingbo Huang <hxbks...@gmail.com>
Sent: 16 June 2022 12:34
To: John Tipper <john_tip...@hotmail.com>
Cc: user@flink.apache.org
Subject: Re: The configured managed memory fraction for Python worker process must be within (0, 1], was: %s

Hi John,

Does your job logic include conversion between Table and DataStream? For example, are methods such as `create_temporary_view(path: str, data_stream: DataStream) -> Table` used?

Best,
Xingbo

John Tipper <john_tip...@hotmail.com> wrote on Thu, 16 Jun 2022 at 18:31:

Hi Xingbo,

I’m afraid I can’t share my code, but Flink is 1.13. The main Flink code is running inside Kinesis on AWS, so I cannot change the version.

Many thanks,

John

Sent from my iPhone

On 16 Jun 2022, at 10:37, Xingbo Huang <hxbks...@gmail.com> wrote:

Hi John,

Could you provide the code snippet and the version of PyFlink you used?

Best,
Xingbo

John Tipper <john_tip...@hotmail.com> wrote on Thu, 16 Jun 2022 at 17:05:

Hi all,

I'm trying to run a PyFlink unit test to test some PyFlink SQL where my code uses a Python UDF. I can't share my code, but the test case is similar to the code here: https://github.com/apache/flink/blob/f8172cdbbc27344896d961be4b0b9cdbf000b5cd/flink-python/pyflink/testing/test_case_utils.py

When I have some simple SQL, everything is fine. When I add a more complex query, I get an error which looks like it's memory related:

java.lang.IllegalArgumentException: The configured managed memory fraction for Python worker process must be within (0, 1], was: %s. It may be because the consumer type "Python" was missing or set to 0 for the config option "taskmanager.memory.managed.consumer-weights".0.0

In my test case setUp(), I try to set that value like this, but it seems to have no effect:

self.t_env.get_config().get_configuration().set_string("taskmanager.memory.managed.consumer-weights", "PYTHON:30")

Am I not setting it correctly, or is there something else I need to do to fix this error?

Many thanks,

John
Re: The configured managed memory fraction for Python worker process must be within (0, 1], was: %s
Hi Xingbo,

Yes, there are a number of temporary views being created, where each is created using SQL (CREATE TEMPORARY VIEW ...) rather than explicit calls to the Table and DataStream APIs. Is this a good pattern, or are there caveats I should be aware of, please?

Many thanks,

John

From: Xingbo Huang
Sent: 16 June 2022 12:34
To: John Tipper
Cc: user@flink.apache.org
Subject: Re: The configured managed memory fraction for Python worker process must be within (0, 1], was: %s

Hi John,

Does your job logic include conversion between Table and DataStream? For example, are methods such as `create_temporary_view(path: str, data_stream: DataStream) -> Table` used?

Best,
Xingbo

John Tipper <john_tip...@hotmail.com> wrote on Thu, 16 Jun 2022 at 18:31:

Hi Xingbo,

I’m afraid I can’t share my code, but Flink is 1.13. The main Flink code is running inside Kinesis on AWS, so I cannot change the version.

Many thanks,

John

Sent from my iPhone

On 16 Jun 2022, at 10:37, Xingbo Huang <hxbks...@gmail.com> wrote:

Hi John,

Could you provide the code snippet and the version of PyFlink you used?

Best,
Xingbo

John Tipper <john_tip...@hotmail.com> wrote on Thu, 16 Jun 2022 at 17:05:

Hi all,

I'm trying to run a PyFlink unit test to test some PyFlink SQL where my code uses a Python UDF. I can't share my code, but the test case is similar to the code here: https://github.com/apache/flink/blob/f8172cdbbc27344896d961be4b0b9cdbf000b5cd/flink-python/pyflink/testing/test_case_utils.py

When I have some simple SQL, everything is fine. When I add a more complex query, I get an error which looks like it's memory related:

java.lang.IllegalArgumentException: The configured managed memory fraction for Python worker process must be within (0, 1], was: %s. It may be because the consumer type "Python" was missing or set to 0 for the config option "taskmanager.memory.managed.consumer-weights".0.0

In my test case setUp(), I try to set that value like this, but it seems to have no effect:

self.t_env.get_config().get_configuration().set_string("taskmanager.memory.managed.consumer-weights", "PYTHON:30")

Am I not setting it correctly, or is there something else I need to do to fix this error?

Many thanks,

John
Re: The configured managed memory fraction for Python worker process must be within (0, 1], was: %s
Hi Xingbo,

I’m afraid I can’t share my code, but Flink is 1.13. The main Flink code is running inside Kinesis on AWS, so I cannot change the version.

Many thanks,

John

Sent from my iPhone

On 16 Jun 2022, at 10:37, Xingbo Huang wrote:

Hi John,

Could you provide the code snippet and the version of PyFlink you used?

Best,
Xingbo

John Tipper <john_tip...@hotmail.com> wrote on Thu, 16 Jun 2022 at 17:05:

Hi all,

I'm trying to run a PyFlink unit test to test some PyFlink SQL where my code uses a Python UDF. I can't share my code, but the test case is similar to the code here: https://github.com/apache/flink/blob/f8172cdbbc27344896d961be4b0b9cdbf000b5cd/flink-python/pyflink/testing/test_case_utils.py

When I have some simple SQL, everything is fine. When I add a more complex query, I get an error which looks like it's memory related:

java.lang.IllegalArgumentException: The configured managed memory fraction for Python worker process must be within (0, 1], was: %s. It may be because the consumer type "Python" was missing or set to 0 for the config option "taskmanager.memory.managed.consumer-weights".0.0

In my test case setUp(), I try to set that value like this, but it seems to have no effect:

self.t_env.get_config().get_configuration().set_string("taskmanager.memory.managed.consumer-weights", "PYTHON:30")

Am I not setting it correctly, or is there something else I need to do to fix this error?

Many thanks,

John
The configured managed memory fraction for Python worker process must be within (0, 1], was: %s
Hi all,

I'm trying to run a PyFlink unit test to test some PyFlink SQL where my code uses a Python UDF. I can't share my code, but the test case is similar to the code here: https://github.com/apache/flink/blob/f8172cdbbc27344896d961be4b0b9cdbf000b5cd/flink-python/pyflink/testing/test_case_utils.py

When I have some simple SQL, everything is fine. When I add a more complex query, I get an error which looks like it's memory related:

java.lang.IllegalArgumentException: The configured managed memory fraction for Python worker process must be within (0, 1], was: %s. It may be because the consumer type "Python" was missing or set to 0 for the config option "taskmanager.memory.managed.consumer-weights".0.0

In my test case setUp(), I try to set that value like this, but it seems to have no effect:

self.t_env.get_config().get_configuration().set_string("taskmanager.memory.managed.consumer-weights", "PYTHON:30")

Am I not setting it correctly, or is there something else I need to do to fix this error?

Many thanks,

John
Re: How to handle deletion of items using PyFlink SQL?
Yes, I’m interested in the best pattern to follow with SQL to allow a downstream DB using the JDBC SQL connector to reflect the state of rows added and deleted upstream.

So imagine there is a crawl event at t=C1 that happens with an associated timestamp and which finds resources A, B, C. Is it better to emit one event into the stream with an array of all resources, or many events, each with one resource and a corresponding crawl timestamp? There is obviously a limit to the amount of data that can be in a single event, so the latter pattern will scale better for many resources.

Flink SQL sees this stream and processes it, then emits to a JDBC sink where there is one row each for A, B, C. Later, at t=C2, another crawl happens, finding A, B, D. I want the sink DB to have 3 rows if possible and not have C. Alternatively, it should have 4 rows with a tombstone/delete marker on row C, so it’s obvious it doesn’t exist any more. I’m interested in a SQL solution if possible.

J

Sent from my iPhone

On 9 Jun 2022, at 11:20, Xuyang wrote:

Hi, Dian Fu. I think John's requirement is like a CDC source: the source needs the ability to know which of the data should be deleted and then notify the framework, and that is why I recommended John use a UDTF.

And hi, John, I'm not sure this doc [1] is enough. BTW, I think you can also try to customize a connector [2] to send `DELETE` RowData downstream in Java and use it in PyFlink SQL; maybe that's easier.

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/python/table/udfs/python_udfs/#table-functions
[2] https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/table/sourcessinks/#user-defined-sources--sinks

--
Best!
Xuyang

On 2022-06-09 08:53:36, "Dian Fu" wrote:

Hi John,

If you are using Table API & SQL, the framework handles the RowKind and it's transparent to you. So usually you don't need to handle RowKind in Table API & SQL.

Regards,
Dian

On Thu, Jun 9, 2022 at 6:56 AM John Tipper <john_tip...@hotmail.com> wrote:

Hi Xuyang,

Thank you very much, I’ll experiment tomorrow. Do you happen to know whether there is a Python example of udtf() with a RowKind being set (or whether it’s supported)?

Many thanks,

John

Sent from my iPhone

On 8 Jun 2022, at 16:41, Xuyang <xyzhong...@163.com> wrote:

Hi, John. What about using a UDTF [1]?

In your UDTF, all resources are saved as a set or map as s1. When t=2 arrives, the new resources s2 will be collected by the crawl. I think what you want is the deletion data, i.e. 's1' - 's2'. So just loop to find the deleted data and send the RowData in the 'eval' function of the UDTF; the RowData can be sent with RowKind 'DELETE' [2]. The 'DELETE' tells the downstream that this value is deleted. I will be glad if it can help you.

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/udfs/#table-functions
[2] https://github.com/apache/flink/blob/44f73c496ed1514ea453615b77bee0486b8998db/flink-core/src/main/java/org/apache/flink/types/RowKind.java#L52

--
Best!
Xuyang

At 2022-06-08 20:06:17, "John Tipper" <john_tip...@hotmail.com> wrote:

Hi all,

I have some reference data that is periodically emitted by a crawler mechanism into an upstream Kinesis data stream, where those rows are used to populate a sink table (and where I am using Flink 1.13 PyFlink SQL within AWS Kinesis Data Analytics). What is the best pattern to handle deletion of upstream data, such that the downstream table remains in sync with upstream?

For example, at t=1, rows R1, R2 and R3 are processed from the stream, resulting in a DB with 3 rows. At some point between t=1 and t=2, the resource corresponding to R2 was deleted, such that at t=2, when the next crawl was carried out, only rows R1 and R3 were emitted into the upstream stream. How should I process the stream of events so that when I have finished processing the events from t=2 my downstream table also has just rows R1 and R3?

Many thanks,

John
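One pragmatic pattern, not discussed further in this thread and offered here only as a hedged sketch: emit one row per resource together with its crawl timestamp, upsert those rows into the JDBC sink keyed on the resource id, and periodically delete rows on the database side whose crawl timestamp is older than the latest crawl. The table and column names below are hypothetical:

```
# DB-side cleanup after each crawl; "resources", "resource_id" and "crawl_ts"
# are hypothetical names, and the connection details are placeholders.
import psycopg2

conn = psycopg2.connect(host="localhost", dbname="mydb", user="postgres", password="secret")
conn.autocommit = True
with conn.cursor() as cur:
    # Rows refreshed by the latest crawl carry its crawl_ts and survive;
    # anything older corresponds to a resource the crawler no longer sees.
    cur.execute("DELETE FROM resources WHERE crawl_ts < (SELECT MAX(crawl_ts) FROM resources)")
conn.close()
```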
Re: How to implement unnest() as udtf using Python?
You’re a legend, thank you so much, I was looking at the internal functions docs page, not that one!

John

Sent from my iPhone

On 13 Jun 2022, at 13:21, Martijn Visser wrote:

Hi John,

You're mentioning that Flink doesn't support UNNEST, but it does [1]. Would this work for you?

Best regards,

Martijn

[1] https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/sql/queries/joins/#array-expansion

On Mon, 13 Jun 2022 at 13:55, John Tipper <john_tip...@hotmail.com> wrote:

Hi all,

Flink doesn’t support the unnest() function, which takes an array and creates a row for each element in the array. I have a column containing an array of maps, and I’d like to implement my own unnest. I tried this as an initial do-nothing implementation:

@udtf(result_types=DataTypes.MAP(
    key_type=DataTypes.STRING(),
    value_type=DataTypes.STRING()))
def my_unnest(arr):
    return []

I get an error when Flink starts:

No match found for function signature my_unnest()

Is there something that I’m missing in my definition, please?

Many thanks,

John
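The built-in array expansion Martijn points to looks roughly like this from PyFlink (a minimal sketch; the "events" table and its array column "tags" are hypothetical):

```
# Expand an ARRAY column into one row per element with CROSS JOIN UNNEST;
# "events", "id" and "tags" are hypothetical names.
table_env.execute_sql("""
    SELECT e.id, t.tag
    FROM events AS e
    CROSS JOIN UNNEST(e.tags) AS t (tag)
""").print()
```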
How to implement unnest() as udtf using Python?
Hi all,

Flink doesn’t support the unnest() function, which takes an array and creates a row for each element in the array. I have a column containing an array of maps, and I’d like to implement my own unnest. I tried this as an initial do-nothing implementation:

@udtf(result_types=DataTypes.MAP(
    key_type=DataTypes.STRING(),
    value_type=DataTypes.STRING()))
def my_unnest(arr):
    return []

I get an error when Flink starts:

No match found for function signature my_unnest()

Is there something that I’m missing in my definition, please?

Many thanks,

John
Re: How to handle deletion of items using PyFlink SQL?
Hi Xuyang,

Thank you very much, I’ll experiment tomorrow. Do you happen to know whether there is a Python example of udtf() with a RowKind being set (or whether it’s supported)?

Many thanks,

John

Sent from my iPhone

On 8 Jun 2022, at 16:41, Xuyang wrote:

Hi, John. What about using a UDTF [1]?

In your UDTF, all resources are saved as a set or map as s1. When t=2 arrives, the new resources s2 will be collected by the crawl. I think what you want is the deletion data, i.e. 's1' - 's2'. So just loop to find the deleted data and send the RowData in the 'eval' function of the UDTF; the RowData can be sent with RowKind 'DELETE' [2]. The 'DELETE' tells the downstream that this value is deleted. I will be glad if it can help you.

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/udfs/#table-functions
[2] https://github.com/apache/flink/blob/44f73c496ed1514ea453615b77bee0486b8998db/flink-core/src/main/java/org/apache/flink/types/RowKind.java#L52

--
Best!
Xuyang

At 2022-06-08 20:06:17, "John Tipper" wrote:

Hi all,

I have some reference data that is periodically emitted by a crawler mechanism into an upstream Kinesis data stream, where those rows are used to populate a sink table (and where I am using Flink 1.13 PyFlink SQL within AWS Kinesis Data Analytics). What is the best pattern to handle deletion of upstream data, such that the downstream table remains in sync with upstream?

For example, at t=1, rows R1, R2 and R3 are processed from the stream, resulting in a DB with 3 rows. At some point between t=1 and t=2, the resource corresponding to R2 was deleted, such that at t=2, when the next crawl was carried out, only rows R1 and R3 were emitted into the upstream stream. How should I process the stream of events so that when I have finished processing the events from t=2 my downstream table also has just rows R1 and R3?

Many thanks,

John
How to handle deletion of items using PyFlink SQL?
Hi all,

I have some reference data that is periodically emitted by a crawler mechanism into an upstream Kinesis data stream, where those rows are used to populate a sink table (and where I am using Flink 1.13 PyFlink SQL within AWS Kinesis Data Analytics). What is the best pattern to handle deletion of upstream data, such that the downstream table remains in sync with upstream?

For example, at t=1, rows R1, R2 and R3 are processed from the stream, resulting in a DB with 3 rows. At some point between t=1 and t=2, the resource corresponding to R2 was deleted, such that at t=2, when the next crawl was carried out, only rows R1 and R3 were emitted into the upstream stream. How should I process the stream of events so that when I have finished processing the events from t=2 my downstream table also has just rows R1 and R3?

Many thanks,

John
Re: Multiple INSERT INTO within single PyFlink job?
Ah, found it: I need to use add_insert_sql() in order to use multiple insert statements:

https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/sql/insert/#insert-statement
https://aws.amazon.com/blogs/big-data/build-a-real-time-streaming-application-using-apache-flink-python-api-with-amazon-kinesis-data-analytics/

Sorry for the noise.

From: John Tipper
Sent: 29 April 2022 14:07
To: user@flink.apache.org
Subject: Multiple INSERT INTO within single PyFlink job?

Hi all,

Is it possible to have more than one `INSERT INTO ... SELECT ...` statement within a single PyFlink job (on Flink 1.13.6)? I have a number of output tables that I create, and I am trying to write to these within a single job, where the example SQL looks like this (assume there is an input table called 'input'):

sql1 = "INSERT INTO out1 (col1, col2) SELECT col1, col2 FROM input"
sql2 = "INSERT INTO out2 (col3, col4) SELECT col3, col4 FROM input"

env.execute_sql(sql1)
env.execute_sql(sql2)

When this is run inside a Flink cluster inside Kinesis on AWS, I get a failure: "Cannot have more than one execute() or executeAsync() call in a single environment". When I look at the Flink web UI, I can see that there is one job called "insert-into_default_catalog.default_database.out1". Does Flink separate each INSERT statement into a separate job? It looks like it tries to create one job for the first query and then fails to create a second job for the second query. Is there any way of getting it to run as a single job using SQL, without having to move away from SQL and the Table API?

Many thanks,

John
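The statement-set API mentioned above looks like this in practice (a minimal sketch, assuming the out1/out2/input tables from the question already exist):

```
# Group both INSERTs into one statement set so they are optimized and
# submitted together as a single Flink job.
stmt_set = table_env.create_statement_set()
stmt_set.add_insert_sql("INSERT INTO out1 (col1, col2) SELECT col1, col2 FROM input")
stmt_set.add_insert_sql("INSERT INTO out2 (col3, col4) SELECT col3, col4 FROM input")
stmt_set.execute().wait()  # one job containing both sinks
```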
Multiple INSERT INTO within single PyFlink job?
Hi all,

Is it possible to have more than one `INSERT INTO ... SELECT ...` statement within a single PyFlink job (on Flink 1.13.6)? I have a number of output tables that I create, and I am trying to write to these within a single job, where the example SQL looks like this (assume there is an input table called 'input'):

sql1 = "INSERT INTO out1 (col1, col2) SELECT col1, col2 FROM input"
sql2 = "INSERT INTO out2 (col3, col4) SELECT col3, col4 FROM input"

env.execute_sql(sql1)
env.execute_sql(sql2)

When this is run inside a Flink cluster inside Kinesis on AWS, I get a failure: "Cannot have more than one execute() or executeAsync() call in a single environment". When I look at the Flink web UI, I can see that there is one job called "insert-into_default_catalog.default_database.out1". Does Flink separate each INSERT statement into a separate job? It looks like it tries to create one job for the first query and then fails to create a second job for the second query. Is there any way of getting it to run as a single job using SQL, without having to move away from SQL and the Table API?

Many thanks,

John
Re: Unit testing PyFlink SQL project
Hi Dian,

I've tried this and it works nicely, on both macOS and Windows. Thank you very much indeed for your help.

Kind regards,

John

From: Dian Fu
Sent: 25 April 2022 02:42
To: John Tipper
Cc: user@flink.apache.org
Subject: Re: Unit testing PyFlink SQL project

Hi John,

I'm also using macOS. These are the steps I'm following, which I have run successfully:

1) python3 -m venv .venv
2) source .venv/bin/activate
3) pip install apache-flink==1.14.4
4) python -c "import pyflink;import os;print(os.path.dirname(os.path.abspath(pyflink.__file__))+'/log')"

It will print something like this: "/Users/dianfu/code/src/github/pyflink-faq/testing/.venv/lib/python3.8/site-packages/pyflink/log"

5) Check the structure of the installed package:

```
(.venv) (base) dianfu@B-7174MD6R-1908 testing % ls -lh /Users/dianfu/code/src/github/pyflink-faq/testing/.venv/lib/python3.8/site-packages/pyflink/
total 136
-rw-r--r--   1 dianfu  staff   1.3K Apr 25 09:26 README.txt
-rw-r--r--   1 dianfu  staff   1.9K Apr 25 09:26 __init__.py
drwxr-xr-x  11 dianfu  staff   352B Apr 25 09:26 __pycache__
drwxr-xr-x  25 dianfu  staff   800B Apr 25 09:26 bin
drwxr-xr-x  21 dianfu  staff   672B Apr 25 09:26 common
drwxr-xr-x  13 dianfu  staff   416B Apr 25 09:26 conf
drwxr-xr-x  20 dianfu  staff   640B Apr 25 09:26 datastream
drwxr-xr-x   4 dianfu  staff   128B Apr 25 09:26 examples
-rw-r--r--   1 dianfu  staff   3.2K Apr 25 09:26 find_flink_home.py
drwxr-xr-x  25 dianfu  staff   800B Apr 25 09:26 fn_execution
-rw-r--r--   1 dianfu  staff   9.1K Apr 25 09:26 gen_protos.py
-rw-r--r--   1 dianfu  staff   7.6K Apr 25 09:26 java_gateway.py
drwxr-xr-x  11 dianfu  staff   352B Apr 25 09:26 lib
drwxr-xr-x  28 dianfu  staff   896B Apr 25 09:26 licenses
drwxr-xr-x   4 dianfu  staff   128B Apr 25 09:26 log
drwxr-xr-x   5 dianfu  staff   160B Apr 25 09:26 metrics
drwxr-xr-x   4 dianfu  staff   128B Apr 25 09:26 opt
drwxr-xr-x  11 dianfu  staff   352B Apr 25 09:26 plugins
-rw-r--r--   1 dianfu  staff   1.3K Apr 25 09:26 pyflink_callback_server.py
-rw-r--r--   1 dianfu  staff    12K Apr 25 09:26 pyflink_gateway_server.py
-rw-r--r--   1 dianfu  staff   5.3K Apr 25 09:26 serializers.py
-rw-r--r--   1 dianfu  staff   7.9K Apr 25 09:26 shell.py
drwxr-xr-x  31 dianfu  staff   992B Apr 25 09:26 table
drwxr-xr-x   6 dianfu  staff   192B Apr 25 09:26 util
-rw-r--r--   1 dianfu  staff   1.1K Apr 25 09:26 version.py
```

6) Execute the command `python3 -m unittest test_table_api.TableTests.test_scalar_function`. The output is as follows, and you can see that it executes successfully:

```
(.venv) (base) dianfu@B-7174MD6R-1908 testing % python3 -m unittest test_table_api.TableTests.test_scalar_function
Using %s as FLINK_HOME... /Users/dianfu/code/src/github/pyflink-faq/testing/.venv/lib/python3.8/site-packages/pyflink
Skipped download /Users/dianfu/code/src/github/pyflink-faq/testing/flink-python_2.11-1.14.4-tests.jar since it already exists.
/Users/dianfu/miniconda3/lib/python3.8/subprocess.py:946: ResourceWarning: subprocess 71018 is still running
  _warn("subprocess %s is still running" % self.pid, ResourceWarning)
ResourceWarning: Enable tracemalloc to get the object allocation traceback
Downloading jar org.apache.flink:flink-table-planner_2.11:1.14.4:jar:tests
/Users/dianfu/code/src/github/pyflink-faq/testing/.venv/lib/python3.8/site-packages/pyflink/table/table_environment.py:538: DeprecationWarning: Deprecated in 1.10. Use create_table instead.
  warnings.warn("Deprecated in 1.10. Use create_table instead.", DeprecationWarning)
.
----------------------------------------------------------------------
Ran 1 test in 32.746s

OK
```

I have also tried your commands and run into the same error. I believe the difference comes from `python setup.py install` vs `pip install apache-flink==1.14.4`. When installing with the command `python setup.py install`, the structure of the installed package is a little different from `pip install apache-flink==1.14.4`. I will dig into this and share the results when I have some findings. Before that, could you try to create a new clean virtual environment and see if the steps I'm following work for you?

Regards,
Dian

On Mon, Apr 25, 2022 at 6:04 AM John Tipper <john_tip...@hotmail.com> wrote:

And now when I add further dependencies to the classpath to remove all ClassNotFound exceptions, I get a different error which I don't understand (TypeError: Could not found the Java class 'EnvironmentSettings.inStreamingMode'.), see the logs below:

$ python test_table_api.py TableTests.test_scalar_function
Using %s as FLINK_HOME... /Users/john/PycharmProjects/pyflink-faq/.venv/lib/python3.8/site-packages/apache_flink-1.14.4-py3.8-macosx-11-x86_64.egg/pyflink
Skipped download /Users/john/PycharmProjects/pyflink-faq/testing/flink-python_2.11-1.14.4-tests.jar since it already exists.
Skipped download /Users/john/PycharmProjects/pyflink-faq/testing/
Re: Unit testing PyFlink SQL project
And now when I add further dependencies to the classpath to remove all ClassNotFound exceptions, I get a different error which I don't understand (TypeError: Could not found the Java class 'EnvironmentSettings.inStreamingMode'.), see the logs below:

$ python test_table_api.py TableTests.test_scalar_function
Using %s as FLINK_HOME... /Users/john/PycharmProjects/pyflink-faq/.venv/lib/python3.8/site-packages/apache_flink-1.14.4-py3.8-macosx-11-x86_64.egg/pyflink
Skipped download /Users/john/PycharmProjects/pyflink-faq/testing/flink-python_2.11-1.14.4-tests.jar since it already exists.
Skipped download /Users/john/PycharmProjects/pyflink-faq/testing/flink-python_2.11-1.14.4.jar since it already exists.
Skipped download /Users/john/PycharmProjects/pyflink-faq/testing/flink-shaded-guava-30.1.1-jre-15.0.jar since it already exists.
/Users/john/PycharmProjects/pyflink-faq/.venv/lib/python3.8/site-packages/apache_flink-1.14.4-py3.8-macosx-11-x86_64.egg/pyflink/opt/flink-shaded-guava-30.1.1-jre-15.0.jar:/Users/john/PycharmProjects/pyflink-faq/.venv/lib/python3.8/site-packages/apache_flink-1.14.4-py3.8-macosx-11-x86_64.egg/pyflink/opt/flink-java-1.14.4.jar:/Users/john/PycharmProjects/pyflink-faq/.venv/lib/python3.8/site-packages/apache_flink-1.14.4-py3.8-macosx-11-x86_64.egg/pyflink/opt/flink-table-planner_2.11-1.14.4.jar:/Users/john/PycharmProjects/pyflink-faq/.venv/lib/python3.8/site-packages/apache_flink-1.14.4-py3.8-macosx-11-x86_64.egg/pyflink/opt/flink-table-common-1.14.4.jar:/Users/john/PycharmProjects/pyflink-faq/.venv/lib/python3.8/site-packages/apache_flink-1.14.4-py3.8-macosx-11-x86_64.egg/pyflink/opt/jsr305-1.3.9.jar:/Users/john/PycharmProjects/pyflink-faq/.venv/lib/python3.8/site-packages/apache_flink-1.14.4-py3.8-macosx-11-x86_64.egg/pyflink/opt/flink-python_2.11-1.14.4.jar:/Users/john/PycharmProjects/pyflink-faq/.venv/lib/python3.8/site-packages/apache_flink-1.14.4-py3.8-macosx-11-x86_64.egg/pyflink/opt/flink-python_2.11-1.14.4-tests.jar:/Users/john/PycharmProjects/pyflink-faq/.venv/lib/python3.8/site-packages/apache_flink-1.14.4-py3.8-macosx-11-x86_64.egg/pyflink/opt/flink-clients_2.11-1.14.4.jar:/Users/john/PycharmProjects/pyflink-faq/.venv/lib/python3.8/site-packages/apache_flink-1.14.4-py3.8-macosx-11-x86_64.egg/pyflink/opt/flink-shaded-force-shading-14.0.jar:/Users/john/PycharmProjects/pyflink-faq/.venv/lib/python3.8/site-packages/apache_flink-1.14.4-py3.8-macosx-11-x86_64.egg/pyflink/opt/flink-table-runtime_2.11-1.14.4.jar:/Users/john/PycharmProjects/pyflink-faq/.venv/lib/python3.8/site-packages/apache_flink-1.14.4-py3.8-macosx-11-x86_64.egg/pyflink/opt/commons-compress-1.21.jar:/Users/john/PycharmProjects/pyflink-faq/.venv/lib/python3.8/site-packages/apache_flink-1.14.4-py3.8-macosx-11-x86_64.egg/pyflink/opt/slf4j-api-1.7.15.jar:/Users/john/PycharmProjects/pyflink-faq/.venv/lib/python3.8/site-packages/apache_flink-1.14.4-py3.8-macosx-11-x86_64.egg/pyflink/opt/flink-streaming-java_2.11-1.14.4.jar:/Users/john/PycharmProjects/pyflink-faq/.venv/lib/python3.8/site-packages/apache_flink-1.14.4-py3.8-macosx-11-x86_64.egg/pyflink/opt/flink-core-1.14.4.jar
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
/usr/local/Cellar/python@3.8/3.8.13/Frameworks/Python.framework/Versions/3.8/lib/python3.8/subprocess.py:946: ResourceWarning: subprocess 45753 is still running
  _warn("subprocess %s is still running" % self.pid, ResourceWarning)
ResourceWarning: Enable tracemalloc to get the object allocation traceback
E
======================================================================
ERROR: test_scalar_function (__main__.TableTests)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/Users/john/PycharmProjects/pyflink-faq/testing/test_utils.py", line 135, in setUp
    self.t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
  File "/Users/john/PycharmProjects/pyflink-faq/.venv/lib/python3.8/site-packages/apache_flink-1.14.4-py3.8-macosx-11-x86_64.egg/pyflink/table/environment_settings.py", line 267, in in_streaming_mode
    get_gateway().jvm.EnvironmentSettings.inStreamingMode())
  File "/Users/john/PycharmProjects/pyflink-faq/.venv/lib/python3.8/site-packages/apache_flink-1.14.4-py3.8-macosx-11-x86_64.egg/pyflink/util/exceptions.py", line 185, in wrapped_call
    raise TypeError(
TypeError: Could not found the Java class 'EnvironmentSettings.inStreamingMode'. The Java dependencies could be specified via command line argument '--jarfile' or the config option 'pipeline.jars'
----------------------------------------------------------------------
Ran 1 test in 0.401s

FAILED (errors=1)
sys:1: ResourceWarning: unclosed file <_io.BufferedWriter name=4>
Re: Unit testing PyFlink SQL project
le calling t.put

----------------------------------------------------------------------
Ran 1 test in 1.089s

FAILED (errors=1)
sys:1: ResourceWarning: unclosed file <_io.BufferedWriter name=4>
(.venv) Johns-MacBook-Pro:testing john$ Unable to get the Python watchdog object, now exit.

From: John Tipper
Sent: 24 April 2022 20:30
To: Dian Fu
Cc: user@flink.apache.org
Subject: Re: Unit testing PyFlink SQL project

Hi Dian,

Thank you very much, that's very helpful. I'm seeing a couple of errors when I try to run the example, though (Python 3.8 on macOS):

1. I create a fresh Python virtual env: `python -m venv .venv`
2. `source .venv/bin/activate`
3. When I tried to configure the project by running `python setup.py install`, I got errors about Cython not being installed, even though it was. I then just had to do a `pip install apache-flink==1.14.4` to install the requirements and be able to move forward. Not sure what the issue here is.
4. Running the test then fails as follows:

$ python test_table_api.py TableTests.test_scalar_function
Using %s as FLINK_HOME... /Users/john/PycharmProjects/pyflink-faq/.venv/lib/python3.8/site-packages/apache_flink-1.14.4-py3.8-macosx-11-x86_64.egg/pyflink
Skipped download /Users/john/PycharmProjects/pyflink-faq/testing/flink-python_2.11-1.14.4-tests.jar since it already exists.
The flink-python jar is not found in the opt folder of the FLINK_HOME: /Users/john/PycharmProjects/pyflink-faq/.venv/lib/python3.8/site-packages/apache_flink-1.14.4-py3.8-macosx-11-x86_64.egg/pyflink
/Users/john/PycharmProjects/pyflink-faq/testing/flink-python_2.11-1.14.4-tests.jar
Error: Could not find or load main class org.apache.flink.client.python.PythonGatewayServer
Caused by: java.lang.ClassNotFoundException: org.apache.flink.client.python.PythonGatewayServer
E/usr/local/Cellar/python@3.8/3.8.13/Frameworks/Python.framework/Versions/3.8/lib/python3.8/unittest/case.py:704: ResourceWarning: unclosed file <_io.BufferedWriter name=4>
  outcome.errors.clear()
ResourceWarning: Enable tracemalloc to get the object allocation traceback
======================================================================
ERROR: test_scalar_function (__main__.TableTests)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/Users/john/PycharmProjects/pyflink-faq/testing/test_utils.py", line 123, in setUp
    self.t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
  File "/Users/john/PycharmProjects/pyflink-faq/.venv/lib/python3.8/site-packages/apache_flink-1.14.4-py3.8-macosx-11-x86_64.egg/pyflink/table/environment_settings.py", line 267, in in_streaming_mode
    get_gateway().jvm.EnvironmentSettings.inStreamingMode())
  File "/Users/john/PycharmProjects/pyflink-faq/.venv/lib/python3.8/site-packages/apache_flink-1.14.4-py3.8-macosx-11-x86_64.egg/pyflink/java_gateway.py", line 62, in get_gateway
    _gateway = launch_gateway()
  File "/Users/john/PycharmProjects/pyflink-faq/.venv/lib/python3.8/site-packages/apache_flink-1.14.4-py3.8-macosx-11-x86_64.egg/pyflink/java_gateway.py", line 112, in launch_gateway
    raise Exception("Java gateway process exited before sending its port number")
Exception: Java gateway process exited before sending its port number
----------------------------------------------------------------------
Ran 1 test in 0.333s

FAILED (errors=1)

5. I then added `("org.apache.flink", "flink-python_2.11", "1.14.4", None)` to the testing_jars list so that the regular Flink jar would be downloaded, created an `opt` directory in the FLINK_HOME directory and copied the regular Flink jar into it.
6. Running the test again now fails as follows:

$ python test_table_api.py TableTests.test_scalar_function
Using %s as FLINK_HOME... /Users/john/PycharmProjects/pyflink-faq/.venv/lib/python3.8/site-packages/apache_flink-1.14.4-py3.8-macosx-11-x86_64.egg/pyflink
Skipped download /Users/john/PycharmProjects/pyflink-faq/testing/flink-python_2.11-1.14.4-tests.jar since it already exists.
Skipped download /Users/john/PycharmProjects/pyflink-faq/testing/flink-python_2.11-1.14.4.jar since it already exists.
/Users/john/PycharmProjects/pyflink-faq/testing/flink-python_2.11-1.14.4.jar:/Users/john/PycharmProjects/pyflink-faq/testing/flink-python_2.11-1.14.4-tests.jar
Exception in thread "main" java.lang.NoClassDefFoundError: org/slf4j/LoggerFactory
	at org.apache.flink.client.python.PythonEnvUtils.<clinit>(PythonEnvUtils.java:77)
	at org.apache.flink.client.python.PythonGatewayServer.main(PythonGatewayServer.java:46)
Caused by: java.lang.ClassNotFoundException: org.slf4j.LoggerFactory
	at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
	at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
	at java.base/java.la
Re: Unit testing PyFlink SQL project
--
Traceback (most recent call last):
  File "/Users/john/PycharmProjects/pyflink-faq/testing/test_utils.py", line 131, in setUp
    self.t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
  File "/Users/john/PycharmProjects/pyflink-faq/.venv/lib/python3.8/site-packages/apache_flink-1.14.4-py3.8-macosx-11-x86_64.egg/pyflink/table/environment_settings.py", line 267, in in_streaming_mode
    get_gateway().jvm.EnvironmentSettings.inStreamingMode())
  File "/Users/john/PycharmProjects/pyflink-faq/.venv/lib/python3.8/site-packages/apache_flink-1.14.4-py3.8-macosx-11-x86_64.egg/pyflink/java_gateway.py", line 62, in get_gateway
    _gateway = launch_gateway()
  File "/Users/john/PycharmProjects/pyflink-faq/.venv/lib/python3.8/site-packages/apache_flink-1.14.4-py3.8-macosx-11-x86_64.egg/pyflink/java_gateway.py", line 112, in launch_gateway
    raise Exception("Java gateway process exited before sending its port number")
Exception: Java gateway process exited before sending its port number
--
Ran 1 test in 0.344s

FAILED (errors=1)
7. Now it looks like the code needs all of the transitive dependencies on the classpath? Have you managed to get your example tests to run in a completely clean virtual environment? If it's working on your computer, perhaps that's because Java and Python dependencies have already been downloaded to particular locations.

Many thanks,

John

From: Dian Fu Sent: 24 April 2022 06:21 To: John Tipper Cc: user@flink.apache.org Subject: Re: Unit testing PyFlink SQL project

Hi John,

I have written an example of how to write unit tests of Flink functionality with PyFlink in [1]. Hope it is helpful for you. Feel free to let me know if there are any problems.

Regards,
Dian

[1] https://github.com/dianfu/pyflink-faq/tree/main/testing

On Sun, Apr 24, 2022 at 9:25 AM Dian Fu <dian0511...@gmail.com> wrote:

Hi John,

>> I don't know how to fix this. I've tried adding `flink-table-planner` and `flink-table-planner-blink` dependencies with `test-jar` to my dummy pom.xml, but it still fails.

What's the failure after doing this? The flink-table-planner*-tests.jar should be available in the Maven repository [1].

>> This is starting to feel like a real pain to do something that should be trivial: basic TDD of a PyFlink project. Is there a real-world example of a Python project that shows how to set up a testing environment for unit testing SQL with PyFlink?

I'm not aware of such a project; however, I agree that this may be a very important aspect which should be improved. I will look into this.

Regards,
Dian

[1] https://repo1.maven.org/maven2/org/apache/flink/flink-table-planner_2.11/1.13.6/

On Sun, Apr 24, 2022 at 4:44 AM John Tipper <john_tip...@hotmail.com> wrote:

Hi all,

Is there an example of a self-contained repository showing how to perform SQL unit testing of PyFlink (specifically 1.13.x if possible)? I have cross-posted the question to Stack Overflow here: https://stackoverflow.com/questions/71983434/is-there-an-example-of-pyflink-sql-unit-testing-in-a-self-contained-repo

There is a related SO question (https://stackoverflow.com/questions/69937520/pyflink-sql-local-test), where it is suggested to use some of the tests from PyFlink itself. The issue I'm running into is that the PyFlink repo assumes that a bunch of things are on the Java classpath and that some Python utility classes are available (they're not distributed via the PyPi apache-flink package). I have done the following:

1. Copied `test_case_utils.py` and `source_sink_utils.py` from PyFlink (https://github.com/apache/flink/tree/f8172cdbbc27344896d961be4b0b9cdbf000b5cd/flink-python/pyflink/testing) into my project.
2. Copied an example unit test (https://github.com/apache/flink/blob/f8172cdbbc27344896d961be4b0b9cdbf000b5cd/flink-python/pyflink/table/tests/test_sql.py#L39) as suggested by the related SO question.
3. When I try to run the test, I get an error because the test case cannot determine what version of Avro jars to download (`download_apache_avro()` fails, because pyflink_gateway_server.py tries to evaluate the value of `avro.version` by running `mvn help:evaluate -Dexpression=avro.version`).

I then added a dummy `pom.xml` defining a Maven property of `avro.version` (with a value of `1.10.0`) and my unit test case is loaded. I now get a new error and my test is skipped:

'flink-table-planner*-tests.jar' is not available. Will skip the related tests.

I don't know how to fix this. I've tried adding `flink-table-planner` and `flink-table-planner-blink` dependencies with `test-jar` to my dummy pom.xml, but it still fails. This is starting to feel like a real pain to do something that should be trivial: basic TDD of a PyFlink project.
Unit testing PyFlink SQL project
Hi all,

Is there an example of a self-contained repository showing how to perform SQL unit testing of PyFlink (specifically 1.13.x if possible)? I have cross-posted the question to Stack Overflow here: https://stackoverflow.com/questions/71983434/is-there-an-example-of-pyflink-sql-unit-testing-in-a-self-contained-repo

There is a related SO question (https://stackoverflow.com/questions/69937520/pyflink-sql-local-test), where it is suggested to use some of the tests from PyFlink itself. The issue I'm running into is that the PyFlink repo assumes that a bunch of things are on the Java classpath and that some Python utility classes are available (they're not distributed via the PyPi apache-flink package). I have done the following:

1. Copied `test_case_utils.py` and `source_sink_utils.py` from PyFlink (https://github.com/apache/flink/tree/f8172cdbbc27344896d961be4b0b9cdbf000b5cd/flink-python/pyflink/testing) into my project.
2. Copied an example unit test (https://github.com/apache/flink/blob/f8172cdbbc27344896d961be4b0b9cdbf000b5cd/flink-python/pyflink/table/tests/test_sql.py#L39) as suggested by the related SO question.
3. When I try to run the test, I get an error because the test case cannot determine what version of Avro jars to download (`download_apache_avro()` fails, because pyflink_gateway_server.py tries to evaluate the value of `avro.version` by running `mvn help:evaluate -Dexpression=avro.version`).

I then added a dummy `pom.xml` defining a Maven property of `avro.version` (with a value of `1.10.0`) and my unit test case is loaded. I now get a new error and my test is skipped:

'flink-table-planner*-tests.jar' is not available. Will skip the related tests.

I don't know how to fix this. I've tried adding `flink-table-planner` and `flink-table-planner-blink` dependencies with `test-jar` to my dummy pom.xml, but it still fails. This is starting to feel like a real pain to do something that should be trivial: basic TDD of a PyFlink project. Is there a real-world example of a Python project that shows how to set up a testing environment for unit testing SQL with PyFlink?

Many thanks,

John
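For readers of this thread: once the Java dependencies resolve correctly, a minimal PyFlink SQL unit test can be as small as the sketch below (table and field names are made up for illustration; it runs against the embedded mini-cluster):

```
import unittest

from pyflink.table import EnvironmentSettings, TableEnvironment


class MySqlTests(unittest.TestCase):

    def setUp(self):
        # Runs against an embedded mini-cluster; requires a PyFlink install
        # whose bundled jars can be found (the subject of this thread).
        self.t_env = TableEnvironment.create(
            EnvironmentSettings.in_streaming_mode())

    def test_simple_projection(self):
        source = self.t_env.from_elements([("hello",), ("world",)], ["word"])
        self.t_env.create_temporary_view("words", source)
        result = self.t_env.sql_query("SELECT UPPER(word) AS word FROM words")
        actual = sorted(row[0] for row in result.execute().collect())
        self.assertEqual(actual, ["HELLO", "WORLD"])


if __name__ == "__main__":
    unittest.main()
```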
XXX doesn't exist in the parameters of the SQL statement
Hi all,

I'm having some issues getting a Flink SQL application to work, where I get an exception and I'm not sure why it's occurring. I have a source table, reading from Kinesis, where the incoming data in JSON format has a "source" and "detail-type" field.

CREATE TABLE `input` (
  `source` VARCHAR,
  `detail-type` VARCHAR
) WITH (
  'connector' = 'kinesis',
  'format' = 'json',
  ...
)

I have an output table called output writing to a table called dummy in a JDBC database, where the JDBC database table has 2 columns, source and detail-type:

CREATE TABLE `output` (
  `source` VARCHAR,
  `detail-type` VARCHAR
) WITH (
  'connector' = 'jdbc',
  'table-name' = 'dummy',
  ...
)

I am trying simply to confirm that I can write the data from input to output without transformation:

INSERT INTO `output` (`source`, `detail-type`) SELECT `source`, `detail-type` FROM `input`

However, my Flink job has the following exception:

detail-type doesn't exist in the parameters of SQL statement INSERT INTO dummy(source, detail-type) VALUES (:source, :detail-type)

I get the same error when I try:

INSERT INTO `output` SELECT `source`, `detail-type` FROM `input`

Can anyone please advise what I'm doing wrong?

Many thanks,

John
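The error message suggests the JDBC sink builds a named-parameter statement (VALUES (:source, :detail-type)), and the hyphen in detail-type looks like what breaks the parameter lookup. A possible workaround, assuming the Postgres table can be (re)created with underscore column names, is sketched below; the url is a placeholder and this is not a confirmed fix:

```
# Sketch, not a confirmed fix: assumes the hyphen in `detail-type` breaks the
# JDBC sink's named-parameter substitution and that the Postgres table "dummy"
# can use an underscore column instead. The url is a placeholder.
from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
t_env.execute_sql("""
    CREATE TABLE `output` (
        `source` VARCHAR,
        `detail_type` VARCHAR
    ) WITH (
        'connector' = 'jdbc',
        'url' = 'jdbc:postgresql://localhost:5432/mydb',
        'table-name' = 'dummy'
    )
""")
t_env.execute_sql("""
    INSERT INTO `output` (`source`, `detail_type`)
    SELECT `source`, `detail-type` FROM `input`
""")
```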
Re: How to process events with different JSON schemas in single Kinesis stream using PyFlink?
Hi Dian,

Thank you very much, that worked very nicely.

Kind regards,

John

From: Dian Fu Sent: 11 April 2022 06:32 To: John Tipper Cc: user@flink.apache.org Subject: Re: How to process events with different JSON schemas in single Kinesis stream using PyFlink?

Hi John,

1) Regarding the Table API, you could declare the column `detail` as STRING and then parse it into JSON in a Python user-defined function as follows:

```
@udf(result_type=DataTypes.STRING())
def get_id(detail):
    detail_json = json.loads(detail)
    if 'build-id' in detail_json:
        return detail_json['build-id']
    else:
        return detail_json['instance-id']
```

2) Regarding the DataStream API, Kinesis is still not supported; however, it should be very easy, as it is simply a wrapping of the Java Kinesis connector. If you want to use the DataStream API, you could wrap it yourself for now and could refer to how the other connectors are handled [1] for more details.

Regards,
Dian

[1] https://github.com/apache/flink/blob/master/flink-python/pyflink/datastream/connectors.py#L1235

On Mon, Apr 11, 2022 at 6:17 AM John Tipper <john_tip...@hotmail.com> wrote:

TL;DR: I want to know how best to process a stream of events using PyFlink, where the events in the stream have a number of different schemas.

Details: I want to process a stream of events coming from a Kinesis data stream which originate from an AWS EventBridge bus. The events in this stream are all JSON, but have different schemas. Specifically, they all have a common set of properties (e.g. version, id, time, source, detail-type), but there is a "detail" section that is a different shape, depending on what the event type is. For instance, an event on the bus from EC2 might look something like this:

{
  "version": "0",
  "id": "6a7e8feb-b491-4cf7-a9f1-bf3703467718",
  "detail-type": "EC2 Instance State-change Notification",
  "source": "aws.ec2",
  "account": "",
  "time": "2017-12-22T18:43:48Z",
  "region": "us-west-1",
  "detail": {
    "instance-id": " i-1234567890abcdef0",
    "state": "terminated"
  }
}

whereas the "detail" section for an event from CodeBuild might look like this (and the top-level "source" field would be "aws.codebuild"):

"detail": {
  "build-status": "SUCCEEDED",
  "project-name": "my-sample-project",
  "build-id": "arn:aws:codebuild:us-west-2:123456789012:build/my-sample-project:8745a7a9-c340-456a-9166-edf953571bEX",
  "additional-information": {
    "artifact": {
      "md5sum": "da9c44c8a9a3cd4b443126e823168fEX",
      "sha256sum": "6ccc2ae1df9d155ba83c597051611c42d60e09c6329dcb14a312cecc0a8e39EX",
      "location": "arn:aws:s3:::codebuild-123456789012-output-bucket/my-output-artifact.zip"
    }
  }
}

I have the following constraints that I really want to abide by:

1. I cannot create a Kinesis data stream for each event type, i.e. the stream of events is multiplexed.
2. I'd like to create application code in Python using PyFlink (I can code happily in Java but my colleagues who need to contribute cannot).

I want to ingest the stream of events, key by the type of event (key by "detail-type", or "source") and then process the events according to their type. I thought my options might be:

1. Use the SQL/Table API (my preferred option), but it looks like JSON queries are not scheduled to be released until Flink 1.15, and until then I cannot define the schema of the input table where a field is a generic Map (i.e. Map<>). It appears I have to define the schema of the input table exactly, and I don't see how I can create a table which covers a property that varies in shape, e.g. for the EC2 example above the schema for "detail" is a flat Map, but for a CodeBuild event it's a more deeply nested JSON structure. If there were a "JSON" type then this would appear to be the way to go.
2. Use the DataStream API, but it looks like there is not a PyFlink Kinesis connector for the DataStream API. There is a Java connector - what's involved in creating a custom PyFlink DataStream connector?

Are there any other options I've missed? What's the best way to approach this?

Many thanks,

John
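A minimal sketch of how the UDF suggestion above might be wired into a Table API pipeline; the `events` table is assumed to already be registered (e.g. via the Kinesis DDL from the question), with `detail` declared as STRING:

```
import json

from pyflink.table import DataTypes, EnvironmentSettings, TableEnvironment
from pyflink.table.udf import udf


@udf(result_type=DataTypes.STRING())
def get_id(detail):
    # 'detail' arrives as a raw JSON string; parse it per event, since its
    # shape differs between event types.
    detail_json = json.loads(detail)
    if 'build-id' in detail_json:
        return detail_json['build-id']
    return detail_json['instance-id']


t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
t_env.create_temporary_function("get_id", get_id)

# 'events' is an assumed, already-registered table with a STRING 'detail' column.
ids = t_env.sql_query(
    "SELECT `source`, `detail-type`, get_id(`detail`) AS id FROM events")
```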
How to process events with different JSON schemas in single Kinesis stream using PyFlink?
TL;DR: I want to know how best to process a stream of events using PyFlink, where the events in the stream have a number of different schemas.

Details: I want to process a stream of events coming from a Kinesis data stream which originate from an AWS EventBridge bus. The events in this stream are all JSON, but have different schemas. Specifically, they all have a common set of properties (e.g. version, id, time, source, detail-type), but there is a "detail" section that is a different shape, depending on what the event type is. For instance, an event on the bus from EC2 might look something like this:

{
  "version": "0",
  "id": "6a7e8feb-b491-4cf7-a9f1-bf3703467718",
  "detail-type": "EC2 Instance State-change Notification",
  "source": "aws.ec2",
  "account": "",
  "time": "2017-12-22T18:43:48Z",
  "region": "us-west-1",
  "detail": {
    "instance-id": " i-1234567890abcdef0",
    "state": "terminated"
  }
}

whereas the "detail" section for an event from CodeBuild might look like this (and the top-level "source" field would be "aws.codebuild"):

"detail": {
  "build-status": "SUCCEEDED",
  "project-name": "my-sample-project",
  "build-id": "arn:aws:codebuild:us-west-2:123456789012:build/my-sample-project:8745a7a9-c340-456a-9166-edf953571bEX",
  "additional-information": {
    "artifact": {
      "md5sum": "da9c44c8a9a3cd4b443126e823168fEX",
      "sha256sum": "6ccc2ae1df9d155ba83c597051611c42d60e09c6329dcb14a312cecc0a8e39EX",
      "location": "arn:aws:s3:::codebuild-123456789012-output-bucket/my-output-artifact.zip"
    }
  }
}

I have the following constraints that I really want to abide by:

1. I cannot create a Kinesis data stream for each event type, i.e. the stream of events is multiplexed.
2. I'd like to create application code in Python using PyFlink (I can code happily in Java but my colleagues who need to contribute cannot).

I want to ingest the stream of events, key by the type of event (key by "detail-type", or "source") and then process the events according to their type. I thought my options might be:

1. Use the SQL/Table API (my preferred option), but it looks like JSON queries are not scheduled to be released until Flink 1.15, and until then I cannot define the schema of the input table where a field is a generic Map (i.e. Map<>). It appears I have to define the schema of the input table exactly, and I don't see how I can create a table which covers a property that varies in shape, e.g. for the EC2 example above the schema for "detail" is a flat Map, but for a CodeBuild event it's a more deeply nested JSON structure. If there were a "JSON" type then this would appear to be the way to go.
2. Use the DataStream API, but it looks like there is not a PyFlink Kinesis connector for the DataStream API. There is a Java connector - what's involved in creating a custom PyFlink DataStream connector?

Are there any other options I've missed? What's the best way to approach this?

Many thanks,

John
Re: Does Flink support raw generic types in a merged stream?
Hi Chesnay,

Yes, but the actual use case needs to support more than 2 streams, so if I go down the Either route then I have arbitrarily sized nested Eithers, i.e. Either<Either<A, B>, C> etc., which gets pretty messy very quickly.

Many thanks,

John

Sent from my iPhone

On 17 Jul 2019, at 13:29, Chesnay Schepler <ches...@apache.org> wrote:

Have you looked at org.apache.flink.types.Either? If you'd wrap all elements in both streams before the union you should be able to join them properly.

On 17/07/2019 14:18, John Tipper wrote:

Hi All,

Can I union/join 2 streams containing generic classes, where each stream has a different parameterised type? I'd like to process the combined stream of values as a single raw type, casting to a specific type for detailed processing, based on some information in the type that will allow me to safely cast to the specific type. I can't share my exact code, but the example below shows the sort of thing I want to do.

So, as an example, given the following generic type:

class MyGenericContainer<IN> extends Tuple3<...> {
    ...
    private final String myString;
    private final IN value;
    private final Class<IN> clazz; // created by constructor
    private SomeOtherClass someOtherClass;
    ...
}

and 2 streams, I'd like to be able to do something like:

DataStream<MyGenericContainer<...>> stream1 = ...
DataStream<MyGenericContainer<...>> stream2 = ...
DataStream<...> merged = stream1.union(stream2).process(new MyProcessFunction());

// within an operator, such as a MyProcessFunction:
MyGenericContainer container = raw generic container passed to function;
Object rawValue = container.getValue();
performProcessing((container.getClazz())rawValue); // safely cast rawValue

However, I get an error when I do this:

Caused by: org.apache.flink.api.common.functions.InvalidTypesException: Type of TypeVariable 'IN' in 'class com.example.MyGenericTuple3' could not be determined. This is most likely a type erasure problem. The type extraction currently supports types with generic variables only in cases where all variables in the return type can be deduced from the input type(s). Otherwise the type has to be specified explicitly using type information.
    at org.apache.flink.api.java.typeutils.TypeExtractor.createSubTypesInfo(TypeExtractor.java:1133)
    at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:853)
    at org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:803)
    at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:587)
    at org.apache.flink.streaming.api.datastream.DataStream.process(DataStream.java:633)

If I try to add a returns() to the code, like this:

DataStream<...> merged = stream1.union(stream2)
    .process(...)
    .returns(new TypeHint<...>() {})

then I get a different exception:

Exception in thread "main" org.apache.flink.util.FlinkRuntimeException: The TypeHint is using a generic variable. This is not supported, generic types must be fully specified for the TypeHint.

Is this sort of thing supported, or is there another way of joining multiple streams into a single stream, where each stream object will have a specific type of a common generic type?

Many thanks,

John
Does Flink support raw generic types in a merged stream?
Hi All,

Can I union/join 2 streams containing generic classes, where each stream has a different parameterised type? I'd like to process the combined stream of values as a single raw type, casting to a specific type for detailed processing, based on some information in the type that will allow me to safely cast to the specific type. I can't share my exact code, but the example below shows the sort of thing I want to do.

So, as an example, given the following generic type:

class MyGenericContainer<IN> extends Tuple3<...> {
    ...
    private final String myString;
    private final IN value;
    private final Class<IN> clazz; // created by constructor
    private SomeOtherClass someOtherClass;
    ...
}

and 2 streams, I'd like to be able to do something like:

DataStream<MyGenericContainer<...>> stream1 = ...
DataStream<MyGenericContainer<...>> stream2 = ...
DataStream<...> merged = stream1.union(stream2).process(new MyProcessFunction());

// within an operator, such as a MyProcessFunction:
MyGenericContainer container = raw generic container passed to function;
Object rawValue = container.getValue();
performProcessing((container.getClazz())rawValue); // safely cast rawValue

However, I get an error when I do this:

Caused by: org.apache.flink.api.common.functions.InvalidTypesException: Type of TypeVariable 'IN' in 'class com.example.MyGenericTuple3' could not be determined. This is most likely a type erasure problem. The type extraction currently supports types with generic variables only in cases where all variables in the return type can be deduced from the input type(s). Otherwise the type has to be specified explicitly using type information.
    at org.apache.flink.api.java.typeutils.TypeExtractor.createSubTypesInfo(TypeExtractor.java:1133)
    at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:853)
    at org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:803)
    at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:587)
    at org.apache.flink.streaming.api.datastream.DataStream.process(DataStream.java:633)

If I try to add a returns() to the code, like this:

DataStream<...> merged = stream1.union(stream2)
    .process(...)
    .returns(new TypeHint<...>() {})

then I get a different exception:

Exception in thread "main" org.apache.flink.util.FlinkRuntimeException: The TypeHint is using a generic variable. This is not supported, generic types must be fully specified for the TypeHint.

Is this sort of thing supported, or is there another way of joining multiple streams into a single stream, where each stream object will have a specific type of a common generic type?

Many thanks,

John
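For what it's worth, a rough PyFlink analogue of the wrap-then-union idea from this thread: there is no Python Either, so this fakes it with a (tag, payload) pair; stream_a and stream_b are assumed to exist, and the handlers are placeholders:

```
from pyflink.common.typeinfo import Types


def handle_a(a):
    # placeholder per-type processing for elements of the first stream
    return "A:" + str(a)


def handle_b(b):
    # placeholder per-type processing for elements of the second stream
    return "B:" + str(b)


# Wrap each element in a (tag, payload) pair so the unioned stream has a
# single element type, then dispatch on the tag downstream.
left = stream_a.map(lambda a: ("A", a), output_type=Types.PICKLED_BYTE_ARRAY())
right = stream_b.map(lambda b: ("B", b), output_type=Types.PICKLED_BYTE_ARRAY())

merged = left.union(right).map(
    lambda t: handle_a(t[1]) if t[0] == "A" else handle_b(t[1]))
```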
What order are events processed in iterative loop?
For the case of a single iteration of an iterative loop where the feedback type is different to the input stream type, what order are events processed in the forward flow? So, for example, if we have:

* an input stream containing input1 followed by input2
* a ConnectedIterativeStream at the head of an iteration
* followed by a CoProcessFunction which, in response to an input, emits a feedback element that closes the ConnectedIterativeStream

For an input stream of input1 followed by input2, what order of events does the CoProcessFunction see? Does it see "input1, feedback1, input2, feedback2", or "input1, input2, feedback1, feedback2", or is it a non-deterministic processing-time order based on the execution time of the CoProcessFunction, where input1 is always processed before input2 and feedback1 is always processed before feedback2, i.e. either of the two orders is possible?

Many thanks,

John
How to join/group 2 streams by key?
Hi All,

I have 2 streams of events that relate to a common base event, where one stream is the result of a flatMap. I want to join all events that share a common identifier. Thus I have something that looks like:

DataStream<TypeA> streamA = ...
DataStream<TypeB> streamB = someDataStream.flatMap(...) // produces a stream of TypeB for each item in someDataStream

Both TypeA and TypeB share an identifier, and I know how many TypeB objects there are in the parent object. I want to perform some processing when all of the events associated with a particular identifier have arrived, i.e. when I can basically create a Tuple3<...> object. Is this best done with a WindowJoin and a GlobalWindow, a Window CoGroup and a GlobalWindow, or by connecting the 2 streams into a ConnectedStream and then performing the joining inside a CoProcessFunction?

Many thanks,

John
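Of the options listed, the connect-plus-CoProcessFunction route might look roughly like the sketch below in PyFlink. This rests on several assumptions not in the original post: TypeA carries `id` and `expected_b_count` attributes, TypeB carries `id`, and elements are plain pickled Python objects:

```
from pyflink.common.typeinfo import Types
from pyflink.datastream.functions import KeyedCoProcessFunction, RuntimeContext
from pyflink.datastream.state import ListStateDescriptor, ValueStateDescriptor


class CollectAllForId(KeyedCoProcessFunction):
    """Buffers the TypeA event and its TypeB events, emitting once all arrive."""

    def open(self, runtime_context: RuntimeContext):
        self.type_a = runtime_context.get_state(
            ValueStateDescriptor("type_a", Types.PICKLED_BYTE_ARRAY()))
        self.type_bs = runtime_context.get_list_state(
            ListStateDescriptor("type_bs", Types.PICKLED_BYTE_ARRAY()))

    def process_element1(self, value, ctx):
        # value is the TypeA event for this key
        self.type_a.update(value)
        yield from self._maybe_emit()

    def process_element2(self, value, ctx):
        # value is one of the TypeB events for this key
        self.type_bs.add(value)
        yield from self._maybe_emit()

    def _maybe_emit(self):
        a = self.type_a.value()
        bs = list(self.type_bs.get())
        # expected_b_count is an assumed attribute on TypeA
        if a is not None and len(bs) == a.expected_b_count:
            yield (a.id, a, bs)
            self.type_a.clear()
            self.type_bs.clear()


# stream_a and stream_b are assumed DataStreams of TypeA and TypeB.
joined = (stream_a.key_by(lambda a: a.id)
          .connect(stream_b.key_by(lambda b: b.id))
          .process(CollectAllForId()))
```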
How are timestamps treated within an iterative DataStream loop within Flink?
Hi All,

How are timestamps treated within an iterative DataStream loop within Flink? For example, here is an example of a simple iterative loop within Flink where the feedback loop is of a different type to the input stream:

DataStream<MyInput> inputStream = env.addSource(new MyInputSourceFunction());
IterativeStream.ConnectedIterativeStreams<MyInput, MyFeedback> iterativeStream = inputStream.iterate().withFeedbackType(MyFeedback.class);

// define an output tag so we can emit feedback objects via a side output
final OutputTag<MyFeedback> outputTag = new OutputTag<MyFeedback>("feedback-output"){};

// now do some processing
SingleOutputStreamOperator<MyOutput> combinedStreams = iterativeStream.process(new CoProcessFunction<MyInput, MyFeedback, MyOutput>() {

    @Override
    public void processElement1(MyInput value, Context ctx, Collector<MyOutput> out) throws Exception {
        // do some processing of the stream of MyInput values
        // emit MyOutput values downstream by calling out.collect()
        out.collect(someInstanceOfMyOutput);
    }

    @Override
    public void processElement2(MyFeedback value, Context ctx, Collector<MyOutput> out) throws Exception {
        // do some more processing on the feedback classes
        // emit feedback items
        ctx.output(outputTag, someInstanceOfMyFeedback);
    }
});

iterativeStream.closeWith(combinedStreams.getSideOutput(outputTag));

My questions revolve around how Flink uses timestamps within a feedback loop:

* Within the ConnectedIterativeStreams, how does Flink treat ordering of the input objects across the streams of regular inputs and feedback objects? If I emit an object into the feedback loop, when will it be seen by the head of the loop with respect to the regular stream of input objects?
* How does the behaviour change when using event time processing?

Many thanks,

John

Question also posted to StackOverflow here: https://stackoverflow.com/questions/56506020/how-does-flink-treat-timestamps-within-iterative-loops
Is it possible to configure Flink pre-flight type serialization scanning?
Flink performs significant scanning during the pre-flight phase of a Flink application (https://ci.apache.org/projects/flink/flink-docs-stable/dev/types_serialization.html). The act of creating sources, operators and sinks causes Flink to scan the data types of the objects that are used within the topology of a given streaming flow, as apparently Flink will try to optimise jobs based on this information.

Is this scanning configurable? Can I turn it off and just force Flink to use Kryo serialisation only, and not need or use any of this scanned information? I have a very large, deeply nested class in a proprietary library that was auto-generated, and Flink appears to enter an endless loop when scanning it, resulting in out-of-memory errors after running for several hours (the application never actually launches via env.execute(), even if I bump up the heap size significantly). The class has a number of circular references, i.e. the class and its child classes contain references to other classes of the same type - is this likely to be a problem?

Many thanks,

John
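On the 'force Kryo' part of the question: the ExecutionConfig does expose a switch for that, sketched below in PyFlink terms (the Java equivalent is env.getConfig().enableForceKryo()). As far as I know this forces generic types through Kryo but does not, by itself, disable the pre-flight type analysis:

```
# Sketch: force generic types to be serialized with Kryo. This changes which
# serializer is used but does not skip Flink's pre-flight type extraction.
from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
env.get_config().enable_force_kryo()
```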
Re: FLIP-16, FLIP-15 Status Updates?
Hi Timo,

That's great, thank you very much. If I'd like to contribute, is it best to wait until the roadmap has been published? And is this the best list to ask on, or is the development mailing list better?

Many thanks,

John

Sent from my iPhone

> On 19 Feb 2019, at 16:29, Timo Walther wrote:
>
> Hi John,
>
> you are right that there was not much progress in the last years around these two FLIPs, mostly due to a shift of priorities. However, with the big Blink code contribution from Alibaba and joint development forces for a unified batch and streaming runtime [1], it is very likely that iterations, and thus machine learning algorithms, will also see more development effort.
>
> The community is working on a roadmap page for the website. And I can already reveal that a new iterations model is mentioned there. The new Flink roadmap page can be expected in the next 2-3 weeks.
>
> I hope this information helps.
>
> Regards,
> Timo
>
> [1] https://flink.apache.org/news/2019/02/13/unified-batch-streaming-blink.html
>
>> On 19 Feb 2019 at 12:47, John Tipper wrote:
>> Hi All,
>>
>> Does anyone know what the current status is for FLIP-16 (loop fault tolerance) and FLIP-15 (redesign iterations), please? I can see lots of work back in 2016, but it all seems to have stopped and gone quiet since about March 2017. I see iterations as offering very interesting capabilities for Flink, so it would be good to understand how we can get this moving again.
>>
>> Many thanks,
>>
>> John
>>
>> Sent from my iPhone
FLIP-16, FLIP-15 Status Updates?
Hi All,

Does anyone know what the current status is for FLIP-16 (loop fault tolerance) and FLIP-15 (redesign iterations), please? I can see lots of work back in 2016, but it all seems to have stopped and gone quiet since about March 2017. I see iterations as offering very interesting capabilities for Flink, so it would be good to understand how we can get this moving again.

Many thanks,

John

Sent from my iPhone