Hi Tovi,

Your code looks OK to me. Maybe Gordon (in CC) has an idea what is going
wrong.
Just a side note: you don't need to set the parallelism to 2 to read from
two partitions. A single consumer instance can read from multiple
partitions.
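To illustrate, here is a minimal sketch of a consumer with parallelism 1
that still receives records from both partitions of the topic. The topic
name "fix", the broker address localhost:9092, and the group id are
assumptions for the example, and the SimpleStringSchema import path may
differ depending on your Flink version:

// Minimal sketch: a single-parallelism Kafka source reading a two-partition topic.
// Assumptions: topic "fix", broker at localhost:9092, group id "fix-consumer",
// Flink 1.3.x with the Kafka 0.10 connector (import paths may differ in other versions).
import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class SingleConsumerTwoPartitions {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "fix-consumer");

        // The single source subtask is assigned all partitions of the topic,
        // so it reads from both partitions of "fix".
        DataStream<String> messages = env
                .addSource(new FlinkKafkaConsumer010<>("fix", new SimpleStringSchema(), props))
                .name("fix_topic")
                .setParallelism(1);

        messages.print();
        env.execute("read-two-partitions-with-one-consumer");
    }
}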

Best,
Fabian

2017-09-19 17:02 GMT+02:00 Sofer, Tovi <tovi.so...@citi.com>:

> Hi,
>
>
>
> I am trying to set up a FlinkKafkaConsumer that reads from two partitions
> in local mode, using setParallelism(2).
>
> The producer writes to two partitions (as shown in the metrics report).
>
> But the consumer always seems to read from only one partition.
>
> Am I missing something in the partition configuration?
>
>
>
> Code:
>
>
>
> *Producer setup:*
> Configuration localConfig = new Configuration();
> StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(parallelism, localConfig);
> env.setParallelism(2);
> String kafkaPort = parameters.get(SimulatorConfig.ParamsEnum.KAFKA_PORT.fullName());
> SingleOutputStreamOperator<String> fixMsgSource = env.addSource(srcMsgProvider.getFixMsgSource(), TypeInformation.of(String.class)).name(sourceGenerationType.getValue());
> fixMsgSource.addSink(new FlinkKafkaProducer010<>("localhost:" + kafkaPort, TOPIC_NAME, new SimpleStringSchema()))
>         .name("fix_topic");
> env.execute("MsgSimulatorJob");
>
>
>
>
>
> *Consumer setup:*
>
> String topicName = "fix";
> Configuration conf = new Configuration();
> StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
> env.setParallelism(2);
> env.getConfig().setGlobalJobParameters(configParams); // make parameters available in the web interface
> DeserializationSchema<Tuple2<Long, String>> deserializationSchema = new SimpleStringAndTimestampDeserializationSchema();
> FlinkKafkaConsumer010<Tuple2<Long, String>> kafkaConsumer = new FlinkKafkaConsumer010<>(topicName, deserializationSchema, kafkaParams.getProperties());
> DataStream<Tuple2<Long, String>> fixMessagesStream = env.addSource(kafkaConsumer).name("fix_topic").setParallelism(2);
>
>
>
> As you can see in the output, only one consumer partition seems to be used:
>
> Producer output:
>
> 2017-09-19 14:40:45,818 INFO  - [Flink-MetricRegistry-1]
> 127.0.0.1.taskmanager.ca94a7e34623274bd560c7115f842acf.MsgSimulatorJob.Source:
> random -> Sink: fix_topic.1.numRecordsInPerSecond: 0.0
>
> 2017-09-19 14:40:45,818 INFO  - [Flink-MetricRegistry-1]
> 127.0.0.1.taskmanager.ca94a7e34623274bd560c7115f842acf.MsgSimulatorJob.Sink:
> fix_topic.1.numRecordsInPerSecond: 19836.033333333333
>
> 2017-09-19 14:40:45,818 INFO  - [Flink-MetricRegistry-1]
> 127.0.0.1.taskmanager.ca94a7e34623274bd560c7115f842acf.MsgSimulatorJob.Sink:
> fix_topic.0.numRecordsInPerSecond: 20337.933333333334
>
> 2017-09-19 14:40:45,819 INFO  - [Flink-MetricRegistry-1]
> 127.0.0.1.taskmanager.ca94a7e34623274bd560c7115f842acf.MsgSimulatorJob.Source:
> random -> Sink: fix_topic.0.numRecordsInPerSecond: 0.0
>
> Consumer output:
>
> 2017-09-19 14:40:45,116 INFO  - [Flink-MetricRegistry-1]
> 127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming
> Job.Source: fix_topic.1.KafkaConsumer.select-rate: 1928.1421153709368
>
> 2017-09-19 14:40:45,117 INFO  - [Flink-MetricRegistry-1]
> 127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming
> Job.Source: fix_topic.1.KafkaConsumer.commit-rate: 0.21623491761449637
>
> 2017-09-19 14:40:45,117 INFO  - [Flink-MetricRegistry-1]
> 127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming
> Job.Source: fix_topic.1.KafkaConsumer.outgoing-byte-rate:
> 982.0051413881748
>
> 2017-09-19 14:40:45,117 INFO  - [Flink-MetricRegistry-1]
> 127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming
> Job.Source: fix_topic.1.KafkaConsumer.sync-rate: 0.0
>
> 2017-09-19 14:40:45,117 INFO  - [Flink-MetricRegistry-1]
> 127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming
> Job.Source: fix_topic.1.KafkaConsumer.io-ratio: 0.01148712465103046
>
> 2017-09-19 14:40:45,117 INFO  - [Flink-MetricRegistry-1]
> 127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming
> Job.Source: fix_topic.1.numRecordsOutPerSecond: 6625.266666666666
>
> 2017-09-19 14:40:45,117 INFO  - [Flink-MetricRegistry-1]
> 127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming
> Job.Source: fix_topic.1.numRecordsInPerSecond: 0.0
>
> 2017-09-19 14:40:45,117 INFO  - [Flink-MetricRegistry-1]
> 127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming
> Job.Source: fix_topic.1.numBytesOutPerSecond: 1.40222884E7
>
> 2017-09-19 14:40:45,117 INFO  - [Flink-MetricRegistry-1]
> 127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming
> Job.Source: fix_topic.0.numRecordsInPerSecond: 0.0
>
> 2017-09-19 14:40:45,117 INFO  - [Flink-MetricRegistry-1]
> 127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming
> Job.Source: fix_topic.0.numBytesInRemotePerSecond: 0.0
>
> 2017-09-19 14:40:45,117 INFO  - [Flink-MetricRegistry-1]
> 127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming
> Job.Source: fix_topic.0.numRecordsInPerSecond: 0.0
>
> 2017-09-19 14:40:45,117 INFO  - [Flink-MetricRegistry-1]
> 127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming
> Job.Source: fix_topic.0.numBytesOutPerSecond: 10.5
>
> 2017-09-19 14:40:45,117 INFO  - [Flink-MetricRegistry-1]
> 127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming
> Job.Source: fix_topic.0.numBytesInLocalPerSecond: 0.0
>
> 2017-09-19 14:40:45,117 INFO  - [Flink-MetricRegistry-1]
> 127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming
> Job.Source: fix_topic.0.numRecordsOutPerSecond: 0.0
>
> 2017-09-19 14:40:45,117 INFO  - [Flink-MetricRegistry-1]
> 127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming
> Job.Source: fix_topic.1.numBytesInRemotePerSecond: 0.0
>
> 2017-09-19 14:40:45,117 INFO  - [Flink-MetricRegistry-1]
> 127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming
> Job.Source: fix_topic.0.numRecordsOutPerSecond: 0.0
>
>
>
>
>
> Thanks and regards,
>
> Tovi
>
