Hi, I am a university student in South Korea. I am currently working on a
project to improve the performance of Kafka streaming SQL using the Kafka
adapter in Apache Calcite.
However, even though I followed the example on the official website, no
data is being displayed in sqlline.
Here is the scenario:
1. Configure model files and class files (code attached below).
2. Connect to Kafka with sqlline.
3. Run the query select stream * from kafka.topic1.
4. Publish messages to the Kafka broker.
I expected the data to be displayed, but there was no output at all.
I checked whether there was a problem with message delivery to the broker,
and delivery was working fine.
I lack the knowledge to solve this problem, so I would like to ask for your
help.
-----------------------------------------------------------------------
I am currently publishing messages with int keys and string values, so I
modified KafkaRowConverterImpl as follows:
public class KafkaRowConverterImpl
    implements KafkaRowConverter<Integer, String> {

  /**
   * Generates the row schema for a given Kafka topic.
   *
   * @param topicName Kafka topic name
   * @return row type
   */
  @Override public RelDataType rowDataType(final String topicName) {
    final RelDataTypeFactory typeFactory =
        new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
    final RelDataTypeFactory.Builder fieldInfo = typeFactory.builder();
    fieldInfo.add("MSG_PARTITION",
        typeFactory.createSqlType(SqlTypeName.INTEGER)).nullable(false);
    fieldInfo.add("MSG_TIMESTAMP",
        typeFactory.createSqlType(SqlTypeName.BIGINT)).nullable(false);
    fieldInfo.add("MSG_OFFSET",
        typeFactory.createSqlType(SqlTypeName.BIGINT)).nullable(false);
    fieldInfo.add("MSG_KEY_BYTES",
        typeFactory.createSqlType(SqlTypeName.INTEGER)).nullable(false);
    fieldInfo.add("MSG_VALUE_BYTES",
        typeFactory.createSqlType(SqlTypeName.VARCHAR)).nullable(false);
    return fieldInfo.build();
  }

  /**
   * Parses and reformats a Kafka message from the consumer, to align with
   * the row schema defined as {@link #rowDataType(String)}.
   *
   * @param message Raw Kafka message record
   * @return fields in the row
   */
  @Override public Object[] toRow(
      final ConsumerRecord<Integer, String> message) {
    Object[] fields = new Object[5];
    fields[0] = message.partition();
    fields[1] = message.timestamp();
    fields[2] = message.offset();
    fields[3] = message.key();    // int key
    fields[4] = message.value();  // string value
    return fields;
  }
}
Additionally, I had not set the consumer's group.id, which caused an error,
so I assigned one. Here is the updated consumer configuration:
public class KafkaStreamTable implements ScannableTable, StreamableTable {
  final KafkaTableOptions tableOptions;

  KafkaStreamTable(final KafkaTableOptions tableOptions) {
    this.tableOptions = tableOptions;
  }

  @Override public Enumerable<@Nullable Object[]> scan(final DataContext root) {
    final AtomicBoolean cancelFlag = DataContext.Variable.CANCEL_FLAG.get(root);
    return new AbstractEnumerable<@Nullable Object[]>() {
      @Override public Enumerator<@Nullable Object[]> enumerator() {
        if (tableOptions.getConsumer() != null) {
          return new KafkaMessageEnumerator(tableOptions.getConsumer(),
              tableOptions.getRowConverter(), cancelFlag);
        }
        Properties consumerConfig = new Properties();
        consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
            tableOptions.getBootstrapServers());
        // By default the adapter consumes <byte[], byte[]>; here the keys
        // are ints and the values are strings.
        consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.IntegerDeserializer");
        consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringDeserializer");
        if (tableOptions.getConsumerParams() != null) {
          consumerConfig.putAll(tableOptions.getConsumerParams());
        }
        // Add a group id; the consumer fails without one.
        consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG,
            "my-consumer-group");
        Consumer<Integer, String> consumer = new KafkaConsumer<>(consumerConfig);
        consumer.subscribe(
            Collections.singletonList(tableOptions.getTopicName()));
        return new KafkaMessageEnumerator(consumer,
            tableOptions.getRowConverter(), cancelFlag);
      }
    };
  }
}
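Since the stream returns nothing even though delivery to the broker works, one thing I tried to rule out is a key-serialization mismatch: as far as I understand, IntegerDeserializer expects exactly four big-endian bytes, so a key the producer actually wrote as a string either throws or decodes to a different number. This stdlib-only sketch (the class and method names are made up for illustration, not from the adapter) mirrors that decoding:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class KeyDecodeCheck {
  // Mirrors what Kafka's IntegerDeserializer does: read exactly
  // four big-endian bytes into an int.
  static int decodeIntKey(byte[] data) {
    if (data.length != 4) {
      throw new IllegalArgumentException(
          "Expected 4 bytes for an int key, got " + data.length);
    }
    return ByteBuffer.wrap(data).getInt();
  }

  public static void main(String[] args) {
    // A key published with IntegerSerializer: 1234 -> 4 bytes.
    byte[] intKey = ByteBuffer.allocate(4).putInt(1234).array();
    System.out.println(decodeIntKey(intKey));  // 1234

    // The same key published as the *string* "1234" also happens to be
    // 4 bytes, but it decodes to 0x31323334 = 825373492, not 1234.
    byte[] stringKey = "1234".getBytes(StandardCharsets.UTF_8);
    System.out.println(decodeIntKey(stringKey));  // 825373492
  }
}
```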
The messages I publish look like this:

{
  "libraryEventId": 1234,
  "libraryEventType": "UPDATE",
  "book": {
    "bookId": 123,
    "bookName": "Dilip",
    "bookAuthor": "Kafka Using Spring Boot"
  }
}
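With StringDeserializer, the whole JSON document above lands in the single VARCHAR column of the row. If I later want individual fields as columns, I understand toRow would have to parse the value; here is a stdlib-only sketch of that idea (the helper name is hypothetical, and a real converter would presumably use a JSON library such as Jackson rather than a regex):

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class JsonFieldSketch {
  // Hypothetical helper: pull a flat numeric field out of the JSON value.
  // Regex only handles top-level unquoted numbers; a proper JSON parser
  // is needed for anything nested or quoted.
  static long extractLongField(String json, String field) {
    Pattern p = Pattern.compile(
        "\"" + Pattern.quote(field) + "\"\\s*:\\s*(\\d+)");
    Matcher m = p.matcher(json);
    if (!m.find()) {
      throw new IllegalArgumentException("field not found: " + field);
    }
    return Long.parseLong(m.group(1));
  }

  public static void main(String[] args) {
    String value =
        "{ \"libraryEventId\": 1234, \"libraryEventType\": \"UPDATE\" }";
    System.out.println(extractLongField(value, "libraryEventId"));  // 1234
  }
}
```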
*My Model file*
{
  "version": "1.0",
  "defaultSchema": "KAFKA",
  "schemas": [
    {
      "name": "KAFKA",
      "tables": [
        {
          "name": "TOPIC1",
          "type": "custom",
          "factory": "org.apache.calcite.adapter.kafka.KafkaTableFactory",
          "operand": {
            "bootstrap.servers": "localhost:9092",
            "topic.name": "library-events",
            "row.converter": "org.apache.calcite.adapter.kafka.KafkaRowConverterImpl",
            "consumer.params": {
              "key.deserializer": "org.apache.kafka.common.serialization.IntegerDeserializer",
              "value.deserializer": "org.apache.kafka.common.serialization.StringDeserializer"
            }
          }
        }
      ]
    }
  ]
}
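As an aside, since KafkaStreamTable copies the consumer.params entries into the consumer configuration with putAll, I assume the group id could also be passed through the model file instead of being hardcoded in Java, along these lines (the group name is just an example):

```json
"consumer.params": {
  "key.deserializer": "org.apache.kafka.common.serialization.IntegerDeserializer",
  "value.deserializer": "org.apache.kafka.common.serialization.StringDeserializer",
  "group.id": "my-consumer-group"
}
```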
*Kafka broker Message logs*
[image: image.png]
*sqlline*
[image: image.png]
Even though messages were published with the producer and the Kafka adapter
connected to the broker successfully, there was no output at the prompt, as
shown above.