r-reis opened a new issue, #15262:
URL: https://github.com/apache/pinot/issues/15262
I'm trying to use Pinot to stream Kafka data, but the Confluent Schema Registry integration only works for Avro and Protobuf. JSON messages work as long as you do not attach a JSON schema to them.
How to reproduce the error:
docker-compose:
```
services:
  broker:
    image: confluentinc/cp-kafka:7.9.0
    hostname: broker
    container_name: broker
    ports:
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT"
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_PROCESS_ROLES: "broker,controller"
      KAFKA_CONTROLLER_QUORUM_VOTERS: "1@broker:29093"
      KAFKA_LISTENERS: "PLAINTEXT://broker:29092,CONTROLLER://broker:29093,PLAINTEXT_HOST://0.0.0.0:9092"
      KAFKA_INTER_BROKER_LISTENER_NAME: "PLAINTEXT"
      KAFKA_CONTROLLER_LISTENER_NAMES: "CONTROLLER"
      KAFKA_LOG_DIRS: "/tmp/kraft-combined-logs"
      # Replace CLUSTER_ID with a unique base64 UUID using "bin/kafka-storage.sh random-uuid"
      # See https://docs.confluent.io/kafka/operations-tools/kafka-tools.html#kafka-storage-sh
      CLUSTER_ID: "MkU3OEVBNTcwNTJENDM2Qk"
  schema-registry:
    image: confluentinc/cp-schema-registry:7.9.0
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - broker
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: "broker:29092"
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
  connect:
    image: cnfldemos/cp-server-connect-datagen:0.6.4-7.6.0
    hostname: connect
    container_name: connect
    depends_on:
      - broker
      - schema-registry
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: "broker:29092"
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      # CLASSPATH required due to CC-2422
      CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-7.9.0.jar
      CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
      CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
  control-center:
    image: confluentinc/cp-enterprise-control-center:7.9.0
    hostname: control-center
    container_name: control-center
    depends_on:
      - broker
      - schema-registry
      - connect
      - ksqldb-server
    ports:
      - "9021:9021"
    environment:
      CONTROL_CENTER_BOOTSTRAP_SERVERS: "broker:29092"
      CONTROL_CENTER_CONNECT_CONNECT-DEFAULT_CLUSTER: "connect:8083"
      CONTROL_CENTER_CONNECT_HEALTHCHECK_ENDPOINT: "/connectors"
      CONTROL_CENTER_KSQL_KSQLDB1_URL: "http://ksqldb-server:8088"
      CONTROL_CENTER_KSQL_KSQLDB1_ADVERTISED_URL: "http://localhost:8088"
      CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      CONTROL_CENTER_REPLICATION_FACTOR: 1
      CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
      CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
      CONFLUENT_METRICS_TOPIC_REPLICATION: 1
      PORT: 9021
  ksqldb-server:
    image: confluentinc/cp-ksqldb-server:7.9.0
    hostname: ksqldb-server
    container_name: ksqldb-server
    depends_on:
      - broker
      - connect
    ports:
      - "8088:8088"
    environment:
      KSQL_CONFIG_DIR: "/etc/ksql"
      KSQL_BOOTSTRAP_SERVERS: "broker:29092"
      KSQL_HOST_NAME: ksqldb-server
      KSQL_LISTENERS: "http://0.0.0.0:8088"
      KSQL_CACHE_MAX_BYTES_BUFFERING: 0
      KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      KSQL_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      KSQL_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
      KSQL_KSQL_CONNECT_URL: "http://connect:8083"
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_REPLICATION_FACTOR: 1
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
  ksqldb-cli:
    image: confluentinc/cp-ksqldb-cli:7.9.0
    container_name: ksqldb-cli
    depends_on:
      - broker
      - connect
      - ksqldb-server
    entrypoint: /bin/sh
    tty: true
  ksql-datagen:
    image: confluentinc/ksqldb-examples:7.9.0
    hostname: ksql-datagen
    container_name: ksql-datagen
    depends_on:
      - ksqldb-server
      - broker
      - schema-registry
      - connect
    command: "bash -c 'echo Waiting for Kafka to be ready... && \
                       cub kafka-ready -b broker:29092 1 40 && \
                       echo Waiting for Confluent Schema Registry to be ready... && \
                       cub sr-ready schema-registry 8081 40 && \
                       echo Waiting a few seconds for topic creation to finish... && \
                       sleep 11 && \
                       tail -f /dev/null'"
    environment:
      KSQL_CONFIG_DIR: "/etc/ksql"
      STREAMS_BOOTSTRAP_SERVERS: broker:29092
      STREAMS_SCHEMA_REGISTRY_HOST: schema-registry
      STREAMS_SCHEMA_REGISTRY_PORT: 8081
  rest-proxy:
    image: confluentinc/cp-kafka-rest:7.9.0
    depends_on:
      - broker
      - schema-registry
    ports:
      - 8082:8082
    hostname: rest-proxy
    container_name: rest-proxy
    environment:
      KAFKA_REST_HOST_NAME: rest-proxy
      KAFKA_REST_BOOTSTRAP_SERVERS: "broker:29092"
      KAFKA_REST_LISTENERS: "http://0.0.0.0:8082"
      KAFKA_REST_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
  zookeeper:
    image: bitnami/zookeeper:latest
    ports:
      - 2181:2181
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  pinot-controller:
    image: apachepinot/pinot:latest
    command: "StartController -zkAddress zookeeper:2181"
    container_name: "pinot-controller"
    restart: unless-stopped
    ports:
      - "9000:9000"
  pinot-broker:
    image: apachepinot/pinot:latest
    command: "StartBroker -zkAddress zookeeper:2181"
    restart: unless-stopped
    container_name: "pinot-broker"
    ports:
      - "8099:8099"
    depends_on:
      - pinot-controller
  pinot-server:
    image: apachepinot/pinot:latest
    command: "StartServer -zkAddress zookeeper:2181"
    restart: unless-stopped
    container_name: "pinot-server"
    depends_on:
      - pinot-broker
networks:
  default:
    driver: bridge
```
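Bring everything up with Docker Compose (the Pinot UI ends up on localhost:9000, Control Center on localhost:9021):
```
docker compose up -d
```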
After the services start, open Control Center at localhost:9021.
Create a topic called "topic_1" and define a JSON schema for its value, e.g.:
```
{
  "properties": {
    "created_at": {
      "connect.index": 4,
      "connect.type": "int64",
      "connect.version": 1,
      "default": 0,
      "title": "io.debezium.time.Timestamp",
      "type": "integer"
    },
    "id": {
      "connect.index": 0,
      "connect.type": "int32",
      "default": 0,
      "type": "integer"
    }
  },
  "title": "topic_1.Value",
  "type": "object"
}
```
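If you prefer the command line to Control Center, the same schema can be registered directly against the Schema Registry REST API. The subject name topic_1-value follows the default TopicNameStrategy; the escaped schema here is a trimmed version of the one above, for brevity:
```
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data '{"schemaType": "JSON", "schema": "{\"type\": \"object\", \"title\": \"topic_1.Value\", \"properties\": {\"id\": {\"type\": \"integer\"}, \"created_at\": {\"type\": \"integer\"}}}"}' \
  http://localhost:8081/subjects/topic_1-value/versions
```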
Post a message using the schema (this can be done before or after setting up Pinot's table):
```
{
  "id": 1,
  "created_at": 1734980659034
}
```
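For completeness, one way to produce such a message through the JSON Schema serializer (so the record actually carries the schema-registry framing) is the kafka-json-schema-console-producer bundled in the cp-schema-registry image; exact flags may vary by version:
```
docker exec -it schema-registry kafka-json-schema-console-producer \
  --bootstrap-server broker:29092 \
  --topic topic_1 \
  --property schema.registry.url=http://schema-registry:8081 \
  --property value.schema='{"type": "object", "properties": {"id": {"type": "integer"}, "created_at": {"type": "integer"}}}'
```
Then type the message above on stdin.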
After that, create the schema and table in Pinot:
```
{
  "schemaName": "topic_1",
  "dimensionFieldSpecs": [
    {
      "name": "id",
      "dataType": "INT"
    }
  ],
  "dateTimeFieldSpecs": [
    {
      "name": "created_at",
      "dataType": "LONG",
      "format": "1:MILLISECONDS:EPOCH",
      "granularity": "1:MILLISECONDS"
    }
  ]
}
```
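The Pinot schema can be uploaded via the controller REST API instead of the UI (assuming the JSON above is saved as topic_1_schema.json):
```
curl -X POST -H "Content-Type: application/json" \
  -d @topic_1_schema.json http://localhost:9000/schemas
```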
table config:
```
{
  "tableName": "topic_1",
  "tableType": "REALTIME",
  "segmentsConfig": {
    "timeColumnName": "created_at",
    "timeType": "MILLISECONDS",
    "replicasPerPartition": "1"
  },
  "tenants": {},
  "tableIndexConfig": {
    "loadMode": "MMAP",
    "streamConfigs": {
      "stream.kafka.metadata.populate": "true",
      "streamType": "kafka",
      "stream.kafka.decoder.prop.format": "JSON",
      "stream.kafka.consumer.type": "lowlevel",
      "stream.kafka.topic.name": "topic_1",
      "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder",
      "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka30.KafkaConsumerFactory",
      "stream.kafka.schema.registry.url": "http://schema-registry:8081",
      "stream.kafka.decoder.prop.schema.registry.rest.url": "http://schema-registry:8081",
      "stream.kafka.broker.list": "broker:29092",
      "realtime.segment.flush.threshold.rows": "0",
      "realtime.segment.flush.threshold.time": "24h",
      "realtime.segment.flush.threshold.segment.size": "50M",
      "stream.kafka.consumer.prop.auto.offset.reset": "smallest",
      "key.serializer": "shaded.org.apache.kafka.connect.storage.StringDeserializer",
      "value.serializer": "shaded.org.apache.kafka.connect.storage.StringDeserializer"
    }
  },
  "ingestionConfig": {
    "continueOnError": true
  },
  "metadata": {
    "customConfigs": {}
  }
}
```
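The table config is created the same way, via the controller's /tables endpoint (assuming the config above is saved as topic_1_table.json):
```
curl -X POST -H "Content-Type: application/json" \
  -d @topic_1_table.json http://localhost:9000/tables
```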
My guess is that, since we already have Avro and Protobuf decoders for Confluent, we also need a JSON one:
https://github.com/confluentinc/schema-registry/blob/master/json-schema-serializer/src/main/java/io/confluent/kafka/serializers/json/KafkaJsonSchemaDeserializer.java
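For context on why JSONMessageDecoder chokes here: every record produced with a Confluent schema-registry serializer is framed in the Confluent wire format, i.e. a magic byte (0x0) and a 4-byte schema ID precede the actual payload. With JSON Schema the bytes after that 5-byte header are plain JSON, which is why messages posted without a schema parse fine while schema-tagged ones fail. A minimal sketch of the framing (the class name is hypothetical, for illustration only):
```
import java.nio.charset.StandardCharsets;

// Illustration of the Confluent wire format (not part of Pinot). Records produced
// with a schema-registry serializer start with a magic byte (0x0) followed by a
// 4-byte schema ID; the real payload only begins at byte 5. Pinot's
// JSONMessageDecoder hands the whole buffer to the JSON parser, which fails on
// the binary header.
public final class ConfluentWireFormat {
  private static final byte MAGIC_BYTE = 0x0;
  private static final int HEADER_SIZE = 5; // 1 magic byte + 4-byte schema ID

  /** Returns the JSON payload, skipping the schema-registry header when present. */
  public static String extractJsonPayload(byte[] record) {
    if (record.length > HEADER_SIZE && record[0] == MAGIC_BYTE) {
      // Bytes 1-4 hold the ID of the schema registered in the Schema Registry.
      return new String(record, HEADER_SIZE, record.length - HEADER_SIZE,
          StandardCharsets.UTF_8);
    }
    // Plain JSON message with no schema attached -- already works today.
    return new String(record, StandardCharsets.UTF_8);
  }
}
```
A proper fix would presumably be a decoder analogous to the existing KafkaConfluentSchemaRegistryAvroMessageDecoder, delegating to the KafkaJsonSchemaDeserializer linked above rather than stripping bytes by hand.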