After a keyBy in Flink, how is a key mapped to a task (task id)?
Is the keyed state queryable, i.e. can the state written to checkpoints
be read through an API as a key-value table?
------
From: Yun Tang
class A extends KeyedProcessFunction
Flink version: 1.10.1
Parallelism: 3
The job reads from Kafka and writes to HDFS.
The output files are:
part-0-1
part-1-1
part-1-2
...
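The part-file names follow StreamingFileSink's default naming scheme, part-<subtaskIndex>-<counter>, so with parallelism 3 you would see part-0-*, part-1-* and part-2-*. A minimal sketch of such a job, assuming the HDFS writes go through StreamingFileSink; the topic, bootstrap address and output path below are placeholders:

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3);
env.enableCheckpointing(60_000); // pending part files are committed on checkpoints

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder address
props.setProperty("group.id", "demo-group");              // placeholder group id

StreamingFileSink<String> sink = StreamingFileSink
        .forRowFormat(new Path("hdfs:///tmp/output"),     // placeholder path
                      new SimpleStringEncoder<String>("UTF-8"))
        .build();

// the KeyedProcessFunction A from the question would sit between keyBy and the sink
env.addSource(new FlinkKafkaConsumer<>("input-topic", new SimpleStringSchema(), props))
   .keyBy(value -> value)
   .addSink(sink);

env.execute("kafka-to-hdfs");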
I had a look; you are probably using the flink-connector-kafka_2.12 dependency.
That one can indeed show the behaviour you describe, with everything going to one partition.
I suggest using flink-connector-kafka-{kafka version} instead,
e.g. flink-connector-kafka-0.8.
-- Original message --
From:
A question about Flink 1.11.
----
To: "user-zh"
Currently the Kafka producer's partitioner is
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner.
The concrete partitioning logic is in org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner#partition.
From that method you can work out which Kafka partition each subtask writes to: each subtask's data is written to exactly one fixed partition.
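If you want the data spread across all Kafka partitions instead, one option is to pass Optional.empty() as the custom partitioner, which disables the FlinkFixedPartitioner and falls back to Kafka's own partitioning. A sketch with placeholder topic and address:

import java.util.Optional;
import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder address

// Optional.empty() = no Flink partitioner; records are distributed by Kafka itself
FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(
        "output-topic",                                   // placeholder topic
        new SimpleStringSchema(),
        props,
        Optional.empty());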
join
----
??:"godfrey he"
Why run Flink on YARN rather than standalone?
A standalone cluster is dedicated to Flink,
while on YARN the same cluster resources can also run Spark or MapReduce jobs.
----
??:"LakeShen"
1. Call disableChaining() on the operator so that it is not chained to other operators (see the sketch below).
2. tm
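A minimal sketch of option 1; MyMapFunction is a placeholder for your own operator:

DataStream<String> mapped = stream
        .map(new MyMapFunction())
        .disableChaining(); // this map is chained neither to its upstream nor its downstream

To disable chaining for the whole job there is also StreamExecutionEnvironment#disableOperatorChaining().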
----
??:""
To make tasks fail when a checkpoint fails, set:
env.getCheckpointConfig().setFailOnCheckpointingErrors(true);
/**
 * Sets the expected behaviour for tasks in case that they encounter an error
 * in their checkpointing procedure. If this is set to true, the task will fail
 * on checkpointing error; if this is set to false, the task will only decline
 * the checkpoint and continue running.
 */
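Note that setFailOnCheckpointingErrors has been deprecated since Flink 1.9; the equivalent on newer versions is the tolerable-failure counter:

// 0 tolerated failures: the job fails on the first checkpoint error
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(0);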
With the Streaming File Sink and a Parquet (Avro) bulk writer, the part files never reach the final (finished) state. The setup is:
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.enableCheckpointing(60 * 1000, CheckpointingMode.AT_LEAST_ONCE);
env.setStateBackend(new FsStateBackend(new Path("hdfs://..."))); // path elided in the original message
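For the bulk writer itself, a sketch where MyRecord and the output path are placeholders. Bulk formats roll on every checkpoint, so part files only become finished when a checkpoint completes; without completed checkpoints they stay in-progress:

import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

StreamingFileSink<MyRecord> sink = StreamingFileSink
        .forBulkFormat(new Path("hdfs:///tmp/parquet-out"), // placeholder path
                       ParquetAvroWriters.forReflectRecord(MyRecord.class))
        .build();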
Our TaskManagers are doing frequent full GCs (Flink 1.8).
The heap is 10.25GB; by default the young generation would be 1/3 of the heap, about 3.41GB,
but the young generation of our heap is 2442764288 bytes.
Which JVM options control this?
----
??:"Xintong Song"
Did you set -XX:NewSize for the TM?
With a 15G TM it is -XX:NewSize=2442764288;
with a 20G TM it is also -XX:NewSize=2442764288.
Why is that?
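One standard way to pass such a flag to every TaskManager JVM is flink-conf.yaml; the value below just echoes the size from this thread:

env.java.opts.taskmanager: -XX:NewSize=2442764288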
----
??:"Xintong Song"
With a 15G TaskManager the heap comes out to about 10G.
The TM memory is made up of three parts:
1. the cutoff (15GB * 0.25);
2. the heap (derived from 15GB - cutoff);
3. the offheap part (offheap = 15GB - heap).
The offheap size is passed to the JVM via -XX:MaxDirectMemorySize.
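As a worked example, assuming the Flink 1.8 defaults (containerized.heap-cutoff-ratio = 0.25, network memory 0.1 of the remainder clamped to taskmanager.network.memory.max = 1GB and allocated as direct memory), the numbers reported earlier in the thread line up:

cutoff  = 15GB * 0.25              = 3.75GB
network = min(0.1 * 11.25GB, 1GB)  = 1GB
heap    = 15GB - 3.75GB - 1GB      = 10.25GB  (the observed heap size)
offheap = 15GB - 10.25GB           = 4.75GB   (covered by -XX:MaxDirectMemorySize)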
flink run -m yarn-cluster -yn 10 -ys 1 -p 10
flink run -m yarn-cluster -yn 20 -ys 2 -p 40
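With these legacy YARN options the total slot count is -yn times -ys, and -p must not exceed it; both commands satisfy that:

-yn 10 * -ys 1 = 10 slots >= -p 10
-yn 20 * -ys 2 = 40 slots >= -p 40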
A question about countWindow / global window:
the job reads from Kafka, but the window never fires.
----
??:"Jimmy Wong"
Regarding countWindow:
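countWindow is built on GlobalWindows plus a purging CountTrigger, so a window only fires once a key has accumulated the configured number of elements; a key that never reaches the count never triggers. A sketch where the key selector is a placeholder:

import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;

// fires only after 100 elements have arrived for a key
stream.keyBy(event -> event.getKey())
      .countWindow(100)
      .reduce((a, b) -> b);

// equivalent explicit form, showing why it never fires with too few events
stream.keyBy(event -> event.getKey())
      .window(GlobalWindows.create())
      .trigger(PurgingTrigger.of(CountTrigger.of(100)))
      .reduce((a, b) -> b);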
Hi all, when Flink consumes a Kafka topic we configure a group id.
Does Flink actually use Kafka's consumer-group mechanism,
i.e. what role does the group id play in a Flink job?