Yes, this should be a bug. Also, we should add a new format, `native`, with this schema:
```
Row<topic<string>, partition<int>, headers<map<string,string>>, key<string>, value<string>>
```

jarvis <[email protected]> wrote on Wed, Feb 12, 2025 at 09:43:

> Hi devs,
>
> Recently I have been testing the Kafka Source, and I found that if we do
> not set the `schema` option, a default schema is applied. With this
> default schema, the final data differs from the original.
>
> If the `schema` option is not set, the default schema has the type
> `content<Row<content<String>>>` [1].
>
> Suppose Kafka holds records like this:
>
> ```
> abcde
> xyz
> ```
>
> And we run a job with this config:
>
> ```
> env {
>   parallelism = 1
>   job.mode = "BATCH"
> }
>
> source {
>   Kafka {
>     plugin_output = "fake1"
>     format = text
>     topic = "test_topic"
>     bootstrap.servers = ""
>     start_mode = "earliest"
>     kafka.config = {
>       client.id = client_1
>       max.poll.records = 500
>       auto.offset.reset = "earliest"
>       enable.auto.commit = "false"
>     }
>   }
> }
>
> transform {
> }
>
> sink {
>   Console {
>     plugin_input = "fake1"
>   }
> }
> ```
>
> The result will be:
>
> ```
> sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=1: SeaTunnelRow#tableId=Optional[test_topic] SeaTunnelRow#kind=INSERT : [abcde]
> sink.ConsoleSinkWriter - subtaskIndex=0 rowIndex=1: SeaTunnelRow#tableId=Optional[test_topic] SeaTunnelRow#kind=INSERT : [xyz]
> ```
>
> The output wraps the original value in `[]`.
>
> Can someone explain why it was designed this way?
>
> I raised a PR [2] to change the default schema to `content<String>`. With
> this PR, the result of the job above is the same as the original value.
>
> Please let me know if you have any questions.
>
> [1] https://github.com/apache/seatunnel/blob/dev/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java#L208
> [2] https://github.com/apache/seatunnel/pull/8642/files
>
> --
> Best Regards
> Jarvis
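
To illustrate where the extra `[]` may come from, here is a minimal sketch in plain Java. It does not use any SeaTunnel classes; the `render` helper is hypothetical, and it assumes the sink stringifies a nested row the way Java prints an array. Under that assumption, a value wrapped in a one-field row (`content<Row<content<String>>>`) prints with brackets, while a flat `content<String>` field prints unchanged.

```java
import java.util.Arrays;

public class NestedRowRendering {
    // Hypothetical stand-in for how a sink might stringify one field value.
    // Assumption: a nested row is modeled as Object[] and rendered like an array.
    static String render(Object field) {
        if (field instanceof Object[]) {
            return Arrays.toString((Object[]) field);
        }
        return String.valueOf(field);
    }

    public static void main(String[] args) {
        // Models content<Row<content<String>>>: the value sits inside a one-field row.
        Object nested = new Object[] {"abcde"};
        // Models content<String>: the value is the field itself.
        Object flat = "abcde";

        System.out.println(render(nested)); // prints "[abcde]"
        System.out.println(render(flat));   // prints "abcde"
    }
}
```

If this matches what the Console sink does, the brackets are a rendering effect of the extra row level rather than a change to the Kafka payload, which would be consistent with the fix in [2].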
