Hi, I am a university student in South Korea. I am currently working on a project to improve the performance of Kafka streaming SQL using the Kafka adapter in Apache Calcite.
However, even though I followed the example on the official website, no data is displayed in sqlline. Here is the scenario:

1. Configure the model file and class files (code attached below).
2. Connect to Kafka with sqlline.
3. Run the query: select stream * from kafka.topic1
4. Publish messages to the Kafka broker.

I expected the published rows to appear in sqlline, but there was no output at all. I checked whether messages were reaching the broker, and delivery was working fine. I do not know enough to solve this problem myself, so I would like to ask for your help.

-----------------------------------------------------------------------

I am currently publishing messages with int keys and string values. Therefore, I modified KafkaRowConverterImpl as follows:

    public class KafkaRowConverterImpl implements KafkaRowConverter<Integer, String> {
      /**
       * Generates the row schema for a given Kafka topic.
       *
       * @param topicName Kafka topic name
       * @return row type
       */
      @Override public RelDataType rowDataType(final String topicName) {
        final RelDataTypeFactory typeFactory =
            new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
        final RelDataTypeFactory.Builder fieldInfo = typeFactory.builder();
        fieldInfo.add("MSG_PARTITION", typeFactory.createSqlType(SqlTypeName.INTEGER)).nullable(false);
        fieldInfo.add("MSG_TIMESTAMP", typeFactory.createSqlType(SqlTypeName.BIGINT)).nullable(false);
        fieldInfo.add("MSG_OFFSET", typeFactory.createSqlType(SqlTypeName.BIGINT)).nullable(false);
        // Key is an int and value is a string (column names kept from the original).
        fieldInfo.add("MSG_KEY_BYTES", typeFactory.createSqlType(SqlTypeName.INTEGER)).nullable(false);
        fieldInfo.add("MSG_VALUE_BYTES", typeFactory.createSqlType(SqlTypeName.VARCHAR))
            .nullable(false);
        return fieldInfo.build();
      }

      /**
       * Parses and reformats a Kafka message from the consumer, to align with the
       * row schema defined as {@link #rowDataType(String)}.
       *
       * @param message Raw Kafka message record
       * @return fields in the row
       */
      @Override public Object[] toRow(final ConsumerRecord<Integer, String> message) {
        Object[] fields = new Object[5];
        fields[0] = message.partition();
        fields[1] = message.timestamp();
        fields[2] = message.offset();
        fields[3] = message.key();
        fields[4] = message.value();
        return fields;
      }
    }

Additionally, I had not set a group.id for the consumer, which caused an error, so I assigned an arbitrary one. Here is the updated consumer configuration in KafkaStreamTable:

    public class KafkaStreamTable implements ScannableTable, StreamableTable {
      final KafkaTableOptions tableOptions;

      KafkaStreamTable(final KafkaTableOptions tableOptions) {
        this.tableOptions = tableOptions;
      }

      @Override public Enumerable<@Nullable Object[]> scan(final DataContext root) {
        final AtomicBoolean cancelFlag = DataContext.Variable.CANCEL_FLAG.get(root);
        return new AbstractEnumerable<@Nullable Object[]>() {
          @Override public Enumerator<@Nullable Object[]> enumerator() {
            if (tableOptions.getConsumer() != null) {
              return new KafkaMessageEnumerator(tableOptions.getConsumer(),
                  tableOptions.getRowConverter(), cancelFlag);
            }

            Properties consumerConfig = new Properties();
            consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                tableOptions.getBootstrapServers());
            // The adapter default is <byte[], byte[]>; here keys are Integer and
            // values are String, matching the messages I publish.
            consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.IntegerDeserializer");
            consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");

            // consumer.params from the model file override the settings above.
            if (tableOptions.getConsumerParams() != null) {
              consumerConfig.putAll(tableOptions.getConsumerParams());
            }

            // Add group.id (arbitrary value).
            consumerConfig.setProperty("group.id", "my-consumer-group");

            Consumer consumer = new KafkaConsumer<>(consumerConfig);
            consumer.subscribe(Collections.singletonList(tableOptions.getTopicName()));
            return new KafkaMessageEnumerator(consumer, tableOptions.getRowConverter(),
                cancelFlag);
          }
        };
      }
      // (remaining methods of the class are not shown)
    }

The messages I publish are as follows:

    {
      "libraryEventId": 1234,
      "libraryEventType": "UPDATE",
      "book": {
        "bookId": 123,
        "bookName": "Dilip",
        "bookAuthor": "Kafka Using Spring Boot"
      }
    }

*My Model file*

    {
      "version": "1.0",
      "defaultSchema": "KAFKA",
      "schemas": [
        {
          "name": "KAFKA",
          "tables": [
            {
              "name": "TOPIC1",
              "type": "custom",
              "factory": "org.apache.calcite.adapter.kafka.KafkaTableFactory",
              "operand": {
                "bootstrap.servers": "localhost:9092",
                "topic.name": "library-events",
                "row.converter": "org.apache.calcite.adapter.kafka.KafkaRowConverterImpl",
                "consumer.params": {
                  "key.deserializer": "org.apache.kafka.common.serialization.IntegerDeserializer",
                  "value.deserializer": "org.apache.kafka.common.serialization.StringDeserializer"
                }
              }
            }
          ]
        }
      ]
    }

*Kafka broker Message logs*
[image: image.png]

*sqlline*
[image: image.png]

Although the producer published messages and the Kafka adapter connected to the broker successfully, the sqlline prompt showed no output, as in the screenshot above.
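
For reference, the order in which scan() assembles the consumer settings can be sketched with plain java.util.Properties, without any Kafka dependency. This is only an illustration: the class name ConsumerConfigMerge, the method effectiveConfig, and the ByteArrayDeserializer placeholder defaults are assumptions made for the sketch, not part of the Calcite adapter. It shows that values from consumer.params in the model file replace earlier settings, and that group.id is applied last.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;
import java.util.TreeSet;

// Hypothetical helper: mirrors the merge order used in scan() above.
class ConsumerConfigMerge {

  static Properties effectiveConfig(String bootstrapServers,
      Map<String, String> consumerParams, String groupId) {
    Properties config = new Properties();
    config.put("bootstrap.servers", bootstrapServers);

    // Placeholder defaults (an assumption for this sketch).
    config.put("key.deserializer",
        "org.apache.kafka.common.serialization.ByteArrayDeserializer");
    config.put("value.deserializer",
        "org.apache.kafka.common.serialization.ByteArrayDeserializer");

    // Settings from the model file's consumer.params replace the defaults.
    if (consumerParams != null) {
      config.putAll(consumerParams);
    }

    // group.id is set last, so it wins over any value in consumerParams.
    config.setProperty("group.id", groupId);
    return config;
  }

  public static void main(String[] args) {
    Map<String, String> params = new LinkedHashMap<>();
    params.put("key.deserializer",
        "org.apache.kafka.common.serialization.IntegerDeserializer");
    params.put("value.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");

    Properties config =
        effectiveConfig("localhost:9092", params, "my-consumer-group");

    // Print the effective settings in a stable (sorted) order.
    for (String name : new TreeSet<>(config.stringPropertyNames())) {
      System.out.println(name + " = " + config.getProperty(name));
    }
  }
}
```

With the model file shown above, the effective key and value deserializers are therefore the ones listed under consumer.params, regardless of what scan() sets beforehand.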