[jira] [Commented] (FLINK-34436) Avro schema evolution and compatibility issues in Pulsar connector
[ https://issues.apache.org/jira/browse/FLINK-34436?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17822065#comment-17822065 ]

Jacek Wislicki commented on FLINK-34436:

Please let me know if you have any thoughts on this. Of course, a good explanation of why my understanding of the behaviour is wrong would also be valuable.

> Avro schema evolution and compatibility issues in Pulsar connector
> --
>
> Key: FLINK-34436
> URL: https://issues.apache.org/jira/browse/FLINK-34436
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Pulsar
> Affects Versions: 1.17.2
> Reporter: Jacek Wislicki
> Priority: Major
>
> We noticed a couple of critical issues in the Pulsar-Flink connector related to schema evolution and compatibility. Please see the MRE available at https://github.com/JacekWislicki/test11. More details are in the project's README file; here is the summary:
>
> Library versions:
> * Pulsar 3.0.1
> * Flink 1.17.2
> * Pulsar-Flink connector 4.1.0-1.17
>
> Problems:
> * An exception is thrown when the schema's fields are added/removed
> * Avro's enum default value is ignored; the last known value is applied instead
>
> I believe that I observed the same behaviour in Pulsar itself; for now, however, we are focusing on the connector, hence I was able to document the problems when using it.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-34436) Avro schema evolution and compatibility issues in Pulsar connector
[ https://issues.apache.org/jira/browse/FLINK-34436?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17818371#comment-17818371 ]

Jacek Wislicki commented on FLINK-34436:

A note: the MRE uses ALWAYS_COMPATIBLE, although I also tested it with BACKWARD, BACKWARD_TRANSITIVE, FORWARD, FORWARD_TRANSITIVE, FULL and FULL_TRANSITIVE, all with the same result.
[jira] [Commented] (FLINK-34436) Avro schema evolution and compatibility issues in Pulsar connector
[ https://issues.apache.org/jira/browse/FLINK-34436?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17817629#comment-17817629 ]

Jacek Wislicki commented on FLINK-34436:

The problem seems to be in org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarSchemaWrapper.deserialize(Message, Collector):

{code:java}
@Override
public void deserialize(Message<byte[]> message, Collector<T> out) throws Exception {
    Schema<T> schema = this.pulsarSchema.getPulsarSchema();
    byte[] bytes = message.getData();
    T instance = schema.decode(bytes);
    out.collect(instance);
}
{code}

this.pulsarSchema.getPulsarSchema() returns the expected schema, not the schema with which the message was actually encoded. For proper conversion, both the source (encoding) and target (decoding) schemas are needed.
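A minimal sketch of the direction a fix could take, assuming the writer schema can be recovered (e.g. from the message's schema-version metadata). This is not the connector's actual code; the class and method names are illustrative. Avro's GenericDatumReader accepts both a writer and a reader schema and resolves between them, which is how added/removed fields with defaults are handled:

{code:java}
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;

// Hypothetical sketch: decode with the schema the message was actually
// written with (writerSchema) while resolving to the schema the job
// expects (readerSchema).
public class ResolvingDecode {
    static GenericRecord decode(byte[] bytes, Schema writerSchema, Schema readerSchema)
            throws IOException {
        // Passing both schemas lets Avro perform schema resolution
        // instead of assuming the bytes match the reader schema.
        DatumReader<GenericRecord> reader =
                new GenericDatumReader<>(writerSchema, readerSchema);
        return reader.read(null, DecoderFactory.get().binaryDecoder(bytes, null));
    }
}
{code}

By contrast, decoding with a single (expected) schema, as in the snippet above, can only work when the writer and reader schemas are identical.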
[jira] [Commented] (FLINK-34436) Avro schema evolution and compatibility issues in Pulsar connector
[ https://issues.apache.org/jira/browse/FLINK-34436?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17817467#comment-17817467 ]

Jacek Wislicki commented on FLINK-34436:

Added also a simpler example (without the enumerations that seem to be the key problem): https://github.com/JacekWislicki/test12
[jira] [Updated] (FLINK-34436) Avro schema evolution and compatibility issues in Pulsar connector
[ https://issues.apache.org/jira/browse/FLINK-34436?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jacek Wislicki updated FLINK-34436:
---
Summary: Avro schema evolution and compatibility issues in Pulsar connector (was: Avro schema evolution and compatibility issues)
[jira] [Created] (FLINK-34436) Avro schema evolution and compatibility issues
Jacek Wislicki created FLINK-34436:
--
Summary: Avro schema evolution and compatibility issues
Key: FLINK-34436
URL: https://issues.apache.org/jira/browse/FLINK-34436
Project: Flink
Issue Type: Bug
Components: Connectors / Pulsar
Affects Versions: 1.17.2
Reporter: Jacek Wislicki

We noticed a couple of critical issues in the Pulsar-Flink connector related to schema evolution and compatibility. Please see the MRE available at https://github.com/JacekWislicki/test11. More details are in the project's README file; here is the summary:

Library versions:
* Pulsar 3.0.1
* Flink 1.17.2
* Pulsar-Flink connector 4.1.0-1.17

Problems:
* An exception is thrown when the schema's fields are added/removed
* Avro's enum default value is ignored; the last known value is applied instead

I believe that I observed the same behaviour in Pulsar itself; for now, however, we are focusing on the connector, hence I was able to document the problems when using it.
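For context on the second problem, a minimal reader-schema fragment illustrating how Avro enum defaults are declared (the record and symbol names here are made up for illustration, not taken from the MRE). Per the Avro specification, when the reader's enum does not contain the symbol the writer used, schema resolution should substitute the declared default rather than the last known value:

{code:java}
// Illustrative .avsc fragment (hypothetical names):
// a writer symbol absent from this list, e.g. "BLUE",
// should resolve to "UNKNOWN" on the reader side.
// {
//   "type": "record",
//   "name": "Event",
//   "fields": [
//     { "name": "color",
//       "type": { "type": "enum",
//                 "name": "Color",
//                 "symbols": ["RED", "GREEN", "UNKNOWN"],
//                 "default": "UNKNOWN" } }
//   ]
// }
{code}

The reported behaviour (last known value applied) suggests the connector decodes with a single schema and never performs this resolution step.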
[jira] [Created] (FLINK-30681) Pulsar-Flink connector corrupts its output topic
Jacek Wislicki created FLINK-30681:
--
Summary: Pulsar-Flink connector corrupts its output topic
Key: FLINK-30681
URL: https://issues.apache.org/jira/browse/FLINK-30681
Project: Flink
Issue Type: Bug
Components: Connectors / Pulsar
Affects Versions: 1.15.3
Reporter: Jacek Wislicki

When PulsarSink writes a message to its output topic, the topic gets permanently corrupted and cannot be used anymore (even with newly created subscriptions). We have isolated this behaviour to a minimal project demonstrating the problem, available on [GitHub|https://github.com/JacekWislicki/vp-test5]:
# There are 2 topics: IN and OUT
# IN is subscribed by Flink's InToOutJob (with PulsarSource), which writes to OUT (with PulsarSink)
# OUT is subscribed by Pulsar's OutReadFunction
# When we write directly to OUT (e.g., with OutTopicProducer), OutReadFunction gets each message from its backlog and processes it with no issue (the ledger position updates)
# When we write to IN (e.g., with InTopicProducer), InToOutJob reads the message, processes it and writes to OUT
# OutReadFunction reads the message, the ledger position updates, but nothing happens
## Further messages written to OUT are not read, as OUT is blocked on the last message from Flink
## Truncating OUT does not help, nor does unsubscribing or creating a new subscription

Reproduced with Pulsar 2.9.1, 2.9.2 and 2.10.2. The issue does not occur when we use our custom temporary old SinkFunction implementation based on a Pulsar producer writing to OUT.
[jira] [Commented] (FLINK-28609) Flink-Pulsar connector fails on larger schemas
[ https://issues.apache.org/jira/browse/FLINK-28609?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17570783#comment-17570783 ]

Jacek Wislicki commented on FLINK-28609:

It's perfect, thank you, [~martijnvisser]!

> Flink-Pulsar connector fails on larger schemas
> --
>
> Key: FLINK-28609
> URL: https://issues.apache.org/jira/browse/FLINK-28609
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Pulsar
> Affects Versions: 1.14.3, 1.14.4, 1.14.5, 1.15.1
> Reporter: Jacek Wislicki
> Priority: Major
> Labels: pull-request-available
> Attachments: [FLINK-28609][Connector_Pulsar]_PulsarSchema_didn't_get_properly_serialized_.patch, exception.txt
>
> When a model results in a larger schema (this seems to be related to its byte array representation), the number of expected bytes to read is different from the number of actually read bytes: [^exception.txt]. The "read" in such a case is always 1018, while the expected "byteLen" gives a greater value. For smaller schemata, the numbers are equal (less than 1018) and no issue occurs.
>
> The problem reproduction is on [GitHub|https://github.com/JacekWislicki/vp-test2]. There are 2 simple jobs (SimpleJob1 and SimpleJob2) using basic models for the Pulsar source definition (PulsarMessage1 and PulsarMessage2, respectively). Each of the corresponding schemata is properly serialised and deserialised, unless the effective byte array length becomes excessive (marked with "the problem begins" in the model classes). The fail condition can be triggered by the number of fields (PulsarMessage1) or just by longer field names (PulsarMessage2). The problem occurs with either an Avro or a JSON schema set in the Pulsar source definition.
[jira] [Commented] (FLINK-28609) Flink-Pulsar connector fails on larger schemas
[ https://issues.apache.org/jira/browse/FLINK-28609?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17570716#comment-17570716 ]

Jacek Wislicki commented on FLINK-28609:

Thank you, but unfortunately we are constrained to use only official releases that can be pulled from public Maven repos. For development purposes, we will be using the old (deprecated) connector version from StreamNative/Pulsar. Do you know the release plan for Flink 1.16? I see that a lot of work has been done, but some issues are still open and being discussed.
[jira] [Commented] (FLINK-28609) Flink-Pulsar connector fails on larger schemas
[ https://issues.apache.org/jira/browse/FLINK-28609?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17569678#comment-17569678 ]

Jacek Wislicki commented on FLINK-28609:

[~syhily] - it would be perfect to have it in any of the current releases. Still, if that is not possible, we will need to wait for 1.16. Do you know when we could expect that one?
[jira] [Commented] (FLINK-28609) Flink-Pulsar connector fails on larger schemas
[ https://issues.apache.org/jira/browse/FLINK-28609?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17569473#comment-17569473 ]

Jacek Wislicki commented on FLINK-28609:

Great, thank you, Yufan!
[jira] [Created] (FLINK-28609) Flink-Pulsar connector fails on larger schemas
Jacek Wislicki created FLINK-28609:
--
Summary: Flink-Pulsar connector fails on larger schemas
Key: FLINK-28609
URL: https://issues.apache.org/jira/browse/FLINK-28609
Project: Flink
Issue Type: Bug
Components: Connectors / Pulsar
Affects Versions: 1.15.1, 1.14.5, 1.14.4, 1.14.3
Reporter: Jacek Wislicki
Attachments: exception.txt

When a model results in a larger schema (this seems to be related to its byte array representation), the number of expected bytes to read is different from the number of actually read bytes: [^exception.txt]. The "read" in such a case is always 1018, while the expected "byteLen" gives a greater value. For smaller schemata, the numbers are equal (less than 1018) and no issue occurs.

The problem reproduction is on [GitHub|https://github.com/JacekWislicki/vp-test2]. There are 2 simple jobs (SimpleJob1 and SimpleJob2) using basic models for the Pulsar source definition (PulsarMessage1 and PulsarMessage2, respectively). Each of the corresponding schemata is properly serialised and deserialised, unless the effective byte array length becomes excessive (marked with "the problem begins" in the model classes). The fail condition can be triggered by the number of fields (PulsarMessage1) or just by longer field names (PulsarMessage2). The problem occurs with either an Avro or a JSON schema set in the Pulsar source definition.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
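The constant 1018 in the report is consistent with a classic partial-read bug: a single InputStream.read(byte[]) call is not guaranteed to fill the buffer, so a payload larger than one chunk gets silently truncated unless the caller loops (e.g. with DataInputStream.readFully). The class below is a self-contained, hypothetical demonstration of that general pattern, not the connector's actual code; the 1018-byte chunk size is borrowed from the report purely for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;

public class PartialReadDemo {

    /** Wraps a stream so each read() returns at most maxChunk bytes,
     *  mimicking a source that delivers data in small chunks. */
    static class ChunkedStream extends FilterInputStream {
        private final int maxChunk;
        ChunkedStream(InputStream in, int maxChunk) {
            super(in);
            this.maxChunk = maxChunk;
        }
        @Override
        public int read(byte[] b, int off, int len) throws IOException {
            return super.read(b, off, Math.min(len, maxChunk));
        }
    }

    /** Fragile pattern: one read() call, as if the payload always arrives whole. */
    static int readOnce(InputStream in, byte[] buf) throws IOException {
        return in.read(buf);
    }

    /** Robust pattern: readFully() loops until buf is completely filled. */
    static void readAll(InputStream in, byte[] buf) throws IOException {
        new DataInputStream(in).readFully(buf);
    }

    public static void main(String[] args) throws IOException {
        byte[] payload = new byte[4096]; // stand-in for a large serialized schema
        for (int i = 0; i < payload.length; i++) payload[i] = (byte) i;

        byte[] buf1 = new byte[payload.length];
        // The wrapper caps the first chunk, so a single read() stops short.
        int n = readOnce(new ChunkedStream(new ByteArrayInputStream(payload), 1018), buf1);
        System.out.println("single read() returned " + n + " of " + payload.length);

        byte[] buf2 = new byte[payload.length];
        // readFully() keeps calling read() until all 4096 bytes are in.
        readAll(new ChunkedStream(new ByteArrayInputStream(payload), 1018), buf2);
        System.out.println("readFully() last byte matches: " + (buf2[4095] == payload[4095]));
    }
}
```

Whether this is the actual mechanism here would need to be confirmed against the attached patch, but the "read is always 1018 while byteLen is greater" symptom matches it closely.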