davidzollo commented on issue #8388:
URL: https://github.com/apache/seatunnel/issues/8388#issuecomment-2673812869
Example configuration for a streaming task from Kafka to Doris:
```
env {
  # It is recommended to adjust according to the number of Kafka partitions,
  # keeping it consistent with the partition count
  execution.parallelism = 4
  job.mode = "STREAMING"
  checkpoint.interval = 30000
  checkpoint.timeout = 600000
  # Your current rate limits seem high but reasonable, ~700MB/s
  read_limit.bytes_per_second=700000000
  read_limit.rows_per_second=40000
}
source {
  Kafka {
    result_table_name = "kafka_log"
    # Kafka server address
    bootstrap.servers = "xxxxx"
    topic = "xxxx"
    consumer.group = "kafka2table"
    start_mode = "earliest"
    # Properties listed here are Kafka consumer settings.
    kafka.config = {
      # 1MB, increase the minimum batch fetch size
      "fetch.min.bytes" = "1048576"
      # Wait time when data is insufficient
      "fetch.max.wait.ms" = "500"
      # 5MB, maximum data fetch per partition
      "max.partition.fetch.bytes" = "5242880"
      # Maximum number of records per poll
      "max.poll.records" = "5000"
      # Ensure data consistency
      "isolation.level" = "read_committed"
    }
    format = json
    # Fields expected in each JSON message, with their SeaTunnel types.
    schema={
      fields={
        ev=STRING
        pg=STRING
        uuid=STRING
        userId=bigint
        fromDevice=STRING
        ip=STRING
        source=STRING
        np=STRING
        lp=STRING
        tg=STRING
        ch=STRING
        v=STRING
        nt=STRING
        wifi=STRING
        dbd=STRING
        dmd=STRING
        bs=STRING
        browser_version=STRING
        ext=STRING
        sid=STRING
        timestamp=bigint
        reporttime=bigint
      }
    }
  }
}
transform {
  Sql {
    source_table_name = "kafka_log"
    result_table_name = "log"
    # Renames the short Kafka field names to the target column names and adds
    # a dt column from CURRENT_DATE. The query is kept on a single line because
    # a double-quoted HOCON string cannot contain raw line breaks.
    query = "select ev as event,pg as page,uuid,userId as userid,fromDevice as platform,ip,source,np as nextpage,lp as lastpage,tg as target,ch as channel,v as version,nt as network,wifi,dbd as device_brand,dmd as device_model,bs as browser,browser_version,ext as extra,sid as sessionid,timestamp,reporttime,CURRENT_DATE as dt from kafka_log"
  }
}
sink {
  Doris {
    # Consumes the "log" table produced by the Sql transform.
    source_table_name = "log"

    # Connection and target table
    fenodes = "dxxx"
    username = xxx
    password = "xxxx"
    table.identifier = "ods.ods_log"

    # Write behaviour
    sink.label-prefix = "log"
    sink.enable-2pc = "false"
    doris.batch.size = 500000
    sink.buffer-size = 104857600
    sink.max-retries = 5

    # Load properties: rows are sent as line-delimited JSON.
    doris.config {
      format="json"
      read_json_by_line="true"
    }
  }
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]