Re: Write JSON string to JDBC as JSONB?

2022-07-24 Thread John Tipper
There may well be a better way, but I've been able to achieve this by executing 
the following in the DB:


CREATE CAST(VARCHAR AS JSONB) WITH INOUT AS IMPLICIT;

I think you can also probably do

CREATE CAST(VARCHAR AS JSONB) WITHOUT FUNCTION AS IMPLICIT;

However, I am working in AWS and AWS does not give the owner of the database 
sufficient permissions to execute this.
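
Another workaround that is sometimes used when CREATE CAST isn't permitted is the
PostgreSQL JDBC driver's stringtype=unspecified connection parameter, which makes
the driver send string parameters with an unspecified type so the server can
coerce them to jsonb. A minimal sketch of the sink definition under that
assumption (host, database, credentials and table name below are placeholders):

table_env.execute_sql("""
    CREATE TABLE my_table (
        my_jsonb_column STRING
    ) WITH (
        'connector' = 'jdbc',
        -- the query parameter on the JDBC URL is the relevant part
        'url' = 'jdbc:postgresql://myhost:5432/mydb?stringtype=unspecified',
        'table-name' = 'my_table',
        'username' = 'flink',
        'password' = 'secret'
    )
""")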



From: John Tipper 
Sent: 24 July 2022 22:01
To: user@flink.apache.org 
Subject: Write JSON string to JDBC as JSONB?

Hi all,

I am using PyFlink and SQL and have a JSON string that I wish to write to a 
JDBC sink (it's a Postgresql DB). I'd like to write that string to a column 
that is of type JSONB (or even JSON).

I'm getting exceptions when Flink tries to write the column:

Batch entry 0 INSERT INTO my_table(my_jsonb_column) VALUES ('{ ... a valid JSON 
string...}') ...
...
Error: column "my_jsonb_column" is of type jsonb but expression is of type 
character varying
Hint: You will need to rewrite or cast the expression

My table sink in Flink is defined like this:

CREATE TABLE my_table(
  my_jsonb_column STRING
) WITH (
  'connector' = 'jdbc'
)

and in my actual SQL database the column my_jsonb_column is actually defined as
type JSONB.

I can happily execute SQL statements myself like this without an error where I 
write a string to the JSONB column:

INSERT INTO my_table(my_jsonb_column) VALUES ('{ ... a valid JSON string...}')


Is there any way of having Flink write to a JSONB column or otherwise get that 
JSON string into a JSONB column?

Many thanks,

John




Write JSON string to JDBC as JSONB?

2022-07-24 Thread John Tipper
Hi all,

I am using PyFlink and SQL and have a JSON string that I wish to write to a 
JDBC sink (it's a Postgresql DB). I'd like to write that string to a column 
that is of type JSONB (or even JSON).

I'm getting exceptions when Flink tries to write the column:

Batch entry 0 INSERT INTO my_table(my_jsonb_column) VALUES ('{ ... a valid JSON 
string...}') ...
...
Error: column "my_jsonb_column" is of type jsonb but expression is of type 
character varying
Hint: You will need to rewrite or cast the expression

My table sink in Flink is defined like this:

CREATE TABLE my_table(
  my_jsonb_column STRING
) WITH (
  'connector' = 'jdbc'
)

and in my actual SQL database the column my_jsonb_column is actually defined as
type JSONB.

I can happily execute SQL statements myself like this without an error where I 
write a string to the JSONB column:

INSERT INTO my_table(my_jsonb_column) VALUES ('{ ... a valid JSON string...}')


Is there any way of having Flink write to a JSONB column or otherwise get that 
JSON string into a JSONB column?

Many thanks,

John




Re: What do columns for TM memory usage in Flink UI Console mean?

2022-07-20 Thread John Tipper
Sorry, pressed send too early.

What is the unit of measure for "Count"? Does this tell me that I have too
little direct memory, and if so, what do I do to specifically increase this
number?

Many thanks,

John
____
From: John Tipper
Sent: 20 July 2022 17:52
To: user@flink.apache.org 
Subject: What do columns for TM memory usage in Flink UI Console mean?

Hi all,

I can't find mention of what the columns mean for the "Outside JVM Memory"
section for the Task Manager in the Flink console.

I have:


        Count   Used     Capacity
Direct  4,203   227 MB   227 MB
Mapped  0       0 B      0 B

What is the unit of measure for "Count"?
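
If the second half of the question is how to give the TaskManager more direct
memory: as far as I know, Flink derives the JVM direct memory limit from
framework off-heap + task off-heap + network memory, so the usual knobs in
flink-conf.yaml are the ones below (values are purely illustrative, not
recommendations):

taskmanager.memory.framework.off-heap.size: 128m
taskmanager.memory.task.off-heap.size: 256m
taskmanager.memory.network.fraction: 0.1
taskmanager.memory.network.max: 128m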




What do columns for TM memory usage in Flink UI Console mean?

2022-07-20 Thread John Tipper
Hi all,

I can't find mention of what the columns mean for the "Outside JVM Memory"
section for the Task Manager in the Flink console.

I have:


        Count   Used     Capacity
Direct  4,203   227 MB   227 MB
Mapped  0       0 B      0 B

What is the unit of measure for "Count"?




Re: PyFlink SQL: force maximum use of slots

2022-07-20 Thread John Tipper
Hi Dian,

Thanks very much - I suppose the concept I'm struggling with is understanding
how parallelism works when using SQL. I understand that in the DataStream world
parallelism means each slot will get a subset of events. However, how does that
work in the SQL world where you need to do joins between tables? If the events
in tables A and B are being processed in parallel, does any given slot then only
see some of the events for tables A and B? How does Flink ensure consistency of
results irrespective of the parallelism used, or does it just copy all events to
all slots, in which case I don't understand how parallelism helps?

Many thanks,

John


From: Dian Fu 
Sent: 20 July 2022 05:19
To: John Tipper 
Cc: user@flink.apache.org 
Subject: Re: PyFlink SQL: force maximum use of slots

Hi John,

All the operators in the same slot sharing group will be put in one slot. The
slot sharing group is only configurable in the DataStream API [1]. Usually you
don't need to set the slot sharing group explicitly [2], and sharing resources
between the operators running in the same slot is a good thing. If performance
becomes a problem, you could just increase the parallelism or the resources
(CPU or memory) of the TM. For example, if the parallelism is set to 2, you will
see two running slots, each containing all the operators.

Regards,
Dian

[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/datastream/operators/overview/#set-slot-sharing-group
[2] 
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/concepts/flink-architecture/#task-slots-and-resources
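
As a concrete illustration of the suggestion above, the default parallelism for
a pure Table/SQL job can be raised on the table config; a minimal sketch (the
value 2 is illustrative):

from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
# Applies to all operators generated from subsequently defined SQL/Table pipelines.
t_env.get_config().get_configuration().set_string(
    "table.exec.resource.default-parallelism", "2")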

On Tue, Jul 19, 2022 at 12:23 AM John Tipper 
mailto:john_tip...@hotmail.com>> wrote:
Hi all,

Is there a way of forcing a pipeline to use as many slots as possible?  I have 
a program in PyFlink using SQL and the Table API and currently all of my 
pipeline is using just a single slot.  I've tried this:

  
StreamExecutionEnvironment.get_execution_environment().disable_operator_chaining()

I did have a pipeline which had 17 different tasks running in just a single
slot; disabling operator chaining gives me 79 different operators, but they are
all still running in a single slot. Is there a way to get Flink to run different
jobs in different slots whilst using the Table API and SQL?

Many thanks,

John


PyFlink SQL: force maximum use of slots

2022-07-18 Thread John Tipper
Hi all,

Is there a way of forcing a pipeline to use as many slots as possible?  I have 
a program in PyFlink using SQL and the Table API and currently all of my 
pipeline is using just a single slot.  I've tried this:

  
StreamExecutionEnvironment.get_execution_environment().disable_operator_chaining()

I did have a pipeline which had 17 different tasks running in just a single
slot; disabling operator chaining gives me 79 different operators, but they are
all still running in a single slot. Is there a way to get Flink to run different
jobs in different slots whilst using the Table API and SQL?

Many thanks,

John


Re: PyFlink and parallelism

2022-07-18 Thread John Tipper
Thanks very much. Given that I'm using SQL, it doesn't look like I'm able to
access the operators to change the parallelism without dropping from the Table
API into the DataStream API and then back again.  In any case, conversion
between a Table and a DataStream is broken if the Table contains a timestamp,
which I reported a little while ago and which Dian filed as FLINK-28253.

Kind regards,

John

From: Juntao Hu 
Sent: 18 July 2022 04:13
To: John Tipper 
Cc: user@flink.apache.org 
Subject: Re: PyFlink and parallelism

It's not an issue with Python-Java object conversion: after calling
`to_data_stream`, the Python DataStream wrapper wraps a DataStream rather than a
SingleOutputStreamOperator, and `setParallelism` is only available on
SingleOutputStreamOperator. To work around this, call `set_parallelism` on your
processing operator instead, e.g. `filtered_stream.map(...).set_parallelism(...)`.
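
A minimal sketch of that workaround applied to the code from this thread (the
pass-through map and the parallelism value are illustrative):

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment
from pyflink.table.expressions import col

env = StreamExecutionEnvironment.get_execution_environment()
table_env = StreamTableEnvironment.create(env)

# Assumes a source table has been registered as in the original message.
events_table = table_env.from_path("my_source_table")
filtered_table = events_table.filter(col("event_type") == "event_of_interest")

filtered_stream = table_env.to_data_stream(filtered_table)

# set_parallelism() lives on the operator returned by a transformation such as
# map(), not on the DataStream returned by to_data_stream() itself.
rebalanced = filtered_stream.map(lambda row: row).set_parallelism(4)

# Converting back to a view may need an explicit output_type on the map() above
# so the schema survives the round trip.
table_env.create_temporary_view("my_filtered_view", rebalanced)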

John Tipper <john_tip...@hotmail.com> wrote on Sat, 16 Jul 2022 at 17:07:
I've tried this and can see there appears to be a bigger problem with PyFlink 
and a call to set_parallelism()​:


events_table = table_env.from_path(MY_SOURCE_TABLE)

filtered_table = events_table.filter(
  col("event_type") == "event_of_interest"
)

filtered_stream = table_env.to_data_stream(filtered_table)

# fetch desired parallelism from config, don't hardcode
filtered_stream.set_parallelism(int(table_env.get_config().get('my.custom.parallelism',
 1)))

table_env.create_temporary_view(MY_FILTERED_VIEW, filtered_stream)

# now execute SQL on MY_FILTERED_VIEW
table_env.execute_sql(...)


I now get an error:

An error occurred when calling o68.setParallelism(). Trace:
org.apache.flink.api.python.shaded.py4j.Py4JException: Method 
setParallelism([class java.lang.Integer]) does not exist.


Looks like Python is converting to the Integer object in Java and not the int 
primitive.  I actually see this if I just call set_parallelism(1)​ without the 
call to get_config()​. Is this a bug or is there a workaround?

____
From: John Tipper
Sent: 15 July 2022 16:44
To: user@flink.apache.org<mailto:user@flink.apache.org> 
mailto:user@flink.apache.org>>
Subject: PyFlink and parallelism

Hi all,

I have a processing topology using PyFlink and SQL where there is data skew: 
I'm splitting a stream of heterogenous data into separate streams based on the 
type of data that's in it and some of these substreams have very many more 
events than others and this is causing issues when checkpointing (checkpoints 
are timing out). I'd like to increase parallelism for these problematic 
streams, I'm just not sure how I do that and target just those elements. Do I 
need to use the datastream API here? What does this look like please?


I have a table defined and I duplicate a stream from that table, then filter so 
that my substream has only the events I'm interested in:


events_table = table_env.from_path(MY_SOURCE_TABLE)

filtered_table = events_table.filter(
col("event_type") == "event_of_interest"
)

table_env.create_temporary_view(MY_FILTERED_VIEW, filtered_table)

# now execute SQL on MY_FILTERED_VIEW
table_env.execute_sql(...)


The default parallelism of the overall table env is 1. Is there a way to 
increase the parallelism for just this stream?

Many thanks,

John


Re: PyFlink and parallelism

2022-07-16 Thread John Tipper
I've tried this and can see there appears to be a bigger problem with PyFlink 
and a call to set_parallelism()​:


events_table = table_env.from_path(MY_SOURCE_TABLE)

filtered_table = events_table.filter(
  col("event_type") == "event_of_interest"
)

filtered_stream = table_env.to_data_stream(filtered_table)

# fetch desired parallelism from config, don't hardcode
filtered_stream.set_parallelism(int(table_env.get_config().get('my.custom.parallelism',
 1)))

table_env.create_temporary_view(MY_FILTERED_VIEW, filtered_stream)

# now execute SQL on MY_FILTERED_VIEW
table_env.execute_sql(...)


I now get an error:

An error occurred when calling o68.setParallelism(). Trace:
org.apache.flink.api.python.shaded.py4j.Py4JException: Method 
setParallelism([class java.lang.Integer]) does not exist.


Looks like Python is converting to the Integer object in Java and not the int 
primitive.  I actually see this if I just call set_parallelism(1)​ without the 
call to get_config()​. Is this a bug or is there a workaround?

________
From: John Tipper
Sent: 15 July 2022 16:44
To: user@flink.apache.org 
Subject: PyFlink and parallelism

Hi all,

I have a processing topology using PyFlink and SQL where there is data skew: 
I'm splitting a stream of heterogenous data into separate streams based on the 
type of data that's in it and some of these substreams have very many more 
events than others and this is causing issues when checkpointing (checkpoints 
are timing out). I'd like to increase parallelism for these problematic 
streams, I'm just not sure how I do that and target just those elements. Do I 
need to use the datastream API here? What does this look like please?


I have a table defined and I duplicate a stream from that table, then filter so 
that my substream has only the events I'm interested in:


events_table = table_env.from_path(MY_SOURCE_TABLE)

filtered_table = events_table.filter(
col("event_type") == "event_of_interest"
)

table_env.create_temporary_view(MY_FILTERED_VIEW, filtered_table)

# now execute SQL on MY_FILTERED_VIEW
table_env.execute_sql(...)


The default parallelism of the overall table env is 1. Is there a way to 
increase the parallelism for just this stream?

Many thanks,

John


PyFlink and parallelism

2022-07-15 Thread John Tipper
Hi all,

I have a processing topology using PyFlink and SQL where there is data skew: 
I'm splitting a stream of heterogenous data into separate streams based on the 
type of data that's in it and some of these substreams have very many more 
events than others and this is causing issues when checkpointing (checkpoints 
are timing out). I'd like to increase parallelism for these problematic 
streams, I'm just not sure how I do that and target just those elements. Do I 
need to use the datastream API here? What does this look like please?


I have a table defined and I duplicate a stream from that table, then filter so 
that my substream has only the events I'm interested in:


events_table = table_env.from_path(MY_SOURCE_TABLE)

filtered_table = events_table.filter(
col("event_type") == "event_of_interest"
)

table_env.create_temporary_view(MY_FILTERED_VIEW, filtered_table)

# now execute SQL on MY_FILTERED_VIEW
table_env.execute_sql(...)


The default parallelism of the overall table env is 1. Is there a way to 
increase the parallelism for just this stream?

Many thanks,

John


Re: PyFlink: restoring from savepoint

2022-07-08 Thread John Tipper
Hi Dian,

I'm using version 1.15.0 and 1.15.1 of PyFlink. I think it's to do with how the 
arguments are ordered, as when I run the container with:

  “standalone-job”, “-s”, “s3://”, “-n”, “-pym”, 
“foo.main”

then the job starts successfully and loads from the savepoint.

Many thanks,

John



From: Dian Fu 
Sent: 08 July 2022 02:27
To: John Tipper 
Cc: user@flink.apache.org 
Subject: Re: PyFlink: restoring from savepoint

Hi John,

Could you provide more information, e.g. the exact command submitting the job, 
the logs file, the PyFlink version, etc?

Regards,
Dian


On Thu, Jul 7, 2022 at 7:53 PM John Tipper 
mailto:john_tip...@hotmail.com>> wrote:
Hi all,

I have a PyFlink job running in Kubernetes. Savepoints and checkpoints are 
being successfully saved to S3. However, I am unable to get the job to start 
from a save point.

The container is started with these args:

“standalone-job”, “-pym”, “foo.main”, “-s”, “s3://”, “-n”

In the JM logs I can see “Starting StandaloneApplicationClusterEntrypoint…” 
where my arguments are listed.

However, I don’t see any restore occurring in the logs and my application 
restarts with no state. How do I start a PyFlink job like this from a given 
savepoint?

Many thanks,

John

Sent from my iPhone


Re: Restoring a job from a savepoint

2022-07-07 Thread John Tipper
Thank you all, that’s very helpful. It looks like there’s something else that’s 
causing my cluster to not load my savepoints, so I’ve submitted a separate 
query for that.

Many thanks,

John

Sent from my iPhone

On 6 Jul 2022, at 21:24, Alexander Fedulov  wrote:


Hi John,

use $ bin/flink run -s s3://my_bucket/path/to/savepoints/<savepoint-dir> (no
trailing slash, including the schema),

where <savepoint-dir> should contain a valid _metadata file.
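
For example (the savepoint directory name and the job file are illustrative):

$ bin/flink run -s s3://my_bucket/path/to/savepoints/savepoint-abc123 -py my_job.py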

You should see logs like this:
INFO o.a.f.r.c.CheckpointCoordinator [] - Starting job foobar from savepoint 
s3://my_bucket/path/to/savepoints/ ()
INFO o.a.f.r.c.CheckpointCoordinator []  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Restoring job 
foobar from Savepoint 1 @ 0 for foobar located at 
s3://my_bucket/path/to/savepoints/.

The indication of the correct restore should be the absence of exceptions. You 
might see messages like this one for operators that did not have any state in 
the savepoint:
INFO o.a.f.r.c.CheckpointCoordinator [] - Skipping empty savepoint state for 
operator a0f11f7a2c416beb6b7aed14be0d63ca.

Best,
Alexander Fedulov


On Wed, Jul 6, 2022 at 9:50 PM John Tipper 
mailto:john_tip...@hotmail.com>> wrote:
Hi all,


The docs on restoring a job from a savepoint 
(https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/savepoints/#resuming-from-savepoints)
 state that the syntax is:


$ bin/flink run -s :savepointPath [:runArgs]

and where "you may give a path to either the savepoint’s directory or the 
_metadata file."


If I am using S3 as my store of state:

state.savepoints.dir: s3://my_bucket/path/to/savepoints

and an example savepoint is at:


s3://my_bucket/path/to/savepoints//_metadata


then what am I supposed to supply to the flink run​ command?  Is it:


  1.  The full path including filesystem: 
s3://my_bucket/path/to/savepoints//_metadata or 
s3://my_bucket/path/to/savepoints/
  2.  or the full path: my_bucket/path/to/savepoints//_metadata 
or my_bucket/path/to/savepoints/
  3.  or the path relative to the savepoints directory: /_metadata or 

If I supply a directory, do I need to specify a trailing slash?

Also, is there anything that I will see in the logs that will indicate that the 
restore from a savepoint has been successful?

Many thanks,

John



PyFlink: restoring from savepoint

2022-07-07 Thread John Tipper
Hi all,

I have a PyFlink job running in Kubernetes. Savepoints and checkpoints are 
being successfully saved to S3. However, I am unable to get the job to start 
from a save point.

The container is started with these args:

“standalone-job”, “-pym”, “foo.main”, “-s”, “s3://”, “-n”

In the JM logs I can see “Starting StandaloneApplicationClusterEntrypoint…” 
where my arguments are listed.

However, I don’t see any restore occurring in the logs and my application 
restarts with no state. How do I start a PyFlink job like this from a given 
savepoint?

Many thanks,

John

Sent from my iPhone

Restoring a job from a savepoint

2022-07-06 Thread John Tipper
Hi all,


The docs on restoring a job from a savepoint 
(https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/savepoints/#resuming-from-savepoints)
 state that the syntax is:


$ bin/flink run -s :savepointPath [:runArgs]

and where "you may give a path to either the savepoint’s directory or the 
_metadata file."


If I am using S3 as my store of state:

state.savepoints.dir: s3://my_bucket/path/to/savepoints

and an example savepoint is at:


s3://my_bucket/path/to/savepoints//_metadata


then what am I supposed to supply to the flink run​ command?  Is it:


  1.  The full path including filesystem: 
s3://my_bucket/path/to/savepoints//_metadata or 
s3://my_bucket/path/to/savepoints/
  2.  or the full path: my_bucket/path/to/savepoints//_metadata 
or my_bucket/path/to/savepoints/
  3.  or the path relative to the savepoints directory: /_metadata or 

If I supply a directory, do I need to specify a trailing slash?

Also, is there anything that I will see in the logs that will indicate that the 
restore from a savepoint has been successful?

Many thanks,

John



Re: How to convert Table containing TIMESTAMP_LTZ into DataStream in PyFlink 1.15.0?

2022-06-27 Thread John Tipper
Hi Dian,

Thanks, much appreciated.

Kind regards,

John

Sent from my iPhone

On 27 Jun 2022, at 03:43, Dian Fu  wrote:


Hi John,

This seems like a bug and I have created a ticket 
https://issues.apache.org/jira/browse/FLINK-28253 to track it.

For now, you could try replacing `to_data_stream` with `to_append_stream` to see
if it works.
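
A minimal sketch of that suggestion, assuming table_env is a
StreamTableEnvironment and the 'events' table from the original message (the
projected field list and type information below are illustrative and would need
to match the real schema; the timestamp column may still need casting to a
supported type first):

from pyflink.common.typeinfo import Types
from pyflink.table.expressions import col

# Project down to a few columns so the declared type info matches them.
projected = table_env.from_path('events').select(
    col('id'), col('source'), col('detail'))

events_stream = table_env.to_append_stream(
    projected,
    Types.ROW_NAMED(
        ['id', 'source', 'detail'],
        [Types.STRING(), Types.STRING(), Types.STRING()]))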

Regards,
Dian

On Sat, Jun 25, 2022 at 4:07 AM John Tipper 
mailto:john_tip...@hotmail.com>> wrote:
Hi,

I have a source table using a Kinesis connector reading events from AWS 
EventBridge using PyFlink 1.15.0. An example of the sorts of data that are in 
this stream is here: 
https://docs.aws.amazon.com/codebuild/latest/userguide/sample-build-notifications.html#sample-build-notifications-ref.
 Note that the stream of data contains many different types of events, where 
the 'detail' field is completely different between different event types. There 
is no support for this connector using PyFlink DataStream API, so I use the 
Table API to construct the source table.  The table looks like this:


CREATE TABLE events (
 `id` VARCHAR,
 `source` VARCHAR,
 `account` VARCHAR,
 `region` VARCHAR,
 `detail-type` VARCHAR,
 `detail` VARCHAR,
 `resources` VARCHAR,
 `time` TIMESTAMP(0) WITH LOCAL TIME ZONE,
 WATERMARK FOR `time` as `time` - INTERVAL '30' SECOND,
 PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
...
)


The table was created using:

 table_env.execute_sql(CREATE_STRING_ABOVE)

I'd like to turn this table into a data stream so I can perform some processing 
that is easier to do in the DataStream API:


events_stream_table = table_env.from_path('events')

events_stream = table_env.to_data_stream(events_stream_table)

# now do some processing - let's filter by the type of event we get

codebuild_stream = events_stream.filter(
lambda event: event['source'] == 'aws.codebuild'
)

# now do other stuff on a stream containing only events that are identical in 
shape
...
# maybe convert back into a Table and perform SQL on the data


When I run this, I get an exception:



org.apache.flink.table.api.TableException: Unsupported conversion from data type
'TIMESTAMP(6) WITH TIME ZONE' (conversion class: java.time.OffsetDateTime) to
type information. Only data types that originated from type information fully
support a reverse conversion.

Somebody reported a similar error here 
(https://stackoverflow.com/questions/58936529/using-jdbctablesource-with-streamtableenvironment-gives-classcastexception)
 When I try the suggestion there and replace the "TIMESTAMP(0) WITH LOCAL TIME 
ZONE" with a "TIMESTAMP(3)" I get a different exception:

TypeError: The java type info: LocalDateTime is not supported in PyFlink 
currently.

Is there a way of converting this Table into a DataStream (and then back 
again)? I need to use the data in the "time"​ field as the source of watermarks 
for my events.

Many thanks,

John


How to convert Table containing TIMESTAMP_LTZ into DataStream in PyFlink 1.15.0?

2022-06-24 Thread John Tipper
Hi,

I have a source table using a Kinesis connector reading events from AWS 
EventBridge using PyFlink 1.15.0. An example of the sorts of data that are in 
this stream is here: 
https://docs.aws.amazon.com/codebuild/latest/userguide/sample-build-notifications.html#sample-build-notifications-ref.
 Note that the stream of data contains many different types of events, where 
the 'detail' field is completely different between different event types. There 
is no support for this connector using PyFlink DataStream API, so I use the 
Table API to construct the source table.  The table looks like this:


CREATE TABLE events (
 `id` VARCHAR,
 `source` VARCHAR,
 `account` VARCHAR,
 `region` VARCHAR,
 `detail-type` VARCHAR,
 `detail` VARCHAR,
 `resources` VARCHAR,
 `time` TIMESTAMP(0) WITH LOCAL TIME ZONE,
 WATERMARK FOR `time` as `time` - INTERVAL '30' SECOND,
 PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
...
)


The table was created using:

 table_env.execute_sql(CREATE_STRING_ABOVE)

I'd like to turn this table into a data stream so I can perform some processing 
that is easier to do in the DataStream API:


events_stream_table = table_env.from_path('events')

events_stream = table_env.to_data_stream(events_stream_table)

# now do some processing - let's filter by the type of event we get

codebuild_stream = events_stream.filter(
lambda event: event['source'] == 'aws.codebuild'
)

# now do other stuff on a stream containing only events that are identical in 
shape
...
# maybe convert back into a Table and perform SQL on the data


When I run this, I get an exception:



org.apache.flink.table.api.TableException: Unsupported conversion from data type
'TIMESTAMP(6) WITH TIME ZONE' (conversion class: java.time.OffsetDateTime) to
type information. Only data types that originated from type information fully
support a reverse conversion.

Somebody reported a similar error here 
(https://stackoverflow.com/questions/58936529/using-jdbctablesource-with-streamtableenvironment-gives-classcastexception)
 When I try the suggestion there and replace the "TIMESTAMP(0) WITH LOCAL TIME 
ZONE" with a "TIMESTAMP(3)" I get a different exception:

TypeError: The java type info: LocalDateTime is not supported in PyFlink 
currently.

Is there a way of converting this Table into a DataStream (and then back 
again)? I need to use the data in the "time"​ field as the source of watermarks 
for my events.

Many thanks,

John


How to use connectors in PyFlink 1.15.0 when not defined in Python API?

2022-06-24 Thread John Tipper
Hi all,

There are a number of connectors which do not appear to be in the Python API 
v1.15.0, e.g. Kinesis. I can see that it's possible to use these connectors by 
using the Table API:

CREATE TABLE my_table (...)
WITH ('connector' = 'kinesis' ...)

I guess if you wanted the stream as a DataStream you'd create the Table and then
convert it into a DataStream?
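
For what it's worth, a minimal sketch of that route (the connector options below
are illustrative, not a complete Kinesis configuration, and assume the Kinesis
SQL connector jar is on the classpath):

# Define the source with the Table API connector, then convert to a DataStream.
table_env.execute_sql("""
    CREATE TABLE kinesis_events (
        `data` STRING
    ) WITH (
        'connector' = 'kinesis',
        'stream' = 'kinesis_stream_name',
        'aws.region' = 'eu-west-1',
        'format' = 'raw'
    )
""")

events_stream = table_env.to_data_stream(table_env.from_path("kinesis_events"))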

Is there a way of directly instantiating these connectors in PyFlink without 
needed to use SQL like this (and without having to wait until v1.16)? e.g. the 
Java API looks like this:

StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> kinesis = env.addSource(new FlinkKinesisConsumer<>(
    "kinesis_stream_name", new SimpleStringSchema(), consumerConfig));

Many thanks,

John


Re: Is it possible to use DataStream API keyBy followed by Table API SQL in PyFlink?

2022-06-23 Thread John Tipper
Sorry for the noise, I completely missed this part of the documentation 
describing exactly how to do this: 
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/data_stream_api/
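
For anyone else finding this thread, a minimal sketch of the round trip
described on that page (the collection source and the query are placeholders):

from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
table_env = StreamTableEnvironment.create(env)

# DataStream side: build a stream, key it and do some keyed processing.
ds = env.from_collection(
    [(1, 'a'), (2, 'b'), (1, 'c')],
    type_info=Types.TUPLE([Types.INT(), Types.STRING()]))
keyed = ds.key_by(lambda t: t[0], key_type=Types.INT())
summed = keyed.reduce(lambda a, b: (a[0], a[1] + b[1]))

# Table side: expose the stream as a view and run SQL over it.
table_env.create_temporary_view("events", summed)
result = table_env.sql_query("SELECT f0, f1 FROM events")

# And back to a DataStream if further DataStream-API processing is needed.
back_to_stream = table_env.to_data_stream(result)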


From: John Tipper 
Sent: 23 June 2022 21:35
To: user@flink.apache.org 
Subject: Is it possible to use DataStream API keyBy followed by Table API SQL 
in PyFlink?

Hi all,

In PyFlink, is it possible to use the DataStream API to create a DataStream by 
means of StreamExecutionEnvironment's addSource(...), then perform 
transformations on this data stream using the DataStream API and then convert 
that stream into a form where SQL statements can be executed on it using the 
TableApi?

It's not clear to me whether it's possible to mix and match the APIs, or under 
what circumstances you can (or should/should not) move between the 2.

Many thanks,

John



Is it possible to use DataStream API keyBy followed by Table API SQL in PyFlink?

2022-06-23 Thread John Tipper
Hi all,

In PyFlink, is it possible to use the DataStream API to create a DataStream by 
means of StreamExecutionEnvironment's addSource(...), then perform 
transformations on this data stream using the DataStream API and then convert 
that stream into a form where SQL statements can be executed on it using the 
TableApi?

It's not clear to me whether it's possible to mix and match the APIs, or under 
what circumstances you can (or should/should not) move between the 2.

Many thanks,

John



Flink TaskManager memory configuration failed

2022-06-22 Thread John Tipper
Hi all,

I'm wanting to run quite a number of PyFlink jobs on Kubernetes, where the 
amount of state and number of events being processed is small and therefore I'd 
like to use as little memory in my clusters as possible so I can bin pack most 
efficiently. I'm running a Flink cluster per job. I'm currently trying to see 
how small I can make the memory settings.

I set taskmanager.memory.process.size: 512mb​ in my Flink config. The container 
is being started with requested memory of 512Mi.

 When my TaskManager starts up, I get an error message:

IllegalConfigurationException: Sum of configured Framework Heap Memory (128mb), 
Framework Off-Heap Memory (128mb)​, Task Off-Heap Memory (0 bytes), Managed 
Memory (25.6mb) and Network Memory (64mb) exceed configured Total Flink Memory 
(64mb).

My understanding of the docs 
(https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/memory/mem_setup_tm/)
 is that I should just need to set taskmanager.memory.process.size. Where is 
the 64mb figure coming from which makes up the Total Flink Memory? How do I 
change this?
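
A possible explanation (an assumption on my part, not something confirmed in
this thread): with taskmanager.memory.process.size set to 512mb, the default JVM
metaspace (256mb) and the minimum JVM overhead (192mb) are subtracted first,
which leaves roughly 64mb as Total Flink Memory. A sketch of sizing things
explicitly instead (values are illustrative):

# Option A: size the Flink memory directly and let the process size follow.
taskmanager.memory.flink.size: 512m

# Option B: keep the process size but shrink the JVM components.
taskmanager.memory.process.size: 512m
taskmanager.memory.jvm-metaspace.size: 128m
taskmanager.memory.jvm-overhead.min: 64m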

Many thanks,

John


Re: The configured managed memory fraction for Python worker process must be within (0, 1], was: %s

2022-06-17 Thread John Tipper
Thanks Dian - I’m restricted to 1.13 for the moment because this is running 
inside AWS Kinesis. Is there a way to manually bypass that issue?

J

Sent from my iPhone

On 17 Jun 2022, at 04:59, Dian Fu  wrote:


>> This error generally occurs in jobs where there are transfers between Table 
>> and datastream.
AFAIK, this issue should have already been fixed, see 
https://issues.apache.org/jira/browse/FLINK-26920 and 
https://issues.apache.org/jira/browse/FLINK-23133 for more details.

Regards,
Dian

On Fri, Jun 17, 2022 at 10:17 AM Xingbo Huang 
mailto:hxbks...@gmail.com>> wrote:
Hi John,

Because I can't see your code, I can only provide some possible reasons for 
this error:
1. This error generally occurs in jobs where there are transfers between Table 
and datastream. But given that you said you just used the sql + python udf, 
this shouldn't be the case.
2. The default value of `taskmanager.memory.managed.consumer-weights` is 
`OPERATOR:70,STATE_BACKEND:70,PYTHON:30`, so in your case, there is actually no 
need to set it to `PYTHON:30`
3. In fact, for pure SQL + Python UDF jobs, unless you set an erroneous value
such as `PYTHON:0` in `taskmanager.memory.managed.consumer-weights`, I really
can't think of any situation where this problem would occur.

Best,
Xingbo

John Tipper <john_tip...@hotmail.com> wrote on Thu, 16 Jun 2022 at 19:41:
Hi Xingbo,

Yes, there are a number of temporary views being created, where each is being 
created using SQL (CREATE TEMPORARY VIEW ...) rather than explicit calls to the 
Table and DataStream APIs.

Is this a good pattern or are there caveats I should be aware of please?

Many thanks,

John



From: Xingbo Huang mailto:hxbks...@gmail.com>>
Sent: 16 June 2022 12:34
To: John Tipper mailto:john_tip...@hotmail.com>>
Cc: user@flink.apache.org<mailto:user@flink.apache.org> 
mailto:user@flink.apache.org>>
Subject: Re: The configured managed memory fraction for Python worker process 
must be within (0, 1], was: %s

Hi John,

Does your job logic include conversion between Table and DataStream? For
example, methods such as `create_temporary_view(path: str, data_stream:
DataStream) -> Table` are used.

Best,
Xingbo

John Tipper <john_tip...@hotmail.com> wrote on Thu, 16 Jun 2022 at 18:31:
Hi Xingbo,

I’m afraid I can’t share my code but Flink is 1.13. The main Flink code is 
running inside Kinesis on AWS so I cannot change the version.

Many thanks,

John

Sent from my iPhone

On 16 Jun 2022, at 10:37, Xingbo Huang 
mailto:hxbks...@gmail.com>> wrote:


Hi John,

Could you provide the code snippet and the version of pyflink you used?

Best,
Xingbo


John Tipper <john_tip...@hotmail.com> wrote on Thu, 16 Jun 2022 at 17:05:
Hi all,

I'm trying to run a PyFlink unit test to test some PyFlink SQL and where my 
code uses a Python UDF.  I can't share my code but the test case is similar to 
the code here: 
https://github.com/apache/flink/blob/f8172cdbbc27344896d961be4b0b9cdbf000b5cd/flink-python/pyflink/testing/test_case_utils.py
  When I have some simple SQL everything is fine. When I add a more complex 
query I get an error, which looks like it's memory related.


java.lang.IllegalArgumentException: The configured managed memory fraction
for Python worker process must be within (0, 1], was: %s. It may be because
the consumer type "Python" was missing or set to 0 for the config option
"taskmanager.memory.managed.consumer-weights".0.0


In my test case setUp(), I try to set that value like this, but it seems to 
have no effect:

self.t_env.get_config().get_configuration().set_string("taskmanager.memory.managed.consumer-weights",
 "PYTHON:30")


Am I not setting it correctly, or is there something else I need to do to fix 
this error?

Many thanks,

John



Re: The configured managed memory fraction for Python worker process must be within (0, 1], was: %s

2022-06-16 Thread John Tipper
Hi Xingbo,

Yes, there are a number of temporary views being created, where each is being 
created using SQL (CREATE TEMPORARY VIEW ...) rather than explicit calls to the 
Table and DataStream APIs.

Is this a good pattern or are there caveats I should be aware of please?

Many thanks,

John



From: Xingbo Huang 
Sent: 16 June 2022 12:34
To: John Tipper 
Cc: user@flink.apache.org 
Subject: Re: The configured managed memory fraction for Python worker process 
must be within (0, 1], was: %s

Hi John,

Does your job logic include conversion between Table and DataStream? For
example, methods such as `create_temporary_view(path: str, data_stream:
DataStream) -> Table` are used.

Best,
Xingbo

John Tipper <john_tip...@hotmail.com> wrote on Thu, 16 Jun 2022 at 18:31:
Hi Xingbo,

I’m afraid I can’t share my code but Flink is 1.13. The main Flink code is 
running inside Kinesis on AWS so I cannot change the version.

Many thanks,

John

Sent from my iPhone

On 16 Jun 2022, at 10:37, Xingbo Huang 
mailto:hxbks...@gmail.com>> wrote:


Hi John,

Could you provide the code snippet and the version of pyflink you used?

Best,
Xingbo


John Tipper <john_tip...@hotmail.com> wrote on Thu, 16 Jun 2022 at 17:05:
Hi all,

I'm trying to run a PyFlink unit test to test some PyFlink SQL and where my 
code uses a Python UDF.  I can't share my code but the test case is similar to 
the code here: 
https://github.com/apache/flink/blob/f8172cdbbc27344896d961be4b0b9cdbf000b5cd/flink-python/pyflink/testing/test_case_utils.py
  When I have some simple SQL everything is fine. When I add a more complex 
query I get an error, which looks like it's memory related.


java.lang.IllegalArgumentException: The configured managed memory fraction
for Python worker process must be within (0, 1], was: %s. It may be because
the consumer type "Python" was missing or set to 0 for the config option
"taskmanager.memory.managed.consumer-weights".0.0


In my test case setUp(), I try to set that value like this, but it seems to 
have no effect:

self.t_env.get_config().get_configuration().set_string("taskmanager.memory.managed.consumer-weights",
 "PYTHON:30")


Am I not setting it correctly, or is there something else I need to do to fix 
this error?

Many thanks,

John



Re: The configured managed memory fraction for Python worker process must be within (0, 1], was: %s

2022-06-16 Thread John Tipper
Hi Xingbo,

I’m afraid I can’t share my code but Flink is 1.13. The main Flink code is 
running inside Kinesis on AWS so I cannot change the version.

Many thanks,

John

Sent from my iPhone

On 16 Jun 2022, at 10:37, Xingbo Huang  wrote:


Hi John,

Could you provide the code snippet and the version of pyflink you used?

Best,
Xingbo


John Tipper <john_tip...@hotmail.com> wrote on Thu, 16 Jun 2022 at 17:05:
Hi all,

I'm trying to run a PyFlink unit test to test some PyFlink SQL and where my 
code uses a Python UDF.  I can't share my code but the test case is similar to 
the code here: 
https://github.com/apache/flink/blob/f8172cdbbc27344896d961be4b0b9cdbf000b5cd/flink-python/pyflink/testing/test_case_utils.py
  When I have some simple SQL everything is fine. When I add a more complex 
query I get an error, which looks like it's memory related.


java.lang.IllegalArgumentException: The configured managed memory fraction
for Python worker process must be within (0, 1], was: %s. It may be because
the consumer type "Python" was missing or set to 0 for the config option
"taskmanager.memory.managed.consumer-weights".0.0


In my test case setUp(), I try to set that value like this, but it seems to 
have no effect:

self.t_env.get_config().get_configuration().set_string("taskmanager.memory.managed.consumer-weights",
 "PYTHON:30")


Am I not setting it correctly, or is there something else I need to do to fix 
this error?

Many thanks,

John



The configured managed memory fraction for Python worker process must be within (0, 1], was: %s

2022-06-16 Thread John Tipper
Hi all,

I'm trying to run a PyFlink unit test to test some PyFlink SQL and where my 
code uses a Python UDF.  I can't share my code but the test case is similar to 
the code here: 
https://github.com/apache/flink/blob/f8172cdbbc27344896d961be4b0b9cdbf000b5cd/flink-python/pyflink/testing/test_case_utils.py
  When I have some simple SQL everything is fine. When I add a more complex 
query I get an error, which looks like it's memory related.


java.lang.IllegalArgumentException: The configured managed memory fraction
for Python worker process must be within (0, 1], was: %s. It may be because
the consumer type "Python" was missing or set to 0 for the config option
"taskmanager.memory.managed.consumer-weights".0.0


In my test case setUp(), I try to set that value like this, but it seems to 
have no effect:

self.t_env.get_config().get_configuration().set_string("taskmanager.memory.managed.consumer-weights",
 "PYTHON:30")


Am I not setting it correctly, or is there something else I need to do to fix 
this error?

Many thanks,

John



Re: How to handle deletion of items using PyFlink SQL?

2022-06-14 Thread John Tipper
Yes, I’m interested in the best pattern to follow with SQL to allow for a 
downstream DB using the JDBC SQL connector to reflect the state of rows added 
and deleted upstream.

So imagine there is a crawl event at t=C1 that happens with an associated 
timestamp and which finds resources A,B,C. Is it better to emit one event into 
the stream with an array of all resources or many events, each with one 
resource and a corresponding crawl timestamp. There is obviously a limit to the 
amount of data that can be in a single event so the latter pattern will scale 
better for many resources.

Flink SQL sees this stream and processes it, then emits to a JDBC sink where 
there is one row for A, B, C.

Later, at t=C2, another crawl happens, finding A, B, D. I want the sink DB to 
have 3 rows if possible and not have C. Alternatively it should have 4 rows 
with a tombstone/delete marker on row C so it’s obvious it doesn’t exist any 
more.

I’m interested in a SQL solution if possible.

J

Sent from my iPhone

On 9 Jun 2022, at 11:20, Xuyang  wrote:



Hi, Dian Fu.

  I think John's requirement is like a CDC source: the source needs the ability
to know which data should be deleted and then notify the framework, and that is
why I recommended that John use a UDTF.


And hi, John.
  I'm not sure this doc [1] is enough. BTW, I think you can also try to
customize a connector [2] in Java to send `DELETE` RowData downstream and use it
in PyFlink SQL; maybe that's easier.


[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/python/table/udfs/python_udfs/#table-functions

[2] 
https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/table/sourcessinks/#user-defined-sources--sinks


--

Best!
Xuyang


At 2022-06-09 08:53:36, "Dian Fu" wrote:

Hi John,

If you are using Table API & SQL, the framework is handling the RowKind and 
it's transparent for you. So usually you don't need to handle RowKind in Table 
API & SQL.

Regards,
Dian

On Thu, Jun 9, 2022 at 6:56 AM John Tipper 
mailto:john_tip...@hotmail.com>> wrote:
Hi Xuyang,

Thank you very much, I’ll experiment tomorrow. Do you happen to know whether 
there is a Python example of udtf() with a RowKind being set (or whether it’s 
supported)?

Many thanks,

John

Sent from my iPhone

On 8 Jun 2022, at 16:41, Xuyang mailto:xyzhong...@163.com>> 
wrote:


Hi, John.
What about use udtf [1]?
In your UDTF, all resources seen so far are saved as a set or map, s1. When t=2
arrives, the new resources collected by the crawl form s2. I think what you want
is the deletion data, i.e. 's1' - 's2'.
So just use a loop to find the deleted entries and send RowData from the UDTF's
'eval' function; the RowData can be sent with RowKind 'DELETE' [2], which tells
the downstream that this value has been deleted.

I will be glad if it can help you.

[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/udfs/#table-functions
[2] 
https://github.com/apache/flink/blob/44f73c496ed1514ea453615b77bee0486b8998db/flink-core/src/main/java/org/apache/flink/types/RowKind.java#L52



--

Best!
Xuyang


At 2022-06-08 20:06:17, "John Tipper" 
mailto:john_tip...@hotmail.com>> wrote:

Hi all,

I have some reference data that is periodically emitted by a crawler mechanism 
into an upstream Kinesis data stream, where those rows are used to populate a 
sink table (and where I am using Flink 1.13 PyFlink SQL within AWS Kinesis Data 
Analytics).  What is the best pattern to handle deletion of upstream data, such 
that the downstream table remains in sync with upstream?

For example, at t=1, rows R1, R2, R3 are processed from the stream, resulting 
in a DB with 3 rows.  At some point between t=1 and t=2, the resource 
corresponding to R2 was deleted, such that at t=2 when the next crawl was 
carried out only rows R1 and R3 were emitted into the upstream stream.  How
should I process the stream of events so that when I have finished processing 
the events from t=2 my downstream table also has just rows R1 and R3?

Many thanks,

John


Re: How to implement unnest() as udtf using Python?

2022-06-13 Thread John Tipper
You’re a legend, thank you so much, I was looking on the internal functions 
docs page, not that one!

John

Sent from my iPhone

On 13 Jun 2022, at 13:21, Martijn Visser  wrote:


Hi John,

You're mentioning that Flink doesn't support UNNEST, but it does [1]. Would 
this work for you?

Best regards,

Martijn

[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/sql/queries/joins/#array-expansion
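
For reference, the array-expansion form on that page looks roughly like this
(table and column names are illustrative):

SELECT t.id, tag
FROM my_table AS t
CROSS JOIN UNNEST(t.tags) AS tag_table (tag)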

On Mon, 13 Jun 2022 at 13:55, John Tipper <john_tip...@hotmail.com> wrote:
Hi all,

Flink doesn’t support the unnest() function, which takes an array and creates a 
row for each element in the array. I have a column containing an array of maps
and I'd like to implement my own unnest.

I try this as an initial do-nothing implementation:

@udtf(result_types=DataTypes.MAP(
    key_type=DataTypes.STRING(), value_type=DataTypes.STRING()))
def my_unnest(arr):
    return []

I get an error when Flink starts:

No match found for function signature my_unnest()

Is there something that I’m missing in my definition please?

Many thanks,

John


How to implement unnest() as udtf using Python?

2022-06-13 Thread John Tipper
Hi all,

Flink doesn’t support the unnest() function, which takes an array and creates a 
row for each element in the array. I have a column containing an array of maps
and I'd like to implement my own unnest.

I try this as an initial do-nothing implementation:

@udtf(result_types=DataTypes.MAP(
    key_type=DataTypes.STRING(), value_type=DataTypes.STRING()))
def my_unnest(arr):
    return []

I get an error when Flink starts:

No match found for function signature my_unnest()

Is there something that I’m missing in my definition please?

Many thanks,

John


Re: How to handle deletion of items using PyFlink SQL?

2022-06-08 Thread John Tipper
Hi Xuyang,

Thank you very much, I’ll experiment tomorrow. Do you happen to know whether 
there is a Python example of udtf() with a RowKind being set (or whether it’s 
supported)?

Many thanks,

John

Sent from my iPhone

On 8 Jun 2022, at 16:41, Xuyang  wrote:


Hi, John.
What about use udtf [1]?
In your UDTF, all resources seen so far are saved as a set or map, s1. When t=2
arrives, the new resources collected by the crawl form s2. I think what you want
is the deletion data, i.e. 's1' - 's2'.
So just use a loop to find the deleted entries and send RowData from the UDTF's
'eval' function; the RowData can be sent with RowKind 'DELETE' [2], which tells
the downstream that this value has been deleted.

I will be glad if it can help you.

[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/udfs/#table-functions
[2] 
https://github.com/apache/flink/blob/44f73c496ed1514ea453615b77bee0486b8998db/flink-core/src/main/java/org/apache/flink/types/RowKind.java#L52



--

Best!
Xuyang


At 2022-06-08 20:06:17, "John Tipper"  wrote:

Hi all,

I have some reference data that is periodically emitted by a crawler mechanism 
into an upstream Kinesis data stream, where those rows are used to populate a 
sink table (and where I am using Flink 1.13 PyFlink SQL within AWS Kinesis Data 
Analytics).  What is the best pattern to handle deletion of upstream data, such 
that the downstream table remains in sync with upstream?

For example, at t=1, rows R1, R2, R3 are processed from the stream, resulting 
in a DB with 3 rows.  At some point between t=1 and t=2, the resource 
corresponding to R2 was deleted, such that at t=2 when the next crawl was 
carried out only rows R1 and R3 were emitted into the upstream stream.  How
should I process the stream of events so that when I have finished processing 
the events from t=2 my downstream table also has just rows R1 and R3?

Many thanks,

John


How to handle deletion of items using PyFlink SQL?

2022-06-08 Thread John Tipper
Hi all,

I have some reference data that is periodically emitted by a crawler mechanism 
into an upstream Kinesis data stream, where those rows are used to populate a 
sink table (and where I am using Flink 1.13 PyFlink SQL within AWS Kinesis Data 
Analytics).  What is the best pattern to handle deletion of upstream data, such 
that the downstream table remains in sync with upstream?

For example, at t=1, rows R1, R2, R3 are processed from the stream, resulting 
in a DB with 3 rows.  At some point between t=1 and t=2, the resource 
corresponding to R2 was deleted, such that at t=2 when the next crawl was 
carried out only rows R1 and R3 were emitted into the upstream stream.  How
should I process the stream of events so that when I have finished processing 
the events from t=2 my downstream table also has just rows R1 and R3?

Many thanks,

John


Re: Multiple INSERT INTO within single PyFlink job?

2022-04-29 Thread John Tipper
Ah, found it: I need to use add_insert_sql()​ in order to use multiple insert 
statements:

https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/sql/insert/#insert-statement

https://aws.amazon.com/blogs/big-data/build-a-real-time-streaming-application-using-apache-flink-python-api-with-amazon-kinesis-data-analytics/
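
For anyone else landing here, a minimal sketch of the statement-set approach
(table and column names match the example in the quoted message below):

stmt_set = table_env.create_statement_set()
stmt_set.add_insert_sql("INSERT INTO out1 (col1, col2) SELECT col1, col2 FROM input")
stmt_set.add_insert_sql("INSERT INTO out2 (col3, col4) SELECT col3, col4 FROM input")
# Submits both INSERTs together as a single job.
stmt_set.execute()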


Sorry for the noise.

From: John Tipper 
Sent: 29 April 2022 14:07
To: user@flink.apache.org 
Subject: Multiple INSERT INTO within single PyFlink job?

Hi all,

Is it possible to have more than one `INSERT INTO ... SELECT ...` statement 
within a single PyFlink job (on Flink 1.13.6)?

I have a number of output tables that I create and I am trying to write to these
within a single job, where the example SQL looks like this (assume there is an
input table called 'input'):

sql1 = "INSERT INTO out1 (col1, col2) SELECT col1, col2 FROM input"
sql2 = "INSERT INTO out2 (col3, col4) SELECT col3, col4 FROM input"

env.execute_sql(sql1)
env.execute_sql(sql2)


When this is run inside a Flink cluster inside Kinesis on AWS, I get a failure: 
"Cannot have more than one execute() or executeAsync() call in a single 
environment".  When I look at the Flink web UI, I can see that there is one job 
called "insert-into_default_catalog.default_database.out1".  Does Flink 
separate out each INSERT statement into a separate job? It looks like it tries 
to create one job for the first query and then fails to create a second job for 
the second query.

Is there any way of getting it to run as a single job using SQL, without having 
to move away from SQL and the Table API?

Many thanks,

John


Multiple INSERT INTO within single PyFlink job?

2022-04-29 Thread John Tipper
Hi all,

Is it possible to have more than one `INSERT INTO ... SELECT ...` statement 
within a single PyFlink job (on Flink 1.13.6)?

I have a number of output tables that I create and I am trying to write to these
within a single job, where the example SQL looks like this (assume there is an
input table called 'input'):

sql1 = "INSERT INTO out1 (col1, col2) SELECT col1, col2 FROM input"
sql2 = "INSERT INTO out2 (col3, col4) SELECT col3, col4 FROM input"

env.execute_sql(sql1)
env.execute_sql(sql2)


When this is run inside a Flink cluster inside Kinesis on AWS, I get a failure: 
"Cannot have more than one execute() or executeAsync() call in a single 
environment".  When I look at the Flink web UI, I can see that there is one job 
called "insert-into_default_catalog.default_database.out1".  Does Flink 
separate out each INSERT statement into a separate job? It looks like it tries 
to create one job for the first query and then fails to create a second job for 
the second query.

Is there any way of getting it to run as a single job using SQL, without having 
to move away from SQL and the Table API?

Many thanks,

John


Re: Unit testing PyFlink SQL project

2022-04-25 Thread John Tipper
Hi Dian,

I've tried this and it works nicely, on both MacOS and Windows, thank you very 
much indeed for your help.

Kind regards,

John

From: Dian Fu 
Sent: 25 April 2022 02:42
To: John Tipper 
Cc: user@flink.apache.org 
Subject: Re: Unit testing PyFlink SQL project

Hi John,

I'm also using MacOS. This is the steps I'm following which I have run 
successfully:
1) python3 -m venv .venv
2) source .venv/bin/activate
3) pip install apache-flink==1.14.4
4) python -c "import pyflink;import 
os;print(os.path.dirname(os.path.abspath(pyflink.__file__))+'/log')"
It will print something like this: 
"/Users/dianfu/code/src/github/pyflink-faq/testing/.venv/lib/python3.8/site-packages/pyflink/log"
5) check the structure of the installed package:
```
(.venv) (base) dianfu@B-7174MD6R-1908 testing % ls -lh 
/Users/dianfu/code/src/github/pyflink-faq/testing/.venv/lib/python3.8/site-packages/pyflink/
total 136
-rw-r--r--   1 dianfu  staff   1.3K Apr 25 09:26 README.txt
-rw-r--r--   1 dianfu  staff   1.9K Apr 25 09:26 __init__.py
drwxr-xr-x  11 dianfu  staff   352B Apr 25 09:26 __pycache__
drwxr-xr-x  25 dianfu  staff   800B Apr 25 09:26 bin
drwxr-xr-x  21 dianfu  staff   672B Apr 25 09:26 common
drwxr-xr-x  13 dianfu  staff   416B Apr 25 09:26 conf
drwxr-xr-x  20 dianfu  staff   640B Apr 25 09:26 datastream
drwxr-xr-x   4 dianfu  staff   128B Apr 25 09:26 examples
-rw-r--r--   1 dianfu  staff   3.2K Apr 25 09:26 find_flink_home.py
drwxr-xr-x  25 dianfu  staff   800B Apr 25 09:26 fn_execution
-rw-r--r--   1 dianfu  staff   9.1K Apr 25 09:26 gen_protos.py
-rw-r--r--   1 dianfu  staff   7.6K Apr 25 09:26 java_gateway.py
drwxr-xr-x  11 dianfu  staff   352B Apr 25 09:26 lib
drwxr-xr-x  28 dianfu  staff   896B Apr 25 09:26 licenses
drwxr-xr-x   4 dianfu  staff   128B Apr 25 09:26 log
drwxr-xr-x   5 dianfu  staff   160B Apr 25 09:26 metrics
drwxr-xr-x   4 dianfu  staff   128B Apr 25 09:26 opt
drwxr-xr-x  11 dianfu  staff   352B Apr 25 09:26 plugins
-rw-r--r--   1 dianfu  staff   1.3K Apr 25 09:26 pyflink_callback_server.py
-rw-r--r--   1 dianfu  staff12K Apr 25 09:26 pyflink_gateway_server.py
-rw-r--r--   1 dianfu  staff   5.3K Apr 25 09:26 serializers.py
-rw-r--r--   1 dianfu  staff   7.9K Apr 25 09:26 shell.py
drwxr-xr-x  31 dianfu  staff   992B Apr 25 09:26 table
drwxr-xr-x   6 dianfu  staff   192B Apr 25 09:26 util
-rw-r--r--   1 dianfu  staff   1.1K Apr 25 09:26 version.py
```
6) Execute command `python3 -m unittest 
test_table_api.TableTests.test_scalar_function`
The output is as following and you could see that it executes successfully:
```
(.venv) (base) dianfu@B-7174MD6R-1908 testing % python3 -m unittest 
test_table_api.TableTests.test_scalar_function
Using %s as FLINK_HOME... 
/Users/dianfu/code/src/github/pyflink-faq/testing/.venv/lib/python3.8/site-packages/pyflink
Skipped download 
/Users/dianfu/code/src/github/pyflink-faq/testing/flink-python_2.11-1.14.4-tests.jar
 since it already exists.
/Users/dianfu/miniconda3/lib/python3.8/subprocess.py:946: ResourceWarning: 
subprocess 71018 is still running
  _warn("subprocess %s is still running" % self.pid,
ResourceWarning: Enable tracemalloc to get the object allocation traceback
Downloading jar org.apache.flink:flink-table-planner_2.11:1.14.4:jar:tests
/Users/dianfu/code/src/github/pyflink-faq/testing/.venv/lib/python3.8/site-packages/pyflink/table/table_environment.py:538:
 DeprecationWarning: Deprecated in 1.10. Use create_table instead.
  warnings.warn("Deprecated in 1.10. Use create_table instead.", 
DeprecationWarning)
.
--
Ran 1 test in 32.746s

OK
```

I have also tried your commands and run into the same error. I believe the 
difference comes from `python setup.py install` vs `pip install 
apache-flink==1.14.4`. When installing with command `python setup.py install`, 
the structure of the installed package is a little different from `pip install 
apache-flink==1.14.4`. I will dig into this and share the results when I have 
some findings.

Before that, could you try to create a new clean virtual environment and see if 
the steps I'm following work for you?

Regards,
Dian

On Mon, Apr 25, 2022 at 6:04 AM John Tipper 
mailto:john_tip...@hotmail.com>> wrote:
And now when I add further dependencies to the classpath to remove all 
ClassNotFound exceptions, I get a different error which I don't understand 
(TypeError: Could not found the Java class 
'EnvironmentSettings.inStreamingMode'.), see the logs below:


$ python test_table_api.py TableTests.test_scalar_function

Using %s as FLINK_HOME... 
/Users/john/PycharmProjects/pyflink-faq/.venv/lib/python3.8/site-packages/apache_flink-1.14.4-py3.8-macosx-11-x86_64.egg/pyflink

Skipped download 
/Users/john/PycharmProjects/pyflink-faq/testing/flink-python_2.11-1.14.4-tests.jar
 since it already exists.

Skipped download 
/Users/john/PycharmProjects/pyflink-faq/testing/

Re: Unit testing PyFlink SQL project

2022-04-24 Thread John Tipper
And now when I add further dependencies to the classpath to remove all 
ClassNotFound exceptions, I get a different error which I don't understand 
(TypeError: Could not found the Java class 
'EnvironmentSettings.inStreamingMode'.), see the logs below:


$ python test_table_api.py TableTests.test_scalar_function

Using %s as FLINK_HOME... 
/Users/john/PycharmProjects/pyflink-faq/.venv/lib/python3.8/site-packages/apache_flink-1.14.4-py3.8-macosx-11-x86_64.egg/pyflink

Skipped download 
/Users/john/PycharmProjects/pyflink-faq/testing/flink-python_2.11-1.14.4-tests.jar
 since it already exists.

Skipped download 
/Users/john/PycharmProjects/pyflink-faq/testing/flink-python_2.11-1.14.4.jar 
since it already exists.

Skipped download 
/Users/john/PycharmProjects/pyflink-faq/testing/flink-shaded-guava-30.1.1-jre-15.0.jar
 since it already exists.

/Users/john/PycharmProjects/pyflink-faq/.venv/lib/python3.8/site-packages/apache_flink-1.14.4-py3.8-macosx-11-x86_64.egg/pyflink/opt/flink-shaded-guava-30.1.1-jre-15.0.jar:/Users/john/PycharmProjects/pyflink-faq/.venv/lib/python3.8/site-packages/apache_flink-1.14.4-py3.8-macosx-11-x86_64.egg/pyflink/opt/flink-java-1.14.4.jar:/Users/john/PycharmProjects/pyflink-faq/.venv/lib/python3.8/site-packages/apache_flink-1.14.4-py3.8-macosx-11-x86_64.egg/pyflink/opt/flink-table-planner_2.11-1.14.4.jar:/Users/john/PycharmProjects/pyflink-faq/.venv/lib/python3.8/site-packages/apache_flink-1.14.4-py3.8-macosx-11-x86_64.egg/pyflink/opt/flink-table-common-1.14.4.jar:/Users/john/PycharmProjects/pyflink-faq/.venv/lib/python3.8/site-packages/apache_flink-1.14.4-py3.8-macosx-11-x86_64.egg/pyflink/opt/jsr305-1.3.9.jar:/Users/john/PycharmProjects/pyflink-faq/.venv/lib/python3.8/site-packages/apache_flink-1.14.4-py3.8-macosx-11-x86_64.egg/pyflink/opt/flink-python_2.11-1.14.4.jar:/Users/john/PycharmProjects/pyflink-faq/.venv/lib/python3.8/site-packages/apache_flink-1.14.4-py3.8-macosx-11-x86_64.egg/pyflink/opt/flink-python_2.11-1.14.4-tests.jar:/Users/john/PycharmProjects/pyflink-faq/.venv/lib/python3.8/site-packages/apache_flink-1.14.4-py3.8-macosx-11-x86_64.egg/pyflink/opt/flink-clients_2.11-1.14.4.jar:/Users/john/PycharmProjects/pyflink-faq/.venv/lib/python3.8/site-packages/apache_flink-1.14.4-py3.8-macosx-11-x86_64.egg/pyflink/opt/flink-shaded-force-shading-14.0.jar:/Users/john/PycharmProjects/pyflink-faq/.venv/lib/python3.8/site-packages/apache_flink-1.14.4-py3.8-macosx-11-x86_64.egg/pyflink/opt/flink-table-runtime_2.11-1.14.4.jar:/Users/john/PycharmProjects/pyflink-faq/.venv/lib/python3.8/site-packages/apache_flink-1.14.4-py3.8-macosx-11-x86_64.egg/pyflink/opt/commons-compress-1.21.jar:/Users/john/PycharmProjects/pyflink-faq/.venv/lib/python3.8/site-packages/apache_flink-1.14.4-py3.8-macosx-11-x86_64.egg/pyflink/opt/slf4j-api-1.7.15.jar:/Users/john/PycharmProjects/pyflink-faq/.venv/lib/python3.8/site-packages/apache_flink-1.14.4-py3.8-macosx-11-x86_64.egg/pyflink/opt/flink-streaming-java_2.11-1.14.4.jar:/Users/john/PycharmProjects/pyflink-faq/.venv/lib/python3.8/site-packages/apache_flink-1.14.4-py3.8-macosx-11-x86_64.egg/pyflink/opt/flink-core-1.14.4.jar

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".

SLF4J: Defaulting to no-operation (NOP) logger implementation

SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further 
details.

/usr/local/Cellar/python@3.8/3.8.13/Frameworks/Python.framework/Versions/3.8/lib/python3.8/subprocess.py:946:
 ResourceWarning: subprocess 45753 is still running

  _warn("subprocess %s is still running" % self.pid,

ResourceWarning: Enable tracemalloc to get the object allocation traceback

E

==

ERROR: test_scalar_function (__main__.TableTests)

--

Traceback (most recent call last):

  File "/Users/john/PycharmProjects/pyflink-faq/testing/test_utils.py", line 
135, in setUp

self.t_env = 
TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

  File 
"/Users/john/PycharmProjects/pyflink-faq/.venv/lib/python3.8/site-packages/apache_flink-1.14.4-py3.8-macosx-11-x86_64.egg/pyflink/table/environment_settings.py",
 line 267, in in_streaming_mode

get_gateway().jvm.EnvironmentSettings.inStreamingMode())

  File 
"/Users/john/PycharmProjects/pyflink-faq/.venv/lib/python3.8/site-packages/apache_flink-1.14.4-py3.8-macosx-11-x86_64.egg/pyflink/util/exceptions.py",
 line 185, in wrapped_call

raise TypeError(

TypeError: Could not found the Java class 
'EnvironmentSettings.inStreamingMode'. The Java dependencies could be specified 
via command line argument '--jarfile' or the config option 'pipeline.jars'


--

Ran 1 test in 0.401s


FAILED (errors=1)

sys:1: ResourceWarning: unclosed file <_io.BufferedWriter name=4>



Re: Unit testing PyFlink SQL project

2022-04-24 Thread John Tipper
le calling t.put


--

Ran 1 test in 1.089s


FAILED (errors=1)

sys:1: ResourceWarning: unclosed file <_io.BufferedWriter name=4>

(.venv) Johns-MacBook-Pro:testing john$ Unable to get the Python watchdog 
object, now exit.



From: John Tipper 
Sent: 24 April 2022 20:30
To: Dian Fu 
Cc: user@flink.apache.org 
Subject: Re: Unit testing PyFlink SQL project

Hi Dian,

Thank you very much, that's very helpful.  I'm seeing a couple of errors when I 
try to run the example though (Python 3.8 on Mac OS).


  1.  I create a fresh Python virtual env: `python -m venv .venv`
  2.  `source .venv/bin/activate`
  3.  When I tried to configure the project by running `python setup.py 
install` I got errors about Cython not being installed even though it was.  I 
then just had to do a `pip install apache-flink==1.14.4` to install the 
requirements and be able to move forward. Not sure what the issue here is.
  4.  $ python test_table_api.py TableTests.test_scalar_function

Using %s as FLINK_HOME... 
/Users/john/PycharmProjects/pyflink-faq/.venv/lib/python3.8/site-packages/apache_flink-1.14.4-py3.8-macosx-11-x86_64.egg/pyflink

Skipped download 
/Users/john/PycharmProjects/pyflink-faq/testing/flink-python_2.11-1.14.4-tests.jar
 since it already exists.

The flink-python jar is not found in the opt folder of the FLINK_HOME: 
/Users/john/PycharmProjects/pyflink-faq/.venv/lib/python3.8/site-packages/apache_flink-1.14.4-py3.8-macosx-11-x86_64.egg/pyflink

/Users/john/PycharmProjects/pyflink-faq/testing/flink-python_2.11-1.14.4-tests.jar

Error: Could not find or load main class 
org.apache.flink.client.python.PythonGatewayServer

Caused by: java.lang.ClassNotFoundException: 
org.apache.flink.client.python.PythonGatewayServer

E/usr/local/Cellar/python@3.8/3.8.13/Frameworks/Python.framework/Versions/3.8/lib/python3.8/unittest/case.py:704:
 ResourceWarning: unclosed file <_io.BufferedWriter name=4>

  outcome.errors.clear()

ResourceWarning: Enable tracemalloc to get the object allocation traceback


==

ERROR: test_scalar_function (__main__.TableTests)

--

Traceback (most recent call last):

  File "/Users/john/PycharmProjects/pyflink-faq/testing/test_utils.py", line 
123, in setUp

self.t_env = 
TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

  File 
"/Users/john/PycharmProjects/pyflink-faq/.venv/lib/python3.8/site-packages/apache_flink-1.14.4-py3.8-macosx-11-x86_64.egg/pyflink/table/environment_settings.py",
 line 267, in in_streaming_mode

get_gateway().jvm.EnvironmentSettings.inStreamingMode())

  File 
"/Users/john/PycharmProjects/pyflink-faq/.venv/lib/python3.8/site-packages/apache_flink-1.14.4-py3.8-macosx-11-x86_64.egg/pyflink/java_gateway.py",
 line 62, in get_gateway

_gateway = launch_gateway()

  File 
"/Users/john/PycharmProjects/pyflink-faq/.venv/lib/python3.8/site-packages/apache_flink-1.14.4-py3.8-macosx-11-x86_64.egg/pyflink/java_gateway.py",
 line 112, in launch_gateway

raise Exception("Java gateway process exited before sending its port 
number")

Exception: Java gateway process exited before sending its port number


--

Ran 1 test in 0.333s


FAILED (errors=1)


  5.  I then added `("org.apache.flink", "flink-python_2.11", "1.14.4", None)`
to the testing_jars list so that the regular Flink jar would be downloaded,
created an `opt` directory in the FLINK_HOME directory and copied the regular
Flink jar into it.
  6.  $ python test_table_api.py TableTests.test_scalar_function

Using %s as FLINK_HOME... 
/Users/john/PycharmProjects/pyflink-faq/.venv/lib/python3.8/site-packages/apache_flink-1.14.4-py3.8-macosx-11-x86_64.egg/pyflink

Skipped download 
/Users/john/PycharmProjects/pyflink-faq/testing/flink-python_2.11-1.14.4-tests.jar
 since it already exists.

Skipped download 
/Users/john/PycharmProjects/pyflink-faq/testing/flink-python_2.11-1.14.4.jar 
since it already exists.

/Users/john/PycharmProjects/pyflink-faq/testing/flink-python_2.11-1.14.4.jar:/Users/john/PycharmProjects/pyflink-faq/testing/flink-python_2.11-1.14.4-tests.jar

Exception in thread "main" java.lang.NoClassDefFoundError: 
org/slf4j/LoggerFactory

at org.apache.flink.client.python.PythonEnvUtils.<clinit>(PythonEnvUtils.java:77)

at 
org.apache.flink.client.python.PythonGatewayServer.main(PythonGatewayServer.java:46)

Caused by: java.lang.ClassNotFoundException: org.slf4j.LoggerFactory

at 
java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)

at 
java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)

at java.base/java.la

Re: Unit testing PyFlink SQL project

2022-04-24 Thread John Tipper
--

Traceback (most recent call last):

  File "/Users/john/PycharmProjects/pyflink-faq/testing/test_utils.py", line 
131, in setUp

self.t_env = 
TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

  File 
"/Users/john/PycharmProjects/pyflink-faq/.venv/lib/python3.8/site-packages/apache_flink-1.14.4-py3.8-macosx-11-x86_64.egg/pyflink/table/environment_settings.py",
 line 267, in in_streaming_mode

get_gateway().jvm.EnvironmentSettings.inStreamingMode())

  File 
"/Users/john/PycharmProjects/pyflink-faq/.venv/lib/python3.8/site-packages/apache_flink-1.14.4-py3.8-macosx-11-x86_64.egg/pyflink/java_gateway.py",
 line 62, in get_gateway

_gateway = launch_gateway()

  File 
"/Users/john/PycharmProjects/pyflink-faq/.venv/lib/python3.8/site-packages/apache_flink-1.14.4-py3.8-macosx-11-x86_64.egg/pyflink/java_gateway.py",
 line 112, in launch_gateway

raise Exception("Java gateway process exited before sending its port 
number")

Exception: Java gateway process exited before sending its port number


--

Ran 1 test in 0.344s


FAILED (errors=1)

Now it looks like the code needs all of the transitive dependencies on the 
classpath?

Have you managed to get your example tests to run in a completely clean virtual
environment? If it works on your computer, perhaps that is because your machine
already has the Java and Python dependencies downloaded to particular locations.

Many thanks,

John


From: Dian Fu 
Sent: 24 April 2022 06:21
To: John Tipper 
Cc: user@flink.apache.org 
Subject: Re: Unit testing PyFlink SQL project

Hi John,

I have written an example on how to write unit tests of Flink functionalities 
with PyFlink in [1]. Hope it is helpful for you. Feel free to let me know if 
there are any problems.

Regards,
Dian

[1] https://github.com/dianfu/pyflink-faq/tree/main/testing
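
As a rough illustration of the shape such a test can take: a minimal, hedged
sketch, independent of the repository in [1], which assumes the required Flink
jars are already on the classpath (exactly the setup issue discussed later in
this thread).

```
import unittest

from pyflink.table import DataTypes, EnvironmentSettings, TableEnvironment
from pyflink.table.udf import udf


class MyJobTests(unittest.TestCase):

    def setUp(self):
        # A local TableEnvironment is enough to exercise SQL and UDF logic.
        self.t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

    def test_uppercase_udf(self):
        @udf(result_type=DataTypes.STRING())
        def to_upper(s):
            return s.upper()

        self.t_env.create_temporary_function("to_upper", to_upper)
        table = self.t_env.from_elements([("a",), ("b",)], ["name"])
        self.t_env.create_temporary_view("input_table", table)
        result = self.t_env.sql_query("SELECT to_upper(name) FROM input_table")
        with result.execute().collect() as rows:
            values = sorted(row[0] for row in rows)
        self.assertEqual(values, ["A", "B"])


if __name__ == "__main__":
    unittest.main()
```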

On Sun, Apr 24, 2022 at 9:25 AM Dian Fu <dian0511...@gmail.com> wrote:
Hi John,

>> I don't know how to fix this. I've tried adding `flink-table-planner` and 
>> `flink-table-planner-blink` dependencies with `test-jar` to my 
>> dummy pom.xml, but it still fails.
What's the failure after doing this? The flink-table-planner*-tests.jar should
be available in the Maven repository [1].

>> This is starting to feel like a real pain to do something that should be 
>> trivial: basic TDD of a PyFlink project.  Is there a real-world example of a 
>> Python project that shows how to set up a testing environment for unit 
>> testing SQL with PyFlink?
I'm not aware of such a project; however, I agree that this may be a very
important aspect which should be improved. I will look into this.

Regards,
Dian

[1] 
https://repo1.maven.org/maven2/org/apache/flink/flink-table-planner_2.11/1.13.6/


On Sun, Apr 24, 2022 at 4:44 AM John Tipper <john_tip...@hotmail.com> wrote:
Hi all,

Is there an example of a self-contained repository showing how to perform SQL 
unit testing of PyFlink (specifically 1.13.x if possible)?  I have cross-posted 
the question to Stack Overflow here: 
https://stackoverflow.com/questions/71983434/is-there-an-example-of-pyflink-sql-unit-testing-in-a-self-contained-repo


There is a related SO question 
(https://stackoverflow.com/questions/69937520/pyflink-sql-local-test), where it 
is suggested to use some of the tests from PyFlink itself.  The issue I'm 
running into is that the PyFlink repo assumes that a bunch of things are on the 
Java classpath and that some Python utility classes are available (they're not 
distributed via PyPi apache-flink).

I have done the following:


  1.  Copied `test_case_utils.py` and `source_sink_utils.py` from PyFlink 
(https://github.com/apache/flink/tree/f8172cdbbc27344896d961be4b0b9cdbf000b5cd/flink-python/pyflink/testing)
 into my project.
  2.  Copied an example unit test
(https://github.com/apache/flink/blob/f8172cdbbc27344896d961be4b0b9cdbf000b5cd/flink-python/pyflink/table/tests/test_sql.py#L39)
 as suggested by the related SO question.

When I try to run the test, I get an error because the test case cannot 
determine what version of Avro jars to download (`download_apache_avro()` 
fails, because pyflink_gateway_server.py tries to evaluate the value of 
`avro.version` by running `mvn help:evaluate -Dexpression=avro.version`)

I then added a dummy `pom.xml` defining a Maven property of `avro.version` 
(with a value of `1.10.0`) and my unit test case is loaded.

I now get a new error and my test is skipped:

'flink-table-planner*-tests.jar' is not available. Will skip the related 
tests.

I don't know how to fix this. I've tried adding `flink-table-planner` and 
`flink-table-planner-blink` dependencies with `test-jar` to my 
dummy pom.xml, but it still fails.

This is starting

Unit testing PyFlink SQL project

2022-04-23 Thread John Tipper
Hi all,

Is there an example of a self-contained repository showing how to perform SQL 
unit testing of PyFlink (specifically 1.13.x if possible)?  I have cross-posted 
the question to Stack Overflow here: 
https://stackoverflow.com/questions/71983434/is-there-an-example-of-pyflink-sql-unit-testing-in-a-self-contained-repo


There is a related SO question 
(https://stackoverflow.com/questions/69937520/pyflink-sql-local-test), where it 
is suggested to use some of the tests from PyFlink itself.  The issue I'm 
running into is that the PyFlink repo assumes that a bunch of things are on the 
Java classpath and that some Python utility classes are available (they're not 
distributed via PyPi apache-flink).

I have done the following:


  1.  Copied `test_case_utils.py` and `source_sink_utils.py` from PyFlink 
(https://github.com/apache/flink/tree/f8172cdbbc27344896d961be4b0b9cdbf000b5cd/flink-python/pyflink/testing)
 into my project.
  2.  Copied an example unit test
(https://github.com/apache/flink/blob/f8172cdbbc27344896d961be4b0b9cdbf000b5cd/flink-python/pyflink/table/tests/test_sql.py#L39)
 as suggested by the related SO question.

When I try to run the test, I get an error because the test case cannot 
determine what version of Avro jars to download (`download_apache_avro()` 
fails, because pyflink_gateway_server.py tries to evaluate the value of 
`avro.version` by running `mvn help:evaluate -Dexpression=avro.version`)

I then added a dummy `pom.xml` defining a Maven property of `avro.version` 
(with a value of `1.10.0`) and my unit test case is loaded.

I now get a new error and my test is skipped:

'flink-table-planner*-tests.jar' is not available. Will skip the related 
tests.

I don't know how to fix this. I've tried adding `flink-table-planner` and 
`flink-table-planner-blink` dependencies with `test-jar` to my 
dummy pom.xml, but it still fails.

This is starting to feel like a real pain to do something that should be 
trivial: basic TDD of a PyFlink project.  Is there a real-world example of a 
Python project that shows how to set up a testing environment for unit testing 
SQL with PyFlink?

Many thanks,

John


XXX doesn't exist in the parameters of the SQL statement

2022-04-17 Thread John Tipper
Hi all,

I'm having some issues with getting a Flink SQL application to work, where I 
get an exception and I'm not sure why it's occurring.

I have a source table, reading from Kinesis, where the incoming data in JSON 
format has a "source" and "detail-type" field.


CREATE TABLE `input` (
`source` VARCHAR,
`detail-type` VARCHAR
) WITH (
'connector' = 'kinesis',
'format' = 'json',
...
)

I have an output table called output writing to a table called dummy in a JDBC 
database, where the JDBC database table has 2 columns, source and detail-type.

CREATE TABLE `output`(
  `source` VARCHAR,
  `detail-type` VARCHAR
) WITH (
  'connector' = 'jdbc',
  'table-name' = 'dummy',
  ...
)

I am trying to simply confirm I can write the data from input to output without 
transformation:

INSERT INTO `output`(`source`, `detail-type`) SELECT `source`, `detail-type`
from `input`

However, my Flink job has the following exception:


detail-type doesn't exist in the parameters of SQL statement INSERT INTO 
dummy(source, detail-type) VALUES (:source, :detail-type)

I get the same error when I try:

INSERT INTO `output` SELECT `source`, `detail-type` from `input`

Can anyone please advise what I'm doing wrong?

Many thanks,

John



Re: How to process events with different JSON schemas in single Kinesis stream using PyFlink?

2022-04-11 Thread John Tipper
Hi Dian,

Thank you very much, that worked very nicely.

Kind regards,

John

From: Dian Fu 
Sent: 11 April 2022 06:32
To: John Tipper 
Cc: user@flink.apache.org 
Subject: Re: How to process events with different JSON schemas in single 
Kinesis stream using PyFlink?

Hi John,

1) Regarding the Table API, you could declare the column `detail` as STRING and
then parse it as JSON in a Python user-defined function, as follows:
```
import json

from pyflink.table import DataTypes
from pyflink.table.udf import udf


@udf(result_type=DataTypes.STRING())
def get_id(detail):
    # 'detail' arrives as a JSON string; return whichever id field is
    # present for the given event type.
    detail_json = json.loads(detail)
    if 'build-id' in detail_json:
        return detail_json['build-id']
    else:
        return detail_json['instance-id']
```
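
As an illustrative aside (not part of the original reply): a minimal sketch of
how a UDF like this might be registered and called from SQL. The `events` table
and its `source`/`detail` columns are assumed names used purely for illustration.

```
from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

# Register the Python UDF defined above under a name usable from SQL.
t_env.create_temporary_function("get_id", get_id)

# 'events' is an assumed table whose 'detail' column is declared as STRING.
result = t_env.sql_query("SELECT `source`, get_id(detail) AS id FROM events")
```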

2) Regarding the DataStream API, Kinesis is still not supported; however, it
should be very easy to add as it is simply a wrapping of the Java Kinesis
connector. If you want to use the DataStream API, you could wrap it yourself
for now and refer to how the other connectors are handled [1] for more details.

Regards,
Dian

[1] 
https://github.com/apache/flink/blob/master/flink-python/pyflink/datastream/connectors.py#L1235
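
As a further illustrative aside (again, not part of the original reply):
following the wrapping pattern used by the existing connectors in [1], a rough
sketch of such a wrapper might look like the following. The Java class paths,
the constructor arguments and the need for the flink-connector-kinesis jar on
the classpath are assumptions and have not been verified.

```
from pyflink.datastream.functions import SourceFunction
from pyflink.java_gateway import get_gateway


class KinesisSource(SourceFunction):
    """Thin wrapper around the Java FlinkKinesisConsumer (sketch only)."""

    def __init__(self, stream_name, properties):
        gateway = get_gateway()
        # Copy the Python dict of consumer settings into java.util.Properties.
        j_props = gateway.jvm.java.util.Properties()
        for key, value in properties.items():
            j_props.setProperty(key, value)
        # Deserialise each Kinesis record as a plain string.
        j_schema = gateway.jvm.org.apache.flink.api.common.serialization.SimpleStringSchema()
        # Assumes flink-connector-kinesis (and its dependencies) is on the classpath.
        j_consumer = gateway.jvm.org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer(
            stream_name, j_schema, j_props)
        super().__init__(j_consumer)
```

Such a source could then, in principle, be passed to `env.add_source(...)` with
an appropriate `type_info`, but this is intended only to show the general
wrapping approach, not a verified implementation.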

On Mon, Apr 11, 2022 at 6:17 AM John Tipper <john_tip...@hotmail.com> wrote:
TLDR; I want to know how best to process a stream of events using PyFlink, 
where the events in the stream have a number of different schemas.

Details:

I want to process a stream of events coming from a Kinesis data stream which 
originate from an AWS EventBridge bus. The events in this stream are all JSON, 
but have different schema. Specifically, they all have a common set of 
properties (e.g. version, id, time, source, detail-type), but there is a 
"detail" section that is a different shape, depending on what the event type is.

For instance, an event on the bus from EC2 might look something like this:


{
  "version": "0",
  "id": "6a7e8feb-b491-4cf7-a9f1-bf3703467718",
  "detail-type": "EC2 Instance State-change Notification",
  "source": "aws.ec2",
  "account": "",
  "time": "2017-12-22T18:43:48Z",
  "region": "us-west-1",
  "detail": {
"instance-id": " i-1234567890abcdef0",
"state": "terminated"
  }
}

whereas the "detail" section for an event from Codebuild might look like this 
(and the top level "source" field would be "aws.codebuild"):


   "detail":{
"build-status": "SUCCEEDED",
"project-name": "my-sample-project",
"build-id": 
"arn:aws:codebuild:us-west-2:123456789012:build/my-sample-project:8745a7a9-c340-456a-9166-edf953571bEX",
"additional-information": {
  "artifact": {
"md5sum": "da9c44c8a9a3cd4b443126e823168fEX",
"sha256sum": 
"6ccc2ae1df9d155ba83c597051611c42d60e09c6329dcb14a312cecc0a8e39EX",
"location": 
"arn:aws:s3:::codebuild-123456789012-output-bucket/my-output-artifact.zip"
  }
 }
   }

I have the following constraints that I really want to abide by:


  1.  I cannot create a Kinesis data stream for each event type, i.e. the 
stream of events is multiplexed.
  2.  I'd like to create application code in Python using PyFlink (I can code 
happily in Java but my colleagues who need to contribute cannot).

I want to ingest the stream of events, key by the type of event (key by 
"detail-type", or "source") and then process the events according to their type.

I thought my options might be:


  1.  Use SQL/Table API (my preferred option), but it looks like JSON queries
are not scheduled to be released until Flink 1.15, and until then I cannot
define the schema of the input table where a field is a generic Map (i.e.
Map<>). It appears I have to define the schema of the input table exactly, and
I don't see how I can create a table which covers a property that varies in
shape, e.g. for the EC2 example above the schema for "detail-type" is a Map,
but for a CodeBuild event it's a more deeply nested JSON structure. If there
were a "JSON" type then this would appear to be the way to go.
  2.  Use the DataStream API, but it looks like there is no PyFlink Kinesis
connector for the DataStream API. There is a Java connector - what's involved
in creating a custom PyFlink DataStream connector?

Are there any other options I've missed? What's the best way to approach this?

Many thanks,

John



How to process events with different JSON schemas in single Kinesis stream using PyFlink?

2022-04-10 Thread John Tipper
TLDR; I want to know how best to process a stream of events using PyFlink, 
where the events in the stream have a number of different schemas.

Details:

I want to process a stream of events coming from a Kinesis data stream which 
originate from an AWS EventBridge bus. The events in this stream are all JSON, 
but have different schema. Specifically, they all have a common set of 
properties (e.g. version, id, time, source, detail-type), but there is a 
"detail" section that is a different shape, depending on what the event type is.

For instance, an event on the bus from EC2 might look something like this:


{
  "version": "0",
  "id": "6a7e8feb-b491-4cf7-a9f1-bf3703467718",
  "detail-type": "EC2 Instance State-change Notification",
  "source": "aws.ec2",
  "account": "",
  "time": "2017-12-22T18:43:48Z",
  "region": "us-west-1",
  "detail": {
"instance-id": " i-1234567890abcdef0",
"state": "terminated"
  }
}

whereas the "detail" section for an event from Codebuild might look like this 
(and the top level "source" field would be "aws.codebuild"):


   "detail":{
"build-status": "SUCCEEDED",
"project-name": "my-sample-project",
"build-id": 
"arn:aws:codebuild:us-west-2:123456789012:build/my-sample-project:8745a7a9-c340-456a-9166-edf953571bEX",
"additional-information": {
  "artifact": {
"md5sum": "da9c44c8a9a3cd4b443126e823168fEX",
"sha256sum": 
"6ccc2ae1df9d155ba83c597051611c42d60e09c6329dcb14a312cecc0a8e39EX",
"location": 
"arn:aws:s3:::codebuild-123456789012-output-bucket/my-output-artifact.zip"
  }
 }
   }

I have the following constraints that I really want to abide by:


  1.  I cannot create a Kinesis data stream for each event type, i.e. the 
stream of events is multiplexed.
  2.  I'd like to create application code in Python using PyFlink (I can code 
happily in Java but my colleagues who need to contribute cannot).

I want to ingest the stream of events, key by the type of event (key by 
"detail-type", or "source") and then process the events according to their type.

I thought my options might be:


  1.  Use SQL/Table API (my preferred option), but it looks like JSON queries
are not scheduled to be released until Flink 1.15, and until then I cannot
define the schema of the input table where a field is a generic Map (i.e.
Map<>). It appears I have to define the schema of the input table exactly, and
I don't see how I can create a table which covers a property that varies in
shape, e.g. for the EC2 example above the schema for "detail-type" is a Map,
but for a CodeBuild event it's a more deeply nested JSON structure. If there
were a "JSON" type then this would appear to be the way to go.
  2.  Use the DataStream API, but it looks like there is no PyFlink Kinesis
connector for the DataStream API. There is a Java connector - what's involved
in creating a custom PyFlink DataStream connector?

Are there any other options I've missed? What's the best way to approach this?

Many thanks,

John



Re: Does Flink support raw generic types in a merged stream?

2019-07-17 Thread John Tipper
Hi Chesnay,

Yes, but the actual use case needs to support more than 2 streams, so if I go
down the Either route then I have arbitrarily sized nested Eithers, i.e.
Either<Either<A, B>, C> etc., which gets pretty messy very quickly.

Many thanks,

John

Sent from my iPhone

On 17 Jul 2019, at 13:29, Chesnay Schepler <ches...@apache.org> wrote:


Have you looked at org.apache.flink.types.Either? If you'd wrap all elements in 
both streams before the union you should be able to join them properly.

On 17/07/2019 14:18, John Tipper wrote:
Hi All,


Can I union/join 2 streams containing generic classes, where each stream has a 
different parameterised type? I'd like to process the combined stream of values 
as a single raw type, casting to a specific type for detailed processing, based 
on some information in the type that will allow me to safely cast to the 
specific type.

I can't share my exact code, but the below example shows the sort of thing I 
want to do.

So, as an example, given the following generic type:

class MyGenericContainer<IN> extends Tuple3 {
    ...
    private final String myString;
    private final IN value;
    private final Class<IN> clazz; // created by constructor
    private SomeOtherClass someOtherClass;
    ...
}


and 2 streams, I'd like to be able to do something like:

DataStream<MyGenericContainer<A>> stream1 = ...
DataStream<MyGenericContainer<B>> stream2 = ...



DataStream<...> merged = stream1.union(stream2).process(new 
MyProcessFunction());



// within an operator, such as a MyProcessFunction:

MyGenericContainer container = raw generic container passed to function;

Object rawValue = container.getValue();

performProcessing((container.getClazz())rawValue); // safely cast rawValue


However, I get an error when I do this:

Caused by: org.apache.flink.api.common.functions.InvalidTypesException: Type of 
TypeVariable 'IN' in 'class com.example.MyGenericTuple3' could not be 
determined. This is most likely a type erasure problem. The type extraction 
currently supports types with generic variables only in cases where all 
variables in the return type can be deduced from the input type(s). Otherwise 
the type has to be specified explicitly using type information.
at 
org.apache.flink.api.java.typeutils.TypeExtractor.createSubTypesInfo(TypeExtractor.java:1133)

at 
org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:853)

at 
org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:803)

at 
org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:587)

at 
org.apache.flink.streaming.api.datastream.DataStream.process(DataStream.java:633)


If I try to add a returns() to the code, like this:

DataStream<...> merged = stream1.union(stream2)
    .process(...)
    .returns(new TypeHint<MyGenericContainer<IN>>() {})


then I get a different exception:

Exception in thread "main" org.apache.flink.util.FlinkRuntimeException: The 
TypeHint is using a generic variable.This is not supported, generic types must 
be fully specified for the TypeHint.


Is this sort of thing supported or is there another way of joining multiple 
streams into a single stream, where each stream object will have a specific 
type of a common generic type?


Many thanks,

John




Does Flink support raw generic types in a merged stream?

2019-07-17 Thread John Tipper
Hi All,


Can I union/join 2 streams containing generic classes, where each stream has a 
different parameterised type? I'd like to process the combined stream of values 
as a single raw type, casting to a specific type for detailed processing, based 
on some information in the type that will allow me to safely cast to the 
specific type.

I can't share my exact code, but the below example shows the sort of thing I 
want to do.

So, as an example, given the following generic type:

class MyGenericContainer<IN> extends Tuple3 {
    ...
    private final String myString;
    private final IN value;
    private final Class<IN> clazz; // created by constructor
    private SomeOtherClass someOtherClass;
    ...
}

and 2 streams, I'd like to be able to do something like:

DataStream<MyGenericContainer<A>> stream1 = ...
DataStream<MyGenericContainer<B>> stream2 = ...

DataStream<...> merged = stream1.union(stream2).process(new 
MyProcessFunction());

// within an operator, such as a MyProcessFunction:
MyGenericContainer container = raw generic container passed to function;
Object rawValue = container.getValue();
performProcessing((container.getClazz())rawValue); // safely cast rawValue

However, I get an error when I do this:

Caused by: org.apache.flink.api.common.functions.InvalidTypesException: Type of 
TypeVariable 'IN' in 'class com.example.MyGenericTuple3' could not be 
determined. This is most likely a type erasure problem. The type extraction 
currently supports types with generic variables only in cases where all 
variables in the return type can be deduced from the input type(s). Otherwise 
the type has to be specified explicitly using type information.
at 
org.apache.flink.api.java.typeutils.TypeExtractor.createSubTypesInfo(TypeExtractor.java:1133)
at 
org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:853)
at 
org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:803)
at 
org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:587)
at 
org.apache.flink.streaming.api.datastream.DataStream.process(DataStream.java:633)

If I try to add a returns() to the code, like this:

DataStream<...> merged = stream1.union(stream2)
    .process(...)
    .returns(new TypeHint<MyGenericContainer<IN>>() {})

then I get a different exception:

Exception in thread "main" org.apache.flink.util.FlinkRuntimeException: The 
TypeHint is using a generic variable.This is not supported, generic types must 
be fully specified for the TypeHint.

Is this sort of thing supported or is there another way of joining multiple 
streams into a single stream, where each stream object will have a specific 
type of a common generic type?


Many thanks,

John



What order are events processed in iterative loop?

2019-06-17 Thread John Tipper
For the case of a single iteration of an iterative loop where the feedback type
is different from the input stream type, in what order are events processed in
the forward flow? So for example, if we have:

  *   the input stream contains input1 followed by input2
  *   a ConnectedIterativeStream at the head of an iteration
  *   followed by a CoProcessFunction, which emits a feedback element in
response to an input that closes the ConnectedIterativeStream

For an input stream of input1 followed by input2, what order of events does the 
CoProcessFunction see?

Does it see "input1, feedback1, input2, feedback2", or "input1, input2, 
feedback1, feedback2", or is it a non-deterministic processing time order based 
on the execution time of the CoProcessFunction, but where input1 is always 
processed before input2 and feedback1 is always processed before feedback2, 
e.g. either of the two orders are possible?

Many thanks,

John


How to join/group 2 streams by key?

2019-06-14 Thread John Tipper
Hi All,


I have 2 streams of events that relate to a common base event, where one stream 
is the result of a flatmap. I want to join all events that share a common 
identifier.

Thus I have something that looks like:

DataStream<TypeA> streamA = ...
DataStream<TypeB> streamB = someDataStream.flatMap(...) // produces a stream of
TypeB for each item in someDataStream


Both TypeA and TypeB share an identifier and I know how many TypeB objects
there are in the parent object. I want to perform some processing when all of
the events associated with a particular identifier have arrived, i.e. when I
basically can create a Tuple3<String, TypeA, List<TypeB>> object.

Is this best done with a WindowJoin and a GlobalWindow, a Window CoGroup and a 
GlobalWindow or by connecting the 2 streams into a ConnectedStream then 
performing the joining inside a CoProcessFunction?

Many thanks,

John


How are timestamps treated within an iterative DataStream loop within Flink?

2019-06-08 Thread John Tipper
Hi All,

How are timestamps treated within an iterative DataStream loop within Flink?

For example, here is an example of a simple iterative loop within Flink where 
the feedback loop is of a different type to the input stream:

DataStream<MyInput> inputStream = env.addSource(new MyInputSourceFunction());
IterativeStream.ConnectedIterativeStreams<MyInput, MyFeedback> iterativeStream =
    inputStream.iterate().withFeedbackType(MyFeedback.class);
// define an output tag so we can emit feedback objects via a side output
final OutputTag<MyFeedback> outputTag = new OutputTag<MyFeedback>("feedback-output"){};
// now do some processing
SingleOutputStreamOperator<MyOutput> combinedStreams =
    iterativeStream.process(new CoProcessFunction<MyInput, MyFeedback, MyOutput>() {
        @Override
        public void processElement1(MyInput value, Context ctx, Collector<MyOutput> out) throws Exception {
            // do some processing of the stream of MyInput values
            // emit MyOutput values downstream by calling out.collect()
            out.collect(someInstanceOfMyOutput);
        }

        @Override
        public void processElement2(MyFeedback value, Context ctx, Collector<MyOutput> out) throws Exception {
            // do some more processing on the feedback classes
            // emit feedback items
            ctx.output(outputTag, someInstanceOfMyFeedback);
        }
    });

iterativeStream.closeWith(combinedStreams.getSideOutput(outputTag));

My questions revolve around how does Flink use timestamps within a feedback 
loop:

  *   Within the ConnectedIterativeStreams, how does Flink treat ordering of 
the input objects across the streams of regular inputs and feedback objects? If 
I emit an object into the feedback loop, when will it be seen by the head of 
the loop with respect to the regular stream of input objects?
  *   How does the behaviour change when using event time processing?

Many thanks,

John

Question also posted to StackOverflow here: 
https://stackoverflow.com/questions/56506020/how-does-flink-treat-timestamps-within-iterative-loops



Is it possible to configure Flink pre-flight type serialization scanning?

2019-06-03 Thread John Tipper
Flink performs significant scanning during the pre-flight phase of a Flink 
application 
(https://ci.apache.org/projects/flink/flink-docs-stable/dev/types_serialization.html).
The act of creating sources, operators and sinks causes Flink to scan the data
types of the objects that are used within the topology of a given streaming
flow, as Flink will apparently try to optimise jobs based on this information.

Is this scanning configurable? Can I turn it off and just force Flink to use 
Kryo serialisation only and not need or use any of this scanned information?

I have a very large, deeply nested class in a proprietary library that was
auto-generated, and Flink seems to get into what looks like an endless loop
when scanning it, resulting in out-of-memory errors after running for several
hours (the application never actually launches via env.execute(), even if I
bump up the heap size significantly). The class has a number of circular
references, i.e. the class and its child classes contain references to other
classes of the same type; is this likely to be a problem?

Many thanks,

John


Re: FLIP-16, FLIP-15 Status Updates?

2019-02-19 Thread John Tipper
Hi Timo,

That’s great, thank you very much. If I’d like to contribute, is it best to 
wait until the roadmap has been published? And is this the best list to ask on, 
or is the development mailing list better?

Many thanks,

John

Sent from my iPhone

> On 19 Feb 2019, at 16:29, Timo Walther  wrote:
> 
> Hi John,
> 
> you are right that there was not much progress in the last years around these 
> two FLIPs. Mostly due to shift of priorities. However, with the big Blink 
> code contribution from Alibaba and joint development forces for a unified 
> batch and streaming runtime [1], it is very likely that also iterations and 
> thus machine learning algorithms will see more development efforts.
> 
> The community is working on roadmap page for the website. And I can already 
> reveal that a new iterations model is mentioned there. The new Flink roadmap 
> page can be expected in the next 2-3 weeks.
> 
> I hope this information helps.
> 
> Regards,
> Timo
> 
> [1] 
> https://flink.apache.org/news/2019/02/13/unified-batch-streaming-blink.html
> 
>> Am 19.02.19 um 12:47 schrieb John Tipper:
>> Hi All,
>> 
>> Does anyone know what the current status is for FLIP-16 (loop fault 
>> tolerance) and FLIP-15 (redesign iterations) please? I can see lots of work 
>> back in 2016, but it all seemed to stop and go quiet since about March 2017. 
>> I see iterations as offering very interesting capabilities for Flink, so it 
>> would be good to understand how we can get this moving again.
>> 
>> Many thanks,
>> 
>> John
>> 
>> Sent from my iPhone
> 
> 


FLIP-16, FLIP-15 Status Updates?

2019-02-19 Thread John Tipper
Hi All,

Does anyone know what the current status is for FLIP-16 (loop fault tolerance) 
and FLIP-15 (redesign iterations) please? I can see lots of work back in 2016, 
but it all seemed to stop and go quiet since about March 2017. I see iterations 
as offering very interesting capabilities for Flink, so it would be good to 
understand how we can get this moving again.

Many thanks,

John

Sent from my iPhone