Anyone?

On Fri, 6 Dec 2019 at 19:07, Komal Mariam <komal.mar...@gmail.com> wrote:

> Hello everyone,
>
> I want to get some insights on the keyBy (and rebalance) operations:
> according to my understanding, they partition our records across the
> defined parallelism and thus should make our pipeline faster.
>
> I am reading from a Kafka topic that contains 170,000,000 pre-stored
> records, with 11 partitions and a replication factor of 1. Hence I use
> .setStartFromEarliest() to read the stream.
> My Flink setup is a 4-node cluster: 3 taskmanagers, each with 10 cores,
> and 1 jobmanager with 6 cores. (With 10 task slots per TM, I set the
> environment parallelism to 30.)
>
> There are about 10,000 object IDs hence 10,000 keys.  Right now I'm
> keeping the number of records fixed to get a handle on how fast they're
> being processed.
>
> When I remove keyBy, I get the same results in 39 secs as opposed to 52
> secs with keyBy. In fact, even when I vary the parallelism down to 10 or
> below, I still get the same extra overhead of 9 to 13 secs. My data is
> mostly uniformly distributed on its key, so I can rule out skew. Rebalance
> likewise shows the same latency as keyBy.
>
> What I want to know is: what may be causing this overhead, and is there
> any way to decrease it?
>
> Here's the script I'm running for testing purposes:
> --------------
> DataStream<ObjectNode> JSONStream = env.addSource(
>     new FlinkKafkaConsumer<>("data",
>         new JSONKeyValueDeserializationSchema(false),
>         properties).setStartFromEarliest());
>
> DataStream<Point> myPoints = JSONStream.map(new jsonToPoint());
>
> myPoints.keyBy("oID").filter(new findDistancefromPOI());
>
> public class findDistancefromPOI extends RichFilterFunction<Point> {
>     @Override
>     public boolean filter(Point input) throws Exception {
>         double distance = computeEuclideanDist(
>             16.4199, 89.974, input.X(), input.Y());
>         return distance > 0;
>     }
> }
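> The computeEuclideanDist helper isn't shown above; a minimal sketch of
> what it might look like, assuming it is a plain 2-D Euclidean distance
> (the class name DistanceUtil and the parameter order are hypothetical):

```java
public class DistanceUtil {
    // Hypothetical sketch: straight-line (Euclidean) distance
    // between points (x1, y1) and (x2, y2).
    public static double computeEuclideanDist(double x1, double y1,
                                              double x2, double y2) {
        double dx = x2 - x1;
        double dy = y2 - y1;
        return Math.sqrt(dx * dx + dy * dy);
    }

    public static void main(String[] args) {
        // 3-4-5 triangle: distance from (0, 0) to (3, 4) is 5.
        System.out.println(DistanceUtil.computeEuclideanDist(0, 0, 3, 4));
    }
}
```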
>
> Best Regards,
> Komal
>
