Hi!

Thanks for your answer, great insights.
It would be great if you could open a bug for that. I'll have to go with the standalone version for now either way. I will try to implement a Kafka converter as you suggested. Let's see if I can get it running!

Best Regards
Svonn


On 21.11.2017 16:37, Alexey Kukushkin wrote:
Hi,

        * The IgniteSinkTask source code [3] shows that the Ignite Kafka
Connector has this "feature": stopping any task stops all other tasks
running in the same worker. If you look at the stop() method, you will
see that the Ignite instance is static and is closed there. This looks
like a usability issue to me. Feel free to open a bug, or I can open it
for you.
I was able to use multiple sink connectors by running them with Apache
Kafka's connect-standalone.sh script, which lets you start multiple
connectors. But if I stop one of them, the other ones do become
unusable and report the "datastream is already closed" error.
I have never used Confluent Control Center, so I am not sure why
starting a second connector stops the first one. Anyway, as a
workaround, you can use the connect-standalone.sh or
connect-distributed.sh scripts to start the connectors.

        * The Ignite Kafka Connector stores a Kafka SinkRecord's key/value
as an Ignite key/value Cache.Entry. Unlike Kafka, which allows the key
to be null, Ignite does not support null keys. You will get an error if
your Kafka SinkRecord's key is null. You can work around null keys by
providing an "extractor" that builds the Cache.Entry from the value.
For example, this extractor uses the value's hash code as the key and
converts the value to the byte array of its string representation
(just an example, nothing practical):

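import java.nio.charset.StandardCharsets;
import java.util.AbstractMap;
import java.util.Map;
import org.apache.ignite.stream.StreamSingleTupleExtractor;
import org.apache.kafka.connect.sink.SinkRecord;

public class HashCodeKeyValueBuilder
    implements StreamSingleTupleExtractor<SinkRecord, Integer, byte[]> {
    // Key: hash code of the value; value: UTF-8 bytes of its string form.
    @Override public Map.Entry<Integer, byte[]> extract(SinkRecord msg) {
        return new AbstractMap.SimpleEntry<>(msg.value().hashCode(),
            msg.value().toString().getBytes(StandardCharsets.UTF_8));
    }
}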

Then you configure the extractor like this:

singleTupleExtractorCls=my.domain.HashCodeKeyValueBuilder

StreamSingleTupleExtractor is defined inside ignite-core.jar so you
do not need additional dependencies.
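
To illustrate why hash-code keys are "nothing practical", here is a
minimal stand-alone sketch that applies the same mapping to plain
values, without any Ignite or Kafka classes. The class name
HashCodeKeyDemo, the toEntry helper and the HashMap standing in for an
Ignite cache are assumptions made only for this illustration, and UTF-8
is chosen explicitly here.

```java
import java.nio.charset.StandardCharsets;
import java.util.AbstractMap;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical demo class: shows the key/value mapping on plain values. */
final class HashCodeKeyDemo {
    /** Same mapping as extract() above, applied to a value instead of a SinkRecord. */
    static Map.Entry<Integer, byte[]> toEntry(Object value) {
        return new AbstractMap.SimpleEntry<>(
            value.hashCode(),
            value.toString().getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        // A HashMap stands in for the Ignite cache (assumption for this sketch).
        Map<Integer, byte[]> cache = new HashMap<>();
        for (String value : new String[] {"Aa", "BB", "Aa"}) {
            Map.Entry<Integer, byte[]> entry = toEntry(value);
            cache.put(entry.getKey(), entry.getValue());
        }
        // "Aa" and "BB" share the hash code 2112, so all three records
        // end up under one key and only the last write survives.
        System.out.println(cache.size()); // prints 1
    }
}
```

Equal values always overwrite each other, and distinct values can
collide as well, so a real extractor would more likely derive the key
from a field inside the value.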

Ignite stores custom objects in a cross-platform binary format [1];
simple types and arrays of simple types are stored as is.

As I understand it, in your case you have a non-null String key and a
byte[] value. Ignite will store these as is, so you can create your
scan query with something like

Query<Cache.Entry<String, byte[]>> query = new ScanQuery<>(...);

and then deserialise each byte[] value using your custom
deserialiser. The major disadvantage is that you cannot work
efficiently with the objects in Ignite, since Ignite does not
know about your custom format.
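
As a rough sketch of that client-side step, the code below decodes a
byte[] value back into an object after it has been read from the cache.
Everything in it is an assumption made for illustration: the
CustomObject fields, the CustomObjectCodec name, and the wire format
(a 4-byte id followed by a length-prefixed name). Your real format is
whatever your Kafka producer writes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical codec for an assumed wire format: int id, then a writeUTF name. */
final class CustomObjectCodec {
    /** Hypothetical payload type standing in for your own class. */
    static final class CustomObject {
        final int id;
        final String name;

        CustomObject(int id, String name) {
            this.id = id;
            this.name = name;
        }
    }

    /** Encodes the object the way the (assumed) producer would. */
    static byte[] serialize(CustomObject obj) {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buf)) {
            out.writeInt(obj.id);
            out.writeUTF(obj.name);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buf.toByteArray();
    }

    /** Decodes a byte[] value as it would come back from the scan query. */
    static CustomObject deserialize(byte[] bytes) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
            return new CustomObject(in.readInt(), in.readUTF());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

With this approach every entry has to be pulled to the client and
decoded there before it can be filtered or inspected, which is the
inefficiency described above.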

Another option is to write a custom Kafka converter [2] that
deserialises the byte[] array inside Kafka Connect so that the values
are stored as CustomObject in Ignite. In this case the object is stored
in binary format (you will see o.a.i.i.binary.BinaryObjectImpl in the
scan query results). You can work with such objects directly using the
binary API (most efficient, no serialisation involved, no CustomObject
jars required on Ignite server nodes) or work with CustomObject using
the static type API (less efficient, CustomObject has to be deployed).




Links:
------
[1] https://apacheignite.readme.io/docs/binary-marshaller
[2] https://kafka.apache.org/0110/javadoc/org/apache/kafka/connect/storage/Converter.html
[3] https://github.com/gridgain/apache-ignite/blob/cbada5964ee12a8dc44a7db8797f91709b70d831/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSinkTask.java
