Re: Flink Table SQL, Kafka, partitions and unnecessary shuffling

2020-09-17 Thread Dan Hill
Hi Godfrey!

I'll describe the overall setup and then I'll describe the joins.

One of the goals of my Flink jobs is to join incoming log records (User,
Session, PageView, Requests, Insertions, Impressions, etc.) and do useful
things with the joined results.

Input = Kafka.  Value = batch log records.  Key = userId.

The volume for any single user will be pretty small.  All of a user's events
could be stored on the same machine, and the joins could be done locally.

My first stream implementation keeps userId as the key and implements my own
left-join logic using MapState.  For my batch job, I use
DataSet.leftOuterJoin.  I'm okay if the batch job does more shuffling (it's
pretty simple and less latency-sensitive).
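
For context, the stream-side left join is roughly the pattern below (a minimal
sketch; PageView, Impression, Joined and their fields are hypothetical, and the
real job also handles state cleanup and unmatched rows):

    import org.apache.flink.api.common.state.MapState;
    import org.apache.flink.api.common.state.MapStateDescriptor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
    import org.apache.flink.util.Collector;

    // Sketch of a hand-rolled left join, keyed by userId on both inputs.
    // PageView, Impression and Joined are hypothetical record classes.
    public class LeftJoinByUser
        extends KeyedCoProcessFunction<String, PageView, Impression, Joined> {

      // Left-side rows buffered per user, indexed by a hypothetical pageViewId.
      private transient MapState<String, PageView> pageViews;

      @Override
      public void open(Configuration parameters) {
        pageViews = getRuntimeContext().getMapState(
            new MapStateDescriptor<>("pageViews", String.class, PageView.class));
      }

      @Override
      public void processElement1(PageView view, Context ctx, Collector<Joined> out)
          throws Exception {
        // Buffer the left side until a matching right-side record arrives.
        pageViews.put(view.pageViewId, view);
      }

      @Override
      public void processElement2(Impression imp, Context ctx, Collector<Joined> out)
          throws Exception {
        PageView view = pageViews.get(imp.pageViewId);
        if (view != null) {
          out.collect(Joined.of(view, imp));
        }
        // A timer would flush unmatched left rows to make this a true left join.
      }
    }

Both inputs are keyed by userId and connected, e.g.
pageViews.keyBy(v -> v.userId).connect(impressions.keyBy(i -> i.userId)).process(new LeftJoinByUser()).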

I'm evaluating the Table API based on a recommendation around raw log output,
and I've started evaluating it for more of the Flink jobs.









On Thu, Sep 17, 2020 at 5:12 AM godfrey he  wrote:

> Hi Dan,
>
> What kind of joins [1] are you using? Currently, only temporal joins and
> joins with a table function do not reshuffle the input data in the Table
> API and SQL; all other joins reshuffle the input data based on the join keys.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/queries.html#joins
>
> Best,
> Godfrey
>
>
> Dan Hill wrote on Thu, Sep 17, 2020 at 3:44 AM:
>
>> Hi Dawid!
>>
>> I see.  Yea, this would break my job after I move away from the prototype.
>>
>> How do other Flink devs avoid unnecessary reshuffles when sourcing data
>> from Kafka?  Is the Table API still early, or just not widely used?
>>
>>
>>
>>
>> On Wed, Sep 16, 2020 at 12:31 PM Dawid Wysakowicz 
>> wrote:
>>
>>> Hi Dan,
>>>
>>> I am afraid there is no mechanism to do that purely in the Table API
>>> yet, or at least I am not aware of one. If reinterpretAsKeyedStream works
>>> for you, you could use this approach and convert a DataStream (with
>>> reinterpretAsKeyedStream applied) to a Table [1] and then continue with the
>>> Table API.
>>>
>>> On the topic of reinterpretAsKeyedStream, I wanted to stress one
>>> thing. I'd like to bring your attention to this warning:
>>>
>>> *WARNING*: The re-interpreted data stream *MUST* already be
>>> pre-partitioned in *EXACTLY* the same way Flink’s keyBy would partition
>>> the data in a shuffle w.r.t. key-group assignment.
>>>
>>> I think it is not trivial (or perhaps not even possible) to achieve unless both
>>> the producer and the consumer are Flink jobs with the same parallelism.
>>>
>>> Best,
>>>
>>> Dawid
>>>
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/common.html#convert-a-datastream-or-dataset-into-a-table
>>> On 16/09/2020 18:22, Dan Hill wrote:
>>>
>>> Hi Piotr!  Yes, that's what I'm using with DataStream.  It works well in
>>> my prototype.
>>>
>>> On Wed, Sep 16, 2020 at 8:58 AM Piotr Nowojski 
>>> wrote:
>>>
 Hi,

 Have you seen the "Reinterpreting a pre-partitioned data stream as keyed
 stream" feature? [1] However, I'm not sure if and how it can be integrated
 with the Table API. Maybe someone more familiar with the Table API can help
 with that?

 Piotrek

 [1]
 https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/experimental.html#reinterpreting-a-pre-partitioned-data-stream-as-keyed-stream

 On Wed, Sep 16, 2020 at 05:35, Dan Hill wrote:

> How do I avoid unnecessary reshuffles when using Kafka as input?  My
> keys in Kafka are ~userId.  The first few stages do joins that are usually
> keyed on (userId, someOtherKeyId).  It makes sense for these joins to stay
> on the same machine and avoid unnecessary shuffling.
>
> What's the best way to avoid unnecessary shuffling when using the Table
> SQL interface?  I see PARTITION BY on TABLE.  I'm not sure how to specify
> the keys for Kafka.
>
>
>
>
>


Re: Flink Table SQL, Kafka, partitions and unnecessary shuffling

2020-09-17 Thread godfrey he
Hi Dan,

What kind of joins [1] are you using? Currently, only temporal joins and
joins with a table function do not reshuffle the input data in the Table API
and SQL; all other joins reshuffle the input data based on the join keys.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/queries.html#joins
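
For illustration, here is a minimal sketch of the difference (the tables
Events and Sessions and the table function ParseTags are hypothetical and
assumed to be registered already):

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

    public class JoinShuffleSketch {
      public static void main(String[] args) {
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Regular equi-join: the planner hash-partitions both inputs on the
        // join key (userId), i.e. the data is reshuffled.
        Table regular = tEnv.sqlQuery(
            "SELECT e.userId, s.sessionId "
                + "FROM Events e JOIN Sessions s ON e.userId = s.userId");

        // Join with a table function (LATERAL TABLE): the input keeps its
        // current distribution, so no extra shuffle is introduced.
        Table lateral = tEnv.sqlQuery(
            "SELECT e.userId, t.tag "
                + "FROM Events e, LATERAL TABLE(ParseTags(e.url)) AS t(tag)");
      }
    }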

Best,
Godfrey


Dan Hill wrote on Thu, Sep 17, 2020 at 3:44 AM:

> Hi Dawid!
>
> I see.  Yea, this would break my job after I move away from the prototype.
>
> How do other Flink devs avoid unnecessary reshuffles when sourcing data
> from Kafka?  Is the Table API still early, or just not widely used?
>
>
>
>
> On Wed, Sep 16, 2020 at 12:31 PM Dawid Wysakowicz 
> wrote:
>
>> Hi Dan,
>>
>> I am afraid there is no mechanism to do that purely in the Table API yet,
>> or at least I am not aware of one. If reinterpretAsKeyedStream works for
>> you, you could use this approach and convert a DataStream (with
>> reinterpretAsKeyedStream applied) to a Table [1] and then continue with the
>> Table API.
>>
>> On the topic of reinterpretAsKeyedStream, I wanted to stress one
>> thing. I'd like to bring your attention to this warning:
>>
>> *WARNING*: The re-interpreted data stream *MUST* already be
>> pre-partitioned in *EXACTLY* the same way Flink’s keyBy would partition
>> the data in a shuffle w.r.t. key-group assignment.
>>
>> I think it is not trivial (or perhaps not even possible) to achieve unless both
>> the producer and the consumer are Flink jobs with the same parallelism.
>>
>> Best,
>>
>> Dawid
>>
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/common.html#convert-a-datastream-or-dataset-into-a-table
>> On 16/09/2020 18:22, Dan Hill wrote:
>>
>> Hi Piotr!  Yes, that's what I'm using with DataStream.  It works well in
>> my prototype.
>>
>> On Wed, Sep 16, 2020 at 8:58 AM Piotr Nowojski 
>> wrote:
>>
>>> Hi,
>>>
>>> Have you seen the "Reinterpreting a pre-partitioned data stream as keyed
>>> stream" feature? [1] However, I'm not sure if and how it can be integrated
>>> with the Table API. Maybe someone more familiar with the Table API can help
>>> with that?
>>>
>>> Piotrek
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/experimental.html#reinterpreting-a-pre-partitioned-data-stream-as-keyed-stream
>>>
>>> On Wed, Sep 16, 2020 at 05:35, Dan Hill wrote:
>>>
 How do I avoid unnecessary reshuffles when using Kafka as input?  My
 keys in Kafka are ~userId.  The first few stages do joins that are usually
 keyed on (userId, someOtherKeyId).  It makes sense for these joins to stay
 on the same machine and avoid unnecessary shuffling.

 What's the best way to avoid unnecessary shuffling when using the Table SQL
 interface?  I see PARTITION BY on TABLE.  I'm not sure how to specify the
 keys for Kafka.







Re: Flink Table SQL, Kafka, partitions and unnecessary shuffling

2020-09-16 Thread Dan Hill
Hi Dawid!

I see.  Yea, this would break my job after I move away from the prototype.

How do other Flink devs avoid unnecessary reshuffles when sourcing data
from Kafka?  Is the Table API still early, or just not widely used?




On Wed, Sep 16, 2020 at 12:31 PM Dawid Wysakowicz 
wrote:

> Hi Dan,
>
> I am afraid there is no mechanism to do that purely in the Table API yet,
> or at least I am not aware of one. If reinterpretAsKeyedStream works for
> you, you could use this approach and convert a DataStream (with
> reinterpretAsKeyedStream applied) to a Table [1] and then continue with the
> Table API.
>
> On the topic of reinterpretAsKeyedStream, I wanted to stress one
> thing. I'd like to bring your attention to this warning:
>
> *WARNING*: The re-interpreted data stream *MUST* already be
> pre-partitioned in *EXACTLY* the same way Flink’s keyBy would partition
> the data in a shuffle w.r.t. key-group assignment.
>
> I think it is not trivial (or perhaps not even possible) to achieve unless both
> the producer and the consumer are Flink jobs with the same parallelism.
>
> Best,
>
> Dawid
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/common.html#convert-a-datastream-or-dataset-into-a-table
> On 16/09/2020 18:22, Dan Hill wrote:
>
> Hi Piotr!  Yes, that's what I'm using with DataStream.  It works well in
> my prototype.
>
> On Wed, Sep 16, 2020 at 8:58 AM Piotr Nowojski 
> wrote:
>
>> Hi,
>>
>> Have you seen the "Reinterpreting a pre-partitioned data stream as keyed
>> stream" feature? [1] However, I'm not sure if and how it can be integrated
>> with the Table API. Maybe someone more familiar with the Table API can help
>> with that?
>>
>> Piotrek
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/experimental.html#reinterpreting-a-pre-partitioned-data-stream-as-keyed-stream
>>
>> On Wed, Sep 16, 2020 at 05:35, Dan Hill wrote:
>>
>>> How do I avoid unnecessary reshuffles when using Kafka as input?  My
>>> keys in Kafka are ~userId.  The first few stages do joins that are usually
>>> keyed on (userId, someOtherKeyId).  It makes sense for these joins to stay
>>> on the same machine and avoid unnecessary shuffling.
>>>
>>> What's the best way to avoid unnecessary shuffling when using the Table SQL
>>> interface?  I see PARTITION BY on TABLE.  I'm not sure how to specify the
>>> keys for Kafka.
>>>
>>>
>>>
>>>
>>>


Re: Flink Table SQL, Kafka, partitions and unnecessary shuffling

2020-09-16 Thread Dawid Wysakowicz
Hi Dan,

I am afraid there is no mechanism to do that purely in the Table API
yet, or at least I am not aware of one. If reinterpretAsKeyedStream works
for you, you could use this approach and convert a DataStream (with
reinterpretAsKeyedStream applied) to a Table [1] and then continue with
the Table API.
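
For what it is worth, a rough sketch of that combination might look like the
following (the topic name, bootstrap servers and the use of plain strings as
records are made up; it also assumes the topic is partitioned by userId
exactly as described in the warning below):

    import java.util.Properties;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.DataStreamUtils;
    import org.apache.flink.streaming.api.datastream.KeyedStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

    public class KeyedKafkaToTable {
      public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka:9092");
        props.setProperty("group.id", "log-join");

        // Hypothetical topic where the record value is the userId; a real job
        // would deserialize the full log record and key by its userId field.
        DataStream<String> raw = env.addSource(
            new FlinkKafkaConsumer<>("events", new SimpleStringSchema(), props));

        // Declare the stream as already keyed by userId, skipping the keyBy
        // shuffle. Only safe if the topic really is partitioned the way keyBy
        // would partition it (see the warning below).
        KeyedStream<String, String> keyed =
            DataStreamUtils.reinterpretAsKeyedStream(raw, value -> value, Types.STRING);

        // Continue in the Table API on top of the re-interpreted keyed stream.
        Table events = tEnv.fromDataStream(keyed);
      }
    }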

On the topic of reinterpretAsKeyedStream, I wanted to stress one
thing. I'd like to bring your attention to this warning:

> *WARNING*: The re-interpreted data stream *MUST* already be
> pre-partitioned in *EXACTLY* the same way Flink’s keyBy would
> partition the data in a shuffle w.r.t. key-group assignment.
I think it is not trivial (or perhaps not even possible) to achieve unless both
the producer and the consumer are Flink jobs with the same parallelism.
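
To make that concrete: the producer would have to pick each record's Kafka
partition with the same key-group logic keyBy uses, something along these
lines (a sketch only; it relies on Flink's internal KeyGroupRangeAssignment
and still ignores how the Kafka source assigns partitions to source subtasks,
which is exactly why this is fragile):

    import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

    public class KeyByCompatiblePartitioner {
      // Returns the subtask index Flink's keyBy would send this key to, given
      // the consuming job's maxParallelism and parallelism. A producer would
      // have to use this as the Kafka partition AND the consumer would have to
      // read partition i in source subtask i for the reinterpretation to hold.
      public static int partitionFor(String userId, int maxParallelism, int parallelism) {
        return KeyGroupRangeAssignment.assignKeyToParallelOperator(
            userId, maxParallelism, parallelism);
      }
    }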

Best,

Dawid


[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/common.html#convert-a-datastream-or-dataset-into-a-table

On 16/09/2020 18:22, Dan Hill wrote:
> Hi Piotr!  Yes, that's what I'm using with DataStream.  It works well
> in my prototype.
>
> On Wed, Sep 16, 2020 at 8:58 AM Piotr Nowojski wrote:
>
> Hi,
>
> Have you seen the "Reinterpreting a pre-partitioned data stream as
> keyed stream" feature? [1] However, I'm not sure if and how it can
> be integrated with the Table API. Maybe someone more familiar with
> the Table API can help with that?
>
> Piotrek
>
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/experimental.html#reinterpreting-a-pre-partitioned-data-stream-as-keyed-stream
>
> On Wed, Sep 16, 2020 at 05:35, Dan Hill wrote:
>
> How do I avoid unnecessary reshuffles when using Kafka as
> input?  My keys in Kafka are ~userId.  The first few stages do
> joins that are usually keyed on (userId, someOtherKeyId).  It
> makes sense for these joins to stay on the same machine and
> avoid unnecessary shuffling.
>
> What's the best way to avoid unnecessary shuffling when using
> the Table SQL interface?  I see PARTITION BY on TABLE.  I'm not
> sure how to specify the keys for Kafka.
>
>
>
>




Re: Flink Table SQL, Kafka, partitions and unnecessary shuffling

2020-09-16 Thread Dan Hill
Hi Piotr!  Yes, that's what I'm using with DataStream.  It works well in my
prototype.

On Wed, Sep 16, 2020 at 8:58 AM Piotr Nowojski  wrote:

> Hi,
>
> Have you seen the "Reinterpreting a pre-partitioned data stream as keyed
> stream" feature? [1] However, I'm not sure if and how it can be integrated
> with the Table API. Maybe someone more familiar with the Table API can help
> with that?
>
> Piotrek
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/experimental.html#reinterpreting-a-pre-partitioned-data-stream-as-keyed-stream
>
> On Wed, Sep 16, 2020 at 05:35, Dan Hill wrote:
>
>> How do I avoid unnecessary reshuffles when using Kafka as input?  My keys
>> in Kafka are ~userId.  The first few stages do joins that are usually
>> keyed on (userId, someOtherKeyId).  It makes sense for these joins to stay
>> on the same machine and avoid unnecessary shuffling.
>>
>> What's the best way to avoid unnecessary shuffling when using the Table SQL
>> interface?  I see PARTITION BY on TABLE.  I'm not sure how to specify the
>> keys for Kafka.
>>
>>
>>
>>
>>


Re: Flink Table SQL, Kafka, partitions and unnecessary shuffling

2020-09-16 Thread Piotr Nowojski
Hi,

Have you seen the "Reinterpreting a pre-partitioned data stream as keyed
stream" feature? [1] However, I'm not sure if and how it can be integrated
with the Table API. Maybe someone more familiar with the Table API can help
with that?

Piotrek

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/experimental.html#reinterpreting-a-pre-partitioned-data-stream-as-keyed-stream

On Wed, Sep 16, 2020 at 05:35, Dan Hill wrote:

> How do I avoid unnecessary reshuffles when using Kafka as input?  My keys
> in Kafka are ~userId.  The first few stages do joins that are usually
> keyed on (userId, someOtherKeyId).  It makes sense for these joins to stay
> on the same machine and avoid unnecessary shuffling.
>
> What's the best way to avoid unnecessary shuffling when using the Table SQL
> interface?  I see PARTITION BY on TABLE.  I'm not sure how to specify the
> keys for Kafka.
>
>
>
>
>