Chenchu Lakshman kumar created KAFKA-7590:
---------------------------------------------
Summary: GETTING HUGE MESSAGE STRUCTURE THROUGH JMS CONNECTOR
Key: KAFKA-7590
URL: https://issues.apache.org/jira/browse/KAFKA-7590
Project: Kafka
Issue Type: Test
Components: config, KafkaConnect
Affects Versions: 2.0.0
Reporter: Chenchu Lakshman kumar
Message
--------------------------------------------------------------------
{"schema":\{"type":"struct","fields":[{"type":"string","optional":false,"doc":"This
field stores the value of `Message.getJMSMessageID()
<http://docs.oracle.com/javaee/6/api/javax/jms/Message.html#getJMSMessageID()>`_.","field":"messageID"},\{"type":"string","optional":false,"doc":"This
field stores the type of message that was received. This corresponds to the
subinterfaces of `Message
<http://docs.oracle.com/javaee/6/api/javax/jms/Message.html>`_. `BytesMessage
<http://docs.oracle.com/javaee/6/api/javax/jms/BytesMessage.html>`_ = `bytes`,
`MapMessage <http://docs.oracle.com/javaee/6/api/javax/jms/MapMessage.html>`_ =
`map`, `ObjectMessage
<http://docs.oracle.com/javaee/6/api/javax/jms/ObjectMessage.html>`_ =
`object`, `StreamMessage
<http://docs.oracle.com/javaee/6/api/javax/jms/StreamMessage.html>`_ = `stream`
and `TextMessage
<http://docs.oracle.com/javaee/6/api/javax/jms/TextMessage.html>`_ = `text`.
The corresponding field will be populated with the values from the respective
Message
subinterface.","field":"messageType"},\{"type":"int64","optional":false,"doc":"Data
from the `getJMSTimestamp()
<http://docs.oracle.com/javaee/6/api/javax/jms/Message.html#getJMSTimestamp()>`_
method.","field":"timestamp"},\{"type":"int32","optional":false,"doc":"This
field stores the value of `Message.getJMSDeliveryMode()
<http://docs.oracle.com/javaee/6/api/javax/jms/Message.html#getJMSDeliveryMode()>`_.","field":"deliveryMode"},\{"type":"string","optional":true,"doc":"This
field stores the value of `Message.getJMSCorrelationID()
<http://docs.oracle.com/javaee/6/api/javax/jms/Message.html#getJMSCorrelationID()>`_.","field":"correlationID"},\{"type":"struct","fields":[{"type":"string","optional":false,"doc":"The
type of JMS Destination, and either ``queue`` or
``topic``.","field":"destinationType"},\{"type":"string","optional":false,"doc":"The
name of the destination. This will be the value of `Queue.getQueueName()
<http://docs.oracle.com/javaee/6/api/javax/jms/Queue.html#getQueueName()>`_ or
`Topic.getTopicName()
<http://docs.oracle.com/javaee/6/api/javax/jms/Topic.html#getTopicName()>`_.","field":"name"}],"optional":true,"name":"io.confluent.connect.jms.Destination","doc":"This
schema is used to represent a JMS Destination, and is either `queue
<http://docs.oracle.com/javaee/6/api/javax/jms/Queue.html>`_ or `topic
<http://docs.oracle.com/javaee/6/api/javax/jms/Topic.html>`_.","field":"replyTo"},\{"type":"struct","fields":[{"type":"string","optional":false,"doc":"The
type of JMS Destination, and either ``queue`` or
``topic``.","field":"destinationType"},\{"type":"string","optional":false,"doc":"The
name of the destination. This will be the value of `Queue.getQueueName()
<http://docs.oracle.com/javaee/6/api/javax/jms/Queue.html#getQueueName()>`_ or
`Topic.getTopicName()
<http://docs.oracle.com/javaee/6/api/javax/jms/Topic.html#getTopicName()>`_.","field":"name"}],"optional":true,"name":"io.confluent.connect.jms.Destination","doc":"This
schema is used to represent a JMS Destination, and is either `queue
<http://docs.oracle.com/javaee/6/api/javax/jms/Queue.html>`_ or `topic
<http://docs.oracle.com/javaee/6/api/javax/jms/Topic.html>`_.","field":"destination"},\{"type":"boolean","optional":false,"doc":"This
field stores the value of `Message.getJMSRedelivered()
<http://docs.oracle.com/javaee/6/api/javax/jms/Message.html#getJMSRedelivered()>`_.","field":"redelivered"},\{"type":"string","optional":true,"doc":"This
field stores the value of `Message.getJMSType()
<http://docs.oracle.com/javaee/6/api/javax/jms/Message.html#getJMSType()>`_.","field":"type"},\{"type":"int64","optional":false,"doc":"This
field stores the value of `Message.getJMSExpiration()
<http://docs.oracle.com/javaee/6/api/javax/jms/Message.html#getJMSExpiration()>`_.","field":"expiration"},\{"type":"int32","optional":false,"doc":"This
field stores the value of `Message.getJMSPriority()
<http://docs.oracle.com/javaee/6/api/javax/jms/Message.html#getJMSPriority()>`_.","field":"priority"},\{"type":"map","keys":{"type":"string","optional":false},"values":\{"type":"struct","fields":[{"type":"string","optional":false,"doc":"The
java type of the property on the Message. One of ``boolean``, ``byte``,
``short``, ``integer``, ``long``, ``float``, ``double``, or
``string``.","field":"propertyType"},\{"type":"boolean","optional":true,"doc":"The
value stored as a boolean. Null unless ``propertyType`` is set to
``boolean``.","field":"boolean"},\{"type":"int8","optional":true,"doc":"The
value stored as a byte. Null unless ``propertyType`` is set to
``byte``.","field":"byte"},\{"type":"int16","optional":true,"doc":"The value
stored as a short. Null unless ``propertyType`` is set to
``short``.","field":"short"},\{"type":"int32","optional":true,"doc":"The value
stored as a integer. Null unless ``propertyType`` is set to
``integer``.","field":"integer"},\{"type":"int64","optional":true,"doc":"The
value stored as a long. Null unless ``propertyType`` is set to
``long``.","field":"long"},\{"type":"float","optional":true,"doc":"The value
stored as a float. Null unless ``propertyType`` is set to
``float``.","field":"float"},\{"type":"double","optional":true,"doc":"The value
stored as a double. Null unless ``propertyType`` is set to
``double``.","field":"double"},\{"type":"string","optional":true,"doc":"The
value stored as a string. Null unless ``propertyType`` is set to
``string``.","field":"string"}],"optional":false,"name":"io.confluent.connect.jms.PropertyValue","doc":"This
schema is used to store the data that is found in the properties of the
message. To ensure that the proper type mappings are preserved field
``propertyType`` stores the value type for the field. The corresponding field
in the schema will contain the data for the property. This ensures that the
data is retrievable as the type returned by `Message.getObjectProperty()
<http://docs.oracle.com/javaee/6/api/javax/jms/Message.html#getObjectProperty(java.lang.String)>`_."},"optional":false,"doc":"This
field stores the data from all of the properties for the Message indexed by
their
propertyName.","field":"properties"},\{"type":"bytes","optional":true,"doc":"This
field stores the value from `BytesMessage.html.readBytes(byte[])
<http://docs.oracle.com/javaee/6/api/javax/jms/BytesMessage.html#readBytes(byte[])>`_.","field":"bytes"},\{"type":"map","keys":{"type":"string","optional":false},"values":\{"type":"struct","fields":[{"type":"string","optional":false,"doc":"The
java type of the property on the Message. One of ``boolean``, ``byte``,
``short``, ``integer``, ``long``, ``float``, ``double``, or
``string``.","field":"propertyType"},\{"type":"boolean","optional":true,"doc":"The
value stored as a boolean. Null unless ``propertyType`` is set to
``boolean``.","field":"boolean"},\{"type":"int8","optional":true,"doc":"The
value stored as a byte. Null unless ``propertyType`` is set to
``byte``.","field":"byte"},\{"type":"int16","optional":true,"doc":"The value
stored as a short. Null unless ``propertyType`` is set to
``short``.","field":"short"},\{"type":"int32","optional":true,"doc":"The value
stored as a integer. Null unless ``propertyType`` is set to
``integer``.","field":"integer"},\{"type":"int64","optional":true,"doc":"The
value stored as a long. Null unless ``propertyType`` is set to
``long``.","field":"long"},\{"type":"float","optional":true,"doc":"The value
stored as a float. Null unless ``propertyType`` is set to
``float``.","field":"float"},\{"type":"double","optional":true,"doc":"The value
stored as a double. Null unless ``propertyType`` is set to
``double``.","field":"double"},\{"type":"string","optional":true,"doc":"The
value stored as a string. Null unless ``propertyType`` is set to
``string``.","field":"string"}],"optional":false,"name":"io.confluent.connect.jms.PropertyValue","doc":"This
schema is used to store the data that is found in the properties of the
message. To ensure that the proper type mappings are preserved field
``propertyType`` stores the value type for the field. The corresponding field
in the schema will contain the data for the property. This ensures that the
data is retrievable as the type returned by `Message.getObjectProperty()
<http://docs.oracle.com/javaee/6/api/javax/jms/Message.html#getObjectProperty(java.lang.String)>`_."},"optional":true,"doc":"This
field stores the data from all of the map entries returned from
`MapMessage.getMapNames()
<http://docs.oracle.com/javaee/6/api/javax/jms/MapMessage.html#getMapNames()>`_
for the Message indexed by their
key.","field":"map"},\{"type":"string","optional":true,"doc":"This field stores
the value from `TextMessage.html.getText()
<http://docs.oracle.com/javaee/6/api/javax/jms/TextMessage.html#getText()>`_.","field":"text"}],"optional":false,"name":"io.confluent.connect.jms.Value","doc":"This
schema is used to store the value of the JMS
message."},"payload":\{"messageID":"ID:COPTW_B_SIT.1D815BC7447CE3F:7","messageType":"text","timestamp":1540553873770,"deliveryMode":2,"correlationID":null,"replyTo":null,"destination":{"destinationType":"queue","name":"test.queue"},"redelivered":false,"type":null,"expiration":0,"priority":4,"properties":{},"bytes":null,"map":null,"text":"\{\n
\"Record\":{\n \"Data\":\"COPS TEST76004\"\n }\n}"}}
Connector JSON
{
"name": "jms_cops_test822_8",
"config": {
"connector.class": "io.confluent.connect.jms.JmsSourceConnector",
"tasks.max":"1",
"kafka.topic":"test_cops",
"jms.destination.name":"test.queue",
"connection.factory.name":"xxx.gcg.tw.COPTW_B_SIT.QueueCF",
"jms.destination.type":"queue",
"java.naming.factory.initial":"com.tibco.tibjms.naming.TibjmsInitialContextFactory",
"java.naming.provider.url":"tcp://xxxx.xxx.nsroot.net:4522",
"java.naming.security.principal":"ec_kfk_dev",
"java.naming.security.credentials":"password",
"key.converter.schemas.enable":"false",
"internal.key.converter.schemas.enable":"false",
"internal.value.converter.schemas.enable":"false",
"value.converter":"org.apache.kafka.connect.json.JsonConverter",
"key.converter":"org.apache.kafka.connect.json.JsonConverter",
"confluent.license":"eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.eyJhdWQiOiIwMDZmMTAwMDAwYkd4Tm0iLCJleHAiOjE2MzE4MzY4MDAsImlhdCI6MTUzNzE0MjQwMCwiaXNzIjoiQ29uZmx1ZW50IiwibW9uaXRvcmluZyI6dHJ1ZSwibmI0IjoxNTM3MjI2MjcyLCJzdWIiOiJjb250cm9sLWNlbnRlciJ9.g_rp5SedNu2Tjth85ZoavzOR6HKrMEtEVdoq_dRdNU9F_egxEPj-A8UqG-tzOgbGvd6uvhi5VZYy-pz0xm1llwfz8TV9BXyoFqeDvHOfez6aIxZhO5htyXUYAH_twvhA8osRP_VPN57F80Z-QxT_DRFj3mvlsX3DUQtHUoSJ9C4hPVWFcABnD2Q0awzoIwIoK6vUbp2COsrhWx8cskNWSBwHvYmdQB2yWOToVgVI_fhk-wzvJipJZ9vyDlcK7lyrToj7AIZJMHNESmWqPUD0sJbCOrGswy_sVjWlM3VqT0S1vaCX7H_2ltDCjgpmSff78JDFIhS7xcTWsH8ddVl7Rw",
"confluent.topic.bootstrap.servers":"xxxxx.xxx.xxx:9091,1xxxxx.xxx.xxx:9091,xxxxx.xxx.xxx:9091",
"confluent.topic.ssl.truststore.location":"/kfkapps/xxxx/xxx.cert.jks",
"confluent.topic.ssl.truststore.password":"changeit",
"confluent.topic.ssl.keystore.location":"/kfkapps/xxxx/xxx.cert.jks",
"confluent.topic.ssl.keystore.password":"password",
"confluent.topic.ssl.key.password":"password",
"confluent.topic.security.protocol":"SSL"
}
}
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)