Hi Tovi,

your code looks OK to me. Maybe Gordon (in CC) has an idea what is going wrong.

Just a side note: you don't need to set the parallelism to 2 to read from two partitions. A single consumer instance can read from multiple partitions.
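To illustrate the side note, here is a minimal plain-Java sketch of how partitions might be spread over parallel source subtasks. It is an assumption for illustration only and not Flink's actual assignment code: the class name `PartitionAssignmentSketch`, the method `assign`, and the round-robin rule are all hypothetical. It only shows that with parallelism 1 a single instance ends up owning both partitions.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only: this is NOT Flink's internal assignment logic.
public class PartitionAssignmentSketch {

    // Distributes partition indices 0..numPartitions-1 over the given
    // number of parallel subtasks in round-robin order (an assumed rule).
    public static List<List<Integer>> assign(int numPartitions, int parallelism) {
        List<List<Integer>> perSubtask = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            perSubtask.add(new ArrayList<>());
        }
        for (int p = 0; p < numPartitions; p++) {
            perSubtask.get(p % parallelism).add(p);
        }
        return perSubtask;
    }

    public static void main(String[] args) {
        // Parallelism 1: the single subtask owns both partitions.
        System.out.println(assign(2, 1)); // prints [[0, 1]]
        // Parallelism 2: one partition per subtask.
        System.out.println(assign(2, 2)); // prints [[0], [1]]
    }
}
```

Under this assumed rule, both settings cover every partition; parallelism 2 only changes which subtask reads which one.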
Best, Fabian

2017-09-19 17:02 GMT+02:00 Sofer, Tovi <tovi.so...@citi.com>:

> Hi,
>
> I am trying to setup FlinkKafkaConsumer which reads from two partitions in
> local mode, using setParallelism=2.
>
> The producer writes to two partition (as it is shown in metrics report).
>
> But the consumer seems to read always from one partition only.
>
> Am I missing something in partition configuration?
>
> Code:
>
> Producer setup:
>
> Configuration localConfig = new Configuration();
> StreamExecutionEnvironment env =
>     StreamExecutionEnvironment.createLocalEnvironment(parallelism, localConfig);
>
> env.setParallelism(2);
>
> String kafkaPort =
>     parameters.get(SimulatorConfig.ParamsEnum.KAFKA_PORT.fullName());
>
> SingleOutputStreamOperator<String> fixMsgSource =
>     env.addSource(srcMsgProvider.getFixMsgSource(), TypeInformation.of(String.class))
>         .name(sourceGenerationType.getValue());
> fixMsgSource.addSink(new FlinkKafkaProducer010<>("localhost:" + kafkaPort, TOPIC_NAME, new SimpleStringSchema()))
>     .name("fix_topic");
>
> env.execute("MsgSimulatorJob");
>
> Consumer setup:
>
> String topicName = "fix";
> Configuration conf = new Configuration();
> StreamExecutionEnvironment env =
>     StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
>
> env.setParallelism(2);
> env.getConfig().setGlobalJobParameters(configParams); // make parameters available in the web interface
> DeserializationSchema<Tuple2<Long, String>> deserializationSchema =
>     new SimpleStringAndTimestampDeserializationSchema();
> FlinkKafkaConsumer010<Tuple2<Long, String>> kafkaConsumer =
>     new FlinkKafkaConsumer010<>(topicName, deserializationSchema, kafkaParams.getProperties());
>
> DataStream<Tuple2<Long, String>> fixMessagesStream =
>     env.addSource(kafkaConsumer).name("fix_topic").setParallelism(2);
>
> As you can see in output, only 1 consumer partition seems to be used:
>
> Producer output:
>
> 2017-09-19 14:40:45,818 INFO - [Flink-MetricRegistry-1] 127.0.0.1.taskmanager.ca94a7e34623274bd560c7115f842acf.MsgSimulatorJob.Source: random -> Sink: fix_topic.1.numRecordsInPerSecond: 0.0
> 2017-09-19 14:40:45,818 INFO - [Flink-MetricRegistry-1] 127.0.0.1.taskmanager.ca94a7e34623274bd560c7115f842acf.MsgSimulatorJob.Sink: fix_topic.1.numRecordsInPerSecond: 19836.033333333333
> 2017-09-19 14:40:45,818 INFO - [Flink-MetricRegistry-1] 127.0.0.1.taskmanager.ca94a7e34623274bd560c7115f842acf.MsgSimulatorJob.Sink: fix_topic.0.numRecordsInPerSecond: 20337.933333333334
> 2017-09-19 14:40:45,819 INFO - [Flink-MetricRegistry-1] 127.0.0.1.taskmanager.ca94a7e34623274bd560c7115f842acf.MsgSimulatorJob.Source: random -> Sink: fix_topic.0.numRecordsInPerSecond: 0.0
>
> Consumer output:
>
> 2017-09-19 14:40:45,116 INFO - [Flink-MetricRegistry-1] 127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming Job.Source: fix_topic.1.KafkaConsumer.select-rate: 1928.1421153709368
> 2017-09-19 14:40:45,117 INFO - [Flink-MetricRegistry-1] 127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming Job.Source: fix_topic.1.KafkaConsumer.commit-rate: 0.21623491761449637
> 2017-09-19 14:40:45,117 INFO - [Flink-MetricRegistry-1] 127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming Job.Source: fix_topic.1.KafkaConsumer.outgoing-byte-rate: 982.0051413881748
> 2017-09-19 14:40:45,117 INFO - [Flink-MetricRegistry-1] 127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming Job.Source: fix_topic.1.KafkaConsumer.sync-rate: 0.0
> 2017-09-19 14:40:45,117 INFO - [Flink-MetricRegistry-1] 127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming Job.Source: fix_topic.1.KafkaConsumer.io-ratio: 0.01148712465103046
> 2017-09-19 14:40:45,117 INFO - [Flink-MetricRegistry-1] 127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming Job.Source: fix_topic.1.numRecordsOutPerSecond: 6625.266666666666
> 2017-09-19 14:40:45,117 INFO - [Flink-MetricRegistry-1] 127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming Job.Source: fix_topic.1.numRecordsInPerSecond: 0.0
> 2017-09-19 14:40:45,117 INFO - [Flink-MetricRegistry-1] 127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming Job.Source: fix_topic.1.numBytesOutPerSecond: 1.40222884E7
> 2017-09-19 14:40:45,117 INFO - [Flink-MetricRegistry-1] 127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming Job.Source: fix_topic.0.numRecordsInPerSecond: 0.0
> 2017-09-19 14:40:45,117 INFO - [Flink-MetricRegistry-1] 127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming Job.Source: fix_topic.0.numBytesInRemotePerSecond: 0.0
> 2017-09-19 14:40:45,117 INFO - [Flink-MetricRegistry-1] 127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming Job.Source: fix_topic.0.numRecordsInPerSecond: 0.0
> 2017-09-19 14:40:45,117 INFO - [Flink-MetricRegistry-1] 127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming Job.Source: fix_topic.0.numBytesOutPerSecond: 10.5
> 2017-09-19 14:40:45,117 INFO - [Flink-MetricRegistry-1] 127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming Job.Source: fix_topic.0.numBytesInLocalPerSecond: 0.0
> 2017-09-19 14:40:45,117 INFO - [Flink-MetricRegistry-1] 127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming Job.Source: fix_topic.0.numRecordsOutPerSecond: 0.0
> 2017-09-19 14:40:45,117 INFO - [Flink-MetricRegistry-1] 127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming Job.Source: fix_topic.1.numBytesInRemotePerSecond: 0.0
> 2017-09-19 14:40:45,117 INFO - [Flink-MetricRegistry-1] 127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming Job.Source: fix_topic.0.numRecordsOutPerSecond: 0.0
>
> Thanks and regards,
> Tovi