Hi devs,

I have been testing the Kafka source recently and found that if the `schema`
option is not set, the connector falls back to a default schema of type
`content<Row<content<String>>>` [1]. With this default schema, the data that
reaches the sink differs from the original record.

    For example, suppose the Kafka topic contains these records:
    ```
    abcde
    xyz
    ```

    Then run a job with this config:

    ```
    env {
      parallelism = 1
      job.mode = "BATCH"
    }

    source {
      Kafka {
        plugin_output = "fake1"
        format = text
        topic = "test_topic"
        bootstrap.servers = ""
        start_mode = "earliest"
        kafka.config = {
          client.id = client_1
          max.poll.records = 500
          auto.offset.reset = "earliest"
          enable.auto.commit = "false"
        }
      }
    }

    transform {
    }

    sink {
      Console {
        plugin_input = "fake1"
      }
    }
    ```


    The Console sink prints:

    ```
    sink.ConsoleSinkWriter - subtaskIndex=0  rowIndex=1:  SeaTunnelRow#tableId=Optional[test_topic] SeaTunnelRow#kind=INSERT : [abcde]
    sink.ConsoleSinkWriter - subtaskIndex=0  rowIndex=1:  SeaTunnelRow#tableId=Optional[test_topic] SeaTunnelRow#kind=INSERT : [xyz]
    ```

    So each original value ends up wrapped in `[]`.


Can someone explain why it was designed this way?



I have raised a PR [2] that changes the default schema to `content<String>`.
With this PR, the output of the job above is the same as the original value.
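
To illustrate the difference I mean, here is a minimal Java sketch. It is not
the SeaTunnel code: `DefaultSchemaSketch`, `wrapInRow` and `render` are
hypothetical names, and it assumes that a row is printed as a bracketed list of
its fields, the way `Arrays.toString` prints an array. Under that assumption, a
string wrapped in a one-field row prints with `[]` around it, while the bare
string prints unchanged.

```java
import java.util.Arrays;

class DefaultSchemaSketch {

    // Hypothetical stand-in for a row: an ordered array of field values.
    static Object[] wrapInRow(String value) {
        return new Object[] {value};
    }

    // Assumption: a row is rendered like Arrays.toString renders an array,
    // i.e. as a comma-separated list of fields inside square brackets.
    static String render(Object[] fields) {
        return Arrays.toString(fields);
    }

    public static void main(String[] args) {
        String record = "abcde";

        // Value wrapped in a one-field row: the brackets come from the row.
        System.out.println(render(wrapInRow(record)));

        // Bare string value: printed exactly as it was read.
        System.out.println(record);
    }
}
```

This only models the extra nesting level. It does not show where in the
connector or the Console sink the brackets are actually produced.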


Please let me know if you have any questions.




[1] https://github.com/apache/seatunnel/blob/dev/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java#L208
[2] https://github.com/apache/seatunnel/pull/8642/files




-- 

Best Regards
Jarvis