[ https://issues.apache.org/jira/browse/FLINK-20227?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

John Mathews updated FLINK-20227:
---------------------------------
    Description: 
Flink uses the task name to generate the transactionId for the Kafka producers. 
See: 
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java#L1088

If the task name is sufficiently long (e.g. when a large number of column names 
is present), Kafka can fail to produce records with the error:

"Error writing field 'coordinator_key': String length 34155 is larger than the 
maximum string length."

with stacktrace:

org.apache.kafka.common.protocol.types.SchemaException: Error writing field 'coordinator_key': String length 34155 is larger than the maximum string length.
	at org.apache.kafka.common.protocol.types.Schema.write(Schema.java:61)
	at org.apache.kafka.common.protocol.types.Struct.writeTo(Struct.java:441)
	at org.apache.kafka.common.requests.AbstractRequestResponse.serialize(AbstractRequestResponse.java:30)
	at org.apache.kafka.common.requests.AbstractRequest.serialize(AbstractRequest.java:101)
	at org.apache.kafka.common.requests.AbstractRequest.toSend(AbstractRequest.java:94)
	at org.apache.kafka.clients.NetworkClient.doSend(NetworkClient.java:499)
	at org.apache.kafka.clients.NetworkClient.doSend(NetworkClient.java:473)
	at org.apache.kafka.clients.NetworkClient.send(NetworkClient.java:433)
	at org.apache.kafka.clients.producer.internals.Sender.maybeSendTransactionalRequest(Sender.java:437)
	at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:286)
	at org.apache.kafka.clients.producer.internals.Sender.run

Is there a way to control these task names for the Table API + SQL? If not, can 
we truncate them so the transactional id stays under the 32k-character limit 
Kafka imposes?
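To illustrate the truncation idea above: a minimal sketch (a hypothetical helper, not part of Flink's or Kafka's API) that keeps short task names verbatim but replaces over-long ones with a stable SHA-256 digest, so the derived id is deterministic across restarts yet always well under Kafka's limit. The class name, method name, and the exact 32767 bound (Short.MAX_VALUE, since the Kafka protocol length-prefixes the string with a short) are assumptions for this sketch.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/** Hypothetical helper: derive a bounded transactional-id prefix from a task name. */
public class BoundedTransactionalId {
    // The Kafka protocol serializes transactional.id as a short-length-prefixed
    // string, so it must fit in Short.MAX_VALUE characters.
    static final int MAX_LEN = 32767;

    /** Returns the task name verbatim if it fits, otherwise a stable
     *  64-character SHA-256 hex digest of it. */
    public static String boundedId(String taskName) {
        if (taskName.length() <= MAX_LEN) {
            return taskName;
        }
        try {
            MessageDigest md = MessageDigest.getInstance("SHA-256");
            byte[] digest = md.digest(taskName.getBytes(StandardCharsets.UTF_8));
            StringBuilder sb = new StringBuilder(digest.length * 2);
            for (byte b : digest) {
                sb.append(String.format("%02x", b & 0xff)); // hex-encode each byte
            }
            return sb.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e); // SHA-256 is always available
        }
    }
}
```

Hashing (rather than plain truncation) avoids collisions between two long task names that share a common prefix, which matters because transactional ids must be unique per producer.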



> Kafka transaction IDs exceeding limit
> -------------------------------------
>
>                 Key: FLINK-20227
>                 URL: https://issues.apache.org/jira/browse/FLINK-20227
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>            Reporter: John Mathews
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
