Hi, I am a university student in South Korea. I am currently working on a
project to improve the performance of Kafka streaming SQL using the Kafka
adapter in Apache Calcite.

However, even though I followed the example on the official website, no
data is being displayed in sqlline.

Here is the scenario:

   1. Configure model files and class files (code attached below).
   2. Connect to Kafka with sqlline.
   3. Run the query select stream * from kafka.topic1.
   4. Publish messages to the Kafka broker.

I expected the data to be displayed, but there was no output at all.

I checked if there was a problem with the message delivery to the broker,
but it was working fine.

I lack the knowledge to solve this problem, so I would like to ask for your
help.

-----------------------------------------------------------------------

I am currently publishing messages with int keys and string values.
Therefore, I modified the KafkaRowConverterImpl as follows:

public class KafkaRowConverterImpl
    implements KafkaRowConverter<Integer, String> {
  /**
   * Generates the row schema for a given Kafka topic.
   *
   * @param topicName Kafka topic name
   * @return row type
   */
  @Override public RelDataType rowDataType(final String topicName) {
    final RelDataTypeFactory typeFactory =
        new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
    final RelDataTypeFactory.Builder fieldInfo = typeFactory.builder();
    fieldInfo.add("MSG_PARTITION",
        typeFactory.createSqlType(SqlTypeName.INTEGER)).nullable(false);
    fieldInfo.add("MSG_TIMESTAMP",
        typeFactory.createSqlType(SqlTypeName.BIGINT)).nullable(false);
    fieldInfo.add("MSG_OFFSET",
        typeFactory.createSqlType(SqlTypeName.BIGINT)).nullable(false);
    fieldInfo.add("MSG_KEY_BYTES",
        typeFactory.createSqlType(SqlTypeName.INTEGER)).nullable(false);
    fieldInfo.add("MSG_VALUE_BYTES",
        typeFactory.createSqlType(SqlTypeName.VARCHAR)).nullable(false);

    return fieldInfo.build();
  }

  /**
   * Parses and reformats a Kafka message from the consumer, to align with the
   * row schema defined as {@link #rowDataType(String)}.
   *
   * @param message Raw Kafka message record
   * @return fields in the row
   */
  @Override public Object[] toRow(
      final ConsumerRecord<Integer, String> message) {
    Object[] fields = new Object[5];
    fields[0] = message.partition();
    fields[1] = message.timestamp();
    fields[2] = message.offset();
    fields[3] = message.key();
    fields[4] = message.value();

    return fields;
  }
}
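Since the converter is typed `<Integer, String>`, the consumer's deserializers must match how the producer serialized the records. As a plain-Java illustration of what those two deserializers do (no Kafka dependency; `DeserializerSketch` is a made-up name): Kafka's IntegerDeserializer reads a 4-byte big-endian int, and StringDeserializer decodes UTF-8 bytes.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class DeserializerSketch {
  // Equivalent of Kafka's IntegerDeserializer: a 4-byte big-endian int.
  static Integer deserializeKey(byte[] data) {
    return ByteBuffer.wrap(data).getInt();
  }

  // Equivalent of Kafka's StringDeserializer: UTF-8 decoding.
  static String deserializeValue(byte[] data) {
    return new String(data, StandardCharsets.UTF_8);
  }

  public static void main(String[] args) {
    byte[] keyBytes = ByteBuffer.allocate(4).putInt(1234).array();
    byte[] valueBytes =
        "{\"libraryEventId\": 1234}".getBytes(StandardCharsets.UTF_8);
    System.out.println(deserializeKey(keyBytes));   // 1234
    System.out.println(deserializeValue(valueBytes));
  }
}
```

If a key is produced with one serializer and consumed with a mismatched deserializer, records either decode to garbage or the consumer throws a SerializationException, so the pairings are worth double-checking.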

Additionally, I had not set the consumer's group.id, which caused an error,
so I assigned one arbitrarily.
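For reference, since the scan code merges consumer.params over the defaults with putAll, the group id could also come from the model file rather than being hardcoded; a sketch (the group name below is just a placeholder):

```json
"consumer.params": {
  "key.deserializer": "org.apache.kafka.common.serialization.IntegerDeserializer",
  "value.deserializer": "org.apache.kafka.common.serialization.StringDeserializer",
  "group.id": "calcite-kafka-demo"
}
```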

Here is the updated configuration of the consumer:

public class KafkaStreamTable implements ScannableTable, StreamableTable {
  final KafkaTableOptions tableOptions;

  KafkaStreamTable(final KafkaTableOptions tableOptions) {
    this.tableOptions = tableOptions;
  }

  @Override public Enumerable<@Nullable Object[]> scan(final DataContext root) {
    final AtomicBoolean cancelFlag = DataContext.Variable.CANCEL_FLAG.get(root);
    return new AbstractEnumerable<@Nullable Object[]>() {
      @Override public Enumerator<@Nullable Object[]> enumerator() {
        if (tableOptions.getConsumer() != null) {
          return new KafkaMessageEnumerator(tableOptions.getConsumer(),
              tableOptions.getRowConverter(), cancelFlag);
        }

        Properties consumerConfig = new Properties();
        consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
            tableOptions.getBootstrapServers());
        // by default it's <byte[], byte[]>; override to match the
        // <Integer, String> record types (int key, string value)
        consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.IntegerDeserializer");
        consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringDeserializer");

        if (tableOptions.getConsumerParams() != null) {
          consumerConfig.putAll(tableOptions.getConsumerParams());
        }

        // Assign a group.id, since the consumer fails without one
        consumerConfig.setProperty("group.id", "my-consumer-group");

        Consumer<Integer, String> consumer =
            new KafkaConsumer<>(consumerConfig);
        consumer.subscribe(
            Collections.singletonList(tableOptions.getTopicName()));

        return new KafkaMessageEnumerator(consumer,
            tableOptions.getRowConverter(), cancelFlag);
      }
    };
  }
}

The messages I publish are as follows:

{
  "libraryEventId": 1234,
  "libraryEventType": "UPDATE",
  "book": {
    "bookId": 123,
    "bookName": "Dilip",
    "bookAuthor": "Kafka Using Spring Boot"
  }
}
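For a record like this with key 1234, the toRow implementation above would emit one five-field row. A minimal plain-Java sketch of that mapping (`RowSketch` is a made-up name; the partition/timestamp/offset values are invented):

```java
import java.util.Arrays;

public class RowSketch {
  // Mirrors KafkaRowConverterImpl.toRow: partition, timestamp, offset,
  // key, value -- in the order the row schema declares them.
  static Object[] toRow(int partition, long timestamp, long offset,
      Integer key, String value) {
    return new Object[] {partition, timestamp, offset, key, value};
  }

  public static void main(String[] args) {
    Object[] row = toRow(0, 1700000000000L, 42L, 1234,
        "{\"libraryEventId\": 1234, \"libraryEventType\": \"UPDATE\"}");
    // MSG_KEY_BYTES carries the Integer key; MSG_VALUE_BYTES the raw
    // JSON string.
    System.out.println(Arrays.toString(row));
  }
}
```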

*My Model file*

{
  "version": "1.0",
  "defaultSchema": "KAFKA",
  "schemas": [
    {
      "name": "KAFKA",
      "tables": [
        {
          "name": "TOPIC1",
          "type": "custom",
          "factory": "org.apache.calcite.adapter.kafka.KafkaTableFactory",
          "operand": {
            "bootstrap.servers": "localhost:9092",
            "topic.name": "library-events",
            "row.converter": "org.apache.calcite.adapter.kafka.KafkaRowConverterImpl",
            "consumer.params": {
              "key.deserializer": "org.apache.kafka.common.serialization.IntegerDeserializer",
              "value.deserializer": "org.apache.kafka.common.serialization.StringDeserializer"
            }
          }
        }
      ]
    }
  ]
}


*Kafka broker Message logs*
[image: image.png]

*sqlline*
[image: image.png]

Even though the producer published messages and the Kafka adapter connected
to the broker successfully, there was no output at the sqlline prompt, as
shown above.
