[ https://issues.apache.org/jira/browse/FLINK-20227?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
John Mathews updated FLINK-20227:
---------------------------------
Description:

Flink uses the task name to generate the transactionId for the Kafka producers. See:
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java#L1088

If the task name is sufficiently long (e.g. when a large number of column names is present), Kafka fails to produce records with the error:

"Error writing field 'coordinator_key': String length 34155 is larger than the maximum string length."

with stacktrace:

org.apache.kafka.common.protocol.types.SchemaException: {throwable0_message}
	at org.apache.kafka.common.protocol.types.Schema.write(Schema.java:61)
	at org.apache.kafka.common.protocol.types.Struct.writeTo(Struct.java:441)
	at org.apache.kafka.common.requests.AbstractRequestResponse.serialize(AbstractRequestResponse.java:30)
	at org.apache.kafka.common.requests.AbstractRequest.serialize(AbstractRequest.java:101)
	at org.apache.kafka.common.requests.AbstractRequest.toSend(AbstractRequest.java:94)
	at org.apache.kafka.clients.NetworkClient.doSend(NetworkClient.java:499)
	at org.apache.kafka.clients.NetworkClient.doSend(NetworkClient.java:473)
	at org.apache.kafka.clients.NetworkClient.send(NetworkClient.java:433)
	at org.apache.kafka.clients.producer.internals.Sender.maybeSendTransactionalRequest(Sender.java:437)
	at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:286)
	at org.apache.kafka.clients.producer.internals.Sender.run

Is there a way to control these task names for the Table API + SQL? If not, can we limit their length to ensure the transactionId stays under the 32k limit Kafka imposes on protocol strings?
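For illustration only, one possible mitigation sketch (not existing Flink behavior; the class name {{BoundedTransactionalIdPrefix}} and the cutoff constant below are hypothetical): truncate an over-long task name and append a hash of the full name before using it as the transactional ID prefix, so the ID stays well under Kafka's protocol string limit while distinct task names still map to distinct IDs:

{code:java}
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical sketch, not part of Flink: bound the transactional ID prefix
// derived from the task name. Kafka protocol strings are capped at
// Short.MAX_VALUE (32767) bytes, so we leave generous headroom for the
// subtask/counter suffixes appended later.
public class BoundedTransactionalIdPrefix {

    private static final int MAX_PREFIX_LENGTH = 1000; // assumed cutoff, well under 32767

    /**
     * Returns the task name unchanged when it is short enough; otherwise a
     * truncated copy plus a SHA-256 digest of the full name, so two long
     * task names that share a prefix still produce distinct IDs.
     */
    public static String boundedPrefix(String taskName) {
        if (taskName.length() <= MAX_PREFIX_LENGTH) {
            return taskName;
        }
        String digest = sha256Hex(taskName);
        int keep = MAX_PREFIX_LENGTH - digest.length() - 1;
        return taskName.substring(0, keep) + "-" + digest;
    }

    private static String sha256Hex(String s) {
        try {
            MessageDigest md = MessageDigest.getInstance("SHA-256");
            byte[] hash = md.digest(s.getBytes(StandardCharsets.UTF_8));
            StringBuilder sb = new StringBuilder(hash.length * 2);
            for (byte b : hash) {
                sb.append(String.format("%02x", b));
            }
            return sb.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-256 unavailable", e);
        }
    }

    public static void main(String[] args) {
        // A task name far beyond the Kafka limit, e.g. from many column names.
        String longName = "Sink: " + "col,".repeat(10000); // ~40k chars
        System.out.println(boundedPrefix(longName).length()); // <= 1000
    }
}
{code}

The trade-off of this approach is that the transactional ID is no longer fully human-readable for very long task names, but it remains stable across restarts (the hash is deterministic), which matters for Kafka transaction recovery.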
> Kafka transaction IDs exceeding limit
> -------------------------------------
>
>                 Key: FLINK-20227
>                 URL: https://issues.apache.org/jira/browse/FLINK-20227
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>            Reporter: John Mathews
>            Priority: Major
>

--
This message was sent by Atlassian Jira
(v8.3.4#803005)