This is not related to Flink, but in Beam you can read from a directory
containing many files by passing TextIO a file pattern (glob) that matches
all of them, instead of the path of a single file.

This will read all the files in the directory in parallel.
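
To make the idea concrete, here is a minimal plain-JDK sketch, not Beam code and not how Beam or the Flink runner implement it. In the Beam Java SDK the pattern is passed to TextIO's `from(...)`. The sketch only illustrates why a pattern that expands to several files gives a runner several independent units of work, whereas a single file gives it one. The class name `GlobReadSketch`, its methods, and the `part-*.txt` file names are all hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

// Hypothetical illustration only: plain JDK, no Beam or Flink involved.
class GlobReadSketch {

    // Expand a glob such as "part-*.txt" against a directory into the matching files.
    static List<Path> expand(Path dir, String glob) {
        List<Path> matches = new ArrayList<>();
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir, glob)) {
            for (Path p : stream) {
                matches.add(p);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return matches;
    }

    // Treat every matched file as an independent unit of work and count all lines.
    static long countLines(List<Path> files) {
        return files.parallelStream().mapToLong(GlobReadSketch::linesIn).sum();
    }

    private static long linesIn(Path file) {
        try (Stream<String> lines = Files.lines(file)) {
            return lines.count();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Writes four small files into a temp directory, then reads them back via the glob.
    // Returns {number of matched files, total number of lines}.
    static long[] runDemo() {
        try {
            Path dir = Files.createTempDirectory("glob-sketch");
            for (int i = 0; i < 4; i++) {
                Files.write(dir.resolve("part-" + i + ".txt"), Arrays.asList("a", "b", "c"));
            }
            List<Path> files = expand(dir, "part-*.txt");
            return new long[] {files.size(), countLines(files)};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        long[] result = runDemo();
        System.out.println(result[0] + " files, " + result[1] + " lines");
    }
}
```

With one input file the list has a single element, so there is nothing to spread across workers. Splitting the 1.6 GB input into many files would give the job more to distribute.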

For reading from Kafka I wrote this on another thread of yours:
Are you sure that all your Kafka partitions contain data? Did you have a
look at the Kafka metrics to see how the individual partitions are filled?
If only one partition contains data, then you will only read data in one
parallel instance of the sources. How are you writing your data to Kafka?

Flink/Beam should read from all partitions if all of them contain data.
Could you please verify that all Kafka partitions contain data by looking
at the metrics of your Kafka cluster? That would be a first step towards
finding out where the problem lies.
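
One common way to end up with a single filled partition is to write every record with the same key: for keyed records, Kafka's default partitioner picks the partition from a hash of the key modulo the partition count. The sketch below is a simplified illustration of that effect under stated assumptions. It is not Kafka's partitioner (`String.hashCode` stands in for the real hash of the serialized key), and the class `PartitionSkewSketch` and its methods are hypothetical.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical illustration only: NOT Kafka's real partitioner.
class PartitionSkewSketch {

    // Stand-in for hash-based assignment: hash(key) mod numPartitions.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Count how many records land in each partition for the given record keys.
    static int[] fill(List<String> keys, int numPartitions) {
        int[] counts = new int[numPartitions];
        for (String key : keys) {
            counts[partitionFor(key, numPartitions)]++;
        }
        return counts;
    }

    // Number of partitions that received at least one record.
    static long nonEmpty(int[] counts) {
        return Arrays.stream(counts).filter(c -> c > 0).count();
    }

    public static void main(String[] args) {
        int partitions = 64;

        // Every record written with the same key.
        List<String> sameKey = Collections.nCopies(1000, "tuple");

        // A different key per record.
        List<String> distinctKeys = IntStream.range(0, 1000)
                .mapToObj(i -> "tuple-" + i)
                .collect(Collectors.toList());

        System.out.println("same key:      "
                + nonEmpty(fill(sameKey, partitions)) + " of " + partitions + " partitions filled");
        System.out.println("distinct keys: "
                + nonEmpty(fill(distinctKeys, partitions)) + " of " + partitions + " partitions filled");
    }
}
```

If the per-partition metrics of the topic look like the "same key" case, adding partitions (64 or 400) cannot help, because only one source instance ever has data to read.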

By the way, your code uses Beam in a highly non-idiomatic way. Interacting
with an outside database, such as Redis, will always be the bottleneck in
such a job. Flink provides an abstraction for dealing with state that is
vastly superior to using an external system. We recently did a blog post
about rewriting a similar streaming use case using Flink's internal state;
maybe that's interesting for you.


On Sat, 17 Sep 2016 at 19:30 Amir Bahmanyari wrote:

> Thanks so much, Aljoscha.
> Is there an example that shows how to read from multiple files accurately,
> or from KafkaIO, and get perfect parallelism, please?
> Have a great weekend.
> Sent from my iPhone
> On Sep 17, 2016, at 5:39 AM, Aljoscha Krettek wrote:
> One observation here is that you're only reading from one file. This will
> mean that you won't get any parallelism. Everything is executed on just one
> task/thread.
> Cheers,
> Aljoscha
> On Thu, 15 Sep 2016 at 01:24 amir bahmanyari wrote:
>> Hi Aljoscha,
>> Experimenting on a relatively smaller file, with everything fixed except
>> KafkaIO() vs. TextIO(), I get 50% better runtime performance in the Flink
>> cluster when reading tuples with TextIO().
>> I understand the network involvement in reading from a Kafka topic etc., but
>> 50% is significant.
>> Also, I experimented with 64 partitions in the Kafka topic vs. 400. I get the
>> exact same performance, and increasing the topic partitions doesn't improve
>> anything. I thought some of the 64 slots might get multiple-over-parallelism,
>> really pushing it to its limit. 64 Kafka topic partitions and 400 Kafka topic
>> partitions, with #slots=64, perform the same.
>> It's still slow for a relatively large file though.
>> Please advise if there is something I can try to improve the cluster
>> performance.
>> Thanks + regards
>> ------------------------------
>> *From:* Aljoscha Krettek
>> *To:* amir bahmanyari
>> *Sent:* Wednesday, September 14, 2016 1:48 AM
>> *Subject:* Re: Fw: Flink Cluster Load Distribution Question
>> Hi,
>> this is a different job from the Kafka Job that you have running, right?
>> Could you maybe post the code for that as well?
>> Cheers,
>> Aljoscha
>> On Tue, 13 Sep 2016 at 20:14 amir bahmanyari wrote:
>> Hi Robert,
>> Sure, I am forwarding it to user. Sorry about that. I followed the
>> "robot's" instructions :))
>> Topology: 4 Azure A11 CentOS 7 nodes (16 cores, 110 GB). Let's call them
>> node1, 2, 3, 4.
>> Flink is clustered with node1 running the JM and a TM. Three more TMs run on
>> node2, 3, and 4 respectively.
>> I have a Beam app running with the Flink Runner underneath.
>> The input data is received by Beam TextIO() reading off 1.6 GB of data
>> containing roughly 22 million tuples.
>> *All nodes have identical flink-conf.yaml*, masters & slaves contents as
>> follows:
>> *flink-conf.yaml:*
>> jobmanager.rpc.address: node1
>> jobmanager.rpc.port: 6123
>> jobmanager.heap.mb: 1024
>> taskmanager.heap.mb: 102400
>> taskmanager.numberOfTaskSlots: 16
>> taskmanager.memory.preallocate: false
>> parallelism.default: 64
>> jobmanager.web.port: 8081
>> 4096
>> *masters*:
>> node1:8081
>> *slaves*:
>> node1
>> node2
>> node3
>> node4
>> Everything looks normal at ./ & all daemons start on all
>> nodes.
>> JM, TMs log files get generated on all nodes.
>> Dashboard shows how all slots are being used.
>> I deploy the Beam app to the cluster where JM is running at node1.
>> A *.out file gets generated as data is being processed. No *.out on other
>> nodes, just node1 where I deployed the fat jar.
>> I tail -f the *.out log on node1 (master). It starts fine... but slowly
>> degrades and becomes extremely slow.
>> As we speak, I started the Beam app 13 hrs ago and it's still running.
>> How can I prove that ALL NODES are involved in processing the data at the
>> same time, i.e. clustered?
>> Do the above configurations look OK for reasonable performance?
>> Given the above parameters, how can I improve the performance in this
>> cluster?
>> What other information and/or dashboard screenshots are needed to clarify
>> this issue?
>> I used these websites to do the configuration:
>> Apache Flink: Cluster Setup
>> Apache Flink: Configuration
>> In the second link, there is a config recommendation for the following
>> but this parameter is not in the configuration file out of the box:
>>    -
>> Should I include it manually? Does it make any difference if the default
>> value, i.e. 32 KB, doesn't get picked up?
>> Sorry too many questions.
>> Pls let me know.
>> I appreciate your help.
>> Cheers,
>> Amir-
>> ----- Forwarded Message -----
>> *From:* Robert Metzger
>> *To:* amir bahmanyari
>> *Sent:* Tuesday, September 13, 2016 1:15 AM
>> *Subject:* Re: Flink Cluster Load Distribution Question
>> Hi Amir,
>> I would recommend posting such questions to the user@flink mailing list
>> in the future. This list is meant for development-related topics.
>> I think we need more details to understand why your application is not
>> running properly. Can you quickly describe what your topology is doing?
>> Are you setting the parallelism to a value >= 1?
>> Regards,
>> Robert
>> On Tue, Sep 13, 2016 at 6:35 AM, amir bahmanyari wrote:
>> > Hi Colleagues,
>> > Just joined this forum. I have done everything possible to get a 4-node
>> > Flink cluster to work properly and run a Beam app. It always generates
>> > system-output logs (*.out) on only one node. It's very slow for 4 nodes
>> > being there. It seems the load is not distributed amongst all 4 nodes but
>> > goes to only one node, most of the time the one where the JM runs. I
>> > ran/tested it on a single node, and it ran the same load even faster. Not
>> > sure what's not configured right.
>> > 1- Why am I getting the SystemOut .out log on only one server? All nodes
>> > get their TaskManager log files updated, though.
>> > 2- Why don't I see load being distributed amongst all 4 nodes, but only
>> > one all the time?
>> > 3- Why does the Dashboard show a 0 (zero) for Send/Receive numbers for all
>> > Task Managers?
>> > The Dashboard shows all the right stuff. Top shows not much of the
>> > resources being stressed on any of the nodes. I can share its contents if
>> > it helps diagnose the issue.
>> > Thanks, I appreciate your valuable time, response & help.
>> > Amir-
