GitHub user rhauch opened a pull request:
https://github.com/apache/kafka/pull/4319
[WIP] KAFKA-5142: Add Connect support for message headers (KIP-145)
*NEW PROPOSAL FOR KIP-145... DO NOT MERGE*
Changed the Connect API and runtime to support message headers as described
in KIP-145.
The new `Header` interface defines an immutable representation of a Kafka
header (key-value pair) with support for the Connect value types and schemas.
This interface provides methods for easily converting between many of the
built-in primitive, structured, and logical data types.
The new `Headers` interface defines an ordered collection of headers and is
used to track all headers associated with a `ConnectRecord` (and thus
`SourceRecord` and `SinkRecord`). Multiple headers with the same key are
allowed. The `Headers` interface contains methods for adding, removing, finding, and
modifying headers. Convenience methods allow connectors and transforms to
easily use and modify the headers for a record.
A new `HeaderConverter` interface is also defined to enable the Connect
runtime framework to serialize and deserialize headers between the
in-memory representation and Kafka's byte[] representation. A new
`SimpleHeaderConverter` implementation has been added, and this serializes to
strings and deserializes by inferring the schemas. (`Struct` header values are
serialized without the schemas, so they can only be deserialized as `Map`
instances without a schema.) The `StringConverter`, `JsonConverter`, and
`ByteArrayConverter` have all been extended to also be `HeaderConverter`
implementations. Each connector can be configured with a different header
converter, although by default the `SimpleHeaderConverter` is used to serialize
header values as strings without schemas.
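
To illustrate what "serializes to strings and deserializes by inferring the schemas" can mean in practice, here is a minimal plain-Java sketch. It is not the `SimpleHeaderConverter` code from this pull request; the class name `InferringHeaderValues` and the inference order (boolean, then integer, then floating point, then string) are assumptions chosen for illustration.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper, NOT the actual SimpleHeaderConverter from this PR.
final class InferringHeaderValues {
    private InferringHeaderValues() {}

    // Serialize any value to the UTF-8 bytes of its string form (no schema is written).
    static byte[] serialize(Object value) {
        return value == null ? null : String.valueOf(value).getBytes(StandardCharsets.UTF_8);
    }

    // Deserialize by guessing the most specific type the text can represent.
    static Object deserialize(byte[] bytes) {
        if (bytes == null) {
            return null;
        }
        String text = new String(bytes, StandardCharsets.UTF_8);
        if (text.equalsIgnoreCase("true") || text.equalsIgnoreCase("false")) {
            return Boolean.parseBoolean(text);
        }
        try {
            return Long.parseLong(text);
        } catch (NumberFormatException e) {
            // Not an integer; try the next candidate type.
        }
        try {
            return Double.parseDouble(text);
        } catch (NumberFormatException e) {
            // Not a floating-point number; fall back to the raw string.
        }
        return text;
    }
}
```

Because no schema travels with the bytes, the original type is not always recovered: in this sketch an `Integer` written as `42` comes back as a `Long`, which mirrors the description's note that `Struct` values come back as schemaless `Map` instances.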
Unit and integration tests are added for `ConnectHeader` and
`ConnectHeaders`, the two implementation classes for headers. Additional test
methods are added for the methods added to the `Converter` implementations.
Finally, the `ConnectRecord` object is already used heavily, so only limited
new tests were needed; many of the existing tests already cover
the changes.
### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation matches KIP-145
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/rhauch/kafka kafka-5142-b
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/kafka/pull/4319.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #4319
----
commit 1c35692da19f3c8c92ce60946a69f576878b958a
Author: Randall Hauch <[email protected]>
Date: 2017-12-05T17:05:00Z
KAFKA-5142: Add message headers to Connect API (KIP-145)
Changed the Connect API to add message headers as described in KIP-145.
The new `Header` interface defines an immutable representation of a Kafka
header (name-value pair) with support for the Connect value types and schemas.
Kafka headers have a string name and a binary value, which doesn't align well
with Connect's existing data and schema mechanisms. Thus, Connect's
`Header` interface provides methods for easily converting between many of the
built-in primitive, structured, and logical data types. And, as discussed
below, a new `HeaderConverter` interface is added to define how the Kafka
header binary values are converted to Connect data objects.
The new `Headers` interface defines an ordered collection of headers and is
used to track all headers associated with a `ConnectRecord`. Like the Kafka
headers API, the Connect `Headers` interface allows storing multiple headers
with the same key in an ordered list. The Connect `Headers` interface is
mutable and has a number of methods that make it easy for connectors and
transformations to add, modify, and remove headers from the record, and the
interface is designed to allow chaining multiple mutating methods.
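
The ordered, duplicate-friendly, chainable design described above can be sketched in plain Java as follows. The types `SketchHeader` and `SketchHeaders` and their method names are hypothetical stand-ins for illustration only; they are not the `Header` and `Headers` interfaces added by this commit, and value schemas are left out entirely.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for an immutable header (schemas omitted).
final class SketchHeader {
    private final String key;
    private final Object value;

    SketchHeader(String key, Object value) {
        this.key = key;
        this.value = value;
    }

    String key() { return key; }
    Object value() { return value; }
}

// Hypothetical stand-in for a mutable, ordered collection of headers.
final class SketchHeaders implements Iterable<SketchHeader> {
    private final List<SketchHeader> headers = new ArrayList<>();

    // Appends a header; earlier headers with the same key are kept.
    SketchHeaders add(String key, Object value) {
        headers.add(new SketchHeader(key, value));
        return this; // returning this is what allows chaining
    }

    // Removes every header with the given key.
    SketchHeaders remove(String key) {
        headers.removeIf(h -> h.key().equals(key));
        return this;
    }

    // Returns all headers with the given key, in insertion order.
    List<SketchHeader> allWithName(String key) {
        List<SketchHeader> result = new ArrayList<>();
        for (SketchHeader h : headers) {
            if (h.key().equals(key)) {
                result.add(h);
            }
        }
        return result;
    }

    // Returns the most recently added header with the given key, or null.
    SketchHeader lastWithName(String key) {
        for (int i = headers.size() - 1; i >= 0; i--) {
            if (headers.get(i).key().equals(key)) {
                return headers.get(i);
            }
        }
        return null;
    }

    int size() { return headers.size(); }

    @Override
    public Iterator<SketchHeader> iterator() {
        // Read-only view so callers mutate only through add/remove.
        return Collections.unmodifiableList(headers).iterator();
    }
}
```

With these stand-ins, a transform could write `new SketchHeaders().add("trace", "a").add("trace", "b").remove("tmp")` and still find both `trace` headers in the order they were added.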
The existing constructors and methods in `ConnectRecord`, `SinkRecord`, and
`SourceRecord` are unchanged to maintain backward compatibility, and in these
situations the records will contain an empty `Headers` object that connectors
and transforms can modify. There is also an additional constructor that allows
an existing `Headers` to be passed in. A new overloaded form of `newRecord`
method was created to allow connectors and transforms to create a new record
with an entirely new `Headers` object.
A new `HeaderConverter` interface is also defined to enable the Connect
runtime framework to serialize and deserialize headers between the
in-memory representation and Kafka's byte[] representation.
Unit and integration tests are added for `ConnectHeader` and
`ConnectHeaders`, the two implementation classes for headers. The
`ConnectRecord` object is already used heavily, so only limited tests need to
be added while quite a few of the existing tests already cover the changes.
However, new unit tests were added for `SinkRecord` and `SourceRecord` to verify
the header behavior, including when the `newRecord` methods are called.
commit f398eba326d6c0cc8732770cb3bfc962f0453995
Author: Randall Hauch <[email protected]>
Date: 2017-12-13T01:27:26Z
KAFKA-5142: Add message header converters to Connect API (KIP-145)
This is the second commit for the public Connect API changes for KIP-145,
and deals primarily with `HeaderConverter` implementations.
Connect has three `Converter` implementations, `StringConverter`,
`JsonConverter` and `ByteArrayConverter`. These were modified to also implement
`HeaderConverter`, without changing any of the existing functionality.
Like many of our pluggable components in Connect, the `HeaderConverter`
interface extends `Configurable`. Implementations expose a
`ConfigDef` that describes the supported configuration properties, and a
`configure` method that can be used to initialize the component with provided
configuration properties. The `StringConverter`, `JsonConverter` and
`ByteArrayConverter` were changed to support these methods in a backward
compatible manner. There are now `StringConverterConfig` and
`JsonConverterConfig` classes that define the `ConfigDef` for the
implementations; the `ByteArrayConverter` has no configuration properties and
doesn't need a config class.
Note that the existing `Converter` interface has a special `configure`
signature with a parameter that says whether the converter is being used for
keys or values. This is different from the `Configurable.configure` signature, so
this commit adds a new `ConverterConfig` abstract class that defines a
`converter.type` property that can be used to set whether the converter is
being used for keys, values, or headers. The existing `Converter` methods
internally set this property based upon the supplied boolean parameter, so the
default for `converter.type` can be `header`.
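
The bridging between the two configuration signatures could look roughly like the plain-Java sketch below. Only the `converter.type` property name and its `header` default come from the description above; the class `SketchConverter`, the `key` and `value` strings, and the validation step are assumptions made for illustration, not the code in this commit.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch, NOT one of the converter classes changed in this commit.
final class SketchConverter {
    static final String TYPE_CONFIG = "converter.type";

    private String type;

    // Map-only configuration: the type comes from the property, defaulting to "header".
    void configure(Map<String, ?> configs) {
        Object configured = configs.get(TYPE_CONFIG);
        String resolved = configured == null ? "header" : configured.toString();
        if (!resolved.equals("key") && !resolved.equals("value") && !resolved.equals("header")) {
            throw new IllegalArgumentException("Unsupported " + TYPE_CONFIG + ": " + resolved);
        }
        type = resolved;
    }

    // Legacy configuration with a boolean flag: translate the flag into the
    // property and delegate, so both entry points share one code path.
    void configure(Map<String, ?> configs, boolean isKey) {
        Map<String, Object> copy = new HashMap<>(configs);
        copy.put(TYPE_CONFIG, isKey ? "key" : "value");
        configure(copy);
    }

    String type() { return type; }
}
```

Because the legacy method always sets the property explicitly, the default only applies to callers that use the map-only method without specifying a type, which is why defaulting to `header` is safe for existing key and value converters.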
commit 14cf25a957ce1a7f0207f3fbdc9da5a30d5f3488
Author: Randall Hauch <[email protected]>
Date: 2017-12-13T01:28:44Z
KAFKA-5142: Add message headers to Connect runtime (KIP-145)
This is the third commit for KIP-145 and changes the Connect runtime to
support headers. Each Connect worker now configures a `HeaderConverter` for
each connector task, in the same way it creates key and value `Converter`
instances. This is entirely backward compatible, so that existing worker and
connector configurations will work without changes. By default, the worker will
use the `SimpleHeaderConverter` to serialize header values as strings and to
deserialize them by inferring the schemas.
----
---