[jira] [Commented] (FLINK-34436) Avro schema evolution and compatibility issues in Pulsar connector

2024-02-29 Thread Jacek Wislicki (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34436?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17822065#comment-17822065
 ] 

Jacek Wislicki commented on FLINK-34436:


Please let me know if you have any thoughts on this. Of course, a good 
explanation of why my understanding of the behaviour is wrong would also be 
valuable.

> Avro schema evolution and compatibility issues in Pulsar connector
> --
>
> Key: FLINK-34436
> URL: https://issues.apache.org/jira/browse/FLINK-34436
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Pulsar
>Affects Versions: 1.17.2
>Reporter: Jacek Wislicki
>Priority: Major
>
> We noticed a couple of critical issues in the Pulsar-Flink connector related 
> to schema evolution and compatibility. Please see the MRE available at 
> https://github.com/JacekWislicki/test11. More details are in the project's 
> README file; here is the summary:
> Library versions:
> * Pulsar 3.0.1
> * Flink 1.17.2
> * Pulsar-Flink connector 4.1.0-1.17
> Problems:
> * An exception is thrown when schema fields are added or removed
> * Avro's enum default value is ignored; instead, the last known value is 
> applied
> I believe I observed the same behaviour in Pulsar itself, but for now we are 
> focusing on the connector, hence the problems documented here were observed 
> when using it.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34436) Avro schema evolution and compatibility issues in Pulsar connector

2024-02-19 Thread Jacek Wislicki (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34436?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17818371#comment-17818371
 ] 

Jacek Wislicki commented on FLINK-34436:


A note: the MRE uses ALWAYS_COMPATIBLE, although I also tested it with 
BACKWARD, BACKWARD_TRANSITIVE, FORWARD, FORWARD_TRANSITIVE, FULL and 
FULL_TRANSITIVE, with the same result.
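
For clarity, these are the namespace-level Pulsar schema compatibility strategies; a minimal sketch of how such a strategy can be set with the Pulsar admin client (the service URL and namespace below are placeholders, not taken from the MRE):
{code:java}
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;

public class SetCompatibilityStrategy {
    public static void main(String[] args) throws Exception {
        // Placeholder admin service URL and namespace
        try (PulsarAdmin admin = PulsarAdmin.builder()
                .serviceHttpUrl("http://localhost:8080")
                .build()) {
            // Apply the strategy to every topic in the namespace
            admin.namespaces().setSchemaCompatibilityStrategy(
                    "public/default", SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
        }
    }
}
{code}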

> Avro schema evolution and compatibility issues in Pulsar connector
> --
>
> Key: FLINK-34436
> URL: https://issues.apache.org/jira/browse/FLINK-34436
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Pulsar
>Affects Versions: 1.17.2
>Reporter: Jacek Wislicki
>Priority: Major
>
> We noticed a couple of critical issues in the Pulsar-Flink connector related 
> to schema evolution and compatibility. Please see the MRE available at 
> https://github.com/JacekWislicki/test11. More details are in the project's 
> README file; here is the summary:
> Library versions:
> * Pulsar 3.0.1
> * Flink 1.17.2
> * Pulsar-Flink connector 4.1.0-1.17
> Problems:
> * An exception is thrown when schema fields are added or removed
> * Avro's enum default value is ignored; instead, the last known value is 
> applied
> I believe I observed the same behaviour in Pulsar itself, but for now we are 
> focusing on the connector, hence the problems documented here were observed 
> when using it.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34436) Avro schema evolution and compatibility issues in Pulsar connector

2024-02-15 Thread Jacek Wislicki (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34436?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17817629#comment-17817629
 ] 

Jacek Wislicki commented on FLINK-34436:


The problem seems to be in 
org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarSchemaWrapper.deserialize(Message<byte[]>, Collector<T>):
{code:java}
@Override
public void deserialize(Message<byte[]> message, Collector<T> out) throws Exception {
    Schema<T> schema = this.pulsarSchema.getPulsarSchema();
    byte[] bytes = message.getData();
    T instance = schema.decode(bytes);

    out.collect(instance);
}
{code}
{{this.pulsarSchema.getPulsarSchema()}} returns the expected (reader) schema, not 
the schema with which the message was actually encoded. For proper conversion, 
both the source (encoding) and target (decoding) schemas are needed.
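
To illustrate that last point, below is a minimal sketch of how Avro resolves a writer/reader schema pair when both are supplied to the decoder; the {{writerSchema}} would have to be looked up separately (e.g. by the schema version attached to the Pulsar message), which the code above does not do:
{code:java}
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;

public class SchemaResolvingDecode {
    // writerSchema: the schema the producer encoded with (hypothetically fetched
    // by the message's schema version); readerSchema: the schema the consumer expects.
    static GenericRecord decode(byte[] bytes, Schema writerSchema, Schema readerSchema)
            throws IOException {
        // Avro applies schema resolution (added/removed fields, enum defaults)
        // only when both schemas are passed to the datum reader.
        GenericDatumReader<GenericRecord> reader =
                new GenericDatumReader<>(writerSchema, readerSchema);
        BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(bytes, null);
        return reader.read(null, decoder);
    }
}
{code}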

> Avro schema evolution and compatibility issues in Pulsar connector
> --
>
> Key: FLINK-34436
> URL: https://issues.apache.org/jira/browse/FLINK-34436
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Pulsar
>Affects Versions: 1.17.2
>Reporter: Jacek Wislicki
>Priority: Major
>
> We noticed a couple of critical issues in the Pulsar-Flink connector related 
> to schema evolution and compatibility. Please see the MRE available at 
> https://github.com/JacekWislicki/test11. More details are in the project's 
> README file; here is the summary:
> Library versions:
> * Pulsar 3.0.1
> * Flink 1.17.2
> * Pulsar-Flink connector 4.1.0-1.17
> Problems:
> * An exception is thrown when schema fields are added or removed
> * Avro's enum default value is ignored; instead, the last known value is 
> applied
> I believe I observed the same behaviour in Pulsar itself, but for now we are 
> focusing on the connector, hence the problems documented here were observed 
> when using it.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-34436) Avro schema evolution and compatibility issues in Pulsar connector

2024-02-14 Thread Jacek Wislicki (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34436?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17817467#comment-17817467
 ] 

Jacek Wislicki commented on FLINK-34436:


I also added a simpler example (without the enumerations that seem to be the 
key problem): https://github.com/JacekWislicki/test12

> Avro schema evolution and compatibility issues in Pulsar connector
> --
>
> Key: FLINK-34436
> URL: https://issues.apache.org/jira/browse/FLINK-34436
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Pulsar
>Affects Versions: 1.17.2
>Reporter: Jacek Wislicki
>Priority: Major
>
> We noticed a couple of critical issues in the Pulsar-Flink connector related 
> to schema evolution and compatibility. Please see the MRE available at 
> https://github.com/JacekWislicki/test11. More details are in the project's 
> README file; here is the summary:
> Library versions:
> * Pulsar 3.0.1
> * Flink 1.17.2
> * Pulsar-Flink connector 4.1.0-1.17
> Problems:
> * An exception is thrown when schema fields are added or removed
> * Avro's enum default value is ignored; instead, the last known value is 
> applied
> I believe I observed the same behaviour in Pulsar itself, but for now we are 
> focusing on the connector, hence the problems documented here were observed 
> when using it.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-34436) Avro schema evolution and compatibility issues in Pulsar connector

2024-02-13 Thread Jacek Wislicki (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-34436?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jacek Wislicki updated FLINK-34436:
---
Summary: Avro schema evolution and compatibility issues in Pulsar connector 
 (was: Avro schema evolution and compatibility issues)

> Avro schema evolution and compatibility issues in Pulsar connector
> --
>
> Key: FLINK-34436
> URL: https://issues.apache.org/jira/browse/FLINK-34436
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Pulsar
>Affects Versions: 1.17.2
>Reporter: Jacek Wislicki
>Priority: Major
>
> We noticed a couple of critical issues in the Pulsar-Flink connector related 
> to schema evolution and compatibility. Please see the MRE available at 
> https://github.com/JacekWislicki/test11. More details are in the project's 
> README file; here is the summary:
> Library versions:
> * Pulsar 3.0.1
> * Flink 1.17.2
> * Pulsar-Flink connector 4.1.0-1.17
> Problems:
> * An exception is thrown when schema fields are added or removed
> * Avro's enum default value is ignored; instead, the last known value is 
> applied
> I believe I observed the same behaviour in Pulsar itself, but for now we are 
> focusing on the connector, hence the problems documented here were observed 
> when using it.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-34436) Avro schema evolution and compatibility issues

2024-02-13 Thread Jacek Wislicki (Jira)
Jacek Wislicki created FLINK-34436:
--

 Summary: Avro schema evolution and compatibility issues
 Key: FLINK-34436
 URL: https://issues.apache.org/jira/browse/FLINK-34436
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Pulsar
Affects Versions: 1.17.2
Reporter: Jacek Wislicki


We noticed a couple of critical issues in the Pulsar-Flink connector related to 
schema evolution and compatibility. Please see the MRE available at 
https://github.com/JacekWislicki/test11. More details are in the project's 
README file; here is the summary:

Library versions:
* Pulsar 3.0.1
* Flink 1.17.2
* Pulsar-Flink connector 4.1.0-1.17

Problems:
* An exception is thrown when schema fields are added or removed
* Avro's enum default value is ignored; instead, the last known value is applied

I believe I observed the same behaviour in Pulsar itself, but for now we are 
focusing on the connector, hence the problems documented here were observed 
when using it.
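
To make the second problem concrete, below is a sketch of an Avro enum with a declared default symbol, built programmatically; the {{Status}} type and its symbols are hypothetical, not taken from the MRE. Per the Avro specification, a reader that receives a symbol unknown to its schema should fall back to this declared default rather than to another symbol:
{code:java}
import java.util.Arrays;

import org.apache.avro.Schema;

public class EnumDefaultExample {
    public static void main(String[] args) {
        // name, doc, namespace (placeholders), symbols, and the enum default symbol
        Schema statusEnum = Schema.createEnum(
                "Status", null, "com.example",
                Arrays.asList("UNKNOWN", "ACTIVE", "DELETED"),
                "UNKNOWN");
        // Prints the schema JSON including "default": "UNKNOWN"
        System.out.println(statusEnum.toString(true));
    }
}
{code}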



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-30681) Pulsar-Flink connector corrupts its output topic

2023-01-13 Thread Jacek Wislicki (Jira)
Jacek Wislicki created FLINK-30681:
--

 Summary: Pulsar-Flink connector corrupts its output topic
 Key: FLINK-30681
 URL: https://issues.apache.org/jira/browse/FLINK-30681
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Pulsar
Affects Versions: 1.15.3
Reporter: Jacek Wislicki


When PulsarSink writes a message to its output topic, the topic gets 
permanently corrupted and cannot be used anymore (even with newly created 
subscriptions).

We have isolated this behaviour to a minimal project demonstrating the problem 
available on [GitHub|https://github.com/JacekWislicki/vp-test5]:
# There are 2 topics: IN and OUT
# IN is subscribed to by the Flink job InToOutJob (with PulsarSource), which 
writes to OUT (with PulsarSink)
# OUT is subscribed to by the Pulsar function OutReadFunction
# When we write directly to OUT (e.g., with OutTopicProducer), OutReadFunction 
gets each message from its backlog and processes it with no issue (the ledger 
position updates)
# When we write to IN (e.g., with InTopicProducer), InToOutJob reads the 
message, processes it and writes to OUT
# OutReadFunction reads the message and the ledger position updates, but 
nothing happens
## Further messages written to OUT are not read, as OUT is blocked on the last 
message from Flink
## Truncating OUT does not help, and neither does unsubscribing or creating a 
new subscription

Reproduced with Pulsar 2.9.1, 2.9.2 and 2.10.2.

The issue does not occur when we use our temporary custom implementation of the 
old SinkFunction interface, based on a Pulsar producer writing to OUT.
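
For context, a rough sketch of how a PulsarSink writing to OUT is typically built with the connector's builder API in these versions; the service/admin URLs, topic name and plain string schema below are placeholders, not the MRE's actual code:
{code:java}
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.pulsar.sink.PulsarSink;
import org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema;

public class OutSinkSketch {
    // Builds a sink that writes plain strings to the OUT topic.
    static PulsarSink<String> buildOutSink() {
        return PulsarSink.builder()
                .setServiceUrl("pulsar://localhost:6650")
                .setAdminUrl("http://localhost:8080")
                .setTopics("persistent://public/default/OUT")
                .setSerializationSchema(
                        PulsarSerializationSchema.flinkSchema(new SimpleStringSchema()))
                .build();
    }
}
{code}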



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-28609) Flink-Pulsar connector fails on larger schemas

2022-07-25 Thread Jacek Wislicki (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28609?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17570783#comment-17570783
 ] 

Jacek Wislicki commented on FLINK-28609:


It's perfect, thank you, [~martijnvisser]!

> Flink-Pulsar connector fails on larger schemas
> --
>
> Key: FLINK-28609
> URL: https://issues.apache.org/jira/browse/FLINK-28609
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Pulsar
>Affects Versions: 1.14.3, 1.14.4, 1.14.5, 1.15.1
>Reporter: Jacek Wislicki
>Priority: Major
>  Labels: pull-request-available
> Attachments: 
> [FLINK-28609][Connector_Pulsar]_PulsarSchema_didn't_get_properly_serialized_.patch,
>  exception.txt
>
>
> When a model results in a larger schema (this seems to be related to its byte 
> array representation), the number of bytes expected to be read differs from 
> the number of bytes actually read: [^exception.txt]. The "read" in such a 
> case is always 1018, while the expected "byteLen" gives a greater value. For 
> smaller schemata, the numbers are equal (less than 1018) and no issue occurs.
> The problem reproduction is on 
> [GitHub|https://github.com/JacekWislicki/vp-test2]. There are 2 simple jobs 
> (SimpleJob1 and SimpleJob2) using basic models for the Pulsar source 
> definition (PulsarMessage1 and PulsarMessage2, respectively). Each of the 
> corresponding schemata is properly serialised and deserialised, unless the 
> effective byte array length becomes excessive (marked with "the problem 
> begins" in the model classes). The failure condition can be triggered by the 
> number of fields (PulsarMessage1) or just by longer field names 
> (PulsarMessage2). The problem occurs with either an Avro or a JSON schema set 
> in the Pulsar source definition.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-28609) Flink-Pulsar connector fails on larger schemas

2022-07-25 Thread Jacek Wislicki (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28609?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17570716#comment-17570716
 ] 

Jacek Wislicki commented on FLINK-28609:


Thank you, but unfortunately we are constrained to using only official releases 
that can be pulled from public Maven repos. For development purposes, we will 
be using the old (deprecated) connector version from StreamNative/Pulsar.

Do you know the timeline for Flink 1.16 to be released? I see that a lot of 
work has been done, but some issues are still open and being discussed.

> Flink-Pulsar connector fails on larger schemas
> --
>
> Key: FLINK-28609
> URL: https://issues.apache.org/jira/browse/FLINK-28609
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Pulsar
>Affects Versions: 1.14.3, 1.14.4, 1.14.5, 1.15.1
>Reporter: Jacek Wislicki
>Priority: Major
>  Labels: pull-request-available
> Attachments: 
> [FLINK-28609][Connector_Pulsar]_PulsarSchema_didn't_get_properly_serialized_.patch,
>  exception.txt
>
>
> When a model results in a larger schema (this seems to be related to its byte 
> array representation), the number of bytes expected to be read differs from 
> the number of bytes actually read: [^exception.txt]. The "read" in such a 
> case is always 1018, while the expected "byteLen" gives a greater value. For 
> smaller schemata, the numbers are equal (less than 1018) and no issue occurs.
> The problem reproduction is on 
> [GitHub|https://github.com/JacekWislicki/vp-test2]. There are 2 simple jobs 
> (SimpleJob1 and SimpleJob2) using basic models for the Pulsar source 
> definition (PulsarMessage1 and PulsarMessage2, respectively). Each of the 
> corresponding schemata is properly serialised and deserialised, unless the 
> effective byte array length becomes excessive (marked with "the problem 
> begins" in the model classes). The failure condition can be triggered by the 
> number of fields (PulsarMessage1) or just by longer field names 
> (PulsarMessage2). The problem occurs with either an Avro or a JSON schema set 
> in the Pulsar source definition.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-28609) Flink-Pulsar connector fails on larger schemas

2022-07-21 Thread Jacek Wislicki (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28609?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17569678#comment-17569678
 ] 

Jacek Wislicki commented on FLINK-28609:


[~syhily] - it would be perfect to have it in any of the current releases. 
Still, if that is not possible, we will need to wait for 1.16. Do you know 
when we could expect it? 

> Flink-Pulsar connector fails on larger schemas
> --
>
> Key: FLINK-28609
> URL: https://issues.apache.org/jira/browse/FLINK-28609
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Pulsar
>Affects Versions: 1.14.3, 1.14.4, 1.14.5, 1.15.1
>Reporter: Jacek Wislicki
>Priority: Major
>  Labels: pull-request-available
> Attachments: 
> [FLINK-28609][Connector_Pulsar]_PulsarSchema_didn't_get_properly_serialized_.patch,
>  exception.txt
>
>
> When a model results in a larger schema (this seems to be related to its byte 
> array representation), the number of bytes expected to be read differs from 
> the number of bytes actually read: [^exception.txt]. The "read" in such a 
> case is always 1018, while the expected "byteLen" gives a greater value. For 
> smaller schemata, the numbers are equal (less than 1018) and no issue occurs.
> The problem reproduction is on 
> [GitHub|https://github.com/JacekWislicki/vp-test2]. There are 2 simple jobs 
> (SimpleJob1 and SimpleJob2) using basic models for the Pulsar source 
> definition (PulsarMessage1 and PulsarMessage2, respectively). Each of the 
> corresponding schemata is properly serialised and deserialised, unless the 
> effective byte array length becomes excessive (marked with "the problem 
> begins" in the model classes). The failure condition can be triggered by the 
> number of fields (PulsarMessage1) or just by longer field names 
> (PulsarMessage2). The problem occurs with either an Avro or a JSON schema set 
> in the Pulsar source definition.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-28609) Flink-Pulsar connector fails on larger schemas

2022-07-21 Thread Jacek Wislicki (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28609?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17569473#comment-17569473
 ] 

Jacek Wislicki commented on FLINK-28609:


Great, thank you, Yufan!

> Flink-Pulsar connector fails on larger schemas
> --
>
> Key: FLINK-28609
> URL: https://issues.apache.org/jira/browse/FLINK-28609
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Pulsar
>Affects Versions: 1.14.3, 1.14.4, 1.14.5, 1.15.1
>Reporter: Jacek Wislicki
>Priority: Major
> Attachments: 
> [FLINK-28609][Connector_Pulsar]_PulsarSchema_didn't_get_properly_serialized_.patch,
>  exception.txt
>
>
> When a model results in a larger schema (this seems to be related to its byte 
> array representation), the number of bytes expected to be read differs from 
> the number of bytes actually read: [^exception.txt]. The "read" in such a 
> case is always 1018, while the expected "byteLen" gives a greater value. For 
> smaller schemata, the numbers are equal (less than 1018) and no issue occurs.
> The problem reproduction is on 
> [GitHub|https://github.com/JacekWislicki/vp-test2]. There are 2 simple jobs 
> (SimpleJob1 and SimpleJob2) using basic models for the Pulsar source 
> definition (PulsarMessage1 and PulsarMessage2, respectively). Each of the 
> corresponding schemata is properly serialised and deserialised, unless the 
> effective byte array length becomes excessive (marked with "the problem 
> begins" in the model classes). The failure condition can be triggered by the 
> number of fields (PulsarMessage1) or just by longer field names 
> (PulsarMessage2). The problem occurs with either an Avro or a JSON schema set 
> in the Pulsar source definition.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-28609) Flink-Pulsar connector fails on larger schemas

2022-07-19 Thread Jacek Wislicki (Jira)
Jacek Wislicki created FLINK-28609:
--

 Summary: Flink-Pulsar connector fails on larger schemas
 Key: FLINK-28609
 URL: https://issues.apache.org/jira/browse/FLINK-28609
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Pulsar
Affects Versions: 1.15.1, 1.14.5, 1.14.4, 1.14.3
Reporter: Jacek Wislicki
 Attachments: exception.txt

When a model results in a larger schema (this seems to be related to its byte 
array representation), the number of bytes expected to be read differs from the 
number of bytes actually read: [^exception.txt]. The "read" in such a case is 
always 1018, while the expected "byteLen" gives a greater value. For smaller 
schemata, the numbers are equal (less than 1018) and no issue occurs.

The problem reproduction is on 
[GitHub|https://github.com/JacekWislicki/vp-test2]. There are 2 simple jobs 
(SimpleJob1 and SimpleJob2) using basic models for the Pulsar source definition 
(PulsarMessage1 and PulsarMessage2, respectively). Each of the corresponding 
schemata is properly serialised and deserialised, unless the effective byte 
array length becomes excessive (marked with "the problem begins" in the model 
classes). The failure condition can be triggered by the number of fields 
(PulsarMessage1) or just by longer field names (PulsarMessage2). The problem 
occurs with either an Avro or a JSON schema set in the Pulsar source definition.
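
A hedged guess at the classic pattern behind such symptoms (not the connector's actual code): a single {{InputStream.read(byte[])}} call is not guaranteed to fill the buffer, so it can legally return fewer bytes (e.g. 1018) than the advertised {{byteLen}}; reading a length-prefixed blob needs a loop or {{DataInputStream.readFully}}:
{code:java}
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;

final class ReadExactBytes {
    // Reads exactly byteLen bytes or throws EOFException, unlike a bare
    // in.read(bytes), which may return after filling only part of the buffer.
    static byte[] readExact(InputStream in, int byteLen) throws IOException {
        byte[] bytes = new byte[byteLen];
        new DataInputStream(in).readFully(bytes);
        return bytes;
    }
}
{code}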



--
This message was sent by Atlassian Jira
(v8.20.10#820010)