Pyflink UDF with ARRAY as input

2020-12-17 Thread Barth, Torben
Dear List,

I have a table with the following structure:

my_table
-- Key: String
-- List_element: ARRAY>

I want to define a UDF that extracts information from "List_element", but I
cannot access the contents of the array inside the UDF. I tried something like:

@udf(result_type=DataTypes.STRING())
def get_string_element(my_list):
    my_string = 'xxx'
    for element in my_list:
        if element.integer_element == 2:
            my_string = element.string_element
    return my_string


table_env.create_temporary_function("get_string_element", get_string_element)
# use the function in Python Table API
my_table.select("get_string_element(List_element)")
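
One way to narrow the problem down is to keep the extraction logic in a plain
Python function and test it outside Flink first. The sketch below is not
confirmed anywhere in this thread: it assumes that each array element reaches
the function as a row-like object supporting positional indexing, with the
integer field at position 0 and the string field at position 1. The helper name
pick_string_element and the field order are hypothetical.

```python
def pick_string_element(my_list, wanted=2, default="xxx"):
    """Return the string field of the last element whose integer field equals `wanted`."""
    result = default
    # A NULL array arrives as None in Python, so guard against it.
    for element in my_list or []:
        # Positional access: element[0] is assumed to be the integer field,
        # element[1] the string field (assumption, see the text above).
        if element is not None and element[0] == wanted:
            result = element[1]
    return result


# Plain tuples stand in for the row objects here.
sample = [(1, "a"), (2, "b"), (3, "c")]
print(pick_string_element(sample))
```

If positional access turns out to work for the actual element type, the same
function could be wrapped with udf(pick_string_element,
result_type=DataTypes.STRING()) and registered as in the snippet above.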

Unfortunately, I cannot get it to work. Does anybody know the correct way to
extract the information?

Any comments or ideas are very welcome.

Thanks
Torben

Fraport AG Frankfurt Airport Services Worldwide, 60547 Frankfurt am Main, Sitz 
der Gesellschaft: Frankfurt am Main, Amtsgericht Frankfurt am Main: HRB 7042, 
Umsatzsteuer-Identifikationsnummer: DE 114150623, Vorsitzender des 
Aufsichtsrates: Michael Boddenberg – Hessischer Minister der Finanzen; 
Vorstand: Dr. Stefan Schulte (Vorsitzender), Anke Giesen, Michael Müller, Dr. 
Pierre Dominique Prümm, Dr. Matthias Zieschang


Pyflink Join with versioned view / table

2021-01-14 Thread Barth, Torben
Dear List,

I have trouble implementing a join between two streaming tables in the Python
Table API.

Each record of the left table should be enriched with the latest value of the
right_table. The right_table is updated only rarely (perhaps every 15 minutes).
With my current implementation, the joined table only produces updates when the
right table changes. Instead, I want an update of the joined table every time I
receive a record on the left side, enriched with the most recent value of the
right side. I have not found a way to implement this with the desired result.
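
To make the desired behaviour concrete, here is a minimal plain-Python sketch of
the semantics described above. It is an illustration only, not Flink code: the
function name enrich_left and the sample data are hypothetical, and timestamps
are plain integers.

```python
from bisect import bisect_right


def enrich_left(left_records, right_updates):
    """Attach to each left record the latest right value at or before its time.

    left_records: list of (time, payload) tuples.
    right_updates: list of (time, value) tuples, sorted by time.
    """
    right_times = [t for t, _ in right_updates]
    enriched = []
    for left_time, payload in left_records:
        # Index of the last right update with time <= left_time.
        idx = bisect_right(right_times, left_time) - 1
        value = right_updates[idx][1] if idx >= 0 else None
        enriched.append((left_time, payload, value))
    return enriched


right = [(0, 10), (900, 20)]               # right side changes rarely
left = [(5, "a"), (300, "b"), (905, "c")]  # left side arrives continuously
for row in enrich_left(left, right):
    print(row)
```

Every left record yields exactly one output row, regardless of how often the
right side changes.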

I tried an implementation using a versioned view. Here is a short example:

left_table
root
|-- measurement_time: TIMESTAMP(3) *ROWTIME*
|-- x: DOUBLE
|-- y: DOUBLE
|-- proctime: TIMESTAMP(3) NOT NULL *PROCTIME* AS PROCTIME()
|-- WATERMARK FOR measurement_time: TIMESTAMP(3) AS `measurement_time`

right_table
|-- some_value: INT
|-- id: INT
|-- modtime: TIMESTAMP(3) *ROWTIME*
The "id" is always set to 1.
I perform the following operations:

t_env.create_temporary_view(
    "left_table",
    left_table.add_columns("1.cast(INT) AS left_artificial_key"))
t_env.create_temporary_view("right_table", right_table)

sql_view = """
-- Define a versioned view
CREATE VIEW versioned_right AS
SELECT id, some_value, modtime
FROM (
    SELECT *,
           ROW_NUMBER() OVER (PARTITION BY id
                              ORDER BY modtime DESC) AS rownum
    FROM right_table)
WHERE rownum = 1
"""

view = t_env.execute_sql(sql_view)

sql = """
SELECT left_table.*, versioned_right.some_value
FROM left_table
LEFT JOIN versioned_right FOR SYSTEM_TIME AS OF left_table.measurement_time
ON left_table.left_artificial_key = versioned_right.id
"""

joined = t_env.sql_query(sql)


I observed the same behavior when using a lateral join.

Does anybody have an idea how the join could be implemented in the correct way?

Any comments or ideas are very welcome.

Thanks
Torben



Re: Pyflink Join with versioned view / table

2021-01-19 Thread Barth, Torben
Hi Leonard,

thanks for your answer.

My data source is Kafka, so I cannot use the second option. The first option
unfortunately does not work: I set the parameter, but updates are still only
triggered by a change on the right side.

As a workaround I currently use the last_value operator. This seems to work for
me for now, but could cause problems in the future.

Best
Torben

From: Leonard Xu [mailto:xbjt...@gmail.com]
Sent: Saturday, 16 January 2021 15:05
To: Barth, Torben
Cc: user@flink.apache.org
Subject: Re: Pyflink Join with versioned view / table

Hi, Torben

> When implementing the join I get only updates when the right table changes

The event-time temporal join against a versioned table is triggered by the
watermark, which is calculated from both the left and the right table's
watermarks, so you only get updates when the right table changes (it is the
slower one in your case). The right table may change multiple times, so we need
to know when it changes before we can output the correct join result.
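
The effect described above can be illustrated with a small plain-Python model.
This is a deliberately simplified sketch, not Flink's implementation: it assumes
the join operator's watermark is the minimum of its two input watermarks and
that a left record is released once that watermark reaches the record's event
time. The function name emit_ready and the numbers are hypothetical.

```python
def emit_ready(buffered_left, left_watermark, right_watermark):
    """Split buffered left records into those that can be emitted and those that must wait.

    A record is released only once the operator watermark, taken here as the
    minimum of both input watermarks, has reached the record's event time.
    """
    operator_watermark = min(left_watermark, right_watermark)
    ready = [r for r in buffered_left if r[0] <= operator_watermark]
    waiting = [r for r in buffered_left if r[0] > operator_watermark]
    return ready, waiting


left_rows = [(100, "a"), (200, "b"), (300, "c")]

# Left watermark is at 300, but the right table last changed at time 50.
ready, waiting = emit_ready(left_rows, left_watermark=300, right_watermark=50)
print(ready, waiting)

# Once the right table changes again (watermark 400), everything is released.
ready, waiting = emit_ready(left_rows, left_watermark=300, right_watermark=400)
print(ready, waiting)
```

In this model the slow right side holds back all three left records until it
advances, which matches the behaviour observed in the original question.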


> Does anybody have an idea how the join could be implemented in the correct way?

You can try to set a proper value for `table.exec.source.idle-timeout`
(e.g. 1 minute) for your job. The right table will then be marked as
temporarily idle, the downstream join operator will only use the left table's
watermark, and you will get updates after at most 1 minute [1].
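
In PyFlink the option can be set through the table configuration. The following
is a minimal sketch; the helper name set_source_idle_timeout and the value
"1 min" are only illustrative, and t_env is assumed to be an existing
TableEnvironment such as the one used earlier in this thread.

```python
def set_source_idle_timeout(t_env, timeout="1 min"):
    """Set `table.exec.source.idle-timeout` on an existing TableEnvironment."""
    # get_config() returns the TableConfig; get_configuration() exposes the
    # underlying key/value Configuration object.
    configuration = t_env.get_config().get_configuration()
    configuration.set_string("table.exec.source.idle-timeout", timeout)
    return t_env
```

The call would be set_source_idle_timeout(t_env) before the join query is
submitted.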

Another way is to look up the latest state of the right table (if the table
implements LookupTableSource, e.g. JDBC/HBase tables). The join then always
returns the most up-to-date value for a given key, and you get an update
immediately for every input record from the left table [2].
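
Such a processing-time temporal (lookup) join could look roughly like the query
built below. This is a sketch under assumptions: right_lookup is a hypothetical
table backed by a lookup-capable connector such as JDBC, and the column names
are taken from the example in the original question.

```python
def build_lookup_join_sql(left="left_table", right="right_lookup"):
    """Build a processing-time temporal (lookup) join query as a SQL string."""
    return f"""
SELECT l.*, r.some_value
FROM {left} AS l
LEFT JOIN {right} FOR SYSTEM_TIME AS OF l.proctime AS r
ON l.left_artificial_key = r.id
"""


print(build_lookup_join_sql())
```

The resulting string would be passed to t_env.sql_query(...). Note that the
join uses the processing-time attribute proctime of the left table rather than
measurement_time.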

Best,
Leonard

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/config.html#table-exec-source-idle-timeout
[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/streaming/joins.html#processing-time-temporal-join




Re: Pyflink Join with versioned view / table

2021-01-19 Thread Barth, Torben
Hi Leonard,

I have just realized that the "last_value" operator does not work either, since
it produces updates whenever the right side changes. I only need the current
state at the moment I receive a message on the left side. What I want to
perform is really a lookup and not a true join.

Since my topic has only one partition, the first option will not work for me
either. Following your comments, I guess I need to reconsider the design of my
solution.

Best,
Torben

-Original Message-
From: Leonard Xu [mailto:xbjt...@gmail.com]
Sent: Tuesday, 19 January 2021 10:36
To: Barth, Torben
Cc: user@flink.apache.org
Subject: Re: Pyflink Join with versioned view / table

Hi, Torben

Happy to hear you have addressed your problem. The first option resolves the
situation where only some partitions of the Kafka topic receive no data, but if
none of the partitions receive data, the watermark is not pushed forward and
the temporal join is not triggered.
Otherwise, we might produce an unexpected join result, because a change on the
versioned table side could arrive after we have already output the join result.

Best,
Leonard
