Re: Non-Determinism in Table-API with Kafka and Event Time

2023-02-13 Thread Theodor Wübker
Hey Hector,

thanks for your reply. Your assumption is entirely correct: I already have a few
million records on the topic to test a streaming use case. I am planning to test
it with a variety of settings, but the problems occur with any cluster
configuration, for example parallelism 1 with 1 TaskManager and 1 slot. I plan to
scale it up to 10 slots and parallelism 10 for testing purposes.

I do not think that any events are kept on hold, since the output always contains
windows with the latest timestamp (but not enough of them; there should be many
more). Nevertheless, I will try your suggestion.
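Concretely, I will try producing one last "flush" record roughly like this (a sketch only, assuming a plain Kafka producer and a JSON payload whose timeStampData field carries the event time; the topic name, field names and timestamp value are placeholders):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

Properties props = new Properties();
props.put("bootstrap.servers", "kafka:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

// One record whose event time lies two hours after the latest real event, so the
// watermark passes the last open window and the pending results are emitted.
try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
    String flushEvent =
        "{\"a\": \"flush\", \"b\": \"flush\", \"c\": \"flush\", \"timeStampData\": \"2023-02-13 12:00:00.000\"}";
    producer.send(new ProducerRecord<>("data.v1", flushEvent));
    producer.flush();
}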

Maybe my configuration is wrong? The only out-of-orderness-related thing I have
configured is the watermark, in the way I sent previously. The docs [1] mention
per-Kafka-partition watermarks; perhaps this would help me? Sadly, they do not say
how to activate it.
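For reference, the pattern from [1] boils down to something like this in the DataStream API (a sketch only; MyEvent, its deserializer and the timestamp getter are placeholders, and I am not sure yet how to express the same thing in the Table API):

import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

KafkaSource<MyEvent> source = KafkaSource.<MyEvent>builder()
        .setBootstrapServers("kafka:9092")
        .setTopics("data.v1")
        .setValueOnlyDeserializer(new MyEventDeserializationSchema())
        .build();

// Assigning the watermark strategy directly to the source lets Flink track the
// out-of-orderness per Kafka partition rather than per subtask.
DataStream<MyEvent> stream = env.fromSource(
        source,
        WatermarkStrategy.<MyEvent>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                .withTimestampAssigner((event, recordTimestamp) -> event.getTimeStampData()),
        "kafka-source");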

Best,
Theo

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#watermark-strategies-and-the-kafka-connector


> On 13. Feb 2023, at 10:42, Hector Rios  wrote:
> 
> Hi Theo
> 
> In your initial email, you mentioned that you have "a bit of Data on it" when 
> referring to your topic with ten partitions. Correct me if I'm wrong, but 
> that sounds like the data in your topic is bounded and you are trying to test
> a streaming use case. What kind of parallelism do you have configured for this 
> job? Is there a configuration to set the number of slots per task manager?
> 
> I've seen varying results based on the amount of parallelism configured on a 
> job. In the end, it usually boils down to the fact that events might be 
> ingested into Flink out of order. If the event time on an event is earlier 
> than the current watermark, then the event might be discarded unless you've 
> configured some level of out-of-orderness. Even with out-of-orderness 
> configured, if your data is bounded, you might have events with later event 
> times arriving earlier, which will remain in the state waiting for the 
> watermark to progress. As you can imagine, if there are no more events, then 
> your records are on hold. 
> 
> As a test, after all your events have been ingested from the topic, try
> producing one last event with an event time one or two hours later than your
> latest event and see if the missing results show up.
> 
> Hope it helps
> -Hector
> 
> On Mon, Feb 13, 2023 at 8:47 AM Theodor Wübker  <mailto:theo.wueb...@inside-m2m.de>> wrote:
> Hey,
> 
> so one more thing, the query looks like this:
> 
> SELECT window_start, window_end, a, b, c, count(*) as x FROM 
> TABLE(TUMBLE(TABLE data.v1, DESCRIPTOR(timeStampData), INTERVAL '1' HOUR)) 
> GROUP BY window_start, window_end, a, b, c
> 
> When the non-determinism occurs, the topic is not keyed at all. When I key it
> by the attribute “a”, I get incorrect but deterministic results. Maybe in the
> second case only one of the 10 partitions is consumed at a time?
> 
> Best,
> Theo
> 
>> On 13. Feb 2023, at 08:15, Theodor Wübker > <mailto:theo.wueb...@inside-m2m.de>> wrote
>> 
>> Hey Yuxia,
>> 
>> thanks for your response. I figured too that the events arrive in a
>> (somewhat) random order and thus cause non-determinism. I used a watermark
>> like this: "timeStampData - INTERVAL '10' SECOND". Increasing the watermark
>> interval does not solve the problem though; the results are still not
>> deterministic. Instead, I keyed the 10-partition topic. Now the results are
>> deterministic, but they are incorrect (way too few). Am I doing something
>> fundamentally wrong? I just need the messages to be somewhat in order (just
>> so they don't violate the watermark). 
>> 
>> Best,
>> Theo
>> 
>> (sent again, sorry, I previously only responded to you, not the Mailing list 
>> by accident)
>> 
>>> On 13. Feb 2023, at 08:14, Theodor Wübker >> <mailto:theo.wueb...@inside-m2m.de>> wrote:
>>> 
>>> Hey Yuxia,
>>> 
>>> thanks for your response. I figured too that the events arrive in a
>>> (somewhat) random order and thus cause non-determinism. I used a watermark
>>> like this: "timeStampData - INTERVAL '10' SECOND". Increasing the watermark
>>> interval does not solve the problem though; the results are still not
>>> deterministic. Instead, I keyed the 10-partition topic. Now the results are
>>> deterministic, but they are incorrect (way too few). Am I doing something
>>> fundamentally wrong? I just need the messages to be somewhat in order (just
>>> so they don't violate the watermark). 
>>> 
>>> Best,
>>> Theo

Re: Non-Determinism in Table-API with Kafka and Event Time

2023-02-12 Thread Theodor Wübker
Hey,

so one more thing, the query looks like this:

SELECT window_start, window_end, a, b, c, count(*) as x FROM TABLE(TUMBLE(TABLE 
data.v1, DESCRIPTOR(timeStampData), INTERVAL '1' HOUR)) GROUP BY window_start, 
window_end, a, b, c

When the non-determinism occurs, the topic is not keyed at all. When I key it by
the attribute “a”, I get incorrect but deterministic results. Maybe in the second
case only one of the 10 partitions is consumed at a time?

Best,
Theo

> On 13. Feb 2023, at 08:15, Theodor Wübker  wrote
> 
> Hey Yuxia,
> 
> thanks for your response. I figured too that the events arrive in a (somewhat)
> random order and thus cause non-determinism. I used a watermark like this:
> "timeStampData - INTERVAL '10' SECOND". Increasing the watermark interval does
> not solve the problem though; the results are still not deterministic. Instead,
> I keyed the 10-partition topic. Now the results are deterministic, but they are
> incorrect (way too few). Am I doing something fundamentally wrong? I just need
> the messages to be somewhat in order (just so they don't violate the
> watermark). 
> 
> Best,
> Theo
> 
> (sent again, sorry, I previously only responded to you, not the Mailing list 
> by accident)
> 
>> On 13. Feb 2023, at 08:14, Theodor Wübker > <mailto:theo.wueb...@inside-m2m.de>> wrote:
>> 
>> Hey Yuxia,
>> 
>> thanks for your response. I figured too that the events arrive in a
>> (somewhat) random order and thus cause non-determinism. I used a watermark
>> like this: "timeStampData - INTERVAL '10' SECOND". Increasing the watermark
>> interval does not solve the problem though; the results are still not
>> deterministic. Instead, I keyed the 10-partition topic. Now the results are
>> deterministic, but they are incorrect (way too few). Am I doing something
>> fundamentally wrong? I just need the messages to be somewhat in order (just
>> so they don't violate the watermark). 
>> 
>> Best,
>> Theo
>> 
>>> On 13. Feb 2023, at 04:23, yuxia >> <mailto:luoyu...@alumni.sjtu.edu.cn>> wrote:
>>> 
>>> Hi Theo,
>>> I'm wondering what the event-time-windowed query you are using looks like.
>>> For example, how do you define the watermark?
>>> Considering you read records from the 10 partitions, it may well be that the
>>> records arrive at the window operator out of order.
>>> Is it possible that the watermark has already advanced, but some records
>>> still arrive after it?
>>> 
>>> If that's the case, the set of records used to calculate the result may well
>>> differ between runs, which then leads to a non-deterministic result.
>>> 
>>> Best regards,
>>> Yuxia
>>> 
>>> - Original Message -
>>> From: "Theodor Wübker" <theo.wueb...@inside-m2m.de>
>>> To: "User" <user@flink.apache.org>
>>> Sent: Sunday, February 12, 2023, 4:25:45 PM
>>> Subject: Non-Determinism in Table-API with Kafka and Event Time
>>> 
>>> Hey everyone,
>>> 
>>> I am experiencing non-determinism in my Table API program at the moment and
>>> (as a relatively inexperienced Flink and Kafka user) I can't really explain
>>> to myself why it happens. I have a topic with 10 partitions and a bit of data
>>> on it. I run a simple SELECT * query on it that moves some attributes around
>>> and writes everything to another topic with 10 partitions. Then, on that
>>> topic, I run an event-time-windowed query. Now I experience non-determinism:
>>> the results of the windowed query differ with every execution.
>>> I thought this might be because the SELECT query wrote the data to the
>>> partitioned topic without keys. So I tried it again with the same key I used
>>> for the original topic, which resulted in the exact same topic structure. Now
>>> when I run the event-time-windowed query, I get incorrect results (too few
>>> result entries).
>>> 
>>> I have already read a lot of the docs on this and can't seem to figure it
>>> out. I would much appreciate it if someone could shed a bit of light on this.
>>> Is there anything in particular I should be aware of when reading partitioned
>>> topics and running an event-time query on them? Thanks :)
>>> 
>>> 
>>> Best,
>>> Theo
>> 
> 





Re: Non-Determinism in Table-API with Kafka and Event Time

2023-02-12 Thread Theodor Wübker
Hey Yuxia,

thanks for your response. I figured too that the events arrive in a (somewhat)
random order and thus cause non-determinism. I used a watermark like this:
"timeStampData - INTERVAL '10' SECOND". Increasing the watermark interval does
not solve the problem though; the results are still not deterministic. Instead, I
keyed the 10-partition topic. Now the results are deterministic, but they are
incorrect (way too few). Am I doing something fundamentally wrong? I just need
the messages to be somewhat in order (just so they don't violate the watermark).
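For completeness, the watermark is declared on the source table roughly like this (a sketch; the table name, the other columns and the connector options are placeholders):

tableEnv.executeSql(
    "CREATE TABLE data_v1 ("
        + "  a STRING,"
        + "  b STRING,"
        + "  c STRING,"
        + "  timeStampData TIMESTAMP(3),"
        + "  WATERMARK FOR timeStampData AS timeStampData - INTERVAL '10' SECOND"
        + ") WITH ("
        + "  'connector' = 'kafka',"
        + "  'topic' = 'data.v1',"
        + "  'properties.bootstrap.servers' = 'kafka:9092',"
        + "  'scan.startup.mode' = 'earliest-offset',"
        + "  'format' = 'json'"
        + ")");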

Best,
Theo

(Sent again; sorry, I previously responded only to you and not to the mailing
list by accident.)

> On 13. Feb 2023, at 08:14, Theodor Wübker  wrote:
> 
> Hey Yuxia,
> 
> thanks for your response. I figured too that the events arrive in a (somewhat)
> random order and thus cause non-determinism. I used a watermark like this:
> "timeStampData - INTERVAL '10' SECOND". Increasing the watermark interval does
> not solve the problem though; the results are still not deterministic. Instead,
> I keyed the 10-partition topic. Now the results are deterministic, but they are
> incorrect (way too few). Am I doing something fundamentally wrong? I just need
> the messages to be somewhat in order (just so they don't violate the
> watermark). 
> 
> Best,
> Theo
> 
>> On 13. Feb 2023, at 04:23, yuxia > <mailto:luoyu...@alumni.sjtu.edu.cn>> wrote:
>> 
>> Hi Theo,
>> I'm wondering what the event-time-windowed query you are using looks like.
>> For example, how do you define the watermark?
>> Considering you read records from the 10 partitions, it may well be that the
>> records arrive at the window operator out of order.
>> Is it possible that the watermark has already advanced, but some records
>> still arrive after it?
>> 
>> If that's the case, the set of records used to calculate the result may well
>> differ between runs, which then leads to a non-deterministic result.
>> 
>> Best regards,
>> Yuxia
>> 
>> - Original Message -
>> From: "Theodor Wübker" <theo.wueb...@inside-m2m.de>
>> To: "User" <user@flink.apache.org>
>> Sent: Sunday, February 12, 2023, 4:25:45 PM
>> Subject: Non-Determinism in Table-API with Kafka and Event Time
>> 
>> Hey everyone,
>> 
>> I am experiencing non-determinism in my Table API program at the moment and
>> (as a relatively inexperienced Flink and Kafka user) I can't really explain to
>> myself why it happens. I have a topic with 10 partitions and a bit of data on
>> it. I run a simple SELECT * query on it that moves some attributes around and
>> writes everything to another topic with 10 partitions. Then, on that topic, I
>> run an event-time-windowed query. Now I experience non-determinism: the
>> results of the windowed query differ with every execution.
>> I thought this might be because the SELECT query wrote the data to the
>> partitioned topic without keys. So I tried it again with the same key I used
>> for the original topic, which resulted in the exact same topic structure. Now
>> when I run the event-time-windowed query, I get incorrect results (too few
>> result entries).
>> 
>> I have already read a lot of the docs on this and can't seem to figure it out.
>> I would much appreciate it if someone could shed a bit of light on this. Is
>> there anything in particular I should be aware of when reading partitioned
>> topics and running an event-time query on them? Thanks :)
>> 
>> 
>> Best,
>> Theo
> 





Non-Determinism in Table-API with Kafka and Event Time

2023-02-12 Thread Theodor Wübker

Hey everyone,

I am experiencing non-determinism in my Table API program at the moment, and (as
a relatively inexperienced Flink and Kafka user) I can't really explain to myself
why it happens. I have a topic with 10 partitions and a bit of data on it. I run
a simple SELECT * query on it that moves some attributes around and writes
everything to another topic with 10 partitions. Then, on that topic, I run an
event-time-windowed query. Now I experience non-determinism: the results of the
windowed query differ with every execution.
I thought this might be because the SELECT query wrote the data to the
partitioned topic without keys. So I tried it again with the same key I used for
the original topic, which resulted in the exact same topic structure. Now when I
run the event-time-windowed query, I get incorrect results (too few result
entries).

I have already read a lot of the docs on this and can't seem to figure it out. I
would much appreciate it if someone could shed a bit of light on this. Is there
anything in particular I should be aware of when reading partitioned topics and
running an event-time query on them? Thanks :)


Best,
Theo



Standalone cluster memory configuration

2023-02-02 Thread Theodor Wübker
Hello everyone,

I have a standalone cluster running in a Docker Swarm with a very simple
docker-compose configuration [3]. When I run my job there with a parallelism
greater than one, I get an out-of-memory error. Nothing out of the ordinary, so I
wanted to increase the JVM heap. I did that by setting
'taskmanager.memory.task.heap.size' according to [1]. However, the TaskManager
would not start, throwing an exception saying that this configuration clashes
with the configured total process memory - even though I had not configured that
at all. Because of this, I could also not set the total Flink memory.
Now I wonder: why did the TM tell me that the total process memory is already
configured? Also, in [2] I read that the cluster should not even start when
neither the total Flink memory nor the total process memory is specified - which,
as you can see in my configuration [3], I have not done.

Maybe someone can enlighten me as to why it looks like I can't configure the
memory properly? Thanks :)

-Theo

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/memory/mem_setup_tm/#configure-heap-and-managed-memory

[2] https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/memory/mem_setup/#configure-total-memory

[3] The compose configuration: 

  jobmanager:
image: flink:1.16.0
command: jobmanager
environment:
  - TASK_MANAGER_NUMBER_OF_TASK_SLOTS=2
  - |
FLINK_PROPERTIES=
jobmanager.rpc.address: jobmanager


  taskmanager-01:
image: flink:1.16.0
depends_on:
  - jobmanager
command: taskmanager
environment:
  - |
FLINK_PROPERTIES=
jobmanager.rpc.address: jobmanager
taskmanager.numberOfTaskSlots: 2
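A possible explanation for the clash (an assumption, not verified against the image): the default flink-conf.yaml shipped inside flink:1.16.0 already contains a taskmanager.memory.process.size entry, so adding only taskmanager.memory.task.heap.size on top of it conflicts. If that is the case, overriding the process size (or the total Flink memory) explicitly through FLINK_PROPERTIES should work; a sketch with example values:

  taskmanager-01:
    image: flink:1.16.0
    depends_on:
      - jobmanager
    command: taskmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 2
        taskmanager.memory.process.size: 4096m
        # or alternatively: taskmanager.memory.flink.size: 3072m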



Cluster uploading and running a jar itself

2023-01-16 Thread Theodor Wübker
Hello,

I noticed that my Flink cluster (version 1.16) regularly uploads a JAR called
"check-execute.jar" by itself. Apparently it also tries to run it; at least that
is what I take from this JobManager log output, which appears numerous times:

2023-01-16 07:09:00,719 WARN  org.apache.flink.runtime.webmonitor.handlers.JarRunHandler [] - Configuring the job submission via query parameters is deprecated. Please migrate to submitting a JSON request instead.
2023-01-16 07:09:00,733 INFO  org.apache.flink.client.ClientUtils [] - Starting program (detached: true)
2023-01-16 07:09:01,362 ERROR org.apache.flink.runtime.webmonitor.handlers.JarRunHandler [] - Exception occurred in REST handler: No jobs included in application.

I wonder if this is expected behavior, perhaps the system asserting its own
functionality. It would be very kind if someone could enlighten me here; I tried
to find out what this is about in the Flink source code, but was not able to find
anything.

Kind regards

-Theo




Re: Windowing query with group by produces update stream

2022-12-20 Thread Theodor Wübker
I actually managed to fix this already :) For those wondering, I grouped by both
window_start and window_end. That did it!
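For reference, the working query looks roughly like this (a sketch; the column names are the same placeholders as in the quoted message below, and `timestamp` is back-ticked only because it is a reserved keyword):

Table result = tableEnv.sqlQuery(
    "SELECT x, window_start, window_end, count(*) AS y "
        + "FROM TABLE(TUMBLE(TABLE my_table, DESCRIPTOR(`timestamp`), INTERVAL '1' DAY)) "
        + "GROUP BY x, window_start, window_end");
// Grouping by both window_start and window_end lets the planner recognize the
// window aggregation and produce an append-only stream that a plain Kafka sink accepts.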

> On 19. Dec 2022, at 15:43, Theodor Wübker  wrote:
> 
> Hey everyone,
> 
> I would like to run a Windowing-SQL query with a group-by clause on a Kafka 
> topic and write the result back to Kafka. Right now, the program always says 
> that I am creating an update-stream that can only be written to an 
> Upsert-Kafka-Sink. That seems odd to me, because running my grouping over a 
> tumbling window should only require writing the result to kafka exactly once. 
> Quote from docs [1]: 'Unlike other aggregations on continuous tables, window 
> aggregation do not emit intermediate results but only a final result, the 
> total aggregation at the end of the window'
> I understand that ‘group-by’ should generate an update-stream as long as 
> there is no windowing happening - but there is in my case. How can I get my 
> program to not create an update-, but a simple append stream instead? My 
> query looks roughly like this:
> 
> "SELECT x, window_start, count(*) as y 
> FROM TABLE(TUMBLE(TABLE my_table, DESCRIPTOR(timestamp), INTERVAL '1' DAY))
> GROUP BY x, window_start”
> 
> -Theo
> 
> 
> [1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/window-agg/




Windowing query with group by produces update stream

2022-12-19 Thread Theodor Wübker
Hey everyone,

I would like to run a windowing SQL query with a GROUP BY clause on a Kafka topic
and write the result back to Kafka. Right now, the program always says that I am
creating an update stream that can only be written to an upsert-Kafka sink. That
seems odd to me, because running my grouping over a tumbling window should only
require writing the result to Kafka exactly once. Quote from the docs [1]:
'Unlike other aggregations on continuous tables, window aggregation do not emit
intermediate results but only a final result, the total aggregation at the end of
the window.'
I understand that GROUP BY should generate an update stream as long as there is
no windowing happening - but there is in my case. How can I get my program to
create a simple append stream instead of an update stream? My query looks
roughly like this:

"SELECT x, window_start, count(*) as y 
FROM TABLE(TUMBLE(TABLE my_table, DESCRIPTOR(timestamp), INTERVAL '1' DAY))
GROUP BY x, window_start”

-Theo


[1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/window-agg/



Re: Can't use nested attributes as watermarks in Table

2022-12-17 Thread Theodor Wübker
Hi Martijn, 

thanks for your reply :) I have thought about looking into contributing this too.
I am not super deep into the Flink source code yet, and it might be a little out
of scope for the bachelor's thesis I am doing, so I have to see if I can find the
time at the moment. But generally I would be interested (since it would obviously
help my thesis). Maybe you can tell how much effort it would be? I imagine it
would need support in the place where the watermarks are registered (the one I
sent) and in the place where they are actually used (which I have not checked yet
at all).

-Theo

> On 14. Dec 2022, at 09:23, Theodor Wübker  wrote:
> 
> Actually, this behaviour is documented 
> <https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/create/#create-table>
>  (See the Watermarks section, where it is stated that the column must be a 
> “top-level” column). So I suppose, there is a reason. Nevertheless it is 
> quite a limiting factor, since it makes me unable to use Window queries with 
> the desired timestamp as watermark … I suppose one workaround could be to 
> transform the table so the attribute is at top level and then use it as a 
> watermark. But in my case this would be quite the effort. 
> 
> My question remains, what is the reason for this behaviour? Also, are there 
> any good workarounds for this?
> 
> Thanks,
> 
> -Theo
> 
>> On 14. Dec 2022, at 08:13, Theodor Wübker > <mailto:theo.wueb...@inside-m2m.de>> wrote:
>> 
>> Hey everyone,
>> 
>> I have encountered a problem with my Table API program. I am trying to use a
>> nested attribute as a watermark. The structure of my schema is a row which
>> itself has 3 rows as attributes, and these again have some attributes, in
>> particular the timestamp that I want to use as a watermark. Sadly, Flink does
>> not let me reference it using the dot operator. I checked the source code and
>> tracked it down to this (part of a) method in the DefaultSchemaResolver:
>> 
>> private Column validateTimeColumn(String columnName, List<Column> columns) {
>>     final Optional<Column> timeColumn =
>>             columns.stream().filter(c -> c.getName().equals(columnName)).findFirst();
>>     if (!timeColumn.isPresent()) {
>>         throw new ValidationException(
>>                 String.format(
>>                         "Invalid column name '%s' for rowtime attribute in watermark declaration. Available columns are: %s",
>>                         columnName,
>>                         columns.stream().map(Column::getName).collect(Collectors.toList())));
>>     } ...
>> 
>> The list of available columns is just the 3 rows and none of the nested
>> attributes. Is there a reason why nested columns are unavailable for watermark
>> declaration? Or am I overlooking something / doing something wrong?
>> 
>> -Theo
> 





Re: Can't use nested attributes as watermarks in Table

2022-12-14 Thread Theodor Wübker
Actually, this behaviour is documented at
<https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/create/#create-table>
(see the Watermarks section, where it is stated that the column must be a
"top-level" column), so I suppose there is a reason. Nevertheless, it is quite a
limiting factor, since it prevents me from using window queries with the desired
timestamp as the watermark. I suppose one workaround could be to transform the
table so the attribute is at the top level and then use it as a watermark, but in
my case this would be quite the effort.

My question remains: what is the reason for this behaviour? Also, are there any
good workarounds for this?
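A sketch of that "hoist to top level" workaround with the Table API Schema builder (untested; "payload", "meta" and "ts" are placeholder names for my nested row structure):

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;

Schema schema = Schema.newBuilder()
        .column("payload", DataTypes.ROW(
                DataTypes.FIELD("meta", DataTypes.ROW(
                        DataTypes.FIELD("ts", DataTypes.TIMESTAMP(3))))))
        // Hoist the nested field into a top-level computed column ...
        .columnByExpression("eventTime", "payload.meta.ts")
        // ... so it can carry the watermark.
        .watermark("eventTime", "eventTime - INTERVAL '10' SECOND")
        .build();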

Thanks,

-Theo

> On 14. Dec 2022, at 08:13, Theodor Wübker  wrote:
> 
> Hey everyone,
> 
> I have encountered a problem with my Table API program. I am trying to use a
> nested attribute as a watermark. The structure of my schema is a row which
> itself has 3 rows as attributes, and these again have some attributes, in
> particular the timestamp that I want to use as a watermark. Sadly, Flink does
> not let me reference it using the dot operator. I checked the source code and
> tracked it down to this (part of a) method in the DefaultSchemaResolver:
> 
> private Column validateTimeColumn(String columnName, List<Column> columns) {
>     final Optional<Column> timeColumn =
>             columns.stream().filter(c -> c.getName().equals(columnName)).findFirst();
>     if (!timeColumn.isPresent()) {
>         throw new ValidationException(
>                 String.format(
>                         "Invalid column name '%s' for rowtime attribute in watermark declaration. Available columns are: %s",
>                         columnName,
>                         columns.stream().map(Column::getName).collect(Collectors.toList())));
>     } ...
> 
> The list of available columns is just the 3 rows and none of the nested
> attributes. Is there a reason why nested columns are unavailable for watermark
> declaration? Or am I overlooking something / doing something wrong?
> 
> -Theo





Can't use nested attributes as watermarks in Table

2022-12-13 Thread Theodor Wübker
Hey everyone,

I have encountered a problem with my Table API program. I am trying to use a
nested attribute as a watermark. The structure of my schema is a row which itself
has 3 rows as attributes, and these again have some attributes, in particular the
timestamp that I want to use as a watermark. Sadly, Flink does not let me
reference it using the dot operator. I checked the source code and tracked it
down to this (part of a) method in the DefaultSchemaResolver:

private Column validateTimeColumn(String columnName, List<Column> columns) {
    final Optional<Column> timeColumn =
            columns.stream().filter(c -> c.getName().equals(columnName)).findFirst();
    if (!timeColumn.isPresent()) {
        throw new ValidationException(
                String.format(
                        "Invalid column name '%s' for rowtime attribute in watermark declaration. Available columns are: %s",
                        columnName,
                        columns.stream().map(Column::getName).collect(Collectors.toList())));
    } ...

The list of available columns is just the 3 rows and none of the nested
attributes. Is there a reason why nested columns are unavailable for watermark
declaration? Or am I overlooking something / doing something wrong?

-Theo



Re: Table API: Custom Kafka formats for Confluent Protobuf and JSON

2022-11-29 Thread Theodor Wübker
Hey Yaroslav,

I suppose I will try it like this. The schema-registry lookup would indeed be
nice too; I will have a closer look at the corresponding source code. Thanks!
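For the archives, roughly what I have in mind - a sketch only, not production code: it blindly skips the 5-byte Confluent wire-format header (magic byte plus 4-byte schema id) and delegates to an existing format's DeserializationSchema, without any schema-registry lookup:

import java.io.IOException;
import java.util.Arrays;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.data.RowData;

public class ConfluentHeaderSkippingDeserializationSchema implements DeserializationSchema<RowData> {

    private final DeserializationSchema<RowData> inner;

    public ConfluentHeaderSkippingDeserializationSchema(DeserializationSchema<RowData> inner) {
        this.inner = inner;
    }

    @Override
    public void open(InitializationContext context) throws Exception {
        inner.open(context);
    }

    @Override
    public RowData deserialize(byte[] message) throws IOException {
        if (message == null || message.length < 5) {
            return null; // nothing useful behind the header
        }
        // Drop the magic byte and the 4-byte schema id; the id is simply ignored here.
        byte[] payload = Arrays.copyOfRange(message, 5, message.length);
        return inner.deserialize(payload);
    }

    @Override
    public boolean isEndOfStream(RowData nextElement) {
        return false;
    }

    @Override
    public TypeInformation<RowData> getProducedType() {
        return inner.getProducedType();
    }
}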

-Theo
> On 29. Nov 2022, at 17:41, Yaroslav Tkachenko  wrote:
> 
> Hey Theodor,
> 
> That's pretty much it, assuming your Protobuf schema is more or less fixed. 
> But for a production workload, you'd need to add a Schema Registry lookup. I 
> guess the implementation for that would be similar to what's in the Avro 
> format.
> 
> On Tue, Nov 29, 2022 at 2:26 AM Theodor Wübker  <mailto:theo.wueb...@inside-m2m.de>> wrote:
> Hey all,
> 
> so Confluent has Kafka serializers to serialize Protobuf, Avro and JSON that 
> create messages with a magic byte followed by a 4 byte schema id followed by 
> the actual payload (refer the docs 
> <https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#wire-format>).
>  When I try to read such messages with the regular Protobuf, Avro and JSON 
> formats in my Table API Program, it of course does not work. For Avro, Flink 
> also has a Confluent-Avro format that can deal with this. However for 
> Protobuf and JSON, there is nothing like this yet. I saw a ticket in the JIRA 
> <https://issues.apache.org/jira/browse/FLINK-29731>,
>  but I cannot wait for this. Hence I wonder, how much effort it would be, to 
> implement this myself - not in a production-ready way, but just in a way that 
> makes my program not break. Meaning I would be happy with a solution that 
> just ignores the first 5 bytes and passes the rest on to the existing 
> handlers of Protobuf and JSON formats.
> 
> 
> Now lets take for Example the existing Protobuf Format: I assume I have to 
> implement the DeserializationFormatFactory, create a few Decoding and 
> Encoding Formats, just like the PbDecodingFormat for example, then a new 
> DeserializationSchema and there I would have such a method 
> <https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/formats/protobuf/deserialize/PbRowDataDeserializationSchema.html>:
> 
> @Override
> public RowData deserialize(byte[] message) throws IOException {
> try {
> return protoToRowConverter.convertProtoBinaryToRow(message);
> } catch (Throwable t) {
> if (formatConfig.isIgnoreParseErrors()) {
> return null;
> }
> throw new IOException("Failed to deserialize PB object.", t);
> }
> }
> But instead of converting the message immediately, I would slice the first 
> few Bytes off and go from there. Is this pretty much it, or is there more to 
> it?
> 
> -Theo
> 





Table API: Custom Kafka formats for Confluent Protobuf and JSON

2022-11-29 Thread Theodor Wübker
Hey all,

so Confluent has Kafka serializers for Protobuf, Avro and JSON that create
messages with a magic byte, followed by a 4-byte schema id, followed by the
actual payload (see the docs:
<https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#wire-format>).
When I try to read such messages with the regular Protobuf, Avro and JSON formats
in my Table API program, it of course does not work. For Avro, Flink also has a
Confluent-Avro format that can deal with this. However, for Protobuf and JSON
there is nothing like this yet. I saw a ticket in the JIRA
(<https://issues.apache.org/jira/browse/FLINK-29731>), but I cannot wait for
this. Hence I wonder how much effort it would be to implement this myself - not
in a production-ready way, but just in a way that keeps my program from breaking.
Meaning I would be happy with a solution that just ignores the first 5 bytes and
passes the rest on to the existing handlers of the Protobuf and JSON formats.


Now let's take, for example, the existing Protobuf format: I assume I have to
implement the DeserializationFormatFactory, create a few decoding and encoding
formats (just like PbDecodingFormat, for example), and then a new
DeserializationSchema, where I would have such a method
(<https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/formats/protobuf/deserialize/PbRowDataDeserializationSchema.html>):

@Override
public RowData deserialize(byte[] message) throws IOException {
try {
return protoToRowConverter.convertProtoBinaryToRow(message);
} catch (Throwable t) {
if (formatConfig.isIgnoreParseErrors()) {
return null;
}
throw new IOException("Failed to deserialize PB object.", t);
}
}
But instead of converting the message immediately, I would slice the first few
bytes off and go from there. Is this pretty much it, or is there more to it?

-Theo





Re: Converting ResolvedSchema to JSON and Protobuf Schemas

2022-11-15 Thread Theodor Wübker
Yes, you are right, schemas are not so nice in JSON. When implementing and
testing my converter from DataType to JSON Schema, I noticed that your converter
from JSON Schema to DataType always converts number to double. I wonder, did you
choose that mapping yourselves? The table that specifies the mapping
<https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/json/>
only covers the DataType -> JSON Schema direction.

It is generally unfortunate that JSON Schema offers so little possibility to
specify type information. When I have a Flink DataType with all kinds of fields
like double, float and big decimal, they all get mapped to number by my
converter; in return, when I use yours, they are all mapped back to the Flink
DataType double. So I lose a lot of precision.

I guess for my application it would in general be better to use Avro or Protobuf,
since they retain a lot more type information when you convert them back and
forth.
Also, thanks for showing me your pattern with the SchemaConversions and related
classes. It feels pretty clean and worked like a charm :)
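For anyone finding this later, the core of my converter is a small recursion over the LogicalType. A simplified, opinionated sketch (Jackson-based, many types omitted, not the exact code I use):

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;

public class DataTypeToJsonSchema {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    public static ObjectNode convert(LogicalType type) {
        ObjectNode node = MAPPER.createObjectNode();
        switch (type.getTypeRoot()) {
            case BOOLEAN:
                node.put("type", "boolean");
                break;
            case TINYINT:
            case SMALLINT:
            case INTEGER:
            case BIGINT:
                node.put("type", "integer");
                break;
            case FLOAT:
            case DOUBLE:
            case DECIMAL:
                node.put("type", "number"); // this is where the precision is lost
                break;
            case CHAR:
            case VARCHAR:
                node.put("type", "string");
                break;
            case TIMESTAMP_WITHOUT_TIME_ZONE:
                node.put("type", "string");
                node.put("format", "date-time");
                break;
            case ROW: {
                node.put("type", "object");
                ObjectNode properties = node.putObject("properties");
                for (RowType.RowField field : ((RowType) type).getFields()) {
                    properties.set(field.getName(), convert(field.getType()));
                }
                break;
            }
            case ARRAY:
                node.put("type", "array");
                node.set("items", convert(((ArrayType) type).getElementType()));
                break;
            default:
                throw new UnsupportedOperationException("Unsupported type: " + type);
        }
        return node;
    }
}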

-Theo


> On 10. Nov 2022, at 15:02, Andrew Otto  wrote:
> 
> >  I find it interesting that the Mapping from DataType to AvroSchema does 
> > exist in Flink (see AvroSchemaConverter), but for all the other formats 
> > there is no such Mapping, 
> Yah, but I guess for JSON, there isn't a clear 'schema' to be had.  There of 
> course is JSONSchema, but it isn't a real java-y type system; it's just more 
> JSON for which there exist validators.  
> 
> 
> 
> On Thu, Nov 10, 2022 at 2:12 AM Theodor Wübker  <mailto:theo.wueb...@inside-m2m.de>> wrote:
> Great, I will have a closer look at what you sent. Your idea seems very good, 
> it would be a very clean solution to be able to plug in different 
> SchemaConversions that a (Row) DataType can be mapped to. I will probably try 
> to implement it like this. I find it interesting that the Mapping from 
> DataType to AvroSchema does exist in Flink (see AvroSchemaConverter), but for 
> all the other formats there is no such Mapping. Maybe this would be something 
> that would interest more people, so I when I am finished perhaps I can 
> suggest putting the solution into the flink-json and flink-protobuf packages.
> 
> -Theo
> 
>> On 9. Nov 2022, at 21:24, Andrew Otto > <mailto:o...@wikimedia.org>> wrote:
>> 
>> Interesting, yeah I think you'll have to implement code to recurse through 
>> the (Row) DataType and somehow auto generate the JSONSchema you want.  
>> 
>> We abstracted the conversions from JSONSchema to other type systems in this 
>> JsonSchemaConverter 
>> <https://gerrit.wikimedia.org/r/plugins/gitiles/wikimedia-event-utilities/+/refs/heads/master/eventutilities/src/main/java/org/wikimedia/eventutilities/core/event/types/JsonSchemaConverter.java>.
>>   There's nothing special going on here, I've seen versions of this schema 
>> conversion code over and over again in different frameworks. This one just 
>> allows us to plug in a SchemaConversions 
>> <https://gerrit.wikimedia.org/r/plugins/gitiles/wikimedia-event-utilities/+/refs/heads/master/eventutilities/src/main/java/org/wikimedia/eventutilities/core/event/types/SchemaConversions.java>
>>  implementation to provide the mappings to the output type system (like the 
>> Flink DataType mappings 
>> <https://gerrit.wikimedia.org/r/plugins/gitiles/wikimedia-event-utilities/+/refs/heads/master/eventutilities-flink/src/main/java/org/wikimedia/eventutilities/flink/formats/json/DataTypeSchemaConversions.java>
>>  I linked to before), rather than hardcoding the output types.
>> 
>> If I were trying to do what you are doing (in our codebase)...I'd create a 
>> Flink DataTypeConverter that iterated through a (Row) DataType and a 
>> SchemaConversions implementation that mapped to the JsonNode that 
>> represented the JSONSchema.  (If not using Jackson...then you could use 
>> another Java JSON object than JsonNode).
>> You could also make a SchemaConversions (with whatever 
>> Protobuf class to use...I'm not familiar with Protobuf) and then use the 
>> same DataTypeConverter to convert to ProtobufSchema.   AND THEN...I'd wonder 
>> if the input schema recursion code itself could be abstracted too so that it 
>> would work for either JsonSchema OR DataType OR whatever but anyway that is 
>> probably too crazy and too much for what you are doing...but it would be 
>> cool! :p
>> 
>> 
>> 
>> 
>> 
>> On Wed, Nov 9, 2022 at 9:52 AM Theodor Wübker > <mailto:theo.wueb...@inside-m2m.de>> wrote:
>> I want to register the result-schema in a schema registry, as I am pushing 
>> the result-data to a Kafka topic.

Class loading in PbFormatUtils.getDescriptor

2022-11-14 Thread Theodor Wübker
Hey everyone,

there is some use of reflection in PbFormatUtils.getDescriptor: namely, it uses
the current thread's context ClassLoader to load a Protobuf-generated class.
First of all, this appears to be a bad practice in general (see the first answer
in this Stack Overflow question). Secondly, it comes with a drawback for me: if I
generate the Java classes from Protobuf at runtime, they are not on the classpath
and therefore not visible to the thread's ClassLoader by default. This can be
overcome with a hack like the following, which sets the context ClassLoader of
the current thread to a URLClassLoader (which can be created at runtime and is
then able to find the generated classes):

URLClassLoader newLoader = URLClassLoader.newInstance(new URL[] {urlToProtoClasses}, this.getClass().getClassLoader());
Thread.currentThread().setContextClassLoader(newLoader);

However, this is not a great practice either. It will probably not even work once
multiple threads invoke PbFormatUtils.getDescriptor. Maybe there should be a way
to set the ClassLoader for PbFormatUtils?
If anyone knows a different way to make the generated classes available on the
classpath, please let me know!

-Theo



Re: Converting ResolvedSchema to JSON and Protobuf Schemas

2022-11-09 Thread Theodor Wübker
I want to register the result-schema in a schema registry, as I am pushing the 
result-data to a Kafka topic. The result-schema is not known at compile-time, 
so I need to find a way to compute it at runtime from the resulting Flink 
Schema.
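For context, the registration itself then looks roughly like this with Confluent's schema-registry client (a sketch; it assumes a client version where register() accepts a ParsedSchema, and the registry URL and subject name are placeholders):

import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
import org.apache.flink.table.catalog.ResolvedSchema;

// The ResolvedSchema of the result table, as described above.
ResolvedSchema resultSchema = resultTable.getResolvedSchema();
org.apache.avro.Schema avroSchema =
        AvroSchemaConverter.convertToSchema(resultSchema.toSinkRowDataType().getLogicalType());

SchemaRegistryClient client = new CachedSchemaRegistryClient("http://schema-registry:8081", 100);
// Default TopicNameStrategy subject: "<topic>-value".
int schemaId = client.register("result-topic-value", new AvroSchema(avroSchema));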

-Theo

(resent - again sorry, I forgot to add the others in the cc)

> On 9. Nov 2022, at 14:59, Andrew Otto  wrote:
> 
> >  I want to convert the schema of a Flink table to both Protobuf schema and 
> > JSON schema
> Oh, you want to convert from Flink Schema TO JSONSchema?  Interesting.  That 
> would indeed be something that is not usually done.  Just curious, why do you 
> want to do this?
> 
> On Wed, Nov 9, 2022 at 8:46 AM Andrew Otto  <mailto:o...@wikimedia.org>> wrote:
> Hello! 
> 
> I see you are talking about JSONSchema, not just JSON itself.
> 
> We're trying to do a similar thing at Wikimedia and have developed some 
> tooling around this.  
> 
> JsonSchemaFlinkConverter 
> <https://gerrit.wikimedia.org/r/plugins/gitiles/wikimedia-event-utilities/+/refs/heads/master/eventutilities-flink/src/main/java/org/wikimedia/eventutilities/flink/formats/json/JsonSchemaFlinkConverter.java>
>  has some logic to convert from JSONSchema Jackson ObjectNodes to Flink Table 
> DataType or Table SchemaBuilder, or Flink DataStream TypeInformation[Row].  
> Some of the conversions from JSONSchema to Flink type are opinionated.  You 
> can see the mappings here 
> <https://gerrit.wikimedia.org/r/plugins/gitiles/wikimedia-event-utilities/+/refs/heads/master/eventutilities-flink/src/main/java/org/wikimedia/eventutilities/flink/formats/json/DataTypeSchemaConversions.java>.
> 
> 
> 
> 
> 
> 
> 
> On Wed, Nov 9, 2022 at 2:33 AM Theodor Wübker  <mailto:theo.wueb...@inside-m2m.de>> wrote:
> Thanks for your reply Yaroslav! The way I do it with Avro seems similar to 
> what you pointed out:
> ResolvedSchema resultSchema = resultTable.getResolvedSchema();
> DataType type = resultSchema.toSinkRowDataType();
> org.apache.avro.Schema converted = 
> AvroSchemaConverter.convertToSchema(type.getLogicalType());
> I mentioned the ResolvedSchema because it is my starting point after the SQL 
> operation. It seemed to me that I can not retrieve something that contains 
> more schema information from the table so I got myself this. About your other 
> answers: It seems the classes you mentioned can be used to serialize actual 
> Data? However this is not quite what I want to do.
> Essentially I want to convert the schema of a Flink table to both Protobuf 
> schema and JSON schema (for Avro as you can see I have it already). It seems 
> odd that this is not easily possible, because converting from a JSON schema 
> to a Schema of Flink is possible using the JsonRowSchemaConverter. However 
> the other way is not implemented it seems. This is how I got a Table Schema 
> (that I can use in a table descriptor) from a JSON schema:
> 
> TypeInformation<Row> type = JsonRowSchemaConverter.convert(json);
> DataType row = TableSchema.fromTypeInfo(type).toPhysicalRowDataType();
> Schema schema = Schema.newBuilder().fromRowDataType(row).build();
> Sidenote: I use deprecated methods here, so if there is a better approach 
> please let me know! But it shows that in Flink its easily possible to create 
> a Schema for a TableDescriptor from a JSON Schema - the other way is just not 
> so trivial it seems. And for Protobuf so far I don’t have any solutions, not 
> even creating a Flink Schema from a Protobuf Schema - not to mention the 
> other way around.
> 
> -Theo
> 
> (resent because I accidentally only responded to you, not the Mailing list - 
> sorry)
> 





Re: Converting ResolvedSchema to JSON and Protobuf Schemas

2022-11-09 Thread Theodor Wübker
Hey,

thank you for your reply. Your converter looks very interesting. However, Flink
already comes with the JsonRowSchemaConverter, which converts a JSON Schema
string to a TypeInformation. From there you can convert the TypeInformation to,
say, a DataType (although I must admit I only got this done using deprecated
methods in Flink). I am struggling to get the reverse direction done: converting
from a Flink ResolvedSchema (or LogicalType, or DataType) to a JSON Schema. Is
that something you want to implement in your converter as well?

Your project is encouraging me, though; maybe I will try to implement DataType to
JSON Schema and Protobuf schema to DataType (and the reverse) myself, given that
I have not found anything that does the trick.

-Theo

> On 9. Nov 2022, at 14:46, Andrew Otto  wrote:
> 
> Hello! 
> 
> I see you are talking about JSONSchema, not just JSON itself.
> 
> We're trying to do a similar thing at Wikimedia and have developed some 
> tooling around this.  
> 
> JsonSchemaFlinkConverter 
> <https://gerrit.wikimedia.org/r/plugins/gitiles/wikimedia-event-utilities/+/refs/heads/master/eventutilities-flink/src/main/java/org/wikimedia/eventutilities/flink/formats/json/JsonSchemaFlinkConverter.java>
>  has some logic to convert from JSONSchema Jackson ObjectNodes to Flink Table 
> DataType or Table SchemaBuilder, or Flink DataStream TypeInformation[Row].  
> Some of the conversions from JSONSchema to Flink type are opinionated.  You 
> can see the mappings here 
> <https://gerrit.wikimedia.org/r/plugins/gitiles/wikimedia-event-utilities/+/refs/heads/master/eventutilities-flink/src/main/java/org/wikimedia/eventutilities/flink/formats/json/DataTypeSchemaConversions.java>.
> 
> 
> 
> 
> 
> 
> 
> On Wed, Nov 9, 2022 at 2:33 AM Theodor Wübker  <mailto:theo.wueb...@inside-m2m.de>> wrote:
> Thanks for your reply Yaroslav! The way I do it with Avro seems similar to 
> what you pointed out:
> ResolvedSchema resultSchema = resultTable.getResolvedSchema();
> DataType type = resultSchema.toSinkRowDataType();
> org.apache.avro.Schema converted = 
> AvroSchemaConverter.convertToSchema(type.getLogicalType());
> I mentioned the ResolvedSchema because it is my starting point after the SQL 
> operation. It seemed to me that I can not retrieve something that contains 
> more schema information from the table so I got myself this. About your other 
> answers: It seems the classes you mentioned can be used to serialize actual 
> Data? However this is not quite what I want to do.
> Essentially I want to convert the schema of a Flink table to both Protobuf 
> schema and JSON schema (for Avro as you can see I have it already). It seems 
> odd that this is not easily possible, because converting from a JSON schema 
> to a Schema of Flink is possible using the JsonRowSchemaConverter. However 
> the other way is not implemented it seems. This is how I got a Table Schema 
> (that I can use in a table descriptor) from a JSON schema:
> 
> TypeInformation<Row> type = JsonRowSchemaConverter.convert(json);
> DataType row = TableSchema.fromTypeInfo(type).toPhysicalRowDataType();
> Schema schema = Schema.newBuilder().fromRowDataType(row).build();
> Sidenote: I use deprecated methods here, so if there is a better approach 
> please let me know! But it shows that in Flink its easily possible to create 
> a Schema for a TableDescriptor from a JSON Schema - the other way is just not 
> so trivial it seems. And for Protobuf so far I don’t have any solutions, not 
> even creating a Flink Schema from a Protobuf Schema - not to mention the 
> other way around.
> 
> -Theo
> 
> (resent because I accidentally only responded to you, not the Mailing list - 
> sorry)
> 





Re: Converting ResolvedSchema to JSON and Protobuf Schemas

2022-11-08 Thread Theodor Wübker
Thanks for your reply Yaroslav! The way I do it with Avro seems similar to what 
you pointed out:
ResolvedSchema resultSchema = resultTable.getResolvedSchema();
DataType type = resultSchema.toSinkRowDataType();
org.apache.avro.Schema converted = 
AvroSchemaConverter.convertToSchema(type.getLogicalType());
I mentioned the ResolvedSchema because it is my starting point after the SQL
operation; it seemed to me that I cannot retrieve anything with more schema
information from the table, so I went with this. About your other answers: it
seems the classes you mentioned can be used to serialize actual data? That is not
quite what I want to do.
Essentially, I want to convert the schema of a Flink table to both a Protobuf
schema and a JSON schema (for Avro, as you can see, I already have it). It seems
odd that this is not easily possible, because converting from a JSON schema to a
Flink Schema is possible using the JsonRowSchemaConverter; the other way does not
seem to be implemented. This is how I got a table Schema (that I can use in a
TableDescriptor) from a JSON schema:

TypeInformation<Row> type = JsonRowSchemaConverter.convert(json);
DataType row = TableSchema.fromTypeInfo(type).toPhysicalRowDataType();
Schema schema = Schema.newBuilder().fromRowDataType(row).build();

Side note: I use deprecated methods here, so if there is a better approach,
please let me know! But it shows that in Flink it is easily possible to create a
Schema for a TableDescriptor from a JSON Schema; the other way is just not so
trivial, it seems. And for Protobuf I have no solution so far, not even for
creating a Flink Schema from a Protobuf schema, not to mention the other way
around.

-Theo

(resent because I accidentally only responded to you, not the Mailing list - 
sorry)





Converting ResolvedSchema to JSON and Protobuf Schemas

2022-11-08 Thread Theodor Wübker

Hello,

I have a streaming use case where I execute a query on a Table. I take the
ResolvedSchema of the table and convert it to an Avro schema using the
AvroSchemaConverter. Now I want to do the same for JSON and Protobuf. However, it
seems there is nothing similar to AvroSchemaConverter, so I wonder if I now have
to code the mapping from Flink's DataType to JSON and Protobuf schemas myself, or
if I missed something. I would be glad if someone could point me in the right
direction here.

Yours sincerely,
Theo
